From e8b73289eafb5dd34d89702578eda4fc6254f7f5 Mon Sep 17 00:00:00 2001 From: Christopher Brown Date: Fri, 10 Jun 2022 17:46:30 -0700 Subject: [PATCH 1/4] add configuration option for gRPC interceptors --- examples/spec/integration/interceptor_spec.rb | 40 +++++++++++++++++++ lib/temporal/configuration.rb | 8 ++-- lib/temporal/connection.rb | 3 +- lib/temporal/connection/grpc.rb | 8 ++-- 4 files changed, 52 insertions(+), 7 deletions(-) create mode 100644 examples/spec/integration/interceptor_spec.rb diff --git a/examples/spec/integration/interceptor_spec.rb b/examples/spec/integration/interceptor_spec.rb new file mode 100644 index 00000000..60dc416d --- /dev/null +++ b/examples/spec/integration/interceptor_spec.rb @@ -0,0 +1,40 @@ +require 'workflows/hello_world_workflow' + +describe 'GRPC interceptors', :integration do + class ExampleInterceptor < GRPC::ClientInterceptor + attr_reader :called_methods + + def initialize + @called_methods = [] + end + + def request_response(request: nil, call: nil, method: nil, metadata: nil) + @called_methods << method + yield + end + end + + let(:interceptor) { ExampleInterceptor.new } + + around(:each) do |example| + Temporal.configure do |config| + config.interceptors = [interceptor] + end + + example.run + ensure + Temporal.configure do |config| + config.interceptors = [] + end + end + + it 'calls the given interceptors when performing operations' do + workflow_id, run_id = run_workflow(HelloWorldWorkflow, 'Tom') + wait_for_workflow_completion(workflow_id, run_id) + + expect(interceptor.called_methods).to match_array([ + "/temporal.api.workflowservice.v1.WorkflowService/StartWorkflowExecution", + "/temporal.api.workflowservice.v1.WorkflowService/GetWorkflowExecutionHistory", + ]) + end +end \ No newline at end of file diff --git a/lib/temporal/configuration.rb b/lib/temporal/configuration.rb index d2edaead..1628cfa6 100644 --- a/lib/temporal/configuration.rb +++ b/lib/temporal/configuration.rb @@ -7,12 +7,12 @@ module Temporal class Configuration - Connection = Struct.new(:type, :host, :port, :credentials, keyword_init: true) + Connection = Struct.new(:type, :host, :port, :credentials, :interceptors, keyword_init: true) Execution = Struct.new(:namespace, :task_queue, :timeouts, :headers, keyword_init: true) attr_reader :timeouts, :error_handlers attr_writer :converter - attr_accessor :connection_type, :host, :port, :credentials, :logger, :metrics_adapter, :namespace, :task_queue, :headers + attr_accessor :connection_type, :host, :port, :credentials, :logger, :metrics_adapter, :namespace, :task_queue, :headers, :interceptors # See https://docs.temporal.io/blog/activity-timeouts/ for general docs. # We want an infinite execution timeout for cron schedules and other perpetual workflows. @@ -54,6 +54,7 @@ def initialize @converter = DEFAULT_CONVERTER @error_handlers = [] @credentials = :this_channel_is_insecure + @interceptors = [] end def on_error(&block) @@ -81,7 +82,8 @@ def for_connection type: connection_type, host: host, port: port, - credentials: credentials + credentials: credentials, + interceptors: interceptors ).freeze end diff --git a/lib/temporal/connection.rb b/lib/temporal/connection.rb index 791534bf..fba67376 100644 --- a/lib/temporal/connection.rb +++ b/lib/temporal/connection.rb @@ -11,12 +11,13 @@ def self.generate(configuration) host = configuration.host port = configuration.port credentials = configuration.credentials + interceptors = configuration.interceptors hostname = `hostname` thread_id = Thread.current.object_id identity = "#{thread_id}@#{hostname}" - connection_class.new(host, port, identity, credentials) + connection_class.new(host, port, identity, credentials, interceptors) end end end diff --git a/lib/temporal/connection/grpc.rb b/lib/temporal/connection/grpc.rb index 9b0d9699..1e30150b 100644 --- a/lib/temporal/connection/grpc.rb +++ b/lib/temporal/connection/grpc.rb @@ -31,10 +31,11 @@ class GRPC max_page_size: 100 }.freeze - def initialize(host, port, identity, credentials, options = {}) + def initialize(host, port, identity, credentials, interceptors, options = {}) @url = "#{host}:#{port}" @identity = identity @credentials = credentials + @interceptors = interceptors @poll = true @poll_mutex = Mutex.new @poll_request = nil @@ -537,13 +538,14 @@ def cancel_polling_request private - attr_reader :url, :identity, :credentials, :options, :poll_mutex, :poll_request + attr_reader :url, :identity, :credentials, :interceptors, :options, :poll_mutex, :poll_request def client @client ||= Temporal::Api::WorkflowService::V1::WorkflowService::Stub.new( url, credentials, - timeout: 60 + timeout: 60, + interceptors: interceptors ) end From b24bc6d63dc7697bae775b0a1c5e0b48cea10fd3 Mon Sep 17 00:00:00 2001 From: Christopher Brown Date: Fri, 10 Jun 2022 17:47:34 -0700 Subject: [PATCH 2/4] add missing trailing newline --- examples/spec/integration/interceptor_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/spec/integration/interceptor_spec.rb b/examples/spec/integration/interceptor_spec.rb index 60dc416d..77890e6a 100644 --- a/examples/spec/integration/interceptor_spec.rb +++ b/examples/spec/integration/interceptor_spec.rb @@ -37,4 +37,4 @@ def request_response(request: nil, call: nil, method: nil, metadata: nil) "/temporal.api.workflowservice.v1.WorkflowService/GetWorkflowExecutionHistory", ]) end -end \ No newline at end of file +end From 103213b4f90702ffbe10f1a8d389f5c3fac6e34d Mon Sep 17 00:00:00 2001 From: Christopher Brown Date: Fri, 24 Jun 2022 19:42:28 -0700 Subject: [PATCH 3/4] use new Temporal client in interceptor test to avoid test pollution --- examples/spec/integration/interceptor_spec.rb | 36 +++++++++++++------ 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/examples/spec/integration/interceptor_spec.rb b/examples/spec/integration/interceptor_spec.rb index 77890e6a..73df49d1 100644 --- a/examples/spec/integration/interceptor_spec.rb +++ b/examples/spec/integration/interceptor_spec.rb @@ -14,23 +14,37 @@ def request_response(request: nil, call: nil, method: nil, metadata: nil) end end - let(:interceptor) { ExampleInterceptor.new } + def run_workflow_with_client(client, workflow, *input, **args) + args[:options] = { workflow_id: SecureRandom.uuid }.merge(args[:options] || {}) + run_id = client.start_workflow(workflow, *input, **args) - around(:each) do |example| - Temporal.configure do |config| - config.interceptors = [interceptor] - end + [args[:options][:workflow_id], run_id] + end - example.run - ensure - Temporal.configure do |config| - config.interceptors = [] + let(:interceptor) { ExampleInterceptor.new } + let(:config) do + # We can't depend on test order here and the memoized + # Temporal.default_client will not include our interceptors. Therefore we + # build a new config and client based on the one used in the other tests. + common_config = Temporal.configuration + Temporal::Configuration.new.tap do |config| + config.host = common_config.host + config.port = common_config.port + config.namespace = common_config.namespace + config.task_queue = common_config.task_queue + config.metrics_adapter = common_config.metrics_adapter + config.interceptors = [interceptor] end end + let(:client) { Temporal::Client.new(config) } it 'calls the given interceptors when performing operations' do - workflow_id, run_id = run_workflow(HelloWorldWorkflow, 'Tom') - wait_for_workflow_completion(workflow_id, run_id) + workflow_id, run_id = run_workflow_with_client(client, HelloWorldWorkflow, 'Tom') + client.await_workflow_result( + HelloWorldWorkflow, + workflow_id: workflow_id, + run_id: run_id + ) expect(interceptor.called_methods).to match_array([ "/temporal.api.workflowservice.v1.WorkflowService/StartWorkflowExecution", From 54c184bb487359b6eefe9cf348542002d55754c7 Mon Sep 17 00:00:00 2001 From: Christopher Brown Date: Fri, 24 Jun 2022 19:55:56 -0700 Subject: [PATCH 4/4] fix unit tests --- spec/unit/lib/temporal/grpc_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/unit/lib/temporal/grpc_spec.rb b/spec/unit/lib/temporal/grpc_spec.rb index ef617341..a69d9575 100644 --- a/spec/unit/lib/temporal/grpc_spec.rb +++ b/spec/unit/lib/temporal/grpc_spec.rb @@ -10,7 +10,7 @@ let(:run_id) { SecureRandom.uuid } let(:now) { Time.now} - subject { Temporal::Connection::GRPC.new(nil, nil, identity, :this_channel_is_insecure) } + subject { Temporal::Connection::GRPC.new(nil, nil, identity, :this_channel_is_insecure, []) } class TestDeserializer extend Temporal::Concerns::Payloads