diff --git a/BUCK b/BUCK index c7fd89eeb18d..1c25d11f4603 100644 --- a/BUCK +++ b/BUCK @@ -310,6 +310,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "utilities/memory/memory_util.cc", "utilities/merge_operators.cc", "utilities/merge_operators/bytesxor.cc", + "utilities/merge_operators/int64add/int64_add.cc", "utilities/merge_operators/max.cc", "utilities/merge_operators/put.cc", "utilities/merge_operators/sortlist.cc", @@ -5292,6 +5293,12 @@ cpp_unittest_wrapper(name="merge_helper_test", extra_compiler_flags=[]) +cpp_unittest_wrapper(name="merge_operators_test", + srcs=["utilities/merge_operators/test/merge_operators_test.cc"], + deps=[":rocksdb_test_lib"], + extra_compiler_flags=[]) + + cpp_unittest_wrapper(name="merge_test", srcs=["db/merge_test.cc"], deps=[":rocksdb_test_lib"], @@ -5532,12 +5539,6 @@ cpp_unittest_wrapper(name="stats_history_test", extra_compiler_flags=[]) -cpp_unittest_wrapper(name="stringappend_test", - srcs=["utilities/merge_operators/string_append/stringappend_test.cc"], - deps=[":rocksdb_test_lib"], - extra_compiler_flags=[]) - - cpp_unittest_wrapper(name="table_properties_collector_test", srcs=["db/table_properties_collector_test.cc"], deps=[":rocksdb_test_lib"], diff --git a/CMakeLists.txt b/CMakeLists.txt index 03837b672ac4..281a1941e452 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -961,6 +961,7 @@ set(SOURCES utilities/merge_operators/string_append/stringappend.cc utilities/merge_operators/string_append/stringappend2.cc utilities/merge_operators/uint64add.cc + utilities/merge_operators/int64add/int64_add.cc utilities/object_registry.cc utilities/option_change_migration/option_change_migration.cc utilities/options/options_util.cc @@ -1526,7 +1527,7 @@ if(WITH_TESTS) utilities/checkpoint/checkpoint_test.cc utilities/env_timed_test.cc utilities/memory/memory_test.cc - utilities/merge_operators/string_append/stringappend_test.cc + utilities/merge_operators/test/merge_operators_test.cc utilities/object_registry_test.cc 
utilities/option_change_migration/option_change_migration_test.cc utilities/options/options_util_test.cc diff --git a/Makefile b/Makefile index 403c804c17f7..1063e9f3e284 100644 --- a/Makefile +++ b/Makefile @@ -1349,7 +1349,7 @@ option_change_migration_test: $(OBJ_DIR)/utilities/option_change_migration/optio agg_merge_test: $(OBJ_DIR)/utilities/agg_merge/agg_merge_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) -stringappend_test: $(OBJ_DIR)/utilities/merge_operators/string_append/stringappend_test.o $(TEST_LIBRARY) $(LIBRARY) +merge_operators_test: $(OBJ_DIR)/utilities/merge_operators/test/merge_operators_test.o $(OBJ_DIR)/utilities/merge_operators/test/stringappend_test.o $(OBJ_DIR)/utilities/merge_operators/test/int64add_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) cassandra_format_test: $(OBJ_DIR)/utilities/cassandra/cassandra_format_test.o $(TEST_LIBRARY) $(LIBRARY) diff --git a/build_tools/run_ci_db_test.ps1 b/build_tools/run_ci_db_test.ps1 index f20d3213fd03..4ae01214ef18 100644 --- a/build_tools/run_ci_db_test.ps1 +++ b/build_tools/run_ci_db_test.ps1 @@ -41,7 +41,7 @@ $RunOnly = New-Object System.Collections.Generic.HashSet[string] $RunOnly.Add("c_test") | Out-Null $RunOnly.Add("compact_on_deletion_collector_test") | Out-Null $RunOnly.Add("merge_test") | Out-Null -$RunOnly.Add("stringappend_test") | Out-Null # Apparently incorrectly written +$RunOnly.Add("merge_operators_test") | Out-Null # Apparently incorrectly written $RunOnly.Add("backup_engine_test") | Out-Null # Disabled $RunOnly.Add("timer_queue_test") | Out-Null # Not a gtest diff --git a/java/CMakeLists.txt b/java/CMakeLists.txt index ffc374102699..283320283c20 100644 --- a/java/CMakeLists.txt +++ b/java/CMakeLists.txt @@ -185,6 +185,7 @@ set(JAVA_MAIN_CLASSES src/main/java/org/rocksdb/IndexType.java src/main/java/org/rocksdb/InfoLogLevel.java src/main/java/org/rocksdb/IngestExternalFileOptions.java + src/main/java/org/rocksdb/Int64AddOperator.java src/main/java/org/rocksdb/KeyMayExist.java 
src/main/java/org/rocksdb/LRUCache.java src/main/java/org/rocksdb/LevelMetaData.java @@ -298,6 +299,7 @@ set(JAVA_MAIN_CLASSES src/main/java/org/rocksdb/util/BytewiseComparator.java src/main/java/org/rocksdb/util/Environment.java src/main/java/org/rocksdb/util/IntComparator.java + src/main/java/org/rocksdb/util/MergeEncodings.java src/main/java/org/rocksdb/util/ReverseBytewiseComparator.java src/main/java/org/rocksdb/util/SizeUnit.java src/main/java/org/rocksdb/util/StdErrLogger.java @@ -426,6 +428,7 @@ set(JAVA_TEST_CLASSES src/test/java/org/rocksdb/util/ReverseBytewiseComparatorIntTest.java src/test/java/org/rocksdb/util/SizeUnitTest.java src/test/java/org/rocksdb/util/StdErrLoggerTest.java + src/test/java/org/rocksdb/util/MergeEncodingsTest.java src/test/java/org/rocksdb/util/TestUtil.java ) @@ -716,6 +719,7 @@ if(${CMAKE_VERSION} VERSION_LESS "3.11.4") org.rocksdb.HashSkipListMemTableConfig org.rocksdb.HyperClockCache org.rocksdb.IngestExternalFileOptions + org.rocksdb.Int64AddOperator org.rocksdb.Logger org.rocksdb.LRUCache org.rocksdb.MemoryUtil diff --git a/java/Makefile b/java/Makefile index 5e00921c62b9..917253129a0f 100644 --- a/java/Makefile +++ b/java/Makefile @@ -35,6 +35,7 @@ NATIVE_JAVA_CLASSES = \ org.rocksdb.FlushOptions\ org.rocksdb.Filter\ org.rocksdb.IngestExternalFileOptions\ + org.rocksdb.Int64AddOperator\ org.rocksdb.HashLinkedListMemTableConfig\ org.rocksdb.HashSkipListMemTableConfig\ org.rocksdb.ConcurrentTaskLimiter\ @@ -140,6 +141,7 @@ JAVA_TESTS = \ org.rocksdb.IngestExternalFileOptionsTest\ org.rocksdb.util.IntComparatorTest\ org.rocksdb.util.JNIComparatorTest\ + org.rocksdb.util.MergeEncodingsTest\ org.rocksdb.FilterTest\ org.rocksdb.FlushTest\ org.rocksdb.ImportColumnFamilyTest\ diff --git a/java/rocksjni/merge_operator.cc b/java/rocksjni/merge_operator.cc index 5696a058604d..7ebbd1a6deeb 100644 --- a/java/rocksjni/merge_operator.cc +++ b/java/rocksjni/merge_operator.cc @@ -16,6 +16,7 @@ #include #include +#include 
"include/org_rocksdb_Int64AddOperator.h" #include "include/org_rocksdb_StringAppendOperator.h" #include "include/org_rocksdb_UInt64AddOperator.h" #include "rocksdb/db.h" @@ -96,3 +97,30 @@ void Java_org_rocksdb_UInt64AddOperator_disposeInternalJni(JNIEnv* /*env*/, jhandle); delete sptr_uint64_add_op; // delete std::shared_ptr } + +/* + * Class: org_rocksdb_Int64AddOperator + * Method: newSharedInt64AddOperator + * Signature: ()J + */ +jlong Java_org_rocksdb_Int64AddOperator_newSharedInt64AddOperator( + JNIEnv* /*env*/, jclass /*jclazz*/) { + auto* sptr_int64_add_op = + new std::shared_ptr( + ROCKSDB_NAMESPACE::MergeOperators::CreateInt64AddOperator()); + return GET_CPLUSPLUS_POINTER(sptr_int64_add_op); +} + +/* + * Class: org_rocksdb_Int64AddOperator + * Method: disposeInternal + * Signature: (J)V + */ +void Java_org_rocksdb_Int64AddOperator_disposeInternal(JNIEnv* /*env*/, + jobject /*jobj*/, + jlong jhandle) { + auto* sptr_int64_add_op = + reinterpret_cast*>( + jhandle); + delete sptr_int64_add_op; // delete this instance of the std::shared_ptr +} diff --git a/java/src/main/java/org/rocksdb/Int64AddOperator.java b/java/src/main/java/org/rocksdb/Int64AddOperator.java new file mode 100644 index 000000000000..174c0dd2abf3 --- /dev/null +++ b/java/src/main/java/org/rocksdb/Int64AddOperator.java @@ -0,0 +1,16 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +package org.rocksdb; + +public class Int64AddOperator extends MergeOperator { + public Int64AddOperator() { + super(newSharedInt64AddOperator()); + } + + @Override protected native void disposeInternal(final long handle); + + private static native long newSharedInt64AddOperator(); +} diff --git a/java/src/main/java/org/rocksdb/util/MergeEncodings.java b/java/src/main/java/org/rocksdb/util/MergeEncodings.java new file mode 100644 index 000000000000..14d06c8fd8d2 --- /dev/null +++ b/java/src/main/java/org/rocksdb/util/MergeEncodings.java @@ -0,0 +1,150 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +package org.rocksdb.util; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; + +/** + * Java implementations of merge encodings for merge operators including {@link + * org.rocksdb.Int64AddOperator} + */ +public class MergeEncodings { + /** + * Zigzag decode an unsigned long value to a signed long value + * Use the parameter to index into the list [0,-1,1,-2,2,-3,3,...] + * unsigned integers are in 1:1 mapping to the signed integers + * + * @param zigzag to decode + * @return the value at {@code zigzag}'s index in the list [0,-1,1,-2,2,-3,3,...] + */ + public static long fromZigzag(final long zigzag) { + final long half = zigzag / 2; + return (half + half == zigzag) ? half : -half - 1; + } + + /** + * Zigzag encode a signed long value to an unsigned long value + * Each signed long integer is represented by its position in the list [0,-1,1,-2,2,-3,3,...] + * unsigned integers are in 1:1 mapping to the signed integers + * + * @param value to encode + * @return {@code value}'s position in the list [0,-1,1,-2,2,-3,3,...]
+ */ + public static long toZigzag(final long value) { + final long MAX_ZIGZAG = Long.MAX_VALUE / 2; + final long MIN_ZIGZAG = Long.MIN_VALUE / 2; + if (value < MIN_ZIGZAG || value > MAX_ZIGZAG) { + throw new IllegalArgumentException( + "Zigzag can only be applied to Long.MIN_VALUE/2..Long.MAX_VALUE/2, parameter is " + + value); + } + final long twice = value + value; + return (value >= 0) ? twice : -twice - 1; + } + + /** + * Create a variable length encoding of a signed integer + * The encoding is little-endian (least significant byte first) + * and each byte encodes a full 8 bits of the supplied {@code value} + * + * This is possible because the length of the {@code byte[]} is known/stored elsewhere, + * such as in the value length of a {@code (key,value)}-pair, so we always know how many + * bytes to decode. + * + * The MSB is 2s complement, and is sign-extended on decoding. + * + * @param value to encode as a sequence of bytes + * @return a byte array encoding the value, and which is just exactly as long as necessary + */ + @SuppressWarnings("PMD.AvoidReassigningParameters") + public static byte[] encodeVarintSigned(long value) { + final byte[] bytes = new byte[Long.BYTES]; + int i = 0; + if (value < 0) { + while (value < -0x80) { + bytes[i++] = (byte) (value & 0xff); + value >>= 8; + } + } else { + while (value > 0x7f) { + bytes[i++] = (byte) (value & 0xff); + value >>= 8; + } + } + bytes[i++] = (byte) (value & 0xff); + return Arrays.copyOfRange(bytes, 0, i); + } + + /** + * Decode a variable length encoding of a signed integer + * The encoding is little-endian (least significant byte first) + * and each byte encodes a full 8 bits of the supplied {@code value}, + * except for the MSB, which is 2s complement + * + * This is possible because the length of {@code byte[] bytes} is known/stored elsewhere, + * such as in the value length of a {@code (key,value)}-pair, so we always know how many + * bytes to decode. 
+ * + * @param bytes which encode the value as a sequence of bytes + * @return the value decoded from {@code bytes} + */ + public static long decodeVarintSigned(final byte[] bytes) { + long acc = 0; + if (bytes.length > 0) { + int pos = bytes.length; + acc = bytes[--pos]; + while (pos > 0) { + acc = (acc << 8) | (bytes[--pos] & 0xff); + } + } + + return acc; + } + + /** + * Encode a signed value + * @param value to encode + * @return the signed value's encoding as a variable-length run of bytes + */ + public static byte[] encodeSigned(final long value) { + return encodeVarintSigned(value); + } + + /** + * Decode a run of bytes into a signed value + * @param bytes to decode + * @return the signed value represented by the encoding + */ + public static long decodeSigned(final byte[] bytes) { + return decodeVarintSigned(bytes); + } + + /** + * Fixed-length encode a long into a byte array + * @param l the long value to encode + * @return the byte array into which the value is encoded + */ + public static byte[] longToByteArray(final long l) { + final ByteBuffer buf = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.LITTLE_ENDIAN); + buf.putLong(l); + return buf.array(); + } + + /** + * Decode a fixed-length byte array into a long value + * @param a the byte-array to decode + * @return the long value encoded in the input array + */ + public static long longFromByteArray(final byte[] a) { + final ByteBuffer buf = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.LITTLE_ENDIAN); + buf.put(a); + buf.position(Math.max(buf.position(), Long.BYTES)); // pad short input so getLong() cannot throw BufferUnderflowException + buf.flip(); + return buf.getLong(); + } +} diff --git a/java/src/test/java/org/rocksdb/MergeTest.java b/java/src/test/java/org/rocksdb/MergeTest.java index 10ffeb7788b8..d11ec3fce11a 100644 --- a/java/src/test/java/org/rocksdb/MergeTest.java +++ b/java/src/test/java/org/rocksdb/MergeTest.java @@ -6,6 +6,7 @@ package org.rocksdb; import static org.assertj.core.api.Assertions.assertThat;
+import static org.rocksdb.util.MergeEncodings.*; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -56,6 +57,8 @@ static long longFromByteArray(final byte[] a) { final ByteBuffer buf = ByteBuffer.allocate(Long.SIZE / Byte.SIZE).order(ByteOrder.LITTLE_ENDIAN); buf.put(a); + buf.position( + Math.max(buf.position(), Long.SIZE / Byte.SIZE)); // pad short input so getLong() cannot throw BufferUnderflowException buf.flip(); return buf.getLong(); } @@ -318,6 +321,68 @@ public void cFUInt64AddOperatorOption() assertThat(longValue).isEqualTo(101); assertThat(longValueTmpCf).isEqualTo(250); } + } finally { + for (final ColumnFamilyHandle columnFamilyHandle : columnFamilyHandleList) { + columnFamilyHandle.close(); + } + } + } + } + } + + @Test + public void cFInt64AddOperatorOption() throws InterruptedException, RocksDBException { + try (final Int64AddOperator int64AddOperator = new Int64AddOperator(); + final ColumnFamilyOptions cfOpt1 = + new ColumnFamilyOptions().setMergeOperator(int64AddOperator); + final ColumnFamilyOptions cfOpt2 = + new ColumnFamilyOptions().setMergeOperator(int64AddOperator)) { + final List cfDescriptors = + Arrays.asList(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOpt1), + new ColumnFamilyDescriptor("new_cf".getBytes(), cfOpt2)); + final List columnFamilyHandleList = new ArrayList<>(); + try (final DBOptions opt = + new DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true); + final RocksDB db = RocksDB.open( + opt, dbFolder.getRoot().getAbsolutePath(), cfDescriptors, columnFamilyHandleList)) { + try { + // writing (long)50 under key + db.put(columnFamilyHandleList.get(1), "cfkey".getBytes(), encodeSigned(50)); + // merge (long)1 under key + db.merge(columnFamilyHandleList.get(1), "cfkey".getBytes(), encodeSigned(1)); + byte[] value = db.get(columnFamilyHandleList.get(1), "cfkey".getBytes()); + assertThat(decodeSigned(value)).isEqualTo(51); + + // Test also with createColumnFamily + try (final ColumnFamilyOptions cfHandleOpts = + new
ColumnFamilyOptions().setMergeOperator(int64AddOperator); + final ColumnFamilyHandle cfHandle = db.createColumnFamily( + new ColumnFamilyDescriptor("new_cf2".getBytes(), cfHandleOpts))) { + // writing (long)200 under cfkey2 + db.put(cfHandle, "cfkey2".getBytes(), encodeSigned(200)); + // merge (long)50 under cfkey2 + db.merge(cfHandle, new WriteOptions(), "cfkey2".getBytes(), encodeSigned(50)); + value = db.get(cfHandle, "cfkey2".getBytes()); + + assertThat(decodeSigned(value)).isEqualTo(250); + + // writing negative value(s) under cfkey3 + db.put(cfHandle, "cfkey3".getBytes(), encodeSigned(-40)); + // merge (long)3 under cfkey3 + db.merge(cfHandle, new WriteOptions(), "cfkey3".getBytes(), encodeSigned(3)); + value = db.get(cfHandle, "cfkey3".getBytes()); + + assertThat(decodeSigned(value)).isEqualTo(-37); + + // writing a positive value just beyond int range (2^31 == -(long) Integer.MIN_VALUE) under cfkey4 + db.put(cfHandle, "cfkey4".getBytes(), encodeSigned(-(long) Integer.MIN_VALUE)); + // merge (long)32768 (== -Short.MIN_VALUE) under cfkey4 + db.merge( + cfHandle, new WriteOptions(), "cfkey4".getBytes(), encodeSigned(-Short.MIN_VALUE)); + value = db.get(cfHandle, "cfkey4".getBytes()); + + assertThat(decodeSigned(value)).isEqualTo(-(long) Integer.MIN_VALUE - Short.MIN_VALUE); + } } finally { for (final ColumnFamilyHandle columnFamilyHandle : columnFamilyHandleList) { diff --git a/java/src/test/java/org/rocksdb/util/MergeEncodingsTest.java b/java/src/test/java/org/rocksdb/util/MergeEncodingsTest.java new file mode 100644 index 000000000000..502dd6f84c32 --- /dev/null +++ b/java/src/test/java/org/rocksdb/util/MergeEncodingsTest.java @@ -0,0 +1,85 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory).
+ +package org.rocksdb.util; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.rocksdb.util.MergeEncodings.*; + +import org.junit.Test; + +public class MergeEncodingsTest { + @Test + public void testZigzagEncoding() { + assertThat(toZigzag(0)).isEqualTo(0); + assertThat(toZigzag(-1)).isEqualTo(1); + assertThat(toZigzag(1)).isEqualTo(2); + assertThat(toZigzag(-2)).isEqualTo(3); + assertThat(toZigzag(2)).isEqualTo(4); + assertThat(toZigzag(-3)).isEqualTo(5); + assertThat(toZigzag(3)).isEqualTo(6); + assertThat(toZigzag(-4)).isEqualTo(7); + assertThat(toZigzag(100)).isEqualTo(200); + assertThat(fromZigzag(202)).isEqualTo(101); + } + + @Test + public void testVarintDecoding() { + byte[] bytes = new byte[1]; + bytes[0] = 127; + assertThat(decodeVarintSigned(bytes)).isEqualTo(127); + bytes[0] = -127; + assertThat(decodeVarintSigned(bytes)).isEqualTo(-127); + + bytes = new byte[2]; + bytes[1] = -1; + assertThat(decodeVarintSigned(bytes)).isEqualTo(-256); + bytes[0] = 3; + assertThat(decodeVarintSigned(bytes)).isEqualTo(-253); + + bytes = new byte[3]; + bytes[2] = -1; + assertThat(decodeVarintSigned(bytes)).isEqualTo(-65536); + bytes[1] = -128; + assertThat(decodeVarintSigned(bytes)).isEqualTo(-32768); + bytes[2] = 0; + assertThat(decodeVarintSigned(bytes)).isEqualTo(32768); + bytes[1] = 0; + bytes[2] = 1; + assertThat(decodeVarintSigned(bytes)).isEqualTo(65536); + } + + @Test + public void testVarintEncoding() { + byte[] bytes; + + bytes = encodeVarintSigned(127); + assertThat(bytes.length).isEqualTo(1); + assertThat(bytes[0]).isEqualTo((byte) 127); + + bytes = encodeVarintSigned(128); + assertThat(bytes.length).isEqualTo(2); + assertThat(bytes[0]).isEqualTo((byte) -128); + assertThat(bytes[1]).isEqualTo((byte) 0); + + bytes = encodeVarintSigned(32768); + assertThat(bytes.length).isEqualTo(3); + assertThat(bytes[0]).isEqualTo((byte) 0); + assertThat(bytes[1]).isEqualTo((byte) -128); + assertThat(bytes[2]).isEqualTo((byte) 0); + } + @Test + 
public void testSignedEncoding() { + final long[] values = new long[] {127, 128, 129, -1, 0, 1, -32769, 32769, -32768, 32768, -32767, + 32767, -65537, 65537, -65536, 65536, -65535, 65535, Integer.MAX_VALUE, Integer.MIN_VALUE, + Long.MAX_VALUE, Long.MIN_VALUE}; + for (long value : values) { + assertThat(decodeSigned(encodeSigned(value))).isEqualTo(value); + } + for (long value : values) { + assertThat(decodeVarintSigned(encodeVarintSigned(value))).isEqualTo(value); + } + } +} diff --git a/options/customizable_test.cc b/options/customizable_test.cc index 8549e7947fa8..199cc12778aa 100644 --- a/options/customizable_test.cc +++ b/options/customizable_test.cc @@ -1900,10 +1900,15 @@ TEST_F(LoadCustomizableTest, LoadMemTableRepFactoryTest) { TEST_F(LoadCustomizableTest, LoadMergeOperatorTest) { std::shared_ptr result; std::vector failed; - std::unordered_set expected = { - "put", "put_v1", "PutOperator", "uint64add", "UInt64AddOperator", - "max", "MaxOperator", - }; + std::unordered_set expected = {"put", + "put_v1", + "PutOperator", + "uint64add", + "UInt64AddOperator", + "max", + "MaxOperator", + "int64add", + "Int64AddOperator"}; expected.insert({ StringAppendOperator::kClassName(), StringAppendOperator::kNickName(), diff --git a/options/options_test.cc b/options/options_test.cc index 7111872f541b..50a33ae3d09d 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -5103,6 +5103,12 @@ TEST_F(ConfigOptionsTest, MergeOperatorFromString) { ASSERT_NE(merge_op, nullptr); ASSERT_TRUE(merge_op->IsInstanceOf("PutOperator")); + ASSERT_OK( + MergeOperator::CreateFromString(config_options, "int64add", &merge_op)); + ASSERT_NE(merge_op, nullptr); + ASSERT_TRUE(merge_op->IsInstanceOf("int64add")); + ASSERT_STREQ(merge_op->Name(), "Int64AddOperator"); + ASSERT_OK( MergeOperator::CreateFromString(config_options, "uint64add", &merge_op)); ASSERT_NE(merge_op, nullptr); diff --git a/src.mk b/src.mk index 5eac640572d1..5facf8fafae9 100644 --- a/src.mk +++ b/src.mk @@ 
-299,6 +299,7 @@ LIB_SOURCES = \ utilities/merge_operators/string_append/stringappend.cc \ utilities/merge_operators/string_append/stringappend2.cc \ utilities/merge_operators/uint64add.cc \ + utilities/merge_operators/int64add/int64_add.cc \ utilities/merge_operators/bytesxor.cc \ utilities/object_registry.cc \ utilities/option_change_migration/option_change_migration.cc \ @@ -642,7 +643,7 @@ TEST_MAIN_SOURCES = \ utilities/checkpoint/checkpoint_test.cc \ utilities/env_timed_test.cc \ utilities/memory/memory_test.cc \ - utilities/merge_operators/string_append/stringappend_test.cc \ + utilities/merge_operators/test/merge_operators_test.cc \ utilities/object_registry_test.cc \ utilities/option_change_migration/option_change_migration_test.cc \ utilities/options/options_util_test.cc \ diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 64eb676d7cfc..b04a76d70e18 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -1339,7 +1339,7 @@ def execute_cmd(cmd, timeout=None, timeout_pstack=False): print("KILLED %d\n" % child.pid) outs, errs = child.communicate() - return hit_timeout, child.returncode, outs.decode("utf-8"), errs.decode("utf-8") + return hit_timeout, child.returncode, outs.decode("utf-8", "backslashreplace"), errs.decode("utf-8", "backslashreplace") def print_output_and_exit_on_error(stdout, stderr, print_stderr_separately=False): diff --git a/util/coding.h b/util/coding.h index 8648d9a13ba2..69d06da53424 100644 --- a/util/coding.h +++ b/util/coding.h @@ -195,6 +195,36 @@ inline void PutVarsignedint64(std::string* dst, int64_t v) { dst->append(buf, static_cast(ptr - buf)); } +/** + * @brief encode a signed int using 8 bits of every byte + * + * Efficient use of all 8 bits of as many bytes as necessary + * because the length of the encoding is known independently, + * e.g. 
as the length of the "value" in a (key,value)-pair + * + * -128..127 are a single byte sign extended value + * -32768..32767 are a 2-byte sign extended value + * + * @param dst buffer to encode into + * @param v value to encode + */ +inline void Put8BitVarsignedint64(std::string* dst, int64_t v) { + if (v >= 0) { + while (v > 0x7f) { + unsigned char byte = static_cast(v) & 0xff; + dst->push_back(byte); + v >>= 8; + } + } else { + while (v < -0x80) { + unsigned char byte = static_cast(v) & 0xff; + dst->push_back(byte); + v >>= 8; + } + } + dst->push_back(static_cast(v)); +} + inline void PutVarint64Varint64(std::string* dst, uint64_t v1, uint64_t v2) { char buf[20]; char* ptr = EncodeVarint64(buf, v1); @@ -317,6 +347,36 @@ inline bool GetVarsignedint64(Slice* input, int64_t* value) { } } +/** + * @brief decode a signed int encoded using 8 bits of every byte, inverse of + * `Put8BitVarsignedint64` + * + * Efficient use of all 8 bits of as many bytes as necessary + * because the length of the encoding is known independently, + * e.g. 
as the length of the "value" in a (key,value)-pair + * + * -128..127 are a single byte sign extended value + * -32768..32767 are a 2-byte sign extended value + * + * @param input buffer to decode out of + * @return an int64_t which is the decoded value of the buffer + */ +inline int64_t Get8BitVarsignedint64(Slice* input) { + const char* start = input->data(); + const char* p = start + input->size(); + int64_t s = 0; + if (start < p) { + s = *(reinterpret_cast(--p)); + while (start < p) { + int64_t byte = + static_cast(*(reinterpret_cast(--p))); + s = (s * 0x100) | byte; // s = (s << 8) | byte; fails ubsan check, + // -ve << is undefined in C/C++ + } + } + return s; +} + inline bool GetLengthPrefixedSlice(Slice* input, Slice* result) { uint32_t len = 0; if (GetVarint32(input, &len) && input->size() >= len) { diff --git a/utilities/merge_operators.cc b/utilities/merge_operators.cc index 020064d0924a..f757a505e4fc 100644 --- a/utilities/merge_operators.cc +++ b/utilities/merge_operators.cc @@ -12,6 +12,7 @@ #include "rocksdb/utilities/customizable_util.h" #include "rocksdb/utilities/object_registry.h" #include "utilities/merge_operators/bytesxor.h" +#include "utilities/merge_operators/int64add/int64_add.h" #include "utilities/merge_operators/max_operator.h" #include "utilities/merge_operators/put_operator.h" #include "utilities/merge_operators/sortlist.h" @@ -20,6 +21,7 @@ #include "utilities/merge_operators/uint64add.h" namespace ROCKSDB_NAMESPACE { + static int RegisterBuiltinMergeOperators(ObjectLibrary& library, const std::string& /*arg*/) { size_t num_types; @@ -63,6 +65,14 @@ static int RegisterBuiltinMergeOperators(ObjectLibrary& library, guard->reset(new UInt64AddOperator()); return guard->get(); }); + library.AddFactory( + ObjectLibrary::PatternEntry(Int64AddOperator::kClassName()) + .AnotherName(Int64AddOperator::kNickName()), + [](const std::string& /*uri*/, std::unique_ptr* guard, + std::string* /*errmsg*/) { + guard->reset(new Int64AddOperator()); + 
return guard->get(); + }); library.AddFactory( ObjectLibrary::PatternEntry(MaxOperator::kClassName()) .AnotherName(MaxOperator::kNickName()), diff --git a/utilities/merge_operators.h b/utilities/merge_operators.h index 9b90107e3983..7e17ab622c94 100644 --- a/utilities/merge_operators.h +++ b/utilities/merge_operators.h @@ -17,6 +17,7 @@ class MergeOperators { public: static std::shared_ptr CreatePutOperator(); static std::shared_ptr CreateDeprecatedPutOperator(); + static std::shared_ptr CreateInt64AddOperator(); static std::shared_ptr CreateUInt64AddOperator(); static std::shared_ptr CreateStringAppendOperator(); static std::shared_ptr CreateStringAppendOperator( diff --git a/utilities/merge_operators/int64add/int64_add.cc b/utilities/merge_operators/int64add/int64_add.cc new file mode 100644 index 000000000000..c928dfc86dd7 --- /dev/null +++ b/utilities/merge_operators/int64add/int64_add.cc @@ -0,0 +1,46 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "utilities/merge_operators/int64add/int64_add.h" + +#include + +#include "logging/logging.h" +#include "rocksdb/env.h" +#include "rocksdb/merge_operator.h" +#include "rocksdb/slice.h" +#include "util/coding.h" +#include "utilities/merge_operators.h" + +namespace ROCKSDB_NAMESPACE { + +// A 'model' merge operator with int64 addition semantics +// operands and database value should be variable length encoded +// int64_t values, as encoded/decoded by `util/coding.h`. 
+bool Int64AddOperator::Merge(const Slice&, const Slice* existing_value, + const Slice& value, std::string* new_value, + Logger* /*logger*/) const { + int64_t orig_value = 0; + if (existing_value) { + Slice ev(*existing_value); + orig_value = Get8BitVarsignedint64(&ev); + } + + Slice v(value); + int64_t operand = Get8BitVarsignedint64(&v); + + assert(new_value); + new_value->clear(); + const int64_t new_number = static_cast<int64_t>(static_cast<uint64_t>(orig_value) + static_cast<uint64_t>(operand)); // add as unsigned: wraps on overflow instead of signed-overflow UB + ROCKSDB_NAMESPACE::Put8BitVarsignedint64(new_value, new_number); + + return true; +} + +std::shared_ptr MergeOperators::CreateInt64AddOperator() { + return std::make_shared(); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/utilities/merge_operators/int64add/int64_add.h b/utilities/merge_operators/int64add/int64_add.h new file mode 100644 index 000000000000..cf71b0df93cf --- /dev/null +++ b/utilities/merge_operators/int64add/int64_add.h @@ -0,0 +1,31 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// A 'model' merge operator with int64 addition semantics +// operands and database value should be variable length encoded +// int64_t values, as encoded/decoded by `util/coding.h`.
+ +#pragma once + +#include "rocksdb/merge_operator.h" +#include "utilities/merge_operators.h" + +namespace ROCKSDB_NAMESPACE { +class Logger; +class Slice; + +class Int64AddOperator : public AssociativeMergeOperator { + public: + static const char* kClassName() { return "Int64AddOperator"; } + static const char* kNickName() { return "int64add"; } + const char* Name() const override { return kClassName(); } + const char* NickName() const override { return kNickName(); } + + bool Merge(const Slice& /*key*/, const Slice* existing_value, + const Slice& value, std::string* new_value, + Logger* logger) const override; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/utilities/merge_operators/test/int64add_test.cc b/utilities/merge_operators/test/int64add_test.cc new file mode 100644 index 000000000000..ba0e142c83de --- /dev/null +++ b/utilities/merge_operators/test/int64add_test.cc @@ -0,0 +1,266 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+ +/** + * This file has no main() - it contributes tests to merge_operators_test + * + */ + +#include "port/stack_trace.h" +#include "rocksdb/db.h" +#include "test_util/testharness.h" +#include "util/coding.h" +#include "utilities/merge_operators.h" + +namespace ROCKSDB_NAMESPACE { + +class Int64AddMergeOperatorTest : public testing::Test { + public: + Int64AddMergeOperatorTest() { + options_.merge_operator = MergeOperators::CreateInt64AddOperator(); + options_.create_if_missing = true; + dbname_ = test::PerThreadDBPath("int64add_merge_operator_test"); + EXPECT_OK(DestroyDB(dbname_, options_)); + } + + ~Int64AddMergeOperatorTest() { + if (db_ != nullptr) { + delete db_; + EXPECT_OK(DestroyDB(dbname_, options_)); + } + } + + Status OpenDB() { return DB::Open(options_, dbname_, &db_); } + + public: + DB* db_ = nullptr; // initialised so the destructor's null check is well-defined even if OpenDB() was never called + std::string dbname_; + Options options_; + WriteOptions write_opts_; + ReadOptions read_opts_; +}; + +class EmptyDbTest : public Int64AddMergeOperatorTest, + public testing::WithParamInterface { + public: + EmptyDbTest() : Int64AddMergeOperatorTest() {} +}; + +class NonEmptyDbTest + : public Int64AddMergeOperatorTest, + public testing::WithParamInterface> { + public: + NonEmptyDbTest() : Int64AddMergeOperatorTest() {} +}; + +class EncodeDecodeTest : public testing::Test, + public testing::WithParamInterface {}; + +TEST_P(EmptyDbTest, MergeEmptyDb) { + const int64_t merge_num = GetParam(); + + std::string value; + + ASSERT_OK(OpenDB()); + + Put8BitVarsignedint64(&value, merge_num); + Status s = + db_->Merge(write_opts_, "key", value); // Merging merge_num under key + ASSERT_OK(s); + + value.clear(); + + s = db_->Get(read_opts_, "key", &value); + ASSERT_OK(s); + Slice read_slice(value); + int64_t read_value = Get8BitVarsignedint64(&read_slice); + + const int64_t expected = 0 + merge_num; + ASSERT_EQ(read_value, + expected); // Merge operators should have been applied on empty db, + // i.e. 0 and then added merge_num.
+} + +TEST_P(EmptyDbTest, MergeEmptyDbCf) { + const int64_t merge_num = GetParam(); + + std::string value; + + ASSERT_OK(OpenDB()); + ColumnFamilyOptions cf_opts; + cf_opts.merge_operator = options_.merge_operator; + ColumnFamilyHandle* cf1; + Status s = db_->CreateColumnFamily(cf_opts, "cf1", &cf1); + ASSERT_OK(s); + + Put8BitVarsignedint64(&value, merge_num); + s = db_->Merge(write_opts_, cf1, "key", + value); // Merging merge_num under key + ASSERT_OK(s); + + value.clear(); + + s = db_->Get(read_opts_, cf1, "key", &value); + ASSERT_OK(s); + Slice read_slice(value); + int64_t read_value = Get8BitVarsignedint64(&read_slice); + + ASSERT_OK(db_->DropColumnFamily(cf1)); + ASSERT_OK(db_->DestroyColumnFamilyHandle(cf1)); + + const int64_t expected = 0 + merge_num; + ASSERT_EQ(read_value, + expected); // Merge operators should have been applied on empty db, + // i.e. 0 and then added merge_num. +} + +TEST_P(NonEmptyDbTest, MergeNonEmptyDb) { + const int64_t initial_db_num = std::get<0>(GetParam()); + const int64_t merge_num = std::get<1>(GetParam()); + + std::string value; + + ASSERT_OK(OpenDB()); + + Put8BitVarsignedint64(&value, initial_db_num); + Status s = + db_->Put(write_opts_, "key", value); // Put initial_db_num under key + ASSERT_OK(s); + + value.clear(); + + Put8BitVarsignedint64(&value, merge_num); + s = db_->Merge(write_opts_, "key", value); // Merging merge_num under key + ASSERT_OK(s); + + value.clear(); + + s = db_->Get(read_opts_, "key", &value); + ASSERT_OK(s); + Slice read_slice(value); + int64_t read_value = Get8BitVarsignedint64(&read_slice); + + const int64_t expected = initial_db_num + merge_num; + ASSERT_EQ(read_value, + expected); // Merge operators should have been applied on non-empty + // db, and then merge added merge_num. 
+} + +TEST_P(NonEmptyDbTest, MergeNonEmptyDbCf) { + const int64_t initial_db_num = std::get<0>(GetParam()); + const int64_t merge_num = std::get<1>(GetParam()); + + std::string value; + + ASSERT_OK(OpenDB()); + ColumnFamilyOptions cf_opts; + cf_opts.merge_operator = options_.merge_operator; + ColumnFamilyHandle* cf1; + Status s = db_->CreateColumnFamily(cf_opts, "cf1", &cf1); + + Put8BitVarsignedint64(&value, initial_db_num); + s = db_->Put(write_opts_, cf1, "key", value); // Put initial_db_num under key + ASSERT_OK(s); + + value.clear(); + + Put8BitVarsignedint64(&value, merge_num); + s = db_->Merge(write_opts_, cf1, "key", + value); // Merging merge_num under key + ASSERT_OK(s); + + value.clear(); + + s = db_->Get(read_opts_, cf1, "key", &value); + ASSERT_OK(s); + Slice read_slice(value); + int64_t read_value = Get8BitVarsignedint64(&read_slice); + + ASSERT_OK(db_->DropColumnFamily(cf1)); + ASSERT_OK(db_->DestroyColumnFamilyHandle(cf1)); + + const int64_t expected = initial_db_num + merge_num; + ASSERT_EQ(read_value, + expected); // Merge operators should have been applied on non-empty + // db, and then merge added merge_num. 
+} + +TEST_P(EncodeDecodeTest, EncodeDecode) { + std::string value; + const int64_t num = GetParam(); + + Put8BitVarsignedint64(&value, num); + Slice read_slice(value); + int64_t read_value = Get8BitVarsignedint64(&read_slice); + ASSERT_EQ(read_value, num); +} + +TEST_P(EncodeDecodeTest, EncodeDecodeSignExtend) { + std::string value; + const int64_t num = GetParam(); + + Put8BitVarsignedint64(&value, num); + Slice read_slice(value); + int64_t read_value = Get8BitVarsignedint64(&read_slice); + ASSERT_EQ(read_value, num); +} + +TEST_F(Int64AddMergeOperatorTest, MergeMultipleValues) { + std::string value; + + ASSERT_OK(OpenDB()); + + Put8BitVarsignedint64(&value, 123); + Status s = db_->Merge(write_opts_, "key", value); // Merging 123 under key + ASSERT_OK(s); + value.clear(); + + Put8BitVarsignedint64(&value, -1234); + s = db_->Merge(write_opts_, "key", value); // Merging -1234 under key + ASSERT_OK(s); + value.clear(); + + Put8BitVarsignedint64(&value, 99); + s = db_->Merge(write_opts_, "key", value); // Merging 99 under key + ASSERT_OK(s); + value.clear(); + + Put8BitVarsignedint64(&value, -101); + s = db_->Merge(write_opts_, "key", value); // Merging -101 under key + ASSERT_OK(s); + value.clear(); + + s = db_->Get(read_opts_, "key", &value); + ASSERT_OK(s); + Slice read_slice(value); + int64_t read_value = Get8BitVarsignedint64(&read_slice); + + const int64_t expected = 0 + 123 + (-1234) + 99 + (-101); + ASSERT_EQ(read_value, + expected); // Merge operators should have been applied on non-empty + // db, and then merge added merge_num. 
+} + +INSTANTIATE_TEST_CASE_P( + Int64AddMergeOperatorTest, EncodeDecodeTest, + testing::Values(0, 1, 2, -1, 2, 123, 127, -127, 128, -128, 254, -254, 255, + -255, 256, -256, 257, -257, 32767, -32767, 32768, -32768, + 32769, -32769, 65534, -65534, 65535, -65535, 65536, -65536, + 65537, -65537, 8388607, -8388607, 8388608, -8388608, + 8388609, -8388609, 16777215, -16777215, 16777217, -16777217, + 33554431, -33554431, 33554433, -33554433, 2147483647, + -2147483648)); +INSTANTIATE_TEST_CASE_P(Int64AddMergeOperatorTest, EmptyDbTest, + testing::Values(-255, -2, -1, 0, 1, 2, 255)); +INSTANTIATE_TEST_CASE_P( + Int64AddMergeOperatorTest, NonEmptyDbTest, + testing::Combine(testing::Values(-255, -2, -1, 0, 1, 2, 255), + testing::Values(-255, -2, -1, 0, 1, 2, 255))); + +} // namespace ROCKSDB_NAMESPACE diff --git a/utilities/merge_operators/test/merge_operators_test.cc b/utilities/merge_operators/test/merge_operators_test.cc new file mode 100644 index 000000000000..878df5430f32 --- /dev/null +++ b/utilities/merge_operators/test/merge_operators_test.cc @@ -0,0 +1,16 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+//
+
+#include "utilities/merge_operators.h"
+
+#include "port/stack_trace.h"
+#include "test_util/testharness.h"
+// Sole main() of merge_operators_test; sibling *_test.cc files add tests only.
+int main(int argc, char** argv) {
+  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+  ::testing::InitGoogleTest(&argc, argv);  // parses gtest flags in argv
+  return RUN_ALL_TESTS();
+}
diff --git a/utilities/merge_operators/string_append/stringappend_test.cc b/utilities/merge_operators/test/stringappend_test.cc
similarity index 98%
rename from utilities/merge_operators/string_append/stringappend_test.cc
rename to utilities/merge_operators/test/stringappend_test.cc
index acc71c8e49c1..46363651f4a4 100644
--- a/utilities/merge_operators/string_append/stringappend_test.cc
+++ b/utilities/merge_operators/test/stringappend_test.cc
@@ -4,6 +4,11 @@
 // (found in the LICENSE.Apache file in the root directory).
 //
 
+/**
+ * This file has no main() - it contributes tests to merge_operators_test
+ *
+ */
+
 /**
  * An persistent map : key -> (list of strings), using rocksdb merge.
  * This file is a test-harness / use-case for the StringAppendOperator.
@@ -18,7 +23,6 @@
 #include
 #include
 
-#include "port/stack_trace.h"
 #include "rocksdb/db.h"
 #include "rocksdb/merge_operator.h"
 #include "rocksdb/utilities/db_ttl.h"
@@ -631,9 +635,3 @@ INSTANTIATE_TEST_CASE_P(StringAppendOperatorTest, StringAppendOperatorTest,
                         testing::Bool());
 
 }  // namespace ROCKSDB_NAMESPACE
-
-int main(int argc, char** argv) {
-  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
-  ::testing::InitGoogleTest(&argc, argv);
-  return RUN_ALL_TESTS();
-}