diff --git a/docs/GettingStarted.md b/docs/GettingStarted.md index 5a13beb720f..903d5b65702 100644 --- a/docs/GettingStarted.md +++ b/docs/GettingStarted.md @@ -1154,6 +1154,10 @@ require 'datadog' Datadog.configure do |c| c.tracing.instrument :karafka, **options + + # different options for different kafka topics + c.tracing.instrument :karafka, describes: "some-topic", distributed_tracing: false + c.tracing.instrument :karafka, describes: /^topic-regex-.*/, enabled: false end ``` @@ -1176,6 +1180,10 @@ require 'datadog' Datadog.configure do |c| c.tracing.instrument :waterdrop, **options + + # different options for different kafka topics + c.tracing.instrument :waterdrop, describes: "some-topic", distributed_tracing: false + c.tracing.instrument :waterdrop, describes: /^topic-regex-.*/, enabled: false end ``` diff --git a/lib/datadog/tracing/contrib/karafka/framework.rb b/lib/datadog/tracing/contrib/karafka/framework.rb index 979af04310f..b6491fb61b5 100644 --- a/lib/datadog/tracing/contrib/karafka/framework.rb +++ b/lib/datadog/tracing/contrib/karafka/framework.rb @@ -9,18 +9,23 @@ module Karafka # - instrument parts of the framework when needed module Framework def self.setup + karafka_configurations = Datadog.configuration.tracing.fetch_integration(:karafka).configurations + Datadog.configure do |datadog_config| - karafka_config = datadog_config.tracing[:karafka] - activate_waterdrop!(datadog_config, karafka_config) + karafka_configurations.each do |config_name, karafka_config| + activate_waterdrop!(datadog_config, config_name, karafka_config) + end end end # Apply relevant configuration from Karafka to WaterDrop - def self.activate_waterdrop!(datadog_config, karafka_config) + def self.activate_waterdrop!(datadog_config, config_name, karafka_config) datadog_config.tracing.instrument( :waterdrop, + enabled: karafka_config[:enabled], service_name: karafka_config[:service_name], distributed_tracing: karafka_config[:distributed_tracing], + describes: config_name, ) end end diff --git a/lib/datadog/tracing/contrib/karafka/integration.rb b/lib/datadog/tracing/contrib/karafka/integration.rb index 8c0708b029a..45417c0c7db 100644 --- a/lib/datadog/tracing/contrib/karafka/integration.rb +++ b/lib/datadog/tracing/contrib/karafka/integration.rb @@ -38,6 +38,10 @@ def new_configuration def patcher Patcher end + + def resolver + @resolver ||= Contrib::Configuration::Resolvers::PatternResolver.new + end end end end diff --git a/lib/datadog/tracing/contrib/karafka/patcher.rb b/lib/datadog/tracing/contrib/karafka/patcher.rb index 1423e7f9b8c..6660026228e 100644 --- a/lib/datadog/tracing/contrib/karafka/patcher.rb +++ b/lib/datadog/tracing/contrib/karafka/patcher.rb @@ -10,14 +10,6 @@ module Contrib module Karafka # Patch to add tracing to Karafka::Messages::Messages module MessagesPatch - def configuration - Datadog.configuration.tracing[:karafka] - end - - def propagation - @propagation ||= Contrib::Karafka::Distributed::Propagation.new - end - # `each` is the most popular access point to Karafka messages, # but not the only one # Other access patterns do not have a straightforward tracing avenue @@ -25,35 +17,35 @@ def propagation # @see https://github.com/karafka/karafka/blob/b06d1f7c17818e1605f80c2bb573454a33376b40/README.md?plain=1#L29-L35 def each(&block) @messages_array.each do |message| - if configuration[:distributed_tracing] + configuration = datadog_configuration(message.topic) + trace_digest = if configuration[:distributed_tracing] headers = if message.metadata.respond_to?(:raw_headers) message.metadata.raw_headers else message.metadata.headers end - trace_digest = Karafka.extract(headers) - Datadog::Tracing.continue_trace!(trace_digest) if trace_digest + Karafka.extract(headers) end - if Datadog::DataStreams.enabled? - begin - headers = if message.metadata.respond_to?(:raw_headers) - message.metadata.raw_headers - else - message.metadata.headers + Tracing.trace(Ext::SPAN_MESSAGE_CONSUME, continue_from: trace_digest) do |span, trace| + if Datadog::DataStreams.enabled? + begin + headers = if message.metadata.respond_to?(:raw_headers) + message.metadata.raw_headers + else + message.metadata.headers + end + + Datadog::DataStreams.set_consume_checkpoint( + type: 'kafka', + source: message.topic, + auto_instrumentation: true + ) { |key| headers[key] } + rescue => e + Datadog.logger.debug("Error setting DSM checkpoint: #{e.class}: #{e}") end - - Datadog::DataStreams.set_consume_checkpoint( - type: 'kafka', - source: message.topic, - auto_instrumentation: true - ) { |key| headers[key] } - rescue => e - Datadog.logger.debug("Error setting DSM checkpoint: #{e.class}: #{e}") end - end - Tracing.trace(Ext::SPAN_MESSAGE_CONSUME) do |span| span.set_tag(Ext::TAG_OFFSET, message.metadata.offset) span.set_tag(Contrib::Ext::Messaging::TAG_DESTINATION, message.topic) span.set_tag(Contrib::Ext::Messaging::TAG_SYSTEM, Ext::TAG_SYSTEM) @@ -64,17 +56,11 @@ def each(&block) end end end - end - module AppPatch - ONLY_ONCE_PER_APP = Hash.new { |h, key| h[key] = Core::Utils::OnlyOnce.new } + private - def initialized! - ONLY_ONCE_PER_APP[self].run do - # Activate tracing on components related to Karafka (e.g. WaterDrop) - Contrib::Karafka::Framework.setup - end - super + def datadog_configuration(topic) + Datadog.configuration.tracing[:karafka, topic] end end @@ -82,6 +68,8 @@ def initialized! module Patcher include Contrib::Patcher + ACTIVATE_FRAMEWORK_ONLY_ONCE = Core::Utils::OnlyOnce.new + module_function def target_version @@ -91,10 +79,20 @@ def target_version def patch require_relative 'monitor' require_relative 'framework' + require_relative '../waterdrop' ::Karafka::Instrumentation::Monitor.prepend(Monitor) ::Karafka::Messages::Messages.prepend(MessagesPatch) - ::Karafka::App.singleton_class.prepend(AppPatch) + + if Contrib::WaterDrop::Integration.compatible? + ::Karafka.monitor.subscribe('app.initialized') do |event| + ACTIVATE_FRAMEWORK_ONLY_ONCE.run do + Contrib::Karafka::Framework.setup + end + + Contrib::WaterDrop::Patcher.add_middleware(::Karafka.producer) + end + end end end end diff --git a/lib/datadog/tracing/contrib/waterdrop/integration.rb b/lib/datadog/tracing/contrib/waterdrop/integration.rb index f55c8358bce..115f40eb265 100644 --- a/lib/datadog/tracing/contrib/waterdrop/integration.rb +++ b/lib/datadog/tracing/contrib/waterdrop/integration.rb @@ -36,6 +36,10 @@ def new_configuration def patcher Patcher end + + def resolver + @resolver ||= Contrib::Configuration::Resolvers::PatternResolver.new + end end end end diff --git a/lib/datadog/tracing/contrib/waterdrop/middleware.rb b/lib/datadog/tracing/contrib/waterdrop/middleware.rb index fb0b214e06e..4392fd36083 100644 --- a/lib/datadog/tracing/contrib/waterdrop/middleware.rb +++ b/lib/datadog/tracing/contrib/waterdrop/middleware.rb @@ -13,7 +13,7 @@ def call(message) trace_op = Datadog::Tracing.active_trace if trace_op && Datadog::Tracing::Distributed::PropagationPolicy.enabled?( - global_config: configuration, + global_config: datadog_configuration(message[:topic]), trace: trace_op ) WaterDrop.inject(trace_op.to_digest, message[:headers] ||= {}) @@ -35,8 +35,8 @@ def call(message) private - def configuration - Datadog.configuration.tracing[:waterdrop] + def datadog_configuration(topic) + Datadog.configuration.tracing[:waterdrop, topic] end end end diff --git a/lib/datadog/tracing/contrib/waterdrop/patcher.rb b/lib/datadog/tracing/contrib/waterdrop/patcher.rb index aa1b04d370a..293d2937cf3 100644 --- a/lib/datadog/tracing/contrib/waterdrop/patcher.rb +++ b/lib/datadog/tracing/contrib/waterdrop/patcher.rb @@ -25,9 +25,7 @@ def patch ::WaterDrop::Producer.prepend(Producer) ::WaterDrop.instrumentation.subscribe('producer.configured') do |event| producer = event[:producer] - - included_middlewares = producer.middleware.instance_variable_get(:@steps) - producer.middleware.append(Middleware) unless included_middlewares.include?(Middleware) + add_middleware(producer) if Datadog.configuration.data_streams.enabled producer.monitor.subscribe('message.acknowledged') do |ack_event| @@ -39,6 +37,11 @@ def patch end end end + + def add_middleware(producer) + included_middlewares = producer.middleware.instance_variable_get(:@steps) + producer.middleware.append(Middleware) unless included_middlewares.include?(Middleware) + end end end end diff --git a/spec/datadog/tracing/contrib/karafka/patcher_spec.rb b/spec/datadog/tracing/contrib/karafka/patcher_spec.rb index ae989e511b7..fb02ec2e422 100644 --- a/spec/datadog/tracing/contrib/karafka/patcher_spec.rb +++ b/spec/datadog/tracing/contrib/karafka/patcher_spec.rb @@ -17,6 +17,7 @@ before do Datadog.configure do |c| c.tracing.instrument :karafka, configuration_options + c.tracing.instrument :karafka, describes: /special_/, distributed_tracing: false end end @@ -31,16 +32,12 @@ let(:span_name) { Datadog::Tracing::Contrib::Karafka::Ext::SPAN_MESSAGE_CONSUME } it 'is expected to send a span' do - metadata = ::Karafka::Messages::Metadata.new - metadata['offset'] = 412 + metadata = ::Karafka::Messages::Metadata.new(offset: 412, timestamp: Time.now, topic: 'topic_a') raw_payload = rand.to_s message = ::Karafka::Messages::Message.new(raw_payload, metadata) - allow(message).to receive(:timestamp).and_return(Time.now) - allow(message).to receive(:topic).and_return('topic_a') - - topic = ::Karafka::Routing::Topic.new('topic_a', double(id: 0)) + topic = ::Karafka::Routing::Topic.new(message.topic, double(id: 0)) messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now) expect(messages).to all(be_a(::Karafka::Messages::Message)) @@ -53,18 +50,136 @@ expect(span).to_not have_error expect(span.resource).to eq 'topic_a' end + + context 'when the message has tracing headers' do + let(:topic_name) { 'topic_a' } + let(:message) do + headers = {} + Datadog::Tracing.trace('producer') do |span, trace| + Datadog::Tracing::Contrib::Karafka.inject(trace.to_digest, headers) + end + metadata = ::Karafka::Messages::Metadata.new( + :offset => 412, + headers_accessor => headers, + :topic => topic_name, + :timestamp => Time.now + ) + raw_payload = rand.to_s + + ::Karafka::Messages::Message.new(raw_payload, metadata) + end + let(:headers_accessor) do + ::Karafka::Messages::Metadata.members.include?(:raw_headers) ? 'raw_headers' : 'headers' + end + + context 'when distributed tracing is enabled' do + it 'continues the span that produced the message' do + producer_trace_digest = Datadog::Tracing::Contrib::Karafka.extract(message.metadata[headers_accessor]) + + consumer_span = nil + consumer_trace = nil + + Datadog::Tracing.trace('consumer') do + consumer_span = Datadog::Tracing.active_span + consumer_trace = Datadog::Tracing.active_trace + + topic = ::Karafka::Routing::Topic.new(topic_name, double(id: 0)) + messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now) + # NOTE: The following will iterate through the messages and create a new span representing + # the individual message processing (and `span` will refer to that particular span) + expect(messages).to all(be_a(::Karafka::Messages::Message)) + + # assert that the current trace re-set to the original trace after iterating the messages + expect(Datadog::Tracing.active_trace).to eq(consumer_trace) + expect(Datadog::Tracing.active_span).to eq(consumer_span) + end + + # spans: + # [consumer span] + # [producer span] + # ↳ [message processing span] + expect(spans).to have(3).items + + # assert that the message processing span is a continuation of the producer span, NOT of the consumer span + expect(span.parent_id).to eq producer_trace_digest.span_id + expect(span.trace_id).to eq producer_trace_digest.trace_id + end + end + + context 'when distributed tracing is disabled for the topic in particular' do + let(:topic_name) { 'special_topic' } + + it 'does not continue the span that produced the message' do + consumer_span = nil + consumer_trace = nil + + Datadog::Tracing.trace('consumer') do + consumer_span = Datadog::Tracing.active_span + consumer_trace = Datadog::Tracing.active_trace + + topic = ::Karafka::Routing::Topic.new(topic_name, double(id: 0)) + messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now) + expect(messages).to all(be_a(::Karafka::Messages::Message)) + + # assert that the current trace re-set to the original trace after iterating the messages + expect(Datadog::Tracing.active_trace).to eq(consumer_trace) + expect(Datadog::Tracing.active_span).to eq(consumer_span) + end + + expect(spans).to have(3).items + + # assert that the message span is not continuation of the producer span + expect(span.parent_id).to eq(consumer_span.id) + expect(span.trace_id).to eq(consumer_trace.id) + end + end + + context 'when distributed tracing is not enabled' do + let(:configuration_options) { {distributed_tracing: false} } + + it 'does not continue the span that produced the message' do + consumer_span = nil + consumer_trace = nil + + Datadog::Tracing.trace('consumer') do + consumer_span = Datadog::Tracing.active_span + consumer_trace = Datadog::Tracing.active_trace + + topic = ::Karafka::Routing::Topic.new(topic_name, double(id: 0)) + + messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now) + # NOTE: The following will iterate through the messages and create a new span representing + # the individual message processing (and `span` will refer to that particular span) + expect(messages).to all(be_a(::Karafka::Messages::Message)) + + # assert that the current trace re-set to the original trace after iterating the messages + expect(Datadog::Tracing.active_trace).to eq(consumer_trace) + expect(Datadog::Tracing.active_span).to eq(consumer_span) + end + + # spans: + # [consumer span] + # ↳ [message processing span] + # [producer span] + expect(spans).to have(3).items + + # assert that the message processing span is a continuation of the consumer span, NOT of the producer span + expect(span.parent_id).to eq(consumer_span.id) + expect(span.trace_id).to eq(consumer_trace.id) + end + end + end end describe 'worker.processed' do let(:span_name) { Datadog::Tracing::Contrib::Karafka::Ext::SPAN_WORKER_PROCESS } it 'is expected to send a span' do - metadata = ::Karafka::Messages::Metadata.new - metadata['offset'] = 412 + metadata = ::Karafka::Messages::Metadata.new(offset: 412, topic: 'topic_a') raw_payload = rand.to_s message = ::Karafka::Messages::Message.new(raw_payload, metadata) - job = double(executor: double(topic: double(name: 'topic_a', consumer: 'ABC'), partition: 0), messages: [message]) + job = double(executor: double(topic: double(name: message.topic, consumer: 'ABC'), partition: 0), messages: [message]) Karafka.monitor.instrument('worker.processed', {job: job}) do # Noop @@ -80,4 +195,104 @@ expect(span.resource).to eq 'ABC#consume' end end + + describe 'framework auto-instrumentation' do + around do |example| + # Reset before and after each example; don't allow global state to linger. + Datadog.registry[:waterdrop].reset_configuration! + example.run + Datadog.registry[:waterdrop].reset_configuration! + + # reset Karafka internal state as well + Karafka::App.config.internal.status.reset! + Karafka::App.config.producer = nil + Karafka.refresh! + end + + let(:producer_middlewares) { Karafka.producer.middleware.instance_variable_get(:@steps) } + + def waterdrop_compatible? + Datadog::Tracing::Contrib::WaterDrop::Integration.compatible? + end + + it 'automatically enables WaterDrop instrumentation' do + skip 'WaterDrop is not activated unless it is on a supported version' unless waterdrop_compatible? + + Karafka::App.setup do |c| + c.kafka = {"bootstrap.servers": '127.0.0.1:9092'} + end + + expect(Datadog.configuration.tracing[:karafka, 'special_topic'][:enabled]).to be true + expect(Datadog.configuration.tracing[:karafka, 'special_topic'][:distributed_tracing]).to be false + + expect(Datadog.configuration.tracing[:waterdrop][:enabled]).to be true + expect(Datadog.configuration.tracing[:waterdrop][:distributed_tracing]).to be true + expect(Datadog.configuration.tracing[:waterdrop, 'special_topic'][:enabled]).to be true + expect(Datadog.configuration.tracing[:waterdrop, 'special_topic'][:distributed_tracing]).to be false + end + + context 'when user does not supply a custom producer' do + it 'sets up Karafka.producer with the datadog waterdrop middleware' do + skip 'WaterDrop is not activated unless it is on a supported version' unless waterdrop_compatible? + + Karafka::App.setup do |c| + c.kafka = {"bootstrap.servers": '127.0.0.1:9092'} + end + + expect(producer_middlewares).to eq([ + Datadog::Tracing::Contrib::WaterDrop::Middleware + ]) + end + end + + context 'when the user does supply a custom producer with custom middlewares' do + let(:custom_middleware) { ->(message) { messsage } } + + it 'appends the datadog middleware at the end of the Karafka.producer middleware stack' do + skip 'WaterDrop is not activated unless it is on a supported version' unless waterdrop_compatible? + + Karafka::App.setup do |c| + c.kafka = {"bootstrap.servers": '127.0.0.1:9092'} + c.producer = WaterDrop::Producer.new do |producer_config| + producer_config.kafka = {"bootstrap.servers": '127.0.0.1:9092'} + producer_config.middleware.append(custom_middleware) + end + end + + expect(producer_middlewares).to eq([ + custom_middleware, + Datadog::Tracing::Contrib::WaterDrop::Middleware + ]) + end + end + + context 'when the waterdrop integration is manually configured' do + it 'appends the datadog middleware to Karafka.producer only once' do + skip 'WaterDrop is not activated unless it is on a supported version' unless waterdrop_compatible? + + Datadog.configure do |c| + c.tracing.instrument :waterdrop, configuration_options + end + Karafka::App.setup do |c| + c.kafka = {"bootstrap.servers": '127.0.0.1:9092'} + end + + expect(producer_middlewares).to eq([ + Datadog::Tracing::Contrib::WaterDrop::Middleware + ]) + end + end + + context 'when the waterdrop integration is not on a compatbile version' do + it 'does not attempt to activate waterdrop or append any producer middleware' do + skip 'WaterDrop is not activated unless it is on a supported version' if waterdrop_compatible? + + Karafka::App.setup do |c| + c.kafka = {"bootstrap.servers": '127.0.0.1:9092'} + end + + expect(producer_middlewares).to be_empty + end + end + end end diff --git a/spec/datadog/tracing/contrib/waterdrop/middleware_spec.rb b/spec/datadog/tracing/contrib/waterdrop/middleware_spec.rb index e191ebaaa6d..681b98ac436 100644 --- a/spec/datadog/tracing/contrib/waterdrop/middleware_spec.rb +++ b/spec/datadog/tracing/contrib/waterdrop/middleware_spec.rb @@ -8,6 +8,7 @@ before do Datadog.configure do |c| c.tracing.instrument :waterdrop, tracing_options + c.tracing.instrument :waterdrop, describes: /special_/, distributed_tracing: false end end @@ -57,6 +58,17 @@ end end + context 'when distributed tracing is disabled for the topic in particular' do + it 'does not propagate trace context in message headers' do + message_1 = {topic: 'special_topic', payload: 'foo'} + Datadog::Tracing.trace('test.span') do + middleware.call(message_1) + end + + expect(message_1[:headers]).to be_nil + end + end + context 'when DataStreams is enabled' do before do allow(Datadog::DataStreams).to receive(:enabled?).and_return(true)