Skip to content

Commit b82ecfb

Browse files
committed
[Java] Complete adding header and version information to the recording log.
1 parent 4b94275 commit b82ecfb

File tree

5 files changed

+129
-95
lines changed

5 files changed

+129
-95
lines changed

aeron-cluster/src/main/java/io/aeron/cluster/RecordingLog.java

+86-52
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.agrona.DirectBuffer;
2424
import org.agrona.LangUtil;
2525
import org.agrona.MutableDirectBuffer;
26+
import org.agrona.SemanticVersion;
2627
import org.agrona.Strings;
2728
import org.agrona.collections.IntArrayList;
2829
import org.agrona.collections.Long2LongHashMap;
@@ -50,6 +51,9 @@
5051
import static io.aeron.archive.client.AeronArchive.NULL_POSITION;
5152
import static java.lang.Math.max;
5253
import static java.nio.ByteOrder.LITTLE_ENDIAN;
54+
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
55+
import static java.nio.file.StandardCopyOption.COPY_ATTRIBUTES;
56+
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
5357
import static java.nio.file.StandardOpenOption.*;
5458
import static org.agrona.BitUtil.*;
5559

@@ -68,12 +72,13 @@
6872
* 0 1 2 3
6973
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
7074
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
71-
* | 0xFFA3F010 (Magic Number) |
72-
* | |
75+
* | 0xFFA3F010 (Magic Number) |
76+
* | 0x00000000 |
7377
* +---------------------------------------------------------------+
7478
* | Version |
7579
* +---------------------------------------------------------------+
76-
* | Reserved |
80+
* | Reserved (52 bytes) ...
81+
* ... |
7782
* +---------------------------------------------------------------+
7883
* </pre>
7984
* Recording log entry as follows:
@@ -117,7 +122,13 @@
117122
public final class RecordingLog implements AutoCloseable
118123
{
119124
public static final long MAGIC_NUMBER = 0xFFA3F010_00000000L;
120-
private static final int HEADER_SIZE = 64;
125+
public static final int HEADER_SIZE = 64;
126+
public static final int MAJOR_VERSION = 0;
127+
public static final int MINOR_VERSION = 1;
128+
public static final int PATCH_VERSION = 0;
129+
public static final int SEMANTIC_VERSION = SemanticVersion.compose(MAJOR_VERSION, MINOR_VERSION, PATCH_VERSION);
130+
private static final int MAGIC_NUMBER_OFFSET = 0;
131+
private static final int VERSION_OFFSET = MAGIC_NUMBER_OFFSET + SIZE_OF_LONG;
121132

122133
/**
123134
* Representation of the entry in the {@link RecordingLog}.
@@ -646,6 +657,9 @@ public String toString()
646657
*/
647658
public static final String RECORDING_LOG_FILE_NAME = "recording.log";
648659

660+
static final String RECORDING_LOG_MIGRATED_FILE_NAME = RECORDING_LOG_FILE_NAME + ".migrated";
661+
static final String RECORDING_LOG_NEW_FILE_NAME = RECORDING_LOG_FILE_NAME + ".new";
662+
649663
/**
650664
* The log entry is for a recording of messages within a leadership term to the log.
651665
*/
@@ -785,6 +799,7 @@ public RecordingLog(final File parentDir, final boolean createNew)
785799
if (isNewFile)
786800
{
787801
syncDirectory(parentDir);
802+
writeHeader(fileChannel);
788803
}
789804
else
790805
{
@@ -802,54 +817,6 @@ public RecordingLog(final File parentDir, final boolean createNew)
802817
}
803818
}
804819

805-
private void checkForVersionAndMigrate(final File logFile) throws IOException
806-
{
807-
if (requiresMigration(logFile))
808-
{
809-
final File oldMigratedFile = new File(logFile.getParentFile(), RECORDING_LOG_FILE_NAME + ".migrated");
810-
if (!logFile.renameTo(oldMigratedFile))
811-
{
812-
throw new IOException("Unable to backup old file to new one");
813-
}
814-
815-
final File newFile = new File(logFile.getParentFile(), RECORDING_LOG_FILE_NAME);
816-
final MutableDirectBuffer header = new UnsafeBuffer(new byte[HEADER_SIZE]);
817-
// TODO: Offset constants
818-
header.putLong(0, MAGIC_NUMBER, LITTLE_ENDIAN);
819-
header.putInt(8, 0 /* TODO: version */, LITTLE_ENDIAN);
820-
821-
try (FileOutputStream outputStream = new FileOutputStream(newFile))
822-
{
823-
outputStream.write(header.byteArray());
824-
Files.copy(oldMigratedFile.toPath(), outputStream);
825-
}
826-
}
827-
}
828-
829-
private boolean requiresMigration(final File logFile) throws IOException
830-
{
831-
if (logFile.length() < HEADER_SIZE)
832-
{
833-
return true;
834-
}
835-
836-
try (FileChannel fileChannel = FileChannel.open(logFile.toPath(), READ))
837-
{
838-
return isMagicNumberInvalid(fileChannel);
839-
}
840-
}
841-
842-
private static boolean isMagicNumberInvalid(final FileChannel fileChannel) throws IOException
843-
{
844-
final DirectBuffer header = new UnsafeBuffer(ByteBuffer.allocateDirect(HEADER_SIZE));
845-
if (HEADER_SIZE != fileChannel.read(header.byteBuffer()))
846-
{
847-
throw new IOException("Unable to read header");
848-
}
849-
final long magicNumber = header.getLong(0, LITTLE_ENDIAN);
850-
return magicNumber != MAGIC_NUMBER;
851-
}
852-
853820
/**
854821
* {@inheritDoc}
855822
*/
@@ -1884,6 +1851,73 @@ private int captureEntriesFromBuffer(
18841851
return consumed;
18851852
}
18861853

1854+
private void applyHeader(final MutableDirectBuffer header)
1855+
{
1856+
header.putLong(MAGIC_NUMBER_OFFSET, MAGIC_NUMBER, LITTLE_ENDIAN);
1857+
header.putInt(VERSION_OFFSET, SEMANTIC_VERSION, LITTLE_ENDIAN);
1858+
}
1859+
1860+
private void writeHeader(final FileChannel fileChannel) throws IOException
1861+
{
1862+
final ByteBuffer headerBuffer = ByteBuffer.allocateDirect(HEADER_SIZE);
1863+
final MutableDirectBuffer header = new UnsafeBuffer(headerBuffer);
1864+
applyHeader(header);
1865+
1866+
if (HEADER_SIZE != fileChannel.write(headerBuffer))
1867+
{
1868+
throw new IOException("Failed to write full header");
1869+
}
1870+
}
1871+
1872+
private void checkForVersionAndMigrate(final File logFile) throws IOException
1873+
{
1874+
if (requiresMigration(logFile))
1875+
{
1876+
final File oldMigratedFile = new File(logFile.getParentFile(), RECORDING_LOG_MIGRATED_FILE_NAME);
1877+
Files.copy(logFile.toPath(), oldMigratedFile.toPath(), COPY_ATTRIBUTES, REPLACE_EXISTING);
1878+
1879+
final File newFile = new File(logFile.getParentFile(), RECORDING_LOG_NEW_FILE_NAME);
1880+
Files.deleteIfExists(newFile.toPath());
1881+
1882+
final MutableDirectBuffer header = new UnsafeBuffer(new byte[HEADER_SIZE]);
1883+
applyHeader(header);
1884+
1885+
try (FileOutputStream outputStream = new FileOutputStream(newFile, false))
1886+
{
1887+
outputStream.write(header.byteArray());
1888+
Files.copy(oldMigratedFile.toPath(), outputStream);
1889+
}
1890+
1891+
Files.move(
1892+
newFile.toPath(), new File(logFile.getParentFile(), RECORDING_LOG_FILE_NAME).toPath(),
1893+
ATOMIC_MOVE, REPLACE_EXISTING);
1894+
}
1895+
}
1896+
1897+
private boolean requiresMigration(final File logFile) throws IOException
1898+
{
1899+
if (logFile.length() < HEADER_SIZE)
1900+
{
1901+
return true;
1902+
}
1903+
1904+
try (FileChannel fileChannel = FileChannel.open(logFile.toPath(), READ))
1905+
{
1906+
return isMagicNumberInvalid(fileChannel);
1907+
}
1908+
}
1909+
1910+
private static boolean isMagicNumberInvalid(final FileChannel fileChannel) throws IOException
1911+
{
1912+
final DirectBuffer header = new UnsafeBuffer(ByteBuffer.allocateDirect(HEADER_SIZE));
1913+
if (HEADER_SIZE != fileChannel.read(header.byteBuffer()))
1914+
{
1915+
throw new IOException("Unable to read header");
1916+
}
1917+
final long magicNumber = header.getLong(0, LITTLE_ENDIAN);
1918+
return magicNumber != MAGIC_NUMBER;
1919+
}
1920+
18871921
private static void syncDirectory(final File dir)
18881922
{
18891923
try (FileChannel fileChannel = FileChannel.open(dir.toPath()))

aeron-cluster/src/test/java/io/aeron/cluster/RecordingLogTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -670,6 +670,7 @@ void entriesInTheRecordingLogShouldBeSorted()
670670

671671
recordingLog.reload();
672672

673+
assertEquals(sortedList.size(), recordingLog.entries().size());
673674
assertEquals(sortedList, recordingLog.entries()); // reload from disc and re-sort
674675
}
675676
}

aeron-cluster/src/test/java/io/aeron/cluster/RecordingLogVersioningTest.java

+33-42
Original file line numberDiff line numberDiff line change
@@ -15,30 +15,34 @@
1515
*/
1616
package io.aeron.cluster;
1717

18-
import org.junit.jupiter.api.Disabled;
1918
import org.junit.jupiter.api.Test;
2019
import org.junit.jupiter.api.io.TempDir;
20+
import org.junit.jupiter.params.ParameterizedTest;
21+
import org.junit.jupiter.params.provider.ValueSource;
2122

2223
import java.io.File;
2324
import java.io.IOException;
2425
import java.nio.charset.StandardCharsets;
2526
import java.nio.file.Files;
26-
import java.nio.file.StandardCopyOption;
2727
import java.util.ArrayList;
2828
import java.util.List;
2929
import java.util.Objects;
3030
import java.util.Random;
3131

32+
import static io.aeron.cluster.RecordingLog.ENTRY_TYPE_SNAPSHOT;
33+
import static io.aeron.cluster.RecordingLog.ENTRY_TYPE_STANDBY_SNAPSHOT;
3234
import static io.aeron.cluster.RecordingLog.ENTRY_TYPE_TERM;
3335
import static org.junit.jupiter.api.Assertions.assertEquals;
3436
import static org.junit.jupiter.api.Assertions.assertNotEquals;
35-
import static org.junit.jupiter.api.Assertions.assertTrue;
3637

3738
public class RecordingLogVersioningTest
3839
{
3940
private static final byte[] CHARACTER_TABLE = new byte[27];
4041
public static final long FIXED_SEED_FOR_CONSISTENT_DATA = 892374458763L;
4142

43+
@TempDir File tempDirA;
44+
@TempDir File tempDirB;
45+
4246
static
4347
{
4448
for (int i = 0; i < 26; i++)
@@ -60,7 +64,7 @@ private static class TestEntry
6064
private final int serviceId;
6165
private final String endpoint;
6266

63-
public TestEntry(
67+
TestEntry(
6468
final int entryType,
6569
final long recordingId,
6670
final long leadershipTermId,
@@ -140,13 +144,19 @@ private List<TestEntry> generateData()
140144
return entries;
141145
}
142146

143-
@SuppressWarnings("checkstyle:MethodName")
144147
@Test
145-
@Disabled // Used to generate existing data.
146-
void generateRecordingLogForVersionTest_1_45()
148+
void verifyDataRemainsConsistent()
149+
{
150+
assertEquals(generateData(), generateData());
151+
}
152+
153+
@ParameterizedTest
154+
@ValueSource(booleans = { true, false })
155+
void shouldLoadOldVersionAndMigrate(final boolean hasPartiallyProcessedFiles) throws IOException
147156
{
148-
final File parentDir = new File("src/test/resources/v1_45_x");
149-
try (UnversionedRecordingLog recordingLog = new UnversionedRecordingLog(parentDir, true))
157+
assertNotEquals(tempDirA, tempDirB);
158+
159+
try (UnversionedRecordingLog recordingLog = new UnversionedRecordingLog(tempDirA, true))
150160
{
151161
for (final TestEntry testEntry : generateData())
152162
{
@@ -160,17 +170,20 @@ void generateRecordingLogForVersionTest_1_45()
160170
testEntry.serviceId,
161171
testEntry.endpoint);
162172
}
163-
final List<UnversionedRecordingLog.Entry> entries = recordingLog.entries();
164-
final String s = entries.toString();
165-
System.out.println(s);
166173
}
167-
}
168174

169-
private static void appendToRecordingLog(final RecordingLog recordingLog, final List<TestEntry> testEntries)
170-
{
171-
for (final TestEntry testEntry : testEntries)
175+
if (hasPartiallyProcessedFiles)
176+
{
177+
Files.createFile(new File(tempDirA, RecordingLog.RECORDING_LOG_MIGRATED_FILE_NAME).toPath());
178+
Files.createFile(new File(tempDirA, RecordingLog.RECORDING_LOG_NEW_FILE_NAME).toPath());
179+
}
180+
181+
final RecordingLog recordingLogMigrated = new RecordingLog(tempDirA, false);
182+
final RecordingLog recordingLogNew = new RecordingLog(tempDirB, true);
183+
184+
for (final TestEntry testEntry : generateData())
172185
{
173-
recordingLog.append(
186+
recordingLogNew.append(
174187
testEntry.entryType,
175188
testEntry.recordingId,
176189
testEntry.leadershipTermId,
@@ -180,28 +193,6 @@ private static void appendToRecordingLog(final RecordingLog recordingLog, final
180193
testEntry.serviceId,
181194
testEntry.endpoint);
182195
}
183-
}
184-
185-
@Test
186-
void verifyDataRemainsConsistent()
187-
{
188-
assertEquals(generateData(), generateData());
189-
}
190-
191-
@Test
192-
void shouldLoadOldVersionAndMigrate(@TempDir final File tempDirA, @TempDir final File tempDirB) throws IOException
193-
{
194-
assertNotEquals(tempDirA, tempDirB);
195-
196-
final File parentDir = new File("src/test/resources/v1_45_x");
197-
final File oldFile = new File(parentDir, RecordingLog.RECORDING_LOG_FILE_NAME);
198-
final File tempOldFile = new File(tempDirA, RecordingLog.RECORDING_LOG_FILE_NAME);
199-
200-
Files.copy(oldFile.toPath(), tempOldFile.toPath());
201-
202-
final RecordingLog recordingLogMigrated = new RecordingLog(tempDirA, false);
203-
final RecordingLog recordingLogNew = new RecordingLog(tempDirB, true);
204-
appendToRecordingLog(recordingLogNew, generateData());
205196

206197
assertEquals(recordingLogMigrated.entries(), recordingLogNew.entries());
207198
}
@@ -213,16 +204,16 @@ private int recordingType(final Random r)
213204
switch (type)
214205
{
215206
case 0: return ENTRY_TYPE_TERM;
216-
case 1: return RecordingLog.ENTRY_TYPE_SNAPSHOT;
217-
case 2: return RecordingLog.ENTRY_TYPE_STANDBY_SNAPSHOT;
207+
case 1: return ENTRY_TYPE_SNAPSHOT;
208+
case 2: return ENTRY_TYPE_STANDBY_SNAPSHOT;
218209
}
219210

220211
throw new IllegalStateException();
221212
}
222213

223214
private String endpoint(final Random r, final int type, final byte[] bs)
224215
{
225-
if (RecordingLog.ENTRY_TYPE_STANDBY_SNAPSHOT == type)
216+
if (ENTRY_TYPE_STANDBY_SNAPSHOT == type)
226217
{
227218
final int length = 50 + r.nextInt(50);
228219
for (int i = 0; i < length; i++)

aeron-system-tests/src/test/java/io/aeron/cluster/ClusterToolTest.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import io.aeron.test.SystemTestWatcher;
2323
import io.aeron.test.cluster.TestCluster;
2424
import io.aeron.test.cluster.TestNode;
25+
import org.agrona.MutableDirectBuffer;
26+
import org.agrona.concurrent.UnsafeBuffer;
2527
import org.junit.jupiter.api.Test;
2628
import org.junit.jupiter.api.extension.ExtendWith;
2729
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -38,6 +40,7 @@
3840
import java.util.regex.Pattern;
3941

4042
import static io.aeron.test.cluster.TestCluster.aCluster;
43+
import static java.nio.ByteOrder.LITTLE_ENDIAN;
4144
import static java.nio.charset.StandardCharsets.US_ASCII;
4245
import static java.nio.file.StandardOpenOption.CREATE_NEW;
4346
import static java.nio.file.StandardOpenOption.WRITE;
@@ -220,7 +223,12 @@ void sortRecordingLogIsANoOpIfRecordLogIsEmpty(final @TempDir Path emptyClusterD
220223
final boolean result = ClusterTool.sortRecordingLog(clusterDir);
221224

222225
assertFalse(result);
223-
assertArrayEquals(new byte[0], Files.readAllBytes(logFile));
226+
227+
final MutableDirectBuffer header = new UnsafeBuffer(new byte[RecordingLog.HEADER_SIZE]);
228+
header.putLong(0, RecordingLog.MAGIC_NUMBER, LITTLE_ENDIAN);
229+
header.putInt(8, RecordingLog.SEMANTIC_VERSION, LITTLE_ENDIAN);
230+
231+
assertArrayEquals(header.byteArray(), Files.readAllBytes(logFile));
224232
}
225233

226234
@Test

0 commit comments

Comments
 (0)