diff --git a/client-adapter/tablestore/pom.xml b/client-adapter/tablestore/pom.xml
index da4b448d63..4189fdbb00 100644
--- a/client-adapter/tablestore/pom.xml
+++ b/client-adapter/tablestore/pom.xml
@@ -44,6 +44,21 @@
+
+
+ mysql
+ mysql-connector-java
+ 8.0.22
+ test
+
+
+
+ org.yaml
+ snakeyaml
+ 1.30
+ test
+
+
diff --git a/client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/config/ConfigLoader.java b/client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/config/ConfigLoader.java
index 19b4a8f073..ebc1eaef61 100644
--- a/client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/config/ConfigLoader.java
+++ b/client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/config/ConfigLoader.java
@@ -38,6 +38,7 @@ public static Map load(Properties envProperties) {
}
try {
config.validate();
+ config.getDbMapping().init(config);
} catch (Exception e) {
throw new RuntimeException("ERROR Config: " + fileName + " " + e.getMessage(), e);
}
diff --git a/client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/config/MappingConfig.java b/client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/config/MappingConfig.java
index e93f9f74af..996af8255f 100644
--- a/client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/config/MappingConfig.java
+++ b/client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/config/MappingConfig.java
@@ -7,6 +7,8 @@
import com.alibaba.otter.canal.client.adapter.tablestore.enums.TablestoreFieldType;
import com.alibaba.otter.canal.client.adapter.tablestore.support.SyncUtil;
import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.sql.ResultSetMetaData;
import java.util.*;
@@ -154,6 +156,7 @@ public int hashCode() {
public static class DbMapping implements AdapterMapping {
+ protected Logger logger = LoggerFactory.getLogger(this.getClass());
private String database; // 数据库名或schema名
private String table; // 表名
@@ -168,6 +171,13 @@ public static class DbMapping implements AdapterMapping {
private int readBatch = 5000;
private int commitBatch = 5000; // etl等批量提交大小
+ private Map constantTargetColumns; //目标字段常量值映射
+
+ private Map constantTargetColumnsParsed; //目标字段常量值映射解析
+
+ private Map constantColumnItems = new LinkedHashMap<>(); // 转换后的静态常量字段映射列表
+
+
private Map columnItems = new LinkedHashMap<>(); // 转换后的字段映射列表
public String getDatabase() {
@@ -249,60 +259,98 @@ public void setColumnItems(Map columnItems) {
this.columnItems = columnItems;
}
+ public Map getConstantTargetColumns() {
+ return constantTargetColumns;
+ }
+
+ public void setConstantTargetColumns(Map constantTargetColumns) {
+ this.constantTargetColumns = constantTargetColumns;
+ }
+
+ public Map getConstantTargetColumnsParsed() {
+ return constantTargetColumnsParsed;
+ }
+
+ public void setConstantTargetColumnsParsed(Map constantTargetColumnsParsed) {
+ this.constantTargetColumnsParsed = constantTargetColumnsParsed;
+ }
+
+
+
+ public static class ConstantColumnItem {
+ private String targetColumn;
+ private String column;
+ private TablestoreFieldType type;
+
+ public String getColumn() {
+ return column;
+ }
+
+ public void setColumn(String column) {
+ this.column = column;
+ }
+
+ public TablestoreFieldType getType() {
+ return type;
+ }
+
+ public void setType(TablestoreFieldType type) {
+ this.type = type;
+ }
+
+ public String getTargetColumn() {
+ return targetColumn;
+ }
+
+ public void setTargetColumn(String targetColumn) {
+ this.targetColumn = targetColumn;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ColumnItem that = (ColumnItem) o;
+ return Objects.equals(column, that.column);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(column);
+ }
+ }
+
+
+
public void init(MappingConfig config) {
+ logger.info("=========dbMapping begin init.=========");
String splitBy = "$";
- if (targetColumns != null) {
- boolean needTypeInference = false;
- for (Map.Entry columnField : targetColumns.entrySet()) {
- String field = columnField.getValue();
- String type = null;
- if (field != null) {
- // 解析类型
- int i = field.indexOf(splitBy);
- if (i > -1) {
- type = field.substring(i + 1);
- field = field.substring(0, i);
- }
- }
- ColumnItem columnItem = new ColumnItem();
- columnItem.setColumn(columnField.getKey());
- columnItem.setTargetColumn(StringUtils.isBlank(field) ? columnField.getKey() : field);
- TablestoreFieldType fieldType = SyncUtil.getTablestoreType(type);
- if (fieldType == null) {
- needTypeInference = true;
- }
- columnItem.setType(fieldType);
- columnItems.put(columnField.getKey(), columnItem);
- }
- if (needTypeInference) {
- // 认为有field没有配置映射类型,需要进行类型推断
- DruidDataSource sourceDS = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
-
- Util.sqlRS(sourceDS, "SELECT * FROM " + SyncUtil.getDbTableName(database, table) + " LIMIT 1 ", rs -> {
- try {
- ResultSetMetaData rsd = rs.getMetaData();
- int columnCount = rsd.getColumnCount();
- List columns = new ArrayList<>();
- for (int i = 1; i <= columnCount; i++) {
- String columnName = rsd.getColumnName(i);
- if (columnItems.containsKey(columnName) && columnItems.get(columnName).getType() == null) {
- int columnType = rsd.getColumnType(i);
- columnItems.get(columnName).setType(SyncUtil.getDefaultTablestoreType(columnType));
- }
- }
- return true;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
- }
-
- } else {
+ //普通列
+ if (targetColumns != null) {
+ parseTargetColumns(config, splitBy);
+ }
+ if (targetColumns == null) {
this.targetColumns = new LinkedHashMap<>();
}
- targetColumnsParsed = new HashMap<>();
+ //常量列
+ if (constantTargetColumns != null) {
+ parseTargetConstant(config, splitBy);
+ }
+ if (constantTargetColumns == null) {
+ this.constantColumnItems = new LinkedHashMap<>();
+ }
+
+ initParsed(splitBy);
+
+ logger.info("=========dbMapping success init.=========");
+
+ }
+
+
+ private void initParsed(String splitBy) {
+ targetColumnsParsed = new HashMap<>();
targetColumns.forEach((key, value) -> {
if (StringUtils.isEmpty(value)) {
targetColumnsParsed.put(key, key);
@@ -312,6 +360,114 @@ public void init(MappingConfig config) {
targetColumnsParsed.put(key, value);
}
});
+
+ constantTargetColumnsParsed = new HashMap<>();
+ constantTargetColumns.forEach((key, value) -> {
+ if (StringUtils.isEmpty(value)) {
+ constantTargetColumnsParsed.put(key, key);
+ } else if (value.contains(splitBy) && constantColumnItems.containsKey(key)) {
+ constantTargetColumnsParsed.put(key, constantColumnItems.get(key).targetColumn);
+ } else {
+ constantTargetColumnsParsed.put(key, value);
+ }
+ });
+ }
+
+
+ private void parseTargetColumns(MappingConfig config, String splitBy) {
+ boolean needTypeInference = false;
+ for (Map.Entry columnField : targetColumns.entrySet()) {
+ String field = columnField.getValue();
+ String type = null;
+ if (field != null) {
+ // 解析类型
+ int i = field.indexOf(splitBy);
+ if (i > -1) {
+ type = field.substring(i + 1);
+ field = field.substring(0, i);
+ }
+ }
+ ColumnItem columnItem = new ColumnItem();
+ columnItem.setColumn(columnField.getKey());
+ columnItem.setTargetColumn(StringUtils.isBlank(field) ? columnField.getKey() : field);
+
+ TablestoreFieldType fieldType = SyncUtil.getTablestoreType(type);
+ if (fieldType == null) {
+ needTypeInference = true;
+ }
+ columnItem.setType(fieldType);
+ columnItems.put(columnField.getKey(), columnItem);
+ }
+ if (needTypeInference) {
+ // 认为有field没有配置映射类型,需要进行类型推断
+ DruidDataSource sourceDS = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+
+ Util.sqlRS(sourceDS, "SELECT * FROM " + SyncUtil.getDbTableName(database, table) + " LIMIT 1 ", rs -> {
+ try {
+ ResultSetMetaData rsd = rs.getMetaData();
+ int columnCount = rsd.getColumnCount();
+ List columns = new ArrayList<>();
+ for (int i = 1; i <= columnCount; i++) {
+ String columnName = rsd.getColumnName(i);
+ if (columnItems.containsKey(columnName) && columnItems.get(columnName).getType() == null) {
+ int columnType = rsd.getColumnType(i);
+ columnItems.get(columnName).setType(SyncUtil.getDefaultTablestoreType(columnType));
+ }
+ }
+ return true;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ }
+
+ private void parseTargetConstant(MappingConfig config, String splitBy) {
+ boolean needTypeInference = false;
+ for (Map.Entry columnField : constantTargetColumns.entrySet()) {
+ String field = columnField.getValue();
+ String type = null;
+ if (field != null) {
+ // 解析类型
+ int i = field.indexOf(splitBy);
+ if (i > -1) {
+ type = field.substring(i + 1);
+ field = field.substring(0, i);
+ }
+ }
+ ConstantColumnItem columnItem = new ConstantColumnItem();
+ columnItem.setColumn(columnField.getKey());
+ columnItem.setTargetColumn(StringUtils.isBlank(field) ? columnField.getKey() : field);
+
+ TablestoreFieldType fieldType = SyncUtil.getTablestoreType(type);
+ if (fieldType == null) {
+ needTypeInference = true;
+ }
+ columnItem.setType(fieldType);
+ constantColumnItems.put(columnField.getKey(), columnItem);
+ }
+ if (needTypeInference) {
+ // 认为有field没有配置映射类型,需要进行类型推断
+ DruidDataSource sourceDS = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+
+ Util.sqlRS(sourceDS, "SELECT * FROM " + SyncUtil.getDbTableName(database, table) + " LIMIT 1 ", rs -> {
+ try {
+ ResultSetMetaData rsd = rs.getMetaData();
+ int columnCount = rsd.getColumnCount();
+ List columns = new ArrayList<>();
+ for (int i = 1; i <= columnCount; i++) {
+ String columnName = rsd.getColumnName(i);
+ if (constantColumnItems.containsKey(columnName) && constantColumnItems.get(columnName).getType() == null) {
+ int columnType = rsd.getColumnType(i);
+ constantColumnItems.get(columnName).setType(SyncUtil.getDefaultTablestoreType(columnType));
+ }
+ }
+ return true;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
}
}
diff --git a/client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/service/TablestoreSyncService.java b/client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/service/TablestoreSyncService.java
index b62b7f911b..597bbd9cda 100644
--- a/client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/service/TablestoreSyncService.java
+++ b/client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/service/TablestoreSyncService.java
@@ -90,6 +90,8 @@ private List getUpdateChanges(Dml dml, boolean isColumnUpdate, Mappin
Map columnMap = config.getDbMapping().getTargetColumnsParsed();
Map typeMap = SyncUtil.getTypeMap(config);
MappingConfig.DbMapping dbMapping = config.getDbMapping();
+ Map constantParsed = config.getDbMapping().getConstantTargetColumnsParsed();
+
if (isColumnUpdate) {
// 列更新
@@ -123,7 +125,7 @@ private List getUpdateChanges(Dml dml, boolean isColumnUpdate, Mappin
primaryKey = buildPrimaryKey(map, typeMap, columnMap, dbMapping.getTargetPk());
change.setPrimaryKey(primaryKey);
- List columnList = getColumnsWhenPut(columnMap, dbMapping, map, typeMap);
+ List columnList = getColumnsWhenPut(columnMap, dbMapping, map, typeMap, constantParsed);
if (!CollectionUtils.isEmpty(columnList)) {
change.put(columnList);
changeList.add(change);
@@ -175,7 +177,7 @@ private List getUpdateChanges(Dml dml, boolean isColumnUpdate, Mappin
PrimaryKey primaryKey = buildPrimaryKey(map, typeMap, columnMap, dbMapping.getTargetPk());
change.setPrimaryKey(primaryKey);
- List columnList = getColumnsWhenPut(columnMap, dbMapping, map, typeMap);
+ List columnList = getColumnsWhenPut(columnMap, dbMapping, map, typeMap, constantParsed);
if (!CollectionUtils.isEmpty(columnList)) {
change.addColumns(columnList);
changeList.add(change);
@@ -265,6 +267,8 @@ private List getInsertChanges(Dml dml, boolean isColumnUpdate, Mappin
Map columnMap = config.getDbMapping().getTargetColumnsParsed();
Map typeMap = SyncUtil.getTypeMap(config);
MappingConfig.DbMapping dbMapping = config.getDbMapping();
+ Map constantParsed = config.getDbMapping().getConstantTargetColumnsParsed();
+
if (isColumnUpdate) {
// 列更新
for (Map map : dml.getData()) {
@@ -272,7 +276,7 @@ private List getInsertChanges(Dml dml, boolean isColumnUpdate, Mappin
PrimaryKey primaryKey = buildPrimaryKey(map, typeMap, columnMap, dbMapping.getTargetPk());
change.setPrimaryKey(primaryKey);
- List columnList = getColumnsWhenPut(columnMap, dbMapping, map, typeMap);
+ List columnList = getColumnsWhenPut(columnMap, dbMapping, map, typeMap, constantParsed);
if (!CollectionUtils.isEmpty(columnList)) {
change.put(columnList);
changeList.add(change);
@@ -286,7 +290,7 @@ private List getInsertChanges(Dml dml, boolean isColumnUpdate, Mappin
PrimaryKey primaryKey = buildPrimaryKey(map, typeMap, columnMap, dbMapping.getTargetPk());
change.setPrimaryKey(primaryKey);
- List columnList = getColumnsWhenPut(columnMap, dbMapping, map, typeMap);
+ List columnList = getColumnsWhenPut(columnMap, dbMapping, map, typeMap, constantParsed);
if (!CollectionUtils.isEmpty(columnList)) {
change.addColumns(columnList);
}
@@ -351,7 +355,8 @@ private PrimaryKey buildOldPrimaryKey(Map map,
private List getColumnsWhenPut(Map columnMap,
MappingConfig.DbMapping dbMapping,
Map map,
- Map typeMap) {
+ Map typeMap,
+ Map constantParsed) {
List columnList = new ArrayList<>();
for (Map.Entry entry : map.entrySet()) {
if (dbMapping.getTargetPk().containsKey(entry.getKey())) {
@@ -373,6 +378,20 @@ private List getColumnsWhenPut(Map columnMap,
ColumnValue columnValue = SyncUtil.getColumnValue(value, type);
columnList.add(new Column(targetColumn, columnValue));
}
+
+
+ //默认列处理添加
+ if (!CollectionUtils.isEmpty(dbMapping.getConstantTargetColumns())) {
+ dbMapping.getConstantTargetColumns().entrySet().forEach(mp -> {
+ int index = mp.getValue().indexOf("$");
+ if (index <= -1) {
+ throw new RuntimeException("the constant value must be have type!");
+ }
+ TablestoreFieldType type = SyncUtil.getTablestoreType(mp.getValue().substring(index + 1));
+ ColumnValue columnValue = SyncUtil.getColumnValue(constantParsed.get(mp.getKey()), type);
+ columnList.add(new Column(mp.getKey(), columnValue));
+ });
+ }
return columnList;
}
diff --git a/client-adapter/tablestore/src/main/resources/tablestore/constant_support_demo.yml b/client-adapter/tablestore/src/main/resources/tablestore/constant_support_demo.yml
new file mode 100644
index 0000000000..723ef970ad
--- /dev/null
+++ b/client-adapter/tablestore/src/main/resources/tablestore/constant_support_demo.yml
@@ -0,0 +1,35 @@
+dataSourceKey: defaultDS
+destination: tablestore
+groupId: g1
+outerAdapterKey: tablestore
+threads: 8
+updateChangeColumns: false
+dbMapping:
+ database: mgs_im_message
+ table: im_cmn_msg_send
+ targetTable: im_timeline_store_table
+ targetPk:
+ channel_id: timeline_id$string
+# AUTO: sequence_id$int
+
+ targetColumns:
+ msg_id: msg_id$string
+ user_id: sender$string
+ channel_id: conversation$string
+ type: msg_type$int
+ seq: seq$int
+ act_type: is_act$int
+ action: act_msg_id$string
+ cipher_content: msg_content$binary
+ mark: msg_state$int
+ operate_uid: operate_uid$int
+ create_time: send_time$int
+### 支持静态列配置,强制显式标准类型
+ constantTargetColumns:
+ conversation_type: GROUP$string
+ sender_type: 0$int
+ msg_biz_type: 0$int
+ etlCondition:
+ commitBatch: 5000
+
+
diff --git a/client-adapter/tablestore/src/test/java/com/alibaba/otter/canal/client/adapter/tablestore/AppTest.java b/client-adapter/tablestore/src/test/java/com/alibaba/otter/canal/client/adapter/tablestore/AppTest.java
new file mode 100644
index 0000000000..11540004b0
--- /dev/null
+++ b/client-adapter/tablestore/src/test/java/com/alibaba/otter/canal/client/adapter/tablestore/AppTest.java
@@ -0,0 +1,38 @@
+package com.alibaba.otter.canal.client.adapter.tablestore;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+/**
+ * Unit test for simple App.
+ */
+public class AppTest
+ extends TestCase
+{
+ /**
+ * Create the test case
+ *
+ * @param testName name of the test case
+ */
+ public AppTest( String testName )
+ {
+ super( testName );
+ }
+
+ /**
+ * @return the suite of tests being tested
+ */
+ public static Test suite()
+ {
+ return new TestSuite( AppTest.class );
+ }
+
+ /**
+ * Rigourous Test :-)
+ */
+ public void testApp()
+ {
+ assertTrue( true );
+ }
+}
diff --git a/client-adapter/tablestore/src/test/java/com/alibaba/otter/canal/client/adapter/tablestore/Common.java b/client-adapter/tablestore/src/test/java/com/alibaba/otter/canal/client/adapter/tablestore/Common.java
new file mode 100644
index 0000000000..934ca09d11
--- /dev/null
+++ b/client-adapter/tablestore/src/test/java/com/alibaba/otter/canal/client/adapter/tablestore/Common.java
@@ -0,0 +1,51 @@
+package com.alibaba.otter.canal.client.adapter.tablestore;
+
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+import com.alibaba.otter.canal.client.adapter.tablestore.common.PropertyConstants;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+public class Common {
+
+ public static TablestoreAdapter init() {
+ //初始化tablestore信息
+
+ DatasourceConfig.DATA_SOURCES.put("defaultDS", TestConstant.dataSource);
+
+ OuterAdapterConfig outerAdapterConfig = new OuterAdapterConfig();
+ outerAdapterConfig.setName("tablestore");
+ outerAdapterConfig.setKey("tablestore");
+ Map properties = new HashMap<>();
+ properties.put(PropertyConstants.TABLESTORE_ACCESSSECRETID, "xxxxxxxxxx");
+ properties.put(PropertyConstants.TABLESTORE_ACCESSSECRETKEY, "xxxxxxxxxx");
+ properties.put(PropertyConstants.TABLESTORE_ENDPOINT, "https://xxxxxxxxxx");
+ properties.put(PropertyConstants.TABLESTORE_INSTANCENAME, "xxxxxxxxxx");
+ outerAdapterConfig.setProperties(properties);
+ TablestoreAdapter adapter = new TablestoreAdapter();
+ Properties prop = getProperties();
+ adapter.init(outerAdapterConfig, prop);
+ return adapter;
+ }
+
+ private static Properties getProperties() {
+ File directory = new File("");
+ String rootAbsolutePath =directory.getAbsolutePath();
+ String filePath = rootAbsolutePath + "\\src/test/resources/tablestore/constant_support_demo.yml";
+ Properties prop = new Properties();
+ try {
+ FileInputStream inputStream = new FileInputStream(filePath);
+ prop.load(inputStream);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return prop;
+ }
+
+
+}
diff --git a/client-adapter/tablestore/src/test/java/com/alibaba/otter/canal/client/adapter/tablestore/TablestoreSyncTest.java b/client-adapter/tablestore/src/test/java/com/alibaba/otter/canal/client/adapter/tablestore/TablestoreSyncTest.java
new file mode 100644
index 0000000000..0cdde9546c
--- /dev/null
+++ b/client-adapter/tablestore/src/test/java/com/alibaba/otter/canal/client/adapter/tablestore/TablestoreSyncTest.java
@@ -0,0 +1,62 @@
+package com.alibaba.otter.canal.client.adapter.tablestore;
+
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import jdk.nashorn.internal.ir.annotations.Ignore;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.*;
+
+/**
+ * @Description
+ * @Author yun.zhang
+ * @Date 2022/12/15 15:43
+ * @Version 1.0
+ * @ClassName TablestoreSyncTest.class
+ */
+@Ignore
+public class TablestoreSyncTest {
+
+
+ private TablestoreAdapter tablestoreAdapter;
+
+ @Before
+ public void init() {
+ tablestoreAdapter = Common.init();
+ }
+
+ @Test
+ public void syncTest() {
+ Dml dml = new Dml();
+ dml.setDestination("tablestore");
+ dml.setGroupId("g1");
+ dml.setTs(new Date().getTime());
+ dml.setType("INSERT");
+ dml.setDatabase("mgs_im_message");
+ dml.setTable("im_cmn_msg_send");
+ List