Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 18 additions & 19 deletions lib/datadog/tracing/contrib/karafka/patcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,35 +25,34 @@ def propagation
# @see https://github.com/karafka/karafka/blob/b06d1f7c17818e1605f80c2bb573454a33376b40/README.md?plain=1#L29-L35
def each(&block)
@messages_array.each do |message|
if configuration[:distributed_tracing]
trace_digest = if configuration[:distributed_tracing]
headers = if message.metadata.respond_to?(:raw_headers)
message.metadata.raw_headers
else
message.metadata.headers
end
trace_digest = Karafka.extract(headers)
Datadog::Tracing.continue_trace!(trace_digest) if trace_digest
Karafka.extract(headers)
end

if Datadog::DataStreams.enabled?
begin
headers = if message.metadata.respond_to?(:raw_headers)
message.metadata.raw_headers
else
message.metadata.headers
Tracing.trace(Ext::SPAN_MESSAGE_CONSUME, continue_from: trace_digest) do |span, trace|
if Datadog::DataStreams.enabled?
begin
headers = if message.metadata.respond_to?(:raw_headers)
message.metadata.raw_headers
else
message.metadata.headers
end

Datadog::DataStreams.set_consume_checkpoint(
type: 'kafka',
source: message.topic,
auto_instrumentation: true
) { |key| headers[key] }
rescue => e
Datadog.logger.debug("Error setting DSM checkpoint: #{e.class}: #{e}")
end

Datadog::DataStreams.set_consume_checkpoint(
type: 'kafka',
source: message.topic,
auto_instrumentation: true
) { |key| headers[key] }
rescue => e
Datadog.logger.debug("Error setting DSM checkpoint: #{e.class}: #{e}")
end
end

Tracing.trace(Ext::SPAN_MESSAGE_CONSUME) do |span|
span.set_tag(Ext::TAG_OFFSET, message.metadata.offset)
span.set_tag(Contrib::Ext::Messaging::TAG_DESTINATION, message.topic)
span.set_tag(Contrib::Ext::Messaging::TAG_SYSTEM, Ext::TAG_SYSTEM)
Expand Down
77 changes: 77 additions & 0 deletions spec/datadog/tracing/contrib/karafka/patcher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,83 @@
expect(span).to_not have_error
expect(span.resource).to eq 'topic_a'
end

context 'when the message has tracing headers' do
let(:message) do
headers = {}
Datadog::Tracing.trace('producer') do |span, trace|
Datadog::Tracing::Contrib::Karafka.inject(trace.to_digest, headers)
end
metadata = ::Karafka::Messages::Metadata.new
metadata['offset'] = 412
metadata[headers_accessor] = headers
raw_payload = rand.to_s

message = ::Karafka::Messages::Message.new(raw_payload, metadata)
allow(message).to receive(:timestamp).and_return(Time.now)
allow(message).to receive(:topic).and_return('topic_a')
message
end
let(:headers_accessor) do
::Karafka::Messages::Metadata.members.include?(:raw_headers) ? 'raw_headers' : 'headers'
end

context 'when distributed tracing is enabled' do
it 'continues the span that produced the message' do
producer_trace_digest = Datadog::Tracing::Contrib::Karafka.extract(message.metadata[headers_accessor])

consumer_span = nil
consumer_trace = nil

Datadog::Tracing.trace('consumer') do
consumer_span = Datadog::Tracing.active_span
consumer_trace = Datadog::Tracing.active_trace

topic = ::Karafka::Routing::Topic.new('topic_a', double(id: 0))
messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now)
expect(messages).to all(be_a(::Karafka::Messages::Message))

# assert that the current trace re-set to the original trace after iterating the messages
expect(Datadog::Tracing.active_trace).to eq(consumer_trace)
expect(Datadog::Tracing.active_span).to eq(consumer_span)
end

expect(spans).to have(3).items

# assert that the message span is a continuation of the producer span
expect(span.parent_id).to eq producer_trace_digest.span_id
expect(span.trace_id).to eq producer_trace_digest.trace_id
end
end

context 'when distributed tracing is not enabled' do
let(:configuration_options) { {distributed_tracing: false} }

it 'does not continue the span that produced the message' do
consumer_span = nil
consumer_trace = nil

Datadog::Tracing.trace('consumer') do
consumer_span = Datadog::Tracing.active_span
consumer_trace = Datadog::Tracing.active_trace

topic = ::Karafka::Routing::Topic.new('topic_a', double(id: 0))
messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now)
expect(messages).to all(be_a(::Karafka::Messages::Message))

# assert that the current trace re-set to the original trace after iterating the messages
expect(Datadog::Tracing.active_trace).to eq(consumer_trace)
expect(Datadog::Tracing.active_span).to eq(consumer_span)
end

expect(spans).to have(3).items

# assert that the message span is not continuation of the producer span
expect(span.parent_id).to eq(consumer_span.id)
expect(span.trace_id).to eq(consumer_trace.id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the other test, is consumer (the middle span, if I am understanding the logic correctly) expected to be a child of producer? Is that asserted somewhere?

Copy link
Contributor Author

@Drowze Drowze Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In these tests, the consumer span is never expected to be a child of any span. Referring to each test individually:

  1. when the message has tracing headers, when distributed tracing is enabled, it continues the span that produced the message

    • consumer is a root span and it doesn't have any child spans
    • producer is a root span and it has one child span from iterating the messages(*)
  2. when the message has tracing headers, when distributed tracing is not enabled, it does not continue the span that produced the message

    • consumer is a root span and it has one child span from iterating the messages(*)
    • producer is a root span and it doesn't have any child spans

Hope that makes sense! 🙇


(*) we create spans from iterating the messages as the karafka integration patches ::Karafka::Messages::Messages#each

Copy link
Contributor Author

@Drowze Drowze Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(*) we create spans from iterating the messages as the karafka integration patches ::Karafka::Messages::Messages#each

I think that this might not be clear enough from our tests... The messages are being iterated here:

# this creates a new span
expect(messages).to all(be_a(::Karafka::Messages::Message))

# ...

# in these expectations, `span` refers to the span created from iterating the messages
expect(span.parent_id).to eq(consumer_span.id)
expect(span.trace_id).to eq(consumer_trace.id)

# because of these `let`s (from the beginning of the file)
let(:span) do
  spans.find { |s| s.name == span_name }
end
let(:span_name) { Datadog::Tracing::Contrib::Karafka::Ext::SPAN_MESSAGE_CONSUME }

I'll update the test now to use an explicit each and add a comment about it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just updated the test with comments better expliciting the test intentions - please let me know if it's more clear now 🙇


note: I didn't add an explicit each because this same pattern of relying on expect(...).to all(...) to create a span is used across pretty much all the tests in that file

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I think I understand what you are saying, but it's strange to me that the iterating span changed from being under the producer to being under the consumer but there is no relationship between the producer and the consumer. Might be a question outside the scope of this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Iterating the message does not really change any spans - it's continuing a trace that changes the parent/child relationship of spans.
In other words:

  • when distributed_tracing=true, the message span is a continuation of the span that created it (if any) (i.e.: the message span turns into a child of the span who produced the message)
  • when distributed_tracing=false, the message span is just a regular span (which may have a parent - which in the case of theses tests is the consumer span who's iterating the messages)

Distributed tracing gets confusing quickly 😅

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just fyi: I found this issue describing a similar use-case, so I've included our own use-case there as well (along with an example & screenshots)

end
end
end
end

describe 'worker.processed' do
Expand Down