Skip to content

Commit

Permalink
feat: lru cache (#364)
Browse files Browse the repository at this point in the history
  • Loading branch information
caojiajun committed Jan 8, 2025
1 parent dab82fc commit 0cb6328
Show file tree
Hide file tree
Showing 26 changed files with 429 additions and 125 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.cache;

/**
* Created by caojiajun on 2025/1/8
*/
public class BytesSizeCalculator implements SizeCalculator<byte[]> {

@Override
public long size(byte[] element) {
return element.length;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.cache;

import com.netease.nim.camellia.redis.proxy.conf.ProxyDynamicConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Created by caojiajun on 2025/1/8
*/
public class CacheCapacityConfigParser {

private static final Logger logger = LoggerFactory.getLogger(CacheCapacityConfigParser.class);

public static long parse(String configKey, String defaultValue) {
String string = ProxyDynamicConf.getString(configKey, defaultValue).toUpperCase();
try {
long bytes = bytes(string);
if (bytes < 0) {
logger.warn("illegal config = {} for config-key = {}, use default config = {}", string, configKey, defaultValue);
}
bytes = bytes(defaultValue);
return bytes;
} catch (Exception e) {
logger.warn("error config = {} parse for config-key = {}, use default config = {}", string, configKey, defaultValue);
return bytes(defaultValue);
}
}

public static String toString(long capacity) {
return (capacity / 1024 / 1024) + "M";
}

private static long bytes(String string) {
int size = Integer.parseInt(string.substring(string.length() - 2));
if (string.endsWith("M")) {
return size * 1024 * 1024L;
} else if (string.endsWith("G")) {
return size * 1024 * 1024 * 1024L ;
}
return -1;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.cache;

import com.netease.nim.camellia.redis.proxy.upstream.kv.cache.EstimateSizeValue;

import java.util.Arrays;
import java.util.Objects;

/**
* Created by caojiajun on 2025/1/8
*/
public record CacheKey(byte[] key) implements EstimateSizeValue {

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CacheKey cacheKey = (CacheKey) o;
return Objects.deepEquals(key, cacheKey.key);
}

@Override
public int hashCode() {
return Arrays.hashCode(key);
}

@Override
public long estimateSize() {
return key.length;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.cache;

import com.netease.nim.camellia.redis.proxy.upstream.kv.cache.EstimateSizeValue;

/**
* Created by caojiajun on 2025/1/8
*/
public class EstimateSizeValueCalculator<T extends EstimateSizeValue> implements SizeCalculator<T> {

@Override
public long size(T element) {
return element.estimateSize();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.cache;

import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import com.netease.nim.camellia.redis.proxy.util.Utils;
import com.netease.nim.camellia.tools.executor.CamelliaThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Created by caojiajun on 2025/1/8
*/
public class LRUCache<K, V> {

private static final Logger logger = LoggerFactory.getLogger(LRUCache.class);

private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new CamelliaThreadFactory("lru-cache-scheduler"));

private final String name;
private final String configKey;
private final SizeCalculator<K> keySizeCalculator;
private final SizeCalculator<V> valueSizeCalculator;

private String config;

private final ConcurrentLinkedHashMap<K, V> cache;
private long capacity;

private int cacheSize;

private int loop = 0;

public LRUCache(String name, String configKey, String defaultConfig, int estimateSizePerKV, SizeCalculator<K> keySizeCalculator, SizeCalculator<V> valueSizeCalculator) {
this.name = name;
this.configKey = configKey;
this.config = defaultConfig;
this.keySizeCalculator = keySizeCalculator;
this.valueSizeCalculator = valueSizeCalculator;
this.capacity = CacheCapacityConfigParser.parse(configKey, defaultConfig);
this.config = CacheCapacityConfigParser.toString(capacity);
this.cacheSize = (int) (capacity / estimateSizePerKV);
this.cache = new ConcurrentLinkedHashMap.Builder<K, V>()
.initialCapacity(cacheSize)
.maximumWeightedCapacity(cacheSize)
.build();
scheduler.scheduleAtFixedRate(this::schedule, 10, 10, TimeUnit.SECONDS);
}

public void put(K key, V value) {
cache.put(key, value);
}

public void delete(K key) {
cache.remove(key);
}

public V get(K key) {
return cache.get(key);
}

private void schedule() {
this.capacity = CacheCapacityConfigParser.parse(configKey, config);
this.config = CacheCapacityConfigParser.toString(capacity);
long size = 0;
long count = cache.size();
for (Map.Entry<K, V> entry : cache.entrySet()) {
size += keySizeCalculator.size(entry.getKey());
size += valueSizeCalculator.size(entry.getValue());
}
size += count * 8;
if (size > capacity) {
double ratio = size * 1.0 / capacity;
cacheSize = (int) (count / ratio);
this.cache.setCapacity(cacheSize);
} else {
if (count >= cacheSize) {
double ratio = size * 1.0 / capacity;
cacheSize = (int) (count / ratio);
this.cache.setCapacity(cacheSize);
}
}
loop ++;
if (loop == 6) {//print log every 60s
logger.info("lru-cache, name = {}, target.capacity = {}, current.estimate.size = {}, current.key.count = {}, current.key.max.count = {}",
name, config, Utils.humanReadableByteCountBin(size), count, cacheSize);
loop = 0;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.cache;

/**
* Created by caojiajun on 2025/1/8
*/
public interface SizeCalculator<T> {

StringSizeCalculator STRING_INSTANCE = new StringSizeCalculator();
BytesSizeCalculator BYTES_INSTANCE = new BytesSizeCalculator();

long size(T element);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.cache;

/**
* Created by caojiajun on 2025/1/8
*/
public class StringSizeCalculator implements SizeCalculator<String> {

public static final BytesSizeCalculator INSTANCE = new BytesSizeCalculator();

@Override
public long size(String element) {
return element.length();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

import com.netease.nim.camellia.codec.Pack;
import com.netease.nim.camellia.codec.Unpack;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.cache.CacheKey;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.compress.CompressType;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.compress.CompressUtils;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.compress.ICompressor;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.constants.EmbeddedStorageConstants;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyInfo;
import com.netease.nim.camellia.redis.proxy.upstream.kv.utils.BytesUtils;
import com.netease.nim.camellia.redis.proxy.util.RedisClusterCRC16Utils;
import com.netease.nim.camellia.tools.utils.BytesKey;

import java.nio.ByteBuffer;
import java.util.HashMap;
Expand All @@ -27,14 +27,14 @@ public class KeyCodec {
* @param all data
* @return 解码结果
*/
public static Map<BytesKey, KeyInfo> decodeSlot(byte[] all) {
public static Map<CacheKey, KeyInfo> decodeSlot(byte[] all) {
ByteBuffer buffer = ByteBuffer.wrap(all);
int bucketSize = all.length / EmbeddedStorageConstants._4k;
Map<BytesKey, KeyInfo> result = new HashMap<>();
Map<CacheKey, KeyInfo> result = new HashMap<>();
for (int i=0; i<bucketSize; i++) {
byte[] bytes = new byte[EmbeddedStorageConstants._4k];
buffer.get(bytes);
Map<BytesKey, KeyInfo> map = KeyCodec.decodeBucket(bytes);
Map<CacheKey, KeyInfo> map = KeyCodec.decodeBucket(bytes);
result.putAll(map);
}
return result;
Expand All @@ -45,7 +45,7 @@ public static Map<BytesKey, KeyInfo> decodeSlot(byte[] all) {
* @param bytes 固定为4k输入
* @return 解码结果
*/
public static Map<BytesKey, KeyInfo> decodeBucket(byte[] bytes) {
public static Map<CacheKey, KeyInfo> decodeBucket(byte[] bytes) {
int crc1 = BytesUtils.toInt(bytes, 0);//0,1,2,3
int crc2 = RedisClusterCRC16Utils.getCRC16(bytes, 9, bytes.length);
if (crc1 != crc2) {
Expand All @@ -58,11 +58,11 @@ public static Map<BytesKey, KeyInfo> decodeBucket(byte[] bytes) {
byte[] decompressData = compressor.decompress(bytes, 9, compressLen, decompressLen);
Unpack unpack = new Unpack(decompressData);
int size = unpack.popVarUint();
Map<BytesKey, KeyInfo> map = new HashMap<>();
Map<CacheKey, KeyInfo> map = new HashMap<>();
for (int i=0; i<size; i++) {
KeyInfo key = new KeyInfo();
unpack.popMarshallable(key);
map.put(new BytesKey(key.getKey()), key);
map.put(new CacheKey(key.getKey()), key);
}
return map;
}
Expand All @@ -73,10 +73,10 @@ public static Map<BytesKey, KeyInfo> decodeBucket(byte[] bytes) {
* @param keys keys
* @return 编码结果,固定为4k
*/
public static byte[] encodeBucket(Map<BytesKey, KeyInfo> keys) {
public static byte[] encodeBucket(Map<CacheKey, KeyInfo> keys) {
Pack pack = new Pack();
pack.putVarUint(keys.size());
for (Map.Entry<BytesKey, KeyInfo> entry : keys.entrySet()) {
for (Map.Entry<CacheKey, KeyInfo> entry : keys.entrySet()) {
pack.putMarshallable(entry.getValue());
}
pack.getBuffer().capacity(pack.getBuffer().readableBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/**
* Created by caojiajun on 2025/1/3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@
import com.netease.nim.camellia.redis.proxy.reply.BulkReply;
import com.netease.nim.camellia.redis.proxy.reply.ErrorReply;
import com.netease.nim.camellia.redis.proxy.reply.Reply;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.cache.CacheKey;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.command.CommandOnEmbeddedStorage;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums.DataType;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyInfo;
import com.netease.nim.camellia.tools.utils.BytesKey;

import java.io.IOException;

/**
* GET key
Expand All @@ -33,12 +31,8 @@ protected boolean parse(Command command) {
@Override
protected Reply execute(short slot, Command command) throws Exception {
byte[][] objects = command.getObjects();
BytesKey key = new BytesKey(objects[1]);
CacheKey key = new CacheKey(objects[1]);
KeyInfo keyInfo = keyReadWrite.get(slot, key);
return execute0(slot, keyInfo);
}

private Reply execute0(short slot, KeyInfo keyInfo) throws IOException {
if (keyInfo == null) {
return BulkReply.NIL_REPLY;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@
import com.netease.nim.camellia.redis.proxy.reply.ErrorReply;
import com.netease.nim.camellia.redis.proxy.reply.Reply;
import com.netease.nim.camellia.redis.proxy.reply.StatusReply;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.cache.CacheKey;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.command.CommandOnEmbeddedStorage;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums.DataType;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums.FlushResult;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyInfo;
import com.netease.nim.camellia.redis.proxy.upstream.kv.command.string.SetCommander;
import com.netease.nim.camellia.redis.proxy.util.ErrorLogCollector;
import com.netease.nim.camellia.redis.proxy.util.Utils;
import com.netease.nim.camellia.tools.utils.BytesKey;

/**
* SET key value [NX | XX] [GET] [EX seconds | PX milliseconds | EXAT unix-time-seconds | PXAT unix-time-milliseconds | KEEPTTL]
Expand All @@ -38,7 +37,7 @@ protected boolean parse(Command command) {
@Override
protected Reply execute(short slot, Command command) throws Exception {
byte[][] objects = command.getObjects();
BytesKey key = new BytesKey(objects[1]);
CacheKey key = new CacheKey(objects[1]);
byte[] value = objects[2];
int nxxx = -1;
long expireTime = -1;
Expand Down Expand Up @@ -113,7 +112,7 @@ protected Reply execute(short slot, Command command) throws Exception {

walGroup.append(slot, command);

if (key.getKey().length + value.length <= 128) {
if (key.key().length + value.length <= 128) {
keyInfo.setExtra(value);
} else {
keyInfo.setExtra(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums.DataType;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.block.BlockLocation;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.block.ValueLocation;
import com.netease.nim.camellia.redis.proxy.upstream.kv.cache.EstimateSizeValue;

import java.util.Arrays;
import java.util.Objects;

/**
* Created by caojiajun on 2025/1/2
*/
public class KeyInfo implements Marshallable {
public class KeyInfo implements Marshallable, EstimateSizeValue {

public static final KeyInfo DELETE = new KeyInfo();

Expand All @@ -24,6 +25,18 @@ public class KeyInfo implements Marshallable {
private ValueLocation valueLocation;
private byte[] extra;

@Override
public long estimateSize() {
long size = 1 + 1 + key.length + 8;
if (valueLocation != null) {
size += 16;
}
if (extra != null) {
size += extra.length;
}
return size;
}

public static enum FlagType {
DEFAULT((byte) 0),
CONTAINS_EXPIRE_TIME((byte) 1),
Expand Down
Loading

0 comments on commit 0cb6328

Please sign in to comment.