From faca320451cc83cd0ad0e8648e5644d8b6af5db7 Mon Sep 17 00:00:00 2001 From: poorna2152 Date: Sat, 15 Mar 2025 16:05:04 +0530 Subject: [PATCH 1/2] Fix kafka offset from option --- .../io/kafka/source/KafkaConsumerThread.java | 14 ++++++++++---- .../extension/io/kafka/source/KafkaSource.java | 2 +- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/component/src/main/java/io/siddhi/extension/io/kafka/source/KafkaConsumerThread.java b/component/src/main/java/io/siddhi/extension/io/kafka/source/KafkaConsumerThread.java index 4646c752..cb068e70 100644 --- a/component/src/main/java/io/siddhi/extension/io/kafka/source/KafkaConsumerThread.java +++ b/component/src/main/java/io/siddhi/extension/io/kafka/source/KafkaConsumerThread.java @@ -290,8 +290,8 @@ public void run() { } } if (!isReplayThread) { - kafkaSourceState.getTopicOffsetMap().get(record.topic()).put(record.partition(), - record.offset()); + kafkaSourceState.getTopicOffsetMap().get(record.topic()) + .putIfAbsent(record.partition(), record.offset()); } if (endReplay(record)) { inactive = true; @@ -300,7 +300,7 @@ public void run() { } } else { if (!isReplayThread) { - kafkaSourceState.getTopicOffsetMap().get(record.topic()).put(record.partition(), + kafkaSourceState.getTopicOffsetMap().get(record.topic()).putIfAbsent(record.partition(), record.offset()); } if (metrics != null) { @@ -351,7 +351,13 @@ public void run() { void seekToRequiredOffset() {} boolean isRecordAfterStartOffset(ConsumerRecord record) { - return true; + Map partitionMap = kafkaSourceState.getTopicOffsetMap().get(record.topic()); + if (partitionMap == null) { + return true; + } + + Long offsetThreshold = partitionMap.get(record.partition()); + return offsetThreshold == null || record.offset() > offsetThreshold; } boolean endReplay(ConsumerRecord record) { diff --git a/component/src/main/java/io/siddhi/extension/io/kafka/source/KafkaSource.java b/component/src/main/java/io/siddhi/extension/io/kafka/source/KafkaSource.java index 2c455a4d..f9d6c304 100644 --- a/component/src/main/java/io/siddhi/extension/io/kafka/source/KafkaSource.java +++ b/component/src/main/java/io/siddhi/extension/io/kafka/source/KafkaSource.java @@ -314,7 +314,7 @@ public StateFactory init(SourceEventListener sourceEventListen "true")); topicOffsetMapConfig = optionHolder.validateAndGetStaticValue(TOPIC_OFFSET_MAP, null); } - partitions = (partitionList != null) ? partitionList.split(KafkaIOUtils.HEADER_SEPARATOR) : null; + partitions = (partitionList != null) ? partitionList.split(KafkaIOUtils.HEADER_SEPARATOR) : new String[]{"0"}; topics = topicList.split(KafkaIOUtils.HEADER_SEPARATOR); seqEnabled = optionHolder.validateAndGetStaticValue(SEQ_ENABLED, "false").equalsIgnoreCase("true"); optionalConfigs = optionHolder.validateAndGetStaticValue(ADAPTOR_OPTIONAL_CONFIGURATION_PROPERTIES, null); From 33ec9713b583aa77202115fdfff3f983721f1d80 Mon Sep 17 00:00:00 2001 From: poorna2152 Date: Sat, 15 Mar 2025 16:19:15 +0530 Subject: [PATCH 2/2] Update spotbugs --- findbugs-exclude.xml | 20 ++++---------------- pom.xml | 18 ++++++++++++++++-- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/findbugs-exclude.xml b/findbugs-exclude.xml index ae08fffa..2e5b6476 100644 --- a/findbugs-exclude.xml +++ b/findbugs-exclude.xml @@ -18,22 +18,6 @@ --> - - - - - - - - - - - - - - - - @@ -42,4 +26,8 @@ + + + + diff --git a/pom.xml b/pom.xml index a1edf191..7207d55e 100644 --- a/pom.xml +++ b/pom.xml @@ -332,12 +332,26 @@ true - + com.github.spotbugs spotbugs-maven-plugin - + + Max + Low + true + ${project.build.directory}/findbugs ${maven.findbugsplugin.version.exclude} + + + + analyze-compile + compile + + check + + +