Skip to content

Commit e95a77f

Browse files
committed
Redis Deprecated cache module support fixed
JedisCacheStubAsyncClient for fake async client added JedisCacheAsyncClient uses user provided executor for async operations
1 parent c016122 commit e95a77f

File tree

16 files changed

+251
-94
lines changed

16 files changed

+251
-94
lines changed

cache/cache-annotation-processor/src/main/java/ru/tinkoff/kora/cache/annotation/processor/CacheAnnotationProcessor.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,10 @@ public boolean process(Set<? extends TypeElement> annotations, RoundEnvironment
8787

8888
var cacheImplBase = getCacheImplBase(cacheContract, cacheContractType);
8989
var implSpec = TypeSpec.classBuilder(getCacheImpl(cacheContract))
90-
.addModifiers(Modifier.FINAL)
90+
.addModifiers(Modifier.PUBLIC, Modifier.FINAL)
9191
.addAnnotation(AnnotationSpec.builder(CommonClassNames.koraGenerated)
9292
.addMember("value", CodeBlock.of("$S", CacheAnnotationProcessor.class.getCanonicalName())).build())
93-
.addMethod(getCacheConstructor(configPath, cacheContractType))
93+
.addMethod(getCacheConstructor(configPath, cacheContractType, cacheContract))
9494
.superclass(cacheImplBase)
9595
.addSuperinterface(cacheContract.asType())
9696
.build();
@@ -151,6 +151,16 @@ private ParameterizedTypeName getCacheSuperType(TypeElement candidate) {
151151
return null;
152152
}
153153

154+
private boolean isRedisDeprecated(TypeElement cacheContract) {
155+
return cacheContract.getInterfaces().stream()
156+
.filter(a -> a instanceof DeclaredType)
157+
.map(a -> ((DeclaredType) a))
158+
.map(a -> ((TypeElement) a.asElement()))
159+
.filter(a -> ClassName.get(a).equals(REDIS_CACHE))
160+
.flatMap(a -> a.getAnnotationMirrors().stream())
161+
.anyMatch(a -> TypeName.get(a.getAnnotationType()).equals(TypeName.get(Deprecated.class)));
162+
}
163+
154164
private TypeName getCacheImplBase(TypeElement cacheContract, ParameterizedTypeName cacheType) {
155165
if (cacheType.rawType.equals(CAFFEINE_CACHE)) {
156166
return ParameterizedTypeName.get(CAFFEINE_CACHE_IMPL, cacheType.typeArguments.get(0), cacheType.typeArguments.get(1));
@@ -289,7 +299,7 @@ private MethodSpec getCacheMethodImpl(TypeElement cacheContract, ParameterizedTy
289299
}
290300

291301
if (cacheType.rawType.equals(REDIS_CACHE)) {
292-
if (cacheType.annotations.stream().anyMatch(a -> a.type.equals(TypeName.get(Deprecated.class)))) {
302+
if (isRedisDeprecated(cacheContract)) {
293303
return getCacheRedisDeprecatedMethod(cacheContract, cacheType, cacheImplName, methodName);
294304
} else {
295305
return getCacheRedisMethod(cacheContract, cacheType, cacheImplName, methodName);
@@ -387,9 +397,10 @@ private MethodSpec getCacheRedisDeprecatedMethod(TypeElement cacheContract,
387397
.build();
388398
}
389399

390-
private MethodSpec getCacheConstructor(String configPath, ParameterizedTypeName cacheContract) {
400+
private MethodSpec getCacheConstructor(String configPath, ParameterizedTypeName cacheContract, TypeElement cacheElement) {
391401
if (cacheContract.rawType.equals(CAFFEINE_CACHE)) {
392402
return MethodSpec.constructorBuilder()
403+
.addModifiers(Modifier.PUBLIC)
393404
.addParameter(CAFFEINE_CACHE_CONFIG, "config")
394405
.addParameter(CAFFEINE_CACHE_FACTORY, "factory")
395406
.addParameter(CACHE_TELEMETRY_FACTORY, "telemetryFactory")
@@ -398,12 +409,13 @@ private MethodSpec getCacheConstructor(String configPath, ParameterizedTypeName
398409
}
399410

400411
if (cacheContract.rawType.equals(REDIS_CACHE)) {
401-
if (cacheContract.annotations.stream().anyMatch(a -> a.type.equals(TypeName.get(Deprecated.class)))) {
412+
if (isRedisDeprecated(cacheElement)) {
402413
var keyType = cacheContract.typeArguments.get(0);
403414
var valueType = cacheContract.typeArguments.get(1);
404415
var keyMapperType = ParameterizedTypeName.get(REDIS_CACHE_MAPPER_KEY, keyType);
405416
var valueMapperType = ParameterizedTypeName.get(REDIS_CACHE_MAPPER_VALUE, valueType);
406417
return MethodSpec.constructorBuilder()
418+
.addModifiers(Modifier.PUBLIC)
407419
.addParameter(REDIS_CACHE_CONFIG, "config")
408420
.addParameter(REDIS_CACHE_OLD_CLIENT, "redisClient")
409421
.addParameter(REDIS_TELEMETRY, "telemetry")
@@ -417,6 +429,7 @@ private MethodSpec getCacheConstructor(String configPath, ParameterizedTypeName
417429
var keyMapperType = ParameterizedTypeName.get(REDIS_CACHE_MAPPER_KEY, keyType);
418430
var valueMapperType = ParameterizedTypeName.get(REDIS_CACHE_MAPPER_VALUE, valueType);
419431
return MethodSpec.constructorBuilder()
432+
.addModifiers(Modifier.PUBLIC)
420433
.addParameter(REDIS_CACHE_CONFIG, "config")
421434
.addParameter(REDIS_CACHE_SYNC_CLIENT, "redisSyncClient")
422435
.addParameter(REDIS_CACHE_ASYNC_CLIENT, "redisAsyncClient")

cache/cache-caffeine/src/main/java/ru/tinkoff/kora/cache/caffeine/CaffeineCache.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package ru.tinkoff.kora.cache.caffeine;
22

33
import jakarta.annotation.Nonnull;
4-
import ru.tinkoff.kora.cache.AsyncCache;
54
import ru.tinkoff.kora.cache.Cache;
65

76
import java.util.Map;

cache/cache-common/src/main/java/ru/tinkoff/kora/cache/AsyncCache.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import java.util.concurrent.CompletableFuture;
1010
import java.util.concurrent.CompletionStage;
1111
import java.util.function.Function;
12-
import java.util.stream.Collectors;
1312

1413
/**
1514
* Represents Async Cache contract.

cache/cache-common/src/main/java/ru/tinkoff/kora/cache/AsyncFacadeCacheBuilder.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import java.util.concurrent.CompletableFuture;
77
import java.util.concurrent.CompletionStage;
88
import java.util.function.Function;
9-
import java.util.stream.Collectors;
109

1110
final class AsyncFacadeCacheBuilder<K, V> implements AsyncCache.Builder<K, V> {
1211

cache/cache-common/src/main/java/ru/tinkoff/kora/cache/Cache.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,7 @@
77
import java.util.HashMap;
88
import java.util.Map;
99
import java.util.Set;
10-
import java.util.concurrent.CompletableFuture;
1110
import java.util.function.Function;
12-
import java.util.stream.Collectors;
1311

1412
/**
1513
* Represents Synchronous Cache contract.

cache/cache-redis-jedis/src/main/java/ru/tinkoff/kora/cache/redis/jedis/JedisCacheAsyncClient.java

Lines changed: 25 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -7,127 +7,96 @@
77
import java.util.Map;
88
import java.util.concurrent.CompletableFuture;
99
import java.util.concurrent.CompletionStage;
10+
import java.util.concurrent.Executor;
1011

1112
final class JedisCacheAsyncClient implements RedisCacheAsyncClient {
1213

1314
private final RedisCacheClient syncClient;
15+
private final Executor executor;
1416

15-
JedisCacheAsyncClient(RedisCacheClient syncClient) {
17+
JedisCacheAsyncClient(RedisCacheClient syncClient, Executor executor) {
1618
this.syncClient = syncClient;
19+
this.executor = executor;
1720
}
1821

1922
@Nonnull
2023
@Override
2124
public CompletionStage<byte[]> get(byte[] key) {
22-
try {
23-
return CompletableFuture.completedFuture(syncClient.get(key));
24-
} catch (Exception e) {
25-
return CompletableFuture.failedFuture(e);
26-
}
25+
return CompletableFuture.supplyAsync(() -> syncClient.get(key), executor);
2726
}
2827

2928
@Nonnull
3029
@Override
3130
public CompletionStage<Map<byte[], byte[]>> mget(byte[][] keys) {
32-
try {
33-
return CompletableFuture.completedFuture(syncClient.mget(keys));
34-
} catch (Exception e) {
35-
return CompletableFuture.failedFuture(e);
36-
}
31+
return CompletableFuture.supplyAsync(() -> syncClient.mget(keys), executor);
3732
}
3833

3934
@Nonnull
4035
@Override
4136
public CompletionStage<byte[]> getex(byte[] key, long expireAfterMillis) {
42-
try {
43-
return CompletableFuture.completedFuture(syncClient.getex(key, expireAfterMillis));
44-
} catch (Exception e) {
45-
return CompletableFuture.failedFuture(e);
46-
}
37+
return CompletableFuture.supplyAsync(() -> syncClient.getex(key, expireAfterMillis), executor);
4738
}
4839

4940
@Nonnull
5041
@Override
5142
public CompletionStage<Map<byte[], byte[]>> getex(byte[][] keys, long expireAfterMillis) {
52-
try {
53-
return CompletableFuture.completedFuture(syncClient.getex(keys, expireAfterMillis));
54-
} catch (Exception e) {
55-
return CompletableFuture.failedFuture(e);
56-
}
43+
return CompletableFuture.supplyAsync(() -> syncClient.getex(keys, expireAfterMillis), executor);
5744
}
5845

5946
@Nonnull
6047
@Override
6148
public CompletionStage<Void> set(byte[] key, byte[] value) {
62-
try {
49+
return CompletableFuture.supplyAsync(() -> {
6350
syncClient.set(key, value);
64-
return CompletableFuture.completedFuture(null);
65-
} catch (Exception e) {
66-
return CompletableFuture.failedFuture(e);
67-
}
51+
return null;
52+
}, executor);
6853
}
6954

7055
@Nonnull
7156
@Override
7257
public CompletionStage<Void> mset(@Nonnull Map<byte[], byte[]> keyAndValue) {
73-
try {
58+
return CompletableFuture.supplyAsync(() -> {
7459
syncClient.mset(keyAndValue);
75-
return CompletableFuture.completedFuture(null);
76-
} catch (Exception e) {
77-
return CompletableFuture.failedFuture(e);
78-
}
60+
return null;
61+
}, executor);
7962
}
8063

8164
@Nonnull
8265
@Override
8366
public CompletionStage<Void> psetex(byte[] key, byte[] value, long expireAfterMillis) {
84-
try {
67+
return CompletableFuture.supplyAsync(() -> {
8568
syncClient.psetex(key, value, expireAfterMillis);
86-
return CompletableFuture.completedFuture(null);
87-
} catch (Exception e) {
88-
return CompletableFuture.failedFuture(e);
89-
}
69+
return null;
70+
}, executor);
9071
}
9172

9273
@Nonnull
9374
@Override
9475
public CompletionStage<Void> psetex(@Nonnull Map<byte[], byte[]> keyAndValue, long expireAfterMillis) {
95-
try {
76+
return CompletableFuture.supplyAsync(() -> {
9677
syncClient.psetex(keyAndValue, expireAfterMillis);
97-
return CompletableFuture.completedFuture(null);
98-
} catch (Exception e) {
99-
return CompletableFuture.failedFuture(e);
100-
}
78+
return null;
79+
}, executor);
10180
}
10281

10382
@Nonnull
10483
@Override
10584
public CompletionStage<Long> del(byte[] key) {
106-
try {
107-
return CompletableFuture.completedFuture(syncClient.del(key));
108-
} catch (Exception e) {
109-
return CompletableFuture.failedFuture(e);
110-
}
85+
return CompletableFuture.supplyAsync(() -> syncClient.del(key), executor);
11186
}
11287

11388
@Nonnull
11489
@Override
11590
public CompletionStage<Long> del(byte[][] keys) {
116-
try {
117-
return CompletableFuture.completedFuture(syncClient.del(keys));
118-
} catch (Exception e) {
119-
return CompletableFuture.failedFuture(e);
120-
}
91+
return CompletableFuture.supplyAsync(() -> syncClient.del(keys), executor);
12192
}
12293

12394
@Nonnull
12495
@Override
12596
public CompletionStage<Void> flushAll() {
126-
try {
97+
return CompletableFuture.supplyAsync(() -> {
12798
syncClient.flushAll();
128-
return CompletableFuture.completedFuture(null);
129-
} catch (Exception e) {
130-
return CompletableFuture.failedFuture(e);
131-
}
99+
return null;
100+
}, executor);
132101
}
133102
}
Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,42 @@
11
package ru.tinkoff.kora.cache.redis.jedis;
22

3+
import jakarta.annotation.Nullable;
4+
import redis.clients.jedis.Jedis;
35
import redis.clients.jedis.UnifiedJedis;
6+
import ru.tinkoff.kora.application.graph.internal.loom.VirtualThreadExecutorHolder;
47
import ru.tinkoff.kora.cache.redis.RedisCacheAsyncClient;
58
import ru.tinkoff.kora.cache.redis.RedisCacheClient;
69
import ru.tinkoff.kora.cache.redis.RedisCacheModule;
710
import ru.tinkoff.kora.common.DefaultComponent;
11+
import ru.tinkoff.kora.common.Tag;
812
import ru.tinkoff.kora.redis.jedis.JedisModule;
913

14+
import java.util.concurrent.Executor;
15+
import java.util.concurrent.ForkJoinPool;
16+
1017
public interface JedisCacheModule extends RedisCacheModule, JedisModule {
1118

19+
@Tag(Jedis.class)
1220
@DefaultComponent
13-
default RedisCacheClient lettuceRedisClient(UnifiedJedis jedis) {
21+
default Executor jedisRedisCacheAsyncExecutor() {
22+
var virtualExecutor = VirtualThreadExecutorHolder.executor();
23+
if (virtualExecutor == null) {
24+
return ForkJoinPool.commonPool();
25+
} else {
26+
return virtualExecutor;
27+
}
28+
}
29+
30+
default RedisCacheClient jedisRedisCacheSyncClient(UnifiedJedis jedis) {
1431
return new JedisCacheSyncClient(jedis);
1532
}
1633

17-
@DefaultComponent
18-
default RedisCacheAsyncClient lettuceRedisAsyncClient(RedisCacheClient redisCacheClient) {
19-
return new JedisCacheAsyncClient(redisCacheClient);
34+
default RedisCacheAsyncClient jedisRedisCacheAsyncClient(RedisCacheClient redisCacheClient,
35+
@Tag(Jedis.class) @Nullable Executor executor) {
36+
if (executor == null) {
37+
return new JedisCacheStubAsyncClient(redisCacheClient);
38+
} else {
39+
return new JedisCacheAsyncClient(redisCacheClient, executor);
40+
}
2041
}
2142
}

0 commit comments

Comments
 (0)