diff --git a/deployer/src/main/resources/logback.xml b/deployer/src/main/resources/logback.xml
index 1d30442ec2..9c447cfc00 100644
--- a/deployer/src/main/resources/logback.xml
+++ b/deployer/src/main/resources/logback.xml
@@ -86,6 +86,10 @@
+
+
+
+
diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java
index f2cd0a645d..931676ea25 100644
--- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java
+++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java
@@ -5,6 +5,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang.StringUtils;
@@ -43,67 +44,83 @@ public class TableMetaCache {
private boolean isOnTSDB = false;
private TableMetaTSDB tableMetaTSDB;
+
// 第一层tableId,第二层schema.table,解决tableId重复,对应多张表
- private LoadingCache tableMetaDB;
+ private LoadingCache tableMetaCache;
public TableMetaCache(MysqlConnection con, TableMetaTSDB tableMetaTSDB){
this.connection = con;
this.tableMetaTSDB = tableMetaTSDB;
// 如果持久存储的表结构为空,从db里面获取下
if (tableMetaTSDB == null) {
- this.tableMetaDB = CacheBuilder.newBuilder().build(new CacheLoader() {
-
- @Override
- public TableMeta load(String name) throws Exception {
- try {
- return getTableMetaByDB(name);
- } catch (Throwable e) {
- // 尝试做一次retry操作
- try {
- connection.reconnect();
- return getTableMetaByDB(name);
- } catch (IOException e1) {
- throw new CanalParseException("fetch failed by table meta:" + name, e1);
- }
- }
- }
-
- });
+ this.tableMetaCache = createLocalCache();
} else {
isOnTSDB = true;
}
-
try {
ResultSetPacket packet = connection.query("show global variables like 'rds\\_%'");
- if (packet.getFieldValues().size() > 0) {
- isOnRDS = true;
- }
+ // if (packet.getFieldValues().size() > 0) {
+ isOnRDS = packet.getFieldValues().size() > 0;
+ // }
} catch (IOException e) {
}
-
try {
ResultSetPacket packet = connection.query("show global variables like 'polarx\\_%'");
- if (packet.getFieldValues().size() > 0) {
- isOnPolarX = true;
- }
+ // if (packet.getFieldValues().size() > 0) {
+ isOnPolarX = packet.getFieldValues().size() > 0;
+ // }
} catch (IOException e) {
}
}
- private synchronized TableMeta getTableMetaByDB(String fullname) throws IOException {
- try {
- ResultSetPacket packet = connection.query("show create table " + fullname);
- String[] names = StringUtils.split(fullname, "`.`");
- String schema = names[0];
- String table = names[1].substring(0, names[1].length());
- return new TableMeta(schema, table, parseTableMeta(schema, table, packet));
- } catch (Throwable e) { // fallback to desc table
- ResultSetPacket packet = connection.query("desc " + fullname);
- String[] names = StringUtils.split(fullname, "`.`");
- String schema = names[0];
- String table = names[1].substring(0, names[1].length());
- return new TableMeta(schema, table, parseTableMetaByDesc(packet));
+ private LoadingCache createLocalCache() {
+ return CacheBuilder.newBuilder().build(new CacheLoader() {
+
+ @Override
+ public TableMeta load(String name) throws Exception {
+ try {
+ return getTableMetaByDB(name);
+ } catch (Throwable e) {
+ // 尝试做一次retry操作
+ try {
+ connection.reconnect();
+ return getTableMetaByDB(name);
+ } catch (IOException e1) {
+ throw new CanalParseException("fetch table meta failed. table: " + name, e1);
+ }
+ }
+ }
+ });
+ }
+
+ private /* synchronized */ TableMeta getTableMetaByDB(String fullname) throws IOException {
+ // try {
+ // ResultSetPacket packet = connection.query("show create table " + fullname);
+ // String[] names = StringUtils.split(fullname, "`.`");
+ // String schema = names[0];
+ // String table = names[1].substring(0, names[1].length());
+ // return new TableMeta(schema, table, parseTableMeta(schema, table, packet));
+ // } catch (Throwable e) { // fallback to desc table
+ // ResultSetPacket packet = connection.query("desc " + fullname);
+ // String[] names = StringUtils.split(fullname, "`.`");
+ // String schema = names[0];
+ // String table = names[1].substring(0, names[1].length());
+ // return new TableMeta(schema, table, parseTableMetaByDesc(packet));
+ // }
+ boolean showCreateTable = true;
+ ResultSetPacket packet = null;
+ synchronized (this) {
+ try {
+ packet = connection.query("show create table " + fullname);
+ }catch (Exception ex) {
+ packet = connection.query("desc " + fullname);
+ }
}
+ String[] names = StringUtils.split(fullname, "`.`");
+ String schema = names[0];
+ String table = names[1].substring(0, names[1].length());
+ List fieldMetas = showCreateTable ? parseTableMeta(schema, table, packet) : parseTableMetaByDesc(packet);
+ return new TableMeta(schema, table, fieldMetas);
}
public static List parseTableMeta(String schema, String table, ResultSetPacket packet) {
@@ -122,86 +139,101 @@ public static List parseTableMeta(String schema, String table, Result
* 处理desc table的结果
*/
public static List parseTableMetaByDesc(ResultSetPacket packet) {
- Map nameMaps = new HashMap<>(6, 1f);
+ List fieldPackets = packet.getFieldDescriptors();
+ int size = fieldPackets.size();
+ Map nameMaps = new HashMap<>(size);
int index = 0;
- for (FieldPacket fieldPacket : packet.getFieldDescriptors()) {
+ // for (FieldPacket fieldPacket : packet.getFieldDescriptors()) {
+ for (FieldPacket fieldPacket : fieldPackets) {
nameMaps.put(StringUtils.lowerCase(fieldPacket.getName()), index++);
}
- int size = packet.getFieldDescriptors().size();
- int count = packet.getFieldValues().size() / packet.getFieldDescriptors().size();
- List result = new ArrayList<>();
+ List fieldValues = packet.getFieldValues();
+ // int size = packet.getFieldDescriptors().size();
+ // int count = packet.getFieldValues().size() / packet.getFieldDescriptors().size();
+ int count = fieldValues.size() / size;
+ List result = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
FieldMeta meta = new FieldMeta();
// 做一个优化,使用String.intern(),共享String对象,减少内存使用
- meta.setColumnName(packet.getFieldValues().get(nameMaps.get(COLUMN_NAME) + i * size).intern());
- meta.setColumnType(packet.getFieldValues().get(nameMaps.get(COLUMN_TYPE) + i * size));
- meta.setNullable(StringUtils.equalsIgnoreCase(packet.getFieldValues().get(nameMaps.get(IS_NULLABLE) + i
- * size),
- "YES"));
- meta.setKey("PRI".equalsIgnoreCase(packet.getFieldValues().get(nameMaps.get(COLUMN_KEY) + i * size)));
- meta.setUnique("UNI".equalsIgnoreCase(packet.getFieldValues().get(nameMaps.get(COLUMN_KEY) + i * size)));
+ meta.setColumnName(fieldValues.get(nameMaps.get(COLUMN_NAME) + i * size).intern());
+ meta.setColumnType(fieldValues.get(nameMaps.get(COLUMN_TYPE) + i * size));
+ meta.setNullable(StringUtils.equalsIgnoreCase(fieldValues.get(nameMaps.get(IS_NULLABLE) + i * size), "YES"));
+ meta.setKey("PRI".equalsIgnoreCase(fieldValues.get(nameMaps.get(COLUMN_KEY) + i * size)));
+ meta.setUnique("UNI".equalsIgnoreCase(fieldValues.get(nameMaps.get(COLUMN_KEY) + i * size)));
// 特殊处理引号
- meta.setDefaultValue(DruidDdlParser.unescapeQuotaName(packet.getFieldValues()
- .get(nameMaps.get(COLUMN_DEFAULT) + i * size)));
- meta.setExtra(packet.getFieldValues().get(nameMaps.get(EXTRA) + i * size));
-
+ meta.setDefaultValue(DruidDdlParser.unescapeQuotaName(fieldValues.get(nameMaps.get(COLUMN_DEFAULT) + i * size)));
+ meta.setExtra(fieldValues.get(nameMaps.get(EXTRA) + i * size));
result.add(meta);
}
-
return result;
}
- public TableMeta getTableMeta(String schema, String table) {
- return getTableMeta(schema, table, true);
- }
-
- public TableMeta getTableMeta(String schema, String table, boolean useCache) {
- if (!useCache) {
- tableMetaDB.invalidate(getFullName(schema, table));
- }
-
- return tableMetaDB.getUnchecked(getFullName(schema, table));
- }
-
- public TableMeta getTableMeta(String schema, String table, EntryPosition position) {
- return getTableMeta(schema, table, true, position);
- }
-
- public synchronized TableMeta getTableMeta(String schema, String table, boolean useCache, EntryPosition position) {
+ // This method is unused
+ // public TableMeta getTableMeta(String schema, String table) {
+ // return getTableMeta(schema, table, true);
+ // }
+
+ // This method is unused
+ // public TableMeta getTableMeta(String schema, String table, boolean useCache) {
+ // String fullName = getFullName(schema, table);
+ // if (!useCache) {
+ // // tableMetaCache.invalidate(getFullName(schema, table));
+ // tableMetaCache.invalidate(fullName);
+ // }
+ // // return tableMetaCache.getUnchecked(getFullName(schema, table));
+ // return tableMetaCache.getUnchecked(fullName);
+ // }
+
+ // This method is unused
+ // public TableMeta getTableMeta(String schema, String table, EntryPosition position) {
+ // return getTableMeta(schema, table, true, position);
+ // }
+
+ public /* synchronized */ TableMeta getTableMeta(String schema, String table, boolean useCache, EntryPosition position) {
TableMeta tableMeta = null;
if (tableMetaTSDB != null) {
tableMeta = tableMetaTSDB.find(schema, table);
if (tableMeta == null) {
// 因为条件变化,可能第一次的tableMeta没取到,需要从db获取一次,并记录到snapshot中
String fullName = getFullName(schema, table);
- ResultSetPacket packet = null;
- String createDDL = null;
try {
- try {
- packet = connection.query("show create table " + fullName);
- } catch (Exception e) {
- // 尝试做一次retry操作
- connection.reconnect();
- packet = connection.query("show create table " + fullName);
- }
- if (packet.getFieldValues().size() > 0) {
- createDDL = packet.getFieldValues().get(1);
+ synchronized (this) {
+ tableMeta = tableMetaTSDB.find(schema, table);
+ if (tableMeta != null) {
+ return tableMeta;
+ }
+
+ ResultSetPacket packet = null;
+ try {
+ packet = connection.query("show create table " + fullName);
+ } catch (Exception e) {
+ // 尝试做一次retry操作
+ connection.reconnect();
+ packet = connection.query("show create table " + fullName);
+ }
+
+ String createDDL = null;
+ if (packet.getFieldValues().size() > 0) {
+ createDDL = packet.getFieldValues().get(1);
+ }
+ // 强制覆盖掉内存值
+ tableMetaTSDB.apply(position, schema, createDDL, "first");
+ tableMeta = tableMetaTSDB.find(schema, table);
}
- // 强制覆盖掉内存值
- tableMetaTSDB.apply(position, schema, createDDL, "first");
- tableMeta = tableMetaTSDB.find(schema, table);
} catch (IOException e) {
throw new CanalParseException("fetch failed by table meta:" + fullName, e);
}
}
return tableMeta;
} else {
+ String fullName = getFullName(schema, table);
if (!useCache) {
- tableMetaDB.invalidate(getFullName(schema, table));
+ // tableMetaCache.invalidate(getFullName(schema, table));
+ tableMetaCache.invalidate(fullName);
}
-
- return tableMetaDB.getUnchecked(getFullName(schema, table));
+ // return tableMetaCache.getUnchecked(getFullName(schema, table));
+ return tableMetaCache.getUnchecked(fullName);
}
}
@@ -209,7 +241,7 @@ public void clearTableMeta(String schema, String table) {
if (tableMetaTSDB != null) {
// tsdb不需要做,会基于ddl sql自动清理
} else {
- tableMetaDB.invalidate(getFullName(schema, table));
+ tableMetaCache.invalidate(getFullName(schema, table));
}
}
@@ -217,10 +249,13 @@ public void clearTableMetaWithSchemaName(String schema) {
if (tableMetaTSDB != null) {
// tsdb不需要做,会基于ddl sql自动清理
} else {
- for (String name : tableMetaDB.asMap().keySet()) {
- if (StringUtils.startsWithIgnoreCase(name, schema + ".")) {
- // removeNames.add(name);
- tableMetaDB.invalidate(name);
+ ConcurrentMap map = tableMetaCache.asMap();
+ if (!map.isEmpty()) {
+ for (String name : map.keySet()) {
+ if (StringUtils.startsWithIgnoreCase(name, schema + ".")) {
+ // removeNames.add(name);
+ tableMetaCache.invalidate(name);
+ }
}
}
}
@@ -230,7 +265,7 @@ public void clearTableMeta() {
if (tableMetaTSDB != null) {
// tsdb不需要做,会基于ddl sql自动清理
} else {
- tableMetaDB.invalidateAll();
+ tableMetaCache.invalidateAll();
}
}
@@ -252,14 +287,10 @@ public boolean apply(EntryPosition position, String schema, String ddl, String e
}
private String getFullName(String schema, String table) {
- StringBuilder builder = new StringBuilder();
- return builder.append('`')
- .append(schema)
- .append('`')
+ StringBuilder builder = new StringBuilder(64);
+ return builder.append('`').append(schema).append('`')
.append('.')
- .append('`')
- .append(StringUtils.replace(table,"`","``"))
- .append('`')
+ .append('`').append(StringUtils.replace(table,"`","``")).append('`')
.toString();
}
@@ -286,5 +317,4 @@ public boolean isOnPolarX() {
public void setOnPolarX(boolean isOnPolarX) {
this.isOnPolarX = isOnPolarX;
}
-
}
diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java
index f78626922f..234de43f6a 100644
--- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java
+++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java
@@ -300,6 +300,7 @@ public boolean apply(EntryPosition position, String schema, String ddl, String e
@Override
public boolean rollback(EntryPosition position) {
// 每次rollback需要重新构建一次memory data
+ // FIXME The global MemoryTableMeta may cause OOM when it store too many tables
this.memoryTableMeta = new MemoryTableMeta();
boolean flag = false;
EntryPosition snapshotPosition = buildMemFromSnapshot(position);