From 0c722468a3360e8d85b627cd44f42928b326ed7c Mon Sep 17 00:00:00 2001 From: caojiajun Date: Fri, 3 Jan 2025 15:45:38 +0800 Subject: [PATCH] feat: keyReadWrite (#364) --- .../redis/base/resource/RedisType.java | 4 + .../camellia-redis-proxy-core/pom.xml | 21 +- .../redis/proxy/embedded/storage/wal/Wal.java | 7 - .../redis/proxy/reply/ErrorReply.java | 2 + .../embedded/storage/EmbeddedClient.java | 54 +++++ .../embedded/storage/codec/KeyCodec.java | 95 ++++++++ .../command/CommandOnEmbeddedStorage.java | 40 +++ .../command/EmbeddedStorageExecutors.java | 42 ++++ .../embedded/storage/command/string/Get.java | 57 +++++ .../embedded/storage/command/string/Set.java | 130 ++++++++++ .../storage/compress/CompressType.java | 29 +++ .../storage/compress/CompressUtils.java | 21 ++ .../storage/compress/ICompressor.java | 11 + .../storage/compress/NoneCompressor.java | 36 +++ .../storage/compress/ZstdCompressor.java | 36 +++ .../constants/EmbeddedStorageConstants.java | 11 + .../embedded/storage/enums/DataType.java | 2 +- .../embedded/storage/enums/FlushResult.java | 11 + .../embedded/storage/file/FileReadWrite.java | 15 ++ .../embedded/storage/key/KeyInfo.java | 189 +++++++++++++++ .../embedded/storage/key/KeyReadWrite.java | 59 +++++ .../storage/key/persist/KeyFlushExecutor.java | 228 ++++++++++++++++++ .../storage/key/persist/KeyFlushTask.java | 12 + .../storage/key/slot}/KeySlotMap.java | 37 ++- .../embedded/storage/key/slot}/SlotInfo.java | 2 +- .../storage/key/slot/SlotKeyBlockCache.java | 92 +++++++ .../storage/key/slot/SlotKeyReadWrite.java | 115 +++++++++ .../storage/key/util/KeyHashUtils.java | 13 + .../embedded/storage/value/ValueLocation.java | 8 + .../storage/value/hash/HashReadWrite.java | 7 + .../storage/value/set/SetReadWrite.java | 7 + .../storage/value/string/StringReadWrite.java | 18 ++ .../storage/value/zset/ZSetReadWrite.java | 7 + .../upstream/embedded/storage/wal/Wal.java | 7 + .../embedded/storage/wal/WalGroup.java | 13 + .../proxy/upstream/kv/utils/BytesUtils.java | 12 + .../redis/proxy/test/KeySlotMapTest.java | 4 +- 37 files changed, 1419 insertions(+), 35 deletions(-) delete mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/embedded/storage/wal/Wal.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/EmbeddedClient.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/codec/KeyCodec.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/command/CommandOnEmbeddedStorage.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/command/EmbeddedStorageExecutors.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/command/string/Get.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/command/string/Set.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/compress/CompressType.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/compress/CompressUtils.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/compress/ICompressor.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/compress/NoneCompressor.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/compress/ZstdCompressor.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/constants/EmbeddedStorageConstants.java rename camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/{ => upstream}/embedded/storage/enums/DataType.java (91%) create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/enums/FlushResult.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/file/FileReadWrite.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/KeyInfo.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/KeyReadWrite.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/persist/KeyFlushExecutor.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/persist/KeyFlushTask.java rename camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/{embedded/storage/key => upstream/embedded/storage/key/slot}/KeySlotMap.java (86%) rename camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/{embedded/storage/key => upstream/embedded/storage/key/slot}/SlotInfo.java (58%) create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/SlotKeyBlockCache.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/SlotKeyReadWrite.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/util/KeyHashUtils.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/ValueLocation.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/hash/HashReadWrite.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/set/SetReadWrite.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/string/StringReadWrite.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/zset/ZSetReadWrite.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/wal/Wal.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/wal/WalGroup.java diff --git a/camellia-redis-base/src/main/java/com/netease/nim/camellia/redis/base/resource/RedisType.java b/camellia-redis-base/src/main/java/com/netease/nim/camellia/redis/base/resource/RedisType.java index d28062c74..ccd8985b7 100644 --- a/camellia-redis-base/src/main/java/com/netease/nim/camellia/redis/base/resource/RedisType.java +++ b/camellia-redis-base/src/main/java/com/netease/nim/camellia/redis/base/resource/RedisType.java @@ -87,6 +87,10 @@ public enum RedisType { //仅camellia-redis-proxy支持 RedisKV("redis-kv://", false), + //格式:embedded-storage:/home/data + //仅camellia-redis-proxy支持 + EmbeddedStorage("embedded-storage:", false), + ; private final String prefix; private final boolean tlsEnable; diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/pom.xml b/camellia-redis-proxy/camellia-redis-proxy-core/pom.xml index 93beda4ac..4e5bf1a65 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/pom.xml +++ b/camellia-redis-proxy/camellia-redis-proxy-core/pom.xml @@ -13,6 +13,16 @@ + + com.netease.nim + camellia-redis-base + 1.4.0-SNAPSHOT + + + com.netease.nim + camellia-codec + 1.4.0-SNAPSHOT + org.jctools jctools-core @@ -33,11 +43,6 @@ slf4j-api ${slf4j.version} - - com.netease.nim - camellia-redis-base - 1.4.0-SNAPSHOT - com.googlecode.concurrentlinkedhashmap concurrentlinkedhashmap-lru @@ -48,6 +53,12 @@ camellia-http-console 1.4.0-SNAPSHOT + + com.github.luben + zstd-jni + 1.5.6-9 + + junit junit diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/embedded/storage/wal/Wal.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/embedded/storage/wal/Wal.java deleted file mode 100644 index 7efd56aea..000000000 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/embedded/storage/wal/Wal.java +++ /dev/null @@ -1,7 +0,0 @@ -package com.netease.nim.camellia.redis.proxy.embedded.storage.wal; - -/** - * Created by caojiajun on 2024/12/27 - */ -public class Wal { -} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/reply/ErrorReply.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/reply/ErrorReply.java index a78d2492f..2ab74e282 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/reply/ErrorReply.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/reply/ErrorReply.java @@ -44,6 +44,8 @@ public class ErrorReply implements Reply { public static final ErrorReply CROSS_SLOT_ERROR = new ErrorReply("CROSSSLOT Keys in request don't hash to the same slot"); public static final ErrorReply ILLEGAL_CLUSTER_HEATBEAT = new ErrorReply("ERR illegal cluster heatbeat"); + public static final ErrorReply KEY_TOO_LONG = new ErrorReply("ERR key too long"); + private static final char MARKER = Marker.ErrorReply.getMarker(); private final String error; private final byte[] raw; diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/EmbeddedClient.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/EmbeddedClient.java new file mode 100644 index 000000000..7fa35d87d --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/EmbeddedClient.java @@ -0,0 +1,54 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage; + +import com.netease.nim.camellia.core.model.Resource; +import com.netease.nim.camellia.redis.proxy.command.Command; +import com.netease.nim.camellia.redis.proxy.reply.ErrorReply; +import com.netease.nim.camellia.redis.proxy.reply.Reply; +import com.netease.nim.camellia.redis.proxy.upstream.IUpstreamClient; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Created by caojiajun on 2025/1/3 + */ +public class EmbeddedClient implements IUpstreamClient { + + @Override + public void sendCommand(int db, List commands, List> futureList) { + if (db > 0) { + for (CompletableFuture future : futureList) { + future.complete(ErrorReply.DB_INDEX_OUT_OF_RANGE); + } + return; + } + } + + @Override + public void start() { + + } + + @Override + public void preheat() { + } + + @Override + public boolean isValid() { + return true; + } + + @Override + public void shutdown() { + } + + @Override + public Resource getResource() { + return null; + } + + @Override + public void renew() { + + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/codec/KeyCodec.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/codec/KeyCodec.java new file mode 100644 index 000000000..8d05995f8 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/codec/KeyCodec.java @@ -0,0 +1,95 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.codec; + +import com.netease.nim.camellia.codec.Pack; +import com.netease.nim.camellia.codec.Unpack; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.compress.CompressType; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.compress.CompressUtils; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.compress.ICompressor; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.constants.EmbeddedStorageConstants; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyInfo; +import com.netease.nim.camellia.redis.proxy.upstream.kv.utils.BytesUtils; +import com.netease.nim.camellia.redis.proxy.util.RedisClusterCRC16Utils; +import com.netease.nim.camellia.tools.utils.BytesKey; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +/** + * Created by caojiajun on 2025/1/3 + */ +public class KeyCodec { + + public static Map decodeSlot(byte[] all) { + ByteBuffer buffer = ByteBuffer.wrap(all); + int bucketSize = all.length / EmbeddedStorageConstants._4k; + Map result = new HashMap<>(); + for (int i=0; i map = KeyCodec.decodeBucket(bytes); + result.putAll(map); + } + return result; + } + + /** + * 解码bucket + * @param bytes 固定为4k输入 + * @return 解码结果 + */ + public static Map decodeBucket(byte[] bytes) { + int crc1 = BytesUtils.toInt(bytes, 0);//0,1,2,3 + int crc2 = RedisClusterCRC16Utils.getCRC16(bytes, 5, bytes.length); + if (crc1 != crc2) { + return new HashMap<>(); + } + int decompressLen = BytesUtils.toShort(bytes, 4);//4,5 + byte compressType = bytes[6];//6 + ICompressor compressor = CompressUtils.get(CompressType.getByValue(compressType)); + byte[] decompressData = compressor.decompress(bytes, 7, bytes.length - 7, decompressLen); + Unpack unpack = new Unpack(decompressData); + int size = unpack.popVarUint(); + Map map = new HashMap<>(); + for (int i=0; i keys) { + Pack pack = new Pack(); + pack.putVarUint(keys.size()); + for (Map.Entry entry : keys.entrySet()) { + pack.putMarshallable(entry.getValue()); + } + pack.getBuffer().capacity(pack.getBuffer().readableBytes()); + byte[] array = pack.getBuffer().array(); + short decompressLen = (short) array.length; + CompressType compressType = CompressType.zstd; + ICompressor compressor = CompressUtils.get(compressType); + byte[] compressed = compressor.compress(array, 0, array.length); + if (compressed.length > array.length) { + compressType = CompressType.none; + compressed = array; + } + if (compressed.length + 5 > EmbeddedStorageConstants._4k) { + return null; + } + int crc = RedisClusterCRC16Utils.getCRC16(compressed, 0, compressed.length); + ByteBuffer buffer = ByteBuffer.allocate(EmbeddedStorageConstants._4k); + buffer.putInt(crc); + buffer.putShort(decompressLen); + buffer.put(compressType.getType()); + buffer.put(compressed); + return buffer.array(); + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/command/CommandOnEmbeddedStorage.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/command/CommandOnEmbeddedStorage.java new file mode 100644 index 000000000..a1ccd56f6 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/command/CommandOnEmbeddedStorage.java @@ -0,0 +1,40 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.command; + +import com.netease.nim.camellia.redis.proxy.command.Command; +import com.netease.nim.camellia.redis.proxy.enums.RedisCommand; +import com.netease.nim.camellia.redis.proxy.reply.Reply; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyReadWrite; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.string.StringReadWrite; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.wal.WalGroup; + +/** + * Created by caojiajun on 2025/1/3 + */ +public abstract class CommandOnEmbeddedStorage { + + protected WalGroup walGroup; + + protected KeyReadWrite keyReadWrite; + protected StringReadWrite stringReadWrite; + + /** + * redis command of commander + * @return redis-command + */ + public abstract RedisCommand redisCommand(); + + /** + * check param + * @param command command + * @return success or fail + */ + protected abstract boolean parse(Command command); + + /** + * execute command + * @param slot slot + * @param command command + * @return reply + */ + protected abstract Reply execute(short slot, Command command) throws Exception; +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/command/EmbeddedStorageExecutors.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/command/EmbeddedStorageExecutors.java new file mode 100644 index 000000000..9ee097f3e --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/command/EmbeddedStorageExecutors.java @@ -0,0 +1,42 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.command; + + +import com.netease.nim.camellia.redis.proxy.conf.ProxyDynamicConf; +import com.netease.nim.camellia.redis.proxy.util.MpscSlotHashExecutor; +import com.netease.nim.camellia.tools.utils.SysUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Created by caojiajun on 2025/1/3 + */ +public class EmbeddedStorageExecutors { + + private static final Logger logger = LoggerFactory.getLogger(EmbeddedStorageExecutors.class); + + private static volatile EmbeddedStorageExecutors INSTANCE; + + private final MpscSlotHashExecutor commandExecutor; + + private EmbeddedStorageExecutors() { + int threads = ProxyDynamicConf.getInt("embedded.storage.command.executor.threads", SysUtils.getCpuNum() * 4); + int queueSize = ProxyDynamicConf.getInt("embedded.storage.command.executor.queue.size", 1024*128); + commandExecutor = new MpscSlotHashExecutor("embedded-storage-command-executor", threads, queueSize, new MpscSlotHashExecutor.AbortPolicy()); + logger.info("EmbeddedStorageExecutors init success, threads = {}, queueSize = {}", threads, queueSize); + } + + public static EmbeddedStorageExecutors getInstance() { + if (INSTANCE == null) { + synchronized (EmbeddedStorageExecutors.class) { + if (INSTANCE == null) { + INSTANCE = new EmbeddedStorageExecutors(); + } + } + } + return INSTANCE; + } + + public MpscSlotHashExecutor getCommandExecutor() { + return commandExecutor; + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/command/string/Get.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/command/string/Get.java new file mode 100644 index 000000000..276a7f433 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/command/string/Get.java @@ -0,0 +1,57 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.command.string; + +import com.netease.nim.camellia.redis.proxy.command.Command; +import com.netease.nim.camellia.redis.proxy.enums.RedisCommand; +import com.netease.nim.camellia.redis.proxy.reply.BulkReply; +import com.netease.nim.camellia.redis.proxy.reply.ErrorReply; +import com.netease.nim.camellia.redis.proxy.reply.Reply; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.command.CommandOnEmbeddedStorage; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums.DataType; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyInfo; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.ValueLocation; +import com.netease.nim.camellia.tools.utils.BytesKey; + +/** + * GET key + *

+ * Created by caojiajun on 2025/1/3 + */ +public class Get extends CommandOnEmbeddedStorage { + + @Override + public RedisCommand redisCommand() { + return RedisCommand.GET; + } + + @Override + protected boolean parse(Command command) { + byte[][] objects = command.getObjects(); + return objects.length == 2; + } + + @Override + protected Reply execute(short slot, Command command) throws Exception { + byte[][] objects = command.getObjects(); + BytesKey key = new BytesKey(objects[1]); + KeyInfo keyInfo = keyReadWrite.get(slot, key); + return execute0(keyInfo); + } + + private Reply execute0(KeyInfo keyInfo) { + if (keyInfo == null) { + return BulkReply.NIL_REPLY; + } + if (keyInfo.getDataType() != DataType.string) { + return ErrorReply.WRONG_TYPE; + } + if (keyInfo.containsExtra()) { + return new BulkReply(keyInfo.getExtra()); + } + ValueLocation valueLocation = keyInfo.getValueLocation(); + byte[] bytes = stringReadWrite.get(valueLocation); + if (bytes == null) { + return BulkReply.NIL_REPLY; + } + return new BulkReply(bytes); + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/command/string/Set.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/command/string/Set.java new file mode 100644 index 000000000..55a396094 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/command/string/Set.java @@ -0,0 +1,130 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.command.string; + +import com.netease.nim.camellia.redis.proxy.command.Command; +import com.netease.nim.camellia.redis.proxy.enums.RedisCommand; +import com.netease.nim.camellia.redis.proxy.reply.BulkReply; +import com.netease.nim.camellia.redis.proxy.reply.ErrorReply; +import com.netease.nim.camellia.redis.proxy.reply.Reply; +import com.netease.nim.camellia.redis.proxy.reply.StatusReply; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.command.CommandOnEmbeddedStorage; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums.DataType; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyInfo; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.ValueLocation; +import com.netease.nim.camellia.redis.proxy.upstream.kv.command.string.SetCommander; +import com.netease.nim.camellia.redis.proxy.util.ErrorLogCollector; +import com.netease.nim.camellia.redis.proxy.util.Utils; +import com.netease.nim.camellia.tools.utils.BytesKey; + +/** + * SET key value [NX | XX] [GET] [EX seconds | PX milliseconds | EXAT unix-time-seconds | PXAT unix-time-milliseconds | KEEPTTL] + * Created by caojiajun on 2025/1/3 + */ +public class Set extends CommandOnEmbeddedStorage { + + private static final int nx = 1; + private static final int xx = 2; + + @Override + public RedisCommand redisCommand() { + return RedisCommand.SET; + } + + @Override + protected boolean parse(Command command) { + byte[][] objects = command.getObjects(); + return objects.length >= 3; + } + + @Override + protected Reply execute(short slot, Command command) throws Exception { + byte[][] objects = command.getObjects(); + BytesKey key = new BytesKey(objects[1]); + byte[] value = objects[2]; + int nxxx = -1; + long expireTime = -1; + boolean get = false; + boolean keepTtl = false; + if (objects.length > 3) { + for (int i=3; i 0 && keepTtl) { + ErrorLogCollector.collect(SetCommander.class, "set command syntax error, expireTime > 0 && keepTtl"); + return ErrorReply.SYNTAX_ERROR; + } + + KeyInfo keyInfo = null; + if (nxxx == nx) { + if (get) { + ErrorLogCollector.collect(SetCommander.class, "set command syntax error, nx && get"); + return ErrorReply.SYNTAX_ERROR; + } + keyInfo = keyReadWrite.get(slot, key); + if (keyInfo != null) { + return BulkReply.NIL_REPLY; + } + } else if (nxxx == xx) { + keyInfo = keyReadWrite.get(slot, key); + if (keyInfo == null) { + return BulkReply.NIL_REPLY; + } + } + if (keepTtl && keyInfo == null) { + keyInfo = keyReadWrite.get(slot, key); + } + byte[] oldValue = keyInfo == null ? null : keyInfo.getExtra(); + if (keepTtl) { + expireTime = keyInfo == null ? -1 : keyInfo.getExpireTime(); + keyInfo = new KeyInfo(DataType.string); + keyInfo.setExpireTime(expireTime); + } else if (expireTime > 0) { + keyInfo = new KeyInfo(DataType.string); + keyInfo.setExpireTime(expireTime); + } else { + keyInfo = new KeyInfo(DataType.string); + } + + walGroup.append(slot, command); + + if (key.getKey().length + value.length <= 128) { + keyInfo.setExtra(value); + } else { + keyInfo.setExtra(null); + ValueLocation location = stringReadWrite.put(slot, keyInfo, value); + keyInfo.setValueLocation(location); + } + keyReadWrite.put(slot, keyInfo); + if (get) { + return new BulkReply(oldValue); + } else { + return StatusReply.OK; + } + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/compress/CompressType.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/compress/CompressType.java new file mode 100644 index 000000000..0bd1a7f95 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/compress/CompressType.java @@ -0,0 +1,29 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.compress; + +/** + * Created by caojiajun on 2025/1/2 + */ +public enum CompressType { + + none((byte) 0), + zstd((byte) 1), + ; + private final byte type; + + CompressType(byte type) { + this.type = type; + } + + public byte getType() { + return type; + } + + public static CompressType getByValue(byte type) { + for (CompressType compressType : CompressType.values()) { + if (compressType.type == type) { + return compressType; + } + } + return CompressType.none; + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/compress/CompressUtils.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/compress/CompressUtils.java new file mode 100644 index 000000000..1d6c1012d --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/compress/CompressUtils.java @@ -0,0 +1,21 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.compress; + +/** + * Created by caojiajun on 2025/1/3 + */ +public class CompressUtils { + + private static final NoneCompressor noneCompressor = new NoneCompressor(); + private static final ZstdCompressor zstdCompressor = new ZstdCompressor(); + + public static ICompressor get(CompressType compressType) { + if (compressType == CompressType.none) { + return noneCompressor; + } else if (compressType == CompressType.zstd) { + return zstdCompressor; + } else { + throw new IllegalArgumentException("unknown compress type"); + } + } + +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/compress/ICompressor.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/compress/ICompressor.java new file mode 100644 index 000000000..6bb16c5d0 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/compress/ICompressor.java @@ -0,0 +1,11 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.compress; + +/** + * Created by caojiajun on 2025/1/3 + */ +public interface ICompressor { + + byte[] compress(byte[] data, int offset, int len); + + byte[] decompress(byte[] data, int offset, int len, int decompressLen); +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/compress/NoneCompressor.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/compress/NoneCompressor.java new file mode 100644 index 000000000..5ad857922 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/compress/NoneCompressor.java @@ -0,0 +1,36 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.compress; + +/** + * Created by caojiajun on 2025/1/3 + */ +public class NoneCompressor implements ICompressor { + + @Override + public byte[] compress(byte[] data, int offset, int len) { + if (data == null) { + return null; + } + if (offset == 0 && len == data.length) { + return data; + } + byte[] result = new byte[len]; + System.arraycopy(data, offset, result, 0, result.length); + return result; + } + + @Override + public byte[] decompress(byte[] data, int offset, int len, int decompressLen) { + if (data == null) { + return null; + } + if (len != decompressLen) { + throw new IllegalArgumentException("none compress len/decompressLen not equals"); + } + if (offset == 0 && len == data.length) { + return data; + } + byte[] result = new byte[len]; + System.arraycopy(data, offset, result, 0, result.length); + return result; + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/compress/ZstdCompressor.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/compress/ZstdCompressor.java new file mode 100644 index 000000000..6a19c3127 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/compress/ZstdCompressor.java @@ -0,0 +1,36 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.compress; + + +import com.github.luben.zstd.Zstd; + +/** + * Created by caojiajun on 2025/1/3 + */ +public class ZstdCompressor implements ICompressor { + + private final int compressionLevel; + + public ZstdCompressor() { + compressionLevel = Zstd.defaultCompressionLevel(); + } + + @Override + public byte[] compress(byte[] data, int offset, int len) { + if (data == null) { + return null; + } + int maxSize = (int) Zstd.compressBound(len); + byte[] compressedData = new byte[maxSize]; + int compressedSize = (int) Zstd.compressByteArray(compressedData, 0, compressedData.length, data, offset, len, compressionLevel); + byte[] result = new byte[compressedSize]; + System.arraycopy(compressedData, 0, result, 0, compressedSize); + return result; + } + + @Override + public byte[] decompress(byte[] data, int offset, int len, int decompressLen) { + byte[] output = new byte[decompressLen]; + Zstd.decompressByteArray(output, 0, output.length, data, offset, len); + return output; + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/constants/EmbeddedStorageConstants.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/constants/EmbeddedStorageConstants.java new file mode 100644 index 000000000..b0fd5e54a --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/constants/EmbeddedStorageConstants.java @@ -0,0 +1,11 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.constants; + +/** + * Created by caojiajun on 2025/1/2 + */ +public class EmbeddedStorageConstants { + + public static final int _4k = 4*1024; + public static final int _64k = 64*1024; + public static final int bit_size = (int)(16*1024*1024*1024L / _64k);//16Gib +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/embedded/storage/enums/DataType.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/enums/DataType.java similarity index 91% rename from camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/embedded/storage/enums/DataType.java rename to camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/enums/DataType.java index e3cd5e490..14b8b29a9 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/embedded/storage/enums/DataType.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/enums/DataType.java @@ -1,4 +1,4 @@ -package com.netease.nim.camellia.redis.proxy.embedded.storage.enums; +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums; /** diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/enums/FlushResult.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/enums/FlushResult.java new file mode 100644 index 000000000..0e8c093fb --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/enums/FlushResult.java @@ -0,0 +1,11 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums; + +/** + * Created by caojiajun on 2025/1/3 + */ +public enum FlushResult { + OK, + ERROR, + SKIP, + ; +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/file/FileReadWrite.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/file/FileReadWrite.java new file mode 100644 index 000000000..6b4b11fbc --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/file/FileReadWrite.java @@ -0,0 +1,15 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.file; + +/** + * Created by caojiajun on 2025/1/2 + */ +public class FileReadWrite { + + public void write(long fileId, long offset, byte[] data) { + + } + + public byte[] read(long fileId, long offset, int size) { + return null; + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/KeyInfo.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/KeyInfo.java new file mode 100644 index 000000000..441946254 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/KeyInfo.java @@ -0,0 +1,189 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key; + +import com.netease.nim.camellia.codec.Marshallable; +import com.netease.nim.camellia.codec.Pack; +import com.netease.nim.camellia.codec.Unpack; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums.DataType; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.ValueLocation; + +/** + * Created by caojiajun on 2025/1/2 + */ +public class KeyInfo implements Marshallable { + + public static final KeyInfo DELETE = new KeyInfo(); + + private DataType dataType; + private byte flag; + private byte[] key; + private long expireTime; + private ValueLocation valueLocation; + private byte[] extra; + + public static enum FlagType { + DEFAULT((byte) 0), + CONTAINS_EXPIRE_TIME((byte) 1), + CONTAINS_EXTRA((byte) 2), + CONTAINS_VALUE((byte) 4), + ; + + private final byte value; + + FlagType(byte value) { + this.value = value; + } + + public byte getValue() { + return value; + } + + public static FlagType getByValue(int tagValue) { + for (FlagType t : FlagType.values()) { + if (t.getValue() == tagValue) { + return t; + } + } + return DEFAULT; + } + } + + public KeyInfo() { + } + + public KeyInfo(DataType dataType) { + this.dataType = dataType; + } + + public DataType getDataType() { + return dataType; + } + + public void setDataType(DataType dataType) { + this.dataType = dataType; + } + + public byte[] getKey() { + return key; + } + + public void setKey(byte[] key) { + this.key = key; + } + + public long getExpireTime() { + return expireTime; + } + + public boolean isExpire() { + if (expireTime <= 0) { + return false; + } + return expireTime <= System.currentTimeMillis(); + } + + public void setExpireTime(long expireTime) { + this.expireTime = expireTime; + if (expireTime > 0) { + setContainsExpireTime(); + } else { + clearContainsExpireTime(); + } + } + + public byte[] getExtra() { + return extra; + } + + public void setExtra(byte[] extra) { + this.extra = extra; + if (extra != null) { + setContainsExtra(); + } else { + clearContainsExtra(); + } + } + + public void setValueLocation(ValueLocation valueLocation) { + this.valueLocation = valueLocation; + if (valueLocation != null) { + setContainsValue(); + } else { + clearContainsValue(); + } + } + + public ValueLocation getValueLocation() { + return valueLocation; + } + + public void setContainsExpireTime() { + flag |= FlagType.CONTAINS_EXPIRE_TIME.getValue(); + } + + public void setContainsExtra() { + flag |= FlagType.CONTAINS_EXTRA.getValue(); + } + + public void setContainsValue() { + flag |= FlagType.CONTAINS_VALUE.getValue(); + } + + public void clearContainsExpireTime() { + flag &= (byte) ~ FlagType.CONTAINS_EXPIRE_TIME.getValue(); + } + + public void clearContainsExtra() { + flag &= (byte) ~ FlagType.CONTAINS_EXTRA.getValue(); + } + + public void clearContainsValue() { + flag &= (byte) ~ FlagType.CONTAINS_VALUE.getValue(); + } + + public boolean containsExpireTime() { + return 0 != (flag & FlagType.CONTAINS_EXPIRE_TIME.getValue()); + } + + public boolean containsExtra() { + return 0 != (flag & FlagType.CONTAINS_EXTRA.getValue()); + } + + public boolean containsValue() { + return 0 != (flag & FlagType.CONTAINS_VALUE.getValue()); + } + + @Override + public void marshal(Pack pack) { + pack.putByte(dataType.getValue()); + pack.putByte(flag); + pack.putVarbin(key); + if (containsExpireTime()) { + pack.putLong(expireTime); + } + if (containsValue()) { + pack.putLong(valueLocation.fileId()); + pack.putLong(valueLocation.offset()); + } + if (containsExtra()) { + pack.putVarbin(extra); + } + } + + @Override + public void unmarshal(Unpack unpack) { + dataType = DataType.getByValue(unpack.popByte()); + flag = unpack.popByte(); + key = unpack.popVarbin(); + if (containsExpireTime()) { + expireTime = unpack.popLong(); + } + if (containsValue()) { + long valueFileId = unpack.popLong(); + long valueOffset = unpack.popLong(); + valueLocation = new ValueLocation(valueFileId, valueOffset); + } + if (containsExtra()) { + extra = unpack.popVarbin(); + } + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/KeyReadWrite.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/KeyReadWrite.java new file mode 100644 index 000000000..fca7de73b --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/KeyReadWrite.java @@ -0,0 +1,59 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key; + +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums.FlushResult; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.persist.KeyFlushExecutor; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.slot.SlotKeyBlockCache; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.slot.SlotKeyReadWrite; +import com.netease.nim.camellia.tools.utils.BytesKey; +import com.netease.nim.camellia.tools.utils.CamelliaMapUtils; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Created by caojiajun on 2025/1/3 + */ +public class KeyReadWrite { + + private final KeyFlushExecutor executor; + private final SlotKeyBlockCache blockCache; + + private final ConcurrentHashMap map = new ConcurrentHashMap<>(); + + public KeyReadWrite(KeyFlushExecutor executor, SlotKeyBlockCache blockCache) { + this.executor = executor; + this.blockCache = blockCache; + } + + private SlotKeyReadWrite get(short slot) { + return CamelliaMapUtils.computeIfAbsent(map, slot, s -> new SlotKeyReadWrite(slot, executor, blockCache)); + } + + public KeyInfo get(short slot, BytesKey key) throws IOException { + KeyInfo keyInfo = get(slot).get(key); + if (keyInfo == null) { + return null; + } + if (keyInfo.isExpire()) { + return null; + } + return keyInfo; + } + + public void put(short slot, KeyInfo keyInfo) { + get(slot).put(keyInfo); + } + + public void delete(short slot, BytesKey key) { + get(slot).delete(key); + } + + public CompletableFuture flush(short slot) { + SlotKeyReadWrite slotKeyReadWrite = map.get(slot); + if (slotKeyReadWrite == null) { + return CompletableFuture.completedFuture(FlushResult.OK); + } + return slotKeyReadWrite.flush(); + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/persist/KeyFlushExecutor.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/persist/KeyFlushExecutor.java new file mode 100644 index 000000000..09ceef9d6 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/persist/KeyFlushExecutor.java @@ -0,0 +1,228 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.persist; + + +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums.FlushResult; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.file.FileReadWrite; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.codec.KeyCodec; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.util.KeyHashUtils; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyInfo; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.slot.SlotKeyBlockCache; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.slot.KeySlotMap; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.slot.SlotInfo; +import com.netease.nim.camellia.tools.utils.BytesKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadPoolExecutor; + +import static com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.constants.EmbeddedStorageConstants.*; + +/** + * Created by caojiajun on 2025/1/2 + */ +public class KeyFlushExecutor { + + private static final Logger logger = LoggerFactory.getLogger(KeyFlushExecutor.class); + + private final ThreadPoolExecutor executor; + private final KeySlotMap keySlotMap; + private final FileReadWrite fileReadWrite; + private final SlotKeyBlockCache blockCache; + + public KeyFlushExecutor(ThreadPoolExecutor executor, KeySlotMap keySlotMap, FileReadWrite fileReadWrite, SlotKeyBlockCache blockCache) { + this.executor = executor; + this.keySlotMap = keySlotMap; + this.fileReadWrite = fileReadWrite; + this.blockCache = blockCache; + } + + public CompletableFuture submit(KeyFlushTask flushTask) { + CompletableFuture future = new CompletableFuture<>(); + try { + executor.submit(() -> { + try { + execute(flushTask); + future.complete(FlushResult.OK); + } catch (Exception e) { + logger.error("key flush error, slot = {}", flushTask.slot(), e); + future.complete(FlushResult.ERROR); + } + }); + } catch (Exception e) { + logger.error("submit key flush error, slot = {}", flushTask.slot(), e); + future.complete(FlushResult.ERROR); + } + return future; + } + + private void execute(KeyFlushTask task) throws Exception { + short slot = task.slot(); + Map flushKeys = task.flushKeys(); + SlotInfo source = keySlotMap.get(slot); + if (source == null) { + source = keySlotMap.init(slot); + clear(source); + } + SlotInfo target = source; + WriteResult lastWrite = null; + while (true) { + lastWrite = writeTo(slot, source, target, flushKeys, lastWrite); + if (lastWrite.success) { + break; + } + target = keySlotMap.expand(slot); + } + } + + private void clear(SlotInfo slotInfo) { + long fileId = slotInfo.fileId(); + long offset = slotInfo.offset(); + int capacity = slotInfo.capacity(); + fileReadWrite.write(fileId, offset, new byte[capacity]); + } + + private WriteResult writeTo(short slot, SlotInfo source, SlotInfo target, Map immutable, WriteResult lastWrite) { + Map> writeBuffer = new HashMap<>(); + int capacity = target.capacity(); + int bucketSize = capacity / _4k; + { + for (Map.Entry entry : immutable.entrySet()) { + BytesKey key = entry.getKey(); + KeyInfo data = entry.getValue(); + int bucket = KeyHashUtils.hash(key.getKey()) % bucketSize; + Map keys = writeBuffer.computeIfAbsent(bucket, k -> new HashMap<>()); + keys.put(key, data); + } + } + boolean expand = false; + WriteResult writeResult = new WriteResult(); + + TreeMap tasks = new TreeMap<>(); + if (source.equals(target)) { + long fileId = target.fileId(); + long offset = target.offset(); + for (Map.Entry> entry : writeBuffer.entrySet()) { + Integer bucket = entry.getKey(); + Map newKeys = entry.getValue(); + long bucketOffset = offset + bucket * _4k; + Map oldKeys = lastWrite.oldBucketKeys.get(bucket); + if (oldKeys == null) { + byte[] bytes = fileReadWrite.read(fileId, bucketOffset, _4k); + oldKeys = KeyCodec.decodeBucket(bytes); + } + writeResult.oldBucketKeys.put(bucket, oldKeys); + merge(newKeys, oldKeys); + byte[] encoded = KeyCodec.encodeBucket(newKeys); + if (encoded == null) { + expand = true; + break; + } + tasks.put(bucketOffset, new WriteTask(bucketOffset, encoded)); + } + } else { + Map oldAllKeys; + if (lastWrite == null || lastWrite.oldAllKeys == null) { + byte[] oldAll = fileReadWrite.read(source.fileId(), source.offset(), source.capacity()); + oldAllKeys = KeyCodec.decodeSlot(oldAll); + } else { + oldAllKeys = lastWrite.oldAllKeys; + } + for (Map.Entry entry : oldAllKeys.entrySet()) { + int bucket = KeyHashUtils.hash(entry.getKey().getKey()) % bucketSize; + Map newKeys = writeBuffer.computeIfAbsent(bucket, k -> new HashMap<>()); + merge(newKeys, entry); + } + for (Map.Entry> entry : writeBuffer.entrySet()) { + Integer bucket = entry.getKey(); + byte[] encoded = KeyCodec.encodeBucket(entry.getValue()); + if (encoded == null) { + expand = true; + break; + } + long bucketOffset = target.offset() + bucket * _4k; + tasks.put(bucketOffset, new WriteTask(bucketOffset, encoded)); + } + writeResult.oldAllKeys = oldAllKeys; + } + if (!expand) { + write0(slot, target.fileId(), tasks); + writeResult.success = true; + return writeResult; + } + writeResult.success = false; + return writeResult; + } + + private static class WriteResult { + boolean success; + Map oldAllKeys; + Map> oldBucketKeys = new HashMap<>(); + } + + private static class WriteTask { + long offset; + byte[] data; + + public WriteTask(long offset, byte[] data) { + this.offset = offset; + this.data = data; + } + } + + private void write0(short slot, long fileId, TreeMap writeTasks) { + List> all = new ArrayList<>(); + List merged = new ArrayList<>(); + WriteTask lastTask = null; + for (Map.Entry entry : writeTasks.entrySet()) { + Long offset = entry.getKey(); + WriteTask task = entry.getValue(); + //update block cache + blockCache.updateBlockCache(slot, fileId, task.offset, task.data); + //merge + if (lastTask != null) { + if (offset - lastTask.offset != _4k) { + if (!merged.isEmpty()) { + all.add(merged); + merged = new ArrayList<>(); + } + } + } + merged.add(task); + lastTask = task; + } + if (!merged.isEmpty()) { + all.add(merged); + } + for (List tasks : all) { + if (tasks.size() == 1) { + WriteTask first = tasks.getFirst(); + fileReadWrite.write(fileId, first.offset, first.data); + } else { + byte[] mergedData = new byte[_4k * tasks.size()]; + long offset = tasks.getFirst().offset; + for (int i=0;i newKeys, Map.Entry entry) { + KeyInfo key = newKeys.get(entry.getKey()); + if (key == KeyInfo.DELETE) { + return; + } + if (!newKeys.containsKey(entry.getKey())) { + newKeys.put(entry.getKey(), entry.getValue()); + } + } + + private void merge(Map newKeys, Map oldKeys) { + for (Map.Entry entry : oldKeys.entrySet()) { + merge(newKeys, entry); + } + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/persist/KeyFlushTask.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/persist/KeyFlushTask.java new file mode 100644 index 000000000..01c7c0c0c --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/persist/KeyFlushTask.java @@ -0,0 +1,12 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.persist; + +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyInfo; +import com.netease.nim.camellia.tools.utils.BytesKey; + +import java.util.Map; + +/** + * Created by caojiajun on 2025/1/2 + */ +public record KeyFlushTask(short slot, Map flushKeys) { +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/embedded/storage/key/KeySlotMap.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/KeySlotMap.java similarity index 86% rename from camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/embedded/storage/key/KeySlotMap.java rename to camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/KeySlotMap.java index 4ee2f749d..7a50d325c 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/embedded/storage/key/KeySlotMap.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/KeySlotMap.java @@ -1,5 +1,6 @@ -package com.netease.nim.camellia.redis.proxy.embedded.storage.key; +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.slot; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.constants.EmbeddedStorageConstants; import com.netease.nim.camellia.redis.proxy.util.RedisClusterCRC16Utils; import com.netease.nim.camellia.redis.proxy.util.Utils; import org.slf4j.Logger; @@ -15,6 +16,7 @@ import java.util.*; import java.util.concurrent.locks.ReentrantReadWriteLock; + /** * Created by caojiajun on 2024/12/31 */ @@ -22,9 +24,6 @@ public class KeySlotMap { private static final Logger logger = LoggerFactory.getLogger(KeySlotMap.class); - private static final int _64k = 64*1024; - private static final int bit_size = (int)(16*1024*1024*1024L / _64k);//16Gib - private static final byte[] magic_header = "camellia_header".getBytes(StandardCharsets.UTF_8); private static final byte[] magic_footer = "camellia_footer".getBytes(StandardCharsets.UTF_8); @@ -80,9 +79,9 @@ public void load() throws IOException { if (fileId == 0) { continue; } - BitSet bits = fileBitsMap.computeIfAbsent(fileId, k -> new BitSet(bit_size)); - int bitsStart = (int)(offset / _64k); - int bitsEnd = (int)((offset + capacity) / _64k); + BitSet bits = fileBitsMap.computeIfAbsent(fileId, k -> new BitSet(EmbeddedStorageConstants.bit_size)); + int bitsStart = (int)(offset / EmbeddedStorageConstants._64k); + int bitsEnd = (int)((offset + capacity) / EmbeddedStorageConstants._64k); for (int index=bitsStart; index entry : fileBitsMap.entrySet()) { Long fileId = entry.getKey(); BitSet bits = entry.getValue(); - for (int i=0; i= bitsStep*2) { - for (int i=0; i= bitsStep*2) { + for (int i = 0; i< EmbeddedStorageConstants.bit_size-bitsStep*2; i++) { if (bits.get(i, bitsStep * 2).cardinality() == 0) { //clear old for (int j=bitsStart; j readCache; + + private final ConcurrentLinkedHashMap writeCache; + + public SlotKeyBlockCache(KeySlotMap keySlotMap, FileReadWrite fileReadWrite) { + this.keySlotMap = keySlotMap; + this.fileReadWrite = fileReadWrite; + readCache = new ConcurrentLinkedHashMap.Builder() + .initialCapacity(10000) + .maximumWeightedCapacity(10000) + .build(); + writeCache = new ConcurrentLinkedHashMap.Builder() + .initialCapacity(10000) + .maximumWeightedCapacity(10000) + .build(); + } + + /** + * 获取一个key + * @param slot slot + * @param key key + * @return key + * @throws IOException exception + */ + public KeyInfo get(short slot, BytesKey key) throws IOException { + SlotInfo slotInfo = keySlotMap.get(slot); + long fileId = slotInfo.fileId(); + long offset = slotInfo.offset(); + int capacity = slotInfo.capacity(); + int bucketSize = capacity / _4k; + int bucket = KeyHashUtils.hash(key.getKey()) % bucketSize; + + String cacheKey = slot + "|" + fileId + "|" + (offset + _4k * bucket); + byte[] blockCache = readCache.get(cacheKey); + if (blockCache == null) { + blockCache = writeCache.get(cacheKey); + if (blockCache != null) { + readCache.put(cacheKey, blockCache); + writeCache.remove(cacheKey); + } + } + if (blockCache == null) { + blockCache = fileReadWrite.read(fileId, offset, bucket * _4k); + readCache.put(cacheKey, blockCache); + } + Map map = KeyCodec.decodeBucket(blockCache); + KeyInfo data = map.get(key); + if (data == null) { + return KeyInfo.DELETE; + } + return data; + } + + /** + * 更新block-cache + * @param slot slot + * @param fileId fileId + * @param offset offset + * @param block block + */ + public void updateBlockCache(short slot, long fileId, long offset, byte[] block) { + String cacheKey = slot + "|" + fileId + "|" + offset; + byte[] blockCache = readCache.get(cacheKey); + if (blockCache != null) { + readCache.put(cacheKey, block); + } else { + writeCache.put(cacheKey, block); + } + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/SlotKeyReadWrite.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/SlotKeyReadWrite.java new file mode 100644 index 000000000..3f7a5b0be --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/SlotKeyReadWrite.java @@ -0,0 +1,115 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.slot; + +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums.FlushResult; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyInfo; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.persist.KeyFlushExecutor; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.persist.KeyFlushTask; +import com.netease.nim.camellia.tools.utils.BytesKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.CompletableFuture; + +/** + * 线程不安全 + * Created by caojiajun on 2025/1/2 + */ +public class SlotKeyReadWrite { + + private static final Logger logger = LoggerFactory.getLogger(SlotKeyReadWrite.class); + + private final short slot; + private final KeyFlushExecutor executor; + private final SlotKeyBlockCache blockCache; + + private Map mutable = new HashMap<>(); + private Map immutable = new HashMap<>(); + private volatile boolean flushing = false; + + public SlotKeyReadWrite(short slot, KeyFlushExecutor executor, SlotKeyBlockCache blockCache) { + this.slot = slot; + this.executor = executor; + this.blockCache = blockCache; + } + + /** + * 获取一个key + * @param key key + * @return key + */ + public KeyInfo get(BytesKey key) throws IOException { + KeyInfo keyInfo = mutable.get(key); + if (keyInfo == KeyInfo.DELETE) { + return null; + } + keyInfo = immutable.get(key); + if (keyInfo == KeyInfo.DELETE) { + return null; + } + keyInfo = blockCache.get(slot, key); + if (keyInfo == KeyInfo.DELETE) { + return null; + } + return keyInfo; + } + + /** + * 写入一个key + * @param key key + */ + public void put(KeyInfo key) { + mutable.put(new BytesKey(key.getKey()), key); + checkAndFlush(); + } + + /** + * 删除一个key + * @param key key + */ + public void delete(BytesKey key) { + mutable.put(key, KeyInfo.DELETE); + checkAndFlush(); + } + + /** + * flush + */ + public CompletableFuture flush() { + CompletableFuture future = new CompletableFuture<>(); + if (flushing) { + future.complete(FlushResult.SKIP); + return future; + } + if (mutable.isEmpty()) { + future.complete(FlushResult.OK); + return future; + } + Map flushKeys = flushPrepare(); + CompletableFuture submit = executor.submit(new KeyFlushTask(slot, flushKeys)); + submit.thenAccept(b -> { + flushDone(); + future.complete(b); + }); + return future; + } + + private Map flushPrepare() { + immutable = mutable; + mutable = new HashMap<>(); + flushing = true; + return immutable; + } + + private void flushDone() { + immutable.clear(); + flushing = false; + } + + private void checkAndFlush() { + if (mutable.size() >= 200 && !flushing) { + flush(); + } + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/util/KeyHashUtils.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/util/KeyHashUtils.java new file mode 100644 index 000000000..8f7705235 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/util/KeyHashUtils.java @@ -0,0 +1,13 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.util; + +import com.netease.nim.camellia.redis.proxy.util.RedisClusterCRC16Utils; + +/** + * Created by caojiajun on 2025/1/2 + */ +public class KeyHashUtils { + + public static int hash(byte[] key) { + return RedisClusterCRC16Utils.getCRC16(key); + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/ValueLocation.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/ValueLocation.java new file mode 100644 index 000000000..ffee18d1d --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/ValueLocation.java @@ -0,0 +1,8 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value; + +/** + * Created by caojiajun on 2025/1/3 + */ +public record ValueLocation(long fileId, long offset) { + +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/hash/HashReadWrite.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/hash/HashReadWrite.java new file mode 100644 index 000000000..307e7c332 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/hash/HashReadWrite.java @@ -0,0 +1,7 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.hash; + +/** + * Created by caojiajun on 2025/1/3 + */ +public class HashReadWrite { +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/set/SetReadWrite.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/set/SetReadWrite.java new file mode 100644 index 000000000..92833634e --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/set/SetReadWrite.java @@ -0,0 +1,7 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.set; + +/** + * Created by caojiajun on 2025/1/3 + */ +public class SetReadWrite { +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/string/StringReadWrite.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/string/StringReadWrite.java new file mode 100644 index 000000000..745705c34 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/string/StringReadWrite.java @@ -0,0 +1,18 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.string; + +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyInfo; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.ValueLocation; + +/** + * Created by caojiajun on 2025/1/3 + */ +public class StringReadWrite { + + public ValueLocation put(short slot, KeyInfo keyInfo, byte[] data) { + return null; + } + + public byte[] get(ValueLocation location) { + return null; + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/zset/ZSetReadWrite.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/zset/ZSetReadWrite.java new file mode 100644 index 000000000..dc3be83f8 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/zset/ZSetReadWrite.java @@ -0,0 +1,7 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.zset; + +/** + * Created by caojiajun on 2025/1/3 + */ +public class ZSetReadWrite { +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/wal/Wal.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/wal/Wal.java new file mode 100644 index 000000000..9dd83782f --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/wal/Wal.java @@ -0,0 +1,7 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.wal; + +/** + * Created by caojiajun on 2024/12/27 + */ +public class Wal { +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/wal/WalGroup.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/wal/WalGroup.java new file mode 100644 index 000000000..4b873492c --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/wal/WalGroup.java @@ -0,0 +1,13 @@ +package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.wal; + +import com.netease.nim.camellia.redis.proxy.command.Command; + +/** + * Created by caojiajun on 2025/1/3 + */ +public class WalGroup { + + public void append(short slot, Command command) { + + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/utils/BytesUtils.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/utils/BytesUtils.java index a22eedc4a..462628976 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/utils/BytesUtils.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/utils/BytesUtils.java @@ -92,6 +92,18 @@ public static byte[] toBytes(int val) { return b; } + public static short toShort(byte[] bytes) { + return toShort(bytes, 0); + } + + public static short toShort(byte[] bytes, int offset) { + short n = 0; + n ^= (short) (bytes[offset] & 0xFF); + n <<= 8; + n ^= (short) (bytes[offset+1] & 0xFF); + return n; + } + public static int toInt(byte[] bytes) { return toInt(bytes, 0, SIZEOF_INT); } diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/KeySlotMapTest.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/KeySlotMapTest.java index 39d9e69d3..a61af29c1 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/KeySlotMapTest.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/KeySlotMapTest.java @@ -1,7 +1,7 @@ package com.netease.nim.camellia.redis.proxy.test; -import com.netease.nim.camellia.redis.proxy.embedded.storage.key.KeySlotMap; -import com.netease.nim.camellia.redis.proxy.embedded.storage.key.SlotInfo; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.slot.KeySlotMap; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.slot.SlotInfo; import com.netease.nim.camellia.redis.proxy.util.RedisClusterCRC16Utils; import org.junit.After; import org.junit.Assert;