Skip to content

Commit 4e4d2b8

Browse files
authored
[Hotfix][MySQL-CDC] Fix read gbk varchar chinese garbled characters (apache#7046)
1 parent 77f6140 commit 4e4d2b8

File tree

5 files changed

+63
-51
lines changed

5 files changed

+63
-51
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config;
19+
20+
import io.debezium.config.Configuration;
21+
import io.debezium.connector.mysql.MySqlConnection;
22+
import io.debezium.jdbc.JdbcConfiguration;
23+
import io.debezium.jdbc.JdbcConnection;
24+
25+
import java.sql.Connection;
26+
import java.sql.SQLException;
27+
28+
import static io.debezium.connector.mysql.MySqlConnectorConfig.JDBC_DRIVER;
29+
30+
public class CustomMySqlConnectionConfiguration
31+
extends MySqlConnection.MySqlConnectionConfiguration {
32+
33+
protected static final String URL_PATTERN =
34+
"jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&nullCatalogMeansCurrent=false&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=${connectTimeout}";
35+
36+
private final JdbcConnection.ConnectionFactory connectionFactory;
37+
38+
public CustomMySqlConnectionConfiguration(Configuration config) {
39+
super(config);
40+
String driverClassName =
41+
config.getString(JDBC_DRIVER.name(), JDBC_DRIVER.defaultValueAsString());
42+
connectionFactory =
43+
JdbcConnection.patternBasedFactory(
44+
URL_PATTERN, driverClassName, getClass().getClassLoader());
45+
}
46+
47+
@Override
48+
public JdbcConnection.ConnectionFactory factory() {
49+
return new JdbcConnection.ConnectionFactory() {
50+
@Override
51+
public Connection connect(JdbcConfiguration config) throws SQLException {
52+
return connectionFactory.connect(config);
53+
}
54+
};
55+
}
56+
}

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
3131
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig;
3232
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset;
33+
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils;
3334
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils;
3435

3536
import org.apache.kafka.connect.data.Struct;
@@ -120,7 +121,8 @@ public void configure(SourceSplitBase sourceSplitBase) {
120121
this.topicSelector = MySqlTopicSelector.defaultSelector(connectorConfig);
121122

122123
this.databaseSchema =
123-
MySqlUtils.createMySqlDatabaseSchema(connectorConfig, tableIdCaseInsensitive);
124+
MySqlConnectionUtils.createMySqlDatabaseSchema(
125+
connectorConfig, tableIdCaseInsensitive);
124126
this.offsetContext =
125127
loadStartingOffsetState(
126128
new MySqlOffsetContext.Loader(connectorConfig), sourceSplitBase);

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlConnectionUtils.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils;
1919

2020
import org.apache.seatunnel.common.utils.SeaTunnelException;
21+
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.CustomMySqlConnectionConfiguration;
2122
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset;
2223

2324
import com.github.shyiko.mysql.binlog.BinaryLogClient;
@@ -44,8 +45,7 @@ public class MySqlConnectionUtils {
4445

4546
/** Creates a new {@link MySqlConnection}, but not open the connection. */
4647
public static MySqlConnection createMySqlConnection(Configuration dbzConfiguration) {
47-
return new MySqlConnection(
48-
new MySqlConnection.MySqlConnectionConfiguration(dbzConfiguration));
48+
return new MySqlConnection(new CustomMySqlConnectionConfiguration(dbzConfiguration));
4949
}
5050

5151
/** Creates a new {@link BinaryLogClient} for consuming mysql binlog. */

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java

-46
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,11 @@
2424

2525
import org.apache.kafka.connect.source.SourceRecord;
2626

27-
import io.debezium.connector.mysql.MySqlConnectorConfig;
28-
import io.debezium.connector.mysql.MySqlDatabaseSchema;
29-
import io.debezium.connector.mysql.MySqlTopicSelector;
30-
import io.debezium.connector.mysql.MySqlValueConverters;
3127
import io.debezium.jdbc.JdbcConnection;
32-
import io.debezium.jdbc.JdbcValueConverters;
33-
import io.debezium.jdbc.TemporalPrecisionMode;
3428
import io.debezium.relational.Column;
3529
import io.debezium.relational.RelationalDatabaseConnectorConfig;
3630
import io.debezium.relational.Table;
3731
import io.debezium.relational.TableId;
38-
import io.debezium.schema.TopicSelector;
39-
import io.debezium.util.SchemaNameAdjuster;
4032
import lombok.extern.slf4j.Slf4j;
4133

4234
import java.sql.Connection;
@@ -342,44 +334,6 @@ public static SeaTunnelRowType getSplitType(
342334
return getSplitType(primaryKeys.get(0), dbzConnectorConfig);
343335
}
344336

345-
/** Creates a new {@link MySqlDatabaseSchema} to monitor the latest MySql database schemas. */
346-
public static MySqlDatabaseSchema createMySqlDatabaseSchema(
347-
MySqlConnectorConfig dbzMySqlConfig, boolean isTableIdCaseSensitive) {
348-
TopicSelector<TableId> topicSelector = MySqlTopicSelector.defaultSelector(dbzMySqlConfig);
349-
SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
350-
MySqlValueConverters valueConverters = getValueConverters(dbzMySqlConfig);
351-
return new MySqlDatabaseSchema(
352-
dbzMySqlConfig,
353-
valueConverters,
354-
topicSelector,
355-
schemaNameAdjuster,
356-
isTableIdCaseSensitive);
357-
}
358-
359-
private static MySqlValueConverters getValueConverters(MySqlConnectorConfig dbzMySqlConfig) {
360-
TemporalPrecisionMode timePrecisionMode = dbzMySqlConfig.getTemporalPrecisionMode();
361-
JdbcValueConverters.DecimalMode decimalMode = dbzMySqlConfig.getDecimalMode();
362-
String bigIntUnsignedHandlingModeStr =
363-
dbzMySqlConfig
364-
.getConfig()
365-
.getString(MySqlConnectorConfig.BIGINT_UNSIGNED_HANDLING_MODE);
366-
MySqlConnectorConfig.BigIntUnsignedHandlingMode bigIntUnsignedHandlingMode =
367-
MySqlConnectorConfig.BigIntUnsignedHandlingMode.parse(
368-
bigIntUnsignedHandlingModeStr);
369-
JdbcValueConverters.BigIntUnsignedMode bigIntUnsignedMode =
370-
bigIntUnsignedHandlingMode.asBigIntUnsignedMode();
371-
372-
boolean timeAdjusterEnabled =
373-
dbzMySqlConfig.getConfig().getBoolean(MySqlConnectorConfig.ENABLE_TIME_ADJUSTER);
374-
return new MySqlValueConverters(
375-
decimalMode,
376-
timePrecisionMode,
377-
bigIntUnsignedMode,
378-
dbzMySqlConfig.binaryHandlingMode(),
379-
timeAdjusterEnabled ? MySqlValueConverters::adjustTemporal : x -> x,
380-
MySqlValueConverters::defaultParsingErrorHandler);
381-
}
382-
383337
public static BinlogOffset getBinlogPosition(SourceRecord dataRecord) {
384338
return getBinlogPosition(dataRecord.sourceOffset());
385339
}

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql

+2-2
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ CREATE TABLE mysql_cdc_e2e_source_table
5050
`f_mediumtext` mediumtext,
5151
`f_text` text,
5252
`f_tinytext` tinytext,
53-
`f_varchar` varchar(100) DEFAULT NULL,
53+
`f_varchar` varchar(100) collate gbk_bin DEFAULT NULL,
5454
`f_date` date DEFAULT NULL,
5555
`f_datetime` datetime DEFAULT NULL,
5656
`f_timestamp` timestamp NULL DEFAULT NULL,
@@ -333,7 +333,7 @@ VALUES ( 1, 0x616263740000000000000000000000000000000000000000000000000000000000
333333
0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,
334334
0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321,
335335
123456789, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field',
336-
'This is a text field', 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00',
336+
'This is a text field', 'This is a tiny text field', '中文测试', '2022-04-27', '2022-04-27 14:30:00',
337337
'2023-04-27 11:08:40', 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',
338338
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field',
339339
12.345, '14:30:00', -128, 255, '{ "key": "value" }', 2022 ),

0 commit comments

Comments
 (0)