Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make workflow id reuse policy configurable #209

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions lib/temporal/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def start_workflow(workflow, *input, options: {}, **args)
# If unspecified, individual runs should have the full time for the execution (which includes retries).
run_timeout: compute_run_timeout(execution_options),
task_timeout: execution_options.timeouts[:task],
workflow_id_reuse_policy: options[:workflow_id_reuse_policy],
workflow_id_reuse_policy: options[:workflow_id_reuse_policy] || execution_options.workflow_id_reuse_policy,
headers: execution_options.headers,
memo: execution_options.memo,
search_attributes: Workflow::Context::Helpers.process_search_attributes(execution_options.search_attributes),
Expand All @@ -77,7 +77,7 @@ def start_workflow(workflow, *input, options: {}, **args)
execution_timeout: execution_options.timeouts[:execution],
run_timeout: compute_run_timeout(execution_options),
task_timeout: execution_options.timeouts[:task],
workflow_id_reuse_policy: options[:workflow_id_reuse_policy],
workflow_id_reuse_policy: options[:workflow_id_reuse_policy] || execution_options.workflow_id_reuse_policy,
headers: execution_options.headers,
memo: execution_options.memo,
search_attributes: Workflow::Context::Helpers.process_search_attributes(execution_options.search_attributes),
Expand Down Expand Up @@ -126,7 +126,7 @@ def schedule_workflow(workflow, cron_schedule, *input, options: {}, **args)
# than the execution timeout.
run_timeout: compute_run_timeout(execution_options),
task_timeout: execution_options.timeouts[:task],
workflow_id_reuse_policy: options[:workflow_id_reuse_policy],
workflow_id_reuse_policy: options[:workflow_id_reuse_policy] || execution_options.workflow_id_reuse_policy,
headers: execution_options.headers,
cron_schedule: cron_schedule,
memo: execution_options.memo,
Expand Down
5 changes: 5 additions & 0 deletions lib/temporal/concerns/executable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ def headers(*args)
return @headers if args.empty?
@headers = args.first
end

def workflow_id_reuse_policy(*args)
return @workflow_id_reuse_policy if args.empty?
@workflow_id_reuse_policy = args.first
end
end
end
end
11 changes: 8 additions & 3 deletions lib/temporal/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
module Temporal
class Configuration
Connection = Struct.new(:type, :host, :port, :credentials, :identity, keyword_init: true)
Execution = Struct.new(:namespace, :task_queue, :timeouts, :headers, :search_attributes, keyword_init: true)
Execution = Struct.new(:namespace, :task_queue, :timeouts, :headers, :search_attributes, :workflow_id_reuse_policy, keyword_init: true)

attr_reader :timeouts, :error_handlers
attr_accessor :connection_type, :converter, :host, :port, :credentials, :identity, :logger, :metrics_adapter, :namespace, :task_queue, :headers, :search_attributes
attr_accessor :connection_type, :converter, :host, :port, :credentials, :identity, :logger, :metrics_adapter, :namespace, :task_queue, :headers, :search_attributes, :workflow_id_reuse_policy

# See https://docs.temporal.io/blog/activity-timeouts/ for general docs.
# We want an infinite execution timeout for cron schedules and other perpetual workflows.
Expand Down Expand Up @@ -43,6 +43,9 @@ class Configuration
Temporal::Connection::Converter::Payload::JSON.new
]
).freeze
# default workflow id reuse policy is nil for backwards compatibility. in reality, the
# default is :allow, due to that being the temporal server's default
DEFAULT_WORKFLOW_ID_REUSE_POLICY = nil

def initialize
@connection_type = :grpc
Expand All @@ -57,6 +60,7 @@ def initialize
@credentials = :this_channel_is_insecure
@identity = nil
@search_attributes = {}
@workflow_id_reuse_policy = DEFAULT_WORKFLOW_ID_REUSE_POLICY
end

def on_error(&block)
Expand Down Expand Up @@ -91,7 +95,8 @@ def default_execution_options
task_queue: task_list,
timeouts: timeouts,
headers: headers,
search_attributes: search_attributes
search_attributes: search_attributes,
workflow_id_reuse_policy: workflow_id_reuse_policy
).freeze
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ module Temporal
module Connection
module Serializer
class WorkflowIdReusePolicy < Base

WORKFLOW_ID_REUSE_POLICY = {
allow_failed: Temporal::Api::Enums::V1::WorkflowIdReusePolicy::WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY,
allow: Temporal::Api::Enums::V1::WorkflowIdReusePolicy::WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
Expand All @@ -14,7 +13,7 @@ class WorkflowIdReusePolicy < Base
def to_proto
return unless object

policy = WORKFLOW_ID_REUSE_POLICY[object]
policy = WORKFLOW_ID_REUSE_POLICY[object.to_sym]
raise ArgumentError, "Unknown workflow_id_reuse_policy specified: #{object}" unless policy

policy
Expand Down
5 changes: 4 additions & 1 deletion lib/temporal/execution_options.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

module Temporal
class ExecutionOptions
attr_reader :name, :namespace, :task_queue, :retry_policy, :timeouts, :headers, :memo, :search_attributes
attr_reader :name, :namespace, :task_queue, :retry_policy, :timeouts, :headers, :memo, :search_attributes, :workflow_id_reuse_policy

def initialize(object, options, defaults = nil)
# Options are treated as overrides and take precedence
Expand All @@ -15,6 +15,7 @@ def initialize(object, options, defaults = nil)
@headers = options[:headers] || {}
@memo = options[:memo] || {}
@search_attributes = options[:search_attributes] || {}
@workflow_id_reuse_policy = options[:workflow_id_reuse_policy]

# For Temporal::Workflow and Temporal::Activity use defined values as the next option
if has_executable_concern?(object)
Expand All @@ -23,6 +24,7 @@ def initialize(object, options, defaults = nil)
@retry_policy = object.retry_policy.merge(@retry_policy) if object.retry_policy
@timeouts = object.timeouts.merge(@timeouts) if object.timeouts
@headers = object.headers.merge(@headers) if object.headers
@workflow_id_reuse_policy ||= object.workflow_id_reuse_policy
end

# Lastly consider defaults if they are given
Expand All @@ -32,6 +34,7 @@ def initialize(object, options, defaults = nil)
@timeouts = defaults.timeouts.merge(@timeouts)
@headers = defaults.headers.merge(@headers)
@search_attributes = defaults.search_attributes.merge(@search_attributes)
@workflow_id_reuse_policy ||= defaults.workflow_id_reuse_policy
end

if @retry_policy.empty?
Expand Down
2 changes: 1 addition & 1 deletion lib/temporal/workflow/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def execute_workflow(workflow_class, *input, **args)
headers: execution_options.headers,
cron_schedule: cron_schedule,
memo: execution_options.memo,
workflow_id_reuse_policy: workflow_id_reuse_policy,
workflow_id_reuse_policy: workflow_id_reuse_policy || execution_options.workflow_id_reuse_policy,
search_attributes: Helpers.process_search_attributes(execution_options.search_attributes),
)

Expand Down
31 changes: 20 additions & 11 deletions spec/unit/lib/temporal/execution_options_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class TestExecutionOptionsWorkflow < Temporal::Workflow
let(:options) do
{ name: 'OtherTestWorkflow', namespace: 'test-namespace', task_queue: 'test-task-queue' }
end

it 'is initialized with name from options' do
expect(subject.name).to eq(options[:name])
expect(subject.namespace).to eq(options[:namespace])
Expand All @@ -77,9 +77,10 @@ class TestExecutionOptionsWorkflow < Temporal::Workflow
timeouts: { schedule_to_close: 42 },
headers: { 'DefaultHeader' => 'Default' },
search_attributes: { 'DefaultIntSearchAttribute' => 256 },
workflow_id_reuse_policy: :reject
)
end

it 'is initialized with a mix of options and defaults' do
expect(subject.name).to eq(object)
expect(subject.namespace).to eq(options[:namespace])
Expand All @@ -88,6 +89,7 @@ class TestExecutionOptionsWorkflow < Temporal::Workflow
expect(subject.timeouts).to eq(schedule_to_close: 42, start_to_close: 10)
expect(subject.headers).to eq('DefaultHeader' => 'Default', 'TestHeader' => 'Test')
expect(subject.search_attributes).to eq('DefaultIntSearchAttribute' => 256, 'DoubleSearchAttribute' => 3.14)
expect(subject.workflow_id_reuse_policy).to eq(:reject)
end
end

Expand All @@ -99,10 +101,11 @@ class TestExecutionOptionsWorkflow < Temporal::Workflow
task_queue: 'test-task-queue',
retry_policy: { interval: 1, backoff: 2, max_attempts: 5 },
timeouts: { start_to_close: 10 },
headers: { 'TestHeader' => 'Test' }
headers: { 'TestHeader' => 'Test' },
workflow_id_reuse_policy: :reject
}
end

it 'is initialized with full options' do
expect(subject.name).to eq(options[:name])
expect(subject.namespace).to eq(options[:namespace])
Expand All @@ -113,12 +116,13 @@ class TestExecutionOptionsWorkflow < Temporal::Workflow
expect(subject.retry_policy.max_attempts).to eq(options[:retry_policy][:max_attempts])
expect(subject.timeouts).to eq(options[:timeouts])
expect(subject.headers).to eq(options[:headers])
expect(subject.workflow_id_reuse_policy).to eq(options[:workflow_id_reuse_policy])
end
end

context 'when retry policy options are invalid' do
let(:options) { { retry_policy: { max_attempts: 10 } } }

it 'raises' do
expect { subject }.to raise_error(
Temporal::RetryPolicy::InvalidRetryPolicy,
Expand All @@ -128,14 +132,14 @@ class TestExecutionOptionsWorkflow < Temporal::Workflow
end
end


context 'when initialized with an Executable' do
class TestWorkflow < Temporal::Workflow
namespace 'namespace'
task_queue 'task-queue'
retry_policy interval: 1, backoff: 2, max_attempts: 5
timeouts start_to_close: 10
headers 'HeaderA' => 'TestA', 'HeaderB' => 'TestB'
workflow_id_reuse_policy :reject
end

let(:object) { TestWorkflow }
Expand All @@ -151,6 +155,7 @@ class TestWorkflow < Temporal::Workflow
expect(subject.retry_policy.max_attempts).to eq(5)
expect(subject.timeouts).to eq(start_to_close: 10)
expect(subject.headers).to eq('HeaderA' => 'TestA', 'HeaderB' => 'TestB')
expect(subject.workflow_id_reuse_policy).to eq(:reject)
end

context 'when options are present' do
Expand All @@ -160,10 +165,11 @@ class TestWorkflow < Temporal::Workflow
task_queue: 'test-task-queue',
retry_policy: { interval: 2, max_attempts: 10 },
timeouts: { schedule_to_close: 20 },
headers: { 'TestHeader' => 'Value', 'HeaderB' => 'ValueB' }
headers: { 'TestHeader' => 'Value', 'HeaderB' => 'ValueB' },
workflow_id_reuse_policy: :allow_failed
}
end

it 'is initialized with a mix of options and executable values' do
expect(subject.name).to eq(options[:name])
expect(subject.namespace).to eq('namespace')
Expand All @@ -178,6 +184,7 @@ class TestWorkflow < Temporal::Workflow
'HeaderA' => 'TestA',
'HeaderB' => 'ValueB' # overriden by options
)
expect(subject.workflow_id_reuse_policy).to eq(:allow_failed)
end
end

Expand All @@ -196,9 +203,10 @@ class TestWorkflow < Temporal::Workflow
timeouts: { schedule_to_close: 42 },
headers: { 'DefaultHeader' => 'Default', 'HeaderA' => 'DefaultA' },
search_attributes: {},
workflow_id_reuse_policy: :allow_failed
)
end

it 'is initialized with a mix of executable values, options and defaults' do
expect(subject.name).to eq(object.name)
expect(subject.namespace).to eq(options[:namespace])
Expand All @@ -214,12 +222,13 @@ class TestWorkflow < Temporal::Workflow
'HeaderB' => 'TestB', # not overriden by defaults
'DefaultHeader' => 'Default'
)
expect(subject.workflow_id_reuse_policy).to eq(:reject)
end
end

context 'when retry policy options are invalid' do
let(:options) { { retry_policy: { interval: 1.5 } } }

it 'raises' do
expect { subject }.to raise_error(
Temporal::RetryPolicy::InvalidRetryPolicy,
Expand Down