Skip to content

Load constants from names #337

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions lib/temporal/activity/task_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def initialize(task, task_queue, namespace, activity_lookup, middleware_chain, c
@metadata = Metadata.generate_activity_metadata(task, namespace, config.converter)
@task_token = task.task_token
@activity_name = task.activity_type.name
@activity_class = activity_lookup.find(activity_name)
@activity_lookup = activity_lookup
@middleware_chain = middleware_chain
@config = config
@heartbeat_thread_pool = heartbeat_thread_pool
Expand All @@ -30,6 +30,7 @@ def process

context = Activity::Context.new(connection, metadata, config, heartbeat_thread_pool)

activity_class = find_activity_class
if !activity_class
raise ActivityNotRegistered, 'Activity is not registered with this worker'
end
Expand Down Expand Up @@ -65,8 +66,8 @@ def metric_tags

private

attr_reader :task, :task_queue, :namespace, :task_token, :activity_name, :activity_class,
:middleware_chain, :metadata, :config, :heartbeat_thread_pool
attr_reader :task, :task_queue, :namespace, :task_token, :activity_name, :activity_lookup,
:middleware_chain, :metadata, :config, :heartbeat_thread_pool

def connection
@connection ||= Temporal::Connection.generate(config.for_connection)
Expand All @@ -78,6 +79,10 @@ def queue_time_ms
((started - scheduled) * 1_000).round
end

def find_activity_class
activity_lookup.find(activity_name)
end

def respond_completed(result)
Temporal.logger.info("Activity task completed", metadata.to_h)
log_retry = proc do
Expand Down
22 changes: 18 additions & 4 deletions lib/temporal/executable_lookup.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ def initialize(previous_executable_name)
end
end

ExecutableNotFoundError = Class.new(StandardError)

def initialize
@executables = {}
end
Expand All @@ -27,20 +29,32 @@ def add_dynamic(name, executable)
raise SecondDynamicExecutableError, @fallback_executable_name
end

@fallback_executable = executable
@fallback_executable_class_name = executable.is_a?(String) ? executable : executable.name
@fallback_executable_name = name
end

def add(name, executable)
executables[name] = executable
executables[name] = executable.is_a?(String) ? executable : executable.name
end

def find(name)
executables[name] || @fallback_executable
if executables[name]
resolve_executable(executables[name])
elsif @fallback_executable_class_name
resolve_executable(@fallback_executable_class_name)
else
nil
end
end

private

attr_reader :executables, :fallback_executable, :fallback_executable_name
def resolve_executable(class_name)
Object.const_get(class_name)
rescue NameError
raise Temporal::ExecutableLookup::ExecutableNotFoundError, "Executable #{class_name} not found"
end

attr_reader :executables, :fallback_executable_name, :fallback_executable_class_name
end
end
4 changes: 2 additions & 2 deletions lib/temporal/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,12 @@ def while_stopping_hook; end
def on_stopped_hook; end

def workflow_poller_for(namespace, task_queue, lookup)
Workflow::Poller.new(namespace, task_queue, lookup.freeze, config, workflow_task_middleware, workflow_middleware,
Workflow::Poller.new(namespace, task_queue, lookup, config, workflow_task_middleware, workflow_middleware,
workflow_poller_options)
end

def activity_poller_for(namespace, task_queue, lookup)
Activity::Poller.new(namespace, task_queue, lookup.freeze, config, activity_middleware, activity_poller_options)
Activity::Poller.new(namespace, task_queue, lookup, config, activity_middleware, activity_poller_options)
end

def executable_registration(executable_class, options)
Expand Down
9 changes: 7 additions & 2 deletions lib/temporal/workflow/task_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def initialize(task, task_queue, namespace, workflow_lookup, middleware_chain, w
@metadata = Metadata.generate_workflow_task_metadata(task, namespace)
@task_token = task.task_token
@workflow_name = task.workflow_type.name
@workflow_class = workflow_lookup.find(workflow_name)
@workflow_lookup = workflow_lookup
@middleware_chain = middleware_chain
@workflow_middleware_chain = workflow_middleware_chain
@config = config
Expand All @@ -42,6 +42,7 @@ def process
Temporal.logger.debug("Processing Workflow task", metadata.to_h)
Temporal.metrics.timing(Temporal::MetricKeys::WORKFLOW_TASK_QUEUE_TIME, queue_time_ms, metric_tags)

workflow_class = find_workflow_class
raise Temporal::WorkflowNotRegistered, 'Workflow is not registered with this worker' unless workflow_class

history = fetch_full_history
Expand Down Expand Up @@ -85,7 +86,11 @@ def metric_tags

private

attr_reader :task, :task_queue, :namespace, :task_token, :workflow_name, :workflow_class,
def find_workflow_class
workflow_lookup.find(workflow_name)
end

attr_reader :task, :task_queue, :namespace, :task_token, :workflow_name, :workflow_lookup,
:middleware_chain, :workflow_middleware_chain, :metadata, :config, :binary_checksum

def connection
Expand Down
2 changes: 1 addition & 1 deletion spec/fabricators/grpc/activity_task_fabricator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

activity_id { SecureRandom.uuid }
task_token { |attrs| attrs[:task_token] || SecureRandom.uuid }
activity_type { Fabricate(:api_activity_type) }
activity_type { |attrs| Fabricate(:api_activity_type, name: attrs[:activity_name]) }
input { TEST_CONVERTER.to_payloads(nil) }
workflow_type { Fabricate(:api_workflow_type) }
workflow_execution { Fabricate(:api_workflow_execution) }
Expand Down
2 changes: 1 addition & 1 deletion spec/fabricators/grpc/activity_type_fabricator.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
Fabricator(:api_activity_type, from: Temporalio::Api::Common::V1::ActivityType) do
name 'TestActivity'
name { |attrs| attrs[:name] || 'TestActivity' }
end
17 changes: 12 additions & 5 deletions spec/unit/lib/temporal/activity/task_processor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
require 'temporal/metric_keys'
require 'temporal/middleware/chain'
require 'temporal/scheduled_thread_pool'
require 'temporal/executable_lookup'

describe Temporal::Activity::TaskProcessor do
subject { described_class.new(task, task_queue, namespace, lookup, middleware_chain, config, heartbeat_thread_pool) }

let(:namespace) { 'test-namespace' }
let(:task_queue) { 'test-queue' }
let(:lookup) { instance_double('Temporal::ExecutableLookup', find: nil) }
let(:lookup) { Temporal::ExecutableLookup.new }
let(:task) do
Fabricate(
:api_activity_task,
Expand All @@ -19,7 +20,7 @@
end
let(:metadata) { Temporal::Metadata.generate_activity_metadata(task, namespace, config.converter) }
let(:workflow_name) { task.workflow_type.name }
let(:activity_name) { 'TestActivity' }
let(:activity_name) { 'Temporal::MyActivity' }
let(:connection) { instance_double('Temporal::Connection::GRPC') }
let(:middleware_chain) { Temporal::Middleware::Chain.new }
let(:config) { Temporal::Configuration.new }
Expand Down Expand Up @@ -94,10 +95,16 @@
end

context 'when activity is registered' do
let(:activity_class) { double('Temporal::Activity', execute_in_context: nil) }
let(:activity_class) do
stub_const('Temporal::MyActivity', Class.new do
def self.execute_in_context(context, input)
'result'
end
end)
end

before do
allow(lookup).to receive(:find).with(activity_name).and_return(activity_class)
before "register activity in lookup" do
lookup.add(activity_class.name, activity_class)
end

context 'when activity completes' do
Expand Down
12 changes: 11 additions & 1 deletion spec/unit/lib/temporal/executable_lookup_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class IllegalSecondDynamicActivity
it 'adds a class to the lookup map' do
subject.add('foo', TestClass)

expect(subject.send(:executables)).to eq('foo' => TestClass)
expect(subject.send(:executables)).to eq('foo' => "TestClass")
end
end

Expand All @@ -42,6 +42,16 @@ class IllegalSecondDynamicActivity
expect(subject.find('bar')).to eq(nil)
end

it "still returns the class even it was unloaded and has a new object_id" do
original_object_id = stub_const('TestClass', Class.new).object_id
subject.add('foo', TestClass)

expected_class = stub_const('TestClass', Class.new)

expect(expected_class.object_id).not_to eq(original_object_id)
expect(subject.find('foo')).to be(expected_class)
end

it 'falls back to the dynamic executable' do
subject.add('TestClass', TestClass)
subject.add_dynamic('MyDynamicActivity', MyDynamicActivity)
Expand Down