diff --git a/aeron-system-tests/src/test/java/io/aeron/UntetheredSubscriptionTest.java b/aeron-system-tests/src/test/java/io/aeron/UntetheredSubscriptionTest.java index 2076d12e15..7e79be80ce 100644 --- a/aeron-system-tests/src/test/java/io/aeron/UntetheredSubscriptionTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/UntetheredSubscriptionTest.java @@ -35,12 +35,12 @@ import java.nio.ByteBuffer; import java.util.List; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static java.util.Arrays.asList; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -158,9 +158,10 @@ void shouldRejoinAfterResting(final String channel) final FragmentHandler fragmentHandler = (buffer, offset, length, header) -> {}; final UnsafeBuffer srcBuffer = new UnsafeBuffer(ByteBuffer.allocate(MESSAGE_LENGTH)); + srcBuffer.setMemory(0, MESSAGE_LENGTH, (byte)-1); final String untetheredChannel = channel + "|tether=false"; final String publicationChannel = channel.startsWith("aeron-spy") ? channel.substring(10) : channel; - boolean pollingUntethered = true; + int untetheredPollLimit = 3; try (Subscription tetheredSub = aeron.addSubscription(channel, STREAM_ID); Subscription untetheredSub = aeron.addSubscription( @@ -173,36 +174,41 @@ void shouldRejoinAfterResting(final String channel) aeron.conductorAgentInvoker().invoke(); } - while (true) + while (0 == unavailableImageCount.get()) { - if (publication.offer(srcBuffer) < 0) + if (publication.offer(srcBuffer, 0, ThreadLocalRandom.current().nextInt(1, MESSAGE_LENGTH)) < 0) { Tests.yield(); aeron.conductorAgentInvoker().invoke(); } - if (pollingUntethered && untetheredSub.poll(fragmentHandler, FRAGMENT_COUNT_LIMIT) > 0) + if (untetheredPollLimit > 0 && untetheredSub.poll(fragmentHandler, FRAGMENT_COUNT_LIMIT) > 0) { - pollingUntethered = false; + untetheredPollLimit--; } tetheredSub.poll(fragmentHandler, FRAGMENT_COUNT_LIMIT); + } - if (unavailableImageCount.get() == 1) - { - while (availableImageCount.get() < 2) - { - Tests.yield(); - aeron.conductorAgentInvoker().invoke(); - } + while (availableImageCount.get() < 2) + { + publication.offer(srcBuffer, 0, ThreadLocalRandom.current().nextInt(1, MESSAGE_LENGTH)); + Tests.yield(); + aeron.conductorAgentInvoker().invoke(); + } - if (!channel.startsWith("aeron-spy")) - { - final long tetheredPosition = tetheredSub.imageAtIndex(0).position(); - final long untetheredJoinPosition = untetheredSub.imageAtIndex(0).joinPosition(); - assertEquals(tetheredPosition, untetheredJoinPosition); - } - return; + final Image tetheredImage = tetheredSub.imageAtIndex(0); + final Image untetheredImage = untetheredSub.imageAtIndex(0); + while (untetheredImage.position() < publication.position() || + tetheredImage.position() < publication.position()) + { + int fragments = 0; + fragments += tetheredSub.poll(fragmentHandler, FRAGMENT_COUNT_LIMIT); + fragments += untetheredSub.poll(fragmentHandler, FRAGMENT_COUNT_LIMIT); + if (0 == fragments) + { + Tests.yield(); + aeron.conductorAgentInvoker().invoke(); } } }