diff --git a/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/AxonServerConfiguration.java b/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/AxonServerConfiguration.java index 150abbfd09..83bd218215 100644 --- a/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/AxonServerConfiguration.java +++ b/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/AxonServerConfiguration.java @@ -151,6 +151,12 @@ public class AxonServerConfiguration { */ private FlowControlConfiguration commandFlowControl; + /** + * A toggle dictating whether to invoke query handlers directly if they are registered in the local environment. + * Defaults to {@code false}. + */ + private boolean shortcutQueriesToLocalHandlers = false; + /** * The number of threads executing commands. Defaults to {@code 10} threads. */ @@ -234,9 +240,9 @@ public class AxonServerConfiguration { /** * Flag that allows block-listing of event types to be enabled. *

- * Disabling this may have serious performance impact, as it requires all - * {@link EventMessage events} from Axon Server to be sent to clients, even if a - * client is unable to process the event. Default is to have block-listing enabled. + * Disabling this may have serious performance impact, as it requires all {@link EventMessage events} from Axon + * Server to be sent to clients, even if a client is unable to process the event. Default is to have block-listing + * enabled. */ private boolean eventBlockListingEnabled = true; @@ -298,8 +304,7 @@ public class AxonServerConfiguration { /** * Defines the number of threads that should be used for connection management activities by the - * {@link AxonServerConnectionFactory} used by the - * {@link AxonServerConnectionManager}. + * {@link AxonServerConnectionFactory} used by the {@link AxonServerConnectionManager}. *

* This includes activities related to connecting to Axon Server, setting up instruction streams, sending and * validating heartbeats, etc. @@ -320,8 +325,7 @@ public class AxonServerConfiguration { private Eventhandling eventhandling = new Eventhandling(); /** - * Properties describing the settings for the - * {@link AxonServerEventStore EventStore}. + * Properties describing the settings for the {@link AxonServerEventStore EventStore}. */ private EventStoreConfiguration eventStoreConfiguration = new EventStoreConfiguration(); @@ -358,6 +362,14 @@ public boolean isEnabled() { return enabled; } + public boolean isShortcutQueriesToLocalHandlers() { + return shortcutQueriesToLocalHandlers; + } + + public void setShortcutQueriesToLocalHandlers(boolean shortcutQueriesToLocalHandlers) { + this.shortcutQueriesToLocalHandlers = shortcutQueriesToLocalHandlers; + } + /** * Set whether (automatic) configuration of the Axon Server Connector is enabled. When {@code false}, the connector * will not be implicitly be configured. Defaults to {@code true}. @@ -945,9 +957,9 @@ public void setDisableEventBlacklisting(boolean disableEventBlacklisting) { /** * Flag that allows block-listing of event types to be enabled. *

- * Disabling this may have serious performance impact, as it requires all - * {@link EventMessage events} from Axon Server to be sent to clients, even if a - * client is unable to process the event. Default is to have block-listing enabled. + * Disabling this may have serious performance impact, as it requires all {@link EventMessage events} from Axon + * Server to be sent to clients, even if a client is unable to process the event. Default is to have block-listing + * enabled. * * @return Flag that allows block-listing of event types to be enabled. */ @@ -958,9 +970,9 @@ public boolean isEventBlockListingEnabled() { /** * Sets flag that allows block-listing of event types to be enabled. *

- * Disabling this may have serious performance impact, as it requires all - * {@link EventMessage events} from Axon Server to be sent to clients, even if a - * client is unable to process the event. Default is to have block-listing enabled. + * Disabling this may have serious performance impact, as it requires all {@link EventMessage events} from Axon + * Server to be sent to clients, even if a client is unable to process the event. Default is to have block-listing + * enabled. * * @param eventBlockListingEnabled Flag that allows block-listing of event types to be enabled. */ @@ -1128,8 +1140,7 @@ public void setForceReconnectThroughServers(boolean forceReconnectThroughServers /** * The number of threads that should be used for connection management activities by the - * {@link AxonServerConnectionFactory} used by the - * {@link AxonServerConnectionManager}. + * {@link AxonServerConnectionFactory} used by the {@link AxonServerConnectionManager}. *

* This includes activities related to connecting to Axon Server, setting up instruction streams, sending and * validating heartbeats, etc. @@ -1137,8 +1148,7 @@ public void setForceReconnectThroughServers(boolean forceReconnectThroughServers * Defaults to a pool size of {@code 2} threads. * * @return The number of threads that should be used for connection management activities by the - * {@link AxonServerConnectionFactory} used by the - * {@link AxonServerConnectionManager}. + * {@link AxonServerConnectionFactory} used by the {@link AxonServerConnectionManager}. */ public int getConnectionManagementThreadPoolSize() { return connectionManagementThreadPoolSize; @@ -1146,8 +1156,7 @@ public int getConnectionManagementThreadPoolSize() { /** * Define the number of threads that should be used for connection management activities by the - * {@link AxonServerConnectionFactory} used by the - * {@link AxonServerConnectionManager}. + * {@link AxonServerConnectionFactory} used by the {@link AxonServerConnectionManager}. *

* This includes activities related to connecting to Axon Server, setting up instruction streams, sending and * validating heartbeats, etc. @@ -1155,9 +1164,8 @@ public int getConnectionManagementThreadPoolSize() { * Defaults to a pool size of {@code 2} threads. * * @param connectionManagementThreadPoolSize The number of threads that should be used for connection management - * activities by the - * {@link AxonServerConnectionFactory} used - * by the {@link AxonServerConnectionManager}. + * activities by the {@link AxonServerConnectionFactory} used by the + * {@link AxonServerConnectionManager}. */ public void setConnectionManagementThreadPoolSize(int connectionManagementThreadPoolSize) { this.connectionManagementThreadPoolSize = connectionManagementThreadPoolSize; @@ -1490,8 +1498,8 @@ public static class ProcessorSettings { * The load balancing strategy tells Axon Server how to share the event handling load among all available * application instances running this event processor, by moving segments from one instance to another. Note * that load balancing is only supported for - * {@link StreamingEventProcessor StreamingEventProcessors}, as only - * {@code StreamingEventProcessors} are capable of splitting the event handling load in segments. + * {@link StreamingEventProcessor StreamingEventProcessors}, as only {@code StreamingEventProcessors} are + * capable of splitting the event handling load in segments. *

* As the strategies names may change per Axon Server version it is recommended to check the documentation * for the possible strategies. diff --git a/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/query/AxonServerQueryBus.java b/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/query/AxonServerQueryBus.java index 577a95b681..13935f6072 100644 --- a/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/query/AxonServerQueryBus.java +++ b/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/query/AxonServerQueryBus.java @@ -91,14 +91,19 @@ import reactor.core.publisher.SignalType; import reactor.core.scheduler.Scheduler; +import javax.annotation.Nonnull; import java.lang.invoke.MethodHandles; import java.lang.reflect.Type; +import java.time.Duration; +import java.time.Instant; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.Spliterator; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -106,11 +111,11 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.LockSupport; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Stream; import java.util.stream.StreamSupport; -import javax.annotation.Nonnull; import static java.lang.String.format; import static org.axonframework.common.BuilderUtils.assertNonEmpty; @@ -150,6 +155,10 @@ public class AxonServerQueryBus implements QueryBus, Distributed, Life private final LocalSegmentAdapter localSegmentAdapter; private final String context; private final QueryBusSpanFactory spanFactory; + private final boolean localSegmentShortCut; + private final Duration queryInProgressAwait; + + private final Set queryHandlerNames = new CopyOnWriteArraySet<>(); /** * Instantiate a {@link AxonServerQueryBus} based on the fields contained in the {@link Builder}. @@ -168,12 +177,14 @@ public AxonServerQueryBus(Builder builder) { this.context = StringUtils.nonEmptyOrNull(builder.defaultContext) ? builder.defaultContext : configuration.getContext(); this.targetContextResolver = builder.targetContextResolver.orElse(m -> context); this.spanFactory = builder.spanFactory; + this.queryInProgressAwait = builder.queryInProgressAwait; dispatchInterceptors = new DispatchInterceptors<>(); PriorityBlockingQueue queryProcessQueue = new PriorityBlockingQueue<>(QUERY_QUEUE_CAPACITY); queryExecutor = builder.executorServiceBuilder.apply(configuration, queryProcessQueue); localSegmentAdapter = new LocalSegmentAdapter(); + this.localSegmentShortCut = builder.localSegmentShortCut; } @Override @@ -188,12 +199,16 @@ public Publisher> streamingQuery(StreamingQueryMe TASK_SEQUENCE)); return Mono.fromSupplier(this::registerStreamingQueryActivity).flatMapMany( activity -> Mono.just(dispatchInterceptors.intercept(queryWithContext)) - .flatMapMany(intercepted -> - Mono.just(serializeStreaming(intercepted, priority)) - .flatMapMany(queryRequest -> new ResultStreamPublisher<>( - () -> sendRequest(intercepted, queryRequest))) - .concatMap(queryResponse -> deserialize(intercepted, - queryResponse)) + .flatMapMany(intercepted -> { + if (shouldRunQueryLocally(intercepted.getQueryName())) { + return localSegment.streamingQuery(intercepted); + } + return Mono.just(serializeStreaming(intercepted, priority)) + .flatMapMany(queryRequest -> new ResultStreamPublisher<>( + () -> sendRequest(intercepted, queryRequest))) + .concatMap(queryResponse -> deserialize(intercepted, + queryResponse)); + } ) .publishOn(scheduler.get()) .doOnError(span::recordException) @@ -246,11 +261,21 @@ public Registration subscribe(@Nonnull String queryName, .queryChannel() .registerQueryHandler(localSegmentAdapter, queryDefinition); - return new AxonServerRegistration(localRegistration, serverRegistration::cancel); + queryHandlerNames.add(queryName); + return new AxonServerRegistration(() -> unsubscribe(queryName, localRegistration), serverRegistration::cancel); + } + + private boolean unsubscribe(String queryName, Registration localSegmentRegistration) { + boolean result = localSegmentRegistration.cancel(); + if (result) { + queryHandlerNames.remove(queryName); + } + return result; } @Override public CompletableFuture> query(@Nonnull QueryMessage queryMessage) { + Span span = spanFactory.createQuerySpan(queryMessage, true).start(); try (SpanScope unused = span.makeCurrent()) { QueryMessage queryWithContext = spanFactory.propagateContext(queryMessage); @@ -263,21 +288,25 @@ public CompletableFuture> query(@Nonnull QueryMes ShutdownLatch.ActivityHandle queryInTransit = shutdownLatch.registerActivity(); CompletableFuture> queryTransaction = new CompletableFuture<>(); try { - int priority = priorityCalculator.determinePriority(interceptedQuery); - QueryRequest queryRequest = serialize(interceptedQuery, false, priority); - ResultStream result = sendRequest(interceptedQuery, queryRequest); - queryTransaction.whenComplete((r, e) -> result.close()); - Span responseTaskSpan = spanFactory.createResponseProcessingSpan(interceptedQuery); - Runnable responseProcessingTask = new ResponseProcessingTask<>(result, - serializer, - queryTransaction, - queryMessage.getResponseType(), - responseTaskSpan); - - result.onAvailable(() -> queryExecutor.execute(new PriorityRunnable( - responseProcessingTask, - priority, - TASK_SEQUENCE.incrementAndGet()))); + if (shouldRunQueryLocally(interceptedQuery.getQueryName())) { + queryTransaction = localSegment.query(interceptedQuery); + } else { + int priority = priorityCalculator.determinePriority(interceptedQuery); + QueryRequest queryRequest = serialize(interceptedQuery, false, priority); + ResultStream result = sendRequest(interceptedQuery, queryRequest); + queryTransaction.whenComplete((r, e) -> result.close()); + Span responseTaskSpan = spanFactory.createResponseProcessingSpan(interceptedQuery); + Runnable responseProcessingTask = new ResponseProcessingTask<>(result, + serializer, + queryTransaction, + queryMessage.getResponseType(), + responseTaskSpan); + + result.onAvailable(() -> queryExecutor.execute(new PriorityRunnable( + responseProcessingTask, + priority, + TASK_SEQUENCE.incrementAndGet()))); + } } catch (Exception e) { logger.debug("There was a problem issuing a query {}.", interceptedQuery, e); AxonException exception = ErrorCode.QUERY_DISPATCH_ERROR.convert(configuration.getClientId(), e); @@ -285,7 +314,7 @@ public CompletableFuture> query(@Nonnull QueryMes span.recordException(e).end(); } - queryTransaction.whenComplete((r, e) -> { + queryTransaction.whenComplete((r, e) -> { queryInTransit.end(); if (e != null) { span.recordException(e); @@ -299,6 +328,10 @@ public CompletableFuture> query(@Nonnull QueryMes } } + private boolean shouldRunQueryLocally(String queryName) { + return localSegmentShortCut && queryHandlerNames.contains(queryName); + } + private QueryRequest serializeStreaming(QueryMessage query, int priority) { return serialize(query, true, priority); } @@ -416,7 +449,7 @@ public Stream> scatterGather(@Nonnull QueryMessag AtomicBoolean closed = new AtomicBoolean(false); Runnable closeHandler = () -> { - if(closed.compareAndSet(false, true)) { + if (closed.compareAndSet(false, true)) { queryInTransit.end(); span.end(); } @@ -517,14 +550,18 @@ Registration registerDispatchInterceptor( } /** - * Disconnect the query bus from Axon Server, by unsubscribing all known query handlers. This shutdown operation is - * performed in the {@link Phase#INBOUND_QUERY_CONNECTOR} phase. + * Disconnect the query bus from Axon Server, by unsubscribing all known query handlers and aborting all queries in progress. */ public void disconnect() { if (axonServerConnectionManager.isConnected(context)) { - axonServerConnectionManager.getConnection(context).queryChannel().prepareDisconnect(); + axonServerConnectionManager.getConnection(context) + .queryChannel() + .prepareDisconnect(); + } + if (!localSegmentAdapter.awaitTermination(queryInProgressAwait)) { + logger.info("Awaited termination of queries in progress without success. Going to cancel remaining queries in progress."); + localSegmentAdapter.cancel(); } - localSegmentAdapter.cancel(); } /** @@ -567,6 +604,8 @@ public static class Builder { private QueryBusSpanFactory spanFactory = DefaultQueryBusSpanFactory.builder() .spanFactory(NoOpSpanFactory.INSTANCE) .build(); + private boolean localSegmentShortCut; + private Duration queryInProgressAwait = Duration.ofSeconds(5); /** * Sets the {@link AxonServerConnectionManager} used to create connections between this application and an Axon @@ -771,6 +810,33 @@ public Builder spanFactory(@Nonnull QueryBusSpanFactory spanFactory) { return this; } + /** + * Enables shortcut to local {@link QueryBus}. If query handlers are registered in the local environment they + * will be invoked directly instead of sending request to axon server. + * + * @return the current Builder instance, for fluent interfacing + */ + public Builder enabledLocalSegmentShortCut() { + this.localSegmentShortCut = true; + return this; + } + + /** + * Sets the {@link Duration query in progress await timeout} used to await the successful termination of queries + * in progress. When this timeout is exceeded, the query in progress will be canceled. + *

+ * Defaults to a {@code Duration} of 5 seconds. + * + * @param queryInProgressAwait The {@link Duration query in progress await timeout} used to await the successful + * termination of queries in progress + * @return The current Builder instance, for fluent interfacing. + */ + public Builder queryInProgressAwait(@Nonnull Duration queryInProgressAwait) { + assertNonNull(queryInProgressAwait, "Query in progress await timeout may not be null"); + this.queryInProgressAwait = queryInProgressAwait; + return this; + } + /** * Initializes a {@link AxonServerQueryBus} as specified through this Builder. * @@ -928,12 +994,6 @@ private class LocalSegmentAdapter implements QueryHandler { private final Map queriesInProgress = new ConcurrentHashMap<>(); - public void cancel() { - queriesInProgress.values() - .iterator() - .forEachRemaining(QueryProcessingTask::cancel); - } - @Override public void handle(QueryRequest query, ReplyChannel responseHandler) { stream(query, responseHandler).request(Long.MAX_VALUE); @@ -1000,5 +1060,27 @@ public io.axoniq.axonserver.connector.Registration registerSubscriptionQuery(Sub return CompletableFuture.completedFuture(null); }; } + + private boolean awaitTermination(Duration timeout) { + Instant startAwait = Instant.now(); + Instant endAwait = startAwait.plusSeconds(timeout.getSeconds()); + while (Instant.now().isBefore(endAwait) && !queriesInProgress.isEmpty()) { + queriesInProgress.values() + .stream() + .findFirst() + .ifPresent(queryInProgress -> { + while (Instant.now().isBefore(endAwait) && queryInProgress.resultPending()) { + LockSupport.parkNanos(10_000_000); + } + }); + } + return queriesInProgress.isEmpty(); + } + + private void cancel() { + queriesInProgress.values() + .iterator() + .forEachRemaining(QueryProcessingTask::cancel); + } } } diff --git a/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/query/QueryProcessingTask.java b/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/query/QueryProcessingTask.java index b37ae816cb..544d0a0e6a 100644 --- a/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/query/QueryProcessingTask.java +++ b/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/query/QueryProcessingTask.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010-2023. Axon Framework + * Copyright (c) 2010-2024. Axon Framework * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -184,6 +184,17 @@ public void cancel() { } } + /** + * Returns {@code true} if this task is still waiting for a result, and {@code false} otherwise. + *

+ * Note that this would this return {@code true}, even if the streamable result has not been canceled yet! + * + * @return {@code true} if this task is still waiting for a result, and {@code false} otherwise. + */ + public boolean resultPending() { + return streamableResultRef.get() == null; + } + private void streamingQuery(QueryMessage originalQueryMessage) { // noinspection unchecked StreamingQueryMessage streamingQueryMessage = new GenericStreamingQueryMessage<>( diff --git a/axon-server-connector/src/test/java/org/axonframework/axonserver/connector/query/AxonServerQueryBusTest.java b/axon-server-connector/src/test/java/org/axonframework/axonserver/connector/query/AxonServerQueryBusTest.java index 872de0097b..c62db51dff 100644 --- a/axon-server-connector/src/test/java/org/axonframework/axonserver/connector/query/AxonServerQueryBusTest.java +++ b/axon-server-connector/src/test/java/org/axonframework/axonserver/connector/query/AxonServerQueryBusTest.java @@ -30,11 +30,7 @@ import io.axoniq.axonserver.grpc.query.QueryRequest; import io.axoniq.axonserver.grpc.query.QueryResponse; import io.axoniq.axonserver.grpc.query.QueryUpdate; -import org.axonframework.axonserver.connector.AxonServerConfiguration; -import org.axonframework.axonserver.connector.AxonServerConnectionManager; -import org.axonframework.axonserver.connector.ErrorCode; -import org.axonframework.axonserver.connector.TargetContextResolver; -import org.axonframework.axonserver.connector.TestTargetContextResolver; +import org.axonframework.axonserver.connector.*; import org.axonframework.axonserver.connector.util.ProcessingInstructionHelper; import org.axonframework.axonserver.connector.utils.TestSerializer; import org.axonframework.common.Registration; @@ -42,42 +38,23 @@ import org.axonframework.messaging.Message; import org.axonframework.messaging.MessageHandler; import org.axonframework.messaging.MessageHandlerInterceptor; +import org.axonframework.messaging.MetaData; import org.axonframework.messaging.responsetypes.InstanceResponseType; -import org.axonframework.queryhandling.DefaultQueryBusSpanFactory; -import org.axonframework.queryhandling.GenericQueryMessage; -import org.axonframework.queryhandling.GenericQueryResponseMessage; -import org.axonframework.queryhandling.GenericStreamingQueryMessage; -import org.axonframework.queryhandling.GenericSubscriptionQueryMessage; -import org.axonframework.queryhandling.QueryBus; -import org.axonframework.queryhandling.QueryExecutionException; -import org.axonframework.queryhandling.QueryMessage; -import org.axonframework.queryhandling.QueryResponseMessage; -import org.axonframework.queryhandling.SimpleQueryUpdateEmitter; -import org.axonframework.queryhandling.StreamingQueryMessage; -import org.axonframework.queryhandling.SubscriptionQueryMessage; -import org.axonframework.queryhandling.SubscriptionQueryResult; -import org.axonframework.queryhandling.SubscriptionQueryUpdateMessage; +import org.axonframework.queryhandling.*; import org.axonframework.serialization.Serializer; import org.axonframework.tracing.TestSpanFactory; -import org.junit.jupiter.api.*; -import org.mockito.*; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import java.time.Duration; -import java.util.Collections; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Optional; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -102,7 +79,7 @@ class AxonServerQueryBusTest { private static final String TEST_QUERY = "testQuery"; private static final String CONTEXT = "default-test"; - public static final String INSTANCE_RESPONSE_TYPE_XML = "java.lang.String"; + private static final String INSTANCE_RESPONSE_TYPE_XML = "java.lang.String"; private final QueryBus localSegment = mock(QueryBus.class); private final Serializer serializer = TestSerializer.xStreamSerializer(); @@ -196,10 +173,86 @@ void query() throws Exception { assertEquals("test", testSubject.query(testQuery).get().getPayload()); verify(targetContextResolver).resolveContext(testQuery); + verify(localSegment, never()).query(testQuery); spanFactory.verifySpanCompleted("QueryBus.queryDistributed"); spanFactory.verifySpanPropagated("QueryBus.queryDistributed", testQuery); } + @Nested + class LocalSegmentShortCutEnabled { + + private Registration registration; + private final QueryMessage testQuery = new GenericQueryMessage<>("Hello, World", + TEST_QUERY, + instanceOf(String.class)); + + private final StreamingQueryMessage testStreamingQuery = + new GenericStreamingQueryMessage<>("Hello, World", TEST_QUERY, String.class); + + @BeforeEach + void setUp() { + testSubject = AxonServerQueryBus.builder() + .axonServerConnectionManager(axonServerConnectionManager) + .configuration(configuration) + .localSegment(localSegment) + .updateEmitter(SimpleQueryUpdateEmitter.builder().build()) + .messageSerializer(serializer) + .genericSerializer(serializer) + .targetContextResolver(targetContextResolver) + .enabledLocalSegmentShortCut() + .spanFactory( + DefaultQueryBusSpanFactory.builder() + .spanFactory(spanFactory) + .build() + ) + .build(); + + registration = testSubject.subscribe(TEST_QUERY, String.class, q -> "test"); + } + + @Test + void queryWhenLocalHandlerIsPresent() { + when(localSegment.query(testQuery)) + .thenReturn(CompletableFuture.completedFuture(new GenericQueryResponseMessage<>("ok"))); + + testSubject.query(testQuery); + + verify(localSegment).query(testQuery); + verify(mockQueryChannel, never()).query(any()); + } + + @Test + void queryWhenRegistrationIsCancel() { + registration.cancel(); + + testSubject.query(testQuery); + + verify(localSegment, never()).query(testQuery); + } + + @Test + void streamingQueryWhenLocalHandlerIsPresent() { + when(localSegment.streamingQuery(testStreamingQuery)) + .thenReturn(Flux.just(new GenericQueryResponseMessage<>("ok"))); + + StepVerifier.create(Flux.from(testSubject.streamingQuery(testStreamingQuery)) + .map(Message::getPayload)) + .expectNext("ok") + .verifyComplete(); + + verify(localSegment).streamingQuery(testStreamingQuery); + verify(mockQueryChannel, never()).query(any()); + } + + @Test + void streamingQueryWhenRegistrationIsCancel() { + registration.cancel(); + testSubject.streamingQuery(testStreamingQuery); + + verify(localSegment, never()).streamingQuery(testStreamingQuery); + } + } + @Test void queryReportsDispatchException() throws Exception { //noinspection rawtypes @@ -338,8 +391,8 @@ void scatterGatherCloseStreamDoesNotThrowExceptionOnCloseMethod() { stubResponse("3"))); Stream> stream = testSubject.scatterGather(testQuery, - 12, - TimeUnit.SECONDS); + 12, + TimeUnit.SECONDS); assertEquals(3, stream.count()); stream.close(); } @@ -362,6 +415,7 @@ void streamingFluxQuery() { .verifyComplete(); verify(targetContextResolver).resolveContext(testQuery); + verify(localSegment, never()).streamingQuery(testQuery); //noinspection resource verify(mockQueryChannel).query(argThat( r -> r.getPayload().getData().toStringUtf8().equals("Hello, World") @@ -619,7 +673,7 @@ void equalPriorityMessagesProcessedInOrder() throws InterruptedException { .putMetaData("index", MetaDataValue.newBuilder().setNumberValue(i).build()) .build(); - queryHandler.handle(queryRequest, new NoOpReplyChannel()); + queryHandler.handle(queryRequest, new StubReplyChannel()); } startProcessingGate.countDown(); //noinspection ResultOfMethodCallIgnored @@ -629,6 +683,117 @@ void equalPriorityMessagesProcessedInOrder() throws InterruptedException { assertEquals(expected, actual); } + @Test + void disconnectCancelsQueriesInProgressIfAwaitDurationIsSurpassed() { + AxonServerQueryBus queryInProgressTestSubject = + AxonServerQueryBus.builder() + .axonServerConnectionManager(axonServerConnectionManager) + .configuration(configuration) + .localSegment(localSegment) + .updateEmitter(SimpleQueryUpdateEmitter.builder().build()) + .messageSerializer(serializer) + .genericSerializer(serializer) + .targetContextResolver(targetContextResolver) + .executorServiceBuilder((c, q) -> new ThreadPoolExecutor( + 1, 1, 5, TimeUnit.SECONDS, q + )) + .queryInProgressAwait(Duration.ofSeconds(1)) + .build(); + CountDownLatch handlerLatch = new CountDownLatch(1); + AtomicReference responseReference = new AtomicReference<>(); + queriesInProgressTestSetup(queryInProgressTestSubject, handlerLatch, responseReference); + + // Start disconnecting right away. As a blocking operation, this ensures we surpass the await duration. + queryInProgressTestSubject.disconnect(); + // Release te latch, to let go of the blocking query handler. + handlerLatch.countDown(); + + await().atMost(Duration.ofSeconds(1)) + .pollDelay(Duration.ofMillis(250)) + .untilAsserted(() -> assertNull(responseReference.get())); + } + + @Test + void disconnectReturnsResponseFromQueriesInProgressIfAwaitDurationIsNotExceeded() throws InterruptedException { + AxonServerQueryBus queryInProgressTestSubject = + AxonServerQueryBus.builder() + .axonServerConnectionManager(axonServerConnectionManager) + .configuration(configuration) + .localSegment(localSegment) + .updateEmitter(SimpleQueryUpdateEmitter.builder().build()) + .messageSerializer(serializer) + .genericSerializer(serializer) + .targetContextResolver(targetContextResolver) + .executorServiceBuilder((c, q) -> new ThreadPoolExecutor( + 1, 1, 5, TimeUnit.SECONDS, q + )) + .queryInProgressAwait(Duration.ofSeconds(1)) + .build(); + CountDownLatch handlerLatch = new CountDownLatch(1); + AtomicReference responseReference = new AtomicReference<>(); + queriesInProgressTestSetup(queryInProgressTestSubject, handlerLatch, responseReference); + + // Start disconnecting in a separate thread to ensure the response latch is released + new Thread(queryInProgressTestSubject::disconnect).start(); + // Sleep a little, to ensure there is some space between disconnecting and releasing the query handler latch + Thread.sleep(250); + // Release te latch, to let go of the blocking query handler. + handlerLatch.countDown(); + + await().atMost(Duration.ofSeconds(1)) + .pollDelay(Duration.ofMillis(250)) + .untilAsserted(() -> assertNotNull(responseReference.get())); + assertEquals("Hello", responseReference.get().getMetaDataOrThrow("response").getTextValue()); + } + + private void queriesInProgressTestSetup(AxonServerQueryBus queryInProgressTestSubject, + CountDownLatch responseLatch, + AtomicReference responseReference) { + AtomicReference handlerReference = new AtomicReference<>(); + doAnswer(i -> { + handlerReference.set(i.getArgument(0)); + return (io.axoniq.axonserver.connector.Registration) () -> CompletableFuture.completedFuture(null); + }).when(mockQueryChannel) + .registerQueryHandler(any(), any()); + + when(localSegment.query(any())).thenAnswer(i -> { + responseLatch.await(); + QueryMessage message = i.getArgument(0); + QueryResponseMessage queryResponse = new GenericQueryResponseMessage<>(message.getPayload()).withMetaData( + MetaData.with("response", message.getPayload())); + return CompletableFuture.completedFuture(queryResponse); + }); + + // We create a subscription to force a registration for this type of query. + // It doesn't get invoked because the localSegment is mocked + //noinspection resource + queryInProgressTestSubject.subscribe("testQuery", + String.class, + (MessageHandler>) message -> "ok"); + await().atMost(Duration.ofSeconds(1)) + .pollDelay(Duration.ofMillis(250)) + .untilAsserted(() -> assertNotNull(handlerReference.get())); + + QueryHandler queryHandler = handlerReference.get(); + QueryRequest queryRequest = + QueryRequest.newBuilder() + .setQuery("testQuery") + .setMessageIdentifier(UUID.randomUUID().toString()) + .setPayload(SerializedObject.newBuilder() + .setType("java.lang.String") + .setData(ByteString.copyFromUtf8("Hello")) + ) + .setResponseType(SerializedObject.newBuilder() + .setData(ByteString.copyFromUtf8( + INSTANCE_RESPONSE_TYPE_XML + )) + .setType(InstanceResponseType.class.getName()) + .build()) + .putMetaData("response", MetaDataValue.newBuilder().setTextValue("Hello").build()) + .build(); + queryHandler.handle(queryRequest, new StubReplyChannel(responseReference)); + } + private QueryResponse stubResponse(String payload) { return QueryResponse.newBuilder() .setRequestIdentifier("request") @@ -771,36 +936,37 @@ public ResultStream updates() { } } - private static class NoOpReplyChannel implements ReplyChannel { + private static class StubReplyChannel implements ReplyChannel { - @Override - public void send(QueryResponse outboundMessage) { - // Do nothing - no-op implementation + private final AtomicReference responseReference; + + // No-arg constructor acts like a Noop version of this ReplyChannel + private StubReplyChannel() { + this(new AtomicReference<>()); } - @Override - public void sendAck() { - // Do nothing - no-op implementation + private StubReplyChannel(AtomicReference responseReference) { + this.responseReference = responseReference; } @Override - public void sendNack(ErrorMessage errorMessage) { - // Do nothing - no-op implementation + public void send(QueryResponse outboundMessage) { + responseReference.set(outboundMessage); } @Override public void complete() { - // Do nothing - no-op implementation + // Do nothing - not required for testing } @Override public void completeWithError(ErrorMessage errorMessage) { - // Do nothing - no-op implementation + // Do nothing - not required for testing } @Override public void completeWithError(ErrorCategory errorCategory, String message) { - // Do nothing - no-op implementation + // Do nothing - not required for testing } } } diff --git a/axon-server-connector/src/test/java/org/axonframework/axonserver/connector/query/QueryProcessingTaskIntegrationTest.java b/axon-server-connector/src/test/java/org/axonframework/axonserver/connector/query/QueryProcessingTaskIntegrationTest.java index 4fa846b64e..65c68a6778 100644 --- a/axon-server-connector/src/test/java/org/axonframework/axonserver/connector/query/QueryProcessingTaskIntegrationTest.java +++ b/axon-server-connector/src/test/java/org/axonframework/axonserver/connector/query/QueryProcessingTaskIntegrationTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010-2023. Axon Framework + * Copyright (c) 2010-2024. Axon Framework * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -91,14 +91,16 @@ void setUp() { .build(); querySerializer = new QuerySerializer(serializer, serializer, config); queryHandlingComponent1 = new QueryHandlingComponent1(); + //noinspection resource new AnnotationQueryHandlerAdapter<>(queryHandlingComponent1).subscribe(localSegment); + //noinspection resource new AnnotationQueryHandlerAdapter<>(new QueryHandlingComponent2()).subscribe(localSegment); } @Test void directQueryWhenRequesterDoesntSupportStreaming() { - QueryMessage> queryMessage = new GenericQueryMessage<>(new FluxQuery(1000), - ResponseTypes.publisherOf(String.class)); + QueryMessage> queryMessage = + new GenericQueryMessage<>(new FluxQuery(1000), ResponseTypes.publisherOf(String.class)); QueryRequest request = querySerializer.serializeRequest(queryMessage, DIRECT_QUERY_NUMBER_OF_RESULTS, 1000, 1); @@ -118,8 +120,8 @@ void directQueryWhenRequesterDoesntSupportStreaming() { @Test void queryProcessingTaskIsTraced() { - QueryMessage> queryMessage = new GenericQueryMessage<>(new FluxQuery(1000), - ResponseTypes.publisherOf(String.class)); + QueryMessage> queryMessage = + new GenericQueryMessage<>(new FluxQuery(1000), ResponseTypes.publisherOf(String.class)); QueryRequest request = querySerializer.serializeRequest(queryMessage, DIRECT_QUERY_NUMBER_OF_RESULTS, 1000, 1); @@ -136,8 +138,8 @@ void queryProcessingTaskIsTraced() { @Test void directQueryWhenRequesterDoesntSupportStreamingAndFlowControlMessagesComesBeforeQueryExecution() { - QueryMessage> queryMessage = new GenericQueryMessage<>(new FluxQuery(1000), - ResponseTypes.publisherOf(String.class)); + QueryMessage> queryMessage = + new GenericQueryMessage<>(new FluxQuery(1000), ResponseTypes.publisherOf(String.class)); QueryRequest request = querySerializer.serializeRequest(queryMessage, DIRECT_QUERY_NUMBER_OF_RESULTS, 1000, 1); @@ -157,8 +159,8 @@ void directQueryWhenRequesterDoesntSupportStreamingAndFlowControlMessagesComesBe @Test void directQueryWhenRequesterDoesntSupportStreamingAndCancelMessagesComesBeforeQueryExecution() { - QueryMessage> queryMessage = new GenericQueryMessage<>(new FluxQuery(1000), - ResponseTypes.publisherOf(String.class)); + QueryMessage> queryMessage = + new GenericQueryMessage<>(new FluxQuery(1000), ResponseTypes.publisherOf(String.class)); QueryRequest request = querySerializer.serializeRequest(queryMessage, DIRECT_QUERY_NUMBER_OF_RESULTS, 1000, 1); @@ -177,8 +179,8 @@ void directQueryWhenRequesterDoesntSupportStreamingAndCancelMessagesComesBeforeQ @Test void streamingQuery() { - QueryMessage> queryMessage = new GenericQueryMessage<>(new FluxQuery(1000), - ResponseTypes.publisherOf(String.class)); + QueryMessage> queryMessage = + new GenericQueryMessage<>(new FluxQuery(1000), ResponseTypes.publisherOf(String.class)); QueryRequest request = querySerializer.serializeRequest(queryMessage, DIRECT_QUERY_NUMBER_OF_RESULTS, 1000, 1, true) @@ -210,8 +212,7 @@ void streamingQuery() { @Test void streamingAList() { QueryMessage> queryMessage = - new GenericQueryMessage<>(new ListQuery(1000), - ResponseTypes.multipleInstancesOf(String.class)); + new GenericQueryMessage<>(new ListQuery(1000), ResponseTypes.multipleInstancesOf(String.class)); QueryRequest request = querySerializer.serializeRequest(queryMessage, DIRECT_QUERY_NUMBER_OF_RESULTS, 1000, 1, true) @@ -243,8 +244,7 @@ void streamingAList() { @Test void streamingAListWhenReactorIsNotOnClasspath() { QueryMessage> queryMessage = - new GenericQueryMessage<>(new ListQuery(1000), - ResponseTypes.multipleInstancesOf(String.class)); + new GenericQueryMessage<>(new ListQuery(1000), ResponseTypes.multipleInstancesOf(String.class)); QueryRequest request = querySerializer.serializeRequest(queryMessage, DIRECT_QUERY_NUMBER_OF_RESULTS, 1000, 1, true) @@ -274,11 +274,9 @@ void streamingAListWhenReactorIsNotOnClasspath() { } @Test - void streamingAListWhenReactorIsNotOnClasspathWithConcurrentRequests() - throws InterruptedException { + void streamingAListWhenReactorIsNotOnClasspathWithConcurrentRequests() throws InterruptedException { QueryMessage> queryMessage = - new GenericQueryMessage<>(new ListQuery(1000), - ResponseTypes.multipleInstancesOf(String.class)); + new GenericQueryMessage<>(new ListQuery(1000), ResponseTypes.multipleInstancesOf(String.class)); QueryRequest request = querySerializer.serializeRequest(queryMessage, DIRECT_QUERY_NUMBER_OF_RESULTS, 1000, 1, true) @@ -327,8 +325,7 @@ void streamingAListWhenReactorIsNotOnClasspathWithConcurrentRequests() @Test void streamingQueryWithConcurrentRequests() throws InterruptedException { QueryMessage> queryMessage = - new GenericQueryMessage<>(new FluxQuery(1000), - ResponseTypes.publisherOf(String.class)); + new GenericQueryMessage<>(new FluxQuery(1000), ResponseTypes.publisherOf(String.class)); QueryRequest request = querySerializer.serializeRequest(queryMessage, DIRECT_QUERY_NUMBER_OF_RESULTS, 1000, 1, true) @@ -535,8 +532,8 @@ void streamingListQueryWhenCancelMessageComesFirst() { @Test void fluxEmittingErrorAfterAWhile() { - QueryMessage> queryMessage = - new GenericQueryMessage<>(new ErroringAfterAWhileFluxQuery(), ResponseTypes.publisherOf(String.class)); + QueryMessage> queryMessage = + new GenericQueryMessage<>(new ErrorAfterAWhileFluxQuery(), ResponseTypes.publisherOf(String.class)); QueryRequest request = querySerializer.serializeRequest(queryMessage, DIRECT_QUERY_NUMBER_OF_RESULTS, 1000, 1, true) @@ -562,8 +559,8 @@ void fluxEmittingErrorAfterAWhile() { @Test void fluxEmittingErrorRightAway() { - QueryMessage> queryMessage = - new GenericQueryMessage<>(new ErroringFluxQuery(), ResponseTypes.publisherOf(String.class)); + QueryMessage> queryMessage = + new GenericQueryMessage<>(new ErrorFluxQuery(), ResponseTypes.publisherOf(String.class)); QueryRequest request = querySerializer.serializeRequest(queryMessage, DIRECT_QUERY_NUMBER_OF_RESULTS, 1000, 1) @@ -691,6 +688,42 @@ void listStreamingQueryWhenRequestingTooMany() { assertTrue(responseHandler.completed()); } + @Test + void responsePendingReturnsTrueForUncompletedTask() { + QueryMessage> testQuery = + new GenericQueryMessage<>(new FluxQuery(1000), ResponseTypes.publisherOf(String.class)); + QueryRequest testRequest = querySerializer.serializeRequest(testQuery, DIRECT_QUERY_NUMBER_OF_RESULTS, 1000, 1); + QueryProcessingTask testSubject = new QueryProcessingTask(localSegment, + testRequest, + responseHandler, + querySerializer, + CLIENT_ID, + queryBusSpanFactory); + + assertTrue(testSubject.resultPending()); + } + + @Test + void responsePendingReturnsFalseForCompletedTask() { + QueryMessage> testQuery = + new GenericQueryMessage<>(new FluxQuery(1), ResponseTypes.publisherOf(String.class)); + QueryRequest testRequest = querySerializer.serializeRequest(testQuery, DIRECT_QUERY_NUMBER_OF_RESULTS, 1000, 1); + QueryProcessingTask testSubject = new QueryProcessingTask(localSegment, + testRequest, + responseHandler, + querySerializer, + CLIENT_ID, + queryBusSpanFactory); + + assertTrue(testSubject.resultPending()); + testSubject.run(); + testSubject.request(1); + assertEquals(1, responseHandler.sent().size()); + assertOrder(responseHandler.sent().get(0)); + assertTrue(responseHandler.completed()); + assertFalse(testSubject.resultPending()); + } + private void assertOrder(List responses) { for (int i = 0; i < responses.size(); i++) { QueryResponseMessage responseMessage = @@ -717,6 +750,7 @@ private ProcessingInstruction asSupportsStreaming() { .build(); } + @SuppressWarnings("unused") // Suppressing query handler unused message, as they are used. private static class QueryHandlingComponent1 { private final AtomicBoolean fluxQueryCancelled = new AtomicBoolean(); @@ -752,6 +786,7 @@ public Flux fluxMultiInstance(MultipleInstanceQuery query) { } } + @SuppressWarnings("unused") // Suppressing query handler unused message, as they are used. private static class QueryHandlingComponent2 { @QueryHandler @@ -763,23 +798,23 @@ public List listMultiInstance(MultipleInstanceQuery query) { } @QueryHandler - public Flux erroringFluxQuery(ErroringFluxQuery query) { + public Flux errorFluxQuery(ErrorFluxQuery query) { return Flux.error(new RuntimeException("oops")); } @QueryHandler - public Flux erroringFluxQuery(ErroringAfterAWhileFluxQuery query) { + public Flux errorFluxQuery(ErrorAfterAWhileFluxQuery query) { return Flux.just("0", "1", "2") .concatWith(Flux.error(new RuntimeException("oops"))); } @QueryHandler - public Flux erroringFluxQuery(ThrowingExceptionFluxQuery query) { + public Flux errorFluxQuery(ThrowingExceptionFluxQuery query) { throw new RuntimeException("oops"); } @QueryHandler - public Flux erroringFluxQuery(ThrowingExceptionListQuery query) { + public Flux errorFluxQuery(ThrowingExceptionListQuery query) { throw new RuntimeException("oops"); } } @@ -823,11 +858,11 @@ public int numberOfResults() { } } - private static class ErroringFluxQuery { + private static class ErrorFluxQuery { } - private static class ErroringAfterAWhileFluxQuery { + private static class ErrorAfterAWhileFluxQuery { } @@ -853,16 +888,6 @@ public void send(T outboundMessage) { cache.add(outboundMessage); } - @Override - public void sendAck() { - // noop - } - - @Override - public void sendNack(ErrorMessage errorMessage) { - // noop - } - @Override public void complete() { completed = true; diff --git a/config/src/main/java/org/axonframework/config/AggregateConfigurer.java b/config/src/main/java/org/axonframework/config/AggregateConfigurer.java index 9f1e153305..3552db04e6 100644 --- a/config/src/main/java/org/axonframework/config/AggregateConfigurer.java +++ b/config/src/main/java/org/axonframework/config/AggregateConfigurer.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010-2023. Axon Framework + * Copyright (c) 2010-2024. Axon Framework * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -312,12 +312,16 @@ public AggregateConfigurer configureAggregateFactory( } /** - * Defines the factory to create new Aggregates instances of the type under configuration when initializing those - * instances from non constructor Command handlers annotated with + * Defines the factory to create new aggregate instances of the type under configuration when initializing those + * instances from non-constructor command handlers annotated with * {@link org.axonframework.modelling.command.CreationPolicy}. + *

+ * When {@link #withSubtypes(Class[]) subtypes} are provided, the given {@link CreationPolicyAggregateFactory} is + * used for every implementation of the aggregate under construction. * - * @param creationPolicyAggregateFactoryBuilder The builder function for the CreationPolicyAggregateFactory - * @return this configurer instance for chaining + * @param creationPolicyAggregateFactoryBuilder The builder function for the + * {@link CreationPolicyAggregateFactory}. + * @return This configurer instance for chaining. */ public AggregateConfigurer configureCreationPolicyAggregateFactory( Function> creationPolicyAggregateFactoryBuilder diff --git a/messaging/src/main/java/org/axonframework/eventhandling/deadletter/jdbc/JdbcSequencedDeadLetterQueue.java b/messaging/src/main/java/org/axonframework/eventhandling/deadletter/jdbc/JdbcSequencedDeadLetterQueue.java index c9f6eb6221..54958744f4 100644 --- a/messaging/src/main/java/org/axonframework/eventhandling/deadletter/jdbc/JdbcSequencedDeadLetterQueue.java +++ b/messaging/src/main/java/org/axonframework/eventhandling/deadletter/jdbc/JdbcSequencedDeadLetterQueue.java @@ -203,7 +203,8 @@ public void enqueue(@Nonnull Object sequenceIdentifier, Optional optionalCause = letter.cause(); if (optionalCause.isPresent()) { logger.info("Adding dead letter with message id [{}] because [{}].", - letter.message().getIdentifier(), optionalCause.get()); + letter.message().getIdentifier(), + optionalCause.get().type()); } else { logger.info( "Adding dead letter with message id [{}] because the sequence identifier [{}] is already present.", diff --git a/messaging/src/main/java/org/axonframework/eventhandling/deadletter/jpa/JpaSequencedDeadLetterQueue.java b/messaging/src/main/java/org/axonframework/eventhandling/deadletter/jpa/JpaSequencedDeadLetterQueue.java index db75a956d2..b9324cdc6e 100644 --- a/messaging/src/main/java/org/axonframework/eventhandling/deadletter/jpa/JpaSequencedDeadLetterQueue.java +++ b/messaging/src/main/java/org/axonframework/eventhandling/deadletter/jpa/JpaSequencedDeadLetterQueue.java @@ -138,10 +138,14 @@ public void enqueue(@Nonnull Object sequenceIdentifier, @Nonnull DeadLetter optionalCause = letter.cause(); if (optionalCause.isPresent()) { - logger.info("Adding dead letter with message id [{}] because [{}].", letter.message().getIdentifier(), optionalCause.get()); + logger.info("Adding dead letter with message id [{}] because [{}].", + letter.message().getIdentifier(), + optionalCause.get().type()); } else { - logger.info("Adding dead letter with message id [{}] because the sequence identifier [{}] is already present.", - letter.message().getIdentifier(), stringSequenceIdentifier); + logger.info( + "Adding dead letter with message id [{}] because the sequence identifier [{}] is already present.", + letter.message().getIdentifier(), + stringSequenceIdentifier); } DeadLetterEventEntry entry = converters diff --git a/messaging/src/main/java/org/axonframework/eventhandling/deadletter/legacyjpa/JpaSequencedDeadLetterQueue.java b/messaging/src/main/java/org/axonframework/eventhandling/deadletter/legacyjpa/JpaSequencedDeadLetterQueue.java index c5705a2dcd..d18c1f5211 100644 --- a/messaging/src/main/java/org/axonframework/eventhandling/deadletter/legacyjpa/JpaSequencedDeadLetterQueue.java +++ b/messaging/src/main/java/org/axonframework/eventhandling/deadletter/legacyjpa/JpaSequencedDeadLetterQueue.java @@ -147,7 +147,7 @@ public void enqueue(@Nonnull Object sequenceIdentifier, @Nonnull DeadLetter optionalCause = letter.cause(); if (optionalCause.isPresent()) { - logger.debug("Adding dead letter with message id [{}] because [{}].", letter.message().getIdentifier(), optionalCause.get()); + logger.debug("Adding dead letter with message id [{}] because [{}].", + letter.message().getIdentifier(), + optionalCause.get().type()); } else { - logger.debug("Adding dead letter with message id [{}] because the sequence identifier [{}] is already present.", - letter.message().getIdentifier(), sequenceIdentifier); + logger.debug( + "Adding dead letter with message id [{}] because the sequence identifier [{}] is already present.", + letter.message().getIdentifier(), + sequenceIdentifier); } } diff --git a/messaging/src/main/java/org/axonframework/queryhandling/SimpleQueryBus.java b/messaging/src/main/java/org/axonframework/queryhandling/SimpleQueryBus.java index 1ee3e1cab5..8892b76040 100644 --- a/messaging/src/main/java/org/axonframework/queryhandling/SimpleQueryBus.java +++ b/messaging/src/main/java/org/axonframework/queryhandling/SimpleQueryBus.java @@ -15,7 +15,6 @@ */ package org.axonframework.queryhandling; -import org.axonframework.commandhandling.CommandBusSpanFactory; import org.axonframework.common.Assert; import org.axonframework.common.AxonConfigurationException; import org.axonframework.common.Registration; @@ -474,7 +473,8 @@ public SubscriptionQueryResult, SubscriptionQu Mono> initialResult = Mono.fromFuture(() -> query(query)) .doOnError(error -> logger.error( "An error happened while trying to report an initial result. Query: {}", - query, error)); + query, + error)); UpdateHandlerRegistration updateHandlerRegistration = queryUpdateEmitter.registerUpdateHandler(query, backpressure, updateBufferSize); @@ -495,7 +495,8 @@ public SubscriptionQueryResult, SubscriptionQu Mono> initialResult = Mono.fromFuture(() -> query(query)) .doOnError(error -> logger.error( "An error happened while trying to report an initial result. Query: {}", - query, error)); + query, + error)); UpdateHandlerRegistration updateHandlerRegistration = queryUpdateEmitter.registerUpdateHandler(query, updateBufferSize); @@ -651,8 +652,8 @@ Registration registerDispatchInterceptor( *

* The {@link MessageMonitor} is defaulted to {@link NoOpMessageMonitor}, {@link TransactionManager} to * {@link NoTransactionManager}, {@link QueryInvocationErrorHandler} to {@link LoggingQueryInvocationErrorHandler}, - * the {@link QueryUpdateEmitter} to {@link SimpleQueryUpdateEmitter} and the {@link QueryBusSpanFactory} defaults to a - * {@link DefaultQueryBusSpanFactory} backed by a {@link NoOpSpanFactory}. + * the {@link QueryUpdateEmitter} to {@link SimpleQueryUpdateEmitter} and the {@link QueryBusSpanFactory} defaults + * to a {@link DefaultQueryBusSpanFactory} backed by a {@link NoOpSpanFactory}. */ public static class Builder { @@ -663,7 +664,11 @@ public static class Builder { .build(); private DuplicateQueryHandlerResolver duplicateQueryHandlerResolver = DuplicateQueryHandlerResolution.logAndAccept(); private QueryUpdateEmitter queryUpdateEmitter = SimpleQueryUpdateEmitter.builder() - .spanFactory(DefaultQueryUpdateEmitterSpanFactory.builder().spanFactory(NoOpSpanFactory.INSTANCE).build()) + .spanFactory( + DefaultQueryUpdateEmitterSpanFactory.builder() + .spanFactory( + NoOpSpanFactory.INSTANCE) + .build()) .build(); private QueryBusSpanFactory spanFactory = DefaultQueryBusSpanFactory.builder() .spanFactory(NoOpSpanFactory.INSTANCE) diff --git a/messaging/src/main/java/org/axonframework/queryhandling/SinksManyWrapper.java b/messaging/src/main/java/org/axonframework/queryhandling/SinksManyWrapper.java index 3d3dc4153e..bb848c611c 100644 --- a/messaging/src/main/java/org/axonframework/queryhandling/SinksManyWrapper.java +++ b/messaging/src/main/java/org/axonframework/queryhandling/SinksManyWrapper.java @@ -19,6 +19,7 @@ import reactor.core.publisher.Sinks; import java.util.concurrent.locks.LockSupport; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; /** @@ -32,6 +33,7 @@ class SinksManyWrapper implements SinkWrapper { private final Sinks.Many fluxSink; + private final ReentrantLock lock; /** * Initializes this wrapper with delegate sink. @@ -40,6 +42,7 @@ class SinksManyWrapper implements SinkWrapper { */ SinksManyWrapper(Sinks.Many fluxSink) { this.fluxSink = fluxSink; + this.lock = new ReentrantLock(); } /** @@ -73,19 +76,25 @@ public void error(Throwable t) { private Sinks.EmitResult performWithBusyWaitSpin(Supplier action) { int i = 0; Sinks.EmitResult result; - while ((result = action.get()) == Sinks.EmitResult.FAIL_NON_SERIALIZED) { - // For 100 iterations, just busy-spin. Will resolve most conditions. - if (i < 100) { - i++; - // Busy spin... - } else if (i < 200) { - // For the next 100 iterations, yield, to force other threads to have a chance. - i++; - Thread.yield(); - } else { - // Then after, park the thread to force other threads to perform their work. - LockSupport.parkNanos(100); + try { + // The lock's in place to have a safe and efficient way to pause a thread before jumping in the while-loop. + lock.lock(); + while ((result = action.get()) == Sinks.EmitResult.FAIL_NON_SERIALIZED) { + // For 100 iterations, just busy-spin. Will resolve most conditions. + if (i < 100) { + i++; + // Busy spin... + } else if (i < 200) { + // For the next 100 iterations, yield, to force other threads to have a chance. + i++; + Thread.yield(); + } else { + // Then after, park the thread to force other threads to perform their work. + LockSupport.parkNanos(100); + } } + } finally { + lock.unlock(); } return result; } diff --git a/messaging/src/test/java/org/axonframework/messaging/GenericMessageTest.java b/messaging/src/test/java/org/axonframework/messaging/GenericMessageTest.java index 963c6dee44..164316401e 100644 --- a/messaging/src/test/java/org/axonframework/messaging/GenericMessageTest.java +++ b/messaging/src/test/java/org/axonframework/messaging/GenericMessageTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010-2022. Axon Framework + * Copyright (c) 2010-2024. Axon Framework * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.axonframework.messaging; +import com.fasterxml.jackson.databind.ObjectMapper; import org.axonframework.eventhandling.GenericEventMessage; import org.axonframework.messaging.correlation.ThrowingCorrelationDataProvider; import org.axonframework.messaging.unitofwork.CurrentUnitOfWork; @@ -24,9 +25,9 @@ import org.axonframework.serialization.CannotConvertBetweenTypesException; import org.axonframework.serialization.SerializedObject; import org.axonframework.serialization.json.JacksonSerializer; -import org.junit.jupiter.api.*; - -import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.io.IOException; import java.util.Collections; @@ -34,7 +35,8 @@ import java.util.Map; import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Test correct operations of the {@link GenericMessage} class. @@ -70,11 +72,11 @@ void correlationDataAddedToNewMessage() { } @Test - void messageSerialization() throws IOException{ + void messageSerialization() throws IOException { Map metaDataMap = Collections.singletonMap("key", "value"); GenericMessage message = new GenericMessage<>("payload", metaDataMap); - + JacksonSerializer jacksonSerializer = JacksonSerializer.builder().build(); @@ -83,11 +85,11 @@ void messageSerialization() throws IOException{ assertEquals("\"payload\"", serializedPayload.getData()); - + ObjectMapper objectMapper = jacksonSerializer.getObjectMapper(); Map actualMetaData = objectMapper.readValue(serializedMetaData.getData(), Map.class); - assertTrue(actualMetaData.entrySet().containsAll(metaDataMap.entrySet())); + assertTrue(actualMetaData.entrySet().containsAll(metaDataMap.entrySet())); } @Test @@ -110,7 +112,7 @@ void asMessageWrapsProvidedObjectsInMessage() { } @Test - void whenCorrelationDataProviderThrowsException_thenCatchException(){ + void whenCorrelationDataProviderThrowsException_thenCatchException() { unitOfWork = new DefaultUnitOfWork<>(new GenericEventMessage<>("Input 1")); CurrentUnitOfWork.set(unitOfWork); unitOfWork.registerCorrelationDataProvider(new ThrowingCorrelationDataProvider()); diff --git a/modelling/src/main/java/org/axonframework/modelling/command/AggregateAnnotationCommandHandler.java b/modelling/src/main/java/org/axonframework/modelling/command/AggregateAnnotationCommandHandler.java index e44692a121..72d80460ac 100644 --- a/modelling/src/main/java/org/axonframework/modelling/command/AggregateAnnotationCommandHandler.java +++ b/modelling/src/main/java/org/axonframework/modelling/command/AggregateAnnotationCommandHandler.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010-2022. Axon Framework + * Copyright (c) 2010-2024. Axon Framework * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -72,7 +72,7 @@ public class AggregateAnnotationCommandHandler implements CommandMessageHandl private final List>> handlers; private final Set supportedCommandNames; private final Map>>> supportedCommandsByName; - private final CreationPolicyAggregateFactory creationPolicyAggregateFactory; + private final Map, CreationPolicyAggregateFactory> factoryPerType; /** * Instantiate a Builder to be able to create a {@link AggregateAnnotationCommandHandler}. @@ -111,16 +111,30 @@ protected AggregateAnnotationCommandHandler(Builder builder) { this.supportedCommandNames = new HashSet<>(); this.supportedCommandsByName = new HashMap<>(); AggregateModel aggregateModel = builder.buildAggregateModel(); - this.creationPolicyAggregateFactory = - initializeAggregateFactory(aggregateModel.entityClass(), builder.creationPolicyAggregateFactory); + // Suppressing cast to Class as we are definitely dealing with implementations of T. + //noinspection unchecked + this.factoryPerType = initializeAggregateFactories( + aggregateModel.types() + .map(type -> (Class) type) + .collect(Collectors.toList()), + builder.creationPolicyAggregateFactory + ); + this.handlers = initializeHandlers(aggregateModel); } - private CreationPolicyAggregateFactory initializeAggregateFactory(Class aggregateClass, - CreationPolicyAggregateFactory configuredAggregateFactory) { - return configuredAggregateFactory != null - ? configuredAggregateFactory - : new NoArgumentConstructorCreationPolicyAggregateFactory<>(aggregateClass); + private Map, CreationPolicyAggregateFactory> initializeAggregateFactories( + List> aggregateTypes, + CreationPolicyAggregateFactory configuredAggregateFactory + ) { + Map, CreationPolicyAggregateFactory> typeToFactory = new HashMap<>(); + for (Class aggregateType : aggregateTypes) { + typeToFactory.put(aggregateType, configuredAggregateFactory != null + ? configuredAggregateFactory + : new NoArgumentConstructorCreationPolicyAggregateFactory<>(aggregateType) + ); + } + return typeToFactory; } /** @@ -189,12 +203,14 @@ private void initializeHandler(AggregateModel aggregateModel, } else { switch (policy.orElse(NEVER)) { case ALWAYS: - messageHandler = - new AlwaysCreateAggregateCommandHandler(handler, creationPolicyAggregateFactory); + messageHandler = new AlwaysCreateAggregateCommandHandler( + handler, factoryPerType.get(handler.declaringClass()) + ); break; case CREATE_IF_MISSING: - messageHandler = - new AggregateCreateOrUpdateCommandHandler(handler, creationPolicyAggregateFactory); + messageHandler = new AggregateCreateOrUpdateCommandHandler( + handler, factoryPerType.get(handler.declaringClass()) + ); break; case NEVER: messageHandler = new AggregateCommandHandler(handler); @@ -351,11 +367,15 @@ public Builder aggregateModel(AggregateModel aggregateModel) { } /** - * Sets the {@link CreationPolicyAggregateFactory} for generic type {@code T}. The aggregate factory must - * produce a new instance of the Aggregate root based on the supplied Identifier. + * Sets the {@link CreationPolicyAggregateFactory} for generic type {@code T}. + *

+ * The aggregate factory must produce a new instance of the aggregate root based on the supplied identifier. + * When dealing with a polymorphic aggregate, the given {@code creationPolicyAggregateFactory} will be used for + * every {@link AggregateModel#types() type}. * - * @param creationPolicyAggregateFactory that returns the aggregate instance based on the identifier - * @return the current Builder instance, for fluent interfacing + * @param creationPolicyAggregateFactory The {@link CreationPolicyAggregateFactory} the constructs an aggregate + * instance based on an identifier. + * @return The current Builder instance, for fluent interfacing. */ public Builder creationPolicyAggregateFactory( CreationPolicyAggregateFactory creationPolicyAggregateFactory diff --git a/pom.xml b/pom.xml index 4c06b901e8..b008f43369 100644 --- a/pom.xml +++ b/pom.xml @@ -71,7 +71,7 @@ ${project.basedir}/../coverage-report/target/site/jacoco-aggregate/jacoco.xml - 2024.1.0 + 2024.1.1 2.10.9.2 3.10.8 diff --git a/spring-boot-autoconfigure/src/main/java/org/axonframework/springboot/autoconfig/AxonServerBusAutoConfiguration.java b/spring-boot-autoconfigure/src/main/java/org/axonframework/springboot/autoconfig/AxonServerBusAutoConfiguration.java index ef57967bb8..581bed1e92 100644 --- a/spring-boot-autoconfigure/src/main/java/org/axonframework/springboot/autoconfig/AxonServerBusAutoConfiguration.java +++ b/spring-boot-autoconfigure/src/main/java/org/axonframework/springboot/autoconfig/AxonServerBusAutoConfiguration.java @@ -43,7 +43,6 @@ import org.axonframework.queryhandling.SimpleQueryBus; import org.axonframework.serialization.Serializer; import org.axonframework.springboot.util.ConditionalOnMissingQualifiedBean; -import org.axonframework.tracing.SpanFactory; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.AutoConfigureAfter; @@ -116,17 +115,22 @@ public AxonServerQueryBus queryBus(AxonServerConnectionManager axonServerConnect new CorrelationDataInterceptor<>(axonConfiguration.correlationDataProviders()) ); - return AxonServerQueryBus.builder() - .axonServerConnectionManager(axonServerConnectionManager) - .configuration(axonServerConfiguration) - .localSegment(simpleQueryBus) - .updateEmitter(simpleQueryBus.queryUpdateEmitter()) - .messageSerializer(messageSerializer) - .genericSerializer(genericSerializer) - .priorityCalculator(priorityCalculator) - .targetContextResolver(targetContextResolver) - .spanFactory(axonConfiguration.getComponent(QueryBusSpanFactory.class)) - .build(); + AxonServerQueryBus.Builder axonQueryBuilder = AxonServerQueryBus.builder() + .axonServerConnectionManager( + axonServerConnectionManager) + .configuration(axonServerConfiguration) + .localSegment(simpleQueryBus) + .updateEmitter(simpleQueryBus.queryUpdateEmitter()) + .messageSerializer(messageSerializer) + .genericSerializer(genericSerializer) + .priorityCalculator(priorityCalculator) + .targetContextResolver(targetContextResolver) + .spanFactory(axonConfiguration.getComponent( + QueryBusSpanFactory.class)); + if (axonServerConfiguration.isShortcutQueriesToLocalHandlers()) { + axonQueryBuilder.enabledLocalSegmentShortCut(); + } + return axonQueryBuilder.build(); } @Bean diff --git a/test/src/test/java/org/axonframework/test/aggregate/FixtureTest_CreationPolicy.java b/test/src/test/java/org/axonframework/test/aggregate/FixtureTest_CreationPolicy.java index 1d5fa2e022..31d9ac6cf8 100644 --- a/test/src/test/java/org/axonframework/test/aggregate/FixtureTest_CreationPolicy.java +++ b/test/src/test/java/org/axonframework/test/aggregate/FixtureTest_CreationPolicy.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010-2022. Axon Framework + * Copyright (c) 2010-2024. Axon Framework * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -176,6 +176,26 @@ void whenProtectedConstructorCombinedWithAlwaysPolicyThenAggregateWorksAsExpecte .expectSuccessfulHandlerExecution(); } + @Test + void whenPolymorphicAggregateWithUniquelyNamedCreateIfMissingPolicyOnChildThenWorksAsExpected() { + new AggregateTestFixture<>(TestAggregateParentForPolymorphicCase.class) + .withSubtypes(TestAggregateChildForPolymorphicCase.class) + .givenNoPriorActivity() + .when(new CreateOrUpdateCommand(AGGREGATE_ID, PUBLISH_EVENTS)) + .expectEvents(new CreatedOrUpdatedEvent(AGGREGATE_ID)) + .expectSuccessfulHandlerExecution(); + } + + @Test + void whenPolymorphicAggregateWithUniquelyNamedAlwaysPolicyOnChildThenWorksAsExpected() { + new AggregateTestFixture<>(TestAggregateParentForPolymorphicCase.class) + .withSubtypes(TestAggregateChildForPolymorphicCase.class) + .givenNoPriorActivity() + .when(new AlwaysCreateWithoutResultCommand(AGGREGATE_ID, PUBLISH_EVENTS)) + .expectEvents(new AlwaysCreatedEvent(AGGREGATE_ID)) + .expectSuccessfulHandlerExecution(); + } + private static class CreateCommand { @TargetAggregateIdentifier @@ -552,6 +572,49 @@ public void on(AlwaysCreatedEvent event) { } } + public static abstract class TestAggregateParentForPolymorphicCase { + + @AggregateIdentifier + protected ComplexAggregateId id; + + public TestAggregateParentForPolymorphicCase() { + // Constructor made public on purpose for testing. + } + } + + public static class TestAggregateChildForPolymorphicCase extends TestAggregateParentForPolymorphicCase { + + public TestAggregateChildForPolymorphicCase() { + // Constructor made public on purpose for testing. + } + + @CommandHandler + @CreationPolicy(AggregateCreationPolicy.CREATE_IF_MISSING) + public void handle(CreateOrUpdateCommand command) { + if (command.shouldPublishEvent()) { + apply(new CreatedOrUpdatedEvent(command.getId())); + } + } + + @CommandHandler + @CreationPolicy(AggregateCreationPolicy.ALWAYS) + public void handle(AlwaysCreateWithoutResultCommand command) { + if (command.shouldPublishEvent()) { + apply(new AlwaysCreatedEvent(command.getId())); + } + } + + @EventSourcingHandler + private void on(CreatedOrUpdatedEvent event) { + this.id = event.getId(); + } + + @EventSourcingHandler + public void on(AlwaysCreatedEvent event) { + this.id = event.getId(); + } + } + /** * Test identifier introduced for issue #1356. diff --git a/test/src/test/java/org/axonframework/test/server/AxonServerEEContainerTest.java b/test/src/test/java/org/axonframework/test/server/AxonServerEEContainerTest.java index 9aaa7a7a89..7fc69eec9c 100644 --- a/test/src/test/java/org/axonframework/test/server/AxonServerEEContainerTest.java +++ b/test/src/test/java/org/axonframework/test/server/AxonServerEEContainerTest.java @@ -17,6 +17,7 @@ package org.axonframework.test.server; import org.junit.jupiter.api.*; +import org.junit.jupiter.api.condition.DisabledOnOs; import org.testcontainers.utility.DockerImageName; import static org.junit.jupiter.api.Assertions.*; @@ -29,6 +30,7 @@ class AxonServerEEContainerTest { @Test + @DisabledOnOs(architectures = {"aarch64"}) void supportsAxonServer_4_5_X() { try ( final AxonServerEEContainer axonServerEEContainer = @@ -40,6 +42,7 @@ void supportsAxonServer_4_5_X() { } @Test + @DisabledOnOs(architectures = {"aarch64"}) void axonServer_4_5_X_genericTest() { try ( final AxonServerEEContainer axonServerEEContainer = diff --git a/test/src/test/java/org/axonframework/test/server/AxonServerSEContainerTest.java b/test/src/test/java/org/axonframework/test/server/AxonServerSEContainerTest.java index eda3e6eefa..c828c1e6f6 100644 --- a/test/src/test/java/org/axonframework/test/server/AxonServerSEContainerTest.java +++ b/test/src/test/java/org/axonframework/test/server/AxonServerSEContainerTest.java @@ -17,6 +17,7 @@ package org.axonframework.test.server; import org.junit.jupiter.api.*; +import org.junit.jupiter.api.condition.DisabledOnOs; import org.testcontainers.utility.DockerImageName; import static org.junit.jupiter.api.Assertions.*; @@ -29,6 +30,7 @@ class AxonServerSEContainerTest { @Test + @DisabledOnOs(architectures = {"aarch64"}) void supportsAxonServer_4_4_X() { try ( final AxonServerSEContainer axonServerSEContainer =