From a5c48d10d364f489b3e02c3e8c0fd48f5700d61b Mon Sep 17 00:00:00 2001 From: marc-adaptive Date: Wed, 23 Oct 2024 15:34:21 -0400 Subject: [PATCH] Tethering join position (#1672) * [Java] Fix java tethering join position * [C] Align c with java publication image tethering join position * [Java] Fix join position of tethered subscriptions * [C] Align C tethering with java * [Java] Add position check to untethered system test * [Java] checkstyle (cherry picked from commit 297f69c8fdd0be6c51781f781c12ef709c9ab27e) --- aeron-driver/src/main/c/aeron_ipc_publication.c | 3 ++- aeron-driver/src/main/c/aeron_publication_image.c | 3 ++- .../src/main/java/io/aeron/driver/IpcPublication.java | 3 ++- .../src/main/java/io/aeron/driver/PublicationImage.java | 3 ++- .../java/io/aeron/driver/UntetheredSubscriptionTest.java | 2 +- .../src/test/java/io/aeron/UntetheredSubscriptionTest.java | 7 +++++++ 6 files changed, 16 insertions(+), 5 deletions(-) diff --git a/aeron-driver/src/main/c/aeron_ipc_publication.c b/aeron-driver/src/main/c/aeron_ipc_publication.c index 8d1663c3f7..1595c7aebe 100644 --- a/aeron-driver/src/main/c/aeron_ipc_publication.c +++ b/aeron-driver/src/main/c/aeron_ipc_publication.c @@ -370,7 +370,8 @@ void aeron_ipc_publication_check_untethered_subscriptions( case AERON_SUBSCRIPTION_TETHER_RESTING: if (now_ns > (tetherable_position->time_of_last_update_ns + resting_timeout_ns)) { - aeron_counter_set_ordered(tetherable_position->value_addr, consumer_position); + int64_t join_position = aeron_ipc_publication_join_position(publication); + aeron_counter_set_ordered(tetherable_position->value_addr, join_position); aeron_driver_conductor_on_available_image( conductor, publication->conductor_fields.managed_resource.registration_id, diff --git a/aeron-driver/src/main/c/aeron_publication_image.c b/aeron-driver/src/main/c/aeron_publication_image.c index f07ab9f6a3..aa1f36449c 100644 --- a/aeron-driver/src/main/c/aeron_publication_image.c +++ b/aeron-driver/src/main/c/aeron_publication_image.c @@ -1002,7 +1002,8 @@ void aeron_publication_image_check_untethered_subscriptions( case AERON_SUBSCRIPTION_TETHER_RESTING: if (now_ns > (tetherable_position->time_of_last_update_ns + resting_timeout_ns)) { - aeron_counter_set_ordered(tetherable_position->value_addr, *image->rcv_pos_position.value_addr); + int64_t join_position = aeron_publication_image_join_position(image); + aeron_counter_set_ordered(tetherable_position->value_addr, join_position); aeron_driver_conductor_on_available_image( conductor, image->conductor_fields.managed_resource.registration_id, diff --git a/aeron-driver/src/main/java/io/aeron/driver/IpcPublication.java b/aeron-driver/src/main/java/io/aeron/driver/IpcPublication.java index ade7016b66..e063206ff4 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/IpcPublication.java +++ b/aeron-driver/src/main/java/io/aeron/driver/IpcPublication.java @@ -455,13 +455,14 @@ else if (UntetheredSubscription.State.RESTING == untethered.state) { if ((untethered.timeOfLastUpdateNs + untetheredRestingTimeoutNs) - nowNs <= 0) { + final long joinPosition = joinPosition(); subscriberPositions = ArrayUtil.add(subscriberPositions, untethered.position); conductor.notifyAvailableImageLink( registrationId, sessionId, untethered.subscriptionLink, untethered.position.id(), - joinPosition(), + joinPosition, rawLog.fileName(), CommonContext.IPC_CHANNEL); untethered.state(UntetheredSubscription.State.ACTIVE, nowNs, streamId, sessionId); diff --git a/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java b/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java index 51f8d09c50..a481b57b08 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java +++ b/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java @@ -1040,13 +1040,14 @@ else if (UntetheredSubscription.State.RESTING == untethered.state) { if ((untethered.timeOfLastUpdateNs + untetheredRestingTimeoutNs) - nowNs <= 0) { + final long joinPosition = joinPosition(); subscriberPositions = ArrayUtil.add(subscriberPositions, untethered.position); conductor.notifyAvailableImageLink( correlationId, sessionId, untethered.subscriptionLink, untethered.position.id(), - joinPosition(), + joinPosition, rawLog.fileName(), sourceIdentity); untethered.state(UntetheredSubscription.State.ACTIVE, nowNs, streamId, sessionId); diff --git a/aeron-driver/src/test/java/io/aeron/driver/UntetheredSubscriptionTest.java b/aeron-driver/src/test/java/io/aeron/driver/UntetheredSubscriptionTest.java index 079e494f86..beddc17c05 100644 --- a/aeron-driver/src/test/java/io/aeron/driver/UntetheredSubscriptionTest.java +++ b/aeron-driver/src/test/java/io/aeron/driver/UntetheredSubscriptionTest.java @@ -110,7 +110,7 @@ void shouldLifeCycleTimeoutsAndRelink() eq(SESSION_ID), eq(untetheredLink), anyInt(), - eq(ipcPublication.joinPosition()), + eq(tetheredPosition.get()), eq(rawLog.fileName()), eq(CommonContext.IPC_CHANNEL)); } diff --git a/aeron-system-tests/src/test/java/io/aeron/UntetheredSubscriptionTest.java b/aeron-system-tests/src/test/java/io/aeron/UntetheredSubscriptionTest.java index f1e3333382..2076d12e15 100644 --- a/aeron-system-tests/src/test/java/io/aeron/UntetheredSubscriptionTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/UntetheredSubscriptionTest.java @@ -40,6 +40,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static java.util.Arrays.asList; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -195,6 +196,12 @@ void shouldRejoinAfterResting(final String channel) aeron.conductorAgentInvoker().invoke(); } + if (!channel.startsWith("aeron-spy")) + { + final long tetheredPosition = tetheredSub.imageAtIndex(0).position(); + final long untetheredJoinPosition = untetheredSub.imageAtIndex(0).joinPosition(); + assertEquals(tetheredPosition, untetheredJoinPosition); + } return; } }