Skip to content

Commit 9830c0c

Browse files
authored
Merge pull request #5254 from getsentry/feat/queue-instrumentation-producer
feat(spring-jakarta): [Queue Instrumentation 3] Add Kafka producer instrumentation
2 parents 007d23f + fdb3a03 commit 9830c0c

7 files changed

Lines changed: 430 additions & 0 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
### Features
66

7+
- Add Kafka producer instrumentation for Spring Boot 3 ([#5254](https://github.com/getsentry/sentry-java/pull/5254))
78
- Add `enableQueueTracing` option and messaging span data conventions ([#5250](https://github.com/getsentry/sentry-java/pull/5250))
89
- Prevent cross-organization trace continuation ([#5136](https://github.com/getsentry/sentry-java/pull/5136))
910
- By default, the SDK now extracts the organization ID from the DSN (e.g. `o123.ingest.sentry.io`) and compares it with the `sentry-org_id` value in incoming baggage headers. When the two differ, the SDK starts a fresh trace instead of continuing the foreign one. This guards against accidentally linking traces across organizations.

sentry-spring-jakarta/api/sentry-spring-jakarta.api

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,20 @@ public final class io/sentry/spring/jakarta/graphql/SentrySpringSubscriptionHand
244244
public fun onSubscriptionResult (Ljava/lang/Object;Lio/sentry/IScopes;Lio/sentry/graphql/ExceptionReporter;Lgraphql/execution/instrumentation/parameters/InstrumentationFieldFetchParameters;)Ljava/lang/Object;
245245
}
246246

247+
public final class io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor : org/springframework/beans/factory/config/BeanPostProcessor, org/springframework/core/PriorityOrdered {
248+
public fun <init> ()V
249+
public fun getOrder ()I
250+
public fun postProcessAfterInitialization (Ljava/lang/Object;Ljava/lang/String;)Ljava/lang/Object;
251+
}
252+
253+
public final class io/sentry/spring/jakarta/kafka/SentryProducerInterceptor : org/apache/kafka/clients/producer/ProducerInterceptor {
254+
public fun <init> (Lio/sentry/IScopes;)V
255+
public fun close ()V
256+
public fun configure (Ljava/util/Map;)V
257+
public fun onAcknowledgement (Lorg/apache/kafka/clients/producer/RecordMetadata;Ljava/lang/Exception;)V
258+
public fun onSend (Lorg/apache/kafka/clients/producer/ProducerRecord;)Lorg/apache/kafka/clients/producer/ProducerRecord;
259+
}
260+
247261
public class io/sentry/spring/jakarta/opentelemetry/SentryOpenTelemetryAgentWithoutAutoInitConfiguration {
248262
public fun <init> ()V
249263
public fun sentryOpenTelemetryOptionsConfiguration ()Lio/sentry/Sentry$OptionsConfiguration;

sentry-spring-jakarta/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ dependencies {
4141
compileOnly(libs.servlet.jakarta.api)
4242
compileOnly(libs.slf4j.api)
4343
compileOnly(libs.springboot3.starter.graphql)
44+
compileOnly(libs.spring.kafka3)
4445
compileOnly(libs.springboot3.starter.quartz)
4546

4647
compileOnly(Config.Libs.springWebflux)
@@ -68,6 +69,7 @@ dependencies {
6869
testImplementation(libs.springboot3.starter.aop)
6970
testImplementation(libs.springboot3.starter.graphql)
7071
testImplementation(libs.springboot3.starter.security)
72+
testImplementation(libs.spring.kafka3)
7173
testImplementation(libs.springboot3.starter.test)
7274
testImplementation(libs.springboot3.starter.web)
7375
testImplementation(libs.springboot3.starter.webflux)
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package io.sentry.spring.jakarta.kafka;
2+
3+
import io.sentry.ScopesAdapter;
4+
import io.sentry.SentryLevel;
5+
import java.lang.reflect.Field;
6+
import org.apache.kafka.clients.producer.ProducerInterceptor;
7+
import org.jetbrains.annotations.ApiStatus;
8+
import org.jetbrains.annotations.NotNull;
9+
import org.jetbrains.annotations.Nullable;
10+
import org.springframework.beans.BeansException;
11+
import org.springframework.beans.factory.config.BeanPostProcessor;
12+
import org.springframework.core.Ordered;
13+
import org.springframework.core.PriorityOrdered;
14+
import org.springframework.kafka.core.KafkaTemplate;
15+
import org.springframework.kafka.support.CompositeProducerInterceptor;
16+
17+
/**
18+
* Sets a {@link SentryProducerInterceptor} on {@link KafkaTemplate} beans via {@link
19+
* KafkaTemplate#setProducerInterceptor(ProducerInterceptor)}. The original bean is not replaced.
20+
*
21+
* <p>If the template already has a {@link ProducerInterceptor}, both are composed using {@link
22+
* CompositeProducerInterceptor}. Reading the existing interceptor requires reflection (no public
23+
* getter in Spring Kafka 3.x); if reflection fails, a warning is logged and only the Sentry
24+
* interceptor is set.
25+
*/
26+
@ApiStatus.Internal
27+
public final class SentryKafkaProducerBeanPostProcessor
28+
implements BeanPostProcessor, PriorityOrdered {
29+
30+
@Override
31+
@SuppressWarnings("unchecked")
32+
public @NotNull Object postProcessAfterInitialization(
33+
final @NotNull Object bean, final @NotNull String beanName) throws BeansException {
34+
if (bean instanceof KafkaTemplate) {
35+
final @NotNull KafkaTemplate<?, ?> template = (KafkaTemplate<?, ?>) bean;
36+
final @Nullable ProducerInterceptor<?, ?> existing = getExistingInterceptor(template);
37+
38+
if (existing instanceof SentryProducerInterceptor) {
39+
return bean;
40+
}
41+
42+
@SuppressWarnings("rawtypes")
43+
final SentryProducerInterceptor sentryInterceptor =
44+
new SentryProducerInterceptor<>(ScopesAdapter.getInstance());
45+
46+
if (existing != null) {
47+
@SuppressWarnings("rawtypes")
48+
final CompositeProducerInterceptor composite =
49+
new CompositeProducerInterceptor(sentryInterceptor, existing);
50+
template.setProducerInterceptor(composite);
51+
} else {
52+
template.setProducerInterceptor(sentryInterceptor);
53+
}
54+
}
55+
return bean;
56+
}
57+
58+
@SuppressWarnings("unchecked")
59+
private @Nullable ProducerInterceptor<?, ?> getExistingInterceptor(
60+
final @NotNull KafkaTemplate<?, ?> template) {
61+
try {
62+
final @NotNull Field field = KafkaTemplate.class.getDeclaredField("producerInterceptor");
63+
field.setAccessible(true);
64+
return (ProducerInterceptor<?, ?>) field.get(template);
65+
} catch (NoSuchFieldException | IllegalAccessException e) {
66+
ScopesAdapter.getInstance()
67+
.getOptions()
68+
.getLogger()
69+
.log(
70+
SentryLevel.WARNING,
71+
"Unable to read existing producerInterceptor from KafkaTemplate via reflection. "
72+
+ "If you had a custom ProducerInterceptor, it may be overwritten by Sentry's interceptor.",
73+
e);
74+
return null;
75+
}
76+
}
77+
78+
@Override
79+
public int getOrder() {
80+
return Ordered.LOWEST_PRECEDENCE;
81+
}
82+
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package io.sentry.spring.jakarta.kafka;
2+
3+
import io.sentry.BaggageHeader;
4+
import io.sentry.IScopes;
5+
import io.sentry.ISpan;
6+
import io.sentry.SentryTraceHeader;
7+
import io.sentry.SpanDataConvention;
8+
import io.sentry.SpanOptions;
9+
import io.sentry.SpanStatus;
10+
import io.sentry.util.TracingUtils;
11+
import java.nio.charset.StandardCharsets;
12+
import java.util.Map;
13+
import org.apache.kafka.clients.producer.ProducerInterceptor;
14+
import org.apache.kafka.clients.producer.ProducerRecord;
15+
import org.apache.kafka.clients.producer.RecordMetadata;
16+
import org.apache.kafka.common.header.Headers;
17+
import org.jetbrains.annotations.ApiStatus;
18+
import org.jetbrains.annotations.NotNull;
19+
import org.jetbrains.annotations.Nullable;
20+
21+
/**
22+
* A Kafka {@link ProducerInterceptor} that creates {@code queue.publish} spans and injects tracing
23+
* headers into outgoing records.
24+
*
25+
* <p>The span starts and finishes synchronously in {@link #onSend(ProducerRecord)}, representing
26+
* "message enqueued" semantics. This avoids cross-thread correlation complexity since {@link
27+
* #onAcknowledgement(RecordMetadata, Exception)} runs on the Kafka I/O thread.
28+
*
29+
* <p>If the customer already has a {@link ProducerInterceptor}, the {@link
30+
* SentryKafkaProducerBeanPostProcessor} composes both using Spring's {@link
31+
* org.springframework.kafka.support.CompositeProducerInterceptor}.
32+
*/
33+
@ApiStatus.Internal
34+
public final class SentryProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {
35+
36+
static final String TRACE_ORIGIN = "auto.queue.spring_jakarta.kafka.producer";
37+
static final String SENTRY_ENQUEUED_TIME_HEADER = "sentry-task-enqueued-time";
38+
39+
private final @NotNull IScopes scopes;
40+
41+
public SentryProducerInterceptor(final @NotNull IScopes scopes) {
42+
this.scopes = scopes;
43+
}
44+
45+
@Override
46+
public @NotNull ProducerRecord<K, V> onSend(final @NotNull ProducerRecord<K, V> record) {
47+
if (!scopes.getOptions().isEnableQueueTracing()) {
48+
return record;
49+
}
50+
51+
final @Nullable ISpan activeSpan = scopes.getSpan();
52+
if (activeSpan == null || activeSpan.isNoOp()) {
53+
return record;
54+
}
55+
56+
final @NotNull SpanOptions spanOptions = new SpanOptions();
57+
spanOptions.setOrigin(TRACE_ORIGIN);
58+
final @NotNull ISpan span = activeSpan.startChild("queue.publish", record.topic(), spanOptions);
59+
if (span.isNoOp()) {
60+
return record;
61+
}
62+
63+
span.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka");
64+
span.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic());
65+
66+
try {
67+
injectHeaders(record.headers(), span);
68+
} catch (Throwable ignored) {
69+
// Header injection must not break the send
70+
}
71+
72+
span.setStatus(SpanStatus.OK);
73+
span.finish();
74+
75+
return record;
76+
}
77+
78+
@Override
79+
public void onAcknowledgement(
80+
final @Nullable RecordMetadata metadata, final @Nullable Exception exception) {}
81+
82+
@Override
83+
public void close() {}
84+
85+
@Override
86+
public void configure(final @Nullable Map<String, ?> configs) {}
87+
88+
private void injectHeaders(final @NotNull Headers headers, final @NotNull ISpan span) {
89+
final @Nullable TracingUtils.TracingHeaders tracingHeaders =
90+
TracingUtils.trace(scopes, null, span);
91+
if (tracingHeaders != null) {
92+
final @NotNull SentryTraceHeader sentryTraceHeader = tracingHeaders.getSentryTraceHeader();
93+
headers.remove(sentryTraceHeader.getName());
94+
headers.add(
95+
sentryTraceHeader.getName(),
96+
sentryTraceHeader.getValue().getBytes(StandardCharsets.UTF_8));
97+
98+
final @Nullable BaggageHeader baggageHeader = tracingHeaders.getBaggageHeader();
99+
if (baggageHeader != null) {
100+
headers.remove(baggageHeader.getName());
101+
headers.add(
102+
baggageHeader.getName(), baggageHeader.getValue().getBytes(StandardCharsets.UTF_8));
103+
}
104+
}
105+
106+
headers.remove(SENTRY_ENQUEUED_TIME_HEADER);
107+
headers.add(
108+
SENTRY_ENQUEUED_TIME_HEADER,
109+
String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
110+
}
111+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package io.sentry.spring.jakarta.kafka
2+
3+
import kotlin.test.Test
4+
import kotlin.test.assertSame
5+
import kotlin.test.assertTrue
6+
import org.apache.kafka.clients.producer.ProducerInterceptor
7+
import org.mockito.kotlin.mock
8+
import org.springframework.kafka.core.KafkaTemplate
9+
import org.springframework.kafka.core.ProducerFactory
10+
import org.springframework.kafka.support.CompositeProducerInterceptor
11+
12+
class SentryKafkaProducerBeanPostProcessorTest {
13+
14+
private fun readInterceptor(template: KafkaTemplate<*, *>): Any? {
15+
val field = KafkaTemplate::class.java.getDeclaredField("producerInterceptor")
16+
field.isAccessible = true
17+
return field.get(template)
18+
}
19+
20+
@Test
21+
fun `sets SentryProducerInterceptor on KafkaTemplate`() {
22+
val template = KafkaTemplate<String, String>(mock<ProducerFactory<String, String>>())
23+
val processor = SentryKafkaProducerBeanPostProcessor()
24+
25+
processor.postProcessAfterInitialization(template, "kafkaTemplate")
26+
27+
assertTrue(readInterceptor(template) is SentryProducerInterceptor<*, *>)
28+
}
29+
30+
@Test
31+
fun `does not double-wrap when SentryProducerInterceptor already set`() {
32+
val template = KafkaTemplate<String, String>(mock<ProducerFactory<String, String>>())
33+
val processor = SentryKafkaProducerBeanPostProcessor()
34+
35+
processor.postProcessAfterInitialization(template, "kafkaTemplate")
36+
val firstInterceptor = readInterceptor(template)
37+
38+
processor.postProcessAfterInitialization(template, "kafkaTemplate")
39+
val secondInterceptor = readInterceptor(template)
40+
41+
assertSame(firstInterceptor, secondInterceptor)
42+
}
43+
44+
@Test
45+
fun `does not modify non-KafkaTemplate beans`() {
46+
val someBean = "not a kafka template"
47+
val processor = SentryKafkaProducerBeanPostProcessor()
48+
49+
val result = processor.postProcessAfterInitialization(someBean, "someBean")
50+
51+
assertSame(someBean, result)
52+
}
53+
54+
@Test
55+
fun `returns the same bean instance`() {
56+
val template = KafkaTemplate<String, String>(mock<ProducerFactory<String, String>>())
57+
val processor = SentryKafkaProducerBeanPostProcessor()
58+
59+
val result = processor.postProcessAfterInitialization(template, "kafkaTemplate")
60+
61+
assertSame(template, result, "BPP should return the same bean, not a replacement")
62+
}
63+
64+
@Test
65+
fun `composes with existing customer interceptor using CompositeProducerInterceptor`() {
66+
val template = KafkaTemplate<String, String>(mock<ProducerFactory<String, String>>())
67+
val customerInterceptor = mock<ProducerInterceptor<String, String>>()
68+
template.setProducerInterceptor(customerInterceptor)
69+
70+
val processor = SentryKafkaProducerBeanPostProcessor()
71+
processor.postProcessAfterInitialization(template, "kafkaTemplate")
72+
73+
assertTrue(
74+
readInterceptor(template) is CompositeProducerInterceptor<*, *>,
75+
"Should use CompositeProducerInterceptor when existing interceptor is present",
76+
)
77+
}
78+
}

0 commit comments

Comments
 (0)