From 96e81586c39c90bdd12f5de1506e6f9293da260f Mon Sep 17 00:00:00 2001
From: xxchan
Date: Fri, 24 Jan 2025 14:27:38 +0800
Subject: [PATCH] fix: finish source backfill immediately for scan.startup.mode=latest (#20285)

Signed-off-by: xxchan
---
 .../kafka/shared_source.slt.serial    | 30 +++++++++++++++++++
 .../src/source/kafka/source/reader.rs | 16 +++++++---
 2 files changed, 42 insertions(+), 4 deletions(-)

diff --git a/e2e_test/source_inline/kafka/shared_source.slt.serial b/e2e_test/source_inline/kafka/shared_source.slt.serial
index 42d4cf86725ef..97129c7f6fff3 100644
--- a/e2e_test/source_inline/kafka/shared_source.slt.serial
+++ b/e2e_test/source_inline/kafka/shared_source.slt.serial
@@ -99,6 +99,10 @@ SET streaming_use_shared_source TO false;
 statement ok
 create materialized view mv_2 as select * from s0;
 
+statement ok
+SET streaming_use_shared_source TO true;
+
+
 sleep 2s
 
 query ?? rowsort
@@ -370,3 +374,29 @@ drop source s0 cascade;
 
 statement ok
 drop source s_before_produce cascade;
+
+# test: scan.startup.mode=latest should not be blocked when there's no data to backfill
+# https://github.com/risingwavelabs/risingwave/issues/20083#issuecomment-2609422824
+statement ok
+create source s_latest (v1 int, v2 varchar) with (
+  ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
+  topic = 'shared_source',
+  scan.startup.mode = 'latest'
+) FORMAT PLAIN ENCODE JSON;
+
+# Note: batch kafka scan ignores scan.startup.mode
+query ? rowsort
+select count(*) from s_latest;
+----
+55
+
+statement ok
+create materialized view mv_latest as select * from s_latest;
+
+query ? rowsort
+select count(*) from mv_latest;
+----
+0
+
+system ok
+rpk topic delete shared_source;
diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs
index e0eddb0dee507..502f06ae31a6c 100644
--- a/src/connector/src/source/kafka/source/reader.rs
+++ b/src/connector/src/source/kafka/source/reader.rs
@@ -124,9 +124,9 @@ impl SplitReader for KafkaSplitReader {
                 properties.common.sync_call_timeout,
             )
             .await?;
-            tracing::debug!("fetch kafka watermarks: low: {low}, high: {high}, split: {split:?}");
-            // note: low is inclusive, high is exclusive
-            if low == high {
+            tracing::info!("fetch kafka watermarks: low: {low}, high: {high}, split: {split:?}");
+            // note: low is inclusive, high is exclusive, start_offset is exclusive
+            if low == high || split.start_offset.is_some_and(|offset| offset + 1 >= high) {
                 backfill_info.insert(split.id(), BackfillInfo::NoDataToBackfill);
             } else {
                 debug_assert!(high > 0);
@@ -138,7 +138,15 @@ impl SplitReader for KafkaSplitReader {
                 );
             }
         }
-        tracing::debug!("backfill_info: {:?}", backfill_info);
+        tracing::info!(
+            topic = properties.common.topic,
+            source_name = source_ctx.source_name,
+            fragment_id = source_ctx.fragment_id,
+            source_id = source_ctx.source_id.table_id,
+            actor_id = source_ctx.actor_id,
+            "backfill_info: {:?}",
+            backfill_info
+        );
 
         consumer.assign(&tpl)?;