diff --git a/examples/spec/integration/interceptor_spec.rb b/examples/spec/integration/interceptor_spec.rb new file mode 100644 index 00000000..73df49d1 --- /dev/null +++ b/examples/spec/integration/interceptor_spec.rb @@ -0,0 +1,54 @@ +require 'workflows/hello_world_workflow' + +describe 'GRPC interceptors', :integration do + class ExampleInterceptor < GRPC::ClientInterceptor + attr_reader :called_methods + + def initialize + @called_methods = [] + end + + def request_response(request: nil, call: nil, method: nil, metadata: nil) + @called_methods << method + yield + end + end + + def run_workflow_with_client(client, workflow, *input, **args) + args[:options] = { workflow_id: SecureRandom.uuid }.merge(args[:options] || {}) + run_id = client.start_workflow(workflow, *input, **args) + + [args[:options][:workflow_id], run_id] + end + + let(:interceptor) { ExampleInterceptor.new } + let(:config) do + # We can't depend on test order here and the memoized + # Temporal.default_client will not include our interceptors. Therefore we + # build a new config and client based on the one used in the other tests. + common_config = Temporal.configuration + Temporal::Configuration.new.tap do |config| + config.host = common_config.host + config.port = common_config.port + config.namespace = common_config.namespace + config.task_queue = common_config.task_queue + config.metrics_adapter = common_config.metrics_adapter + config.interceptors = [interceptor] + end + end + let(:client) { Temporal::Client.new(config) } + + it 'calls the given interceptors when performing operations' do + workflow_id, run_id = run_workflow_with_client(client, HelloWorldWorkflow, 'Tom') + client.await_workflow_result( + HelloWorldWorkflow, + workflow_id: workflow_id, + run_id: run_id + ) + + expect(interceptor.called_methods).to match_array([ + "/temporal.api.workflowservice.v1.WorkflowService/StartWorkflowExecution", + "/temporal.api.workflowservice.v1.WorkflowService/GetWorkflowExecutionHistory", + ]) + end +end diff --git a/lib/temporal/configuration.rb b/lib/temporal/configuration.rb index d2edaead..1628cfa6 100644 --- a/lib/temporal/configuration.rb +++ b/lib/temporal/configuration.rb @@ -7,12 +7,12 @@ module Temporal class Configuration - Connection = Struct.new(:type, :host, :port, :credentials, keyword_init: true) + Connection = Struct.new(:type, :host, :port, :credentials, :interceptors, keyword_init: true) Execution = Struct.new(:namespace, :task_queue, :timeouts, :headers, keyword_init: true) attr_reader :timeouts, :error_handlers attr_writer :converter - attr_accessor :connection_type, :host, :port, :credentials, :logger, :metrics_adapter, :namespace, :task_queue, :headers + attr_accessor :connection_type, :host, :port, :credentials, :logger, :metrics_adapter, :namespace, :task_queue, :headers, :interceptors # See https://docs.temporal.io/blog/activity-timeouts/ for general docs. # We want an infinite execution timeout for cron schedules and other perpetual workflows. @@ -54,6 +54,7 @@ def initialize @converter = DEFAULT_CONVERTER @error_handlers = [] @credentials = :this_channel_is_insecure + @interceptors = [] end def on_error(&block) @@ -81,7 +82,8 @@ def for_connection type: connection_type, host: host, port: port, - credentials: credentials + credentials: credentials, + interceptors: interceptors ).freeze end diff --git a/lib/temporal/connection.rb b/lib/temporal/connection.rb index 791534bf..fba67376 100644 --- a/lib/temporal/connection.rb +++ b/lib/temporal/connection.rb @@ -11,12 +11,13 @@ def self.generate(configuration) host = configuration.host port = configuration.port credentials = configuration.credentials + interceptors = configuration.interceptors hostname = `hostname` thread_id = Thread.current.object_id identity = "#{thread_id}@#{hostname}" - connection_class.new(host, port, identity, credentials) + connection_class.new(host, port, identity, credentials, interceptors) end end end diff --git a/lib/temporal/connection/grpc.rb b/lib/temporal/connection/grpc.rb index 9b0d9699..1e30150b 100644 --- a/lib/temporal/connection/grpc.rb +++ b/lib/temporal/connection/grpc.rb @@ -31,10 +31,11 @@ class GRPC max_page_size: 100 }.freeze - def initialize(host, port, identity, credentials, options = {}) + def initialize(host, port, identity, credentials, interceptors, options = {}) @url = "#{host}:#{port}" @identity = identity @credentials = credentials + @interceptors = interceptors @poll = true @poll_mutex = Mutex.new @poll_request = nil @@ -537,13 +538,14 @@ def cancel_polling_request private - attr_reader :url, :identity, :credentials, :options, :poll_mutex, :poll_request + attr_reader :url, :identity, :credentials, :interceptors, :options, :poll_mutex, :poll_request def client @client ||= Temporal::Api::WorkflowService::V1::WorkflowService::Stub.new( url, credentials, - timeout: 60 + timeout: 60, + interceptors: interceptors ) end diff --git a/spec/unit/lib/temporal/grpc_spec.rb b/spec/unit/lib/temporal/grpc_spec.rb index ef617341..a69d9575 100644 --- a/spec/unit/lib/temporal/grpc_spec.rb +++ b/spec/unit/lib/temporal/grpc_spec.rb @@ -10,7 +10,7 @@ let(:run_id) { SecureRandom.uuid } let(:now) { Time.now} - subject { Temporal::Connection::GRPC.new(nil, nil, identity, :this_channel_is_insecure) } + subject { Temporal::Connection::GRPC.new(nil, nil, identity, :this_channel_is_insecure, []) } class TestDeserializer extend Temporal::Concerns::Payloads