Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions client-adapter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ esMapping:
# _obj: obj:{"test":"123"}
etlCondition: "where a.c_time>='{0}'" # etl 的条件参数
commitBatch: 3000 # 提交批大小
routingKey: _id
```
sql映射说明:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ protected boolean executeSqlImport(DataSource ds, String sql, List<Object> value
});
}

// 设置路由字段
final String routingKey = mapping.getRoutingKey();
if (routingKey != null && routingKey.length() > 0){
final Object routingVal = esTemplate.getValFromRS(mapping, rs, routingKey, routingKey);
if (routingVal != null) {
esFieldData.put("$parent_routing", routingVal.toString());
}
}

if (idVal != null) {
String parentVal = (String) esFieldData.remove("$parent_routing");
if (mapping.isUpsert()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ public Object getESDataFromRS(ESMapping mapping, ResultSet resultSet,
}

// 添加父子文档关联信息
putRelationDataFromRS(mapping, schemaItem, resultSet, esFieldData);
putRelationDataAndRoutingFromRS(mapping, schemaItem, resultSet, esFieldData);

return resultIdVal;
}
Expand Down Expand Up @@ -263,7 +263,7 @@ public Object getESDataFromRS(ESMapping mapping, ResultSet resultSet, Map<String
}

// 添加父子文档关联信息
putRelationDataFromRS(mapping, schemaItem, resultSet, esFieldData);
putRelationDataAndRoutingFromRS(mapping, schemaItem, resultSet, esFieldData);

return resultIdVal;
}
Expand Down Expand Up @@ -306,7 +306,7 @@ public Object getESDataFromDmlData(ESMapping mapping, Map<String, Object> dmlDat
}

// 添加父子文档关联信息
putRelationData(mapping, schemaItem, dmlData, esFieldData);
putRelationDataAndRouting(mapping, schemaItem, dmlData, esFieldData);
return resultIdVal;
}

Expand Down Expand Up @@ -334,7 +334,7 @@ public Object getESDataFromDmlData(ESMapping mapping,String owner, Map<String, O
}

// 添加父子文档关联信息
putRelationData(mapping, schemaItem, dmlOld, esFieldData);
putRelationDataAndRouting(mapping, schemaItem, dmlOld, esFieldData);
return resultIdVal;
}

Expand Down Expand Up @@ -416,8 +416,8 @@ private String getEsType(ESMapping mapping, String fieldName) {
}
}

private void putRelationDataFromRS(ESMapping mapping, SchemaItem schemaItem, ResultSet resultSet,
Map<String, Object> esFieldData) {
private void putRelationDataAndRoutingFromRS(ESMapping mapping, SchemaItem schemaItem, ResultSet resultSet,
Map<String, Object> esFieldData) {
// 添加父子文档关联信息
if (!mapping.getRelations().isEmpty()) {
mapping.getRelations().forEach((relationField, relationMapping) -> {
Expand All @@ -443,10 +443,24 @@ private void putRelationDataFromRS(ESMapping mapping, SchemaItem schemaItem, Res
esFieldData.put(relationField, relations);
});
}

// 强制设置路由字段
final String routingKey = mapping.getRoutingKey();
if (routingKey != null && routingKey.length() > 0){
final Object routingVal;
try {
routingVal = getValFromRS(mapping, resultSet, routingKey, routingKey);
if (routingVal != null) {
esFieldData.put("$parent_routing", routingVal.toString());
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}

private void putRelationData(ESMapping mapping, SchemaItem schemaItem, Map<String, Object> dmlData,
Map<String, Object> esFieldData) {
private void putRelationDataAndRouting(ESMapping mapping, SchemaItem schemaItem, Map<String, Object> dmlData,
Map<String, Object> esFieldData) {
// 添加父子文档关联信息
if (!mapping.getRelations().isEmpty()) {
mapping.getRelations().forEach((relationField, relationMapping) -> {
Expand All @@ -465,5 +479,15 @@ private void putRelationData(ESMapping mapping, SchemaItem schemaItem, Map<Strin
esFieldData.put(relationField, relations);
});
}

// 强制设置路由字段
final String routingKey = mapping.getRoutingKey();
if (routingKey != null && routingKey.length() > 0){
final Object routingVal;
routingVal = getValFromData(mapping, dmlData, routingKey, routingKey);
if (routingVal != null) {
esFieldData.put("$parent_routing", routingVal.toString());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ public static class ESMapping implements AdapterMapping {

private SchemaItem schemaItem; // sql解析结果模型

/**
* wkq: routing字段,es 与 sql resultSet columnName必须一致
*/
private String routingKey;

public String getIndex() {
return index;
}
Expand Down Expand Up @@ -231,6 +236,14 @@ public SchemaItem getSchemaItem() {
public void setSchemaItem(SchemaItem schemaItem) {
this.schemaItem = schemaItem;
}

public String getRoutingKey() {
return routingKey;
}

public void setRoutingKey(String routingKey) {
this.routingKey = routingKey;
}
}

public static class RelationMapping {
Expand Down