From 935d6a98d7614110943ce4563f42dc479e28fcc7 Mon Sep 17 00:00:00 2001 From: Grant Shively Date: Thu, 22 Dec 2022 15:10:10 -0700 Subject: [PATCH] Make workflow id reuse policy configurable - A default workflow_id_reuse_policy can now be set in configuration. It can also be set directly on workflows via the executable concern. Previously, the only way to set it was to provide it as an option to the *_workflow methods directly (this is also still supported). --- lib/temporal/client.rb | 6 ++-- lib/temporal/concerns/executable.rb | 5 +++ lib/temporal/configuration.rb | 11 +++++-- .../serializer/workflow_id_reuse_policy.rb | 3 +- lib/temporal/execution_options.rb | 5 ++- lib/temporal/workflow/context.rb | 2 +- .../lib/temporal/execution_options_spec.rb | 31 ++++++++++++------- 7 files changed, 42 insertions(+), 21 deletions(-) diff --git a/lib/temporal/client.rb b/lib/temporal/client.rb index 912a63d0..c9dea1ce 100644 --- a/lib/temporal/client.rb +++ b/lib/temporal/client.rb @@ -60,7 +60,7 @@ def start_workflow(workflow, *input, options: {}, **args) # If unspecified, individual runs should have the full time for the execution (which includes retries). run_timeout: compute_run_timeout(execution_options), task_timeout: execution_options.timeouts[:task], - workflow_id_reuse_policy: options[:workflow_id_reuse_policy], + workflow_id_reuse_policy: options[:workflow_id_reuse_policy] || execution_options.workflow_id_reuse_policy, headers: execution_options.headers, memo: execution_options.memo, search_attributes: Workflow::Context::Helpers.process_search_attributes(execution_options.search_attributes), @@ -77,7 +77,7 @@ def start_workflow(workflow, *input, options: {}, **args) execution_timeout: execution_options.timeouts[:execution], run_timeout: compute_run_timeout(execution_options), task_timeout: execution_options.timeouts[:task], - workflow_id_reuse_policy: options[:workflow_id_reuse_policy], + workflow_id_reuse_policy: options[:workflow_id_reuse_policy] || execution_options.workflow_id_reuse_policy, headers: execution_options.headers, memo: execution_options.memo, search_attributes: Workflow::Context::Helpers.process_search_attributes(execution_options.search_attributes), @@ -126,7 +126,7 @@ def schedule_workflow(workflow, cron_schedule, *input, options: {}, **args) # than the execution timeout. run_timeout: compute_run_timeout(execution_options), task_timeout: execution_options.timeouts[:task], - workflow_id_reuse_policy: options[:workflow_id_reuse_policy], + workflow_id_reuse_policy: options[:workflow_id_reuse_policy] || execution_options.workflow_id_reuse_policy, headers: execution_options.headers, cron_schedule: cron_schedule, memo: execution_options.memo, diff --git a/lib/temporal/concerns/executable.rb b/lib/temporal/concerns/executable.rb index 17e15362..5de03981 100644 --- a/lib/temporal/concerns/executable.rb +++ b/lib/temporal/concerns/executable.rb @@ -29,6 +29,11 @@ def headers(*args) return @headers if args.empty? @headers = args.first end + + def workflow_id_reuse_policy(*args) + return @workflow_id_reuse_policy if args.empty? + @workflow_id_reuse_policy = args.first + end end end end diff --git a/lib/temporal/configuration.rb b/lib/temporal/configuration.rb index b397d03c..3d9b993e 100644 --- a/lib/temporal/configuration.rb +++ b/lib/temporal/configuration.rb @@ -9,10 +9,10 @@ module Temporal class Configuration Connection = Struct.new(:type, :host, :port, :credentials, :identity, keyword_init: true) - Execution = Struct.new(:namespace, :task_queue, :timeouts, :headers, :search_attributes, keyword_init: true) + Execution = Struct.new(:namespace, :task_queue, :timeouts, :headers, :search_attributes, :workflow_id_reuse_policy, keyword_init: true) attr_reader :timeouts, :error_handlers - attr_accessor :connection_type, :converter, :host, :port, :credentials, :identity, :logger, :metrics_adapter, :namespace, :task_queue, :headers, :search_attributes + attr_accessor :connection_type, :converter, :host, :port, :credentials, :identity, :logger, :metrics_adapter, :namespace, :task_queue, :headers, :search_attributes, :workflow_id_reuse_policy # See https://docs.temporal.io/blog/activity-timeouts/ for general docs. # We want an infinite execution timeout for cron schedules and other perpetual workflows. @@ -43,6 +43,9 @@ class Configuration Temporal::Connection::Converter::Payload::JSON.new ] ).freeze + # default workflow id reuse policy is nil for backwards compatibility. in reality, the + # default is :allow, due to that being the temporal server's default + DEFAULT_WORKFLOW_ID_REUSE_POLICY = nil def initialize @connection_type = :grpc @@ -57,6 +60,7 @@ def initialize @credentials = :this_channel_is_insecure @identity = nil @search_attributes = {} + @workflow_id_reuse_policy = DEFAULT_WORKFLOW_ID_REUSE_POLICY end def on_error(&block) @@ -91,7 +95,8 @@ def default_execution_options task_queue: task_list, timeouts: timeouts, headers: headers, - search_attributes: search_attributes + search_attributes: search_attributes, + workflow_id_reuse_policy: workflow_id_reuse_policy ).freeze end diff --git a/lib/temporal/connection/serializer/workflow_id_reuse_policy.rb b/lib/temporal/connection/serializer/workflow_id_reuse_policy.rb index b3040197..8d1a6233 100644 --- a/lib/temporal/connection/serializer/workflow_id_reuse_policy.rb +++ b/lib/temporal/connection/serializer/workflow_id_reuse_policy.rb @@ -4,7 +4,6 @@ module Temporal module Connection module Serializer class WorkflowIdReusePolicy < Base - WORKFLOW_ID_REUSE_POLICY = { allow_failed: Temporal::Api::Enums::V1::WorkflowIdReusePolicy::WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY, allow: Temporal::Api::Enums::V1::WorkflowIdReusePolicy::WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, @@ -14,7 +13,7 @@ class WorkflowIdReusePolicy < Base def to_proto return unless object - policy = WORKFLOW_ID_REUSE_POLICY[object] + policy = WORKFLOW_ID_REUSE_POLICY[object.to_sym] raise ArgumentError, "Unknown workflow_id_reuse_policy specified: #{object}" unless policy policy diff --git a/lib/temporal/execution_options.rb b/lib/temporal/execution_options.rb index 681cac83..13935586 100644 --- a/lib/temporal/execution_options.rb +++ b/lib/temporal/execution_options.rb @@ -3,7 +3,7 @@ module Temporal class ExecutionOptions - attr_reader :name, :namespace, :task_queue, :retry_policy, :timeouts, :headers, :memo, :search_attributes + attr_reader :name, :namespace, :task_queue, :retry_policy, :timeouts, :headers, :memo, :search_attributes, :workflow_id_reuse_policy def initialize(object, options, defaults = nil) # Options are treated as overrides and take precedence @@ -15,6 +15,7 @@ def initialize(object, options, defaults = nil) @headers = options[:headers] || {} @memo = options[:memo] || {} @search_attributes = options[:search_attributes] || {} + @workflow_id_reuse_policy = options[:workflow_id_reuse_policy] # For Temporal::Workflow and Temporal::Activity use defined values as the next option if has_executable_concern?(object) @@ -23,6 +24,7 @@ def initialize(object, options, defaults = nil) @retry_policy = object.retry_policy.merge(@retry_policy) if object.retry_policy @timeouts = object.timeouts.merge(@timeouts) if object.timeouts @headers = object.headers.merge(@headers) if object.headers + @workflow_id_reuse_policy ||= object.workflow_id_reuse_policy end # Lastly consider defaults if they are given @@ -32,6 +34,7 @@ def initialize(object, options, defaults = nil) @timeouts = defaults.timeouts.merge(@timeouts) @headers = defaults.headers.merge(@headers) @search_attributes = defaults.search_attributes.merge(@search_attributes) + @workflow_id_reuse_policy ||= defaults.workflow_id_reuse_policy end if @retry_policy.empty? diff --git a/lib/temporal/workflow/context.rb b/lib/temporal/workflow/context.rb index bbd5cb0b..3664f2fb 100644 --- a/lib/temporal/workflow/context.rb +++ b/lib/temporal/workflow/context.rb @@ -133,7 +133,7 @@ def execute_workflow(workflow_class, *input, **args) headers: execution_options.headers, cron_schedule: cron_schedule, memo: execution_options.memo, - workflow_id_reuse_policy: workflow_id_reuse_policy, + workflow_id_reuse_policy: workflow_id_reuse_policy || execution_options.workflow_id_reuse_policy, search_attributes: Helpers.process_search_attributes(execution_options.search_attributes), ) diff --git a/spec/unit/lib/temporal/execution_options_spec.rb b/spec/unit/lib/temporal/execution_options_spec.rb index 98fbe380..61d43218 100644 --- a/spec/unit/lib/temporal/execution_options_spec.rb +++ b/spec/unit/lib/temporal/execution_options_spec.rb @@ -50,7 +50,7 @@ class TestExecutionOptionsWorkflow < Temporal::Workflow let(:options) do { name: 'OtherTestWorkflow', namespace: 'test-namespace', task_queue: 'test-task-queue' } end - + it 'is initialized with name from options' do expect(subject.name).to eq(options[:name]) expect(subject.namespace).to eq(options[:namespace]) @@ -77,9 +77,10 @@ class TestExecutionOptionsWorkflow < Temporal::Workflow timeouts: { schedule_to_close: 42 }, headers: { 'DefaultHeader' => 'Default' }, search_attributes: { 'DefaultIntSearchAttribute' => 256 }, + workflow_id_reuse_policy: :reject ) end - + it 'is initialized with a mix of options and defaults' do expect(subject.name).to eq(object) expect(subject.namespace).to eq(options[:namespace]) @@ -88,6 +89,7 @@ class TestExecutionOptionsWorkflow < Temporal::Workflow expect(subject.timeouts).to eq(schedule_to_close: 42, start_to_close: 10) expect(subject.headers).to eq('DefaultHeader' => 'Default', 'TestHeader' => 'Test') expect(subject.search_attributes).to eq('DefaultIntSearchAttribute' => 256, 'DoubleSearchAttribute' => 3.14) + expect(subject.workflow_id_reuse_policy).to eq(:reject) end end @@ -99,10 +101,11 @@ class TestExecutionOptionsWorkflow < Temporal::Workflow task_queue: 'test-task-queue', retry_policy: { interval: 1, backoff: 2, max_attempts: 5 }, timeouts: { start_to_close: 10 }, - headers: { 'TestHeader' => 'Test' } + headers: { 'TestHeader' => 'Test' }, + workflow_id_reuse_policy: :reject } end - + it 'is initialized with full options' do expect(subject.name).to eq(options[:name]) expect(subject.namespace).to eq(options[:namespace]) @@ -113,12 +116,13 @@ class TestExecutionOptionsWorkflow < Temporal::Workflow expect(subject.retry_policy.max_attempts).to eq(options[:retry_policy][:max_attempts]) expect(subject.timeouts).to eq(options[:timeouts]) expect(subject.headers).to eq(options[:headers]) + expect(subject.workflow_id_reuse_policy).to eq(options[:workflow_id_reuse_policy]) end end - + context 'when retry policy options are invalid' do let(:options) { { retry_policy: { max_attempts: 10 } } } - + it 'raises' do expect { subject }.to raise_error( Temporal::RetryPolicy::InvalidRetryPolicy, @@ -128,7 +132,6 @@ class TestExecutionOptionsWorkflow < Temporal::Workflow end end - context 'when initialized with an Executable' do class TestWorkflow < Temporal::Workflow namespace 'namespace' @@ -136,6 +139,7 @@ class TestWorkflow < Temporal::Workflow retry_policy interval: 1, backoff: 2, max_attempts: 5 timeouts start_to_close: 10 headers 'HeaderA' => 'TestA', 'HeaderB' => 'TestB' + workflow_id_reuse_policy :reject end let(:object) { TestWorkflow } @@ -151,6 +155,7 @@ class TestWorkflow < Temporal::Workflow expect(subject.retry_policy.max_attempts).to eq(5) expect(subject.timeouts).to eq(start_to_close: 10) expect(subject.headers).to eq('HeaderA' => 'TestA', 'HeaderB' => 'TestB') + expect(subject.workflow_id_reuse_policy).to eq(:reject) end context 'when options are present' do @@ -160,10 +165,11 @@ class TestWorkflow < Temporal::Workflow task_queue: 'test-task-queue', retry_policy: { interval: 2, max_attempts: 10 }, timeouts: { schedule_to_close: 20 }, - headers: { 'TestHeader' => 'Value', 'HeaderB' => 'ValueB' } + headers: { 'TestHeader' => 'Value', 'HeaderB' => 'ValueB' }, + workflow_id_reuse_policy: :allow_failed } end - + it 'is initialized with a mix of options and executable values' do expect(subject.name).to eq(options[:name]) expect(subject.namespace).to eq('namespace') @@ -178,6 +184,7 @@ class TestWorkflow < Temporal::Workflow 'HeaderA' => 'TestA', 'HeaderB' => 'ValueB' # overriden by options ) + expect(subject.workflow_id_reuse_policy).to eq(:allow_failed) end end @@ -196,9 +203,10 @@ class TestWorkflow < Temporal::Workflow timeouts: { schedule_to_close: 42 }, headers: { 'DefaultHeader' => 'Default', 'HeaderA' => 'DefaultA' }, search_attributes: {}, + workflow_id_reuse_policy: :allow_failed ) end - + it 'is initialized with a mix of executable values, options and defaults' do expect(subject.name).to eq(object.name) expect(subject.namespace).to eq(options[:namespace]) @@ -214,12 +222,13 @@ class TestWorkflow < Temporal::Workflow 'HeaderB' => 'TestB', # not overriden by defaults 'DefaultHeader' => 'Default' ) + expect(subject.workflow_id_reuse_policy).to eq(:reject) end end context 'when retry policy options are invalid' do let(:options) { { retry_policy: { interval: 1.5 } } } - + it 'raises' do expect { subject }.to raise_error( Temporal::RetryPolicy::InvalidRetryPolicy,