From 4b6bfd2d2c236f8f69a7b8b1a623dd8c77482100 Mon Sep 17 00:00:00 2001 From: Abhishek Date: Thu, 11 Jul 2024 12:37:02 -0400 Subject: [PATCH 1/3] Replicate official SDK to fetch results from Temporal --- lib/temporal/client.rb | 38 +++++++++++++++------------ spec/unit/lib/temporal/client_spec.rb | 25 ++++++++++++++++++ 2 files changed, 46 insertions(+), 17 deletions(-) diff --git a/lib/temporal/client.rb b/lib/temporal/client.rb index e738e2b6..9af51145 100644 --- a/lib/temporal/client.rb +++ b/lib/temporal/client.rb @@ -229,25 +229,29 @@ def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, nam execution_options = ExecutionOptions.new(workflow, options, config.default_execution_options) max_timeout = Temporal::Connection::GRPC::SERVER_MAX_GET_WORKFLOW_EXECUTION_HISTORY_POLL history_response = nil - begin - history_response = connection.get_workflow_execution_history( - namespace: execution_options.namespace, - workflow_id: workflow_id, - run_id: run_id, - wait_for_new_event: true, - event_type: :close, - timeout: timeout || max_timeout, - ) - rescue GRPC::DeadlineExceeded => e - message = if timeout - "Timed out after your specified limit of timeout: #{timeout} seconds" - else - "Timed out after #{max_timeout} seconds, which is the maximum supported amount." + while(true) do + begin + history_response = connection.get_workflow_execution_history( + namespace: execution_options.namespace, + workflow_id: workflow_id, + run_id: run_id, + wait_for_new_event: true, + event_type: :close, + timeout: timeout || max_timeout, + ) + rescue GRPC::DeadlineExceeded => e + message = if timeout + "Timed out after your specified limit of timeout: #{timeout} seconds" + else + "Timed out after #{max_timeout} seconds, which is the maximum supported amount." + end + raise TimeoutError.new(message) end - raise TimeoutError.new(message) + history = Workflow::History.new(history_response.history.events) + closed_event = history.events.first + + break unless closed_event.nil? end - history = Workflow::History.new(history_response.history.events) - closed_event = history.events.first case closed_event.type when 'WORKFLOW_EXECUTION_COMPLETED' payloads = closed_event.attributes.result diff --git a/spec/unit/lib/temporal/client_spec.rb b/spec/unit/lib/temporal/client_spec.rb index 1dd4995d..1d1965a6 100644 --- a/spec/unit/lib/temporal/client_spec.rb +++ b/spec/unit/lib/temporal/client_spec.rb @@ -449,6 +449,31 @@ class NamespacedWorkflow < Temporal::Workflow ) end + it 'retries if there is till there is no closed event' do + completed_event = Fabricate(:workflow_completed_event, result: nil) + response_with_no_closed_event = Fabricate(:workflow_execution_history, events: []) + response_with_closed_event = Fabricate(:workflow_execution_history, events: [completed_event]) + + expect(connection) + .to receive(:get_workflow_execution_history) + .with( + namespace: 'some-namespace', + workflow_id: workflow_id, + run_id: run_id, + wait_for_new_event: true, + event_type: :close, + timeout: 30, + ) + .and_return(response_with_no_closed_event, response_with_closed_event) + + + subject.await_workflow_result( + NamespacedWorkflow, + workflow_id: workflow_id, + run_id: run_id, + ) + end + it 'can override the namespace' do completed_event = Fabricate(:workflow_completed_event, result: nil) response = Fabricate(:workflow_execution_history, events: [completed_event]) From 606d664e043b48d3859951d71d9853b3b3ae1dae Mon Sep 17 00:00:00 2001 From: Abhishek Date: Thu, 11 Jul 2024 16:37:31 -0400 Subject: [PATCH 2/3] Update according to code review comments --- lib/temporal/client.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/temporal/client.rb b/lib/temporal/client.rb index 9af51145..0653ea14 100644 --- a/lib/temporal/client.rb +++ b/lib/temporal/client.rb @@ -229,7 +229,7 @@ def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, nam execution_options = ExecutionOptions.new(workflow, options, config.default_execution_options) max_timeout = Temporal::Connection::GRPC::SERVER_MAX_GET_WORKFLOW_EXECUTION_HISTORY_POLL history_response = nil - while(true) do + loop do begin history_response = connection.get_workflow_execution_history( namespace: execution_options.namespace, @@ -250,7 +250,7 @@ def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, nam history = Workflow::History.new(history_response.history.events) closed_event = history.events.first - break unless closed_event.nil? + break if closed_event end case closed_event.type when 'WORKFLOW_EXECUTION_COMPLETED' From 0d9356f94e01b163f12adf3ab8171d2fe1f19906 Mon Sep 17 00:00:00 2001 From: Abhishek Date: Mon, 22 Jul 2024 12:59:32 -0400 Subject: [PATCH 3/3] Update --- lib/temporal/client.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/temporal/client.rb b/lib/temporal/client.rb index 0653ea14..9927c320 100644 --- a/lib/temporal/client.rb +++ b/lib/temporal/client.rb @@ -229,6 +229,7 @@ def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, nam execution_options = ExecutionOptions.new(workflow, options, config.default_execution_options) max_timeout = Temporal::Connection::GRPC::SERVER_MAX_GET_WORKFLOW_EXECUTION_HISTORY_POLL history_response = nil + closed_event = nil loop do begin history_response = connection.get_workflow_execution_history(