Skip to content

Commit 2e7b369

Browse files
committed
Add Observations for send/receive
* Observes sends on PulsarTemplate * Observes receives on PulsarListener * Adds auto-generated adocs Closes #29
1 parent 6c19765 commit 2e7b369

21 files changed

+1260
-49
lines changed

spring-pulsar-dependencies/build.gradle

+4
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ ext {
1717
jaywayJsonPathVersion = '2.6.0'
1818
junitJupiterVersion = '5.9.0'
1919
log4jVersion = '2.18.0'
20+
micrometerVersion = '1.10.0-SNAPSHOT'
21+
micrometerTracingVersion = '1.0.0-SNAPSHOT'
2022
mockitoVersion = '4.6.1'
2123
protobufJavaVersion = '3.21.5'
2224
pulsarTestcontainersVersion = '1.17.3'
@@ -35,6 +37,8 @@ dependencies {
3537
api platform("org.junit:junit-bom:$junitJupiterVersion")
3638
api platform("org.mockito:mockito-bom:$mockitoVersion")
3739
api platform("org.springframework:spring-framework-bom:$springVersion")
40+
api platform("io.micrometer:micrometer-bom:$micrometerVersion")
41+
api platform("io.micrometer:micrometer-tracing-bom:$micrometerTracingVersion")
3842

3943
constraints {
4044
api "com.github.ben-manes.caffeine:caffeine:$caffeineVersion"

spring-pulsar-docs/build.gradle

+33-3
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,21 @@ plugins {
55

66
description = 'Spring Pulsar Docs'
77

8+
ext {
9+
micrometerDocsVersion="1.0.0-SNAPSHOT"
10+
}
11+
812
configurations {
913
configurationProperties
14+
observationDocs
1015
}
1116

1217
dependencies {
1318
api project (':spring-pulsar')
1419
api 'org.springframework.boot:spring-boot-starter'
1520
configurationProperties(project(path: ":spring-pulsar-spring-boot-autoconfigure", configuration: "configurationPropertiesMetadata"))
21+
observationDocs "io.micrometer:micrometer-docs-generator-spans:$micrometerDocsVersion"
22+
observationDocs "io.micrometer:micrometer-docs-generator-metrics:$micrometerDocsVersion"
1623
}
1724

1825
task aggregatedJavadoc(type: Javadoc) {
@@ -53,6 +60,29 @@ task documentConfigurationProperties(type: org.springframework.pulsar.gradle.doc
5360
outputDir = file("${buildDir}/docs/generated/")
5461
}
5562

63+
def observationsInputDir = file("${rootDir}/spring-pulsar/src/main/java/org/springframework/pulsar/observation").absolutePath
64+
def observationsOutputDir = file("${buildDir}/docs/generated/observation/").absolutePath
65+
66+
task generateObservabilityMetricsDocs(type: JavaExec) {
67+
mainClass = 'io.micrometer.docs.metrics.DocsFromSources'
68+
inputs.dir(observationsInputDir)
69+
outputs.dir(observationsOutputDir)
70+
classpath configurations.observationDocs
71+
args observationsInputDir, '.*', observationsOutputDir
72+
}
73+
74+
task generateObservabilitySpansDocs(type: JavaExec) {
75+
mainClass = 'io.micrometer.docs.spans.DocsFromSources'
76+
inputs.dir(observationsInputDir)
77+
outputs.dir(observationsOutputDir)
78+
classpath configurations.observationDocs
79+
args observationsInputDir, '.*', observationsOutputDir
80+
}
81+
82+
task generateObservabilityDocs {
83+
dependsOn generateObservabilityMetricsDocs, generateObservabilitySpansDocs
84+
}
85+
5686
tasks.withType(org.asciidoctor.gradle.jvm.AbstractAsciidoctorTask) {
5787
asciidoctorj {
5888
fatalWarnings = ['^((?!successfully validated).)*$']
@@ -81,7 +111,7 @@ task asciidoctorMultipage(type: org.asciidoctor.gradle.jvm.AsciidoctorTask) {
81111
}
82112

83113
syncDocumentationSourceForAsciidoctor {
84-
dependsOn documentConfigurationProperties
114+
dependsOn documentConfigurationProperties, generateObservabilityDocs
85115
from("${buildDir}/docs/generated") {
86116
into "asciidoc"
87117
}
@@ -91,7 +121,7 @@ syncDocumentationSourceForAsciidoctor {
91121
}
92122

93123
syncDocumentationSourceForAsciidoctorMultipage {
94-
dependsOn documentConfigurationProperties
124+
dependsOn documentConfigurationProperties, generateObservabilityDocs
95125
from("${buildDir}/docs/generated") {
96126
into "asciidoc"
97127
}
@@ -101,7 +131,7 @@ syncDocumentationSourceForAsciidoctorMultipage {
101131
}
102132

103133
syncDocumentationSourceForAsciidoctorPdf {
104-
dependsOn documentConfigurationProperties
134+
dependsOn documentConfigurationProperties, generateObservabilityDocs
105135
from("${buildDir}/docs/generated") {
106136
into "asciidoc"
107137
}

spring-pulsar-docs/src/main/asciidoc/pulsar.adoc

+21
Original file line numberDiff line numberDiff line change
@@ -1222,6 +1222,27 @@ PulsarTopic partitionedTopic {
12221222
----
12231223
====
12241224

1225+
[[micrometer]]
1226+
=== Observability
1227+
1228+
[[observation]]
1229+
==== Micrometer Observation
1230+
The `PulsarTemplate` and `PulsarListener` are instrumented with the Micrometer observations API.
1231+
When enabled, send and receive operations are traced and timed.
1232+
1233+
To enable, set `observationEnabled` on each component.
1234+
1235+
===== Custom tags
1236+
The default implementation adds the `bean.name` tag for template observations and `listener.id` tag for listener observations.
1237+
To add other tags to timers/traces, configure a custom `PulsarTemplateObservationConvention` or `PulsarListenerObservationConvention` to the template or listener container, respectively.
1238+
1239+
TIP: You can either subclass `DefaultPulsarTemplateObservationConvention` or `DefaultPulsarListenerObservationConvention` or provide completely new implementations.
1240+
1241+
include::observation/_metrics.adoc[leveloffset=+2]
1242+
1243+
include::observation/_spans.adoc[leveloffset=+2]
1244+
1245+
Refer to https://micrometer.io/docs/tracing[Micrometer Tracing] for more information.
12251246

12261247
==== Appendix
12271248
The reference documentation has the following appendices:

spring-pulsar/build.gradle

+5
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ description = 'Spring Pulsar Support'
77
dependencies {
88
api 'com.github.ben-manes.caffeine:caffeine'
99
api 'com.google.protobuf:protobuf-java'
10+
api 'io.micrometer:micrometer-observation'
1011
api 'org.apache.pulsar:pulsar-client-all'
1112
api 'org.springframework:spring-context'
1213
api 'org.springframework:spring-messaging'
@@ -26,6 +27,10 @@ dependencies {
2627
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
2728
testRuntimeOnly 'org.apache.logging.log4j:log4j-core'
2829
testRuntimeOnly 'org.apache.logging.log4j:log4j-jcl'
30+
testImplementation 'io.micrometer:micrometer-observation-test'
31+
testImplementation 'io.micrometer:micrometer-tracing-bridge-brave'
32+
testImplementation 'io.micrometer:micrometer-tracing-test'
33+
testImplementation 'io.micrometer:micrometer-tracing-integration-test'
2934
testImplementation 'org.assertj:assertj-core'
3035
testImplementation 'org.awaitility:awaitility'
3136
testImplementation 'org.hamcrest:hamcrest'

spring-pulsar/src/main/java/org/springframework/pulsar/config/AbstractPulsarListenerContainerFactory.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public void afterPropertiesSet() {
118118
@Override
119119
public C createListenerContainer(PulsarListenerEndpoint endpoint) {
120120
C instance = createContainerInstance(endpoint);
121-
JavaUtils.INSTANCE.acceptIfNotNull(endpoint.getSubscriptionName(), instance::setBeanName);
121+
JavaUtils.INSTANCE.acceptIfNotNull(endpoint.getId(), instance::setBeanName);
122122
if (endpoint instanceof AbstractPulsarListenerEndpoint) {
123123
configureEndpoint((AbstractPulsarListenerEndpoint<C>) endpoint);
124124
}
@@ -171,6 +171,8 @@ else if (this.autoStartup != null) {
171171
instanceProperties.setMaxNumMessages(this.containerProperties.getMaxNumMessages());
172172
instanceProperties.setMaxNumBytes(this.containerProperties.getMaxNumBytes());
173173
instanceProperties.setBatchTimeoutMillis(this.containerProperties.getBatchTimeoutMillis());
174+
instanceProperties.setObservationEnabled(this.containerProperties.isObservationEnabled());
175+
instanceProperties.setObservationConvention(this.containerProperties.getObservationConvention());
174176

175177
JavaUtils.INSTANCE.acceptIfNotNull(this.phase, instance::setPhase)
176178
.acceptIfNotNull(this.applicationContext, instance::setApplicationContext)

spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java

-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ protected ConcurrentPulsarMessageListenerContainer<T> createContainerInstance(Pu
7373
}
7474

7575
properties.setSchemaType(endpoint.getSchemaType());
76-
7776
return new ConcurrentPulsarMessageListenerContainer<T>(getPulsarConsumerFactory(), properties);
7877
}
7978

spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java

+105-17
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,19 @@
2929
import org.apache.pulsar.client.api.TypedMessageBuilder;
3030
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
3131

32+
import org.springframework.beans.factory.BeanNameAware;
33+
import org.springframework.beans.factory.ObjectProvider;
34+
import org.springframework.beans.factory.SmartInitializingSingleton;
35+
import org.springframework.context.ApplicationContext;
36+
import org.springframework.context.ApplicationContextAware;
3237
import org.springframework.core.log.LogAccessor;
38+
import org.springframework.pulsar.observation.DefaultPulsarTemplateObservationConvention;
39+
import org.springframework.pulsar.observation.PulsarMessageSenderContext;
40+
import org.springframework.pulsar.observation.PulsarTemplateObservation;
41+
import org.springframework.pulsar.observation.PulsarTemplateObservationConvention;
42+
43+
import io.micrometer.observation.Observation;
44+
import io.micrometer.observation.ObservationRegistry;
3345

3446
/**
3547
* A thread-safe template for executing high-level Pulsar operations.
@@ -39,16 +51,27 @@
3951
* @author Chris Bono
4052
* @author Alexander Preuß
4153
*/
42-
public class PulsarTemplate<T> implements PulsarOperations<T> {
54+
public class PulsarTemplate<T>
55+
implements PulsarOperations<T>, ApplicationContextAware, BeanNameAware, SmartInitializingSingleton {
4356

4457
private final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass()));
4558

4659
private final PulsarProducerFactory<T> producerFactory;
4760

4861
private final List<ProducerInterceptor> interceptors;
4962

63+
private ApplicationContext applicationContext;
64+
65+
private String beanName;
66+
5067
private Schema<T> schema;
5168

69+
private boolean observationEnabled;
70+
71+
private PulsarTemplateObservationConvention observationConvention;
72+
73+
private ObservationRegistry observationRegistry;
74+
5275
/**
5376
* Construct a template instance.
5477
* @param producerFactory the factory used to create the backing Pulsar producers.
@@ -92,14 +115,52 @@ public SendMessageBuilder<T> newMessage(T message) {
92115
return new SendMessageBuilderImpl<>(this, message);
93116
}
94117

118+
@Override
119+
public void setApplicationContext(ApplicationContext applicationContext) {
120+
this.applicationContext = applicationContext;
121+
}
122+
123+
@Override
124+
public void setBeanName(String beanName) {
125+
this.beanName = beanName;
126+
}
127+
95128
/**
96-
* Setter for schema.
129+
* Set the schema to use on this template.
97130
* @param schema provides the {@link Schema} used on this template
98131
*/
99132
public void setSchema(Schema<T> schema) {
100133
this.schema = schema;
101134
}
102135

136+
/**
137+
* Set to true to enable observation via Micrometer.
138+
* @param observationEnabled true to enable.
139+
*/
140+
public void setObservationEnabled(boolean observationEnabled) {
141+
this.observationEnabled = observationEnabled;
142+
}
143+
144+
/**
145+
* Set a custom observation convention.
146+
* @param observationConvention the convention.
147+
*/
148+
public void setObservationConvention(PulsarTemplateObservationConvention observationConvention) {
149+
this.observationConvention = observationConvention;
150+
}
151+
152+
@Override
153+
public void afterSingletonsInstantiated() {
154+
// TODO is this how we want to do this? What about SBAC?
155+
// TODO when would AC be null? Should we assert or at least log the fact if it
156+
// happens?
157+
if (this.observationEnabled && this.observationRegistry == null && this.applicationContext != null) {
158+
ObjectProvider<ObservationRegistry> registry = this.applicationContext
159+
.getBeanProvider(ObservationRegistry.class);
160+
this.observationRegistry = registry.getIfUnique();
161+
}
162+
}
163+
103164
private MessageId doSend(String topic, T message, TypedMessageBuilderCustomizer<T> typedMessageBuilderCustomizer,
104165
MessageRouter messageRouter, ProducerBuilderCustomizer<T> producerCustomizer) throws PulsarClientException {
105166
try {
@@ -115,22 +176,49 @@ private CompletableFuture<MessageId> doSendAsync(String topic, T message,
115176
ProducerBuilderCustomizer<T> producerCustomizer) throws PulsarClientException {
116177
final String topicName = ProducerUtils.resolveTopicName(topic, this.producerFactory);
117178
this.logger.trace(() -> String.format("Sending msg to '%s' topic", topicName));
118-
final Producer<T> producer = prepareProducerForSend(topic, message, messageRouter, producerCustomizer);
119-
TypedMessageBuilder<T> messageBuilder = producer.newMessage().value(message);
120-
if (typedMessageBuilderCustomizer != null) {
121-
typedMessageBuilderCustomizer.customize(messageBuilder);
122-
}
123-
return messageBuilder.sendAsync().whenComplete((msgId, ex) -> {
124-
if (ex == null) {
125-
this.logger.trace(() -> String.format("Sent msg to '%s' topic", topicName));
126-
// TODO success metrics
127-
}
128-
else {
129-
this.logger.error(ex, () -> String.format("Failed to send msg to '%s' topic", topicName));
130-
// TODO fail metrics
179+
180+
PulsarMessageSenderContext senderContext = PulsarMessageSenderContext.newContext(topicName, this.beanName);
181+
Observation observation = newObservation(senderContext);
182+
try {
183+
observation.start();
184+
final Producer<T> producer = prepareProducerForSend(topic, message, messageRouter, producerCustomizer);
185+
TypedMessageBuilder<T> messageBuilder = producer.newMessage().value(message);
186+
if (typedMessageBuilderCustomizer != null) {
187+
typedMessageBuilderCustomizer.customize(messageBuilder);
131188
}
132-
ProducerUtils.closeProducerAsync(producer, this.logger);
133-
});
189+
senderContext.properties().forEach(messageBuilder::property); // propagate
190+
// props to
191+
// message
192+
return messageBuilder.sendAsync().whenComplete((msgId, ex) -> {
193+
if (ex == null) {
194+
this.logger.trace(() -> String.format("Sent msg to '%s' topic", topicName));
195+
observation.stop();
196+
}
197+
else {
198+
this.logger.error(ex, () -> String.format("Failed to send msg to '%s' topic", topicName));
199+
observation.error(ex);
200+
observation.stop();
201+
}
202+
ProducerUtils.closeProducerAsync(producer, this.logger);
203+
});
204+
}
205+
catch (RuntimeException ex) {
206+
observation.error(ex);
207+
observation.stop();
208+
throw ex;
209+
}
210+
}
211+
212+
private Observation newObservation(PulsarMessageSenderContext senderContext) {
213+
Observation observation;
214+
if (!this.observationEnabled || this.observationRegistry == null) {
215+
observation = Observation.NOOP;
216+
}
217+
else {
218+
observation = PulsarTemplateObservation.TEMPLATE_OBSERVATION.observation(this.observationConvention,
219+
DefaultPulsarTemplateObservationConvention.INSTANCE, senderContext, this.observationRegistry);
220+
}
221+
return observation;
134222
}
135223

136224
private Producer<T> prepareProducerForSend(String topic, T message, MessageRouter messageRouter,

spring-pulsar/src/main/java/org/springframework/pulsar/listener/AbstractPulsarMessageListenerContainer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public void setBeanName(String name) {
111111
*/
112112
@Nullable
113113
public String getBeanName() {
114-
return this.beanName;
114+
return this.beanName; // the container factory sets this to the listener id
115115
}
116116

117117
@Override

0 commit comments

Comments
 (0)