Skip to content

Commit 43d6b21

Browse files
authored
Add support for otfMetadata in Kinesis firehose response metadata for Iceberg routing (#595)
* Add support for otfMetadata in Kinesis firehose response metadata for iceberg table routing * Update tests * Use specific type for otf operations
1 parent 42a01a9 commit 43d6b21

File tree

2 files changed

+62
-31
lines changed

2 files changed

+62
-31
lines changed

events/firehose.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,15 @@ const (
2525
KinesisFirehoseTransformedStateProcessingFailed = "ProcessingFailed"
2626
)
2727

28+
// KinesisFirehoseOTFOperation represents the operation to apply on the record during on-the-fly record routing.
29+
type KinesisFirehoseOTFOperation string
30+
31+
const (
32+
KinesisFirehoseOTFOperationInsert KinesisFirehoseOTFOperation = "insert"
33+
KinesisFirehoseOTFOperationUpdate KinesisFirehoseOTFOperation = "update"
34+
KinesisFirehoseOTFOperationDelete KinesisFirehoseOTFOperation = "delete"
35+
)
36+
2837
type KinesisFirehoseResponse struct {
2938
Records []KinesisFirehoseResponseRecord `json:"records"`
3039
}
@@ -37,7 +46,14 @@ type KinesisFirehoseResponseRecord struct {
3746
}
3847

3948
type KinesisFirehoseResponseRecordMetadata struct {
40-
PartitionKeys map[string]string `json:"partitionKeys"`
49+
PartitionKeys map[string]string `json:"partitionKeys"`
50+
OTFMetadata KinesisFirehoseResponseRecordOTFMetadata `json:"otfMetadata"`
51+
}
52+
53+
type KinesisFirehoseResponseRecordOTFMetadata struct {
54+
DestinationDatabaseName string `json:"destinationDatabaseName"`
55+
DestinationTableName string `json:"destinationTableName"`
56+
Operation KinesisFirehoseOTFOperation `json:"operation"` // The Operation field must have one of the following values – insert, update, or delete.
4157
}
4258

4359
type KinesisFirehoseRecordMetadata struct {
Lines changed: 45 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,46 @@
11
{
2-
"records": [
3-
{
4-
"data": "SGVsbG8gV29ybGQ=",
5-
"recordId": "record1",
6-
"result": "TRANSFORMED_STATE_OK",
7-
"metadata": {
8-
"partitionKeys": {}
9-
}
10-
},
11-
{
12-
"data": "SGVsbG8gV29ybGQ=",
13-
"recordId": "record2",
14-
"result": "TRANSFORMED_STATE_DROPPED",
15-
"metadata": {
16-
"partitionKeys": {}
17-
}
18-
},
19-
{
20-
"data": "SGVsbG8gV29ybGQ=",
21-
"recordId": "record3",
22-
"result": "TransformedStateOk",
23-
"metadata": {
24-
"partitionKeys": {
25-
"iamKey1": "iamValue1",
26-
"iamKey2": "iamValue2"
27-
}
28-
}
29-
}
30-
]
31-
}
2+
"records": [
3+
{
4+
"data": "SGVsbG8gV29ybGQ=",
5+
"recordId": "record1",
6+
"result": "TRANSFORMED_STATE_OK",
7+
"metadata": {
8+
"partitionKeys": {},
9+
"otfMetadata": {
10+
"destinationTableName": "",
11+
"destinationDatabaseName": "",
12+
"operation": ""
13+
}
14+
}
15+
},
16+
{
17+
"data": "SGVsbG8gV29ybGQ=",
18+
"recordId": "record2",
19+
"result": "TRANSFORMED_STATE_DROPPED",
20+
"metadata": {
21+
"partitionKeys": {},
22+
"otfMetadata": {
23+
"destinationTableName": "",
24+
"destinationDatabaseName": "",
25+
"operation": ""
26+
}
27+
}
28+
},
29+
{
30+
"data": "SGVsbG8gV29ybGQ=",
31+
"recordId": "record3",
32+
"result": "TransformedStateOk",
33+
"metadata": {
34+
"partitionKeys": {
35+
"iamKey1": "iamValue1",
36+
"iamKey2": "iamValue2"
37+
},
38+
"otfMetadata": {
39+
"destinationTableName": "table1",
40+
"destinationDatabaseName": "database1",
41+
"operation": "update"
42+
}
43+
}
44+
}
45+
]
46+
}

0 commit comments

Comments
 (0)