Skip to content

Commit d4caef7

Browse files
authored
[Fix] Fix Debezium format cannot parse date/time/timestamp (apache#5887)
1 parent ebb64f1 commit d4caef7

File tree

11 files changed

+641
-71
lines changed

11 files changed

+641
-71
lines changed

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java

+4
Original file line number | Diff line number | Diff line change
@@ -114,6 +114,10 @@ public TableIdentifier getTableId() {
114114
return tableId;
115115
}
116116

117+
public TablePath getTablePath() {
118+
return tableId.toTablePath();
119+
}
120+
117121
public TableSchema getTableSchema() {
118122
return tableSchema;
119123
}

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java

+331-44
Large diffs are not rendered by default.

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/debezium/debezium_data.txt

+3-11
Large diffs are not rendered by default.

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/debeziumFormatIT/kafkasource_debezium_cdc_to_pgsql.conf

+103-5
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,66 @@
1818
###### This config file is a demonstration of streaming processing in seatunnel config
1919
######
2020

21+
22+
// The DDL of mysql table
23+
//create table mysql_cdc.mysql_cdc_e2e_source_table
24+
//(
25+
// id int auto_increment
26+
// primary key,
27+
// f_binary binary(64) null,
28+
// f_blob blob null,
29+
// f_long_varbinary mediumblob null,
30+
// f_longblob longblob null,
31+
// f_tinyblob tinyblob null,
32+
// f_varbinary varbinary(100) null,
33+
// f_smallint smallint null,
34+
// f_smallint_unsigned smallint unsigned null,
35+
// f_mediumint mediumint null,
36+
// f_mediumint_unsigned mediumint unsigned null,
37+
// f_int int null,
38+
// f_int_unsigned int unsigned null,
39+
// f_integer int null,
40+
// f_integer_unsigned int unsigned null,
41+
// f_bigint bigint null,
42+
// f_bigint_unsigned bigint unsigned null,
43+
// f_numeric decimal null,
44+
// f_decimal decimal null,
45+
// f_float float null,
46+
// f_double double null,
47+
// f_double_precision double null,
48+
// f_longtext longtext null,
49+
// f_mediumtext mediumtext null,
50+
// f_text text null,
51+
// f_tinytext tinytext null,
52+
// f_varchar varchar(100) null,
53+
// f_date date null,
54+
// f_datetime datetime null,
55+
// f_timestamp timestamp null,
56+
// f_bit1 bit null,
57+
// f_bit64 bit(64) null,
58+
// f_char char null,
59+
// f_enum enum ('enum1', 'enum2', 'enum3') null,
60+
// f_mediumblob mediumblob null,
61+
// f_long_varchar mediumtext null,
62+
// f_real double null,
63+
// f_time time null,
64+
// f_tinyint tinyint null,
65+
// f_tinyint_unsigned tinyint unsigned null,
66+
// f_json json null,
67+
// f_year year null
68+
//);
69+
70+
// The DML of mysql table
71+
// INSERT INTO mysql_cdc.mysql_cdc_e2e_source_table (id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint, f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime, f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob,
72+
// f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned, f_json, f_year) VALUES (1, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000, 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, null, 0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321, 123456789, 987654321, 123, 789, 12.34, 56.78, 90.12,
73+
// 'This is a long text field', 'This is a medium text field', 'This is a text field', 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00', '2023-04-27 11:08:40', true, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2', 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field', 12.345, '14:30:00', -128, 255, '{"key": "value"}', 2022);
74+
// INSERT INTO mysql_cdc.mysql_cdc_e2e_source_table (id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint, f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime, f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob,
75+
// f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned, f_json, f_year) VALUES (2, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000, 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, null, 0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321, 123456789, 987654321, 123, 789, 12.34, 56.78, 90.12,
76+
// 'This is a long text field', 'This is a medium text field', 'This is a text field', 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00', '2023-04-27 11:08:40', true, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2', 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field', 112.345, '14:30:00', -128, 22, '{"key": "value"}', 2013);
77+
// INSERT INTO mysql_cdc.mysql_cdc_e2e_source_table (id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint, f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime, f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob,
78+
// f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned, f_json, f_year) VALUES (3, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000, 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, null, 0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321, 123456789, 987654321, 123, 789, 12.34, 56.78, 90.12,
79+
// 'This is a long text field', 'This is a medium text field', 'This is a text field', 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00', '2023-04-27 11:08:40', true, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2', 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field', 112.345, '14:30:00', -128, 22, '{"key": "value"}', 2021);
80+
2181
env {
2282
execution.parallelism = 1
2383
job.mode = "BATCH"
@@ -38,10 +98,48 @@ source {
3898
format = debezium_json
3999
schema = {
40100
fields {
41-
id = "int"
42-
name = "string"
43-
description = "string"
44-
weight = "float"
101+
id = "int"
102+
f_binary = "bytes"
103+
f_blob = "bytes"
104+
f_long_varbinary = "bytes"
105+
f_longblob = "bytes"
106+
f_tinyblob = "bytes"
107+
f_varbinary = "string"
108+
f_smallint = "smallint"
109+
f_smallint_unsigned = "int"
110+
f_mediumint = "int"
111+
f_mediumint_unsigned = "int"
112+
f_int = "int"
113+
f_int_unsigned = "bigint"
114+
f_integer = "int"
115+
f_integer_unsigned = "bigint"
116+
f_bigint = "bigint"
117+
f_bigint_unsigned = "decimal(10, 0)"
118+
f_numeric = "decimal(10, 0)"
119+
f_decimal = "decimal(10, 0)"
120+
f_float = "float"
121+
f_double = "double"
122+
f_double_precision = "double"
123+
f_longtext = "string"
124+
f_mediumtext = "string"
125+
f_text = "string"
126+
f_tinytext = "string"
127+
f_varchar = "string"
128+
f_date = "date"
129+
f_datetime = "timestamp"
130+
f_timestamp = "timestamp"
131+
f_bit1 = "boolean"
132+
f_bit64 = "tinyint"
133+
f_char = "string"
134+
f_enum = "string"
135+
f_mediumblob = "bytes"
136+
f_long_varchar = "string"
137+
f_real = "double"
138+
f_time = "time"
139+
f_tinyint = "tinyint"
140+
f_tinyint_unsigned = "int"
141+
f_json = "string"
142+
f_year = "int"
45143
}
46144
}
47145
}
@@ -55,7 +153,7 @@ sink {
55153
password = test
56154
generate_sink_sql = true
57155
database = test
58-
table = public.sink
156+
table = public.sink2
59157
primary_keys = ["id"]
60158
}
61159
}

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/debeziumFormatIT/kafkasource_debezium_to_kafka.conf

+41-3
Original file line number | Diff line number | Diff line change
@@ -39,9 +39,47 @@ source {
3939
schema = {
4040
fields {
4141
id = "int"
42-
name = "string"
43-
description = "string"
44-
weight = "float"
42+
f_binary = "bytes"
43+
f_blob = "bytes"
44+
f_long_varbinary = "bytes"
45+
f_longblob = "bytes"
46+
f_tinyblob = "bytes"
47+
f_varbinary = "string"
48+
f_smallint = "smallint"
49+
f_smallint_unsigned = "int"
50+
f_mediumint = "int"
51+
f_mediumint_unsigned = "int"
52+
f_int = "int"
53+
f_int_unsigned = "bigint"
54+
f_integer = "int"
55+
f_integer_unsigned = "bigint"
56+
f_bigint = "bigint"
57+
f_bigint_unsigned = "decimal(10, 0)"
58+
f_numeric = "decimal(10, 0)"
59+
f_decimal = "decimal(10, 0)"
60+
f_float = "float"
61+
f_double = "double"
62+
f_double_precision = "double"
63+
f_longtext = "string"
64+
f_mediumtext = "string"
65+
f_text = "string"
66+
f_tinytext = "string"
67+
f_varchar = "string"
68+
f_date = "date"
69+
f_datetime = "timestamp"
70+
f_timestamp = "timestamp"
71+
f_bit1 = "boolean"
72+
f_bit64 = "tinyint"
73+
f_char = "string"
74+
f_enum = "string"
75+
f_mediumblob = "bytes"
76+
f_long_varchar = "string"
77+
f_real = "double"
78+
f_time = "time"
79+
f_tinyint = "tinyint"
80+
f_tinyint_unsigned = "int"
81+
f_json = "string"
82+
f_year = "int"
4583
}
4684
}
4785
}

seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonDeserializationSchema.java

+1-2
Original file line number | Diff line number | Diff line change
@@ -60,8 +60,7 @@ public SeaTunnelRow deserialize(SourceRecord record)
6060
String key = debeziumJsonConverter.serializeKey(record);
6161
String value = debeziumJsonConverter.serializeValue(record);
6262
Object[] fields = new Object[] {record.topic(), key, value};
63-
SeaTunnelRow row = new SeaTunnelRow(fields);
64-
return row;
63+
return new SeaTunnelRow(fields);
6564
}
6665

6766
@Override

seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonSerializationSchema.java

-1
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,6 @@
2929

3030
@RequiredArgsConstructor
3131
public class CompatibleDebeziumJsonSerializationSchema implements SerializationSchema {
32-
public static final String IDENTIFIER = CompatibleDebeziumJsonDeserializationSchema.IDENTIFIER;
3332

3433
private final int index;
3534

seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/DebeziumJsonConverter.java

+2-2
Original file line number | Diff line number | Diff line change
@@ -41,8 +41,8 @@ public class DebeziumJsonConverter implements Serializable {
4141

4242
private final boolean keySchemaEnable;
4343
private final boolean valueSchemaEnable;
44-
private transient JsonConverter keyConverter;
45-
private transient JsonConverter valueConverter;
44+
private transient volatile JsonConverter keyConverter;
45+
private transient volatile JsonConverter valueConverter;
4646
private transient Method keyConverterMethod;
4747
private transient Method valueConverterMethod;
4848

seatunnel-formats/seatunnel-format-json/pom.xml

-2
Original file line number | Diff line number | Diff line change
@@ -25,8 +25,6 @@
2525
<artifactId>seatunnel-format-json</artifactId>
2626
<name>SeaTunnel : Formats : Json</name>
2727

28-
<properties />
29-
3028
<dependencies>
3129

3230
<dependency>

seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java

+5-1
Original file line number | Diff line number | Diff line change
@@ -51,6 +51,8 @@ public class DebeziumJsonDeserializationSchema implements DeserializationSchema<
5151

5252
private final JsonDeserializationSchema jsonDeserializer;
5353

54+
private final DebeziumRowConverter debeziumRowConverter;
55+
5456
private final boolean ignoreParseErrors;
5557

5658
private final boolean debeziumEnabledSchema;
@@ -60,6 +62,7 @@ public DebeziumJsonDeserializationSchema(SeaTunnelRowType rowType, boolean ignor
6062
this.ignoreParseErrors = ignoreParseErrors;
6163
this.jsonDeserializer =
6264
new JsonDeserializationSchema(false, ignoreParseErrors, createJsonRowType(rowType));
65+
this.debeziumRowConverter = new DebeziumRowConverter(rowType);
6366
this.debeziumEnabledSchema = false;
6467
}
6568

@@ -69,6 +72,7 @@ public DebeziumJsonDeserializationSchema(
6972
this.ignoreParseErrors = ignoreParseErrors;
7073
this.jsonDeserializer =
7174
new JsonDeserializationSchema(false, ignoreParseErrors, createJsonRowType(rowType));
75+
this.debeziumRowConverter = new DebeziumRowConverter(rowType);
7276
this.debeziumEnabledSchema = debeziumEnabledSchema;
7377
}
7478

@@ -140,7 +144,7 @@ private JsonNode convertBytes(byte[] message) {
140144
}
141145

142146
private SeaTunnelRow convertJsonNode(JsonNode root) {
143-
return jsonDeserializer.convertToRowData(root);
147+
return debeziumRowConverter.serializeValue(root);
144148
}
145149

146150
@Override

0 commit comments

Comments
 (0)