diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 98ddf1c2598a4..367f06123cf1f 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -44,8 +44,10 @@ if [ "$profile" == "ci-dev" ]; then echo "--- Run debug mode only tests" risedev slt './e2e_test/debug_mode_only/debug_splits.slt' fi + risedev slt './e2e_test/source_inline/**/*.slt' -j4 risedev slt './e2e_test/source_inline/**/*.slt.serial' + echo "--- Kill cluster" risedev ci-kill diff --git a/e2e_test/source_inline/kafka/avro/alter_source.slt b/e2e_test/source_inline/kafka/avro/alter_source.slt index bb36e116279e4..d16010995fecd 100644 --- a/e2e_test/source_inline/kafka/avro/alter_source.slt +++ b/e2e_test/source_inline/kafka/avro/alter_source.slt @@ -34,6 +34,8 @@ sr_register avro_alter_source_test-value AVRO <<< '{"type":"record","name":"Root system ok echo '{"foo":"ABC", "bar":1}' | rpk topic produce --schema-id=topic avro_alter_source_test +sleep 1s + query ? select * from s ---- diff --git a/e2e_test/source_inline/kafka/issue_19563.slt.serial b/e2e_test/source_inline/kafka/issue_19563.slt.serial index 0f04a5e81c10a..7e9168dd76330 100644 --- a/e2e_test/source_inline/kafka/issue_19563.slt.serial +++ b/e2e_test/source_inline/kafka/issue_19563.slt.serial @@ -18,7 +18,7 @@ WITH ( ); # Note that StreamSourceScan is in the StreamDynamicFilter fragment, which has 3 upstream fragments. -query T +query T retry 3 backoff 5s explain create materialized view mv1 as select v1 from kafkasource where v1 between now() and now() + interval '1 day' * 365 * 2000; ---- StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck } @@ -41,7 +41,7 @@ StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_co statement ok create materialized view mv1 as select v1 from kafkasource where v1 between now() and now() + interval '1 day' * 365 * 2000; -query I +query I retry 3 backoff 5s select array_length(upstream_fragment_ids) from rw_fragments where array_contains(flags, Array['SOURCE_SCAN']); ---- 3 @@ -58,7 +58,7 @@ EOF sleep 3s # Below lower bound and above upper bound are not shown -query I +query I retry 3 backoff 5s select * from mv1 order by v1; ---- 3031-01-01 19:00:00+00:00 diff --git a/e2e_test/source_inline/tvf/mysql_query.slt b/e2e_test/source_inline/tvf/mysql_query.slt index 7b2e9fff77bfd..4886fa10b533c 100644 --- a/e2e_test/source_inline/tvf/mysql_query.slt +++ b/e2e_test/source_inline/tvf/mysql_query.slt @@ -70,7 +70,7 @@ create source mysql_cdc_source with ( database.name = 'tvf', ); -query +query I retry 3 backoff 5s select * from mysql_query('$MYSQL_HOST', '$MYSQL_TCP_PORT', '$RISEDEV_MYSQL_USER', '$MYSQL_PWD', 'tvf', 'select * from test;'); ---- 1 t 1 2 3 4 5 6 7 1.08 1.09 1.10 1.11 char varchar \x000a \x16 \x17 \x18 \x19 2021-01-01 12:34:56 2021-01-01 12:34:56+00:00 {"key1": 1, "key2": "abc"} NULL @@ -163,7 +163,7 @@ create table test ( sleep 5s -query I +query I retry 3 backoff 5s select * from test order by id; ---- 1 t t 2 3 4 5 6 7 1.08 1.09 1.10 1.11 char varchar \x0a00 \x16 \x17 \x18 \x19 2021-01-01 12:34:56 2021-01-01 12:34:56+00:00 {"key1": 1, "key2": "abc"} NULL diff --git a/src/stream/src/executor/source/executor_core.rs b/src/stream/src/executor/source/executor_core.rs index 82f5838ce0dc0..f56581be0d40c 100644 --- a/src/stream/src/executor/source/executor_core.rs +++ b/src/stream/src/executor/source/executor_core.rs @@ -13,11 +13,13 @@ // limitations under the License. use std::collections::HashMap; +use std::sync::Arc; use risingwave_common::catalog::{ColumnId, TableId}; use risingwave_connector::source::reader::desc::SourceDescBuilder; use risingwave_connector::source::{SplitId, SplitImpl, SplitMetaData}; use risingwave_storage::StateStore; +use tokio::sync::RwLock; use super::SourceStateTableHandler; @@ -35,7 +37,7 @@ pub struct StreamSourceCore { /// Split info for stream source. A source executor might read data from several splits of /// external connector. - pub(crate) latest_split_info: HashMap, + pub(crate) latest_split_info: Arc>>, /// Stores information of the splits. pub(crate) split_state_store: SourceStateTableHandler, @@ -64,16 +66,16 @@ where source_name, column_ids, source_desc_builder: Some(source_desc_builder), - latest_split_info: HashMap::new(), + latest_split_info: Arc::new(RwLock::new(HashMap::new())), split_state_store, updated_splits_in_epoch: HashMap::new(), } } - pub fn init_split_state(&mut self, splits: Vec) { - self.latest_split_info = splits - .into_iter() - .map(|split| (split.id(), split)) - .collect(); + pub async fn init_split_state(&mut self, splits: Vec) { + let mut latest_split_info_guard = self.latest_split_info.write().await; + for split in splits { + latest_split_info_guard.insert(split.id(), split); + } } } diff --git a/src/stream/src/executor/source/legacy_fs_source_executor.rs b/src/stream/src/executor/source/legacy_fs_source_executor.rs index 8774d5c468b7f..bc77e007805df 100644 --- a/src/stream/src/executor/source/legacy_fs_source_executor.rs +++ b/src/stream/src/executor/source/legacy_fs_source_executor.rs @@ -127,6 +127,8 @@ impl LegacyFsSourceExecutor { let target_state: Vec = self .stream_source_core .latest_split_info + .read() + .await .values() .cloned() .collect(); @@ -224,7 +226,7 @@ impl LegacyFsSourceExecutor { .map_err(StreamExecutorError::connector_error); stream.replace_data_stream(reader); - self.stream_source_core.latest_split_info = target_state + *self.stream_source_core.latest_split_info.write().await = target_state .into_iter() .map(|split| (split.id(), split)) .collect(); @@ -352,7 +354,9 @@ impl LegacyFsSourceExecutor { } // init in-memory split states with persisted state if any - self.stream_source_core.init_split_state(boot_state.clone()); + self.stream_source_core + .init_split_state(boot_state.clone()) + .await; let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state); tracing::debug!(state = ?recover_state, "start with state"); @@ -470,17 +474,18 @@ impl LegacyFsSourceExecutor { } // update split offset if let Some(mapping) = split_offset_mapping { + let mut latest_split_info_guard = + self.stream_source_core.latest_split_info.write().await; let state: Vec<(SplitId, SplitImpl)> = mapping .iter() .flat_map(|(id, offset)| { - self.stream_source_core.latest_split_info.get_mut(id).map( - |origin_split| { - origin_split.update_in_place(offset.clone())?; - Ok::<_, ConnectorError>((id.clone(), origin_split.clone())) - }, - ) + latest_split_info_guard.get_mut(id).map(|origin_split| { + origin_split.update_in_place(offset.clone())?; + Ok::<_, ConnectorError>((id.clone(), origin_split.clone())) + }) }) .try_collect()?; + drop(latest_split_info_guard); self.stream_source_core .updated_splits_in_epoch diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs index d8117f86fb5dc..e40d9e90153dd 100644 --- a/src/stream/src/executor/source/source_backfill_executor.rs +++ b/src/stream/src/executor/source/source_backfill_executor.rs @@ -20,31 +20,30 @@ use std::time::Instant; use anyhow::anyhow; use either::Either; use futures::stream::{select_with_strategy, PollNext}; -use itertools::Itertools; use risingwave_common::bitmap::BitmapBuilder; use risingwave_common::metrics::{LabelGuardedIntCounter, GLOBAL_ERROR_METRICS}; use risingwave_common::system_param::local_manager::SystemParamsReaderRef; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::types::JsonbVal; use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder}; -use risingwave_connector::source::{ - BackfillInfo, BoxSourceChunkStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, - SplitMetaData, -}; +use risingwave_connector::source::{BackfillInfo, SplitId, SplitImpl, SplitMetaData}; use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_storage::store::TryWaitEpochOptions; use serde::{Deserialize, Serialize}; use thiserror_ext::AsReport; +use tokio::sync::RwLock; use super::executor_core::StreamSourceCore; +use super::get_split_offset_col_idx; use super::source_backfill_state_table::BackfillStateTableHandler; -use super::{apply_rate_limit, get_split_offset_col_idx}; -use crate::common::rate_limit::limited_chunk_size; use crate::executor::prelude::*; -use crate::executor::source::source_executor::WAIT_BARRIER_MULTIPLE_TIMES; +use crate::executor::source::source_executor::{StreamReaderBuilder, WAIT_BARRIER_MULTIPLE_TIMES}; +use crate::executor::source::GetLatestSplitInfoFn; use crate::executor::UpdateMutation; use crate::task::CreateMviewProgressReporter; +type SharedBackfillStage = Arc>; + #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] pub enum BackfillState { /// `None` means not started yet. It's the initial state. @@ -279,45 +278,21 @@ impl SourceBackfillExecutorInner { } } - async fn build_stream_source_reader( + fn stream_reader_builder( &self, - source_desc: &SourceDesc, - splits: Vec, - ) -> StreamExecutorResult<(BoxSourceChunkStream, HashMap)> { - let column_ids = source_desc - .columns - .iter() - .map(|column_desc| column_desc.column_id) - .collect_vec(); - let source_ctx = SourceContext::new( - self.actor_ctx.id, - self.stream_source_core.source_id, - self.actor_ctx.fragment_id, - self.stream_source_core.source_name.clone(), - source_desc.metrics.clone(), - SourceCtrlOpts { - chunk_size: limited_chunk_size(self.rate_limit_rps), - split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn - }, - source_desc.source.config.clone(), - None, - ); - - // We will check watermark to decide whether we need to backfill. - // e.g., when there's a Kafka topic-partition without any data, - // we don't need to backfill at all. But if we do not check here, - // the executor can only know it's finished when data coming in. - // For blocking DDL, this would be annoying. - - let (stream, res) = source_desc - .source - .build_stream(Some(splits), column_ids, Arc::new(source_ctx), false) - .await - .map_err(StreamExecutorError::connector_error)?; - Ok(( - apply_rate_limit(stream, self.rate_limit_rps).boxed(), - res.backfill_info, - )) + source_desc: SourceDesc, + get_latest_split_info_fn: GetLatestSplitInfoFn, + ) -> StreamReaderBuilder { + StreamReaderBuilder { + source_desc, + rate_limit: self.rate_limit_rps, + source_id: self.stream_source_core.source_id, + source_name: self.stream_source_core.source_name.clone(), + actor_ctx: self.actor_ctx.clone(), + is_auto_schema_change_enable: false, + get_latest_split_info_fn, + reader_stream: None, + } } #[try_stream(ok = Message, error = StreamExecutorError)] @@ -332,6 +307,7 @@ impl SourceBackfillExecutorInner { .unwrap_or(&[]) .to_vec(); let is_pause_on_startup = barrier.is_pause_on_startup(); + let is_initialize = barrier.is_newly_added(self.actor_ctx.id); yield Message::Barrier(barrier); let mut core = self.stream_source_core; @@ -361,34 +337,59 @@ impl SourceBackfillExecutorInner { }); backfill_states.insert(split_id, backfill_state); } - let mut backfill_stage = BackfillStage { + let backfill_stage = Arc::new(RwLock::new(BackfillStage { states: backfill_states, splits: owned_splits, - }; - backfill_stage.debug_assert_consistent(); + })); + backfill_stage.read().await.debug_assert_consistent(); // Return the ownership of `stream_source_core` to the source executor. self.stream_source_core = core; - let (source_chunk_reader, backfill_info) = self - .build_stream_source_reader( - &source_desc, - backfill_stage.get_latest_unfinished_splits()?, - ) - .instrument_await("source_build_reader") - .await?; - for (split_id, info) in &backfill_info { - let state = backfill_stage.states.get_mut(split_id).unwrap(); - match info { - BackfillInfo::NoDataToBackfill => { - state.state = BackfillState::Finished; - } - BackfillInfo::HasDataToBackfill { latest_offset } => { - // Note: later we will override it with the offset from the source message, and it's possible to become smaller than this value. - state.target_offset = Some(latest_offset.clone()); + let latest_split_info_weak = Arc::downgrade(&backfill_stage); + let get_latest_split_info_fn: GetLatestSplitInfoFn = Arc::new(move || { + let weak = latest_split_info_weak.clone(); + Box::pin(async move { + let Some(backfill_stage_ref) = weak.upgrade() else { + return Err(StreamExecutorError::connector_error(anyhow!( + "backfill_stage is already dropped, retry later" + ))); + }; + let backfill_stage_guard = backfill_stage_ref.read().await; + backfill_stage_guard.get_latest_unfinished_splits() + }) + }); + let mut stream_reader_builder = + self.stream_reader_builder(source_desc.clone(), get_latest_split_info_fn.clone()); + + if is_initialize { + let mut backfill_stage_guard = backfill_stage.write().await; + let backfill_info = stream_reader_builder + .fetch_latest_splits( + Some(backfill_stage_guard.get_latest_unfinished_splits()?), + false, + ) + .await? + .backfill_info; + for (split_id, info) in &backfill_info { + let state = backfill_stage_guard.states.get_mut(split_id).unwrap(); + match info { + BackfillInfo::NoDataToBackfill => { + state.state = BackfillState::Finished; + } + BackfillInfo::HasDataToBackfill { latest_offset } => { + // Note: later we will override it with the offset from the source message, and it's possible to become smaller than this value. + state.target_offset = Some(latest_offset.clone()); + } } } } + let retry_reader_stream = stream_reader_builder + .into_retry_stream( + Some(backfill_stage.read().await.get_latest_unfinished_splits()?), + false, // do not use the latest, because we need to backfill from the beginning + ) + .boxed(); tracing::debug!(?backfill_stage, "source backfill started"); fn select_strategy(_: &mut ()) -> PollNext { @@ -402,7 +403,7 @@ impl SourceBackfillExecutorInner { // - When the upstream Source's becomes less busy, SourceBackfill can begin to catch up. let mut backfill_stream = select_with_strategy( input.by_ref().map(Either::Left), - source_chunk_reader.map(Either::Right), + retry_reader_stream.map(Either::Right), select_strategy, ); @@ -487,12 +488,21 @@ impl SourceBackfillExecutorInner { self.actor_ctx.fragment_id.to_string(), ]); - let (reader, _backfill_info) = self - .build_stream_source_reader( - &source_desc, - backfill_stage.get_latest_unfinished_splits()?, + let reader = self + .stream_reader_builder( + source_desc.clone(), + get_latest_split_info_fn.clone(), ) - .await?; + .into_retry_stream( + Some( + backfill_stage + .read() + .await + .get_latest_unfinished_splits()?, + ), + false, + ) + .boxed(); backfill_stream = select_with_strategy( input.by_ref().map(Either::Left), @@ -554,7 +564,7 @@ impl SourceBackfillExecutorInner { split_changed = self .apply_split_change( actor_splits, - &mut backfill_stage, + &backfill_stage, true, ) .await?; @@ -565,7 +575,7 @@ impl SourceBackfillExecutorInner { split_changed = self .apply_split_change( actor_splits, - &mut backfill_stage, + &backfill_stage, false, ) .await?; @@ -582,13 +592,21 @@ impl SourceBackfillExecutorInner { ); self.rate_limit_rps = *new_rate_limit; // rebuild reader - let (reader, _backfill_info) = self - .build_stream_source_reader( - &source_desc, - backfill_stage - .get_latest_unfinished_splits()?, + let reader = self + .stream_reader_builder( + source_desc.clone(), + get_latest_split_info_fn.clone(), + ) + .into_retry_stream( + Some( + backfill_stage + .read() + .await + .get_latest_unfinished_splits()?, + ), + false, ) - .await?; + .boxed(); backfill_stream = select_with_strategy( input.by_ref().map(Either::Left), @@ -604,21 +622,23 @@ impl SourceBackfillExecutorInner { // rebuild backfill_stream // Note: we don't put this part in a method, due to some complex lifetime issues. - let latest_unfinished_splits = - backfill_stage.get_latest_unfinished_splits()?; + let latest_unfinished_splits = backfill_stage + .read() + .await + .get_latest_unfinished_splits()?; tracing::info!( "actor {:?} apply source split change to {:?}", self.actor_ctx.id, latest_unfinished_splits ); - // Replace the source reader with a new one of the new state. - let (reader, _backfill_info) = self - .build_stream_source_reader( - &source_desc, - latest_unfinished_splits, + let reader = self + .stream_reader_builder( + source_desc.clone(), + get_latest_split_info_fn.clone(), ) - .await?; + .into_retry_stream(Some(latest_unfinished_splits), false) + .boxed(); backfill_stream = select_with_strategy( input.by_ref().map(Either::Left), @@ -628,14 +648,15 @@ impl SourceBackfillExecutorInner { } self.backfill_state_store - .set_states(backfill_stage.states.clone()) + .set_states(backfill_stage.read().await.states.clone()) .await?; self.backfill_state_store .state_store .commit(barrier.epoch) .await?; - if self.should_report_finished(&backfill_stage.states) { + if self.should_report_finished(&backfill_stage.read().await.states) + { // drop the backfill kafka consumers backfill_stream = select_with_strategy( input.by_ref().map(Either::Left), @@ -645,22 +666,23 @@ impl SourceBackfillExecutorInner { self.progress.finish( barrier.epoch, - backfill_stage.total_backfilled_rows(), + backfill_stage.read().await.total_backfilled_rows(), ); // yield barrier after reporting progress yield Message::Barrier(barrier); // After we reported finished, we still don't exit the loop. // Because we need to handle split migration. - if STATE_TABLE_INITIALIZED.is_completed() - && self.backfill_finished(&backfill_stage.states).await? - { + if STATE_TABLE_INITIALIZED.is_completed() && { + let state = &backfill_stage.read().await.states; + self.backfill_finished(state).await? + } { break 'backfill_loop; } } else { self.progress.update_for_source_backfill( barrier.epoch, - backfill_stage.total_backfilled_rows(), + backfill_stage.read().await.total_backfilled_rows(), ); // yield barrier after reporting progress yield Message::Barrier(barrier); @@ -674,7 +696,10 @@ impl SourceBackfillExecutorInner { for (i, (_, row)) in chunk.rows().enumerate() { let split = row.datum_at(split_idx).unwrap().into_utf8(); let offset = row.datum_at(offset_idx).unwrap().into_utf8(); - let vis = backfill_stage.handle_upstream_row(split, offset); + let vis = backfill_stage + .write() + .await + .handle_upstream_row(split, offset); new_vis.set(i, vis); } // emit chunk if vis is not empty. i.e., some splits finished backfilling. @@ -725,7 +750,10 @@ impl SourceBackfillExecutorInner { for (i, (_, row)) in chunk.rows().enumerate() { let split_id = row.datum_at(split_idx).unwrap().into_utf8(); let offset = row.datum_at(offset_idx).unwrap().into_utf8(); - let vis = backfill_stage.handle_backfill_row(split_id, offset); + let vis = backfill_stage + .write() + .await + .handle_backfill_row(split_id, offset); new_vis.set(i, vis); } @@ -742,7 +770,11 @@ impl SourceBackfillExecutorInner { } std::mem::drop(backfill_stream); - let mut states = backfill_stage.states; + let mut states = backfill_stage.read().await.states.clone(); + + // backfill_stage is done, we just need persist the states but not use the lock. + drop(backfill_stage); + // Make sure `Finished` state is persisted. self.backfill_state_store.set_states(states.clone()).await?; @@ -840,7 +872,7 @@ impl SourceBackfillExecutorInner { async fn apply_split_change( &mut self, split_assignment: &HashMap>, - stage: &mut BackfillStage, + stage: &SharedBackfillStage, should_trim_state: bool, ) -> StreamExecutorResult { self.source_split_change_count.inc(); @@ -861,7 +893,7 @@ impl SourceBackfillExecutorInner { async fn update_state_if_changed( &mut self, target_splits: Vec, - stage: &mut BackfillStage, + stage: &SharedBackfillStage, should_trim_state: bool, ) -> StreamExecutorResult { let mut target_state: BackfillStates = HashMap::with_capacity(target_splits.len()); @@ -869,7 +901,7 @@ impl SourceBackfillExecutorInner { let mut split_changed = false; // Take out old states (immutable, only used to build target_state and check for added/dropped splits). // Will be set to target_state in the end. - let old_states = std::mem::take(&mut stage.states); + let old_states = std::mem::take(&mut stage.write().await.states); // Iterate over the target (assigned) splits // - check if any new splits are added // - build target_state @@ -913,8 +945,9 @@ impl SourceBackfillExecutorInner { } } + let mut stage_write_guard = stage.write().await; if split_changed { - let dropped_splits = stage + let dropped_splits = stage_write_guard .states .extract_if(|split_id, _| !target_state.contains_key(split_id)) .map(|(split_id, _)| split_id); @@ -927,9 +960,9 @@ impl SourceBackfillExecutorInner { } else { debug_assert_eq!(old_states, target_state); } - stage.states = target_state; - stage.splits = target_splits; - stage.debug_assert_consistent(); + stage_write_guard.states = target_state; + stage_write_guard.splits = target_splits; + stage_write_guard.debug_assert_consistent(); Ok(split_changed) } diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index ae4d59bb5c8cf..9a7e71650a774 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -19,8 +19,6 @@ use std::time::Duration; use anyhow::anyhow; use either::Either; -use futures::stream::BoxStream; -use futures::TryStreamExt; use itertools::Itertools; use risingwave_common::array::ArrayRef; use risingwave_common::catalog::{ColumnId, TableId}; @@ -32,8 +30,8 @@ use risingwave_connector::parser::schema_change::SchemaChangeEnvelope; use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder}; use risingwave_connector::source::reader::reader::SourceReader; use risingwave_connector::source::{ - BoxSourceChunkStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, - SplitMetaData, WaitCheckpointTask, + BoxSourceChunkStream, ConnectorState, CreateSplitReaderResult, SourceContext, SourceCtrlOpts, + SplitId, SplitImpl, SplitMetaData, WaitCheckpointTask, }; use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_storage::store::TryWaitEpochOptions; @@ -41,7 +39,6 @@ use thiserror_ext::AsReport; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::{mpsc, oneshot}; use tokio::time::Instant; -use tracing::Instrument; use super::executor_core::StreamSourceCore; use super::{ @@ -50,10 +47,15 @@ use super::{ }; use crate::common::rate_limit::limited_chunk_size; use crate::executor::prelude::*; -use crate::executor::source::get_infinite_backoff_strategy; use crate::executor::stream_reader::StreamReaderWithPause; use crate::executor::UpdateMutation; +pub(crate) type GetLatestSplitInfoFn = Arc< + dyn Fn() -> Pin, StreamExecutorError>> + Send>> + + Send + + Sync, +>; + /// A constant to multiply when calculating the maximum time to wait for a barrier. This is due to /// some latencies in network and cost in meta. pub const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5; @@ -79,6 +81,162 @@ pub struct SourceExecutor { is_shared_non_cdc: bool, } +pub(crate) struct StreamReaderBuilder { + pub source_desc: SourceDesc, + pub rate_limit: Option, + pub source_id: TableId, + pub source_name: String, + pub reader_stream: Option, + + // cdc related + pub is_auto_schema_change_enable: bool, + pub actor_ctx: ActorContextRef, + + pub get_latest_split_info_fn: GetLatestSplitInfoFn, +} + +impl StreamReaderBuilder { + fn prepare_source_stream_build(&self) -> (Vec, SourceContext) { + let column_ids = self + .source_desc + .columns + .iter() + .map(|column_desc| column_desc.column_id) + .collect_vec(); + + let (schema_change_tx, mut schema_change_rx) = + mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16); + let schema_change_tx = if self.is_auto_schema_change_enable { + let meta_client = self.actor_ctx.meta_client.clone(); + // spawn a task to handle schema change event from source parser + let _join_handle = tokio::task::spawn(async move { + while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await { + let table_ids = schema_change.table_ids(); + tracing::info!( + target: "auto_schema_change", + "recv a schema change event for tables: {:?}", table_ids); + // TODO: retry on rpc error + if let Some(ref meta_client) = meta_client { + match meta_client + .auto_schema_change(schema_change.to_protobuf()) + .await + { + Ok(_) => { + tracing::info!( + target: "auto_schema_change", + "schema change success for tables: {:?}", table_ids); + finish_tx.send(()).unwrap(); + } + Err(e) => { + tracing::error!( + target: "auto_schema_change", + error = ?e.as_report(), "schema change error"); + finish_tx.send(()).unwrap(); + } + } + } + } + }); + Some(schema_change_tx) + } else { + info!("auto schema change is disabled in config"); + None + }; + + let source_ctx = SourceContext::new( + self.actor_ctx.id, + self.source_id, + self.actor_ctx.fragment_id, + self.source_name.clone(), + self.source_desc.metrics.clone(), + SourceCtrlOpts { + chunk_size: limited_chunk_size(self.rate_limit), + split_txn: self.rate_limit.is_some(), // when rate limiting, we may split txn + }, + self.source_desc.source.config.clone(), + schema_change_tx, + ); + + (column_ids, source_ctx) + } + + pub(crate) async fn fetch_latest_splits( + &mut self, + state: ConnectorState, + seek_to_latest: bool, + ) -> StreamExecutorResult { + let (column_ids, source_ctx) = self.prepare_source_stream_build(); + let source_ctx_ref = Arc::new(source_ctx); + let (stream, res) = self + .source_desc + .source + .build_stream( + state.clone(), + column_ids.clone(), + source_ctx_ref.clone(), + seek_to_latest, + ) + .await + .map_err(StreamExecutorError::connector_error)?; + self.reader_stream = Some(stream); + Ok(res) + } + + #[try_stream(ok = StreamChunk, error = StreamExecutorError)] + pub(crate) async fn into_retry_stream( + mut self, + mut state: ConnectorState, + is_initial_build: bool, + ) { + let (column_ids, source_ctx) = self.prepare_source_stream_build(); + let source_ctx_ref = Arc::new(source_ctx); + + 'build_consume_loop: loop { + tracing::debug!("build stream source reader with state: {:?}", state); + let build_stream_result = if let Some(exist_stream) = self.reader_stream.take() { + Ok((exist_stream, CreateSplitReaderResult::default())) + } else { + self.source_desc + .source + .build_stream( + state.clone(), + column_ids.clone(), + source_ctx_ref.clone(), + // just `seek_to_latest` for initial build + is_initial_build, + ) + .await + }; + if let Err(e) = build_stream_result { + if is_initial_build { + return Err(StreamExecutorError::connector_error(e)); + } else { + tracing::warn!(error = ?e.as_report(), "build stream source reader error, retry in 5s"); + tokio::time::sleep(Duration::from_secs(5)).await; + continue 'build_consume_loop; + } + } + + let (stream, _) = build_stream_result.unwrap(); + let stream = apply_rate_limit(stream, self.rate_limit).boxed(); + #[for_await] + 'consume: for msg in stream { + match msg { + Ok(msg) => yield msg, + Err(e) => { + tracing::warn!(error = ?e.as_report(), "stream source reader error"); + break 'consume; + } + } + } + tracing::info!("stream source reader error, retry in 5s"); + tokio::time::sleep(Duration::from_secs(5)).await; + + state = Some((self.get_latest_split_info_fn)().await?); + } + } +} + impl SourceExecutor { pub fn new( actor_ctx: ActorContextRef, @@ -100,6 +258,40 @@ impl SourceExecutor { } } + fn stream_reader_builder(&self, source_desc: SourceDesc) -> StreamReaderBuilder { + let latest_split_info_weak = + Arc::downgrade(&self.stream_source_core.as_ref().unwrap().latest_split_info); + + let get_latest_split_info_fn: GetLatestSplitInfoFn = Arc::new(move || { + let weak = latest_split_info_weak.clone(); + Box::pin(async move { + let Some(latest_split_info_ref) = weak.upgrade() else { + return Err(StreamExecutorError::connector_error(anyhow!( + "latest_split_info_weak is already dropped, retry later" + ))); + }; + let read_guard = latest_split_info_ref.read().await; + Ok(read_guard.values().cloned().collect()) + }) + }); + + StreamReaderBuilder { + source_desc, + rate_limit: self.rate_limit_rps, + source_id: self.stream_source_core.as_ref().unwrap().source_id, + source_name: self + .stream_source_core + .as_ref() + .unwrap() + .source_name + .clone(), + is_auto_schema_change_enable: self.is_auto_schema_change_enable(), + actor_ctx: self.actor_ctx.clone(), + get_latest_split_info_fn, + reader_stream: None, + } + } + async fn spawn_wait_checkpoint_worker( core: &StreamSourceCore, source_reader: SourceReader, @@ -287,7 +479,7 @@ impl SourceExecutor { // Checks added splits for (split_id, split) in target_splits { - if let Some(s) = core.latest_split_info.get(&split_id) { + if let Some(s) = core.latest_split_info.read().await.get(&split_id) { // For existing splits, we should use the latest offset from the cache. // `target_splits` is from meta and contains the initial offset. target_state.insert(split_id, s.clone()); @@ -314,7 +506,7 @@ impl SourceExecutor { } // Checks dropped splits - for existing_split_id in core.latest_split_info.keys() { + for existing_split_id in core.latest_split_info.read().await.keys() { if !target_state.contains_key(existing_split_id) { tracing::info!("split dropping detected: {}", existing_split_id); split_changed = true; @@ -333,6 +525,8 @@ impl SourceExecutor { let dropped_splits = core .latest_split_info + .write() + .await .extract_if(|split_id, _| !target_state.contains_key(split_id)) .map(|(_, split)| split) .collect_vec(); @@ -342,7 +536,7 @@ impl SourceExecutor { core.split_state_store.trim_state(&dropped_splits).await?; } - core.latest_split_info = target_state; + *core.latest_split_info.write().await = target_state; } Ok(split_changed) @@ -378,7 +572,13 @@ impl SourceExecutor { stream: &mut StreamReaderWithPause, ) -> StreamExecutorResult<()> { let core = self.stream_source_core.as_mut().unwrap(); - let target_state: Vec = core.latest_split_info.values().cloned().collect(); + let target_state: Vec = core + .latest_split_info + .read() + .await + .values() + .cloned() + .collect(); tracing::info!( "actor {:?} apply source split change to {:?}", @@ -387,12 +587,11 @@ impl SourceExecutor { ); // Replace the source reader with a new one of the new state. - let (reader, _) = self - .build_stream_source_reader(source_desc, Some(target_state.clone()), false) - .await?; - let reader = reader.map_err(StreamExecutorError::connector_error); + let replace_stream_reader_builder = self.stream_reader_builder(source_desc.clone()); + let reader_stream = + replace_stream_reader_builder.into_retry_stream(Some(target_state.clone()), false); - stream.replace_data_stream(reader); + stream.replace_data_stream(reader_stream); Ok(()) } @@ -459,7 +658,7 @@ impl SourceExecutor { Vec::default() }; let is_pause_on_startup = first_barrier.is_pause_on_startup(); - + let mut is_uninitialized = first_barrier.is_newly_added(self.actor_ctx.id); yield Message::Barrier(first_barrier); let mut core = self.stream_source_core.unwrap(); @@ -479,9 +678,6 @@ impl SourceExecutor { }; core.split_state_store.init_epoch(first_epoch).await?; - // initial_dispatch_num is 0 means the source executor doesn't have downstream jobs - // and is newly created - let mut is_uninitialized = self.actor_ctx.initial_dispatch_num == 0; for ele in &mut boot_state { if let Some(recover_state) = core .split_state_store @@ -500,7 +696,7 @@ impl SourceExecutor { } // init in-memory split states with persisted state if any - core.init_split_state(boot_state.clone()); + core.init_split_state(boot_state.clone()).await; // Return the ownership of `stream_source_core` to the source executor. self.stream_source_core = Some(core); @@ -508,132 +704,22 @@ impl SourceExecutor { let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state); tracing::debug!(state = ?recover_state, "start with state"); - let mut received_resume_during_build = false; - let mut barrier_stream = barrier_to_message_stream(barrier_receiver).boxed(); + let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed(); // Build the source stream reader. - let (source_chunk_reader, latest_splits) = if is_uninitialized { - tracing::info!("source uninitialized, build source stream reader w/o retry."); - let (source_chunk_reader, latest_splits) = self - .build_stream_source_reader( - &source_desc, - recover_state, - // For shared source, we start from latest and let the downstream SourceBackfillExecutors to read historical data. - // It's highly probable that the work of scanning historical data cannot be shared, - // so don't waste work on it. - // For more details, see https://github.com/risingwavelabs/risingwave/issues/16576#issuecomment-2095413297 - // Note that shared CDC source is special. It already starts from latest. - self.is_shared_non_cdc, - ) - .instrument_await("source_build_reader") - .await?; - ( - source_chunk_reader.map_err(StreamExecutorError::connector_error), - latest_splits, - ) - } else { - tracing::info!("source initialized, build source stream reader with retry."); - // Build the source stream reader with retry during recovery. - // We only build source stream reader with retry during recovery, - // because we can rely on the persisted source states to recover the source stream - // and can avoid the potential race with "seek to latest" - // https://github.com/risingwavelabs/risingwave/issues/19681#issuecomment-2532183002 - let mut reader_and_splits: Option<(BoxSourceChunkStream, Option>)> = - None; - let source_reader = source_desc.source.clone(); - let (column_ids, source_ctx) = self.prepare_source_stream_build(&source_desc); - let source_ctx = Arc::new(source_ctx); - let mut build_source_stream_fut = Box::pin(async move { - let backoff = get_infinite_backoff_strategy(); - tokio_retry::Retry::spawn(backoff, || async { - match source_reader - .build_stream( - recover_state.clone(), - column_ids.clone(), - source_ctx.clone(), - false, // not need to seek to latest since source state is initialized - ) - .await { - Ok((stream, res)) => Ok((stream, res.latest_splits)), - Err(e) => { - tracing::warn!(error = %e.as_report(), "failed to build source stream, retrying..."); - Err(e) - } - } - }) - .instrument(tracing::info_span!("build_source_stream_with_retry")) - .await - .expect("Retry build source stream until success.") - }); - - // loop to create source stream until success - loop { - if let Some(barrier) = build_source_stream_and_poll_barrier( - &mut barrier_stream, - &mut reader_and_splits, - &mut build_source_stream_fut, - ) + let mut init_reader_builder = self.stream_reader_builder(source_desc.clone()); + let mut latest_splits = None; + if is_uninitialized { + // need to seek to latest for initial build && shared source + latest_splits = init_reader_builder + .fetch_latest_splits(recover_state.clone(), self.is_shared_non_cdc) .await? - { - if let Message::Barrier(barrier) = barrier { - if let Some(mutation) = barrier.mutation.as_deref() { - match mutation { - Mutation::Throttle(actor_to_apply) => { - if let Some(new_rate_limit) = - actor_to_apply.get(&self.actor_ctx.id) - && *new_rate_limit != self.rate_limit_rps - { - tracing::info!( - "updating rate limit from {:?} to {:?}", - self.rate_limit_rps, - *new_rate_limit - ); - - // update the rate limit option, we will apply the rate limit - // when we finish building the source stream. - self.rate_limit_rps = *new_rate_limit; - } - } - Mutation::Resume => { - // We record the Resume mutation here and postpone the resume of the source stream - // after we have successfully built the source stream. - received_resume_during_build = true; - } - _ => { - // ignore other mutations and output a warn log - tracing::warn!( - "Received a mutation {:?} to be ignored, because we only handle Throttle and Resume before - finish building source stream.", - mutation - ); - } - } - } - - // bump state store epoch - let _ = self.persist_state_and_clear_cache(barrier.epoch).await?; - yield Message::Barrier(barrier); - } else { - unreachable!( - "Only barrier message is expected when building source stream." - ); - } - } else { - assert!(reader_and_splits.is_some()); - tracing::info!("source stream created successfully"); - break; - } - } - let (source_chunk_reader, latest_splits) = - reader_and_splits.expect("source chunk reader and splits must be created"); - - ( - apply_rate_limit(source_chunk_reader, self.rate_limit_rps) - .boxed() - .map_err(StreamExecutorError::connector_error), - latest_splits, - ) - }; + .latest_splits; + } + let reader_stream = init_reader_builder.into_retry_stream( + recover_state.clone(), + is_uninitialized && self.is_shared_non_cdc, + ); if let Some(latest_splits) = latest_splits { // make sure it is written to state table later. @@ -647,11 +733,11 @@ impl SourceExecutor { // Merge the chunks from source and the barriers into a single stream. We prioritize // barriers over source data chunks here. let mut stream = - StreamReaderWithPause::::new(barrier_stream, source_chunk_reader); + StreamReaderWithPause::::new(barrier_stream, reader_stream); let mut command_paused = false; // - If the first barrier requires us to pause on startup, pause the stream. - if is_pause_on_startup && !received_resume_during_build { + if is_pause_on_startup { tracing::info!("source paused on startup"); stream.pause_stream(); command_paused = true; @@ -802,23 +888,28 @@ impl SourceExecutor { * WAIT_BARRIER_MULTIPLE_TIMES; } if let Some(mapping) = split_offset_mapping { + let mut latest_split_info_guard = self + .stream_source_core + .as_ref() + .unwrap() + .latest_split_info + .write() + .await; let state: HashMap<_, _> = mapping .iter() .flat_map(|(split_id, offset)| { - self.stream_source_core - .as_mut() - .unwrap() - .latest_split_info - .get_mut(split_id) - .map(|original_split_impl| { + latest_split_info_guard.get_mut(split_id).map( + |original_split_impl| { original_split_impl.update_in_place(offset.clone())?; Ok::<_, anyhow::Error>(( split_id.clone(), original_split_impl.clone(), )) - }) + }, + ) }) .try_collect()?; + drop(latest_split_info_guard); self.stream_source_core .as_mut() @@ -866,29 +957,6 @@ impl SourceExecutor { } } -async fn build_source_stream_and_poll_barrier( - barrier_stream: &mut BoxStream<'static, StreamExecutorResult>, - reader_and_splits: &mut Option<(BoxSourceChunkStream, Option>)>, - build_future: &mut Pin< - Box>)>>, - >, -) -> StreamExecutorResult> { - if reader_and_splits.is_some() { - return Ok(None); - } - - tokio::select! { - biased; - build_ret = &mut *build_future => { - *reader_and_splits = Some(build_ret); - Ok(None) - } - msg = barrier_stream.next() => { - msg.transpose() - } - } -} - impl Execute for SourceExecutor { fn execute(self: Box) -> BoxedMessageStream { if self.stream_source_core.is_some() { @@ -1039,6 +1107,7 @@ impl WaitCheckpointWorker { #[cfg(test)] mod tests { use std::collections::HashSet; + use std::sync::Arc; use maplit::{btreemap, convert_args, hashmap}; use risingwave_common::catalog::{ColumnId, Field, TableId}; @@ -1051,6 +1120,7 @@ mod tests { use risingwave_pb::plan_common::PbRowFormatType; use risingwave_storage::memory::MemoryStateStore; use tokio::sync::mpsc::unbounded_channel; + use tokio::sync::RwLock; use tracing_test::traced_test; use super::*; @@ -1092,7 +1162,7 @@ mod tests { source_id: table_id, column_ids, source_desc_builder: Some(source_desc_builder), - latest_split_info: HashMap::new(), + latest_split_info: Arc::new(RwLock::new(HashMap::new())), split_state_store, updated_splits_in_epoch: HashMap::new(), source_name: MOCK_SOURCE_NAME.to_owned(), @@ -1182,7 +1252,7 @@ mod tests { source_id: table_id, column_ids: column_ids.clone(), source_desc_builder: Some(source_desc_builder), - latest_split_info: HashMap::new(), + latest_split_info: Arc::new(RwLock::new(HashMap::new())), split_state_store, updated_splits_in_epoch: HashMap::new(), source_name: MOCK_SOURCE_NAME.to_owned(),