Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ canal.instance.filter.black.regex=
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
#canal.mq.dynamicTag=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class MQDestination {
private Integer partitionsNum;
private String partitionHash;
private String dynamicTopic;
private String dynamicTag;
private String dynamicTopicPartitionNum;
private Boolean enableDynamicQueuePartition;

Expand Down Expand Up @@ -65,6 +66,14 @@ public void setDynamicTopic(String dynamicTopic) {
this.dynamicTopic = dynamicTopic;
}

public String getDynamicTag() {
return dynamicTag;
}

public void setDynamicTag(String dynamicTag) {
this.dynamicTag = dynamicTag;
}

public String getDynamicTopicPartitionNum() {
return dynamicTopicPartitionNum;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,18 +159,40 @@ public void send(MQDestination destination, com.alibaba.otter.canal.protocol.Mes
for (Map.Entry<String, com.alibaba.otter.canal.protocol.Message> entry : messageMap.entrySet()) {
String topicName = entry.getKey().replace('.', '_');
com.alibaba.otter.canal.protocol.Message messageSub = entry.getValue();
template.submit(() -> {
try {
send(destination, topicName, messageSub);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
if (!StringUtils.isEmpty(destination.getDynamicTag())) {
// 按动态tag发送
sendByDynamicTag(template, destination, messageSub, topicName, destination.getDynamicTag());
} else {
template.submit(() -> {
try {
send(destination,
topicName,
((RocketMQProducerConfig) this.mqProperties).getTag(),
messageSub);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}

template.waitForResult();
} else {
send(destination, destination.getTopic(), message);
if (!StringUtils.isEmpty(destination.getDynamicTag())) {
// 按动态tag发送
sendByDynamicTag(template,
destination,
message,
destination.getTopic(),
destination.getDynamicTag());

template.waitForResult();
} else {
send(destination,
destination.getTopic(),
((RocketMQProducerConfig) this.mqProperties).getTag(),
message);
}
}

callback.commit();
Expand All @@ -182,8 +204,37 @@ public void send(MQDestination destination, com.alibaba.otter.canal.protocol.Mes
}
}

public void send(final MQDestination destination, String topicName,
com.alibaba.otter.canal.protocol.Message message) {
/**
* 按动态tag配置发送消息,动态tag配置采用与动态topic配置一致的分割处理
*
* @param template
* @param destination
* @param message
* @param topicName
* @param dynamicTagConfigs
*/
private void sendByDynamicTag(ExecutorTemplate template, MQDestination destination,
com.alibaba.otter.canal.protocol.Message message, String topicName,
String dynamicTagConfigs) {
// 动态tag, 直接使用[动态topic]相同的分隔逻辑
Map<String, com.alibaba.otter.canal.protocol.Message> messageMap = MQMessageUtils
.messageTopics(message, null, dynamicTagConfigs);
for (Map.Entry<String, com.alibaba.otter.canal.protocol.Message> entry : messageMap.entrySet()) {
String tagName = entry.getKey().replace('.', '_');
com.alibaba.otter.canal.protocol.Message messageTag = entry.getValue();

template.submit(() -> {
try {
send(destination, topicName, tagName, messageTag);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}

private void send(final MQDestination destination, String topicName, String tagName,
com.alibaba.otter.canal.protocol.Message message) {
// 获取当前topic的分区数
Integer partitionNum = MQMessageUtils.parseDynamicTopicPartition(topicName,
destination.getDynamicTopicPartitionNum());
Expand Down Expand Up @@ -215,7 +266,7 @@ public void send(final MQDestination destination, String topicName,
final int index = i;
template.submit(() -> {
Message data = new Message(topicName,
((RocketMQProducerConfig) this.mqProperties).getTag(),
tagName,
CanalMessageSerializerUtil.serializer(dataPartition,
mqProperties.isFilterTransactionEntry()));
sendMessage(data, index);
Expand All @@ -227,7 +278,7 @@ public void send(final MQDestination destination, String topicName,
} else {
final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
Message data = new Message(topicName,
((RocketMQProducerConfig) this.mqProperties).getTag(),
tagName,
CanalMessageSerializerUtil.serializer(message, mqProperties.isFilterTransactionEntry()));
sendMessage(data, partition);
}
Expand Down Expand Up @@ -265,7 +316,7 @@ public void send(final MQDestination destination, String topicName,
template.submit(() -> {
List<Message> messages = flatMessagePart.stream()
.map(flatMessage -> new Message(topicName,
((RocketMQProducerConfig) this.mqProperties).getTag(),
tagName,
JSON.toJSONBytes(flatMessage,
JSONWriter.Feature.WriteNulls,
JSONWriter.Feature.LargeObject)))
Expand All @@ -282,7 +333,7 @@ public void send(final MQDestination destination, String topicName,
final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
List<Message> messages = flatMessages.stream()
.map(flatMessage -> new Message(topicName,
((RocketMQProducerConfig) this.mqProperties).getTag(),
tagName,
JSON.toJSONBytes(flatMessage, JSONWriter.Feature.WriteNulls, JSONWriter.Feature.LargeObject)))
.collect(Collectors.toList());
// 批量发送
Expand Down
1 change: 1 addition & 0 deletions deployer/src/main/resources/example/instance.properties
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ canal.instance.filter.black.regex=mysql\\.slave_.*
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
#canal.mq.dynamicTag=mytest1.user,tag2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
Expand Down
1 change: 1 addition & 0 deletions deployer/src/main/resources/spring/default-instance.xml
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@
<bean id="mqConfig" class="com.alibaba.otter.canal.instance.core.CanalMQConfig">
<property name="topic" value="${canal.mq.topic}" />
<property name="dynamicTopic" value="${canal.mq.dynamicTopic}" />
<property name="dynamicTag" value="${canal.mq.dynamicTag}" />
<property name="partition" value="${canal.mq.partition}" />
<property name="partitionsNum" value="${canal.mq.partitionsNum}" />
<property name="partitionHash" value="${canal.mq.partitionHash}" />
Expand Down
1 change: 1 addition & 0 deletions deployer/src/main/resources/spring/file-instance.xml
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@
<bean id="mqConfig" class="com.alibaba.otter.canal.instance.core.CanalMQConfig">
<property name="topic" value="${canal.mq.topic}" />
<property name="dynamicTopic" value="${canal.mq.dynamicTopic}" />
<property name="dynamicTag" value="${canal.mq.dynamicTag}" />
<property name="partition" value="${canal.mq.partition}" />
<property name="partitionsNum" value="${canal.mq.partitionsNum}" />
<property name="partitionHash" value="${canal.mq.partitionHash}" />
Expand Down
1 change: 1 addition & 0 deletions deployer/src/main/resources/spring/group-instance.xml
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@
<bean id="mqConfig" class="com.alibaba.otter.canal.instance.core.CanalMQConfig">
<property name="topic" value="${canal.mq.topic}" />
<property name="dynamicTopic" value="${canal.mq.dynamicTopic}" />
<property name="dynamicTag" value="${canal.mq.dynamicTag}" />
<property name="partition" value="${canal.mq.partition}" />
<property name="partitionsNum" value="${canal.mq.partitionsNum}" />
<property name="partitionHash" value="${canal.mq.partitionHash}" />
Expand Down
1 change: 1 addition & 0 deletions deployer/src/main/resources/spring/memory-instance.xml
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@
<bean id="mqConfig" class="com.alibaba.otter.canal.instance.core.CanalMQConfig">
<property name="topic" value="${canal.mq.topic}" />
<property name="dynamicTopic" value="${canal.mq.dynamicTopic}" />
<property name="dynamicTag" value="${canal.mq.dynamicTag}" />
<property name="partition" value="${canal.mq.partition}" />
<property name="partitionsNum" value="${canal.mq.partitionsNum}" />
<property name="partitionHash" value="${canal.mq.partitionHash}" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ public class CanalMQConfig {
private Integer partitionsNum;
private String partitionHash;
private String dynamicTopic;
private String dynamicTag;
private String dynamicTopicPartitionNum;
private Boolean enableDynamicQueuePartition;

Expand Down Expand Up @@ -50,6 +51,14 @@ public void setDynamicTopic(String dynamicTopic) {
this.dynamicTopic = dynamicTopic;
}

public String getDynamicTag() {
return dynamicTag;
}

public void setDynamicTag(String dynamicTag) {
this.dynamicTag = dynamicTag;
}

public String getDynamicTopicPartitionNum() {
return dynamicTopicPartitionNum;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ private void worker(String destination, AtomicBoolean destinationRunning, CountD
canalDestination.setTopic(mqConfig.getTopic());
canalDestination.setPartition(mqConfig.getPartition());
canalDestination.setDynamicTopic(mqConfig.getDynamicTopic());
canalDestination.setDynamicTag(mqConfig.getDynamicTag());
canalDestination.setPartitionsNum(mqConfig.getPartitionsNum());
canalDestination.setPartitionHash(mqConfig.getPartitionHash());
canalDestination.setDynamicTopicPartitionNum(mqConfig.getDynamicTopicPartitionNum());
Expand Down