Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow clients to configure gRPC client interceptors #185

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions examples/spec/integration/interceptor_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
require 'workflows/hello_world_workflow'

describe 'GRPC interceptors', :integration do
class ExampleInterceptor < GRPC::ClientInterceptor
attr_reader :called_methods

def initialize
@called_methods = []
end

def request_response(request: nil, call: nil, method: nil, metadata: nil)
@called_methods << method
yield
end
end

def run_workflow_with_client(client, workflow, *input, **args)
args[:options] = { workflow_id: SecureRandom.uuid }.merge(args[:options] || {})
run_id = client.start_workflow(workflow, *input, **args)

[args[:options][:workflow_id], run_id]
end

let(:interceptor) { ExampleInterceptor.new }
let(:config) do
# We can't depend on test order here and the memoized
# Temporal.default_client will not include our interceptors. Therefore we
# build a new config and client based on the one used in the other tests.
common_config = Temporal.configuration
Temporal::Configuration.new.tap do |config|
config.host = common_config.host
config.port = common_config.port
config.namespace = common_config.namespace
config.task_queue = common_config.task_queue
config.metrics_adapter = common_config.metrics_adapter
config.interceptors = [interceptor]
end
end
let(:client) { Temporal::Client.new(config) }

it 'calls the given interceptors when performing operations' do
workflow_id, run_id = run_workflow_with_client(client, HelloWorldWorkflow, 'Tom')
client.await_workflow_result(
HelloWorldWorkflow,
workflow_id: workflow_id,
run_id: run_id
)

expect(interceptor.called_methods).to match_array([
"/temporal.api.workflowservice.v1.WorkflowService/StartWorkflowExecution",
"/temporal.api.workflowservice.v1.WorkflowService/GetWorkflowExecutionHistory",
])
end
end
8 changes: 5 additions & 3 deletions lib/temporal/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@

module Temporal
class Configuration
Connection = Struct.new(:type, :host, :port, :credentials, keyword_init: true)
Connection = Struct.new(:type, :host, :port, :credentials, :interceptors, keyword_init: true)
Execution = Struct.new(:namespace, :task_queue, :timeouts, :headers, keyword_init: true)

attr_reader :timeouts, :error_handlers
attr_writer :converter
attr_accessor :connection_type, :host, :port, :credentials, :logger, :metrics_adapter, :namespace, :task_queue, :headers
attr_accessor :connection_type, :host, :port, :credentials, :logger, :metrics_adapter, :namespace, :task_queue, :headers, :interceptors

# See https://docs.temporal.io/blog/activity-timeouts/ for general docs.
# We want an infinite execution timeout for cron schedules and other perpetual workflows.
Expand Down Expand Up @@ -54,6 +54,7 @@ def initialize
@converter = DEFAULT_CONVERTER
@error_handlers = []
@credentials = :this_channel_is_insecure
@interceptors = []
end

def on_error(&block)
Expand Down Expand Up @@ -81,7 +82,8 @@ def for_connection
type: connection_type,
host: host,
port: port,
credentials: credentials
credentials: credentials,
interceptors: interceptors
).freeze
end

Expand Down
3 changes: 2 additions & 1 deletion lib/temporal/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ def self.generate(configuration)
host = configuration.host
port = configuration.port
credentials = configuration.credentials
interceptors = configuration.interceptors

hostname = `hostname`
thread_id = Thread.current.object_id
identity = "#{thread_id}@#{hostname}"

connection_class.new(host, port, identity, credentials)
connection_class.new(host, port, identity, credentials, interceptors)
end
end
end
8 changes: 5 additions & 3 deletions lib/temporal/connection/grpc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ class GRPC
max_page_size: 100
}.freeze

def initialize(host, port, identity, credentials, options = {})
def initialize(host, port, identity, credentials, interceptors, options = {})
@url = "#{host}:#{port}"
@identity = identity
@credentials = credentials
@interceptors = interceptors
@poll = true
@poll_mutex = Mutex.new
@poll_request = nil
Expand Down Expand Up @@ -537,13 +538,14 @@ def cancel_polling_request

private

attr_reader :url, :identity, :credentials, :options, :poll_mutex, :poll_request
attr_reader :url, :identity, :credentials, :interceptors, :options, :poll_mutex, :poll_request

def client
@client ||= Temporal::Api::WorkflowService::V1::WorkflowService::Stub.new(
url,
credentials,
timeout: 60
timeout: 60,
interceptors: interceptors
)
end

Expand Down
2 changes: 1 addition & 1 deletion spec/unit/lib/temporal/grpc_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
let(:run_id) { SecureRandom.uuid }
let(:now) { Time.now}

subject { Temporal::Connection::GRPC.new(nil, nil, identity, :this_channel_is_insecure) }
subject { Temporal::Connection::GRPC.new(nil, nil, identity, :this_channel_is_insecure, []) }

class TestDeserializer
extend Temporal::Concerns::Payloads
Expand Down