Skip to content

Commit 3a84cb6

Browse files
committed
[Java] Rework archive replay to not use memory mapped files so reads can be batched for major performance improvement in throughput and latency.
1 parent 11b5ff3 commit 3a84cb6

File tree

7 files changed

+170
-136
lines changed

7 files changed

+170
-136
lines changed

aeron-archive/src/main/java/io/aeron/archive/Archive.java

-13
Original file line numberDiff line numberDiff line change
@@ -184,9 +184,6 @@ public static class Configuration
184184
public static final String MAX_CONCURRENT_REPLAYS_PROP_NAME = "aeron.archive.max.concurrent.replays";
185185
public static final int MAX_CONCURRENT_REPLAYS_DEFAULT = 128;
186186

187-
public static final String REPLAY_FRAGMENT_LIMIT_PROP_NAME = "aeron.archive.replay.fragment.limit";
188-
public static final int REPLAY_FRAGMENT_LIMIT_DEFAULT = 64;
189-
190187
public static final String MAX_CATALOG_ENTRIES_PROP_NAME = "aeron.archive.max.catalog.entries";
191188
public static final long MAX_CATALOG_ENTRIES_DEFAULT = Catalog.DEFAULT_MAX_ENTRIES;
192189

@@ -298,16 +295,6 @@ public static int maxConcurrentReplays()
298295
return Integer.getInteger(MAX_CONCURRENT_REPLAYS_PROP_NAME, MAX_CONCURRENT_REPLAYS_DEFAULT);
299296
}
300297

301-
/**
302-
* Limit for the number of fragments to be replayed per duty cycle on a replay.
303-
*
304-
* @return the limit for the number of fragments to be replayed per duty cycle on a replay.
305-
*/
306-
public static int replayFragmentLimit()
307-
{
308-
return Integer.getInteger(REPLAY_FRAGMENT_LIMIT_PROP_NAME, REPLAY_FRAGMENT_LIMIT_DEFAULT);
309-
}
310-
311298
/**
312299
* Maximum number of catalog entries to allocate for the catalog file.
313300
*

aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java

+24-26
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333

3434
import java.io.File;
3535
import java.io.IOException;
36-
import java.nio.ByteBuffer;
3736
import java.nio.channels.FileChannel;
3837
import java.nio.file.StandardOpenOption;
3938
import java.nio.file.attribute.FileAttribute;
@@ -54,6 +53,7 @@
5453
import static io.aeron.logbuffer.FrameDescriptor.FRAME_ALIGNMENT;
5554
import static java.nio.file.StandardOpenOption.READ;
5655
import static java.nio.file.StandardOpenOption.WRITE;
56+
import static org.agrona.BufferUtil.allocateDirectAligned;
5757
import static org.agrona.concurrent.status.CountersReader.METADATA_LENGTH;
5858

5959
abstract class ArchiveConductor extends SessionWorker<Session> implements AvailableImageHandler
@@ -66,13 +66,15 @@ abstract class ArchiveConductor extends SessionWorker<Session> implements Availa
6666
private final Long2ObjectHashMap<ReplaySession> replaySessionByIdMap = new Long2ObjectHashMap<>();
6767
private final Long2ObjectHashMap<RecordingSession> recordingSessionByIdMap = new Long2ObjectHashMap<>();
6868
private final Object2ObjectHashMap<String, Subscription> recordingSubscriptionMap = new Object2ObjectHashMap<>();
69+
private final RecordingSummary recordingSummary = new RecordingSummary();
6970
private final UnsafeBuffer descriptorBuffer = new UnsafeBuffer();
7071
private final RecordingDescriptorDecoder recordingDescriptorDecoder = new RecordingDescriptorDecoder();
71-
private final RecordingSummary recordingSummary = new RecordingSummary();
72-
private final UnsafeBuffer tempBuffer = new UnsafeBuffer(new byte[METADATA_LENGTH]);
73-
private final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(DataHeaderFlyweight.HEADER_LENGTH);
74-
private final DataHeaderFlyweight dataHeaderFlyweight = new DataHeaderFlyweight(byteBuffer);
7572
private final ControlResponseProxy controlResponseProxy = new ControlResponseProxy();
73+
private final UnsafeBuffer tempBuffer = new UnsafeBuffer(new byte[METADATA_LENGTH]);
74+
private final UnsafeBuffer dataHeaderBuffer = new UnsafeBuffer(
75+
allocateDirectAligned(DataHeaderFlyweight.HEADER_LENGTH, 128));
76+
private final UnsafeBuffer replayBuffer = new UnsafeBuffer(
77+
allocateDirectAligned(ReplaySession.REPLAY_BLOCK_LENGTH, 128));
7678

7779
private final Aeron aeron;
7880
private final AgentInvoker aeronAgentInvoker;
@@ -421,7 +423,8 @@ void startReplay(
421423
replayPosition = position;
422424
}
423425

424-
if (!hasInitialSegmentFile(controlSession, archiveDir, replayPosition, recordingId, correlationId))
426+
final File segmentFile = segmentFile(controlSession, archiveDir, replayPosition, recordingId, correlationId);
427+
if (null == segmentFile)
425428
{
426429
return;
427430
}
@@ -439,8 +442,10 @@ void startReplay(
439442
correlationId,
440443
controlSession,
441444
controlResponseProxy,
445+
replayBuffer,
442446
catalog,
443447
archiveDir,
448+
segmentFile,
444449
cachedEpochClock,
445450
replayPublication,
446451
recordingSummary,
@@ -578,36 +583,29 @@ void truncateRecording(
578583
final int segmentIndex = segmentFileIndex(startPosition, position, segmentLength);
579584
final File file = new File(archiveDir, segmentFileName(recordingId, segmentIndex));
580585

581-
final long segmentOffset = position & (segmentLength - 1);
586+
final int segmentOffset = (int)(position & (segmentLength - 1));
582587
final int termLength = summary.termBufferLength;
583588
final int termOffset = (int)(position & (termLength - 1));
584589

585590
if (termOffset > 0)
586591
{
587-
try (FileChannel fileChannel = FileChannel.open(file.toPath(), FILE_OPTIONS, NO_ATTRIBUTES))
592+
try (FileChannel channel = FileChannel.open(file.toPath(), FILE_OPTIONS, NO_ATTRIBUTES))
588593
{
589-
byteBuffer.clear();
590-
if (DataHeaderFlyweight.HEADER_LENGTH != fileChannel.read(byteBuffer, segmentOffset))
591-
{
592-
throw new ArchiveException("failed to read fragment header");
593-
}
594-
595-
final long termCount = position >> LogBufferDescriptor.positionBitsToShift(termLength);
596-
final int termId = summary.initialTermId + (int)termCount;
594+
final int termCount = (int)(position >> LogBufferDescriptor.positionBitsToShift(termLength));
595+
final int termId = summary.initialTermId + termCount;
597596

598-
if (dataHeaderFlyweight.termOffset() != termOffset ||
599-
dataHeaderFlyweight.termId() != termId ||
600-
dataHeaderFlyweight.streamId() != summary.streamId)
597+
if (ReplaySession.notHeaderAligned(
598+
channel, dataHeaderBuffer, segmentOffset, termOffset, termId, summary.streamId))
601599
{
602-
final String msg = position + " position does not match header " + dataHeaderFlyweight;
600+
final String msg = position + " position not aligned to data header";
603601
controlSession.sendErrorResponse(correlationId, msg, controlResponseProxy);
604602

605603
return;
606604
}
607605

608-
fileChannel.truncate(segmentOffset);
609-
byteBuffer.put(0, (byte)0).limit(1).position(0);
610-
fileChannel.write(byteBuffer, segmentLength - 1);
606+
channel.truncate(segmentOffset);
607+
dataHeaderBuffer.byteBuffer().put(0, (byte)0).limit(1).position(0);
608+
channel.write(dataHeaderBuffer.byteBuffer(), segmentLength - 1);
611609
}
612610
catch (final IOException ex)
613611
{
@@ -982,7 +980,7 @@ private boolean validateReplayPosition(
982980
return true;
983981
}
984982

985-
private boolean hasInitialSegmentFile(
983+
private File segmentFile(
986984
final ControlSession controlSession,
987985
final File archiveDir,
988986
final long position,
@@ -999,9 +997,9 @@ private boolean hasInitialSegmentFile(
999997
final String msg = "initial segment file does not exist for replay recording id " + recordingId;
1000998
controlSession.sendErrorResponse(correlationId, msg, controlResponseProxy);
1001999

1002-
return false;
1000+
return null;
10031001
}
10041002

1005-
return true;
1003+
return segmentFile;
10061004
}
10071005
}

0 commit comments

Comments
 (0)