Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sentry-kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

This module provides Kafka-native queue instrumentation for applications using `kafka-clients` directly.

Spring users should use `sentry-spring-boot-jakarta` / `sentry-spring-jakarta`, which provide higher-fidelity consumer instrumentation via Spring Kafka hooks.
Spring users should use the Sentry Spring (Boot) SDKs, which provide higher-fidelity consumer instrumentation via Spring Kafka hooks.
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ private boolean isIgnored() {
}

final @NotNull TransactionContext txContext =
continued != null ? continued : new TransactionContext("queue.process", "queue.process");
txContext.setName("queue.process");
continued != null ? continued : new TransactionContext(record.topic(), "queue.process");
txContext.setName(record.topic());
txContext.setOperation("queue.process");

final @NotNull TransactionOptions txOptions = new TransactionOptions();
Expand Down Expand Up @@ -204,7 +204,6 @@ private void finishTransaction(
}
transaction.finish();
} catch (Throwable t) {
// Instrumentation must never break customer processing.
scopes
.getOptions()
.getLogger()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,8 @@
import org.jetbrains.annotations.Nullable;

/**
* Wraps a Kafka {@link Producer} via {@link Proxy} to record a {@code queue.publish} span around
* each {@code send} and to inject Sentry trace propagation headers into the produced record.
*
* <p>Only the two {@code send} overloads are intercepted; every other {@link Producer} method is
* forwarded directly to the delegate. Because the wrapper is a dynamic proxy, it is compatible with
* any Kafka client version — new methods added to the {@link Producer} interface in future Kafka
* releases are forwarded automatically without recompilation.
* Wraps a Kafka {@link Producer} to record a {@code queue.publish} span around each {@code send}
* and to inject Sentry trace propagation headers into the produced record.
*
* <p>For raw Kafka usage:
*
Expand All @@ -44,9 +39,8 @@
* SentryKafkaProducer.wrap(new KafkaProducer<>(props));
* }</pre>
*
* <p>For Spring Kafka, the {@code SentryKafkaProducerBeanPostProcessor} in {@code
* sentry-spring-jakarta} installs this wrapper automatically via {@code
* ProducerFactory.addPostProcessor(...)}.
* <p>For Spring Kafka, the {@code SentryKafkaProducerBeanPostProcessor} installs this wrapper
* automatically.
*/
@ApiStatus.Experimental
public final class SentryKafkaProducer {
Expand All @@ -57,7 +51,7 @@ public final class SentryKafkaProducer {
private SentryKafkaProducer() {}

/**
* Wraps the given producer with Sentry instrumentation using the global scopes.
* Wraps the given producer with Sentry instrumentation.
*
* @param delegate the Kafka producer to wrap
* @return an instrumented producer that records {@code queue.publish} spans
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class SentryKafkaConsumerTracingTest {
verify(forkedScopes).continueTrace(eq(sentryTraceValue), eq(listOf(baggageValue)))
verify(forkedScopes).startTransaction(txContextCaptor.capture(), txOptionsCaptor.capture())

assertEquals("queue.process", txContextCaptor.firstValue.name)
assertEquals("my-topic", txContextCaptor.firstValue.name)
assertEquals("queue.process", txContextCaptor.firstValue.operation)
assertEquals(SentryKafkaConsumerTracing.TRACE_ORIGIN, txOptionsCaptor.firstValue.origin)
assertTrue(txOptionsCaptor.firstValue.isBindToScope)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,6 @@ class SentryKafkaProducerTest {
Sentry.close()
}

private fun createTransaction(): SentryTracer {
val tx = SentryTracer(TransactionContext("tx", "op"), scopes)
whenever(scopes.span).thenReturn(tx)
return tx
}

@Test
fun `creates queue publish span and injects headers`() {
val tx = createTransaction()
Expand Down Expand Up @@ -358,4 +352,10 @@ class SentryKafkaProducerTest {
val producer = SentryKafkaProducer.wrap(delegate, scopes)
assertTrue(producer.toString().startsWith("SentryKafkaProducer[delegate="))
}

private fun createTransaction(): SentryTracer {
val tx = SentryTracer(TransactionContext("tx", "op"), scopes)
whenever(scopes.span).thenReturn(tx)
return tx
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -362,10 +362,6 @@ private void transferSpanDetails(
maybeTransferOtelAttribute(span, sentryTransaction, ThreadIncubatingAttributes.THREAD_ID);
maybeTransferOtelAttribute(span, sentryTransaction, ThreadIncubatingAttributes.THREAD_NAME);

// Root transactions don't bulk-copy OTel attributes into span data (unlike child spans).
// The Sentry Queues product reads `trace.data.messaging.*`, so messaging attributes must
// be explicitly transferred for consumer root transactions to show up correctly. These are
// operational metadata (no payload contents) and are safe to transfer unconditionally.
maybeTransferOtelAttribute(
span, sentryTransaction, MessagingIncubatingAttributes.MESSAGING_SYSTEM);
maybeTransferOtelAttribute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,6 @@ private OtelSpanInfo descriptionForMessagingSystem(final @NotNull SpanData otelS
@SuppressWarnings("deprecation")
private @NotNull String opForMessaging(final @NotNull SpanData otelSpan) {
final @NotNull Attributes attributes = otelSpan.getAttributes();
// Prefer `messaging.operation.type` (current OTel semconv), fall back to legacy
// `messaging.operation`. OTel's SpanKind.CONSUMER is overloaded for both `receive` and
// `process`, so attribute-first mapping is required. SpanKind is used only as a last resort.
@Nullable
String operationType = attributes.get(MessagingIncubatingAttributes.MESSAGING_OPERATION_TYPE);
if (operationType == null) {
Expand All @@ -139,7 +136,6 @@ private OtelSpanInfo descriptionForMessagingSystem(final @NotNull SpanData otelS
case "settle":
return "queue.settle";
default:
// fall through to SpanKind mapping
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ private boolean isIgnored() {
final @NotNull TransactionContext txContext =
transactionContext != null
? transactionContext
: new TransactionContext("queue.process", "queue.process");
txContext.setName("queue.process");
: new TransactionContext(record.topic(), "queue.process");
txContext.setName(record.topic());
txContext.setOperation("queue.process");

final @NotNull TransactionOptions txOptions = new TransactionOptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ class SentryKafkaRecordInterceptorTest {

verify(scopes).forkedRootScopes("SentryKafkaRecordInterceptor")
verify(forkedScopes).makeCurrent()
verify(forkedScopes)
.startTransaction(
org.mockito.kotlin.check<TransactionContext> {
assertEquals("my-topic", it.name)
assertEquals("queue.process", it.operation)
},
any(),
)
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ private boolean isIgnored() {
final @NotNull TransactionContext txContext =
transactionContext != null
? transactionContext
: new TransactionContext("queue.process", "queue.process");
txContext.setName("queue.process");
: new TransactionContext(record.topic(), "queue.process");
txContext.setName(record.topic());
txContext.setOperation("queue.process");

final @NotNull TransactionOptions txOptions = new TransactionOptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ class SentryKafkaRecordInterceptorTest {

verify(scopes).forkedRootScopes("SentryKafkaRecordInterceptor")
verify(forkedScopes).makeCurrent()
verify(forkedScopes)
.startTransaction(
org.mockito.kotlin.check<TransactionContext> {
assertEquals("my-topic", it.name)
assertEquals("queue.process", it.operation)
},
any(),
)
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ private boolean isIgnored() {
final @NotNull TransactionContext txContext =
transactionContext != null
? transactionContext
: new TransactionContext("queue.process", "queue.process");
txContext.setName("queue.process");
: new TransactionContext(record.topic(), "queue.process");
txContext.setName(record.topic());
txContext.setOperation("queue.process");

final @NotNull TransactionOptions txOptions = new TransactionOptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ class SentryKafkaRecordInterceptorTest {

verify(scopes).forkedRootScopes("SentryKafkaRecordInterceptor")
verify(forkedScopes).makeCurrent()
verify(forkedScopes)
.startTransaction(
org.mockito.kotlin.check<TransactionContext> {
assertEquals("my-topic", it.name)
assertEquals("queue.process", it.operation)
},
any(),
)
}

@Test
Expand Down
Loading