diff --git a/lib/datadog/tracing/contrib/karafka/patcher.rb b/lib/datadog/tracing/contrib/karafka/patcher.rb index 1423e7f9b8c..cbfdddc745a 100644 --- a/lib/datadog/tracing/contrib/karafka/patcher.rb +++ b/lib/datadog/tracing/contrib/karafka/patcher.rb @@ -25,35 +25,34 @@ def propagation # @see https://github.com/karafka/karafka/blob/b06d1f7c17818e1605f80c2bb573454a33376b40/README.md?plain=1#L29-L35 def each(&block) @messages_array.each do |message| - if configuration[:distributed_tracing] + trace_digest = if configuration[:distributed_tracing] headers = if message.metadata.respond_to?(:raw_headers) message.metadata.raw_headers else message.metadata.headers end - trace_digest = Karafka.extract(headers) - Datadog::Tracing.continue_trace!(trace_digest) if trace_digest + Karafka.extract(headers) end - if Datadog::DataStreams.enabled? - begin - headers = if message.metadata.respond_to?(:raw_headers) - message.metadata.raw_headers - else - message.metadata.headers + Tracing.trace(Ext::SPAN_MESSAGE_CONSUME, continue_from: trace_digest) do |span, trace| + if Datadog::DataStreams.enabled? + begin + headers = if message.metadata.respond_to?(:raw_headers) + message.metadata.raw_headers + else + message.metadata.headers + end + + Datadog::DataStreams.set_consume_checkpoint( + type: 'kafka', + source: message.topic, + auto_instrumentation: true + ) { |key| headers[key] } + rescue => e + Datadog.logger.debug("Error setting DSM checkpoint: #{e.class}: #{e}") end - - Datadog::DataStreams.set_consume_checkpoint( - type: 'kafka', - source: message.topic, - auto_instrumentation: true - ) { |key| headers[key] } - rescue => e - Datadog.logger.debug("Error setting DSM checkpoint: #{e.class}: #{e}") end - end - Tracing.trace(Ext::SPAN_MESSAGE_CONSUME) do |span| span.set_tag(Ext::TAG_OFFSET, message.metadata.offset) span.set_tag(Contrib::Ext::Messaging::TAG_DESTINATION, message.topic) span.set_tag(Contrib::Ext::Messaging::TAG_SYSTEM, Ext::TAG_SYSTEM) diff --git a/spec/datadog/tracing/contrib/karafka/patcher_spec.rb b/spec/datadog/tracing/contrib/karafka/patcher_spec.rb index ae989e511b7..96a9ee814a5 100644 --- a/spec/datadog/tracing/contrib/karafka/patcher_spec.rb +++ b/spec/datadog/tracing/contrib/karafka/patcher_spec.rb @@ -53,6 +53,95 @@ expect(span).to_not have_error expect(span.resource).to eq 'topic_a' end + + context 'when the message has tracing headers' do + let(:message) do + headers = {} + Datadog::Tracing.trace('producer') do |span, trace| + Datadog::Tracing::Contrib::Karafka.inject(trace.to_digest, headers) + end + metadata = ::Karafka::Messages::Metadata.new + metadata['offset'] = 412 + metadata[headers_accessor] = headers + raw_payload = rand.to_s + + message = ::Karafka::Messages::Message.new(raw_payload, metadata) + allow(message).to receive(:timestamp).and_return(Time.now) + allow(message).to receive(:topic).and_return('topic_a') + message + end + let(:headers_accessor) do + ::Karafka::Messages::Metadata.members.include?(:raw_headers) ? 'raw_headers' : 'headers' + end + + context 'when distributed tracing is enabled' do + it 'continues the span that produced the message' do + producer_trace_digest = Datadog::Tracing::Contrib::Karafka.extract(message.metadata[headers_accessor]) + + consumer_span = nil + consumer_trace = nil + + Datadog::Tracing.trace('consumer') do + consumer_span = Datadog::Tracing.active_span + consumer_trace = Datadog::Tracing.active_trace + + topic = ::Karafka::Routing::Topic.new('topic_a', double(id: 0)) + messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now) + # NOTE: The following will iterate through the messages and create a new span representing + # the individual message processing (and `span` will refer to that particular span) + expect(messages).to all(be_a(::Karafka::Messages::Message)) + + # assert that the current trace re-set to the original trace after iterating the messages + expect(Datadog::Tracing.active_trace).to eq(consumer_trace) + expect(Datadog::Tracing.active_span).to eq(consumer_span) + end + + # spans: + # [consumer span] + # [producer span] + # ↳ [message processing span] + expect(spans).to have(3).items + + # assert that the message processing span is a continuation of the producer span, NOT of the consumer span + expect(span.parent_id).to eq producer_trace_digest.span_id + expect(span.trace_id).to eq producer_trace_digest.trace_id + end + end + + context 'when distributed tracing is not enabled' do + let(:configuration_options) { {distributed_tracing: false} } + + it 'does not continue the span that produced the message' do + consumer_span = nil + consumer_trace = nil + + Datadog::Tracing.trace('consumer') do + consumer_span = Datadog::Tracing.active_span + consumer_trace = Datadog::Tracing.active_trace + + topic = ::Karafka::Routing::Topic.new('topic_a', double(id: 0)) + messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now) + # NOTE: The following will iterate through the messages and create a new span representing + # the individual message processing (and `span` will refer to that particular span) + expect(messages).to all(be_a(::Karafka::Messages::Message)) + + # assert that the current trace re-set to the original trace after iterating the messages + expect(Datadog::Tracing.active_trace).to eq(consumer_trace) + expect(Datadog::Tracing.active_span).to eq(consumer_span) + end + + # spans: + # [consumer span] + # ↳ [message processing span] + # [producer span] + expect(spans).to have(3).items + + # assert that the message processing span is a continuation of the consumer span, NOT of the producer span + expect(span.parent_id).to eq(consumer_span.id) + expect(span.trace_id).to eq(consumer_trace.id) + end + end + end end describe 'worker.processed' do