Skip to content
This repository was archived by the owner on Feb 17, 2025. It is now read-only.

Commit feb235a

Browse files
Release 0.1.7
1 parent 39f5ba8 commit feb235a

File tree

10 files changed

+72
-42
lines changed

10 files changed

+72
-42
lines changed

inspector-axon-api/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
<parent>
2222
<groupId>io.axoniq.inspector</groupId>
2323
<artifactId>inspector-axon-parent</artifactId>
24-
<version>0.1.7-SNAPSHOT</version>
24+
<version>0.1.7</version>
2525
</parent>
2626
<modelVersion>4.0.0</modelVersion>
2727

inspector-axon-spring-boot-starter/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
<parent>
2222
<groupId>io.axoniq.inspector</groupId>
2323
<artifactId>inspector-axon-parent</artifactId>
24-
<version>0.1.7-SNAPSHOT</version>
24+
<version>0.1.7</version>
2525
</parent>
2626
<modelVersion>4.0.0</modelVersion>
2727

inspector-axon/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
<parent>
2222
<groupId>io.axoniq.inspector</groupId>
2323
<artifactId>inspector-axon-parent</artifactId>
24-
<version>0.1.7-SNAPSHOT</version>
24+
<version>0.1.7</version>
2525
</parent>
2626
<modelVersion>4.0.0</modelVersion>
2727

inspector-axon/src/main/java/io/axoniq/inspector/eventprocessor/metrics/InspectorHandlerProcessorInterceptor.kt

+22-16
Original file line numberDiff line numberDiff line change
@@ -25,37 +25,43 @@ import org.axonframework.messaging.unitofwork.BatchingUnitOfWork
2525
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork
2626
import org.axonframework.messaging.unitofwork.UnitOfWork
2727
import org.axonframework.serialization.UnknownSerializedType
28+
import org.slf4j.LoggerFactory
2829
import java.time.Instant
2930
import java.time.temporal.ChronoUnit
3031

3132
class InspectorHandlerProcessorInterceptor(
3233
private val processorMetricsRegistry: ProcessorMetricsRegistry,
3334
private val processorName: String,
3435
) : MessageHandlerInterceptor<Message<*>> {
36+
private val logger = LoggerFactory.getLogger(this::class.java)
3537

3638
override fun handle(unitOfWork: UnitOfWork<out Message<*>>, interceptorChain: InterceptorChain): Any? {
3739
val uow = CurrentUnitOfWork.map { it }.orElse(null)
3840
if (uow == null || unitOfWork.message.payload is UnknownSerializedType) {
3941
return interceptorChain.proceed()
4042
}
41-
unitOfWork.resources()[INSPECTOR_PROCESSING_GROUP] = processorName
42-
val message = unitOfWork.message
43-
if (message is EventMessage) {
44-
val segment = unitOfWork.resources()["Processor[$processorName]/SegmentId"] as? Int ?: -1
45-
processorMetricsRegistry.registerIngested(
46-
processorName,
47-
segment,
48-
ChronoUnit.NANOS.between(message.timestamp, Instant.now())
49-
)
50-
if(unitOfWork !is BatchingUnitOfWork<*> || unitOfWork.isLastMessage) {
51-
unitOfWork.afterCommit {
52-
processorMetricsRegistry.registerCommitted(
53-
processorName,
54-
segment,
55-
ChronoUnit.NANOS.between(message.timestamp, Instant.now())
56-
)
43+
try {
44+
unitOfWork.resources()[INSPECTOR_PROCESSING_GROUP] = processorName
45+
val message = unitOfWork.message
46+
if (message is EventMessage) {
47+
val segment = unitOfWork.resources()["Processor[$processorName]/SegmentId"] as? Int ?: -1
48+
processorMetricsRegistry.registerIngested(
49+
processorName,
50+
segment,
51+
ChronoUnit.NANOS.between(message.timestamp, Instant.now())
52+
)
53+
if (unitOfWork !is BatchingUnitOfWork<*> || unitOfWork.isLastMessage) {
54+
unitOfWork.afterCommit {
55+
processorMetricsRegistry.registerCommitted(
56+
processorName,
57+
segment,
58+
ChronoUnit.NANOS.between(message.timestamp, Instant.now())
59+
)
60+
}
5761
}
5862
}
63+
} catch (e: Exception) {
64+
logger.info("Inspector Axon could not register metrics for processor $processorName", e)
5965
}
6066
return interceptorChain.proceed()
6167
}

inspector-axon/src/main/java/io/axoniq/inspector/eventprocessor/metrics/ProcessorMetricsRegistry.kt

+7-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.axoniq.inspector.eventprocessor.metrics
1818

19+
import io.axoniq.inspector.computeIfAbsentWithRetry
1920
import java.time.Clock
2021
import java.util.concurrent.ConcurrentHashMap
2122
import java.util.concurrent.atomic.AtomicLong
@@ -34,11 +35,15 @@ class ProcessorMetricsRegistry {
3435
}
3536

3637
fun ingestLatencyForProcessor(processor: String, segment: Int): ExpiringLatencyValue {
37-
return ingestLatencyRegistry.computeIfAbsent(processor) { mutableMapOf() }.computeIfAbsent(segment) { ExpiringLatencyValue() }
38+
return ingestLatencyRegistry
39+
.computeIfAbsentWithRetry(processor) { mutableMapOf() }
40+
.computeIfAbsentWithRetry(segment) { ExpiringLatencyValue() }
3841
}
3942

4043
fun commitLatencyForProcessor(processor: String, segment: Int): ExpiringLatencyValue {
41-
return commitLatencyRegistry.computeIfAbsent(processor) { mutableMapOf() }.computeIfAbsent(segment) { ExpiringLatencyValue() }
44+
return commitLatencyRegistry
45+
.computeIfAbsentWithRetry(processor) { mutableMapOf() }
46+
.computeIfAbsentWithRetry(segment) { ExpiringLatencyValue() }
4247
}
4348

4449
class ExpiringLatencyValue(

inspector-axon/src/main/java/io/axoniq/inspector/messaging/HandlerMetricsRegistry.kt

+6-5
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package io.axoniq.inspector.messaging
1919
import io.axoniq.inspector.api.Routes
2020
import io.axoniq.inspector.api.metrics.*
2121
import io.axoniq.inspector.client.RSocketInspectorClient
22+
import io.axoniq.inspector.computeIfAbsentWithRetry
2223
import io.micrometer.core.instrument.Timer
2324
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
2425
import org.axonframework.lifecycle.Lifecycle
@@ -123,14 +124,14 @@ class HandlerMetricsRegistry(
123124
duration: Long,
124125
metrics: Map<Metric, Long>
125126
) {
126-
val handlerStats = handlers.computeIfAbsent(handler) { _ ->
127+
val handlerStats = handlers.computeIfAbsentWithRetry(handler) { _ ->
127128
HandlerRegistryStatistics(createTimer(handler, "total"))
128129
}
129130
handlerStats.totalTimer.record(duration, TimeUnit.NANOSECONDS)
130131
metrics.filter { it.key.targetTypes.contains(MetricTargetType.HANDLER) }
131132
.forEach { (metric, value) ->
132133
handlerStats.metrics
133-
.computeIfAbsent(metric) { createTimer(handler, metric.fullIdentifier) }
134+
.computeIfAbsentWithRetry(metric) { createTimer(handler, metric.fullIdentifier) }
134135
.record(value, metric.type.distributionUnit)
135136
}
136137

@@ -141,13 +142,13 @@ class HandlerMetricsRegistry(
141142

142143
if (handler.type == HandlerType.Aggregate) {
143144
val id = AggregateStatisticIdentifier(handler.component!!)
144-
val aggStats = aggregates.computeIfAbsent(id) { _ ->
145+
val aggStats = aggregates.computeIfAbsentWithRetry(id) { _ ->
145146
AggregateRegistryStatistics(createTimer(id, "total"))
146147
}
147148

148149
metrics.filter { it.key.targetTypes.contains(MetricTargetType.AGGREGATE) }.forEach { (metric, value) ->
149150
aggStats.metrics
150-
.computeIfAbsent(metric) { createTimer(id, metric.fullIdentifier) }
151+
.computeIfAbsentWithRetry(metric) { createTimer(id, metric.fullIdentifier) }
151152
.record(value, metric.type.distributionUnit)
152153
}
153154
aggStats.totalTimer.record(duration, TimeUnit.NANOSECONDS)
@@ -162,7 +163,7 @@ class HandlerMetricsRegistry(
162163
fun registerMessageDispatchedDuringHandling(
163164
dispatcher: DispatcherStatisticIdentifier,
164165
) {
165-
dispatches.computeIfAbsent(dispatcher) { _ ->
166+
dispatches.computeIfAbsentWithRetry(dispatcher) { _ ->
166167
RollingCountMeasure()
167168
}.increment()
168169
}

inspector-axon/src/main/java/io/axoniq/inspector/messaging/InspectorHandlerEnhancerDefinition.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.axoniq.inspector.messaging
1818

19+
import io.axoniq.inspector.computeIfAbsentWithRetry
1920
import org.axonframework.common.Priority
2021
import org.axonframework.config.ProcessingGroup
2122
import org.axonframework.messaging.Message
@@ -45,7 +46,7 @@ class InspectorHandlerEnhancerDefinition : HandlerEnhancerDefinition {
4546
}
4647
val uow = CurrentUnitOfWork.get()
4748
uow.resources()[INSPECTOR_DECLARING_CLASS] = declaringClassName
48-
uow.resources().computeIfAbsent(INSPECTOR_PROCESSING_GROUP) { processingGroup }
49+
uow.resources().computeIfAbsentWithRetry(INSPECTOR_PROCESSING_GROUP) { processingGroup }
4950

5051
val start = System.nanoTime()
5152
try {

inspector-axon/src/main/java/io/axoniq/inspector/messaging/InspectorSpanFactory.kt

+19-14
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.axoniq.inspector.messaging
1818

1919
import io.axoniq.inspector.api.metrics.*
20+
import io.axoniq.inspector.computeIfAbsentWithRetry
2021
import org.axonframework.messaging.Message
2122
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork
2223
import org.axonframework.tracing.Span
@@ -105,20 +106,24 @@ class InspectorSpanFactory : SpanFactory {
105106
}
106107

107108
private fun report(end: Long) {
108-
logger.trace("Reporting span for message id $messageId = $handlerMetricIdentifier")
109-
val success = handlerSuccessful && transactionSuccessful
110-
HandlerMetricsRegistry.getInstance()?.registerMessageHandled(
111-
handler = handlerMetricIdentifier!!,
112-
success = success,
113-
duration = end - timeStarted!!,
114-
metrics = metrics
115-
)
116-
if(success) {
117-
dispatchedMessages.forEach {
118-
HandlerMetricsRegistry.getInstance()?.registerMessageDispatchedDuringHandling(
119-
DispatcherStatisticIdentifier(handlerMetricIdentifier, it)
120-
)
109+
try {
110+
logger.trace("Reporting span for message id $messageId = $handlerMetricIdentifier")
111+
val success = handlerSuccessful && transactionSuccessful
112+
HandlerMetricsRegistry.getInstance()?.registerMessageHandled(
113+
handler = handlerMetricIdentifier!!,
114+
success = success,
115+
duration = end - timeStarted!!,
116+
metrics = metrics
117+
)
118+
if (success) {
119+
dispatchedMessages.forEach {
120+
HandlerMetricsRegistry.getInstance()?.registerMessageDispatchedDuringHandling(
121+
DispatcherStatisticIdentifier(handlerMetricIdentifier, it)
122+
)
123+
}
121124
}
125+
} catch (e: Exception) {
126+
logger.info("Could not report metrics for message $handlerMetricIdentifier", e)
122127
}
123128
}
124129

@@ -193,7 +198,7 @@ class InspectorSpanFactory : SpanFactory {
193198
if (ACTIVE_ROOT_SPANS.containsKey(message.identifier)) {
194199
return NOOP_SPAN
195200
}
196-
return ACTIVE_ROOT_SPANS.computeIfAbsent(message.identifier) {
201+
return ACTIVE_ROOT_SPANS.computeIfAbsentWithRetry(message.identifier) {
197202
MeasuringInspectorSpan(message.identifier)
198203
}
199204
}

inspector-axon/src/main/java/io/axoniq/inspector/utils.kt

+12
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,15 @@ private fun <T : Any> T.fieldOfMatchingType(clazz: Class<out T>): Field? {
3737
return ReflectionUtils.fieldsOf(this::class.java)
3838
.firstOrNull { f -> f.type.isAssignableFrom(clazz) }
3939
}
40+
41+
fun <K, V> MutableMap<K, V>.computeIfAbsentWithRetry(key: K, retries: Int = 0, defaultValue: (K) -> V): V {
42+
try {
43+
return computeIfAbsent(key, defaultValue)
44+
} catch (e: ConcurrentModificationException) {
45+
if(retries < 3) {
46+
return computeIfAbsentWithRetry(key, retries + 1, defaultValue)
47+
}
48+
// We cannot get it from the map. Return the default value without putting it in, so the code can continue.
49+
return defaultValue(key)
50+
}
51+
}

pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
<groupId>io.axoniq.inspector</groupId>
2424
<artifactId>inspector-axon-parent</artifactId>
25-
<version>0.1.7-SNAPSHOT</version>
25+
<version>0.1.7</version>
2626

2727
<modules>
2828
<module>inspector-axon-api</module>

0 commit comments

Comments
 (0)