feat(snapshot-backfill): implement executor to consume upstream table #20167

@wenym1 wenym1 commented Jan 15, 2025

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

This PR adds support for the cross-database streaming query described in #19631.

In this PR, we implement a ConsumeUpstreamStream that consumes the upstream table epoch by epoch and yields StreamChunk. Inside ConsumeUpstreamStream, we use the VnodeStream implemented in #19936 to consume each vnode, which lets us get the latest progress of each vnode at any time. After finishing one epoch, it calls next_epoch to resolve the next epoch and creates a new VnodeStream to consume that epoch.

An UpstreamTableExecutor polls the ConsumeUpstreamStream and the barrier_rx concurrently. We use the backfill progress state introduced in #19720 to track progress. On receiving a new barrier, the executor inspects the latest progress of ConsumeUpstreamStream, writes the progress state, yields the barrier, and then continues consuming the ConsumeUpstreamStream. When the vnode bitmap is updated, we recreate the ConsumeUpstreamStream for the new vnode bitmap. The logic of the executor looks like the following:

let mut stream = ConsumeUpstreamStream::new(
    ...
);
loop {
    select! {
        chunk = stream.next() => {
            yield Message::Chunk(chunk);
        }
        barrier = receive_next_barrier(&mut self.barrier_rx) => {
            // Persist the latest progress of every vnode before yielding the barrier.
            for progress in stream.progress() {
                progress_state.write(progress);
            }
            progress_state.flush();
            yield Message::Barrier(barrier);
            if let Some(new_vnode_bitmap) = /* vnode bitmap update carried by the barrier, if any */ {
                // Recreate the stream on the new vnode bitmap.
                stream = ConsumeUpstreamStream::new(
                    new_vnode_bitmap,
                    ...
                );
            }
        }
    }
}
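
To make the ordering at a barrier concrete, here is a minimal, std-only simulation of the loop above. It is a sketch under assumptions, not the executor in this PR: `Event`, `ProgressState`, and `run` are hypothetical stand-ins, the `select!` is replaced by a pre-recorded list of events, and a row is reduced to its primary key.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the executor's output messages.
#[derive(Debug, PartialEq)]
enum Message {
    Chunk(Vec<u64>),
    Barrier(u64),
}

/// What one iteration of the `select!` resolved to (simulated input).
enum Event {
    /// A chunk read from `vnode`; each row is represented by its pk.
    Chunk { vnode: u16, pks: Vec<u64> },
    /// A barrier carrying the given epoch.
    Barrier { epoch: u64 },
}

/// In-memory stand-in for the progress state table.
#[derive(Default)]
struct ProgressState {
    pending: BTreeMap<u16, u64>,
    committed: BTreeMap<u16, u64>,
}

impl ProgressState {
    fn write(&mut self, vnode: u16, last_pk: u64) {
        self.pending.insert(vnode, last_pk);
    }

    fn flush(&mut self) {
        let pending = std::mem::take(&mut self.pending);
        self.committed.extend(pending);
    }
}

/// Replays the events in order. Progress is written and flushed only
/// when a barrier arrives, and always before the barrier is emitted.
fn run(events: Vec<Event>, state: &mut ProgressState) -> Vec<Message> {
    // Latest pk consumed per vnode; plays the role of `stream.progress()`.
    let mut latest: BTreeMap<u16, u64> = BTreeMap::new();
    let mut out = Vec::new();
    for event in events {
        match event {
            Event::Chunk { vnode, pks } => {
                if let Some(&last) = pks.last() {
                    latest.insert(vnode, last);
                }
                out.push(Message::Chunk(pks));
            }
            Event::Barrier { epoch } => {
                for (&vnode, &pk) in &latest {
                    state.write(vnode, pk);
                }
                state.flush();
                out.push(Message::Barrier(epoch));
            }
        }
    }
    out
}
```

In this simulation, rows consumed after the last barrier advance `latest` but are not committed until the next barrier arrives.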

Ideally, ConsumeUpstreamStream would be implemented with the try_stream macro, so that the Rust compiler generates the state machine of the async execution automatically. However, we need to access the progress of the stream at the moment we receive a barrier, and a stream generated by the try_stream macro takes ownership of the internal VnodeStream, so we could no longer get the latest progress of the ongoing VnodeStream. Therefore, in this PR, we implement the state machine by hand. The state machine looks like the following:

enum ConsumeUpstreamStreamState<'a, T> {
    CreatingSnapshotStream {
        future: UpstreamTableSnapshotStreamFuture<'a, T>,
    },
    ConsumingSnapshotStream {
        stream: UpstreamTableSnapshotStream<T>,
    },
    CreatingChangeLogStream {
        future: UpstreamTableChangeLogStreamFuture<'a, T>,
    },
    ConsumingChangeLogStream {
        stream: UpstreamTableChangeLogStream<T>,
    },
    ResolvingNextEpoch {
        future: NextEpochFuture<'a>,
    },
}

Each state corresponds to an await point in the following code:

let mut snapshot_stream = create_snapshot_stream(snapshot_epoch).await; // CreatingSnapshotStream
while let Some(chunk) = snapshot_stream.next().await { // ConsumingSnapshotStream
    yield Message::Chunk(chunk);
}
let mut epoch = snapshot_epoch;
loop {
    epoch = next_epoch(epoch).await; // ResolvingNextEpoch
    let mut change_log_stream = create_change_log_stream(epoch).await; // CreatingChangeLogStream
    while let Some(chunk) = change_log_stream.next().await { // ConsumingChangeLogStream
        yield Message::Chunk(chunk);
    }
}
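
The reason for writing the state machine by hand can be illustrated with a simplified, synchronous analog. The sketch below is not the code in this PR: `Consumer` and its fields are hypothetical, an `Iterator` stands in for the stream, the two `Creating*` states are folded away because nothing here is async, and a `Finished` state is added only so that the simulation terminates. The point it shows is that, with the state held in plain fields, `progress()` stays callable between two calls to `next()`.

```rust
use std::collections::VecDeque;

/// Simplified analog of the state enum above (hypothetical).
enum State {
    ConsumingSnapshot { rows: VecDeque<u64> },
    ResolvingNextEpoch,
    ConsumingChangeLog { rows: VecDeque<u64> },
    /// Only needed because the simulated input is finite.
    Finished,
}

struct Consumer {
    state: State,
    /// Epoch currently being consumed.
    epoch: u64,
    /// Change-log rows of later epochs, in epoch order (simulated input).
    later_epochs: VecDeque<(u64, Vec<u64>)>,
    /// Last row yielded in the current epoch, if any.
    last_pk: Option<u64>,
}

impl Consumer {
    fn new(snapshot_epoch: u64, snapshot: Vec<u64>, later_epochs: Vec<(u64, Vec<u64>)>) -> Self {
        Self {
            state: State::ConsumingSnapshot { rows: snapshot.into() },
            epoch: snapshot_epoch,
            later_epochs: later_epochs.into(),
            last_pk: None,
        }
    }

    /// Readable at any time, because the state is not hidden inside a
    /// compiler-generated stream.
    fn progress(&self) -> (u64, Option<u64>) {
        (self.epoch, self.last_pk)
    }
}

impl Iterator for Consumer {
    type Item = u64;

    fn next(&mut self) -> Option<u64> {
        loop {
            match &mut self.state {
                State::ConsumingSnapshot { rows } | State::ConsumingChangeLog { rows } => {
                    if let Some(pk) = rows.pop_front() {
                        self.last_pk = Some(pk);
                        return Some(pk);
                    }
                    // Current epoch exhausted: move on to the next one.
                    self.state = State::ResolvingNextEpoch;
                }
                State::ResolvingNextEpoch => match self.later_epochs.pop_front() {
                    Some((epoch, rows)) => {
                        self.epoch = epoch;
                        self.last_pk = None;
                        self.state = State::ConsumingChangeLog { rows: rows.into() };
                    }
                    None => self.state = State::Finished,
                },
                State::Finished => return None,
            }
        }
    }
}
```

A caller can interleave `consumer.next()` with `consumer.progress()`, which is what the executor needs to do when a barrier arrives in the middle of an epoch.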

Note that in each state, we store the progress of all vnodes owned by the executor, even if that progress was made in a previous epoch, so that earlier progress is not lost.
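
As a rough illustration of this note, the following sketch keeps one entry per owned vnode and never drops an entry when the epoch advances. `VnodeProgress` and `ProgressTracker` are hypothetical names, and the progress representation actually used in this PR may differ.

```rust
use std::collections::BTreeMap;

/// Hypothetical per-vnode progress.
#[derive(Clone, Debug, PartialEq)]
enum VnodeProgress {
    /// Nothing consumed yet.
    NotStarted,
    /// Partway through `epoch`; the last consumed row is `last_pk`.
    InProgress { epoch: u64, last_pk: u64 },
    /// Every row of `epoch` has been consumed.
    Finished { epoch: u64 },
}

struct ProgressTracker {
    per_vnode: BTreeMap<u16, VnodeProgress>,
}

impl ProgressTracker {
    /// Starts tracking every vnode owned by the executor.
    fn new(vnodes: impl IntoIterator<Item = u16>) -> Self {
        let per_vnode = vnodes
            .into_iter()
            .map(|vnode| (vnode, VnodeProgress::NotStarted))
            .collect();
        Self { per_vnode }
    }

    fn on_row(&mut self, vnode: u16, epoch: u64, pk: u64) {
        self.per_vnode
            .insert(vnode, VnodeProgress::InProgress { epoch, last_pk: pk });
    }

    fn on_vnode_finished(&mut self, vnode: u16, epoch: u64) {
        self.per_vnode.insert(vnode, VnodeProgress::Finished { epoch });
    }

    /// Progress of every owned vnode, including vnodes that have not
    /// been touched in the current epoch.
    fn latest(&self) -> impl Iterator<Item = (u16, &VnodeProgress)> + '_ {
        self.per_vnode.iter().map(|(vnode, progress)| (*vnode, progress))
    }
}
```

If vnode 1 finished epoch 1 and only vnode 0 has produced rows in epoch 2, `latest()` still reports `Finished { epoch: 1 }` for vnode 1, so a barrier in the middle of epoch 2 can persist both entries.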

To make future extension easier, the input of ConsumeUpstreamStream can be any type that implements the following UpstreamTable trait:

pub trait UpstreamTable: Send + Sync + 'static {
    type SnapshotStream: Stream<Item = StreamExecutorResult<OwnedRow>> + Send + 'static;
    type ChangeLogStream: Stream<Item = StreamExecutorResult<ChangeLogRow>> + Send + 'static;

    fn next_epoch(&self, epoch: u64)
        -> impl Future<Output = StreamExecutorResult<u64>> + Send + '_;
    fn snapshot_stream(
        &self,
        vnode: VirtualNode,
        epoch: u64,
        start_pk: Option<OwnedRow>,
    ) -> impl Future<Output = StreamExecutorResult<Self::SnapshotStream>> + Send + '_;
    fn change_log_stream(
        &self,
        vnode: VirtualNode,
        epoch: u64,
        start_pk: Option<OwnedRow>,
    ) -> impl Future<Output = StreamExecutorResult<Self::ChangeLogStream>> + Send + '_;

    ...
}

This way, we can easily reuse the logic for different ways of consuming an upstream table, such as consuming a subscription from another RisingWave cluster, as long as we implement this trait for it.
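
To show the shape of that extension point without pulling in async dependencies, here is a simplified, synchronous analog. `SyncUpstreamTable`, `InMemoryTable`, and `count_rows` are hypothetical and are not part of this PR: iterators replace streams, plain return values replace futures, `next_epoch` returns `None` instead of waiting for a new epoch, and `start_pk` is assumed to be exclusive.

```rust
use std::collections::BTreeMap;
use std::ops::Bound::{Excluded, Unbounded};

/// Synchronous analog of the trait above (hypothetical).
trait SyncUpstreamTable {
    type SnapshotIter: Iterator<Item = u64>;

    /// Smallest known epoch strictly greater than `epoch`, if any.
    fn next_epoch(&self, epoch: u64) -> Option<u64>;

    /// Rows of `vnode` at `epoch`, starting after `start_pk` if given.
    fn snapshot_iter(&self, vnode: u16, epoch: u64, start_pk: Option<u64>) -> Self::SnapshotIter;
}

/// Toy source: epoch -> vnode -> sorted primary keys.
struct InMemoryTable {
    snapshots: BTreeMap<u64, BTreeMap<u16, Vec<u64>>>,
}

impl SyncUpstreamTable for InMemoryTable {
    type SnapshotIter = std::vec::IntoIter<u64>;

    fn next_epoch(&self, epoch: u64) -> Option<u64> {
        self.snapshots
            .range((Excluded(epoch), Unbounded))
            .next()
            .map(|(next, _)| *next)
    }

    fn snapshot_iter(&self, vnode: u16, epoch: u64, start_pk: Option<u64>) -> Self::SnapshotIter {
        let rows = self
            .snapshots
            .get(&epoch)
            .and_then(|per_vnode| per_vnode.get(&vnode))
            .cloned()
            .unwrap_or_default();
        rows.into_iter()
            // Assumption: `start_pk` is exclusive, so resuming skips it.
            .filter(|pk| start_pk.map_or(true, |start| *pk > start))
            .collect::<Vec<_>>()
            .into_iter()
    }
}

/// Generic over the source, so another implementation can be swapped
/// in without touching the consuming logic.
fn count_rows<T: SyncUpstreamTable>(table: &T, vnode: u16, epoch: u64, start_pk: Option<u64>) -> usize {
    table.snapshot_iter(vnode, epoch, start_pk).count()
}
```

Any other source only has to implement the trait; the generic `count_rows` plays the role that ConsumeUpstreamStream plays for the real UpstreamTable trait.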

Checklist

  • I have written necessary rustdoc comments.
  • I have added necessary unit tests and integration tests.
  • I have added test labels as necessary.
  • I have added fuzzing tests or opened an issue to track them.
  • My PR contains breaking changes.
  • My PR changes performance-critical code, so I will run (micro) benchmarks and present the results.
  • My PR contains critical fixes that are necessary to be merged into the latest release.

Documentation

  • My PR needs documentation updates.
Release note

@wenym1 wenym1 force-pushed the yiming/consume-upstream-executor branch from cb6d3ef to 5685057 Compare January 15, 2025 09:09

@github-actions github-actions bot left a comment


license-eye has checked 5555 files.

| Valid | Invalid | Ignored | Fixed |
| --- | --- | --- | --- |
| 2347 | 1 | 3207 | 0 |

Click to see the invalid file list
  • src/stream/src/executor/backfill/snapshot_backfill/consume_upstream/upstream_table_trait.rs
Use this command to fix any missing license headers:

```bash
docker run -it --rm -v $(pwd):/github/workspace apache/skywalking-eyes header fix
```

@wenym1 wenym1 force-pushed the yiming/consume-upstream-executor branch from 5685057 to 6b2ce75 Compare January 18, 2025 14:39
@wenym1 wenym1 marked this pull request as ready for review January 20, 2025 03:50

wenym1 commented Jan 20, 2025

@wenym1 wenym1 force-pushed the yiming/consume-upstream-executor branch from 6b2ce75 to 8a97831 Compare January 20, 2025 06:49

gru-agent bot commented Jan 20, 2025

This pull request has been modified. If you want me to regenerate unit test for any of the files related, please find the file in "Files Changed" tab and add a comment @gru-agent. (The github "Comment on this file" feature is in the upper right corner of each file in "Files Changed" tab.)

@kwannoel

Any description?

@wenym1

wenym1 commented Jan 21, 2025

> Any description?

Added the PR description. @kwannoel @hzxa21 @yezizp2012 @st1page PTAL
