Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -247,34 +247,33 @@ private boolean dumpTableMeta(MysqlConnection connection, final CanalEventFilter
}

private boolean applyHistoryToDB(EntryPosition position, String schema, String ddl, String extra) {
Map<String, String> content = new HashMap<>();
content.put("destination", destination);
content.put("binlogFile", position.getJournalName());
content.put("binlogOffest", String.valueOf(position.getPosition()));
content.put("binlogMasterId", String.valueOf(position.getServerId()));
content.put("binlogTimestamp", String.valueOf(position.getTimestamp()));
content.put("useSchema", schema);
if (content.isEmpty()) {
throw new RuntimeException("apply failed caused by content is empty in applyHistoryToDB");
}
MetaHistoryDO metaDO = new MetaHistoryDO();
metaDO.setDestination(destination);
metaDO.setBinlogFile(position.getJournalName());
metaDO.setBinlogOffest(position.getPosition());
metaDO.setBinlogMasterId(String.valueOf(position.getServerId()));
metaDO.setBinlogTimestamp(position.getTimestamp());
metaDO.setUseSchema(schema);
// 待补充
List<DdlResult> ddlResults = DruidDdlParser.parse(ddl, schema);
if (ddlResults.size() > 0) {
DdlResult ddlResult = ddlResults.get(0);
content.put("sqlSchema", ddlResult.getSchemaName());
content.put("sqlTable", ddlResult.getTableName());
content.put("sqlType", ddlResult.getType().name());
content.put("sqlText", ddl);
content.put("extra", extra);
metaDO.setSqlSchema(ddlResult.getSchemaName());
metaDO.setSqlTable(ddlResult.getTableName());
metaDO.setSqlType(ddlResult.getType().name());
metaDO.setSqlText(ddl);
metaDO.setExtra(extra);
}

MetaHistoryDO metaDO = new MetaHistoryDO();
try {
BeanUtils.populate(metaDO, content);
// 会建立唯一约束,解决:
// 1. 重复的binlog file+offest
// 2. 重复的masterId+timestamp
metaHistoryDAO.insert(metaDO);
// Fix issue #4459
String name = metaDO.getUseSchema() + "." + metaDO.getSqlTable();
if (blackFilter == null || !blackFilter.filter(name)) {
// 会建立唯一约束,解决:
// 1. 重复的binlog file+offset
// 2. 重复的masterId+timestamp
metaHistoryDAO.insert(metaDO);
}
} catch (Throwable e) {
if (isUkDuplicateException(e)) {
// 忽略掉重复的位点
Expand Down