Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions deployer/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@
<level value="INFO" />
<appender-ref ref="CANAL-ROOT" />
</logger>
<logger name="com.alibaba.otter.canal.server.embedded" additivity="false">
<level value="INFO" />
<appender-ref ref="CANAL-ROOT" />
</logger>
<logger name="com.alibaba.otter.canal.meta.FileMixedMetaManager" additivity="false">
<level value="INFO" />
<appender-ref ref="CANAL-META" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

import org.apache.commons.lang.StringUtils;

Expand Down Expand Up @@ -43,67 +44,83 @@ public class TableMetaCache {
private boolean isOnTSDB = false;

private TableMetaTSDB tableMetaTSDB;

// 第一层tableId,第二层schema.table,解决tableId重复,对应多张表
private LoadingCache<String, TableMeta> tableMetaDB;
private LoadingCache<String, TableMeta> tableMetaCache;

public TableMetaCache(MysqlConnection con, TableMetaTSDB tableMetaTSDB){
this.connection = con;
this.tableMetaTSDB = tableMetaTSDB;
// 如果持久存储的表结构为空,从db里面获取下
if (tableMetaTSDB == null) {
this.tableMetaDB = CacheBuilder.newBuilder().build(new CacheLoader<String, TableMeta>() {

@Override
public TableMeta load(String name) throws Exception {
try {
return getTableMetaByDB(name);
} catch (Throwable e) {
// 尝试做一次retry操作
try {
connection.reconnect();
return getTableMetaByDB(name);
} catch (IOException e1) {
throw new CanalParseException("fetch failed by table meta:" + name, e1);
}
}
}

});
this.tableMetaCache = createLocalCache();
} else {
isOnTSDB = true;
}

try {
ResultSetPacket packet = connection.query("show global variables like 'rds\\_%'");
if (packet.getFieldValues().size() > 0) {
isOnRDS = true;
}
// if (packet.getFieldValues().size() > 0) {
isOnRDS = packet.getFieldValues().size() > 0;
// }
} catch (IOException e) {
}

try {
ResultSetPacket packet = connection.query("show global variables like 'polarx\\_%'");
if (packet.getFieldValues().size() > 0) {
isOnPolarX = true;
}
// if (packet.getFieldValues().size() > 0) {
isOnPolarX = packet.getFieldValues().size() > 0;
// }
} catch (IOException e) {
}
}

private synchronized TableMeta getTableMetaByDB(String fullname) throws IOException {
try {
ResultSetPacket packet = connection.query("show create table " + fullname);
String[] names = StringUtils.split(fullname, "`.`");
String schema = names[0];
String table = names[1].substring(0, names[1].length());
return new TableMeta(schema, table, parseTableMeta(schema, table, packet));
} catch (Throwable e) { // fallback to desc table
ResultSetPacket packet = connection.query("desc " + fullname);
String[] names = StringUtils.split(fullname, "`.`");
String schema = names[0];
String table = names[1].substring(0, names[1].length());
return new TableMeta(schema, table, parseTableMetaByDesc(packet));
private LoadingCache<String, TableMeta> createLocalCache() {
return CacheBuilder.newBuilder().build(new CacheLoader<String, TableMeta>() {

@Override
public TableMeta load(String name) throws Exception {
try {
return getTableMetaByDB(name);
} catch (Throwable e) {
// 尝试做一次retry操作
try {
connection.reconnect();
return getTableMetaByDB(name);
} catch (IOException e1) {
throw new CanalParseException("fetch table meta failed. table: " + name, e1);
}
}
}
});
}

private /* synchronized */ TableMeta getTableMetaByDB(String fullname) throws IOException {
// try {
// ResultSetPacket packet = connection.query("show create table " + fullname);
// String[] names = StringUtils.split(fullname, "`.`");
// String schema = names[0];
// String table = names[1].substring(0, names[1].length());
// return new TableMeta(schema, table, parseTableMeta(schema, table, packet));
// } catch (Throwable e) { // fallback to desc table
// ResultSetPacket packet = connection.query("desc " + fullname);
// String[] names = StringUtils.split(fullname, "`.`");
// String schema = names[0];
// String table = names[1].substring(0, names[1].length());
// return new TableMeta(schema, table, parseTableMetaByDesc(packet));
// }
boolean showCreateTable = true;
ResultSetPacket packet = null;
synchronized (this) {
try {
packet = connection.query("show create table " + fullname);
}catch (Exception ex) {
packet = connection.query("desc " + fullname);
}
}
String[] names = StringUtils.split(fullname, "`.`");
String schema = names[0];
String table = names[1].substring(0, names[1].length());
List<FieldMeta> fieldMetas = showCreateTable ? parseTableMeta(schema, table, packet) : parseTableMetaByDesc(packet);
return new TableMeta(schema, table, fieldMetas);
}

public static List<FieldMeta> parseTableMeta(String schema, String table, ResultSetPacket packet) {
Expand All @@ -122,105 +139,123 @@ public static List<FieldMeta> parseTableMeta(String schema, String table, Result
* 处理desc table的结果
*/
public static List<FieldMeta> parseTableMetaByDesc(ResultSetPacket packet) {
Map<String, Integer> nameMaps = new HashMap<>(6, 1f);
List<FieldPacket> fieldPackets = packet.getFieldDescriptors();
int size = fieldPackets.size();
Map<String, Integer> nameMaps = new HashMap<>(size);
int index = 0;
for (FieldPacket fieldPacket : packet.getFieldDescriptors()) {
// for (FieldPacket fieldPacket : packet.getFieldDescriptors()) {
for (FieldPacket fieldPacket : fieldPackets) {
nameMaps.put(StringUtils.lowerCase(fieldPacket.getName()), index++);
}

int size = packet.getFieldDescriptors().size();
int count = packet.getFieldValues().size() / packet.getFieldDescriptors().size();
List<FieldMeta> result = new ArrayList<>();
List<String> fieldValues = packet.getFieldValues();
// int size = packet.getFieldDescriptors().size();
// int count = packet.getFieldValues().size() / packet.getFieldDescriptors().size();
int count = fieldValues.size() / size;
List<FieldMeta> result = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
FieldMeta meta = new FieldMeta();
// 做一个优化,使用String.intern(),共享String对象,减少内存使用
meta.setColumnName(packet.getFieldValues().get(nameMaps.get(COLUMN_NAME) + i * size).intern());
meta.setColumnType(packet.getFieldValues().get(nameMaps.get(COLUMN_TYPE) + i * size));
meta.setNullable(StringUtils.equalsIgnoreCase(packet.getFieldValues().get(nameMaps.get(IS_NULLABLE) + i
* size),
"YES"));
meta.setKey("PRI".equalsIgnoreCase(packet.getFieldValues().get(nameMaps.get(COLUMN_KEY) + i * size)));
meta.setUnique("UNI".equalsIgnoreCase(packet.getFieldValues().get(nameMaps.get(COLUMN_KEY) + i * size)));
meta.setColumnName(fieldValues.get(nameMaps.get(COLUMN_NAME) + i * size).intern());
meta.setColumnType(fieldValues.get(nameMaps.get(COLUMN_TYPE) + i * size));
meta.setNullable(StringUtils.equalsIgnoreCase(fieldValues.get(nameMaps.get(IS_NULLABLE) + i * size), "YES"));
meta.setKey("PRI".equalsIgnoreCase(fieldValues.get(nameMaps.get(COLUMN_KEY) + i * size)));
meta.setUnique("UNI".equalsIgnoreCase(fieldValues.get(nameMaps.get(COLUMN_KEY) + i * size)));
// 特殊处理引号
meta.setDefaultValue(DruidDdlParser.unescapeQuotaName(packet.getFieldValues()
.get(nameMaps.get(COLUMN_DEFAULT) + i * size)));
meta.setExtra(packet.getFieldValues().get(nameMaps.get(EXTRA) + i * size));

meta.setDefaultValue(DruidDdlParser.unescapeQuotaName(fieldValues.get(nameMaps.get(COLUMN_DEFAULT) + i * size)));
meta.setExtra(fieldValues.get(nameMaps.get(EXTRA) + i * size));
result.add(meta);
}

return result;
}

public TableMeta getTableMeta(String schema, String table) {
return getTableMeta(schema, table, true);
}

public TableMeta getTableMeta(String schema, String table, boolean useCache) {
if (!useCache) {
tableMetaDB.invalidate(getFullName(schema, table));
}

return tableMetaDB.getUnchecked(getFullName(schema, table));
}

public TableMeta getTableMeta(String schema, String table, EntryPosition position) {
return getTableMeta(schema, table, true, position);
}

public synchronized TableMeta getTableMeta(String schema, String table, boolean useCache, EntryPosition position) {
// This method is unused
// public TableMeta getTableMeta(String schema, String table) {
// return getTableMeta(schema, table, true);
// }

// This method is unused
// public TableMeta getTableMeta(String schema, String table, boolean useCache) {
// String fullName = getFullName(schema, table);
// if (!useCache) {
// // tableMetaCache.invalidate(getFullName(schema, table));
// tableMetaCache.invalidate(fullName);
// }
// // return tableMetaCache.getUnchecked(getFullName(schema, table));
// return tableMetaCache.getUnchecked(fullName);
// }

// This method is unused
// public TableMeta getTableMeta(String schema, String table, EntryPosition position) {
// return getTableMeta(schema, table, true, position);
// }

public /* synchronized */ TableMeta getTableMeta(String schema, String table, boolean useCache, EntryPosition position) {
TableMeta tableMeta = null;
if (tableMetaTSDB != null) {
tableMeta = tableMetaTSDB.find(schema, table);
if (tableMeta == null) {
// 因为条件变化,可能第一次的tableMeta没取到,需要从db获取一次,并记录到snapshot中
String fullName = getFullName(schema, table);
ResultSetPacket packet = null;
String createDDL = null;
try {
try {
packet = connection.query("show create table " + fullName);
} catch (Exception e) {
// 尝试做一次retry操作
connection.reconnect();
packet = connection.query("show create table " + fullName);
}
if (packet.getFieldValues().size() > 0) {
createDDL = packet.getFieldValues().get(1);
synchronized (this) {
tableMeta = tableMetaTSDB.find(schema, table);
if (tableMeta != null) {
return tableMeta;
}

ResultSetPacket packet = null;
try {
packet = connection.query("show create table " + fullName);
} catch (Exception e) {
// 尝试做一次retry操作
connection.reconnect();
packet = connection.query("show create table " + fullName);
}

String createDDL = null;
if (packet.getFieldValues().size() > 0) {
createDDL = packet.getFieldValues().get(1);
}
// 强制覆盖掉内存值
tableMetaTSDB.apply(position, schema, createDDL, "first");
tableMeta = tableMetaTSDB.find(schema, table);
}
// 强制覆盖掉内存值
tableMetaTSDB.apply(position, schema, createDDL, "first");
tableMeta = tableMetaTSDB.find(schema, table);
} catch (IOException e) {
throw new CanalParseException("fetch failed by table meta:" + fullName, e);
}
}
return tableMeta;
} else {
String fullName = getFullName(schema, table);
if (!useCache) {
tableMetaDB.invalidate(getFullName(schema, table));
// tableMetaCache.invalidate(getFullName(schema, table));
tableMetaCache.invalidate(fullName);
}

return tableMetaDB.getUnchecked(getFullName(schema, table));
// return tableMetaCache.getUnchecked(getFullName(schema, table));
return tableMetaCache.getUnchecked(fullName);
}
}

public void clearTableMeta(String schema, String table) {
if (tableMetaTSDB != null) {
// tsdb不需要做,会基于ddl sql自动清理
} else {
tableMetaDB.invalidate(getFullName(schema, table));
tableMetaCache.invalidate(getFullName(schema, table));
}
}

public void clearTableMetaWithSchemaName(String schema) {
if (tableMetaTSDB != null) {
// tsdb不需要做,会基于ddl sql自动清理
} else {
for (String name : tableMetaDB.asMap().keySet()) {
if (StringUtils.startsWithIgnoreCase(name, schema + ".")) {
// removeNames.add(name);
tableMetaDB.invalidate(name);
ConcurrentMap<String, TableMeta> map = tableMetaCache.asMap();
if (!map.isEmpty()) {
for (String name : map.keySet()) {
if (StringUtils.startsWithIgnoreCase(name, schema + ".")) {
// removeNames.add(name);
tableMetaCache.invalidate(name);
}
}
}
}
Expand All @@ -230,7 +265,7 @@ public void clearTableMeta() {
if (tableMetaTSDB != null) {
// tsdb不需要做,会基于ddl sql自动清理
} else {
tableMetaDB.invalidateAll();
tableMetaCache.invalidateAll();
}
}

Expand All @@ -252,14 +287,10 @@ public boolean apply(EntryPosition position, String schema, String ddl, String e
}

private String getFullName(String schema, String table) {
StringBuilder builder = new StringBuilder();
return builder.append('`')
.append(schema)
.append('`')
StringBuilder builder = new StringBuilder(64);
return builder.append('`').append(schema).append('`')
.append('.')
.append('`')
.append(StringUtils.replace(table,"`","``"))
.append('`')
.append('`').append(StringUtils.replace(table,"`","``")).append('`')
.toString();
}

Expand All @@ -286,5 +317,4 @@ public boolean isOnPolarX() {
public void setOnPolarX(boolean isOnPolarX) {
this.isOnPolarX = isOnPolarX;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ public boolean apply(EntryPosition position, String schema, String ddl, String e
@Override
public boolean rollback(EntryPosition position) {
// 每次rollback需要重新构建一次memory data
// FIXME The global MemoryTableMeta may cause OOM when it store too many tables
this.memoryTableMeta = new MemoryTableMeta();
boolean flag = false;
EntryPosition snapshotPosition = buildMemFromSnapshot(position);
Expand Down
Loading