
Commit cd373f3

feat: redis sink using depot library (#206)
* feat: redis sink using depot library
* docs: redis sink using depot library
* chore: version bump
* fix: remove descriptors
* chore: version bump
* chore: version bump
1 parent 477bb8b


47 files changed: +26 −2744 lines

build.gradle

Lines changed: 2 additions & 2 deletions
@@ -33,7 +33,7 @@ lombok {
 }
 
 group 'io.odpf'
-version '0.6.1'
+version '0.7.0'
 
 def projName = "firehose"
 
@@ -101,7 +101,7 @@ dependencies {
     implementation 'com.google.cloud:google-cloud-storage:1.114.0'
     implementation 'com.google.cloud:google-cloud-bigquery:1.115.0'
     implementation 'org.apache.logging.log4j:log4j-core:2.17.1'
-    implementation group: 'io.odpf', name: 'depot', version: '0.3.4'
+    implementation group: 'io.odpf', name: 'depot', version: '0.3.5'
     implementation group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59' exclude group: 'org.slf4j'
 
     testImplementation group: 'junit', name: 'junit', version: '4.11'

docs/docs/sinks/redis-sink.md

Lines changed: 13 additions & 72 deletions
@@ -1,80 +1,21 @@
-# Redis
+# Redis Sink
 
-A Redis sink Firehose \(`SINK_TYPE`=`redis`\) requires the following variables to be set along with Generic ones
+Redis Sink is implemented in Firehose using the Redis sink connector from ODPF Depot. You can check out the ODPF Depot GitHub repository [here](https://github.com/odpf/depot).
 
-### `SINK_REDIS_URLS`
+### Data Types
+Redis sink can be created in 3 different modes based on the value of [`SINK_REDIS_DATA_TYPE`](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md#sink_redis_data_type): HashSet, KeyValue or List.
+- `Hashset`: For each message, an entry of the format `key : field : value` is generated and pushed to Redis. Field and value are generated on the basis of the config [`SINK_REDIS_HASHSET_FIELD_TO_COLUMN_MAPPING`](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md#sink_redis_hashset_field_to_column_mapping)
+- `List`: For each message, an entry of the format `key : value` is generated and pushed to Redis. The value is fetched from the Proto field name provided in the config [`SINK_REDIS_LIST_DATA_FIELD_NAME`](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md#sink_redis_list_data_field_name)
+- `KeyValue`: For each message, an entry of the format `key : value` is generated and pushed to Redis. The value is fetched from the Proto field name provided in the config [`SINK_REDIS_KEY_VALUE_DATA_FIELD_NAME`](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md#sink_redis_key_value_data_field_name)
 
-REDIS instance hostname/IP address followed by its port.
+The `key` is picked up from a field in the message itself.
 
-- Example value: `localhos:6379,localhost:6380`
-- Type: `required`
+Limitation: the Depot Redis sink only supports KeyValue, HashSet and List entries as of now.
 
-### `SINK_REDIS_DATA_TYPE`
+### Configuration
 
-To select whether you want to push your data as a HashSet or as a List.
+For the Redis sink in Firehose, first set `SINK_TYPE`=`redis`. The generic configs common across sink types also need to be set; they are described in [generic.md](../advance/generic.md). Redis sink specific configs are maintained in the ODPF Depot repository; you can check them out [here](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md).
 
-- Example value: `Hashset`
-- Type: `required`
-- Default value: `List`
 
-### `SINK_REDIS_KEY_TEMPLATE`
-
-The string that will act as the key for each Redis entry. This key can be configured as per the requirement, a constant or can extract value from each message and use that as the Redis key.
-
-- Example value: `Service\_%%s,1`
-
-  This will take the value with index 1 from proto and create the Redis keys as per the template\
-
-- Type: `required`
-
-### `INPUT_SCHEMA_PROTO_TO_COLUMN_MAPPING`
-
-This is the field that decides what all data will be stored in the HashSet for each message.
-
-- Example value: `{"6":"customer_id", "2":"order_num"}`
-- Type: `required (For Hashset)`
-
-### `SINK_REDIS_LIST_DATA_PROTO_INDEX`
-
-This field decides what all data will be stored in the List for each message.
-
-- Example value: `6`
-
-  This will get the value of the field with index 6 in your proto and push that to the Redis list with the corresponding keyTemplate\
-
-- Type: `required (For List)`
-
-### `SINK_REDIS_KEY_VALUE_DATA_PROTO_INDEX`
-
-This field decides what data will be stored in the value part of key-value pair
-
-- Example value: `6`
-
-  This will get the value of the field with index 6 in your proto and push that to the Redis as value with the corresponding keyTemplate\
-
-- Type: `required (For KeyValue)`
-
-### `SINK_REDIS_TTL_TYPE`
-
-- Example value: `DURATION`
-- Type: `optional`
-- Default value: `DISABLE`
-- Choice of Redis TTL type. It can be:\
-  - `DURATION`: After which the Key will be expired and removed from Redis \(UNIT- seconds\)\
-  - `EXACT_TIME`: Precise UNIX timestamp after which the Key will be expired
-
-### `SINK_REDIS_TTL_VALUE`
-
-Redis TTL value in Unix Timestamp for `EXACT_TIME` TTL type, in Seconds for `DURATION` TTL type.
-
-- Example value: `100000`
-- Type: `optional`
-- Default value: `0`
-
-### `SINK_REDIS_DEPLOYMENT_TYPE`
-
-The Redis deployment you are using. At present, we support `Standalone` and `Cluster` types.
-
-- Example value: `Standalone`
-- Type: `required`
-- Default value: `Standalone`
+### Deployment Types
+Redis sink, as of now, supports two deployment types, `Standalone` and `Cluster`. This can be configured via the Depot environment variable `SINK_REDIS_DEPLOYMENT_TYPE`.
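For orientation, a hypothetical environment configuration for a HashSet-mode Redis sink is sketched below. Only `SINK_TYPE`, `SINK_REDIS_DATA_TYPE`, `SINK_REDIS_HASHSET_FIELD_TO_COLUMN_MAPPING` and `SINK_REDIS_DEPLOYMENT_TYPE` are named on this docs page; the remaining keys and all of the values are illustrative placeholders and should be verified against the Depot Redis configuration reference linked above.

```properties
# Hypothetical Firehose Redis sink configuration (values are placeholders, not defaults)
SINK_TYPE=redis
SINK_REDIS_DEPLOYMENT_TYPE=Standalone
SINK_REDIS_DATA_TYPE=Hashset
# field-to-column mapping used only in HashSet mode; the payload shape here is assumed
SINK_REDIS_HASHSET_FIELD_TO_COLUMN_MAPPING={"customer_id":"customer","order_number":"order"}
# connection and key-template keys are assumed; confirm their exact names in the Depot docs
SINK_REDIS_URLS=localhost:6379
SINK_REDIS_KEY_TEMPLATE=customer_%s,customer_id
```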

src/main/java/io/odpf/firehose/config/RedisSinkConfig.java

Lines changed: 0 additions & 43 deletions
This file was deleted.

src/main/java/io/odpf/firehose/config/converter/RedisSinkDataTypeConverter.java

Lines changed: 0 additions & 13 deletions
This file was deleted.

src/main/java/io/odpf/firehose/config/converter/RedisSinkDeploymentTypeConverter.java

Lines changed: 0 additions & 13 deletions
This file was deleted.

src/main/java/io/odpf/firehose/config/converter/RedisSinkTtlTypeConverter.java

Lines changed: 0 additions & 13 deletions
This file was deleted.

src/main/java/io/odpf/firehose/config/enums/RedisSinkDataType.java

Lines changed: 0 additions & 7 deletions
This file was deleted.

src/main/java/io/odpf/firehose/config/enums/RedisSinkDeploymentType.java

Lines changed: 0 additions & 6 deletions
This file was deleted.

src/main/java/io/odpf/firehose/config/enums/RedisSinkTtlType.java

Lines changed: 0 additions & 7 deletions
This file was deleted.

src/main/java/io/odpf/firehose/sink/SinkFactory.java

Lines changed: 11 additions & 3 deletions
@@ -3,12 +3,15 @@
 import io.odpf.depot.bigquery.BigQuerySink;
 import io.odpf.depot.bigquery.BigQuerySinkFactory;
 import io.odpf.depot.config.BigQuerySinkConfig;
+import io.odpf.depot.config.RedisSinkConfig;
 import io.odpf.depot.bigtable.BigTableSinkFactory;
 import io.odpf.depot.bigtable.BigTableSink;
 import io.odpf.depot.config.BigTableSinkConfig;
 import io.odpf.depot.log.LogSink;
 import io.odpf.depot.log.LogSinkFactory;
 import io.odpf.depot.metrics.StatsDReporter;
+import io.odpf.depot.redis.RedisSink;
+import io.odpf.depot.redis.RedisSinkFactory;
 import io.odpf.firehose.config.KafkaConsumerConfig;
 import io.odpf.firehose.config.enums.SinkType;
 import io.odpf.firehose.consumer.kafka.OffsetManager;
@@ -23,7 +26,6 @@
 import io.odpf.firehose.sink.jdbc.JdbcSinkFactory;
 import io.odpf.firehose.sink.mongodb.MongoSinkFactory;
 import io.odpf.firehose.sink.prometheus.PromSinkFactory;
-import io.odpf.firehose.sink.redis.RedisSinkFactory;
 import io.odpf.stencil.client.StencilClient;
 import org.aeonbits.owner.ConfigFactory;
 
@@ -38,6 +40,7 @@ public class SinkFactory {
     private BigQuerySinkFactory bigQuerySinkFactory;
     private BigTableSinkFactory bigTableSinkFactory;
     private LogSinkFactory logSinkFactory;
+    private RedisSinkFactory redisSinkFactory;
     private final Map<String, String> config;
 
     public SinkFactory(KafkaConsumerConfig kafkaConsumerConfig,
@@ -61,7 +64,6 @@ public void init() {
             case HTTP:
             case INFLUXDB:
             case ELASTICSEARCH:
-            case REDIS:
             case GRPC:
             case PROMETHEUS:
             case BLOB:
@@ -71,6 +73,12 @@ public void init() {
                 logSinkFactory = new LogSinkFactory(config, statsDReporter);
                 logSinkFactory.init();
                 return;
+            case REDIS:
+                redisSinkFactory = new RedisSinkFactory(
+                        ConfigFactory.create(RedisSinkConfig.class, config),
+                        statsDReporter);
+                redisSinkFactory.init();
+                return;
             case BIGQUERY:
                 BigquerySinkUtils.addMetadataColumns(config);
                 bigQuerySinkFactory = new BigQuerySinkFactory(
@@ -105,7 +113,7 @@ public Sink getSink() {
             case ELASTICSEARCH:
                 return EsSinkFactory.create(config, statsDReporter, stencilClient);
             case REDIS:
-                return RedisSinkFactory.create(config, statsDReporter, stencilClient);
+                return new GenericOdpfSink(new FirehoseInstrumentation(statsDReporter, RedisSink.class), sinkType.name(), redisSinkFactory.create());
             case GRPC:
                 return GrpcSinkFactory.create(config, statsDReporter, stencilClient);
             case PROMETHEUS:
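With this change Firehose no longer ships its own Redis sink: when `SINK_TYPE=redis`, `SinkFactory.init()` builds a Depot `RedisSinkFactory`, and `getSink()` wraps each sink that factory creates in Firehose's generic adapter. The sketch below restates that wiring outside the switch statements; it is a hedged illustration rather than the actual `SinkFactory` code, and the import paths for `GenericOdpfSink`, `FirehoseInstrumentation` and `Sink` are assumptions, since their import statements are not visible in this hunk.

```java
import io.odpf.depot.config.RedisSinkConfig;
import io.odpf.depot.metrics.StatsDReporter;
import io.odpf.depot.redis.RedisSink;
import io.odpf.depot.redis.RedisSinkFactory;
import io.odpf.firehose.metrics.FirehoseInstrumentation; // assumed package
import io.odpf.firehose.sink.GenericOdpfSink;            // assumed package
import io.odpf.firehose.sink.Sink;                        // assumed package
import org.aeonbits.owner.ConfigFactory;

import java.util.Map;

class RedisSinkWiringSketch {

    // Mirrors SinkFactory.init(): build the Depot factory once from the flattened config map.
    static RedisSinkFactory initRedisSinkFactory(Map<String, String> config, StatsDReporter statsDReporter) {
        RedisSinkFactory redisSinkFactory = new RedisSinkFactory(
                ConfigFactory.create(RedisSinkConfig.class, config),
                statsDReporter);
        redisSinkFactory.init();
        return redisSinkFactory;
    }

    // Mirrors SinkFactory.getSink(): wrap each Depot-created sink in Firehose's generic adapter
    // so the consumer loop keeps treating it like any other Firehose sink.
    static Sink createRedisSink(RedisSinkFactory redisSinkFactory, StatsDReporter statsDReporter) {
        return new GenericOdpfSink(
                new FirehoseInstrumentation(statsDReporter, RedisSink.class),
                "REDIS",
                redisSinkFactory.create());
    }
}
```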

src/main/java/io/odpf/firehose/sink/redis/RedisSink.java

Lines changed: 0 additions & 57 deletions
This file was deleted.

src/main/java/io/odpf/firehose/sink/redis/RedisSinkFactory.java

Lines changed: 0 additions & 51 deletions
This file was deleted.
