
refactor: make reader rebuilding internal to the reader stream #20295

Open · wants to merge 13 commits into base: main
e2e_test/source_inline/kafka/avro/alter_source.slt (2 additions, 0 deletions)

@@ -34,6 +34,8 @@ sr_register avro_alter_source_test-value AVRO <<< '{"type":"record","name":"Root
 system ok
 echo '{"foo":"ABC", "bar":1}' | rpk topic produce --schema-id=topic avro_alter_source_test
 
+sleep 1s
+
 query ?
 select * from s
 ----
src/stream/src/executor/source/source_backfill_executor.rs (55 additions, 82 deletions)

@@ -20,28 +20,23 @@ use std::time::Instant;
 use anyhow::anyhow;
 use either::Either;
 use futures::stream::{select_with_strategy, PollNext};
-use itertools::Itertools;
 use risingwave_common::bitmap::BitmapBuilder;
 use risingwave_common::metrics::{LabelGuardedIntCounter, GLOBAL_ERROR_METRICS};
 use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
 use risingwave_common::system_param::reader::SystemParamsRead;
 use risingwave_common::types::JsonbVal;
 use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
-use risingwave_connector::source::{
-    BackfillInfo, BoxSourceChunkStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl,
-    SplitMetaData,
-};
+use risingwave_connector::source::{BackfillInfo, SplitId, SplitImpl, SplitMetaData};
 use risingwave_hummock_sdk::HummockReadEpoch;
 use risingwave_storage::store::TryWaitEpochOptions;
 use serde::{Deserialize, Serialize};
 use thiserror_ext::AsReport;
 
 use super::executor_core::StreamSourceCore;
+use super::get_split_offset_col_idx;
 use super::source_backfill_state_table::BackfillStateTableHandler;
-use super::{apply_rate_limit, get_split_offset_col_idx};
-use crate::common::rate_limit::limited_chunk_size;
 use crate::executor::prelude::*;
-use crate::executor::source::source_executor::WAIT_BARRIER_MULTIPLE_TIMES;
+use crate::executor::source::source_executor::{StreamReaderBuilder, WAIT_BARRIER_MULTIPLE_TIMES};
 use crate::executor::UpdateMutation;
 use crate::task::CreateMviewProgressReporter;
@@ -279,45 +274,15 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
         }
     }
 
-    async fn build_stream_source_reader(
-        &self,
-        source_desc: &SourceDesc,
-        splits: Vec<SplitImpl>,
-    ) -> StreamExecutorResult<(BoxSourceChunkStream, HashMap<SplitId, BackfillInfo>)> {
-        let column_ids = source_desc
-            .columns
-            .iter()
-            .map(|column_desc| column_desc.column_id)
-            .collect_vec();
-        let source_ctx = SourceContext::new(
-            self.actor_ctx.id,
-            self.stream_source_core.source_id,
-            self.actor_ctx.fragment_id,
-            self.stream_source_core.source_name.clone(),
-            source_desc.metrics.clone(),
-            SourceCtrlOpts {
-                chunk_size: limited_chunk_size(self.rate_limit_rps),
-                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
-            },
-            source_desc.source.config.clone(),
-            None,
-        );
-
-        // We will check watermark to decide whether we need to backfill.
-        // e.g., when there's a Kafka topic-partition without any data,
-        // we don't need to backfill at all. But if we do not check here,
-        // the executor can only know it's finished when data coming in.
-        // For blocking DDL, this would be annoying.
-
-        let (stream, res) = source_desc
-            .source
-            .build_stream(Some(splits), column_ids, Arc::new(source_ctx), false)
-            .await
-            .map_err(StreamExecutorError::connector_error)?;
-        Ok((
-            apply_rate_limit(stream, self.rate_limit_rps).boxed(),
-            res.backfill_info,
-        ))
+    fn stream_reader_builder(&self, source_desc: SourceDesc) -> StreamReaderBuilder {
+        StreamReaderBuilder {
+            source_desc,
+            rate_limit: self.rate_limit_rps,
+            source_id: self.stream_source_core.source_id,
+            source_name: self.stream_source_core.source_name.clone(),
+            actor_ctx: self.actor_ctx.clone(),
+            is_auto_schema_change_enable: false,
+        }
     }
 
     #[try_stream(ok = Message, error = StreamExecutorError)]
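The deleted `build_stream_source_reader` returned a plain chunk stream, so every failure or reconfiguration forced the executor to rebuild the reader by hand at each call site below. The new `into_retry_stream` moves that rebuild loop inside the stream itself, which is the point of this PR. Here is a self-contained sketch of the technique, not RisingWave's actual implementation; it assumes the `async-stream`, `futures`, and `tokio` crates, and all names are illustrative:

```rust
use std::time::Duration;

use async_stream::stream;
use futures::{Future, Stream, StreamExt};

/// Build the reader *inside* the stream: on a reader error, drop it and
/// rebuild it, instead of surfacing the error to the consumer.
fn retry_reader_stream<T, E, R, F, Fut>(mut build_reader: F) -> impl Stream<Item = T>
where
    E: std::fmt::Debug,
    R: Stream<Item = Result<T, E>> + Unpin,
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<R, E>>,
{
    stream! {
        'outer: loop {
            // (Re)build the inner reader, backing off briefly on build failure.
            let mut reader = match build_reader().await {
                Ok(r) => r,
                Err(e) => {
                    eprintln!("build failed, retrying: {e:?}");
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    continue 'outer;
                }
            };
            loop {
                match reader.next().await {
                    Some(Ok(item)) => yield item,
                    Some(Err(e)) => {
                        eprintln!("reader error, rebuilding: {e:?}");
                        continue 'outer; // transparent rebuild
                    }
                    None => break 'outer, // clean end of stream
                }
            }
        }
    }
}
```

Downstream consumers see one unbroken stream of items; rebuilds cost only latency, never a stream termination.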
@@ -332,6 +297,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
             .unwrap_or(&[])
             .to_vec();
         let is_pause_on_startup = barrier.is_pause_on_startup();
+        let is_initialize = barrier.is_newly_added(self.actor_ctx.id);
         yield Message::Barrier(barrier);
 
         let mut core = self.stream_source_core;
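`is_newly_added` distinguishes the barrier that first creates this actor from a recovery restart; the flag then gates the one-shot probe in the next hunk. An annotated restatement of that flow, where the comments are interpretation rather than text from the PR:

```rust
let is_initialize = barrier.is_newly_added(self.actor_ctx.id);
// ...
if is_initialize {
    // First creation: probe the connector once so that splits with no data at
    // all can be marked Finished immediately (this matters for blocking DDL,
    // which would otherwise wait for data to arrive), and so every other
    // split gets an initial target_offset to backfill toward.
} else {
    // Recovery: the targets were already persisted in the backfill state
    // table, so the probe (and its startup latency) is skipped.
}
```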
@@ -369,26 +335,32 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {

         // Return the ownership of `stream_source_core` to the source executor.
         self.stream_source_core = core;
+        let stream_reader_builder = self.stream_reader_builder(source_desc.clone());
 
-        let (source_chunk_reader, backfill_info) = self
-            .build_stream_source_reader(
-                &source_desc,
-                backfill_stage.get_latest_unfinished_splits()?,
-            )
-            .instrument_await("source_build_reader")
-            .await?;
-        for (split_id, info) in &backfill_info {
-            let state = backfill_stage.states.get_mut(split_id).unwrap();
-            match info {
-                BackfillInfo::NoDataToBackfill => {
-                    state.state = BackfillState::Finished;
-                }
-                BackfillInfo::HasDataToBackfill { latest_offset } => {
-                    // Note: later we will override it with the offset from the source message, and it's possible to become smaller than this value.
-                    state.target_offset = Some(latest_offset.clone());
-                }
-            }
-        }
+        if is_initialize {
+            let backfill_info = stream_reader_builder
+                .fetch_latest_splits(Some(backfill_stage.get_latest_unfinished_splits()?), false)
+                .await?
+                .backfill_info;
+            for (split_id, info) in &backfill_info {
+                let state = backfill_stage.states.get_mut(split_id).unwrap();
+                match info {
+                    BackfillInfo::NoDataToBackfill => {
+                        state.state = BackfillState::Finished;
+                    }
+                    BackfillInfo::HasDataToBackfill { latest_offset } => {
+                        // Note: later we will override it with the offset from the source message, and it's possible to become smaller than this value.
+                        state.target_offset = Some(latest_offset.clone());
+                    }
+                }
+            }
+        }
+        let retry_reader_stream = stream_reader_builder
+            .into_retry_stream(
+                Some(backfill_stage.get_latest_unfinished_splits()?),
+                is_initialize,
+            )
+            .boxed();
         tracing::debug!(?backfill_stage, "source backfill started");
 
         fn select_strategy(_: &mut ()) -> PollNext {
@@ -402,7 +374,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
         // - When the upstream Source's becomes less busy, SourceBackfill can begin to catch up.
         let mut backfill_stream = select_with_strategy(
             input.by_ref().map(Either::Left),
-            source_chunk_reader.map(Either::Right),
+            retry_reader_stream.map(Either::Right),
             select_strategy,
         );
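The only change here is swapping in the retry stream; the left-biased `select_with_strategy` is untouched. Since it is what keeps upstream barriers flowing ahead of backfill chunks, a minimal runnable demo of the combinator may help (assumes `tokio` alongside `futures`):

```rust
use futures::stream::{self, select_with_strategy, PollNext, StreamExt};

#[tokio::main]
async fn main() {
    let upstream = stream::iter(vec!["barrier-1", "barrier-2"]);
    let backfill = stream::iter(vec!["chunk-a", "chunk-b"]);

    // Always poll the left (upstream) side first; the right (backfill) side
    // only makes progress when the left side is pending or exhausted.
    let merged = select_with_strategy(upstream, backfill, |_: &mut ()| PollNext::Left);

    let order: Vec<_> = merged.collect().await;
    println!("{order:?}"); // ["barrier-1", "barrier-2", "chunk-a", "chunk-b"]
}
```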

@@ -487,12 +459,13 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
             self.actor_ctx.fragment_id.to_string(),
         ]);
 
-        let (reader, _backfill_info) = self
-            .build_stream_source_reader(
-                &source_desc,
-                backfill_stage.get_latest_unfinished_splits()?,
-            )
-            .await?;
+        let reader = self
+            .stream_reader_builder(source_desc.clone())
+            .into_retry_stream(
+                Some(backfill_stage.get_latest_unfinished_splits()?),
+                true,
+            )
+            .boxed();
 
         backfill_stream = select_with_strategy(
             input.by_ref().map(Either::Left),
@@ -582,13 +555,16 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
                             );
                             self.rate_limit_rps = *new_rate_limit;
                             // rebuild reader
-                            let (reader, _backfill_info) = self
-                                .build_stream_source_reader(
-                                    &source_desc,
-                                    backfill_stage
-                                        .get_latest_unfinished_splits()?,
-                                )
-                                .await?;
+                            let reader = self
+                                .stream_reader_builder(source_desc.clone())
+                                .into_retry_stream(
+                                    Some(
+                                        backfill_stage
+                                            .get_latest_unfinished_splits()?,
+                                    ),
+                                    true,
+                                )
+                                .boxed();
 
                             backfill_stream = select_with_strategy(
                                 input.by_ref().map(Either::Left),
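Note that `apply_rate_limit` disappeared from this file's imports: the limit now rides into `StreamReaderBuilder` as the `rate_limit` field, which is why changing it means building a fresh retry stream, as in this hunk. For intuition only, a crude self-contained pacing wrapper; RisingWave's real `apply_rate_limit` is chunk-aware and lives in `crate::common::rate_limit` (assumes `tokio` and `futures`):

```rust
use std::time::Duration;

use futures::{Stream, StreamExt};

/// Crude sketch: pace a stream to at most `limit_per_sec` items per second.
/// The limit is captured when the stream is built, so a new limit only takes
/// effect after rebuilding the stream -- exactly the pattern in this hunk.
fn pace<S: Stream>(inner: S, limit_per_sec: Option<u32>) -> impl Stream<Item = S::Item> {
    inner.then(move |item| async move {
        if let Some(limit) = limit_per_sec {
            tokio::time::sleep(Duration::from_secs_f64(1.0 / limit.max(1) as f64)).await;
        }
        item
    })
}
```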
@@ -612,13 +588,10 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
                         latest_unfinished_splits
                     );
 
-                    // Replace the source reader with a new one of the new state.
-                    let (reader, _backfill_info) = self
-                        .build_stream_source_reader(
-                            &source_desc,
-                            latest_unfinished_splits,
-                        )
-                        .await?;
+                    let reader = self
+                        .stream_reader_builder(source_desc.clone())
+                        .into_retry_stream(Some(latest_unfinished_splits), true)
+                        .boxed();
 
                     backfill_stream = select_with_strategy(
                         input.by_ref().map(Either::Left),