From 3422b0ca655ff92b9024449ea828a6ffeb6cf454 Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Tue, 17 Oct 2023 15:43:37 +0530 Subject: [PATCH 1/2] rename table name --- .../ybexporter/KafkaConnectRecordParser.java | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/debezium-server/debezium-server-ybexporter/src/main/java/io/debezium/server/ybexporter/KafkaConnectRecordParser.java b/debezium-server/debezium-server-ybexporter/src/main/java/io/debezium/server/ybexporter/KafkaConnectRecordParser.java index 70e2ebedb3c..72fcff7b757 100644 --- a/debezium-server/debezium-server-ybexporter/src/main/java/io/debezium/server/ybexporter/KafkaConnectRecordParser.java +++ b/debezium-server/debezium-server-ybexporter/src/main/java/io/debezium/server/ybexporter/KafkaConnectRecordParser.java @@ -6,6 +6,7 @@ package io.debezium.server.ybexporter; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -14,6 +15,8 @@ import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.json.JsonConverterConfig; import org.apache.kafka.connect.source.SourceRecord; +import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.config.ConfigProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,6 +27,7 @@ class KafkaConnectRecordParser implements RecordParser { String sourceType; private Map tableMap; private JsonConverter jsonConverter; + private Map renameTables; Record r = new Record(); public KafkaConnectRecordParser(String dataDirStr, String sourceType, Map tblMap) { @@ -34,6 +38,25 @@ public KafkaConnectRecordParser(String dataDirStr, String sourceType, Map jsonConfig = Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"); jsonConverter.configure(jsonConfig, false); + final Config config = ConfigProvider.getConfig(); + String renameTablesConfig = config.getOptionalValue("debezium.sink.ybexporter.tables.rename", String.class).orElse(""); + renameTables = new HashMap<>(); + for (String renameTableConfig: renameTablesConfig.split(",")){ + String[] beforeAndAfter = renameTableConfig.split(":"); + if (beforeAndAfter.length != 2){ + throw new RuntimeException(String.format("Incorrect format for specifying table rename config %s. Provide it as :", renameTableConfig)); + } + String before = beforeAndAfter[0]; + String after = beforeAndAfter[1]; + if ((before.split("\\.").length != 2) && (!sourceType.equals("mysql"))){ + throw new RuntimeException(String.format("Incorrect format for specifying table rename config %s. Provide it as .", before)); + } + if ((after.split("\\.").length != 2) && (!sourceType.equals("mysql"))){ + throw new RuntimeException(String.format("Incorrect format for specifying table rename config %s. Provide it as .", after)); + } + renameTables.put(before, after); + } + } /** @@ -84,6 +107,15 @@ protected void parseTable(Struct value, Struct sourceNode, Record r) { schemaName = sourceNode.getString("schema"); } String tableName = sourceNode.getString("table"); + // rename table name + String qualifiedTableName = tableName; + if (!schemaName.equals("")){ + qualifiedTableName = schemaName + "." + tableName; + } + if (renameTables.containsKey(qualifiedTableName)){ + String[] renamedTableName = renameTables.get(qualifiedTableName).split("\\."); + tableName = renamedTableName[renamedTableName.length - 1]; + } var tableIdentifier = dbName + "-" + schemaName + "-" + tableName; Table t = tableMap.get(tableIdentifier); From ea858e09a5633bce614a48b1268a31028fa70b4a Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Wed, 18 Oct 2023 13:36:34 +0530 Subject: [PATCH 2/2] refactor + handle empty string --- .../ybexporter/KafkaConnectRecordParser.java | 35 +++++++++++-------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/debezium-server/debezium-server-ybexporter/src/main/java/io/debezium/server/ybexporter/KafkaConnectRecordParser.java b/debezium-server/debezium-server-ybexporter/src/main/java/io/debezium/server/ybexporter/KafkaConnectRecordParser.java index 72fcff7b757..977a8c45bd3 100644 --- a/debezium-server/debezium-server-ybexporter/src/main/java/io/debezium/server/ybexporter/KafkaConnectRecordParser.java +++ b/debezium-server/debezium-server-ybexporter/src/main/java/io/debezium/server/ybexporter/KafkaConnectRecordParser.java @@ -38,25 +38,30 @@ public KafkaConnectRecordParser(String dataDirStr, String sourceType, Map jsonConfig = Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"); jsonConverter.configure(jsonConfig, false); + renameTables = new HashMap<>(); + retrieveRenameTablesFromConfig(); + } + + private void retrieveRenameTablesFromConfig(){ final Config config = ConfigProvider.getConfig(); String renameTablesConfig = config.getOptionalValue("debezium.sink.ybexporter.tables.rename", String.class).orElse(""); - renameTables = new HashMap<>(); - for (String renameTableConfig: renameTablesConfig.split(",")){ - String[] beforeAndAfter = renameTableConfig.split(":"); - if (beforeAndAfter.length != 2){ - throw new RuntimeException(String.format("Incorrect format for specifying table rename config %s. Provide it as :", renameTableConfig)); - } - String before = beforeAndAfter[0]; - String after = beforeAndAfter[1]; - if ((before.split("\\.").length != 2) && (!sourceType.equals("mysql"))){ - throw new RuntimeException(String.format("Incorrect format for specifying table rename config %s. Provide it as .", before)); - } - if ((after.split("\\.").length != 2) && (!sourceType.equals("mysql"))){ - throw new RuntimeException(String.format("Incorrect format for specifying table rename config %s. Provide it as .", after)); + if (!renameTablesConfig.isEmpty()){ + for (String renameTableConfig: renameTablesConfig.split(",")){ + String[] beforeAndAfter = renameTableConfig.split(":"); + if (beforeAndAfter.length != 2){ + throw new RuntimeException(String.format("Incorrect format for specifying table rename config %s. Provide it as :", renameTableConfig)); + } + String before = beforeAndAfter[0]; + String after = beforeAndAfter[1]; + if ((before.split("\\.").length != 2) && (!sourceType.equals("mysql"))){ + throw new RuntimeException(String.format("Incorrect format for specifying table rename config %s. Provide it as .", before)); + } + if ((after.split("\\.").length != 2) && (!sourceType.equals("mysql"))){ + throw new RuntimeException(String.format("Incorrect format for specifying table rename config %s. Provide it as .", after)); + } + renameTables.put(before, after); } - renameTables.put(before, after); } - } /**