Skip to content

Commit

Permalink
[Java] Extend test to assert actual position recovery.
Browse files Browse the repository at this point in the history
(cherry picked from commit ee54479)
  • Loading branch information
vyazelenko committed Oct 25, 2024
1 parent a5c48d1 commit 789bd0b
Showing 1 changed file with 26 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@

import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Arrays.asList;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -158,9 +158,10 @@ void shouldRejoinAfterResting(final String channel)
final FragmentHandler fragmentHandler = (buffer, offset, length, header) -> {};

final UnsafeBuffer srcBuffer = new UnsafeBuffer(ByteBuffer.allocate(MESSAGE_LENGTH));
srcBuffer.setMemory(0, MESSAGE_LENGTH, (byte)-1);
final String untetheredChannel = channel + "|tether=false";
final String publicationChannel = channel.startsWith("aeron-spy") ? channel.substring(10) : channel;
boolean pollingUntethered = true;
int untetheredPollLimit = 3;

try (Subscription tetheredSub = aeron.addSubscription(channel, STREAM_ID);
Subscription untetheredSub = aeron.addSubscription(
Expand All @@ -173,36 +174,41 @@ void shouldRejoinAfterResting(final String channel)
aeron.conductorAgentInvoker().invoke();
}

while (true)
while (0 == unavailableImageCount.get())
{
if (publication.offer(srcBuffer) < 0)
if (publication.offer(srcBuffer, 0, ThreadLocalRandom.current().nextInt(1, MESSAGE_LENGTH)) < 0)
{
Tests.yield();
aeron.conductorAgentInvoker().invoke();
}

if (pollingUntethered && untetheredSub.poll(fragmentHandler, FRAGMENT_COUNT_LIMIT) > 0)
if (untetheredPollLimit > 0 && untetheredSub.poll(fragmentHandler, FRAGMENT_COUNT_LIMIT) > 0)
{
pollingUntethered = false;
untetheredPollLimit--;
}

tetheredSub.poll(fragmentHandler, FRAGMENT_COUNT_LIMIT);
}

if (unavailableImageCount.get() == 1)
{
while (availableImageCount.get() < 2)
{
Tests.yield();
aeron.conductorAgentInvoker().invoke();
}
while (availableImageCount.get() < 2)
{
publication.offer(srcBuffer, 0, ThreadLocalRandom.current().nextInt(1, MESSAGE_LENGTH));
Tests.yield();
aeron.conductorAgentInvoker().invoke();
}

if (!channel.startsWith("aeron-spy"))
{
final long tetheredPosition = tetheredSub.imageAtIndex(0).position();
final long untetheredJoinPosition = untetheredSub.imageAtIndex(0).joinPosition();
assertEquals(tetheredPosition, untetheredJoinPosition);
}
return;
final Image tetheredImage = tetheredSub.imageAtIndex(0);
final Image untetheredImage = untetheredSub.imageAtIndex(0);
while (untetheredImage.position() < publication.position() ||
tetheredImage.position() < publication.position())
{
int fragments = 0;
fragments += tetheredSub.poll(fragmentHandler, FRAGMENT_COUNT_LIMIT);
fragments += untetheredSub.poll(fragmentHandler, FRAGMENT_COUNT_LIMIT);
if (0 == fragments)
{
Tests.yield();
aeron.conductorAgentInvoker().invoke();
}
}
}
Expand Down

0 comments on commit 789bd0b

Please sign in to comment.