diff --git a/lib/temporal/client.rb b/lib/temporal/client.rb index 912a63d0..c9dea1ce 100644 --- a/lib/temporal/client.rb +++ b/lib/temporal/client.rb @@ -60,7 +60,7 @@ def start_workflow(workflow, *input, options: {}, **args) # If unspecified, individual runs should have the full time for the execution (which includes retries). run_timeout: compute_run_timeout(execution_options), task_timeout: execution_options.timeouts[:task], - workflow_id_reuse_policy: options[:workflow_id_reuse_policy], + workflow_id_reuse_policy: options[:workflow_id_reuse_policy] || execution_options.workflow_id_reuse_policy, headers: execution_options.headers, memo: execution_options.memo, search_attributes: Workflow::Context::Helpers.process_search_attributes(execution_options.search_attributes), @@ -77,7 +77,7 @@ def start_workflow(workflow, *input, options: {}, **args) execution_timeout: execution_options.timeouts[:execution], run_timeout: compute_run_timeout(execution_options), task_timeout: execution_options.timeouts[:task], - workflow_id_reuse_policy: options[:workflow_id_reuse_policy], + workflow_id_reuse_policy: options[:workflow_id_reuse_policy] || execution_options.workflow_id_reuse_policy, headers: execution_options.headers, memo: execution_options.memo, search_attributes: Workflow::Context::Helpers.process_search_attributes(execution_options.search_attributes), @@ -126,7 +126,7 @@ def schedule_workflow(workflow, cron_schedule, *input, options: {}, **args) # than the execution timeout. run_timeout: compute_run_timeout(execution_options), task_timeout: execution_options.timeouts[:task], - workflow_id_reuse_policy: options[:workflow_id_reuse_policy], + workflow_id_reuse_policy: options[:workflow_id_reuse_policy] || execution_options.workflow_id_reuse_policy, headers: execution_options.headers, cron_schedule: cron_schedule, memo: execution_options.memo, diff --git a/lib/temporal/concerns/executable.rb b/lib/temporal/concerns/executable.rb index 17e15362..5de03981 100644 --- a/lib/temporal/concerns/executable.rb +++ b/lib/temporal/concerns/executable.rb @@ -29,6 +29,11 @@ def headers(*args) return @headers if args.empty? @headers = args.first end + + def workflow_id_reuse_policy(*args) + return @workflow_id_reuse_policy if args.empty? + @workflow_id_reuse_policy = args.first + end end end end diff --git a/lib/temporal/configuration.rb b/lib/temporal/configuration.rb index b397d03c..3d9b993e 100644 --- a/lib/temporal/configuration.rb +++ b/lib/temporal/configuration.rb @@ -9,10 +9,10 @@ module Temporal class Configuration Connection = Struct.new(:type, :host, :port, :credentials, :identity, keyword_init: true) - Execution = Struct.new(:namespace, :task_queue, :timeouts, :headers, :search_attributes, keyword_init: true) + Execution = Struct.new(:namespace, :task_queue, :timeouts, :headers, :search_attributes, :workflow_id_reuse_policy, keyword_init: true) attr_reader :timeouts, :error_handlers - attr_accessor :connection_type, :converter, :host, :port, :credentials, :identity, :logger, :metrics_adapter, :namespace, :task_queue, :headers, :search_attributes + attr_accessor :connection_type, :converter, :host, :port, :credentials, :identity, :logger, :metrics_adapter, :namespace, :task_queue, :headers, :search_attributes, :workflow_id_reuse_policy # See https://docs.temporal.io/blog/activity-timeouts/ for general docs. # We want an infinite execution timeout for cron schedules and other perpetual workflows. @@ -43,6 +43,9 @@ class Configuration Temporal::Connection::Converter::Payload::JSON.new ] ).freeze + # default workflow id reuse policy is nil for backwards compatibility. in reality, the + # default is :allow, due to that being the temporal server's default + DEFAULT_WORKFLOW_ID_REUSE_POLICY = nil def initialize @connection_type = :grpc @@ -57,6 +60,7 @@ def initialize @credentials = :this_channel_is_insecure @identity = nil @search_attributes = {} + @workflow_id_reuse_policy = DEFAULT_WORKFLOW_ID_REUSE_POLICY end def on_error(&block) @@ -91,7 +95,8 @@ def default_execution_options task_queue: task_list, timeouts: timeouts, headers: headers, - search_attributes: search_attributes + search_attributes: search_attributes, + workflow_id_reuse_policy: workflow_id_reuse_policy ).freeze end diff --git a/lib/temporal/connection/serializer/workflow_id_reuse_policy.rb b/lib/temporal/connection/serializer/workflow_id_reuse_policy.rb index b3040197..8d1a6233 100644 --- a/lib/temporal/connection/serializer/workflow_id_reuse_policy.rb +++ b/lib/temporal/connection/serializer/workflow_id_reuse_policy.rb @@ -4,7 +4,6 @@ module Temporal module Connection module Serializer class WorkflowIdReusePolicy < Base - WORKFLOW_ID_REUSE_POLICY = { allow_failed: Temporal::Api::Enums::V1::WorkflowIdReusePolicy::WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY, allow: Temporal::Api::Enums::V1::WorkflowIdReusePolicy::WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, @@ -14,7 +13,7 @@ class WorkflowIdReusePolicy < Base def to_proto return unless object - policy = WORKFLOW_ID_REUSE_POLICY[object] + policy = WORKFLOW_ID_REUSE_POLICY[object.to_sym] raise ArgumentError, "Unknown workflow_id_reuse_policy specified: #{object}" unless policy policy diff --git a/lib/temporal/execution_options.rb b/lib/temporal/execution_options.rb index 681cac83..13935586 100644 --- a/lib/temporal/execution_options.rb +++ b/lib/temporal/execution_options.rb @@ -3,7 +3,7 @@ module Temporal class ExecutionOptions - attr_reader :name, :namespace, :task_queue, :retry_policy, :timeouts, :headers, :memo, :search_attributes + attr_reader :name, :namespace, :task_queue, :retry_policy, :timeouts, :headers, :memo, :search_attributes, :workflow_id_reuse_policy def initialize(object, options, defaults = nil) # Options are treated as overrides and take precedence @@ -15,6 +15,7 @@ def initialize(object, options, defaults = nil) @headers = options[:headers] || {} @memo = options[:memo] || {} @search_attributes = options[:search_attributes] || {} + @workflow_id_reuse_policy = options[:workflow_id_reuse_policy] # For Temporal::Workflow and Temporal::Activity use defined values as the next option if has_executable_concern?(object) @@ -23,6 +24,7 @@ def initialize(object, options, defaults = nil) @retry_policy = object.retry_policy.merge(@retry_policy) if object.retry_policy @timeouts = object.timeouts.merge(@timeouts) if object.timeouts @headers = object.headers.merge(@headers) if object.headers + @workflow_id_reuse_policy ||= object.workflow_id_reuse_policy end # Lastly consider defaults if they are given @@ -32,6 +34,7 @@ def initialize(object, options, defaults = nil) @timeouts = defaults.timeouts.merge(@timeouts) @headers = defaults.headers.merge(@headers) @search_attributes = defaults.search_attributes.merge(@search_attributes) + @workflow_id_reuse_policy ||= defaults.workflow_id_reuse_policy end if @retry_policy.empty? diff --git a/lib/temporal/workflow/context.rb b/lib/temporal/workflow/context.rb index bbd5cb0b..3664f2fb 100644 --- a/lib/temporal/workflow/context.rb +++ b/lib/temporal/workflow/context.rb @@ -133,7 +133,7 @@ def execute_workflow(workflow_class, *input, **args) headers: execution_options.headers, cron_schedule: cron_schedule, memo: execution_options.memo, - workflow_id_reuse_policy: workflow_id_reuse_policy, + workflow_id_reuse_policy: workflow_id_reuse_policy || execution_options.workflow_id_reuse_policy, search_attributes: Helpers.process_search_attributes(execution_options.search_attributes), ) diff --git a/spec/unit/lib/temporal/execution_options_spec.rb b/spec/unit/lib/temporal/execution_options_spec.rb index 98fbe380..61d43218 100644 --- a/spec/unit/lib/temporal/execution_options_spec.rb +++ b/spec/unit/lib/temporal/execution_options_spec.rb @@ -50,7 +50,7 @@ class TestExecutionOptionsWorkflow < Temporal::Workflow let(:options) do { name: 'OtherTestWorkflow', namespace: 'test-namespace', task_queue: 'test-task-queue' } end - + it 'is initialized with name from options' do expect(subject.name).to eq(options[:name]) expect(subject.namespace).to eq(options[:namespace]) @@ -77,9 +77,10 @@ class TestExecutionOptionsWorkflow < Temporal::Workflow timeouts: { schedule_to_close: 42 }, headers: { 'DefaultHeader' => 'Default' }, search_attributes: { 'DefaultIntSearchAttribute' => 256 }, + workflow_id_reuse_policy: :reject ) end - + it 'is initialized with a mix of options and defaults' do expect(subject.name).to eq(object) expect(subject.namespace).to eq(options[:namespace]) @@ -88,6 +89,7 @@ class TestExecutionOptionsWorkflow < Temporal::Workflow expect(subject.timeouts).to eq(schedule_to_close: 42, start_to_close: 10) expect(subject.headers).to eq('DefaultHeader' => 'Default', 'TestHeader' => 'Test') expect(subject.search_attributes).to eq('DefaultIntSearchAttribute' => 256, 'DoubleSearchAttribute' => 3.14) + expect(subject.workflow_id_reuse_policy).to eq(:reject) end end @@ -99,10 +101,11 @@ class TestExecutionOptionsWorkflow < Temporal::Workflow task_queue: 'test-task-queue', retry_policy: { interval: 1, backoff: 2, max_attempts: 5 }, timeouts: { start_to_close: 10 }, - headers: { 'TestHeader' => 'Test' } + headers: { 'TestHeader' => 'Test' }, + workflow_id_reuse_policy: :reject } end - + it 'is initialized with full options' do expect(subject.name).to eq(options[:name]) expect(subject.namespace).to eq(options[:namespace]) @@ -113,12 +116,13 @@ class TestExecutionOptionsWorkflow < Temporal::Workflow expect(subject.retry_policy.max_attempts).to eq(options[:retry_policy][:max_attempts]) expect(subject.timeouts).to eq(options[:timeouts]) expect(subject.headers).to eq(options[:headers]) + expect(subject.workflow_id_reuse_policy).to eq(options[:workflow_id_reuse_policy]) end end - + context 'when retry policy options are invalid' do let(:options) { { retry_policy: { max_attempts: 10 } } } - + it 'raises' do expect { subject }.to raise_error( Temporal::RetryPolicy::InvalidRetryPolicy, @@ -128,7 +132,6 @@ class TestExecutionOptionsWorkflow < Temporal::Workflow end end - context 'when initialized with an Executable' do class TestWorkflow < Temporal::Workflow namespace 'namespace' @@ -136,6 +139,7 @@ class TestWorkflow < Temporal::Workflow retry_policy interval: 1, backoff: 2, max_attempts: 5 timeouts start_to_close: 10 headers 'HeaderA' => 'TestA', 'HeaderB' => 'TestB' + workflow_id_reuse_policy :reject end let(:object) { TestWorkflow } @@ -151,6 +155,7 @@ class TestWorkflow < Temporal::Workflow expect(subject.retry_policy.max_attempts).to eq(5) expect(subject.timeouts).to eq(start_to_close: 10) expect(subject.headers).to eq('HeaderA' => 'TestA', 'HeaderB' => 'TestB') + expect(subject.workflow_id_reuse_policy).to eq(:reject) end context 'when options are present' do @@ -160,10 +165,11 @@ class TestWorkflow < Temporal::Workflow task_queue: 'test-task-queue', retry_policy: { interval: 2, max_attempts: 10 }, timeouts: { schedule_to_close: 20 }, - headers: { 'TestHeader' => 'Value', 'HeaderB' => 'ValueB' } + headers: { 'TestHeader' => 'Value', 'HeaderB' => 'ValueB' }, + workflow_id_reuse_policy: :allow_failed } end - + it 'is initialized with a mix of options and executable values' do expect(subject.name).to eq(options[:name]) expect(subject.namespace).to eq('namespace') @@ -178,6 +184,7 @@ class TestWorkflow < Temporal::Workflow 'HeaderA' => 'TestA', 'HeaderB' => 'ValueB' # overriden by options ) + expect(subject.workflow_id_reuse_policy).to eq(:allow_failed) end end @@ -196,9 +203,10 @@ class TestWorkflow < Temporal::Workflow timeouts: { schedule_to_close: 42 }, headers: { 'DefaultHeader' => 'Default', 'HeaderA' => 'DefaultA' }, search_attributes: {}, + workflow_id_reuse_policy: :allow_failed ) end - + it 'is initialized with a mix of executable values, options and defaults' do expect(subject.name).to eq(object.name) expect(subject.namespace).to eq(options[:namespace]) @@ -214,12 +222,13 @@ class TestWorkflow < Temporal::Workflow 'HeaderB' => 'TestB', # not overriden by defaults 'DefaultHeader' => 'Default' ) + expect(subject.workflow_id_reuse_policy).to eq(:reject) end end context 'when retry policy options are invalid' do let(:options) { { retry_policy: { interval: 1.5 } } } - + it 'raises' do expect { subject }.to raise_error( Temporal::RetryPolicy::InvalidRetryPolicy,