From 356dae5dd4e60bb2cefc434d7b1290cf38ca1b58 Mon Sep 17 00:00:00 2001 From: Milan Savic Date: Tue, 10 Sep 2019 14:52:31 +0200 Subject: [PATCH 1/4] TrackingEventProcessor logging of received events. --- .../eventhandling/TrackingEventProcessor.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/messaging/src/main/java/org/axonframework/eventhandling/TrackingEventProcessor.java b/messaging/src/main/java/org/axonframework/eventhandling/TrackingEventProcessor.java index 900c30cdad..4d8d3f5584 100644 --- a/messaging/src/main/java/org/axonframework/eventhandling/TrackingEventProcessor.java +++ b/messaging/src/main/java/org/axonframework/eventhandling/TrackingEventProcessor.java @@ -343,6 +343,7 @@ private void processBatch(Segment segment, BlockingStream Collection processingSegments; if (eventStream.hasNextAvailable(1, SECONDS)) { final TrackedEventMessage firstMessage = eventStream.nextAvailable(); + logReceivedEvent(segment, firstMessage); lastToken = firstMessage.trackingToken(); processingSegments = processingSegments(lastToken, segment); if (canHandle(firstMessage, processingSegments)) { @@ -354,6 +355,7 @@ private void processBatch(Segment segment, BlockingStream && i < batchSize * 10 && batch.size() < batchSize && eventStream.peek().map(m -> isRegularProcessing(segment, m)).orElse(false); i++) { final TrackedEventMessage trackedEventMessage = eventStream.nextAvailable(); + logReceivedEvent(segment, trackedEventMessage); lastToken = trackedEventMessage.trackingToken(); if (canHandle(trackedEventMessage, processingSegments)) { batch.add(trackedEventMessage); @@ -383,6 +385,7 @@ private void processBatch(Segment segment, BlockingStream while (lastToken != null && eventStream.peek().filter(event -> finalLastToken.equals(event.trackingToken())).isPresent()) { final TrackedEventMessage trackedEventMessage = eventStream.nextAvailable(); + logReceivedEvent(segment, trackedEventMessage); if (canHandle(trackedEventMessage, processingSegments)) { batch.add(trackedEventMessage); } @@ -402,6 +405,15 @@ private void processBatch(Segment segment, BlockingStream } } + private void logReceivedEvent(Segment segment, TrackedEventMessage message) { + logger.trace("Processor {} receiving event {}. Tracking token {}. Segment id {}. Thread {}.", + getName(), + message.getIdentifier(), + message.trackingToken(), + segment.getSegmentId(), + Thread.currentThread().getName()); + } + /** * Indicates whether any of the components handling events for this Processor are able to handle the given * {@code eventMessage} for any of the given {@code segments}. From 326c8e09768d5e284579836c9a132777d083e130 Mon Sep 17 00:00:00 2001 From: Milan Savic Date: Tue, 10 Sep 2019 15:47:42 +0200 Subject: [PATCH 2/4] Logging of received events. --- .../event/axon/AxonServerEventStore.java | 4 +++- .../connector/event/axon/EventBuffer.java | 15 ++++++++++++-- .../SimpleEventHandlerInvoker.java | 8 ++++++++ .../eventhandling/TrackingEventProcessor.java | 20 +++++++++++++------ .../modelling/saga/AbstractSagaManager.java | 4 ++++ 5 files changed, 42 insertions(+), 9 deletions(-) diff --git a/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/event/axon/AxonServerEventStore.java b/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/event/axon/AxonServerEventStore.java index b00127623a..b105e41e56 100644 --- a/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/event/axon/AxonServerEventStore.java +++ b/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/event/axon/AxonServerEventStore.java @@ -424,7 +424,9 @@ public TrackingEventStream openStream(TrackingToken trackingToken) { .listEvents(new StreamObserver() { @Override public void onNext(EventWithToken eventWithToken) { - logger.debug("Received event with token: {}", eventWithToken.getToken()); + logger.debug("Received event with token: {}. Stream Observer {}.", + eventWithToken.getToken(), + this); consumer.push(eventWithToken); } diff --git a/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/event/axon/EventBuffer.java b/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/event/axon/EventBuffer.java index c2755b26ec..d272bb0ad4 100644 --- a/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/event/axon/EventBuffer.java +++ b/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/event/axon/EventBuffer.java @@ -156,7 +156,12 @@ public boolean hasNextAvailable(int timeout, TimeUnit timeUnit) throws Interrupt public TrackedEventMessage nextAvailable() { try { hasNextAvailable(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); - return peekEvent == null ? eventStream.next() : peekEvent; + TrackedEventMessage next = peekEvent == null ? eventStream.next() : peekEvent; + logger.trace("Polling next available event {} in an Event Buffer {}. Tracking Token {}.", + next.getIdentifier(), + this, + next.trackingToken()); + return next; } catch (InterruptedException e) { logger.warn("Consumer thread was interrupted. Returning thread to event processor.", e); Thread.currentThread().interrupt(); @@ -180,7 +185,13 @@ public boolean push(EventWithToken event) { } try { TrackingToken trackingToken = new GlobalSequenceTrackingToken(event.getToken()); - events.put(new TrackedDomainEventData<>(trackingToken, new GrpcBackedDomainEventData(event.getEvent()))); + TrackedDomainEventData trackedDomainEventData = + new TrackedDomainEventData<>(trackingToken, new GrpcBackedDomainEventData(event.getEvent())); + logger.trace("Pushing event {} in an Event Buffer {}. Tracking Token {}.", + trackedDomainEventData.getEventIdentifier(), + this, + trackingToken); + events.put(trackedDomainEventData); } catch (InterruptedException e) { Thread.currentThread().interrupt(); closeCallback.accept(this); diff --git a/messaging/src/main/java/org/axonframework/eventhandling/SimpleEventHandlerInvoker.java b/messaging/src/main/java/org/axonframework/eventhandling/SimpleEventHandlerInvoker.java index 0af8474c36..2663f46fb6 100644 --- a/messaging/src/main/java/org/axonframework/eventhandling/SimpleEventHandlerInvoker.java +++ b/messaging/src/main/java/org/axonframework/eventhandling/SimpleEventHandlerInvoker.java @@ -21,6 +21,8 @@ import org.axonframework.eventhandling.async.SequentialPerAggregatePolicy; import org.axonframework.messaging.annotation.HandlerDefinition; import org.axonframework.messaging.annotation.ParameterResolverFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.*; import java.util.stream.Collectors; @@ -39,6 +41,8 @@ */ public class SimpleEventHandlerInvoker implements EventHandlerInvoker { + private static final Logger logger = LoggerFactory.getLogger(SimpleEventHandlerInvoker.class); + private final List eventHandlers; private final List wrappedEventHandlers; private final ListenerInvocationErrorHandler listenerInvocationErrorHandler; @@ -102,6 +106,10 @@ public List eventHandlers() { @Override public void handle(EventMessage message, Segment segment) throws Exception { + logger.trace("Received event {}. Segment Id {}. Thread {}.", + message.getIdentifier(), + segment.getSegmentId(), + Thread.currentThread().getName()); if (sequencingPolicyMatchesSegment(message, segment)) { for (EventMessageHandler handler : wrappedEventHandlers) { try { diff --git a/messaging/src/main/java/org/axonframework/eventhandling/TrackingEventProcessor.java b/messaging/src/main/java/org/axonframework/eventhandling/TrackingEventProcessor.java index 4d8d3f5584..4447eb88a1 100644 --- a/messaging/src/main/java/org/axonframework/eventhandling/TrackingEventProcessor.java +++ b/messaging/src/main/java/org/axonframework/eventhandling/TrackingEventProcessor.java @@ -52,6 +52,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.stream.Collectors; import static java.util.Collections.singleton; import static java.util.Objects.nonNull; @@ -343,7 +344,7 @@ private void processBatch(Segment segment, BlockingStream Collection processingSegments; if (eventStream.hasNextAvailable(1, SECONDS)) { final TrackedEventMessage firstMessage = eventStream.nextAvailable(); - logReceivedEvent(segment, firstMessage); + logReceivedEvent(segment, firstMessage, eventStream); lastToken = firstMessage.trackingToken(); processingSegments = processingSegments(lastToken, segment); if (canHandle(firstMessage, processingSegments)) { @@ -355,7 +356,7 @@ private void processBatch(Segment segment, BlockingStream && i < batchSize * 10 && batch.size() < batchSize && eventStream.peek().map(m -> isRegularProcessing(segment, m)).orElse(false); i++) { final TrackedEventMessage trackedEventMessage = eventStream.nextAvailable(); - logReceivedEvent(segment, trackedEventMessage); + logReceivedEvent(segment, trackedEventMessage, eventStream); lastToken = trackedEventMessage.trackingToken(); if (canHandle(trackedEventMessage, processingSegments)) { batch.add(trackedEventMessage); @@ -385,7 +386,7 @@ private void processBatch(Segment segment, BlockingStream while (lastToken != null && eventStream.peek().filter(event -> finalLastToken.equals(event.trackingToken())).isPresent()) { final TrackedEventMessage trackedEventMessage = eventStream.nextAvailable(); - logReceivedEvent(segment, trackedEventMessage); + logReceivedEvent(segment, trackedEventMessage, eventStream); if (canHandle(trackedEventMessage, processingSegments)) { batch.add(trackedEventMessage); } @@ -395,6 +396,11 @@ private void processBatch(Segment segment, BlockingStream unitOfWork.attachTransaction(transactionManager); unitOfWork.resources().put(segmentIdResourceKey, segment.getSegmentId()); unitOfWork.resources().put(lastTokenResourceKey, finalLastToken); + logger.trace("Processor {} processing batch {}. Segment id {}. Thread {}.", + getName(), + batch.stream().map(EventMessage::getIdentifier).collect(Collectors.joining(",")), + segment.getSegmentId(), + Thread.currentThread().getName()); processInUnitOfWork(batch, unitOfWork, processingSegments); activeSegments.computeIfPresent(segment.getSegmentId(), (k, v) -> v.advancedTo(finalLastToken)); @@ -405,13 +411,15 @@ private void processBatch(Segment segment, BlockingStream } } - private void logReceivedEvent(Segment segment, TrackedEventMessage message) { - logger.trace("Processor {} receiving event {}. Tracking token {}. Segment id {}. Thread {}.", + private void logReceivedEvent(Segment segment, TrackedEventMessage message, BlockingStream eventStream) { + logger.trace("Processor {} receiving event {}. Tracking token {}. Segment id {}. Thread {}. Processor instance {}. Event Stream {}.", getName(), message.getIdentifier(), message.trackingToken(), segment.getSegmentId(), - Thread.currentThread().getName()); + Thread.currentThread().getName(), + this, + eventStream); } /** diff --git a/modelling/src/main/java/org/axonframework/modelling/saga/AbstractSagaManager.java b/modelling/src/main/java/org/axonframework/modelling/saga/AbstractSagaManager.java index 4153bf2f2a..523fc74eff 100644 --- a/modelling/src/main/java/org/axonframework/modelling/saga/AbstractSagaManager.java +++ b/modelling/src/main/java/org/axonframework/modelling/saga/AbstractSagaManager.java @@ -73,6 +73,10 @@ protected AbstractSagaManager(Builder builder) { @Override public void handle(EventMessage event, Segment segment) throws Exception { + logger.trace("Received event {}. Segment Id {}. Thread {}.", + event.getIdentifier(), + segment.getSegmentId(), + Thread.currentThread().getName()); Set associationValues = extractAssociationValues(event); Set> sagas = associationValues.stream() From b0e917da826d93de233611591e0c5c55f0668740 Mon Sep 17 00:00:00 2001 From: Milan Savic Date: Tue, 10 Sep 2019 17:30:39 +0200 Subject: [PATCH 3/4] Added checking whether trace log level is enabled before logging. --- .../event/axon/AxonServerEventStore.java | 8 +++-- .../connector/event/axon/EventBuffer.java | 20 +++++++----- .../SimpleEventHandlerInvoker.java | 10 +++--- .../eventhandling/TrackingEventProcessor.java | 31 +++++++++++-------- .../modelling/saga/AbstractSagaManager.java | 10 +++--- 5 files changed, 47 insertions(+), 32 deletions(-) diff --git a/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/event/axon/AxonServerEventStore.java b/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/event/axon/AxonServerEventStore.java index b105e41e56..11fd29dcc5 100644 --- a/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/event/axon/AxonServerEventStore.java +++ b/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/event/axon/AxonServerEventStore.java @@ -424,9 +424,11 @@ public TrackingEventStream openStream(TrackingToken trackingToken) { .listEvents(new StreamObserver() { @Override public void onNext(EventWithToken eventWithToken) { - logger.debug("Received event with token: {}. Stream Observer {}.", - eventWithToken.getToken(), - this); + if (logger.isTraceEnabled()) { + logger.trace("Received event with token: {}. Stream Observer {}.", + eventWithToken.getToken(), + this); + } consumer.push(eventWithToken); } diff --git a/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/event/axon/EventBuffer.java b/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/event/axon/EventBuffer.java index d272bb0ad4..0eade09d53 100644 --- a/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/event/axon/EventBuffer.java +++ b/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/event/axon/EventBuffer.java @@ -157,10 +157,12 @@ public TrackedEventMessage nextAvailable() { try { hasNextAvailable(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); TrackedEventMessage next = peekEvent == null ? eventStream.next() : peekEvent; - logger.trace("Polling next available event {} in an Event Buffer {}. Tracking Token {}.", - next.getIdentifier(), - this, - next.trackingToken()); + if (logger.isTraceEnabled()) { + logger.trace("Polled next available event {} in an Event Buffer {}. Tracking Token {}.", + next.getIdentifier(), + this, + next.trackingToken()); + } return next; } catch (InterruptedException e) { logger.warn("Consumer thread was interrupted. Returning thread to event processor.", e); @@ -187,11 +189,13 @@ public boolean push(EventWithToken event) { TrackingToken trackingToken = new GlobalSequenceTrackingToken(event.getToken()); TrackedDomainEventData trackedDomainEventData = new TrackedDomainEventData<>(trackingToken, new GrpcBackedDomainEventData(event.getEvent())); - logger.trace("Pushing event {} in an Event Buffer {}. Tracking Token {}.", - trackedDomainEventData.getEventIdentifier(), - this, - trackingToken); events.put(trackedDomainEventData); + if (logger.isTraceEnabled()) { + logger.trace("Pushed event {} in an Event Buffer {}. Tracking Token {}.", + trackedDomainEventData.getEventIdentifier(), + this, + trackingToken); + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); closeCallback.accept(this); diff --git a/messaging/src/main/java/org/axonframework/eventhandling/SimpleEventHandlerInvoker.java b/messaging/src/main/java/org/axonframework/eventhandling/SimpleEventHandlerInvoker.java index 2663f46fb6..9d4e3967ae 100644 --- a/messaging/src/main/java/org/axonframework/eventhandling/SimpleEventHandlerInvoker.java +++ b/messaging/src/main/java/org/axonframework/eventhandling/SimpleEventHandlerInvoker.java @@ -106,10 +106,12 @@ public List eventHandlers() { @Override public void handle(EventMessage message, Segment segment) throws Exception { - logger.trace("Received event {}. Segment Id {}. Thread {}.", - message.getIdentifier(), - segment.getSegmentId(), - Thread.currentThread().getName()); + if (logger.isTraceEnabled()) { + logger.trace("Received event {}. Segment Id {}. Thread {}.", + message.getIdentifier(), + segment.getSegmentId(), + Thread.currentThread().getName()); + } if (sequencingPolicyMatchesSegment(message, segment)) { for (EventMessageHandler handler : wrappedEventHandlers) { try { diff --git a/messaging/src/main/java/org/axonframework/eventhandling/TrackingEventProcessor.java b/messaging/src/main/java/org/axonframework/eventhandling/TrackingEventProcessor.java index 4447eb88a1..3fca134e46 100644 --- a/messaging/src/main/java/org/axonframework/eventhandling/TrackingEventProcessor.java +++ b/messaging/src/main/java/org/axonframework/eventhandling/TrackingEventProcessor.java @@ -396,11 +396,13 @@ private void processBatch(Segment segment, BlockingStream unitOfWork.attachTransaction(transactionManager); unitOfWork.resources().put(segmentIdResourceKey, segment.getSegmentId()); unitOfWork.resources().put(lastTokenResourceKey, finalLastToken); - logger.trace("Processor {} processing batch {}. Segment id {}. Thread {}.", - getName(), - batch.stream().map(EventMessage::getIdentifier).collect(Collectors.joining(",")), - segment.getSegmentId(), - Thread.currentThread().getName()); + if (logger.isTraceEnabled()) { + logger.trace("Processor {} processing batch {}. Segment id {}. Thread {}.", + getName(), + batch.stream().map(EventMessage::getIdentifier).collect(Collectors.joining(",")), + segment.getSegmentId(), + Thread.currentThread().getName()); + } processInUnitOfWork(batch, unitOfWork, processingSegments); activeSegments.computeIfPresent(segment.getSegmentId(), (k, v) -> v.advancedTo(finalLastToken)); @@ -412,14 +414,17 @@ private void processBatch(Segment segment, BlockingStream } private void logReceivedEvent(Segment segment, TrackedEventMessage message, BlockingStream eventStream) { - logger.trace("Processor {} receiving event {}. Tracking token {}. Segment id {}. Thread {}. Processor instance {}. Event Stream {}.", - getName(), - message.getIdentifier(), - message.trackingToken(), - segment.getSegmentId(), - Thread.currentThread().getName(), - this, - eventStream); + if (logger.isTraceEnabled()) { + logger.trace( + "Processor {} receiving event {}. Tracking token {}. Segment id {}. Thread {}. Processor instance {}. Event Stream {}.", + getName(), + message.getIdentifier(), + message.trackingToken(), + segment.getSegmentId(), + Thread.currentThread().getName(), + this, + eventStream); + } } /** diff --git a/modelling/src/main/java/org/axonframework/modelling/saga/AbstractSagaManager.java b/modelling/src/main/java/org/axonframework/modelling/saga/AbstractSagaManager.java index 523fc74eff..fb014e5728 100644 --- a/modelling/src/main/java/org/axonframework/modelling/saga/AbstractSagaManager.java +++ b/modelling/src/main/java/org/axonframework/modelling/saga/AbstractSagaManager.java @@ -73,10 +73,12 @@ protected AbstractSagaManager(Builder builder) { @Override public void handle(EventMessage event, Segment segment) throws Exception { - logger.trace("Received event {}. Segment Id {}. Thread {}.", - event.getIdentifier(), - segment.getSegmentId(), - Thread.currentThread().getName()); + if (logger.isTraceEnabled()) { + logger.trace("Received event {}. Segment Id {}. Thread {}.", + event.getIdentifier(), + segment.getSegmentId(), + Thread.currentThread().getName()); + } Set associationValues = extractAssociationValues(event); Set> sagas = associationValues.stream() From 59f16c8b04ac66f2549fe0994b41f31e525b7b43 Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Tue, 30 Jul 2024 14:24:22 +0200 Subject: [PATCH 4/4] Set version of non-deployed modules Set version of non-deployed modules #release/4.7.7 --- coverage-report/pom.xml | 4 ++-- hibernate-6-integrationtests/pom.xml | 4 ++-- spring-boot-3-integrationtests/pom.xml | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/coverage-report/pom.xml b/coverage-report/pom.xml index c0c998a4b4..779900abc5 100644 --- a/coverage-report/pom.xml +++ b/coverage-report/pom.xml @@ -1,6 +1,6 @@