diff --git a/admin/admin-web/src/main/resources/instance-template.properties b/admin/admin-web/src/main/resources/instance-template.properties index 5ceccab6d2..58f0349787 100644 --- a/admin/admin-web/src/main/resources/instance-template.properties +++ b/admin/admin-web/src/main/resources/instance-template.properties @@ -50,6 +50,7 @@ canal.instance.filter.black.regex= canal.mq.topic=example # dynamic topic route by schema or table regex #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..* +#canal.mq.dynamicTag=mytest1.user,mytest2\\..*,.*\\..* canal.mq.partition=0 # hash partition config #canal.mq.partitionsNum=3 diff --git a/connector/core/src/main/java/com/alibaba/otter/canal/connector/core/producer/MQDestination.java b/connector/core/src/main/java/com/alibaba/otter/canal/connector/core/producer/MQDestination.java index bdf0858794..0ab66af932 100644 --- a/connector/core/src/main/java/com/alibaba/otter/canal/connector/core/producer/MQDestination.java +++ b/connector/core/src/main/java/com/alibaba/otter/canal/connector/core/producer/MQDestination.java @@ -14,6 +14,7 @@ public class MQDestination { private Integer partitionsNum; private String partitionHash; private String dynamicTopic; + private String dynamicTag; private String dynamicTopicPartitionNum; private Boolean enableDynamicQueuePartition; @@ -65,6 +66,14 @@ public void setDynamicTopic(String dynamicTopic) { this.dynamicTopic = dynamicTopic; } + public String getDynamicTag() { + return dynamicTag; + } + + public void setDynamicTag(String dynamicTag) { + this.dynamicTag = dynamicTag; + } + public String getDynamicTopicPartitionNum() { return dynamicTopicPartitionNum; } diff --git a/connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java b/connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java index eb478c9530..19e7a144e5 100644 --- a/connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java +++ b/connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java @@ -159,18 +159,40 @@ public void send(MQDestination destination, com.alibaba.otter.canal.protocol.Mes for (Map.Entry entry : messageMap.entrySet()) { String topicName = entry.getKey().replace('.', '_'); com.alibaba.otter.canal.protocol.Message messageSub = entry.getValue(); - template.submit(() -> { - try { - send(destination, topicName, messageSub); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); + if (!StringUtils.isEmpty(destination.getDynamicTag())) { + // 按动态tag发送 + sendByDynamicTag(template, destination, messageSub, topicName, destination.getDynamicTag()); + } else { + template.submit(() -> { + try { + send(destination, + topicName, + ((RocketMQProducerConfig) this.mqProperties).getTag(), + messageSub); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } } template.waitForResult(); } else { - send(destination, destination.getTopic(), message); + if (!StringUtils.isEmpty(destination.getDynamicTag())) { + // 按动态tag发送 + sendByDynamicTag(template, + destination, + message, + destination.getTopic(), + destination.getDynamicTag()); + + template.waitForResult(); + } else { + send(destination, + destination.getTopic(), + ((RocketMQProducerConfig) this.mqProperties).getTag(), + message); + } } callback.commit(); @@ -182,8 +204,37 @@ public void send(MQDestination destination, com.alibaba.otter.canal.protocol.Mes } } - public void send(final MQDestination destination, String topicName, - com.alibaba.otter.canal.protocol.Message message) { + /** + * 按动态tag配置发送消息,动态tag配置采用与动态topic配置一致的分割处理 + * + * @param template + * @param destination + * @param message + * @param topicName + * @param dynamicTagConfigs + */ + private void sendByDynamicTag(ExecutorTemplate template, MQDestination destination, + com.alibaba.otter.canal.protocol.Message message, String topicName, + String dynamicTagConfigs) { + // 动态tag, 直接使用[动态topic]相同的分隔逻辑 + Map messageMap = MQMessageUtils + .messageTopics(message, null, dynamicTagConfigs); + for (Map.Entry entry : messageMap.entrySet()) { + String tagName = entry.getKey().replace('.', '_'); + com.alibaba.otter.canal.protocol.Message messageTag = entry.getValue(); + + template.submit(() -> { + try { + send(destination, topicName, tagName, messageTag); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + } + + private void send(final MQDestination destination, String topicName, String tagName, + com.alibaba.otter.canal.protocol.Message message) { // 获取当前topic的分区数 Integer partitionNum = MQMessageUtils.parseDynamicTopicPartition(topicName, destination.getDynamicTopicPartitionNum()); @@ -215,7 +266,7 @@ public void send(final MQDestination destination, String topicName, final int index = i; template.submit(() -> { Message data = new Message(topicName, - ((RocketMQProducerConfig) this.mqProperties).getTag(), + tagName, CanalMessageSerializerUtil.serializer(dataPartition, mqProperties.isFilterTransactionEntry())); sendMessage(data, index); @@ -227,7 +278,7 @@ public void send(final MQDestination destination, String topicName, } else { final int partition = destination.getPartition() != null ? destination.getPartition() : 0; Message data = new Message(topicName, - ((RocketMQProducerConfig) this.mqProperties).getTag(), + tagName, CanalMessageSerializerUtil.serializer(message, mqProperties.isFilterTransactionEntry())); sendMessage(data, partition); } @@ -265,7 +316,7 @@ public void send(final MQDestination destination, String topicName, template.submit(() -> { List messages = flatMessagePart.stream() .map(flatMessage -> new Message(topicName, - ((RocketMQProducerConfig) this.mqProperties).getTag(), + tagName, JSON.toJSONBytes(flatMessage, JSONWriter.Feature.WriteNulls, JSONWriter.Feature.LargeObject))) @@ -282,7 +333,7 @@ public void send(final MQDestination destination, String topicName, final int partition = destination.getPartition() != null ? destination.getPartition() : 0; List messages = flatMessages.stream() .map(flatMessage -> new Message(topicName, - ((RocketMQProducerConfig) this.mqProperties).getTag(), + tagName, JSON.toJSONBytes(flatMessage, JSONWriter.Feature.WriteNulls, JSONWriter.Feature.LargeObject))) .collect(Collectors.toList()); // 批量发送 diff --git a/deployer/src/main/resources/example/instance.properties b/deployer/src/main/resources/example/instance.properties index 6b177096cc..30079d128b 100644 --- a/deployer/src/main/resources/example/instance.properties +++ b/deployer/src/main/resources/example/instance.properties @@ -63,6 +63,7 @@ canal.instance.filter.black.regex=mysql\\.slave_.* canal.mq.topic=example # dynamic topic route by schema or table regex #canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..* +#canal.mq.dynamicTag=mytest1.user,tag2:mytest2\\..*,.*\\..* canal.mq.partition=0 # hash partition config #canal.mq.enableDynamicQueuePartition=false diff --git a/deployer/src/main/resources/spring/default-instance.xml b/deployer/src/main/resources/spring/default-instance.xml index 095b003177..05faf7f882 100644 --- a/deployer/src/main/resources/spring/default-instance.xml +++ b/deployer/src/main/resources/spring/default-instance.xml @@ -229,6 +229,7 @@ + diff --git a/deployer/src/main/resources/spring/file-instance.xml b/deployer/src/main/resources/spring/file-instance.xml index 9f0ac3eec1..1809593b8c 100644 --- a/deployer/src/main/resources/spring/file-instance.xml +++ b/deployer/src/main/resources/spring/file-instance.xml @@ -215,6 +215,7 @@ + diff --git a/deployer/src/main/resources/spring/group-instance.xml b/deployer/src/main/resources/spring/group-instance.xml index 3f7a13e28b..abb50e0f04 100644 --- a/deployer/src/main/resources/spring/group-instance.xml +++ b/deployer/src/main/resources/spring/group-instance.xml @@ -331,6 +331,7 @@ + diff --git a/deployer/src/main/resources/spring/memory-instance.xml b/deployer/src/main/resources/spring/memory-instance.xml index 55b7e6876d..f716dbc3d5 100644 --- a/deployer/src/main/resources/spring/memory-instance.xml +++ b/deployer/src/main/resources/spring/memory-instance.xml @@ -203,6 +203,7 @@ + diff --git a/instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalMQConfig.java b/instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalMQConfig.java index af362271e6..09a96b8807 100644 --- a/instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalMQConfig.java +++ b/instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalMQConfig.java @@ -7,6 +7,7 @@ public class CanalMQConfig { private Integer partitionsNum; private String partitionHash; private String dynamicTopic; + private String dynamicTag; private String dynamicTopicPartitionNum; private Boolean enableDynamicQueuePartition; @@ -50,6 +51,14 @@ public void setDynamicTopic(String dynamicTopic) { this.dynamicTopic = dynamicTopic; } + public String getDynamicTag() { + return dynamicTag; + } + + public void setDynamicTag(String dynamicTag) { + this.dynamicTag = dynamicTag; + } + public String getDynamicTopicPartitionNum() { return dynamicTopicPartitionNum; } diff --git a/server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java b/server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java index ea53ae7546..83c72125ef 100644 --- a/server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java +++ b/server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java @@ -160,6 +160,7 @@ private void worker(String destination, AtomicBoolean destinationRunning, CountD canalDestination.setTopic(mqConfig.getTopic()); canalDestination.setPartition(mqConfig.getPartition()); canalDestination.setDynamicTopic(mqConfig.getDynamicTopic()); + canalDestination.setDynamicTag(mqConfig.getDynamicTag()); canalDestination.setPartitionsNum(mqConfig.getPartitionsNum()); canalDestination.setPartitionHash(mqConfig.getPartitionHash()); canalDestination.setDynamicTopicPartitionNum(mqConfig.getDynamicTopicPartitionNum());