@@ -18,12 +18,15 @@ mod prometheus;
18
18
use std:: collections:: { BTreeMap , HashMap } ;
19
19
use std:: fmt:: Debug ;
20
20
use std:: num:: NonZeroU64 ;
21
+ use std:: str:: FromStr ;
21
22
use std:: sync:: Arc ;
22
23
23
24
use anyhow:: { anyhow, Context } ;
24
25
use async_trait:: async_trait;
25
26
use iceberg:: arrow:: { arrow_schema_to_schema, schema_to_arrow_schema} ;
26
- use iceberg:: spec:: { DataFile , SerializedDataFile } ;
27
+ use iceberg:: spec:: {
28
+ DataFile , SerializedDataFile , Transform , UnboundPartitionField , UnboundPartitionSpec ,
29
+ } ;
27
30
use iceberg:: table:: Table ;
28
31
use iceberg:: transaction:: Transaction ;
29
32
use iceberg:: writer:: base_writer:: data_file_writer:: DataFileWriterBuilder ;
@@ -47,6 +50,7 @@ use itertools::Itertools;
47
50
use parquet:: file:: properties:: WriterProperties ;
48
51
use prometheus:: monitored_general_writer:: MonitoredGeneralWriterBuilder ;
49
52
use prometheus:: monitored_position_delete_writer:: MonitoredPositionDeleteWriterBuilder ;
53
+ use regex:: Regex ;
50
54
use risingwave_common:: array:: arrow:: arrow_array_iceberg:: { Int32Array , RecordBatch } ;
51
55
use risingwave_common:: array:: arrow:: arrow_schema_iceberg:: {
52
56
self , DataType as ArrowDataType , Field as ArrowField , Schema as ArrowSchema , SchemaRef ,
@@ -109,6 +113,13 @@ pub struct IcebergConfig {
109
113
#[ serde( skip) ]
110
114
pub java_catalog_props : HashMap < String , String > ,
111
115
116
+ #[ serde(
117
+ rename = "partition_by" ,
118
+ default ,
119
+ deserialize_with = "deserialize_optional_string_seq_from_string"
120
+ ) ]
121
+ pub partition_by : Option < Vec < String > > ,
122
+
112
123
/// Commit every n(>0) checkpoints, default is 10.
113
124
#[ serde( default = "default_commit_checkpoint_interval" ) ]
114
125
#[ serde_as( as = "DisplayFromStr" ) ]
@@ -311,9 +322,53 @@ impl IcebergSink {
311
322
}
312
323
} ;
313
324
325
+ let partition_fields = match & self . config . partition_by {
326
+ Some ( partition_by) => {
327
+ let mut partition_fields =
328
+ Vec :: < UnboundPartitionField > :: with_capacity ( partition_by. len ( ) ) ;
329
+ for partition_field in partition_by {
330
+ let re = Regex :: new ( r"\w+(\(\w+\))?" ) . unwrap ( ) ;
331
+ if !re. is_match ( partition_field) {
332
+ bail ! ( format!( "Invalid partition field: {}" , partition_field) )
333
+ }
334
+ let ( _, [ field1, field2] ) = re. captures ( partition_field) . unwrap ( ) . extract ( ) ;
335
+ let mut func = field1. to_owned ( ) ;
336
+ let mut column = field2. to_owned ( ) ;
337
+ if column. is_empty ( ) {
338
+ column. replace_range ( .., & func) ;
339
+ func. replace_range ( .., "identity" ) ;
340
+ }
341
+ let transform = Transform :: from_str ( & func) . unwrap ( ) ;
342
+ if transform == Transform :: Unknown {
343
+ bail ! ( format!( "Invalid partition field: {}" , partition_field) )
344
+ }
345
+ for ( pos, col) in self . param . columns . iter ( ) . enumerate ( ) {
346
+ if col. name == column {
347
+ partition_fields. push (
348
+ UnboundPartitionField :: builder ( )
349
+ . source_id ( pos as i32 )
350
+ . transform ( transform)
351
+ . name ( column. to_owned ( ) )
352
+ . build ( ) ,
353
+ ) ;
354
+ break ;
355
+ }
356
+ }
357
+ }
358
+ partition_fields
359
+ }
360
+ None => Vec :: < UnboundPartitionField > :: new ( ) ,
361
+ } ;
362
+
314
363
let table_creation_builder = TableCreation :: builder ( )
315
364
. name ( self . config . common . table_name . clone ( ) )
316
- . schema ( iceberg_schema) ;
365
+ . schema ( iceberg_schema)
366
+ . partition_spec (
367
+ UnboundPartitionSpec :: builder ( )
368
+ . add_partition_fields ( partition_fields)
369
+ . unwrap ( )
370
+ . build ( ) ,
371
+ ) ;
317
372
318
373
let table_creation = match location {
319
374
Some ( location) => table_creation_builder. location ( location) . build ( ) ,
@@ -1402,6 +1457,7 @@ mod test {
1402
1457
( "connector" , "iceberg" ) ,
1403
1458
( "type" , "upsert" ) ,
1404
1459
( "primary_key" , "v1" ) ,
1460
+ ( "partition_by" , "v1,identity(v2)" ) ,
1405
1461
( "warehouse.path" , "s3://iceberg" ) ,
1406
1462
( "s3.endpoint" , "http://127.0.0.1:9301" ) ,
1407
1463
( "s3.access.key" , "hummockadmin" ) ,
@@ -1445,6 +1501,7 @@ mod test {
1445
1501
r#type : "upsert" . to_owned ( ) ,
1446
1502
force_append_only : false ,
1447
1503
primary_key : Some ( vec ! [ "v1" . to_owned( ) ] ) ,
1504
+ partition_by : Some ( vec ! [ "v1" . to_owned( ) , "identity(v2)" . to_owned( ) ] ) ,
1448
1505
java_catalog_props : [ ( "jdbc.user" , "admin" ) , ( "jdbc.password" , "123456" ) ]
1449
1506
. into_iter ( )
1450
1507
. map ( |( k, v) | ( k. to_owned ( ) , v. to_owned ( ) ) )
0 commit comments