Skip to content

Commit 4ec3244

Browse files
authored
Merge pull request #5341 from getsentry/fix/kafka-consumer-interceptor-reflection
fix(kafka): [Queue Instrumentation 36] Avoid dropping customer interceptor
2 parents 6c836c1 + 934fed9 commit 4ec3244

2 files changed

Lines changed: 108 additions & 15 deletions

File tree

sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java

Lines changed: 42 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,26 @@
2121
public final class SentryKafkaConsumerBeanPostProcessor
2222
implements BeanPostProcessor, PriorityOrdered {
2323

24+
private static final @NotNull String RECORD_INTERCEPTOR_FIELD_NAME = "recordInterceptor";
25+
26+
private final @NotNull String recordInterceptorFieldName;
27+
28+
public SentryKafkaConsumerBeanPostProcessor() {
29+
this(RECORD_INTERCEPTOR_FIELD_NAME);
30+
}
31+
32+
SentryKafkaConsumerBeanPostProcessor(final @NotNull String recordInterceptorFieldName) {
33+
this.recordInterceptorFieldName = recordInterceptorFieldName;
34+
}
35+
36+
private static final class InterceptorReadFailedException extends Exception {
37+
private static final long serialVersionUID = 1L;
38+
39+
InterceptorReadFailedException(final @NotNull Throwable cause) {
40+
super(cause);
41+
}
42+
}
43+
2444
@Override
2545
@SuppressWarnings("unchecked")
2646
public @NotNull Object postProcessAfterInitialization(
@@ -29,7 +49,23 @@ public final class SentryKafkaConsumerBeanPostProcessor
2949
final @NotNull AbstractKafkaListenerContainerFactory<?, ?, ?> factory =
3050
(AbstractKafkaListenerContainerFactory<?, ?, ?>) bean;
3151

32-
final @Nullable RecordInterceptor<?, ?> existing = getExistingInterceptor(factory);
52+
final @Nullable RecordInterceptor<?, ?> existing;
53+
try {
54+
existing = getExistingInterceptor(factory);
55+
} catch (InterceptorReadFailedException e) {
56+
ScopesAdapter.getInstance()
57+
.getOptions()
58+
.getLogger()
59+
.log(
60+
SentryLevel.ERROR,
61+
e,
62+
"Sentry Kafka consumer tracing disabled for factory '%s' \u2014 could not read "
63+
+ "existing recordInterceptor via reflection. Refusing to install Sentry's "
64+
+ "interceptor to avoid overwriting a customer-configured RecordInterceptor.",
65+
beanName);
66+
return bean;
67+
}
68+
3369
if (existing instanceof SentryKafkaRecordInterceptor) {
3470
return bean;
3571
}
@@ -42,25 +78,16 @@ public final class SentryKafkaConsumerBeanPostProcessor
4278
return bean;
4379
}
4480

45-
@SuppressWarnings("unchecked")
4681
private @Nullable RecordInterceptor<?, ?> getExistingInterceptor(
47-
final @NotNull AbstractKafkaListenerContainerFactory<?, ?, ?> factory) {
82+
final @NotNull AbstractKafkaListenerContainerFactory<?, ?, ?> factory)
83+
throws InterceptorReadFailedException {
4884
try {
4985
final @NotNull Field field =
50-
AbstractKafkaListenerContainerFactory.class.getDeclaredField("recordInterceptor");
86+
AbstractKafkaListenerContainerFactory.class.getDeclaredField(recordInterceptorFieldName);
5187
field.setAccessible(true);
5288
return (RecordInterceptor<?, ?>) field.get(factory);
53-
} catch (NoSuchFieldException | IllegalAccessException e) {
54-
ScopesAdapter.getInstance()
55-
.getOptions()
56-
.getLogger()
57-
.log(
58-
SentryLevel.WARNING,
59-
"Unable to read existing recordInterceptor from "
60-
+ "AbstractKafkaListenerContainerFactory via reflection. "
61-
+ "If you had a custom RecordInterceptor, it may not be chained with Sentry's interceptor.",
62-
e);
63-
return null;
89+
} catch (NoSuchFieldException | IllegalAccessException | RuntimeException e) {
90+
throw new InterceptorReadFailedException(e);
6491
}
6592
}
6693

sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@ package io.sentry.spring.jakarta.kafka
33
import kotlin.test.Test
44
import kotlin.test.assertSame
55
import kotlin.test.assertTrue
6+
import org.apache.kafka.clients.consumer.Consumer
7+
import org.apache.kafka.clients.consumer.ConsumerRecord
68
import org.mockito.kotlin.mock
79
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
810
import org.springframework.kafka.core.ConsumerFactory
11+
import org.springframework.kafka.listener.RecordInterceptor
912

1013
class SentryKafkaConsumerBeanPostProcessorTest {
1114

@@ -55,4 +58,67 @@ class SentryKafkaConsumerBeanPostProcessorTest {
5558

5659
assertSame(someBean, result)
5760
}
61+
62+
@Test
63+
fun `chains existing customer RecordInterceptor as delegate`() {
64+
val consumerFactory = mock<ConsumerFactory<String, String>>()
65+
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
66+
factory.consumerFactory = consumerFactory
67+
68+
val customerInterceptor =
69+
object : RecordInterceptor<String, String> {
70+
override fun intercept(
71+
record: ConsumerRecord<String, String>,
72+
consumer: Consumer<String, String>,
73+
): ConsumerRecord<String, String>? = record
74+
}
75+
factory.setRecordInterceptor(customerInterceptor)
76+
77+
val processor = SentryKafkaConsumerBeanPostProcessor()
78+
processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory")
79+
80+
val field = factory.javaClass.superclass.getDeclaredField("recordInterceptor")
81+
field.isAccessible = true
82+
val installed = field.get(factory)
83+
assertTrue(
84+
installed is SentryKafkaRecordInterceptor<*, *>,
85+
"expected SentryKafkaRecordInterceptor, got ${installed?.javaClass}",
86+
)
87+
88+
val delegateField = SentryKafkaRecordInterceptor::class.java.getDeclaredField("delegate")
89+
delegateField.isAccessible = true
90+
assertSame(
91+
customerInterceptor,
92+
delegateField.get(installed),
93+
"customer interceptor must be preserved as delegate",
94+
)
95+
}
96+
97+
@Test
98+
fun `skips installation when reflection fails and preserves customer interceptor`() {
99+
val consumerFactory = mock<ConsumerFactory<String, String>>()
100+
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
101+
factory.consumerFactory = consumerFactory
102+
val customerInterceptor =
103+
object : RecordInterceptor<String, String> {
104+
override fun intercept(
105+
record: ConsumerRecord<String, String>,
106+
consumer: Consumer<String, String>,
107+
): ConsumerRecord<String, String>? = record
108+
}
109+
factory.setRecordInterceptor(customerInterceptor)
110+
111+
val field = factory.javaClass.superclass.getDeclaredField("recordInterceptor")
112+
field.isAccessible = true
113+
assertSame(customerInterceptor, field.get(factory))
114+
115+
val processor = SentryKafkaConsumerBeanPostProcessor("missingRecordInterceptor")
116+
processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory")
117+
118+
assertSame(
119+
customerInterceptor,
120+
field.get(factory),
121+
"customer interceptor must remain installed when Sentry cannot read it",
122+
)
123+
}
58124
}

0 commit comments

Comments
 (0)