diff --git a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/codec/LuceneOptimizedStoredFieldsReader.java b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/codec/LuceneOptimizedStoredFieldsReader.java index ab2c10b87b..e840aa70ab 100644 --- a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/codec/LuceneOptimizedStoredFieldsReader.java +++ b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/codec/LuceneOptimizedStoredFieldsReader.java @@ -20,7 +20,6 @@ package com.apple.foundationdb.record.lucene.codec; -import com.apple.foundationdb.KeyValue; import com.apple.foundationdb.annotation.SpotBugsSuppressWarnings; import com.apple.foundationdb.record.RecordCoreException; import com.apple.foundationdb.record.lucene.LuceneExceptions; @@ -65,10 +64,10 @@ public LuceneOptimizedStoredFieldsReader(final FDBDirectory directory, final Seg public static List getPrimaryKeys(final String segmentName, final FDBDirectory directory) throws IOException { try { - final List rawStoredFields = directory.readAllStoredFields(segmentName); + final List rawStoredFields = directory.readAllStoredFields(segmentName); List primaryKeys = new ArrayList<>(); - for (final KeyValue rawStoredField : rawStoredFields) { - final var storedFields = LuceneStoredFieldsProto.LuceneStoredFields.parseFrom(rawStoredField.getValue()); + for (final byte[] rawStoredField : rawStoredFields) { + final var storedFields = LuceneStoredFieldsProto.LuceneStoredFields.parseFrom(rawStoredField); primaryKeys.add(storedFields.getPrimaryKey().toByteArray()); } return primaryKeys; diff --git a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectory.java b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectory.java index 5cc47ab9f6..453daa06d3 100644 --- a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectory.java +++ b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectory.java @@ -92,6 +92,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.zip.CRC32; @@ -338,11 +339,12 @@ public void setFieldInfoId(final String filename, final long id, final ByteStrin writeFDBLuceneFileReference(filename, reference); } - void writeFieldInfos(long id, byte[] value) { + void writeFieldInfos(long id, byte[] rawBytes) { if (id == 0) { throw new RecordCoreArgumentException("FieldInfo id should never be 0"); } byte[] key = fieldInfosSubspace.pack(id); + byte[] value = serializer.encode(rawBytes); agilityContext.recordSize(LuceneEvents.SizeEvents.LUCENE_WRITE, key.length + value.length); if (LOGGER.isTraceEnabled()) { LOGGER.trace(getLogMessage("Write lucene stored field infos data", @@ -357,7 +359,9 @@ Stream> getAllFieldInfosStream() { LuceneEvents.Waits.WAIT_LUCENE_READ_FIELD_INFOS, agilityContext.apply(aContext -> aContext.ensureActive().getRange(fieldInfosSubspace.range()).asList())) .stream() - .map(keyValue -> NonnullPair.of(fieldInfosSubspace.unpack(keyValue.getKey()).getLong(0), keyValue.getValue())); + .map(keyValue -> NonnullPair.of( + fieldInfosSubspace.unpack(keyValue.getKey()).getLong(0), + serializer.decodePossiblyWithoutPrefix(keyValue.getValue()))); } public CompletableFuture getFieldInfosCount() { @@ -444,10 +448,11 @@ public int writeData(final long id, final int block, @Nonnull final byte[] value * Write stored fields document to the DB. * @param segmentName the segment name writing to * @param docID the document ID to write - * @param value the bytes value of the stored fields + * @param rawBytes the bytes value of the stored fields */ - public void writeStoredFields(@Nonnull String segmentName, int docID, @Nonnull final byte[] value) { + public void writeStoredFields(@Nonnull String segmentName, int docID, @Nonnull final byte[] rawBytes) { byte[] key = storedFieldsSubspace.pack(Tuple.from(segmentName, docID)); + byte[] value = serializer.encode(rawBytes); agilityContext.recordSize(LuceneEvents.SizeEvents.LUCENE_WRITE_STORED_FIELDS, key.length + value.length); if (LOGGER.isTraceEnabled()) { LOGGER.trace(getLogMessage("Write lucene stored fields data", @@ -542,7 +547,7 @@ private CompletableFuture readData(long id, int block) { } @Nonnull - public byte[] readStoredFields(String segmentName, int docId) throws IOException { + public byte[] readStoredFields(String segmentName, int docId) { final byte[] key = storedFieldsSubspace.pack(Tuple.from(segmentName, docId)); final byte[] rawBytes = asyncToSync(LuceneEvents.Waits.WAIT_LUCENE_GET_STORED_FIELDS, agilityContext.instrument(LuceneEvents.Events.LUCENE_READ_STORED_FIELDS, @@ -553,11 +558,11 @@ public byte[] readStoredFields(String segmentName, int docId) throws IOException .addLogInfo(LuceneLogMessageKeys.DOC_ID, docId) .addLogInfo(LogMessageKeys.KEY, ByteArrayUtil2.loggable(key)); } - return rawBytes; + return Objects.requireNonNull(serializer.decodePossiblyWithoutPrefix(rawBytes)); } @Nonnull - public List readAllStoredFields(String segmentName) { + public List readAllStoredFields(String segmentName) { final Range range = storedFieldsSubspace.range(Tuple.from(segmentName)); final List list = asyncToSync(LuceneEvents.Waits.WAIT_LUCENE_GET_ALL_STORED_FIELDS, agilityContext.getRange(range.begin, range.end)); @@ -567,7 +572,7 @@ public List readAllStoredFields(String segmentName) { .addLogInfo(LogMessageKeys.RANGE_START, ByteArrayUtil2.loggable(range.begin)) .addLogInfo(LogMessageKeys.RANGE_END, ByteArrayUtil2.loggable(range.end)); } - return list; + return list.stream().map(KeyValue::getValue).map(serializer::decodePossiblyWithoutPrefix).collect(Collectors.toList()); } /** diff --git a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/LuceneSerializer.java b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/LuceneSerializer.java index c1d589d167..a4e1a9b054 100644 --- a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/LuceneSerializer.java +++ b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/LuceneSerializer.java @@ -131,7 +131,7 @@ public byte[] decode(@Nullable byte[] data) { return null; } - if (data.length < 2) { + if (data.length < 1) { throw new RecordCoreException("Invalid data") .addLogInfo(LuceneLogMessageKeys.DATA_VALUE, data); } @@ -270,4 +270,37 @@ private void decryptIfNeeded(@Nonnull CompressedAndEncryptedSerializerState stat } encodedDataInput.reset(decrypted); } + + @Nullable + public byte[] decodePossiblyWithoutPrefix(@Nullable byte[] bytes) { + if (bytes == null) { + return null; + } + + if (isProtobufMessageWithoutPrefix(bytes)) { + return bytes; + } + return decode(bytes); + } + + // This can be removed once it is guaranteed that all indexes are using the encoded format. + // Only works for Protobuf messages all of whose fields are themselves length-delimited, + // such as LuceneStoredFields (StoredField or bytes) or FieldInfos (FieldInfo). + private boolean isProtobufMessageWithoutPrefix(@Nonnull byte[] bytes) { + if (bytes.length < 1) { + return true; // No room for prefix; empty message. + } + final int byte0 = bytes[0]; + final int fieldTypeOrPrefixFlags = byte0 & 7; + // Either Protobuf LEN or ENCODING_COMPRESSED. + if (fieldTypeOrPrefixFlags != 2) { + return false; + } + final int fieldNumberOrKeyNumber = byte0 >> 3; + // ENCODING_COMPRESSED will never have a key; 0 is not a valid field number. + if (fieldNumberOrKeyNumber == 0) { + return false; + } + return true; + } } diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneSerializerTest.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneSerializerTest.java index 9ccf947ef3..2003c00585 100644 --- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneSerializerTest.java +++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneSerializerTest.java @@ -27,15 +27,18 @@ import com.apple.test.BooleanSource; import com.apple.test.Tags; import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import javax.crypto.KeyGenerator; import javax.crypto.SecretKey; +import java.security.GeneralSecurityException; import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Stream; /** * Test for Lucene data compression/decompression and encryption/decryption validation. @@ -43,8 +46,8 @@ @Tag(Tags.RequiresFDB) class LuceneSerializerTest { @Test - void testEncodingWithoutCompression() throws InvalidProtocolBufferException { - final LuceneSerializer serializer = new LuceneSerializer(true, false, null); + void testEncodingWithoutCompression() throws Exception { + final LuceneSerializer serializer = getSerializer(true, false); final ByteString content = RandomUtil.randomByteString(ThreadLocalRandom.current(), 100); final LuceneFileSystemProto.LuceneFileReference reference = LuceneFileSystemProto.LuceneFileReference.newBuilder() .setId(1) @@ -73,8 +76,8 @@ void testEncodingWithoutCompression() throws InvalidProtocolBufferException { } @Test - void testEncodingWithCompression() throws InvalidProtocolBufferException { - final LuceneSerializer serializer = new LuceneSerializer(true, false, null); + void testEncodingWithCompression() throws Exception { + final LuceneSerializer serializer = getSerializer(true, false); final String duplicateMsg = "abcdefghijklmnopqrstuvwxyz"; final String content = "content_" + duplicateMsg + "_" + duplicateMsg; final LuceneFileSystemProto.LuceneFileReference reference = LuceneFileSystemProto.LuceneFileReference.newBuilder() @@ -103,11 +106,7 @@ void testEncodingWithCompression() throws InvalidProtocolBufferException { @ParameterizedTest @BooleanSource void testEncodingWithEncryption(boolean compressToo) throws Exception { - KeyGenerator keyGen = KeyGenerator.getInstance("AES"); - keyGen.init(128); - SecretKey key = keyGen.generateKey(); - final SerializationKeyManager keyManager = new FixedZeroKeyManager(key, null, null); - final LuceneSerializer serializer = new LuceneSerializer(compressToo, true, keyManager); + final LuceneSerializer serializer = getSerializer(compressToo, true); final ByteString content = RandomUtil.randomByteString(ThreadLocalRandom.current(), 100); final LuceneFileSystemProto.LuceneFileReference reference = LuceneFileSystemProto.LuceneFileReference.newBuilder() .setId(1) @@ -122,4 +121,52 @@ void testEncodingWithEncryption(boolean compressToo) throws Exception { final LuceneFileSystemProto.LuceneFileReference decryptedReference = LuceneFileSystemProto.LuceneFileReference.parseFrom(decodedValue); Assertions.assertEquals(content, decryptedReference.getContent()); } + + @ParameterizedTest + @MethodSource("encodedCompressedAndEncrypted") + void testProtobufMessageWithoutPrefix(boolean encode, boolean compress, boolean encrypt) throws Exception { + final LuceneSerializer serializer = getSerializer(compress, encrypt); + final String storedField = RandomUtil.randomAlphanumericString(ThreadLocalRandom.current(), 20); + final LuceneStoredFieldsProto.LuceneStoredFields.Builder builder = LuceneStoredFieldsProto.LuceneStoredFields.newBuilder(); + builder.addStoredFieldsBuilder().setFieldNumber(5).setStringValue(storedField); + final byte[] value = builder.build().toByteArray(); + final byte[] encodedValue = encode ? serializer.encode(value) : value; + final byte[] decodedValue = serializer.decodePossiblyWithoutPrefix(encodedValue); + Assertions.assertArrayEquals(value, decodedValue); + final LuceneStoredFieldsProto.LuceneStoredFields storedFields = LuceneStoredFieldsProto.LuceneStoredFields.parseFrom(decodedValue); + Assertions.assertEquals(storedField, storedFields.getStoredFields(0).getStringValue()); + } + + @ParameterizedTest + @MethodSource("encodedCompressedAndEncrypted") + void testEmpty(boolean encode, boolean compress, boolean encrypt) throws Exception { + final LuceneSerializer serializer = getSerializer(compress, encrypt); + final byte[] value = new byte[0]; + final byte[] encodedValue = encode ? serializer.encode(value) : value; + final byte[] decodedValue = serializer.decodePossiblyWithoutPrefix(encodedValue); + Assertions.assertArrayEquals(value, decodedValue); + } + + static Stream encodedCompressedAndEncrypted() { + return Stream.of( + Arguments.of(false, false, false), + Arguments.of(true, false, false), + Arguments.of(true, false, true), + Arguments.of(true, true, false), + Arguments.of(true, true, true) + ); + } + + private LuceneSerializer getSerializer(boolean compress, boolean encrypt) throws GeneralSecurityException { + final SerializationKeyManager keyManager; + if (encrypt) { + KeyGenerator keyGen = KeyGenerator.getInstance("AES"); + keyGen.init(128); + SecretKey key = keyGen.generateKey(); + keyManager = new FixedZeroKeyManager(key, null, null); + } else { + keyManager = null; + } + return new LuceneSerializer(compress, encrypt, keyManager); + } } diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/codec/TestFDBDirectory.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/codec/TestFDBDirectory.java index caa1a42b26..88fe127d15 100644 --- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/codec/TestFDBDirectory.java +++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/codec/TestFDBDirectory.java @@ -47,6 +47,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -163,7 +164,7 @@ public IndexInput openInput(@Nonnull final String name, @Nonnull final IOContext getAgilityContext().getRange(key, ByteArrayUtil.strinc(key)))); final Map storedFields = rawStoredFields.stream().collect(Collectors.toMap( keyValue -> storedFieldsSubspace.unpack(keyValue.getKey()).getLong(1), - keyValue -> keyValue.getValue() + keyValue -> Objects.requireNonNull(Objects.requireNonNull(getSerializer()).decodePossiblyWithoutPrefix(keyValue.getValue())) )); final NonnullPair> previous = previousStoredFields.getAndSet(NonnullPair.of(name, storedFields)); if (previous != null) {