diff --git a/client-adapter/README.md b/client-adapter/README.md index 80c62c0b54..8ba94b0add 100644 --- a/client-adapter/README.md +++ b/client-adapter/README.md @@ -377,6 +377,7 @@ esMapping: # _obj: obj:{"test":"123"} etlCondition: "where a.c_time>='{0}'" # etl 的条件参数 commitBatch: 3000 # 提交批大小 + routingKey: _id ``` sql映射说明: diff --git a/client-adapter/es7x/src/main/java/com/alibaba/otter/canal/client/adapter/es7x/etl/ESEtlService.java b/client-adapter/es7x/src/main/java/com/alibaba/otter/canal/client/adapter/es7x/etl/ESEtlService.java index a70ea53578..e010f06904 100644 --- a/client-adapter/es7x/src/main/java/com/alibaba/otter/canal/client/adapter/es7x/etl/ESEtlService.java +++ b/client-adapter/es7x/src/main/java/com/alibaba/otter/canal/client/adapter/es7x/etl/ESEtlService.java @@ -114,6 +114,15 @@ protected boolean executeSqlImport(DataSource ds, String sql, List value }); } + // 设置路由字段 + final String routingKey = mapping.getRoutingKey(); + if (routingKey != null && routingKey.length() > 0){ + final Object routingVal = esTemplate.getValFromRS(mapping, rs, routingKey, routingKey); + if (routingVal != null) { + esFieldData.put("$parent_routing", routingVal.toString()); + } + } + if (idVal != null) { String parentVal = (String) esFieldData.remove("$parent_routing"); if (mapping.isUpsert()) { diff --git a/client-adapter/es7x/src/main/java/com/alibaba/otter/canal/client/adapter/es7x/support/ES7xTemplate.java b/client-adapter/es7x/src/main/java/com/alibaba/otter/canal/client/adapter/es7x/support/ES7xTemplate.java index 4f2ff06131..cefe56a0b5 100644 --- a/client-adapter/es7x/src/main/java/com/alibaba/otter/canal/client/adapter/es7x/support/ES7xTemplate.java +++ b/client-adapter/es7x/src/main/java/com/alibaba/otter/canal/client/adapter/es7x/support/ES7xTemplate.java @@ -220,7 +220,7 @@ public Object getESDataFromRS(ESMapping mapping, ResultSet resultSet, } // 添加父子文档关联信息 - putRelationDataFromRS(mapping, schemaItem, resultSet, esFieldData); + putRelationDataAndRoutingFromRS(mapping, schemaItem, resultSet, esFieldData); return resultIdVal; } @@ -263,7 +263,7 @@ public Object getESDataFromRS(ESMapping mapping, ResultSet resultSet, Map dmlDat } // 添加父子文档关联信息 - putRelationData(mapping, schemaItem, dmlData, esFieldData); + putRelationDataAndRouting(mapping, schemaItem, dmlData, esFieldData); return resultIdVal; } @@ -334,7 +334,7 @@ public Object getESDataFromDmlData(ESMapping mapping,String owner, Map esFieldData) { + private void putRelationDataAndRoutingFromRS(ESMapping mapping, SchemaItem schemaItem, ResultSet resultSet, + Map esFieldData) { // 添加父子文档关联信息 if (!mapping.getRelations().isEmpty()) { mapping.getRelations().forEach((relationField, relationMapping) -> { @@ -443,10 +443,24 @@ private void putRelationDataFromRS(ESMapping mapping, SchemaItem schemaItem, Res esFieldData.put(relationField, relations); }); } + + // 强制设置路由字段 + final String routingKey = mapping.getRoutingKey(); + if (routingKey != null && routingKey.length() > 0){ + final Object routingVal; + try { + routingVal = getValFromRS(mapping, resultSet, routingKey, routingKey); + if (routingVal != null) { + esFieldData.put("$parent_routing", routingVal.toString()); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } } - private void putRelationData(ESMapping mapping, SchemaItem schemaItem, Map dmlData, - Map esFieldData) { + private void putRelationDataAndRouting(ESMapping mapping, SchemaItem schemaItem, Map dmlData, + Map esFieldData) { // 添加父子文档关联信息 if (!mapping.getRelations().isEmpty()) { mapping.getRelations().forEach((relationField, relationMapping) -> { @@ -465,5 +479,15 @@ private void putRelationData(ESMapping mapping, SchemaItem schemaItem, Map 0){ + final Object routingVal; + routingVal = getValFromData(mapping, dmlData, routingKey, routingKey); + if (routingVal != null) { + esFieldData.put("$parent_routing", routingVal.toString()); + } + } } } diff --git a/client-adapter/escore/src/main/java/com/alibaba/otter/canal/client/adapter/es/core/config/ESSyncConfig.java b/client-adapter/escore/src/main/java/com/alibaba/otter/canal/client/adapter/es/core/config/ESSyncConfig.java index e4c31ba314..b15ee3492a 100644 --- a/client-adapter/escore/src/main/java/com/alibaba/otter/canal/client/adapter/es/core/config/ESSyncConfig.java +++ b/client-adapter/escore/src/main/java/com/alibaba/otter/canal/client/adapter/es/core/config/ESSyncConfig.java @@ -120,6 +120,11 @@ public static class ESMapping implements AdapterMapping { private SchemaItem schemaItem; // sql解析结果模型 + /** + * wkq: routing字段,es 与 sql resultSet columnName必须一致 + */ + private String routingKey; + public String getIndex() { return index; } @@ -231,6 +236,14 @@ public SchemaItem getSchemaItem() { public void setSchemaItem(SchemaItem schemaItem) { this.schemaItem = schemaItem; } + + public String getRoutingKey() { + return routingKey; + } + + public void setRoutingKey(String routingKey) { + this.routingKey = routingKey; + } } public static class RelationMapping {