Skip to content

Commit be37f05

Browse files
authored
[Improve][Redis] Redis reader use scan cammnd instead of keys, single mode reader/writer support batch (apache#7087)
1 parent 4281b86 commit be37f05

File tree

20 files changed

+1036
-45
lines changed

20 files changed

+1036
-45
lines changed

docs/en/connector-v2/sink/Redis.md

+5
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ Used to write data to Redis.
1818
| port | int | yes | - |
1919
| key | string | yes | - |
2020
| data_type | string | yes | - |
21+
| batch_size | int | no | 10 |
2122
| user | string | no | - |
2223
| auth | string | no | - |
2324
| db_num | int | no | 0 |
@@ -83,6 +84,10 @@ Redis data types, support `key` `hash` `list` `set` `zset`
8384
- zset
8485

8586
> Each data from upstream will be added to the configured zset key with a weight of 1. So the order of data in zset is based on the order of data consumption.
87+
>
88+
### batch_size [int]
89+
90+
ensure the batch write size in single-machine mode; no guarantees in cluster mode.
8691

8792
### user [string]
8893

docs/en/connector-v2/source/Redis.md

+5
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ Used to read data from Redis.
2222
| host | string | yes | - |
2323
| port | int | yes | - |
2424
| keys | string | yes | - |
25+
| batch_size | int | yes | 10 |
2526
| data_type | string | yes | - |
2627
| user | string | no | - |
2728
| auth | string | no | - |
@@ -113,6 +114,10 @@ each kv that in hash key it will be treated as a row and send it to upstream.
113114

114115
keys pattern
115116

117+
### batch_size [int]
118+
119+
indicates the number of keys to attempt to return per iteration,default 10
120+
116121
**Tips:Redis source connector support fuzzy key matching, user needs to ensure that the matched keys are the same type**
117122

118123
### data_type [string]

release-note.md

+1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
- [Connector-v2] [File] Inject FileSystem to OrcWriteStrategy
5757
- [Connector-v2] [File] Support assign encoding for file source/sink (#5973)
5858
- [Connector-v2] [Mongodb] Support to convert to double from numeric type that mongodb saved it as numeric internally (#6997)
59+
- [Connector-v2] [Redis] Using scan replace keys operation command,support batchWrite in single mode(#7030,#7085)
5960

6061
### Zeta(ST-Engine)
6162

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.redis.client;
19+
20+
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
21+
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
22+
23+
import redis.clients.jedis.Jedis;
24+
import redis.clients.jedis.params.ScanParams;
25+
import redis.clients.jedis.resps.ScanResult;
26+
27+
import java.util.List;
28+
import java.util.Map;
29+
import java.util.Set;
30+
31+
public abstract class RedisClient extends Jedis {
32+
33+
protected final RedisParameters redisParameters;
34+
35+
protected final int batchSize;
36+
37+
protected final Jedis jedis;
38+
39+
protected RedisClient(RedisParameters redisParameters, Jedis jedis) {
40+
this.redisParameters = redisParameters;
41+
this.batchSize = redisParameters.getBatchSize();
42+
this.jedis = jedis;
43+
}
44+
45+
public ScanResult<String> scanKeys(
46+
String cursor, int batchSize, String keysPattern, RedisDataType type) {
47+
ScanParams scanParams = new ScanParams();
48+
scanParams.match(keysPattern);
49+
scanParams.count(batchSize);
50+
return jedis.scan(cursor, scanParams, type.name());
51+
}
52+
53+
public abstract List<String> batchGetString(List<String> keys);
54+
55+
public abstract List<List<String>> batchGetList(List<String> keys);
56+
57+
public abstract List<Set<String>> batchGetSet(List<String> keys);
58+
59+
public abstract List<Map<String, String>> batchGetHash(List<String> keys);
60+
61+
public abstract List<List<String>> batchGetZset(List<String> keys);
62+
63+
public abstract void batchWriteString(
64+
List<String> keys, List<String> values, long expireSeconds);
65+
66+
public abstract void batchWriteList(
67+
List<String> keyBuffer, List<String> valueBuffer, long expireSeconds);
68+
69+
public abstract void batchWriteSet(
70+
List<String> keyBuffer, List<String> valueBuffer, long expireSeconds);
71+
72+
public abstract void batchWriteHash(
73+
List<String> keyBuffer, List<String> valueBuffer, long expireSeconds);
74+
75+
public abstract void batchWriteZset(
76+
List<String> keyBuffer, List<String> valueBuffer, long expireSeconds);
77+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.redis.client;
19+
20+
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
21+
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
22+
23+
import org.apache.commons.collections4.CollectionUtils;
24+
25+
import redis.clients.jedis.Jedis;
26+
27+
import java.util.ArrayList;
28+
import java.util.List;
29+
import java.util.Map;
30+
import java.util.Set;
31+
32+
public class RedisClusterClient extends RedisClient {
33+
public RedisClusterClient(RedisParameters redisParameters, Jedis jedis) {
34+
super(redisParameters, jedis);
35+
}
36+
37+
@Override
38+
public List<String> batchGetString(List<String> keys) {
39+
if (CollectionUtils.isEmpty(keys)) {
40+
return new ArrayList<>();
41+
}
42+
List<String> result = new ArrayList<>(keys.size());
43+
for (String key : keys) {
44+
result.add(jedis.get(key));
45+
}
46+
return result;
47+
}
48+
49+
@Override
50+
public List<List<String>> batchGetList(List<String> keys) {
51+
if (CollectionUtils.isEmpty(keys)) {
52+
return new ArrayList<>();
53+
}
54+
List<List<String>> result = new ArrayList<>(keys.size());
55+
for (String key : keys) {
56+
result.add(jedis.lrange(key, 0, -1));
57+
}
58+
return result;
59+
}
60+
61+
@Override
62+
public List<Set<String>> batchGetSet(List<String> keys) {
63+
if (CollectionUtils.isEmpty(keys)) {
64+
return new ArrayList<>();
65+
}
66+
List<Set<String>> result = new ArrayList<>(keys.size());
67+
for (String key : keys) {
68+
result.add(jedis.smembers(key));
69+
}
70+
return result;
71+
}
72+
73+
@Override
74+
public List<Map<String, String>> batchGetHash(List<String> keys) {
75+
if (CollectionUtils.isEmpty(keys)) {
76+
return new ArrayList<>();
77+
}
78+
List<Map<String, String>> result = new ArrayList<>(keys.size());
79+
for (String key : keys) {
80+
Map<String, String> map = jedis.hgetAll(key);
81+
map.put("hash_key", key);
82+
result.add(map);
83+
}
84+
return result;
85+
}
86+
87+
@Override
88+
public List<List<String>> batchGetZset(List<String> keys) {
89+
if (CollectionUtils.isEmpty(keys)) {
90+
return new ArrayList<>();
91+
}
92+
List<List<String>> result = new ArrayList<>(keys.size());
93+
for (String key : keys) {
94+
result.add(jedis.zrange(key, 0, -1));
95+
}
96+
return result;
97+
}
98+
99+
@Override
100+
public void batchWriteString(List<String> keys, List<String> values, long expireSeconds) {
101+
int size = keys.size();
102+
for (int i = 0; i < size; i++) {
103+
RedisDataType.STRING.set(this, keys.get(i), values.get(i), expireSeconds);
104+
}
105+
}
106+
107+
@Override
108+
public void batchWriteList(List<String> keys, List<String> values, long expireSeconds) {
109+
int size = keys.size();
110+
for (int i = 0; i < size; i++) {
111+
RedisDataType.LIST.set(this, keys.get(i), values.get(i), expireSeconds);
112+
}
113+
}
114+
115+
@Override
116+
public void batchWriteSet(List<String> keys, List<String> values, long expireSeconds) {
117+
int size = keys.size();
118+
for (int i = 0; i < size; i++) {
119+
RedisDataType.SET.set(this, keys.get(i), values.get(i), expireSeconds);
120+
}
121+
}
122+
123+
@Override
124+
public void batchWriteHash(List<String> keys, List<String> values, long expireSeconds) {
125+
int size = keys.size();
126+
for (int i = 0; i < size; i++) {
127+
RedisDataType.HASH.set(this, keys.get(i), values.get(i), expireSeconds);
128+
}
129+
}
130+
131+
@Override
132+
public void batchWriteZset(List<String> keys, List<String> values, long expireSeconds) {
133+
int size = keys.size();
134+
for (int i = 0; i < size; i++) {
135+
RedisDataType.ZSET.set(this, keys.get(i), values.get(i), expireSeconds);
136+
}
137+
}
138+
}

0 commit comments

Comments
 (0)