From 09b2216f162e9b44113a3ff3ca5910d090674aaa Mon Sep 17 00:00:00 2001 From: tabversion Date: Fri, 24 Jan 2025 14:13:05 +0800 Subject: [PATCH 1/8] stash --- .../src/executor/source/source_executor.rs | 292 ++++++++++-------- 1 file changed, 158 insertions(+), 134 deletions(-) diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index ae4d59bb5c8cf..1f70108faf3f4 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -79,6 +79,134 @@ pub struct SourceExecutor { is_shared_non_cdc: bool, } +struct StreamReaderBuilder { + pub source_desc: SourceDesc, + pub rate_limit: Option, + pub source_id: TableId, + pub source_name: String, + + // cdc related + pub is_auto_schema_change_enable: bool, + pub actor_ctx: ActorContextRef, +} + +impl StreamReaderBuilder { + fn prepare_source_stream_build(&self) -> (Vec, SourceContext) { + let column_ids = self + .source_desc + .columns + .iter() + .map(|column_desc| column_desc.column_id) + .collect_vec(); + + let (schema_change_tx, mut schema_change_rx) = + mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16); + let schema_change_tx = if self.is_auto_schema_change_enable { + let meta_client = self.actor_ctx.meta_client.clone(); + // spawn a task to handle schema change event from source parser + let _join_handle = tokio::task::spawn(async move { + while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await { + let table_ids = schema_change.table_ids(); + tracing::info!( + target: "auto_schema_change", + "recv a schema change event for tables: {:?}", table_ids); + // TODO: retry on rpc error + if let Some(ref meta_client) = meta_client { + match meta_client + .auto_schema_change(schema_change.to_protobuf()) + .await + { + Ok(_) => { + tracing::info!( + target: "auto_schema_change", + "schema change success for tables: {:?}", table_ids); + finish_tx.send(()).unwrap(); + } + Err(e) => { + tracing::error!( + target: "auto_schema_change", + error = ?e.as_report(), "schema change error"); + finish_tx.send(()).unwrap(); + } + } + } + } + }); + Some(schema_change_tx) + } else { + info!("auto schema change is disabled in config"); + None + }; + + let source_ctx = SourceContext::new( + self.actor_ctx.id, + self.source_id, + self.actor_ctx.fragment_id, + self.source_name.clone(), + self.source_desc.metrics.clone(), + SourceCtrlOpts { + chunk_size: limited_chunk_size(self.rate_limit), + split_txn: self.rate_limit.is_some(), // when rate limiting, we may split txn + }, + self.source_desc.source.config.clone(), + schema_change_tx, + ); + + (column_ids, source_ctx) + } + + #[try_stream(ok = StreamChunk, error = StreamExecutorError)] + async fn into_retry_stream( + self, + state: ConnectorState, + seek_to_latest: bool, + is_initial_build: bool, + ) { + let (column_ids, source_ctx) = self.prepare_source_stream_build(); + let mut is_first_run = true; + let source_ctx_ref = Arc::new(source_ctx); + + 'build_consume_loop: loop { + let build_stream_result = self + .source_desc + .source + .build_stream( + state.clone(), + column_ids.clone(), + source_ctx_ref.clone(), + seek_to_latest, + ) + .await; + if let Err(e) = build_stream_result { + if is_first_run && is_initial_build { + return Err(StreamExecutorError::connector_error(e)); + } else { + is_first_run = false; + continue 'build_consume_loop; + } + } + + let (stream, _) = build_stream_result.unwrap(); + let stream = apply_rate_limit(stream, self.rate_limit).boxed(); + #[for_await] + 'consume: for msg in stream { + match msg { + Ok(msg) => yield msg, + Err(e) => { + if is_first_run && is_initial_build { + return Err(StreamExecutorError::connector_error(e)); + } else { + break 'consume; + } + } + } + } + + is_first_run = false; + } + } +} + impl SourceExecutor { pub fn new( actor_ctx: ActorContextRef, @@ -100,6 +228,22 @@ impl SourceExecutor { } } + fn stream_reader_builder(&self, source_desc: SourceDesc) -> StreamReaderBuilder { + StreamReaderBuilder { + source_desc, + rate_limit: self.rate_limit_rps, + source_id: self.stream_source_core.as_ref().unwrap().source_id, + source_name: self + .stream_source_core + .as_ref() + .unwrap() + .source_name + .clone(), + is_auto_schema_change_enable: self.is_auto_schema_change_enable(), + actor_ctx: self.actor_ctx.clone(), + } + } + async fn spawn_wait_checkpoint_worker( core: &StreamSourceCore, source_reader: SourceReader, @@ -260,7 +404,7 @@ impl SourceExecutor { .update_state_if_changed(target_splits, should_trim_state) .await? { - self.rebuild_stream_reader(source_desc, stream).await?; + self.rebuild_stream_reader(source_desc, stream)?; } } @@ -349,7 +493,7 @@ impl SourceExecutor { } /// Rebuild stream if there is a err in stream - async fn rebuild_stream_reader_from_error( + fn rebuild_stream_reader_from_error( &mut self, source_desc: &SourceDesc, stream: &mut StreamReaderWithPause, @@ -369,10 +513,10 @@ impl SourceExecutor { self.actor_ctx.fragment_id.to_string(), ]); - self.rebuild_stream_reader(source_desc, stream).await + self.rebuild_stream_reader(source_desc, stream) } - async fn rebuild_stream_reader( + fn rebuild_stream_reader( &mut self, source_desc: &SourceDesc, stream: &mut StreamReaderWithPause, @@ -387,10 +531,13 @@ impl SourceExecutor { ); // Replace the source reader with a new one of the new state. - let (reader, _) = self - .build_stream_source_reader(source_desc, Some(target_state.clone()), false) - .await?; - let reader = reader.map_err(StreamExecutorError::connector_error); + let replace_stream_reader_builder = self.stream_reader_builder(source_desc.clone()); + let reader_stream = replace_stream_reader_builder.into_retry_stream( + Some(target_state.clone()), + false, + false, + ); + let reader = reader_stream.map_err(StreamExecutorError::connector_error); stream.replace_data_stream(reader); @@ -512,128 +659,7 @@ impl SourceExecutor { let mut barrier_stream = barrier_to_message_stream(barrier_receiver).boxed(); // Build the source stream reader. - let (source_chunk_reader, latest_splits) = if is_uninitialized { - tracing::info!("source uninitialized, build source stream reader w/o retry."); - let (source_chunk_reader, latest_splits) = self - .build_stream_source_reader( - &source_desc, - recover_state, - // For shared source, we start from latest and let the downstream SourceBackfillExecutors to read historical data. - // It's highly probable that the work of scanning historical data cannot be shared, - // so don't waste work on it. - // For more details, see https://github.com/risingwavelabs/risingwave/issues/16576#issuecomment-2095413297 - // Note that shared CDC source is special. It already starts from latest. - self.is_shared_non_cdc, - ) - .instrument_await("source_build_reader") - .await?; - ( - source_chunk_reader.map_err(StreamExecutorError::connector_error), - latest_splits, - ) - } else { - tracing::info!("source initialized, build source stream reader with retry."); - // Build the source stream reader with retry during recovery. - // We only build source stream reader with retry during recovery, - // because we can rely on the persisted source states to recover the source stream - // and can avoid the potential race with "seek to latest" - // https://github.com/risingwavelabs/risingwave/issues/19681#issuecomment-2532183002 - let mut reader_and_splits: Option<(BoxSourceChunkStream, Option>)> = - None; - let source_reader = source_desc.source.clone(); - let (column_ids, source_ctx) = self.prepare_source_stream_build(&source_desc); - let source_ctx = Arc::new(source_ctx); - let mut build_source_stream_fut = Box::pin(async move { - let backoff = get_infinite_backoff_strategy(); - tokio_retry::Retry::spawn(backoff, || async { - match source_reader - .build_stream( - recover_state.clone(), - column_ids.clone(), - source_ctx.clone(), - false, // not need to seek to latest since source state is initialized - ) - .await { - Ok((stream, res)) => Ok((stream, res.latest_splits)), - Err(e) => { - tracing::warn!(error = %e.as_report(), "failed to build source stream, retrying..."); - Err(e) - } - } - }) - .instrument(tracing::info_span!("build_source_stream_with_retry")) - .await - .expect("Retry build source stream until success.") - }); - - // loop to create source stream until success - loop { - if let Some(barrier) = build_source_stream_and_poll_barrier( - &mut barrier_stream, - &mut reader_and_splits, - &mut build_source_stream_fut, - ) - .await? - { - if let Message::Barrier(barrier) = barrier { - if let Some(mutation) = barrier.mutation.as_deref() { - match mutation { - Mutation::Throttle(actor_to_apply) => { - if let Some(new_rate_limit) = - actor_to_apply.get(&self.actor_ctx.id) - && *new_rate_limit != self.rate_limit_rps - { - tracing::info!( - "updating rate limit from {:?} to {:?}", - self.rate_limit_rps, - *new_rate_limit - ); - - // update the rate limit option, we will apply the rate limit - // when we finish building the source stream. - self.rate_limit_rps = *new_rate_limit; - } - } - Mutation::Resume => { - // We record the Resume mutation here and postpone the resume of the source stream - // after we have successfully built the source stream. - received_resume_during_build = true; - } - _ => { - // ignore other mutations and output a warn log - tracing::warn!( - "Received a mutation {:?} to be ignored, because we only handle Throttle and Resume before - finish building source stream.", - mutation - ); - } - } - } - - // bump state store epoch - let _ = self.persist_state_and_clear_cache(barrier.epoch).await?; - yield Message::Barrier(barrier); - } else { - unreachable!( - "Only barrier message is expected when building source stream." - ); - } - } else { - assert!(reader_and_splits.is_some()); - tracing::info!("source stream created successfully"); - break; - } - } - let (source_chunk_reader, latest_splits) = - reader_and_splits.expect("source chunk reader and splits must be created"); - - ( - apply_rate_limit(source_chunk_reader, self.rate_limit_rps) - .boxed() - .map_err(StreamExecutorError::connector_error), - latest_splits, - ) - }; + let init_reader_builder = self.stream_reader_builder(source_desc.clone()); if let Some(latest_splits) = latest_splits { // make sure it is written to state table later. @@ -677,8 +703,7 @@ impl SourceExecutor { while let Some(msg) = stream.next().await { let Ok(msg) = msg else { tokio::time::sleep(Duration::from_millis(1000)).await; - self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err()) - .await?; + self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err())?; continue; }; @@ -745,8 +770,7 @@ impl SourceExecutor { ); self.rate_limit_rps = *new_rate_limit; // recreate from latest_split_info - self.rebuild_stream_reader(&source_desc, &mut stream) - .await?; + self.rebuild_stream_reader(&source_desc, &mut stream)?; } } _ => {} From 00b960be17e281ee85059a7976fcaca92d047cc9 Mon Sep 17 00:00:00 2001 From: tabversion Date: Fri, 24 Jan 2025 22:38:19 +0800 Subject: [PATCH 2/8] rerun Signed-off-by: tabversion From 0616c06391fd0b9807c71d1bd81644e158ddc15a Mon Sep 17 00:00:00 2001 From: tabversion Date: Sat, 25 Jan 2025 00:44:47 +0800 Subject: [PATCH 3/8] build read stream with retry --- .../src/executor/source/source_executor.rs | 103 ++++++++---------- 1 file changed, 46 insertions(+), 57 deletions(-) diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 1f70108faf3f4..60a1a411544a8 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -13,13 +13,10 @@ // limitations under the License. use std::collections::HashMap; -use std::future::Future; -use std::pin::Pin; use std::time::Duration; use anyhow::anyhow; use either::Either; -use futures::stream::BoxStream; use futures::TryStreamExt; use itertools::Itertools; use risingwave_common::array::ArrayRef; @@ -41,7 +38,6 @@ use thiserror_ext::AsReport; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::{mpsc, oneshot}; use tokio::time::Instant; -use tracing::Instrument; use super::executor_core::StreamSourceCore; use super::{ @@ -50,7 +46,6 @@ use super::{ }; use crate::common::rate_limit::limited_chunk_size; use crate::executor::prelude::*; -use crate::executor::source::get_infinite_backoff_strategy; use crate::executor::stream_reader::StreamReaderWithPause; use crate::executor::UpdateMutation; @@ -155,15 +150,30 @@ impl StreamReaderBuilder { (column_ids, source_ctx) } - #[try_stream(ok = StreamChunk, error = StreamExecutorError)] - async fn into_retry_stream( - self, + async fn fetch_latest_splits( + &self, state: ConnectorState, seek_to_latest: bool, - is_initial_build: bool, - ) { + ) -> StreamExecutorResult { + let (column_ids, source_ctx) = self.prepare_source_stream_build(); + let source_ctx_ref = Arc::new(source_ctx); + let (_, res) = self + .source_desc + .source + .build_stream( + state.clone(), + column_ids.clone(), + source_ctx_ref.clone(), + seek_to_latest, + ) + .await + .map_err(StreamExecutorError::connector_error)?; + Ok(res.latest_splits) + } + + #[try_stream(ok = StreamChunk, error = StreamExecutorError)] + async fn into_retry_stream(self, state: ConnectorState, is_initial_build: bool) { let (column_ids, source_ctx) = self.prepare_source_stream_build(); - let mut is_first_run = true; let source_ctx_ref = Arc::new(source_ctx); 'build_consume_loop: loop { @@ -174,14 +184,14 @@ impl StreamReaderBuilder { state.clone(), column_ids.clone(), source_ctx_ref.clone(), - seek_to_latest, + // just `seek_to_latest` for initial build + is_initial_build, ) .await; if let Err(e) = build_stream_result { - if is_first_run && is_initial_build { + if is_initial_build { return Err(StreamExecutorError::connector_error(e)); } else { - is_first_run = false; continue 'build_consume_loop; } } @@ -193,16 +203,13 @@ impl StreamReaderBuilder { match msg { Ok(msg) => yield msg, Err(e) => { - if is_first_run && is_initial_build { - return Err(StreamExecutorError::connector_error(e)); - } else { - break 'consume; - } + tracing::warn!(error = ?e.as_report(), "stream source reader error"); + break 'consume; } } } - - is_first_run = false; + tracing::info!("stream source reader error, retry in 5s"); + tokio::time::sleep(Duration::from_secs(5)).await; } } } @@ -532,11 +539,8 @@ impl SourceExecutor { // Replace the source reader with a new one of the new state. let replace_stream_reader_builder = self.stream_reader_builder(source_desc.clone()); - let reader_stream = replace_stream_reader_builder.into_retry_stream( - Some(target_state.clone()), - false, - false, - ); + let reader_stream = + replace_stream_reader_builder.into_retry_stream(Some(target_state.clone()), false); let reader = reader_stream.map_err(StreamExecutorError::connector_error); stream.replace_data_stream(reader); @@ -606,7 +610,7 @@ impl SourceExecutor { Vec::default() }; let is_pause_on_startup = first_barrier.is_pause_on_startup(); - + let mut is_uninitialized = first_barrier.is_newly_added(self.actor_ctx.id); yield Message::Barrier(first_barrier); let mut core = self.stream_source_core.unwrap(); @@ -626,9 +630,6 @@ impl SourceExecutor { }; core.split_state_store.init_epoch(first_epoch).await?; - // initial_dispatch_num is 0 means the source executor doesn't have downstream jobs - // and is newly created - let mut is_uninitialized = self.actor_ctx.initial_dispatch_num == 0; for ele in &mut boot_state { if let Some(recover_state) = core .split_state_store @@ -655,11 +656,22 @@ impl SourceExecutor { let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state); tracing::debug!(state = ?recover_state, "start with state"); - let mut received_resume_during_build = false; - let mut barrier_stream = barrier_to_message_stream(barrier_receiver).boxed(); + let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed(); // Build the source stream reader. let init_reader_builder = self.stream_reader_builder(source_desc.clone()); + let mut latest_splits = None; + if is_uninitialized { + // need to seek to latest for initial build && shared source + latest_splits = init_reader_builder + .fetch_latest_splits( + recover_state.clone(), + is_uninitialized && self.is_shared_non_cdc, + ) + .await?; + } + let reader_stream = + init_reader_builder.into_retry_stream(recover_state.clone(), is_uninitialized); if let Some(latest_splits) = latest_splits { // make sure it is written to state table later. @@ -673,11 +685,11 @@ impl SourceExecutor { // Merge the chunks from source and the barriers into a single stream. We prioritize // barriers over source data chunks here. let mut stream = - StreamReaderWithPause::::new(barrier_stream, source_chunk_reader); + StreamReaderWithPause::::new(barrier_stream, reader_stream); let mut command_paused = false; // - If the first barrier requires us to pause on startup, pause the stream. - if is_pause_on_startup && !received_resume_during_build { + if is_pause_on_startup { tracing::info!("source paused on startup"); stream.pause_stream(); command_paused = true; @@ -890,29 +902,6 @@ impl SourceExecutor { } } -async fn build_source_stream_and_poll_barrier( - barrier_stream: &mut BoxStream<'static, StreamExecutorResult>, - reader_and_splits: &mut Option<(BoxSourceChunkStream, Option>)>, - build_future: &mut Pin< - Box>)>>, - >, -) -> StreamExecutorResult> { - if reader_and_splits.is_some() { - return Ok(None); - } - - tokio::select! { - biased; - build_ret = &mut *build_future => { - *reader_and_splits = Some(build_ret); - Ok(None) - } - msg = barrier_stream.next() => { - msg.transpose() - } - } -} - impl Execute for SourceExecutor { fn execute(self: Box) -> BoxedMessageStream { if self.stream_source_core.is_some() { From b3c891ad68ec891f23258e5de1d8e8560ebc3351 Mon Sep 17 00:00:00 2001 From: tabversion Date: Mon, 27 Jan 2025 14:36:32 +0800 Subject: [PATCH 4/8] fix --- e2e_test/source_inline/kafka/avro/alter_source.slt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/e2e_test/source_inline/kafka/avro/alter_source.slt b/e2e_test/source_inline/kafka/avro/alter_source.slt index bb36e116279e4..d16010995fecd 100644 --- a/e2e_test/source_inline/kafka/avro/alter_source.slt +++ b/e2e_test/source_inline/kafka/avro/alter_source.slt @@ -34,6 +34,8 @@ sr_register avro_alter_source_test-value AVRO <<< '{"type":"record","name":"Root system ok echo '{"foo":"ABC", "bar":1}' | rpk topic produce --schema-id=topic avro_alter_source_test +sleep 1s + query ? select * from s ---- From 4ff021e638668b07891289f03df975da241356bd Mon Sep 17 00:00:00 2001 From: tabversion Date: Mon, 27 Jan 2025 15:00:39 +0800 Subject: [PATCH 5/8] fix --- src/stream/src/executor/source/source_executor.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 60a1a411544a8..7605e279b7217 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -666,12 +666,12 @@ impl SourceExecutor { latest_splits = init_reader_builder .fetch_latest_splits( recover_state.clone(), - is_uninitialized && self.is_shared_non_cdc, + self.is_shared_non_cdc, ) .await?; } let reader_stream = - init_reader_builder.into_retry_stream(recover_state.clone(), is_uninitialized); + init_reader_builder.into_retry_stream(recover_state.clone(), is_uninitialized && self.is_shared_non_cdc); if let Some(latest_splits) = latest_splits { // make sure it is written to state table later. From 39a4d30b42e5fe6d432f65669f3d04c3e6284a9a Mon Sep 17 00:00:00 2001 From: tabversion Date: Mon, 27 Jan 2025 15:11:24 +0800 Subject: [PATCH 6/8] format --- src/stream/src/executor/source/source_executor.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 7605e279b7217..a7fe6aa00f036 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -664,14 +664,13 @@ impl SourceExecutor { if is_uninitialized { // need to seek to latest for initial build && shared source latest_splits = init_reader_builder - .fetch_latest_splits( - recover_state.clone(), - self.is_shared_non_cdc, - ) + .fetch_latest_splits(recover_state.clone(), self.is_shared_non_cdc) .await?; } - let reader_stream = - init_reader_builder.into_retry_stream(recover_state.clone(), is_uninitialized && self.is_shared_non_cdc); + let reader_stream = init_reader_builder.into_retry_stream( + recover_state.clone(), + is_uninitialized && self.is_shared_non_cdc, + ); if let Some(latest_splits) = latest_splits { // make sure it is written to state table later. From dcd62a4cf02362946a00a3647cae7632701a5c5c Mon Sep 17 00:00:00 2001 From: tabversion Date: Mon, 27 Jan 2025 16:29:34 +0800 Subject: [PATCH 7/8] backfill exec --- .../source/source_backfill_executor.rs | 137 +++++++----------- .../src/executor/source/source_executor.rs | 21 ++- 2 files changed, 65 insertions(+), 93 deletions(-) diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs index d8117f86fb5dc..92caec6170f7c 100644 --- a/src/stream/src/executor/source/source_backfill_executor.rs +++ b/src/stream/src/executor/source/source_backfill_executor.rs @@ -20,28 +20,23 @@ use std::time::Instant; use anyhow::anyhow; use either::Either; use futures::stream::{select_with_strategy, PollNext}; -use itertools::Itertools; use risingwave_common::bitmap::BitmapBuilder; use risingwave_common::metrics::{LabelGuardedIntCounter, GLOBAL_ERROR_METRICS}; use risingwave_common::system_param::local_manager::SystemParamsReaderRef; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::types::JsonbVal; use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder}; -use risingwave_connector::source::{ - BackfillInfo, BoxSourceChunkStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, - SplitMetaData, -}; +use risingwave_connector::source::{BackfillInfo, SplitId, SplitImpl, SplitMetaData}; use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_storage::store::TryWaitEpochOptions; use serde::{Deserialize, Serialize}; use thiserror_ext::AsReport; use super::executor_core::StreamSourceCore; +use super::get_split_offset_col_idx; use super::source_backfill_state_table::BackfillStateTableHandler; -use super::{apply_rate_limit, get_split_offset_col_idx}; -use crate::common::rate_limit::limited_chunk_size; use crate::executor::prelude::*; -use crate::executor::source::source_executor::WAIT_BARRIER_MULTIPLE_TIMES; +use crate::executor::source::source_executor::{StreamReaderBuilder, WAIT_BARRIER_MULTIPLE_TIMES}; use crate::executor::UpdateMutation; use crate::task::CreateMviewProgressReporter; @@ -279,45 +274,15 @@ impl SourceBackfillExecutorInner { } } - async fn build_stream_source_reader( - &self, - source_desc: &SourceDesc, - splits: Vec, - ) -> StreamExecutorResult<(BoxSourceChunkStream, HashMap)> { - let column_ids = source_desc - .columns - .iter() - .map(|column_desc| column_desc.column_id) - .collect_vec(); - let source_ctx = SourceContext::new( - self.actor_ctx.id, - self.stream_source_core.source_id, - self.actor_ctx.fragment_id, - self.stream_source_core.source_name.clone(), - source_desc.metrics.clone(), - SourceCtrlOpts { - chunk_size: limited_chunk_size(self.rate_limit_rps), - split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn - }, - source_desc.source.config.clone(), - None, - ); - - // We will check watermark to decide whether we need to backfill. - // e.g., when there's a Kafka topic-partition without any data, - // we don't need to backfill at all. But if we do not check here, - // the executor can only know it's finished when data coming in. - // For blocking DDL, this would be annoying. - - let (stream, res) = source_desc - .source - .build_stream(Some(splits), column_ids, Arc::new(source_ctx), false) - .await - .map_err(StreamExecutorError::connector_error)?; - Ok(( - apply_rate_limit(stream, self.rate_limit_rps).boxed(), - res.backfill_info, - )) + fn stream_reader_builder(&self, source_desc: SourceDesc) -> StreamReaderBuilder { + StreamReaderBuilder { + source_desc, + rate_limit: self.rate_limit_rps, + source_id: self.stream_source_core.source_id, + source_name: self.stream_source_core.source_name.clone(), + actor_ctx: self.actor_ctx.clone(), + is_auto_schema_change_enable: false, + } } #[try_stream(ok = Message, error = StreamExecutorError)] @@ -332,6 +297,7 @@ impl SourceBackfillExecutorInner { .unwrap_or(&[]) .to_vec(); let is_pause_on_startup = barrier.is_pause_on_startup(); + let is_initialize = barrier.is_newly_added(self.actor_ctx.id); yield Message::Barrier(barrier); let mut core = self.stream_source_core; @@ -369,26 +335,32 @@ impl SourceBackfillExecutorInner { // Return the ownership of `stream_source_core` to the source executor. self.stream_source_core = core; + let stream_reader_builder = self.stream_reader_builder(source_desc.clone()); - let (source_chunk_reader, backfill_info) = self - .build_stream_source_reader( - &source_desc, - backfill_stage.get_latest_unfinished_splits()?, - ) - .instrument_await("source_build_reader") - .await?; - for (split_id, info) in &backfill_info { - let state = backfill_stage.states.get_mut(split_id).unwrap(); - match info { - BackfillInfo::NoDataToBackfill => { - state.state = BackfillState::Finished; - } - BackfillInfo::HasDataToBackfill { latest_offset } => { - // Note: later we will override it with the offset from the source message, and it's possible to become smaller than this value. - state.target_offset = Some(latest_offset.clone()); + if is_initialize { + let backfill_info = stream_reader_builder + .fetch_latest_splits(Some(backfill_stage.get_latest_unfinished_splits()?), false) + .await? + .backfill_info; + for (split_id, info) in &backfill_info { + let state = backfill_stage.states.get_mut(split_id).unwrap(); + match info { + BackfillInfo::NoDataToBackfill => { + state.state = BackfillState::Finished; + } + BackfillInfo::HasDataToBackfill { latest_offset } => { + // Note: later we will override it with the offset from the source message, and it's possible to become smaller than this value. + state.target_offset = Some(latest_offset.clone()); + } } } } + let retry_reader_stream = stream_reader_builder + .into_retry_stream( + Some(backfill_stage.get_latest_unfinished_splits()?), + is_initialize, + ) + .boxed(); tracing::debug!(?backfill_stage, "source backfill started"); fn select_strategy(_: &mut ()) -> PollNext { @@ -402,7 +374,7 @@ impl SourceBackfillExecutorInner { // - When the upstream Source's becomes less busy, SourceBackfill can begin to catch up. let mut backfill_stream = select_with_strategy( input.by_ref().map(Either::Left), - source_chunk_reader.map(Either::Right), + retry_reader_stream.map(Either::Right), select_strategy, ); @@ -487,12 +459,13 @@ impl SourceBackfillExecutorInner { self.actor_ctx.fragment_id.to_string(), ]); - let (reader, _backfill_info) = self - .build_stream_source_reader( - &source_desc, - backfill_stage.get_latest_unfinished_splits()?, + let reader = self + .stream_reader_builder(source_desc.clone()) + .into_retry_stream( + Some(backfill_stage.get_latest_unfinished_splits()?), + true, ) - .await?; + .boxed(); backfill_stream = select_with_strategy( input.by_ref().map(Either::Left), @@ -582,13 +555,16 @@ impl SourceBackfillExecutorInner { ); self.rate_limit_rps = *new_rate_limit; // rebuild reader - let (reader, _backfill_info) = self - .build_stream_source_reader( - &source_desc, - backfill_stage - .get_latest_unfinished_splits()?, + let reader = self + .stream_reader_builder(source_desc.clone()) + .into_retry_stream( + Some( + backfill_stage + .get_latest_unfinished_splits()?, + ), + true, ) - .await?; + .boxed(); backfill_stream = select_with_strategy( input.by_ref().map(Either::Left), @@ -612,13 +588,10 @@ impl SourceBackfillExecutorInner { latest_unfinished_splits ); - // Replace the source reader with a new one of the new state. - let (reader, _backfill_info) = self - .build_stream_source_reader( - &source_desc, - latest_unfinished_splits, - ) - .await?; + let reader = self + .stream_reader_builder(source_desc.clone()) + .into_retry_stream(Some(latest_unfinished_splits), true) + .boxed(); backfill_stream = select_with_strategy( input.by_ref().map(Either::Left), diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index a7fe6aa00f036..df6bc279cfae1 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -17,7 +17,6 @@ use std::time::Duration; use anyhow::anyhow; use either::Either; -use futures::TryStreamExt; use itertools::Itertools; use risingwave_common::array::ArrayRef; use risingwave_common::catalog::{ColumnId, TableId}; @@ -29,8 +28,8 @@ use risingwave_connector::parser::schema_change::SchemaChangeEnvelope; use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder}; use risingwave_connector::source::reader::reader::SourceReader; use risingwave_connector::source::{ - BoxSourceChunkStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, - SplitMetaData, WaitCheckpointTask, + BoxSourceChunkStream, ConnectorState, CreateSplitReaderResult, SourceContext, SourceCtrlOpts, + SplitId, SplitImpl, SplitMetaData, WaitCheckpointTask, }; use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_storage::store::TryWaitEpochOptions; @@ -74,7 +73,7 @@ pub struct SourceExecutor { is_shared_non_cdc: bool, } -struct StreamReaderBuilder { +pub(crate) struct StreamReaderBuilder { pub source_desc: SourceDesc, pub rate_limit: Option, pub source_id: TableId, @@ -150,11 +149,11 @@ impl StreamReaderBuilder { (column_ids, source_ctx) } - async fn fetch_latest_splits( + pub(crate) async fn fetch_latest_splits( &self, state: ConnectorState, seek_to_latest: bool, - ) -> StreamExecutorResult { + ) -> StreamExecutorResult { let (column_ids, source_ctx) = self.prepare_source_stream_build(); let source_ctx_ref = Arc::new(source_ctx); let (_, res) = self @@ -168,11 +167,11 @@ impl StreamReaderBuilder { ) .await .map_err(StreamExecutorError::connector_error)?; - Ok(res.latest_splits) + Ok(res) } #[try_stream(ok = StreamChunk, error = StreamExecutorError)] - async fn into_retry_stream(self, state: ConnectorState, is_initial_build: bool) { + pub(crate) async fn into_retry_stream(self, state: ConnectorState, is_initial_build: bool) { let (column_ids, source_ctx) = self.prepare_source_stream_build(); let source_ctx_ref = Arc::new(source_ctx); @@ -541,9 +540,8 @@ impl SourceExecutor { let replace_stream_reader_builder = self.stream_reader_builder(source_desc.clone()); let reader_stream = replace_stream_reader_builder.into_retry_stream(Some(target_state.clone()), false); - let reader = reader_stream.map_err(StreamExecutorError::connector_error); - stream.replace_data_stream(reader); + stream.replace_data_stream(reader_stream); Ok(()) } @@ -665,7 +663,8 @@ impl SourceExecutor { // need to seek to latest for initial build && shared source latest_splits = init_reader_builder .fetch_latest_splits(recover_state.clone(), self.is_shared_non_cdc) - .await?; + .await? + .latest_splits; } let reader_stream = init_reader_builder.into_retry_stream( recover_state.clone(), From 2b103d68765e5986dbb6da836a30332a599961f3 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Fri, 31 Jan 2025 13:25:31 +0800 Subject: [PATCH 8/8] add log --- src/stream/src/executor/source/source_executor.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index df6bc279cfae1..902baf43d09a3 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -191,6 +191,8 @@ impl StreamReaderBuilder { if is_initial_build { return Err(StreamExecutorError::connector_error(e)); } else { + tracing::warn!(error = ?e.as_report(), "build stream source reader error, retry in 5s"); + tokio::time::sleep(Duration::from_secs(5)).await; continue 'build_consume_loop; } }