@@ -29,6 +29,7 @@ pub use upsert::UpsertFormatter;
29
29
use super :: catalog:: { SinkEncode , SinkFormat , SinkFormatDesc } ;
30
30
use super :: encoder:: template:: TemplateEncoder ;
31
31
use super :: encoder:: KafkaConnectParams ;
32
+ use super :: redis:: { KEY_FORMAT , VALUE_FORMAT } ;
32
33
use crate :: sink:: encoder:: { JsonEncoder , ProtoEncoder , TimestampHandlingMode } ;
33
34
34
35
/// Transforms a `StreamChunk` into a sequence of key-value pairs according a specific format,
@@ -92,7 +93,7 @@ impl SinkFormatterImpl {
92
93
let key_encoder = ( !pk_indices. is_empty ( ) ) . then ( || {
93
94
JsonEncoder :: new (
94
95
schema. clone ( ) ,
95
- Some ( pk_indices) ,
96
+ Some ( pk_indices. clone ( ) ) ,
96
97
TimestampHandlingMode :: Milli ,
97
98
)
98
99
} ) ;
@@ -115,6 +116,28 @@ impl SinkFormatterImpl {
115
116
Ok ( SinkFormatterImpl :: AppendOnlyProto ( formatter) )
116
117
}
117
118
SinkEncode :: Avro => err_unsupported ( ) ,
119
+ SinkEncode :: Template => {
120
+ let key_format = format_desc. options . get ( KEY_FORMAT ) . ok_or_else ( || {
121
+ SinkError :: Config ( anyhow ! (
122
+ "Cannot find 'key_format',please set it or use JSON"
123
+ ) )
124
+ } ) ?;
125
+ let value_format =
126
+ format_desc. options . get ( VALUE_FORMAT ) . ok_or_else ( || {
127
+ SinkError :: Config ( anyhow ! (
128
+ "Cannot find 'redis_value_format',please set it or use JSON"
129
+ ) )
130
+ } ) ?;
131
+ let key_encoder = TemplateEncoder :: new (
132
+ schema. clone ( ) ,
133
+ Some ( pk_indices) ,
134
+ key_format. clone ( ) ,
135
+ ) ;
136
+ let val_encoder = TemplateEncoder :: new ( schema, None , value_format. clone ( ) ) ;
137
+ Ok ( SinkFormatterImpl :: AppendOnlyTemplate (
138
+ AppendOnlyFormatter :: new ( Some ( key_encoder) , val_encoder) ,
139
+ ) )
140
+ }
118
141
}
119
142
}
120
143
SinkFormat :: Debezium => {
@@ -131,85 +154,66 @@ impl SinkFormatterImpl {
131
154
) ) )
132
155
}
133
156
SinkFormat :: Upsert => {
134
- if format_desc. encode != SinkEncode :: Json {
135
- return err_unsupported ( ) ;
136
- }
157
+ match format_desc. encode {
158
+ SinkEncode :: Json => {
159
+ let mut key_encoder = JsonEncoder :: new (
160
+ schema. clone ( ) ,
161
+ Some ( pk_indices) ,
162
+ TimestampHandlingMode :: Milli ,
163
+ ) ;
164
+ let mut val_encoder =
165
+ JsonEncoder :: new ( schema, None , TimestampHandlingMode :: Milli ) ;
137
166
138
- let mut key_encoder = JsonEncoder :: new (
139
- schema . clone ( ) ,
140
- Some ( pk_indices ) ,
141
- TimestampHandlingMode :: Milli ,
142
- ) ;
143
- let mut val_encoder = JsonEncoder :: new ( schema , None , TimestampHandlingMode :: Milli ) ;
144
-
145
- if let Some ( s ) = format_desc . options . get ( "schemas.enable" ) {
146
- match s . to_lowercase ( ) . parse :: < bool > ( ) {
147
- Ok ( true ) => {
148
- let kafka_connect = KafkaConnectParams {
149
- schema_name : format ! ( "{}.{}" , db_name , sink_from_name ) ,
150
- } ;
151
- key_encoder = key_encoder . with_kafka_connect ( kafka_connect . clone ( ) ) ;
152
- val_encoder = val_encoder . with_kafka_connect ( kafka_connect ) ;
153
- }
154
- Ok ( false ) => { }
155
- _ => {
156
- return Err ( SinkError :: Config ( anyhow ! (
157
- "schemas.enable is expected to be `true` or `false`, got {}" ,
158
- s
159
- ) ) ) ;
160
- }
167
+ if let Some ( s ) = format_desc . options . get ( "schemas.enable" ) {
168
+ match s . to_lowercase ( ) . parse :: < bool > ( ) {
169
+ Ok ( true ) => {
170
+ let kafka_connect = KafkaConnectParams {
171
+ schema_name : format ! ( "{}.{}" , db_name , sink_from_name ) ,
172
+ } ;
173
+ key_encoder =
174
+ key_encoder . with_kafka_connect ( kafka_connect . clone ( ) ) ;
175
+ val_encoder = val_encoder . with_kafka_connect ( kafka_connect ) ;
176
+ }
177
+ Ok ( false ) => { }
178
+ _ => {
179
+ return Err ( SinkError :: Config ( anyhow ! (
180
+ "schemas.enable is expected to be `true` or `false`, got {}" ,
181
+ s
182
+ ) ) ) ;
183
+ }
184
+ }
185
+ } ;
186
+
187
+ // Initialize the upsert_stream
188
+ let formatter = UpsertFormatter :: new ( key_encoder , val_encoder ) ;
189
+ Ok ( SinkFormatterImpl :: UpsertJson ( formatter ) )
161
190
}
162
- } ;
163
-
164
- // Initialize the upsert_stream
165
- let formatter = UpsertFormatter :: new ( key_encoder, val_encoder) ;
166
- Ok ( SinkFormatterImpl :: UpsertJson ( formatter) )
167
- }
168
- }
169
- }
170
-
171
- pub fn new_with_redis (
172
- schema : Schema ,
173
- pk_indices : Vec < usize > ,
174
- is_append_only : bool ,
175
- key_format : Option < String > ,
176
- value_format : Option < String > ,
177
- ) -> Result < Self > {
178
- match ( key_format, value_format) {
179
- ( Some ( k) , Some ( v) ) => {
180
- let key_encoder = TemplateEncoder :: new (
181
- schema. clone ( ) ,
182
- Some ( pk_indices) ,
183
- k,
184
- ) ;
185
- let val_encoder =
186
- TemplateEncoder :: new ( schema, None , v) ;
187
- if is_append_only {
188
- Ok ( SinkFormatterImpl :: AppendOnlyTemplate ( AppendOnlyFormatter :: new ( Some ( key_encoder) , val_encoder) ) )
189
- } else {
190
- Ok ( SinkFormatterImpl :: UpsertTemplate ( UpsertFormatter :: new ( key_encoder, val_encoder) ) )
191
- }
192
- }
193
- ( None , None ) => {
194
- let key_encoder = JsonEncoder :: new (
195
- schema. clone ( ) ,
196
- Some ( pk_indices) ,
197
- TimestampHandlingMode :: Milli ,
198
- ) ;
199
- let val_encoder = JsonEncoder :: new (
200
- schema,
201
- None ,
202
- TimestampHandlingMode :: Milli ,
203
- ) ;
204
- if is_append_only {
205
- Ok ( SinkFormatterImpl :: AppendOnlyJson ( AppendOnlyFormatter :: new ( Some ( key_encoder) , val_encoder) ) )
206
- } else {
207
- Ok ( SinkFormatterImpl :: UpsertJson ( UpsertFormatter :: new ( key_encoder, val_encoder) ) )
191
+ SinkEncode :: Template => {
192
+ let key_format = format_desc. options . get ( KEY_FORMAT ) . ok_or_else ( || {
193
+ SinkError :: Config ( anyhow ! (
194
+ "Cannot find 'key_format',please set it or use JSON"
195
+ ) )
196
+ } ) ?;
197
+ let value_format =
198
+ format_desc. options . get ( VALUE_FORMAT ) . ok_or_else ( || {
199
+ SinkError :: Config ( anyhow ! (
200
+ "Cannot find 'redis_value_format',please set it or use JSON"
201
+ ) )
202
+ } ) ?;
203
+ let key_encoder = TemplateEncoder :: new (
204
+ schema. clone ( ) ,
205
+ Some ( pk_indices) ,
206
+ key_format. clone ( ) ,
207
+ ) ;
208
+ let val_encoder = TemplateEncoder :: new ( schema, None , value_format. clone ( ) ) ;
209
+ Ok ( SinkFormatterImpl :: UpsertTemplate ( UpsertFormatter :: new (
210
+ key_encoder,
211
+ val_encoder,
212
+ ) ) )
213
+ }
214
+ _ => err_unsupported ( ) ,
208
215
}
209
216
}
210
- _ => {
211
- Err ( SinkError :: Encode ( "Please provide template formats for both key and value, or choose the JSON format." . to_string ( ) ) )
212
- }
213
217
}
214
218
}
215
219
}
0 commit comments