Skip to content

Commit

Permalink
test(sink): add recovery test for sink (risingwavelabs#12701)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Oct 27, 2023
1 parent bc392aa commit 92470cb
Show file tree
Hide file tree
Showing 7 changed files with 628 additions and 320 deletions.
1 change: 1 addition & 0 deletions ci/scripts/deterministic-it-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mv target/ci-sim target/sim
echo "--- Run integration tests in deterministic simulation mode"
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} NEXTEST_PROFILE=ci-sim \
cargo nextest run \
--no-capture \
--no-fail-fast \
--cargo-metadata target/nextest/cargo-metadata.json \
--binaries-metadata target/nextest/binaries-metadata.json \
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
.await?;
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id)
{
log_writer.update_vnode_bitmap(vnode_bitmap);
log_writer.update_vnode_bitmap(vnode_bitmap).await?;
}
yield Message::Barrier(barrier);
}
Expand Down Expand Up @@ -276,7 +276,7 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
.await?;
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id)
{
log_writer.update_vnode_bitmap(vnode_bitmap);
log_writer.update_vnode_bitmap(vnode_bitmap).await?;
}
yield Message::Barrier(barrier);
}
Expand Down
Loading

0 comments on commit 92470cb

Please sign in to comment.