Skip to content

Commit cf91e51

Browse files
authored
[Improve][Connector-V2] Support schema evolution for mysql-cdc and mysql-jdbc (apache#6929)
1 parent 2abbf69 commit cf91e51

File tree

37 files changed

+2517
-45
lines changed

37 files changed

+2517
-45
lines changed

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java

+3
Original file line numberDiff line numberDiff line change
@@ -191,4 +191,7 @@ protected Column(
191191

192192
/** Returns a copy of the column with a replaced name. */
193193
public abstract Column rename(String newColumnName);
194+
195+
/** Returns a copy of the column with its source type replaced by {@code sourceType}. */
196+
public abstract Column reSourceType(String sourceType);
194197
}

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java

+5
Original file line numberDiff line numberDiff line change
@@ -83,4 +83,9 @@ public Column rename(String newColumnName) {
8383
defaultValue,
8484
comment);
8585
}
86+
87+
@Override
88+
public Column reSourceType(String sourceType) {
89+
throw new UnsupportedOperationException("Not implemented");
90+
}
8691
}

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java

+18
Original file line numberDiff line numberDiff line change
@@ -325,4 +325,22 @@ public Column rename(String newColumnName) {
325325
bitLen,
326326
longColumnLength);
327327
}
328+
329+
@Override
330+
public Column reSourceType(String newSourceType) {
331+
return new PhysicalColumn(
332+
name,
333+
dataType,
334+
columnLength,
335+
scale,
336+
nullable,
337+
defaultValue,
338+
comment,
339+
newSourceType,
340+
options,
341+
isUnsigned,
342+
isZeroFill,
343+
bitLen,
344+
longColumnLength);
345+
}
328346
}

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableColumnEvent.java

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
@ToString(callSuper = true)
2525
public abstract class AlterTableColumnEvent extends AlterTableEvent {
26+
2627
public AlterTableColumnEvent(TableIdentifier tableIdentifier) {
2728
super(tableIdentifier);
2829
}

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/TableEvent.java

+1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public abstract class TableEvent implements SchemaChangeEvent {
3333
protected final TableIdentifier tableIdentifier;
3434
@Getter @Setter private String jobId;
3535
@Getter @Setter private String statement;
36+
@Getter @Setter protected String sourceDialectName;
3637

3738
@Override
3839
public TableIdentifier tableIdentifier() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.cdc.base.schema;
19+
20+
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
21+
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
22+
23+
import org.apache.commons.lang3.StringUtils;
24+
import org.apache.kafka.connect.data.Struct;
25+
import org.apache.kafka.connect.source.SourceRecord;
26+
27+
import com.google.common.collect.Lists;
28+
import io.debezium.relational.history.HistoryRecord;
29+
import lombok.extern.slf4j.Slf4j;
30+
31+
import java.util.List;
32+
33+
@Slf4j
34+
public abstract class AbstractSchemaChangeResolver implements SchemaChangeResolver {
35+
36+
protected static final List<String> SUPPORT_DDL = Lists.newArrayList("ALTER TABLE");
37+
38+
protected JdbcSourceConfig jdbcSourceConfig;
39+
40+
public AbstractSchemaChangeResolver(JdbcSourceConfig jdbcSourceConfig) {
41+
this.jdbcSourceConfig = jdbcSourceConfig;
42+
}
43+
44+
@Override
45+
public boolean support(SourceRecord record) {
46+
String ddl = SourceRecordUtils.getDdl(record);
47+
Struct value = (Struct) record.value();
48+
List<Struct> tableChanges = value.getArray(HistoryRecord.Fields.TABLE_CHANGES);
49+
if (tableChanges == null || tableChanges.isEmpty()) {
50+
log.warn("Ignoring statement for non-captured table {}", ddl);
51+
return false;
52+
}
53+
return StringUtils.isNotBlank(ddl)
54+
&& SUPPORT_DDL.stream()
55+
.map(String::toUpperCase)
56+
.anyMatch(prefix -> ddl.toUpperCase().contains(prefix));
57+
}
58+
}

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java

+6
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.debezium.data.Envelope;
2929
import io.debezium.document.DocumentReader;
3030
import io.debezium.relational.TableId;
31+
import io.debezium.relational.history.HistoryRecord;
3132
import io.debezium.util.SchemaNameAdjuster;
3233

3334
import java.math.BigDecimal;
@@ -214,4 +215,9 @@ public static TablePath getTablePath(SourceRecord record) {
214215
}
215216
return TablePath.of(databaseName, schemaName, tableName);
216217
}
218+
219+
public static String getDdl(SourceRecord record) {
220+
Struct schemaChangeStruct = (Struct) record.value();
221+
return schemaChangeStruct.getString(HistoryRecord.Fields.DDL_STATEMENTS);
222+
}
217223
}

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -119,10 +119,9 @@ private void deserializeSchemaChangeRecord(
119119
SourceRecord record, Collector<SeaTunnelRow> collector) {
120120
SchemaChangeEvent schemaChangeEvent = schemaChangeResolver.resolve(record, resultTypeInfo);
121121
if (schemaChangeEvent == null) {
122-
log.info("Unsupported resolve schemaChangeEvent {}, just skip.", record);
122+
log.warn("Unsupported resolve schemaChangeEvent {}, just skip.", record);
123123
return;
124124
}
125-
126125
if (resultTypeInfo instanceof MultipleRowType) {
127126
Map<String, SeaTunnelRowType> newRowTypeMap = new HashMap<>();
128127
for (Map.Entry<String, SeaTunnelRowType> entry : (MultipleRowType) resultTypeInfo) {

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ public MySqlSourceConfig create(int subtaskId) {
7676
// Note: the includeSchemaChanges parameter is used to control emitting the schema record,
7777
// only DataStream API program need to emit the schema record, the Table API need not
7878

79-
// TODO Not yet supported
79+
// Some scenarios do not require automatic capture of table structure changes, so the
80+
// default setting is false.
8081
props.setProperty("include.schema.changes", String.valueOf(false));
8182
// disable the offset flush totally
8283
props.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE));

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java

+2
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ public DebeziumDeserializationSchema<T> createDebeziumDeserializationSchema(
102102
.setPhysicalRowType(physicalRowType)
103103
.setResultTypeInfo(physicalRowType)
104104
.setServerTimeZone(ZoneId.of(zoneId))
105+
.setSchemaChangeResolver(
106+
new MySqlSchemaChangeResolver(createSourceConfigFactory(config)))
105107
.build();
106108
}
107109

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
19+
20+
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
21+
import org.apache.seatunnel.api.table.catalog.TablePath;
22+
import org.apache.seatunnel.api.table.event.AlterTableColumnEvent;
23+
import org.apache.seatunnel.api.table.event.AlterTableColumnsEvent;
24+
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
25+
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
26+
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
27+
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
28+
import org.apache.seatunnel.connectors.cdc.base.schema.AbstractSchemaChangeResolver;
29+
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
30+
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.parser.CustomMySqlAntlrDdlParser;
31+
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
32+
33+
import org.apache.commons.lang3.StringUtils;
34+
import org.apache.kafka.connect.source.SourceRecord;
35+
36+
import io.debezium.relational.Tables;
37+
38+
import java.util.List;
39+
import java.util.Objects;
40+
41+
public class MySqlSchemaChangeResolver extends AbstractSchemaChangeResolver {
42+
private transient Tables tables;
43+
private transient CustomMySqlAntlrDdlParser customMySqlAntlrDdlParser;
44+
45+
public MySqlSchemaChangeResolver(SourceConfig.Factory<JdbcSourceConfig> sourceConfigFactory) {
46+
super(sourceConfigFactory.create(0));
47+
}
48+
49+
@Override
50+
public SchemaChangeEvent resolve(SourceRecord record, SeaTunnelDataType dataType) {
51+
TablePath tablePath = SourceRecordUtils.getTablePath(record);
52+
String ddl = SourceRecordUtils.getDdl(record);
53+
if (Objects.isNull(customMySqlAntlrDdlParser)) {
54+
this.customMySqlAntlrDdlParser =
55+
new CustomMySqlAntlrDdlParser(
56+
tablePath, this.jdbcSourceConfig.getDbzConnectorConfig());
57+
}
58+
if (Objects.isNull(tables)) {
59+
this.tables = new Tables();
60+
}
61+
customMySqlAntlrDdlParser.setCurrentDatabase(tablePath.getDatabaseName());
62+
customMySqlAntlrDdlParser.setCurrentSchema(tablePath.getSchemaName());
63+
// Parse DDL statement using Debezium's Antlr parser
64+
customMySqlAntlrDdlParser.parse(ddl, tables);
65+
List<AlterTableColumnEvent> parsedEvents =
66+
customMySqlAntlrDdlParser.getAndClearParsedEvents();
67+
AlterTableColumnsEvent alterTableColumnsEvent =
68+
new AlterTableColumnsEvent(
69+
TableIdentifier.of(
70+
StringUtils.EMPTY,
71+
tablePath.getDatabaseName(),
72+
tablePath.getSchemaName(),
73+
tablePath.getTableName()),
74+
parsedEvents);
75+
alterTableColumnsEvent.setStatement(ddl);
76+
alterTableColumnsEvent.setSourceDialectName(DatabaseIdentifier.MYSQL);
77+
return parsedEvents.isEmpty() ? null : alterTableColumnsEvent;
78+
}
79+
}

0 commit comments

Comments
 (0)