From 92470cbe7417b6ef8acfa16a1b7bb89bbea05ff9 Mon Sep 17 00:00:00 2001
From: William Wen <44139337+wenym1@users.noreply.github.com>
Date: Fri, 27 Oct 2023 19:55:16 +0800
Subject: [PATCH] test(sink): add recovery test for sink (#12701)

---
 ci/scripts/deterministic-it-test.sh           |   1 +
 src/stream/src/executor/sink.rs               |   4 +-
 .../tests/integration_tests/sink/basic.rs     | 355 ++----------------
 .../tests/integration_tests/sink/mod.rs       |  30 ++
 .../tests/integration_tests/sink/recovery.rs  | 101 +++++
 .../tests/integration_tests/sink/scale.rs     | 121 ++++++
 .../tests/integration_tests/sink/utils.rs     | 336 +++++++++++++++++
 7 files changed, 628 insertions(+), 320 deletions(-)
 create mode 100644 src/tests/simulation/tests/integration_tests/sink/recovery.rs
 create mode 100644 src/tests/simulation/tests/integration_tests/sink/scale.rs
 create mode 100644 src/tests/simulation/tests/integration_tests/sink/utils.rs

diff --git a/ci/scripts/deterministic-it-test.sh b/ci/scripts/deterministic-it-test.sh
index f281eaa467bfd..5d76a6677d580 100755
--- a/ci/scripts/deterministic-it-test.sh
+++ b/ci/scripts/deterministic-it-test.sh
@@ -16,6 +16,7 @@ mv target/ci-sim target/sim
 echo "--- Run integration tests in deterministic simulation mode"
 seq $TEST_NUM | parallel MADSIM_TEST_SEED={} NEXTEST_PROFILE=ci-sim \
 cargo nextest run \
+    --no-capture \
     --no-fail-fast \
     --cargo-metadata target/nextest/cargo-metadata.json \
     --binaries-metadata target/nextest/binaries-metadata.json \

diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs
index 70e63b4b33cd0..fdc4e95ef5799 100644
--- a/src/stream/src/executor/sink.rs
+++ b/src/stream/src/executor/sink.rs
@@ -241,7 +241,7 @@ impl SinkExecutor {
                         .await?;
 
                     if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id) {
-                        log_writer.update_vnode_bitmap(vnode_bitmap);
+                        log_writer.update_vnode_bitmap(vnode_bitmap).await?;
                     }
                     yield Message::Barrier(barrier);
                 }
@@ -276,7 +276,7 @@ impl SinkExecutor {
                     .await?;
 
                 if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id) {
-                    log_writer.update_vnode_bitmap(vnode_bitmap);
+                    log_writer.update_vnode_bitmap(vnode_bitmap).await?;
                 }
                 yield Message::Barrier(barrier);
             }
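Note on the `sink.rs` change above: `update_vnode_bitmap` is now awaited and its error propagated with `?`. Beyond surfacing failures, the `.await` matters because calling an async fn without awaiting it merely constructs a future and drops it. A standalone sketch of that pitfall (hypothetical types, assuming a tokio runtime — not RisingWave's actual trait):

```rust
struct Writer(u32);

impl Writer {
    // Stand-in for an async, fallible update_vnode_bitmap.
    async fn update(&mut self, v: u32) -> Result<(), String> {
        self.0 = v;
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let mut w = Writer(0);
    // Without `.await`, the future is created and dropped: no update happens.
    drop(w.update(1));
    assert_eq!(w.0, 0);
    // With `.await`, the update runs and any error would surface.
    w.update(2).await.unwrap();
    assert_eq!(w.0, 2);
}
```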
diff --git a/src/tests/simulation/tests/integration_tests/sink/basic.rs b/src/tests/simulation/tests/integration_tests/sink/basic.rs
index bceb45a8a2389..24f709139f0fc 100644
--- a/src/tests/simulation/tests/integration_tests/sink/basic.rs
+++ b/src/tests/simulation/tests/integration_tests/sink/basic.rs
@@ -12,360 +12,79 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::io::Write;
-use std::iter::once;
-use std::sync::atomic::AtomicUsize;
 use std::sync::atomic::Ordering::Relaxed;
 use std::sync::Arc;
 use std::time::Duration;
 
 use anyhow::Result;
-use async_trait::async_trait;
-use futures::stream::select_all;
-use futures::StreamExt;
-use itertools::Itertools;
-use rand::prelude::SliceRandom;
-use risingwave_common::array::{Op, StreamChunk};
-use risingwave_common::buffer::Bitmap;
-use risingwave_common::types::{DataType, ScalarImpl};
-use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
-use risingwave_connector::sink::boxed::{BoxCoordinator, BoxWriter};
-use risingwave_connector::sink::test_sink::registry_build_sink;
-use risingwave_connector::sink::writer::SinkWriter;
-use risingwave_connector::sink::{Sink, SinkWriterParam};
-use risingwave_connector::source::test_source::{registry_test_source, BoxSource, TestSourceSplit};
-use risingwave_connector::source::StreamChunkWithState;
-use risingwave_simulation::cluster::{Cluster, ConfigPath, Configuration};
-use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
 use tokio::time::sleep;
-use tokio_stream::wrappers::UnboundedReceiverStream;
 
-struct TestWriter {
-    row_counter: Arc<AtomicUsize>,
-    parallelism_counter: Arc<AtomicUsize>,
-}
-
-#[async_trait]
-impl SinkWriter for TestWriter {
-    async fn begin_epoch(&mut self, _epoch: u64) -> risingwave_connector::sink::Result<()> {
-        Ok(())
-    }
-
-    async fn write_batch(&mut self, chunk: StreamChunk) -> risingwave_connector::sink::Result<()> {
-        let mut count = 0;
-        for _ in chunk.rows() {
-            count += 1;
-        }
-        self.row_counter.fetch_add(count, Relaxed);
-        Ok(())
-    }
-
-    async fn barrier(
-        &mut self,
-        _is_checkpoint: bool,
-    ) -> risingwave_connector::sink::Result<()> {
-        sleep(Duration::from_millis(100)).await;
-        Ok(())
-    }
-}
-
-impl Drop for TestWriter {
-    fn drop(&mut self) {
-        self.parallelism_counter.fetch_sub(1, Relaxed);
-    }
-}
-
-fn build_stream_chunk(row_iter: impl Iterator<Item = (i32, String)>) -> StreamChunk {
-    let mut builder = DataChunkBuilder::new(vec![DataType::Int32, DataType::Varchar], 100000);
-    for (id, name) in row_iter {
-        assert!(builder
-            .append_one_row([
-                Some(ScalarImpl::Int32(id)),
-                Some(ScalarImpl::Utf8(name.into())),
-            ])
-            .is_none());
-    }
-    let chunk = builder.consume_all().unwrap();
-    let ops = (0..chunk.cardinality()).map(|_| Op::Insert).collect_vec();
-    StreamChunk::from_parts(ops, chunk)
-}
-
-#[tokio::test]
-async fn test_sink_basic() -> Result<()> {
-    let config_path = {
-        let mut file = tempfile::NamedTempFile::new().expect("failed to create temp config file");
-        file.write_all(include_bytes!("../../../../../config/ci-sim.toml"))
-            .expect("failed to write config file");
-        file.into_temp_path()
-    };
-
-    let mut cluster = Cluster::start(Configuration {
-        config_path: ConfigPath::Temp(config_path.into()),
-        frontend_nodes: 1,
-        compute_nodes: 3,
-        meta_nodes: 1,
-        compactor_nodes: 1,
-        compute_node_cores: 2,
-        etcd_timeout_rate: 0.0,
-        etcd_data_path: None,
-    })
-    .await?;
-
-    let row_counter = Arc::new(AtomicUsize::new(0));
-    let parallelism_counter = Arc::new(AtomicUsize::new(0));
+use crate::sink::utils::{
+    start_sink_test_cluster, SimulationTestSink, SimulationTestSource, CREATE_SINK, CREATE_SOURCE,
+    DROP_SINK, DROP_SOURCE,
+};
+use crate::{assert_eq_with_err_returned as assert_eq, assert_with_err_returned as assert};
 
-    let _sink_guard = registry_build_sink({
-        let row_counter = row_counter.clone();
-        let parallelism_counter = parallelism_counter.clone();
-        move |_, _| {
-            parallelism_counter.fetch_add(1, Relaxed);
-            Box::new(TestWriter {
-                row_counter: row_counter.clone(),
-                parallelism_counter: parallelism_counter.clone(),
-            })
-        }
-    });
+async fn basic_test_inner(is_decouple: bool) -> Result<()> {
+    let mut cluster = start_sink_test_cluster().await?;
 
-    let source_parallelism = 12;
-    let mut txs = Vec::new();
-    let mut rxs = Vec::new();
-    for _ in 0..source_parallelism {
-        let (tx, rx): (_, UnboundedReceiver<StreamChunk>) = unbounded_channel();
-        txs.push(tx);
-        rxs.push(Some(rx));
-    }
-
-    let _source_guard = registry_test_source(BoxSource::new(
-        move |_, _| {
-            Ok((0..source_parallelism)
-                .map(|i: usize| TestSourceSplit {
-                    id: format!("{}", i).as_str().into(),
-                    properties: Default::default(),
-                    offset: "".to_string(),
-                })
-                .collect_vec())
-        },
-        move |_, splits, _, _, _| {
-            select_all(splits.into_iter().map(|split| {
-                let id: usize = split.id.parse().unwrap();
-                let rx = rxs[id].take().unwrap();
-                UnboundedReceiverStream::new(rx).map(|chunk| Ok(StreamChunkWithState::from(chunk)))
-            }))
-            .boxed()
-        },
-    ));
+    let test_sink = SimulationTestSink::register_new();
+    let test_source = SimulationTestSource::register_new(12, 0..500000, 0.2, 50);
 
     let mut session = cluster.start_session();
 
     session.run("set streaming_parallelism = 6").await?;
-    session.run("set sink_decouple = false").await?;
-    session
-        .run("create table test_table (id int primary key, name varchar) with (connector = 'test') FORMAT PLAIN ENCODE JSON")
-        .await?;
-    session
-        .run("create sink test_sink from test_table with (connector = 'test')")
-        .await?;
-    let mut count = 0;
-    let mut id_list: Vec<usize> = (0..100000).collect_vec();
-    id_list.shuffle(&mut rand::thread_rng());
-    let flush_freq = 50;
-    for id in &id_list[0..10000] {
-        let chunk = build_stream_chunk(once((*id as i32, format!("name-{}", id))));
-        txs[id % source_parallelism].send(chunk).unwrap();
-        count += 1;
-        if count % flush_freq == 0 {
-            sleep(Duration::from_millis(10)).await;
-        }
+    if is_decouple {
+        session.run("set sink_decouple = true").await?;
+    } else {
+        session.run("set sink_decouple = false").await?;
     }
-
-    sleep(Duration::from_millis(10000)).await;
+    session.run(CREATE_SOURCE).await?;
+    session.run(CREATE_SINK).await?;
+    assert_eq!(6, test_sink.parallelism_counter.load(Relaxed));
 
-    assert_eq!(6, parallelism_counter.load(Relaxed));
-    assert_eq!(count, row_counter.load(Relaxed));
+    test_sink
+        .store
+        .wait_for_count(test_source.id_list.len())
+        .await?;
 
-    session.run("drop sink test_sink").await?;
+    session.run(DROP_SINK).await?;
+    session.run(DROP_SOURCE).await?;
 
-    assert_eq!(0, parallelism_counter.load(Relaxed));
+    assert_eq!(0, test_sink.parallelism_counter.load(Relaxed));
+    test_sink.store.check_simple_result(&test_source.id_list)?;
+    assert!(test_sink.store.inner().checkpoint_count > 0);
 
     Ok(())
 }
 
 #[tokio::test]
-async fn test_sink_decouple_basic() -> Result<()> {
-    let config_path = {
-        let mut file = tempfile::NamedTempFile::new().expect("failed to create temp config file");
-        file.write_all(include_bytes!("../../../../../config/ci-sim.toml"))
-            .expect("failed to write config file");
-        file.into_temp_path()
-    };
-
-    let mut cluster = Cluster::start(Configuration {
-        config_path: ConfigPath::Temp(config_path.into()),
-        frontend_nodes: 1,
-        compute_nodes: 3,
-        meta_nodes: 1,
-        compactor_nodes: 1,
-        compute_node_cores: 2,
-        etcd_timeout_rate: 0.0,
-        etcd_data_path: None,
-    })
-    .await?;
-
-    let row_counter = Arc::new(AtomicUsize::new(0));
-    let parallelism_counter = Arc::new(AtomicUsize::new(0));
-
-    let _sink_guard = registry_build_sink({
-        let row_counter = row_counter.clone();
-        let parallelism_counter = parallelism_counter.clone();
-        move |_, _| {
-            parallelism_counter.fetch_add(1, Relaxed);
-            Box::new(TestWriter {
-                row_counter: row_counter.clone(),
-                parallelism_counter: parallelism_counter.clone(),
-            })
-        }
-    });
-
-    let source_parallelism = 12;
-    let mut txs = Vec::new();
-    let mut rxs = Vec::new();
-    for _ in 0..source_parallelism {
-        let (tx, rx): (_, UnboundedReceiver<StreamChunk>) = unbounded_channel();
-        txs.push(tx);
-        rxs.push(Some(rx));
-    }
-
-    let _source_guard = registry_test_source(BoxSource::new(
-        move |_, _| {
-            Ok((0..source_parallelism)
-                .map(|i: usize| TestSourceSplit {
-                    id: format!("{}", i).as_str().into(),
-                    properties: Default::default(),
-                    offset: "".to_string(),
-                })
-                .collect_vec())
-        },
-        move |_, splits, _, _, _| {
-            select_all(splits.into_iter().map(|split| {
-                let id: usize = split.id.parse().unwrap();
-                let rx = rxs[id].take().unwrap();
-                UnboundedReceiverStream::new(rx).map(|chunk| Ok(StreamChunkWithState::from(chunk)))
-            }))
-            .boxed()
-        },
-    ));
-
-    let mut session = cluster.start_session();
-
-    session.run("set streaming_parallelism = 6").await?;
-    session.run("set sink_decouple = true").await?;
-    session
-        .run("create table test_table (id int primary key, name varchar) with (connector = 'test') FORMAT PLAIN ENCODE JSON")
-        .await?;
-    session
-        .run("create sink test_sink from test_table with (connector = 'test')")
-        .await?;
-    assert_eq!(6, parallelism_counter.load(Relaxed));
-
-    let mut count = 0;
-    let mut id_list = (0..100000).collect_vec();
-    id_list.shuffle(&mut rand::thread_rng());
-    let flush_freq = 50;
-    for id in &id_list[0..10000] {
-        let chunk = build_stream_chunk(once((*id as i32, format!("name-{}", id))));
-        txs[id % source_parallelism].send(chunk).unwrap();
-        count += 1;
-        if count % flush_freq == 0 {
-            sleep(Duration::from_millis(10)).await;
-        }
-    }
-
-    while row_counter.load(Relaxed) < count {
-        sleep(Duration::from_millis(1000)).await
-    }
-
-    assert_eq!(count, row_counter.load(Relaxed));
-
-    session.run("drop sink test_sink").await?;
-
-    assert_eq!(0, parallelism_counter.load(Relaxed));
+async fn test_sink_basic() -> Result<()> {
+    basic_test_inner(false).await
+}
 
-    Ok(())
+#[tokio::test]
+async fn test_sink_decouple_basic() -> Result<()> {
+    basic_test_inner(true).await
 }
 
 #[tokio::test]
 async fn test_sink_decouple_blackhole() -> Result<()> {
-    let config_path = {
-        let mut file = tempfile::NamedTempFile::new().expect("failed to create temp config file");
-        file.write_all(include_bytes!("../../../../../config/ci-sim.toml"))
-            .expect("failed to write config file");
-        file.into_temp_path()
-    };
-
-    let mut cluster = Cluster::start(Configuration {
-        config_path: ConfigPath::Temp(config_path.into()),
-        frontend_nodes: 1,
-        compute_nodes: 3,
-        meta_nodes: 1,
-        compactor_nodes: 1,
-        compute_node_cores: 2,
-        etcd_timeout_rate: 0.0,
-        etcd_data_path: None,
-    })
-    .await?;
-
-    let source_parallelism = 12;
-    let mut txs = Vec::new();
-    let mut rxs = Vec::new();
-    for _ in 0..source_parallelism {
-        let (tx, rx): (_, UnboundedReceiver<StreamChunk>) = unbounded_channel();
-        txs.push(tx);
-        rxs.push(Some(rx));
-    }
+    let mut cluster = start_sink_test_cluster().await?;
 
-    let _source_guard = registry_test_source(BoxSource::new(
-        move |_, _| {
-            Ok((0..source_parallelism)
-                .map(|i: usize| TestSourceSplit {
-                    id: format!("{}", i).as_str().into(),
-                    properties: Default::default(),
-                    offset: "".to_string(),
-                })
-                .collect_vec())
-        },
-        move |_, splits, _, _, _| {
-            select_all(splits.into_iter().map(|split| {
-                let id: usize = split.id.parse().unwrap();
-                let rx = rxs[id].take().unwrap();
-                UnboundedReceiverStream::new(rx).map(|chunk| Ok(StreamChunkWithState::from(chunk)))
-            }))
-            .boxed()
-        },
-    ));
+    let test_source = SimulationTestSource::register_new(12, 0..500000, 0.2, 50);
 
     let mut session = cluster.start_session();
 
     session.run("set streaming_parallelism = 6").await?;
     session.run("set sink_decouple = true").await?;
+    session.run(CREATE_SOURCE).await?;
     session
-        .run("create table test_table (id int primary key, name varchar) with (connector = 'test') FORMAT PLAIN ENCODE JSON")
-        .await?;
-    session
-        .run("create sink test_sink from test_table with (connector = 'blackhole')")
+        .run("create sink test_sink from test_source with (connector = 'blackhole')")
         .await?;
 
-    let mut count = 0;
-    let mut id_list = (0..100000).collect_vec();
-    id_list.shuffle(&mut rand::thread_rng());
-    let flush_freq = 50;
-    for id in &id_list[0..10000] {
-        let chunk = build_stream_chunk(once((*id as i32, format!("name-{}", id))));
-        txs[id % source_parallelism].send(chunk).unwrap();
-        count += 1;
-        if count % flush_freq == 0 {
-            sleep(Duration::from_millis(10)).await;
-        }
-    }
+    session.run(DROP_SINK).await?;
+    session.run(DROP_SOURCE).await?;
 
-    session.run("drop sink test_sink").await?;
     Ok(())
 }
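The rewritten tests replace the old fixed `sleep(10s)` wait with polling on shared counters (see `wait_for_count` in `utils.rs` later in this patch), which is what makes them robust under deterministic simulation. The underlying pattern, as a minimal standalone sketch (the helper name and error type here are illustrative, not part of the test crate):

```rust
use std::time::Duration;

// Poll `current()` until it reaches `target`; fail fast if it stops growing.
async fn wait_until(mut current: impl FnMut() -> usize, target: usize) -> Result<(), String> {
    let mut prev = 0;
    loop {
        tokio::time::sleep(Duration::from_secs(1)).await;
        let curr = current();
        if curr >= target {
            return Ok(());
        }
        if curr <= prev {
            return Err(format!("stalled at {curr}/{target}"));
        }
        prev = curr;
    }
}
```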
diff --git a/src/tests/simulation/tests/integration_tests/sink/mod.rs b/src/tests/simulation/tests/integration_tests/sink/mod.rs
index 71a65bf062d9c..0d0de28b84dc8 100644
--- a/src/tests/simulation/tests/integration_tests/sink/mod.rs
+++ b/src/tests/simulation/tests/integration_tests/sink/mod.rs
@@ -14,3 +14,33 @@
 
 #[cfg(madsim)]
 mod basic;
+#[cfg(madsim)]
+mod recovery;
+#[cfg(madsim)]
+mod scale;
+#[cfg(madsim)]
+mod utils;
+
+#[macro_export]
+macro_rules! assert_with_err_returned {
+    ($condition:expr, $($rest:tt)*) => {{
+        if !$condition {
+            return Err(anyhow::anyhow!($($rest)*).into());
+        }
+    }};
+    ($condition:expr) => {{
+        if !$condition {
+            return Err(anyhow::anyhow!("failed assertion: {}", stringify! {$condition}).into());
+        }
+    }};
+}
+
+#[macro_export]
+macro_rules! assert_eq_with_err_returned {
+    ($first:expr, $second:expr $(,$($rest:tt)*)?) => {{
+        $crate::assert_with_err_returned! {
+            {$first == $second}
+            $(, $($rest)*)?
+        }
+    }};
+}
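These two macros exist because a panic inside a simulated node does not propagate as cleanly as an `Err` flowing out through the test's `Result`; the test modules alias them over `assert!`/`assert_eq!` via `use ... as assert`. Roughly what an invocation expands to, hand-expanded as a sketch (not generated output):

```rust
fn check(parallelism: usize) -> anyhow::Result<()> {
    // assert_eq_with_err_returned!(6, parallelism) becomes approximately:
    if !(6 == parallelism) {
        return Err(anyhow::anyhow!(
            "failed assertion: {}",
            stringify!({ 6 == parallelism })
        )
        .into());
    }
    Ok(())
}
```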
diff --git a/src/tests/simulation/tests/integration_tests/sink/recovery.rs b/src/tests/simulation/tests/integration_tests/sink/recovery.rs
new file mode 100644
index 0000000000000..c23ea7fe5fa78
--- /dev/null
+++ b/src/tests/simulation/tests/integration_tests/sink/recovery.rs
@@ -0,0 +1,101 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::atomic::Ordering::Relaxed;
+use std::sync::Arc;
+use std::time::Duration;
+
+use anyhow::Result;
+use risingwave_simulation::cluster::{Cluster, KillOpts};
+use tokio::time::sleep;
+
+use crate::sink::utils::{
+    start_sink_test_cluster, SimulationTestSink, SimulationTestSource, CREATE_SINK, CREATE_SOURCE,
+    DROP_SINK, DROP_SOURCE,
+};
+use crate::{assert_eq_with_err_returned as assert_eq, assert_with_err_returned as assert};
+
+async fn kill_and_check(
+    cluster: &mut Cluster,
+    test_sink: &SimulationTestSink,
+    target: usize,
+) -> anyhow::Result<()> {
+    let mut prev_count = 0;
+    sleep(Duration::from_secs(2)).await;
+    for i in 0..5 {
+        let curr_count = test_sink.store.id_count();
+        if curr_count == target {
+            assert!(i > 0, "test finished without any kill");
+            break;
+        }
+        assert!(
+            curr_count > prev_count,
+            "no progress made between kills. prev count: {}, curr count: {}, i: {}",
+            prev_count,
+            curr_count,
+            i
+        );
+        prev_count = curr_count;
+        cluster.kill_node(&KillOpts::ALL).await;
+        sleep(Duration::from_secs(10)).await;
+    }
+    Ok(())
+}
+
+async fn recovery_test_inner(is_decouple: bool) -> Result<()> {
+    let mut cluster = start_sink_test_cluster().await?;
+
+    let test_sink = SimulationTestSink::register_new();
+    let test_source = SimulationTestSource::register_new(12, 0..500000, 0.2, 50);
+
+    let mut session = cluster.start_session();
+
+    session.run("set streaming_parallelism = 6").await?;
+    if is_decouple {
+        session.run("set sink_decouple = true").await?;
+    } else {
+        session.run("set sink_decouple = false").await?;
+    }
+    session.run(CREATE_SOURCE).await?;
+    session.run(CREATE_SINK).await?;
+    assert_eq!(6, test_sink.parallelism_counter.load(Relaxed));
+
+    let count = test_source.id_list.len();
+
+    kill_and_check(&mut cluster, &test_sink, count).await?;
+
+    test_sink.store.wait_for_count(count).await?;
+
+    let mut session = cluster.start_session();
+    session.run(DROP_SINK).await?;
+    session.run(DROP_SOURCE).await?;
+
+    assert_eq!(0, test_sink.parallelism_counter.load(Relaxed));
+
+    test_sink.store.check_simple_result(&test_source.id_list)?;
+    assert!(test_sink.store.inner().checkpoint_count > 0);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_sink_recovery() -> Result<()> {
+    recovery_test_inner(false).await
+}
+
+#[tokio::test]
+async fn test_sink_decouple_recovery() -> Result<()> {
+    recovery_test_inner(true).await
+}
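A detail worth noting in `recovery.rs`: after a `kill_node`, the cluster recovers from the last checkpoint, so the sink may receive some rows again. That is why `TestSinkStore` (in `utils.rs` below) keeps every received name per id and `check_simple_result` asserts on values rather than on delivery counts. The at-least-once invariant, restated as a standalone sketch (the function name is illustrative):

```rust
use std::collections::HashMap;

// At-least-once check: every id was delivered (possibly more than once),
// and every delivered value is the expected one.
fn check_at_least_once(store: &HashMap<i32, Vec<String>>, ids: &[i32]) -> Result<(), String> {
    for id in ids {
        let names = store
            .get(id)
            .filter(|names| !names.is_empty())
            .ok_or_else(|| format!("id {} never delivered", id))?;
        for name in names {
            if name != &format!("name-{}", id) {
                return Err(format!("unexpected value for id {}: {}", id, name));
            }
        }
    }
    Ok(())
}
```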
diff --git a/src/tests/simulation/tests/integration_tests/sink/scale.rs b/src/tests/simulation/tests/integration_tests/sink/scale.rs
new file mode 100644
index 0000000000000..259678636d535
--- /dev/null
+++ b/src/tests/simulation/tests/integration_tests/sink/scale.rs
@@ -0,0 +1,121 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::atomic::Ordering::Relaxed;
+use std::sync::Arc;
+use std::time::Duration;
+
+use anyhow::Result;
+use itertools::Itertools;
+use rand::{thread_rng, Rng};
+use risingwave_simulation::cluster::{Cluster, KillOpts};
+use risingwave_simulation::ctl_ext::predicate::identity_contains;
+use tokio::time::sleep;
+
+use crate::sink::utils::{
+    start_sink_test_cluster, SimulationTestSink, SimulationTestSource, CREATE_SINK, CREATE_SOURCE,
+    DROP_SINK, DROP_SOURCE,
+};
+use crate::{assert_eq_with_err_returned as assert_eq, assert_with_err_returned as assert};
+
+async fn scale_and_check(
+    cluster: &mut Cluster,
+    test_sink: &SimulationTestSink,
+    target_count: usize,
+    schedule_plan: impl Iterator<Item = (String, usize)>,
+) -> Result<()> {
+    for (plan, _expected_parallelism) in schedule_plan {
+        let prev_count = test_sink.store.id_count();
+        assert!(prev_count <= target_count);
+        if prev_count == target_count {
+            return Ok(());
+        }
+        cluster.reschedule(plan).await?;
+        let _after_count = test_sink.store.id_count();
+        sleep(Duration::from_secs(10)).await;
+        if thread_rng().gen_bool(0.5) {
+            sleep(Duration::from_secs(10)).await;
+            let _before_kill_count = test_sink.store.id_count();
+            cluster.kill_node(&KillOpts::ALL).await;
+            sleep(Duration::from_secs(10)).await;
+        }
+    }
+    Ok(())
+}
+
+async fn scale_test_inner(is_decouple: bool) -> Result<()> {
+    let mut cluster = start_sink_test_cluster().await?;
+
+    let test_sink = SimulationTestSink::register_new();
+    let test_source = SimulationTestSource::register_new(12, 0..500000, 0.2, 20);
+
+    let mut session = cluster.start_session();
+
+    session.run("set streaming_parallelism = 6").await?;
+    if is_decouple {
+        session.run("set sink_decouple = true").await?;
+    } else {
+        session.run("set sink_decouple = false").await?;
+    }
+    session.run(CREATE_SOURCE).await?;
+    session.run(CREATE_SINK).await?;
+    assert_eq!(6, test_sink.parallelism_counter.load(Relaxed));
+
+    let mut sink_fragments = cluster
+        .locate_fragments([identity_contains("Sink")])
+        .await?;
+
+    assert_eq!(sink_fragments.len(), 1);
+    let fragment = sink_fragments.pop().unwrap();
+    let id = fragment.id();
+
+    let count = test_source.id_list.len();
+
+    scale_and_check(
+        &mut cluster,
+        &test_sink,
+        count,
+        vec![
+            (format!("{id}-[1,2,3]"), 3),
+            (format!("{id}-[4,5]+[1,2]"), 3),
+            (format!("{id}+[3,4,5]"), 6),
+        ]
+        .into_iter(),
+    )
+    .await?;
+
+    test_sink.store.wait_for_count(count).await?;
+
+    let mut session = cluster.start_session();
+    session.run(DROP_SINK).await?;
+    session.run(DROP_SOURCE).await?;
+
+    assert_eq!(0, test_sink.parallelism_counter.load(Relaxed));
+
+    test_sink.store.check_simple_result(&test_source.id_list)?;
+    assert!(test_sink.store.inner().checkpoint_count > 0);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_sink_scale() -> Result<()> {
+    scale_test_inner(false).await
+}
+
+#[tokio::test]
+async fn test_sink_decouple_scale() -> Result<()> {
+    scale_test_inner(true).await
+}
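The plan strings passed to `cluster.reschedule` in `scale.rs` use the simulation harness's compact syntax: `<fragment>-[...]` removes parallel units from the sink fragment, `<fragment>+[...]` adds them, and the two can be combined. A hedged helper for building such plans (not part of the harness; the syntax is inferred from the calls above):

```rust
fn reschedule_plan(fragment_id: u32, remove: &[u32], add: &[u32]) -> String {
    let join = |units: &[u32]| {
        units
            .iter()
            .map(u32::to_string)
            .collect::<Vec<_>>()
            .join(",")
    };
    let mut plan = fragment_id.to_string();
    if !remove.is_empty() {
        plan.push_str(&format!("-[{}]", join(remove)));
    }
    if !add.is_empty() {
        plan.push_str(&format!("+[{}]", join(add)));
    }
    plan
}

// reschedule_plan(7, &[1, 2, 3], &[]) == "7-[1,2,3]"
// reschedule_plan(7, &[4, 5], &[1, 2]) == "7-[4,5]+[1,2]"
```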
diff --git a/src/tests/simulation/tests/integration_tests/sink/utils.rs b/src/tests/simulation/tests/integration_tests/sink/utils.rs
new file mode 100644
index 0000000000000..965c74b1e972a
--- /dev/null
+++ b/src/tests/simulation/tests/integration_tests/sink/utils.rs
@@ -0,0 +1,336 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+use std::io::Write;
+use std::iter::once;
+use std::sync::atomic::Ordering::Relaxed;
+use std::sync::atomic::{AtomicI64, AtomicUsize};
+use std::sync::{Arc, LazyLock, Mutex, MutexGuard};
+use std::time::{Duration, Instant};
+
+use anyhow::Result;
+use async_trait::async_trait;
+use futures::future::pending;
+use futures::stream::{empty, select_all, BoxStream};
+use futures::{stream, FutureExt, StreamExt};
+use itertools::Itertools;
+use rand::prelude::SliceRandom;
+use risingwave_common::array::{Op, StreamChunk};
+use risingwave_common::row::Row;
+use risingwave_common::types::{DataType, ScalarImpl, Serial};
+use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
+use risingwave_connector::sink::test_sink::{registry_build_sink, TestSinkRegistryGuard};
+use risingwave_connector::sink::writer::SinkWriter;
+use risingwave_connector::source::test_source::{
+    registry_test_source, BoxSource, TestSourceRegistryGuard, TestSourceSplit,
+};
+use risingwave_connector::source::StreamChunkWithState;
+use risingwave_simulation::cluster::{Cluster, ConfigPath, Configuration};
+use tokio::time::sleep;
+
+use crate::{assert_eq_with_err_returned as assert_eq, assert_with_err_returned as assert};
+
+pub const CREATE_SOURCE: &str = "create source test_source (id int, name varchar) with (connector = 'test') FORMAT PLAIN ENCODE JSON";
+pub const CREATE_SINK: &str = "create sink test_sink from test_source with (connector = 'test')";
+pub const DROP_SINK: &str = "drop sink test_sink";
+pub const DROP_SOURCE: &str = "drop source test_source";
+
+pub struct TestSinkStoreInner {
+    pub id_name: HashMap<i32, Vec<String>>,
+    pub epochs: Vec<u64>,
+    pub checkpoint_count: usize,
+}
+
+#[derive(Clone)]
+pub struct TestSinkStore {
+    inner: Arc<Mutex<TestSinkStoreInner>>,
+}
+
+impl TestSinkStore {
+    pub fn new() -> Self {
+        Self {
+            inner: Arc::new(Mutex::new(TestSinkStoreInner {
+                id_name: HashMap::new(),
+                epochs: Vec::new(),
+                checkpoint_count: 0,
+            })),
+        }
+    }
+
+    pub fn insert(&self, id: i32, name: String) {
+        self.inner().id_name.entry(id).or_default().push(name);
+    }
+
+    pub fn begin_epoch(&self, epoch: u64) {
+        self.inner().epochs.push(epoch)
+    }
+
+    pub fn inner(&self) -> MutexGuard<'_, TestSinkStoreInner> {
+        self.inner.lock().unwrap()
+    }
+
+    pub fn check_simple_result(&self, id_list: &[i32]) -> anyhow::Result<()> {
+        let inner = self.inner();
+        assert_eq!(inner.id_name.len(), id_list.len());
+        for id in id_list {
+            let names = inner.id_name.get(id).unwrap();
+            assert!(!names.is_empty());
+            for name in names {
+                assert_eq!(name, &simple_name_of_id(*id));
+            }
+        }
+        Ok(())
+    }
+
+    pub fn id_count(&self) -> usize {
+        self.inner().id_name.len()
+    }
+
+    pub async fn wait_for_count(&self, count: usize) -> anyhow::Result<()> {
+        let mut prev_count = 0;
+        loop {
+            sleep(Duration::from_secs(1)).await;
+            let curr_count = self.id_count();
+            if curr_count >= count {
+                assert_eq!(count, curr_count);
+                break;
+            }
+            assert!(
+                curr_count > prev_count,
+                "not making progress: curr {}, prev {}",
+                curr_count,
+                prev_count
+            );
+            prev_count = curr_count;
+        }
+        Ok(())
+    }
+}
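+/// A test [`SinkWriter`]: rows passed to `write_batch` are recorded in the
+/// shared [`TestSinkStore`], checkpoint barriers bump `checkpoint_count`, and
+/// dropping the writer decrements `parallelism_counter`, letting tests assert
+/// on writer lifecycle across recovery and scaling.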
+pub struct TestWriter {
+    store: TestSinkStore,
+    parallelism_counter: Arc<AtomicUsize>,
+}
+
+impl TestWriter {
+    pub fn new(store: TestSinkStore, parallelism_counter: Arc<AtomicUsize>) -> Self {
+        Self {
+            store,
+            parallelism_counter,
+        }
+    }
+}
+
+#[async_trait]
+impl SinkWriter for TestWriter {
+    async fn begin_epoch(&mut self, epoch: u64) -> risingwave_connector::sink::Result<()> {
+        self.store.begin_epoch(epoch);
+        Ok(())
+    }
+
+    async fn write_batch(&mut self, chunk: StreamChunk) -> risingwave_connector::sink::Result<()> {
+        for (op, row) in chunk.rows() {
+            assert_eq!(op, Op::Insert);
+            assert_eq!(row.len(), 2);
+            let id = row.datum_at(0).unwrap().into_int32();
+            let name = row.datum_at(1).unwrap().into_utf8().to_string();
+            self.store.insert(id, name);
+        }
+        Ok(())
+    }
+
+    async fn barrier(
+        &mut self,
+        is_checkpoint: bool,
+    ) -> risingwave_connector::sink::Result<()> {
+        if is_checkpoint {
+            self.store.inner().checkpoint_count += 1;
+            sleep(Duration::from_millis(100)).await;
+        }
+        Ok(())
+    }
+}
+
+impl Drop for TestWriter {
+    fn drop(&mut self) {
+        self.parallelism_counter.fetch_sub(1, Relaxed);
+    }
+}
+
+pub fn simple_name_of_id(id: i32) -> String {
+    format!("name-{}", id)
+}
+
+pub struct SimulationTestSink {
+    _sink_guard: TestSinkRegistryGuard,
+    pub store: TestSinkStore,
+    pub parallelism_counter: Arc<AtomicUsize>,
+}
+
+impl SimulationTestSink {
+    pub fn register_new() -> Self {
+        let parallelism_counter = Arc::new(AtomicUsize::new(0));
+        let store = TestSinkStore::new();
+
+        let _sink_guard = registry_build_sink({
+            let parallelism_counter = parallelism_counter.clone();
+            let store = store.clone();
+            move |_, _| {
+                parallelism_counter.fetch_add(1, Relaxed);
+                Box::new(TestWriter::new(store.clone(), parallelism_counter.clone()))
+            }
+        });
+
+        Self {
+            _sink_guard,
+            parallelism_counter,
+            store,
+        }
+    }
+}
+
+pub fn build_stream_chunk(row_iter: impl Iterator<Item = (i32, String)>) -> StreamChunk {
+    static ROW_ID_GEN: LazyLock<Arc<AtomicI64>> =
+        LazyLock::new(|| Arc::new(AtomicI64::new(0)));
+    let mut builder = DataChunkBuilder::new(
+        vec![DataType::Int32, DataType::Varchar, DataType::Serial],
+        100000,
+    );
+    for (id, name) in row_iter {
+        let row_id = ROW_ID_GEN.fetch_add(1, Relaxed);
+        std::assert!(builder
+            .append_one_row([
+                Some(ScalarImpl::Int32(id)),
+                Some(ScalarImpl::Utf8(name.into())),
+                Some(ScalarImpl::Serial(Serial::from(row_id))),
+            ])
+            .is_none());
+    }
+    let chunk = builder.consume_all().unwrap();
+    let ops = (0..chunk.cardinality()).map(|_| Op::Insert).collect_vec();
+    StreamChunk::from_parts(ops, chunk)
+}
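+/// A registered test source. `register_new` keeps a random `sample_rate`
+/// fraction of `id_list`, distributes the sampled ids across
+/// `source_parallelism` splits by `id % source_parallelism`, and emits them in
+/// batches of `pause_interval` chunks with a 100ms pause in between. On
+/// recovery, each split resumes from its last reported offset.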
+pub struct SimulationTestSource {
+    _source_guard: TestSourceRegistryGuard,
+    pub id_list: Vec<i32>,
+}
+
+impl SimulationTestSource {
+    pub fn register_new(
+        source_parallelism: usize,
+        id_list: impl Iterator<Item = i32>,
+        sample_rate: f32,
+        pause_interval: usize,
+    ) -> Self {
+        let mut id_list: Vec<i32> = id_list.collect_vec();
+        let count = (id_list.len() as f32 * sample_rate) as usize;
+        id_list.shuffle(&mut rand::thread_rng());
+        let id_list = id_list[0..count].iter().cloned().collect_vec();
+        let mut id_lists = vec![vec![]; source_parallelism];
+        for id in &id_list {
+            id_lists[*id as usize % source_parallelism].push(*id);
+        }
+        let id_lists_clone = id_lists.iter().map(|l| Arc::new(l.clone())).collect_vec();
+        let _source_guard = registry_test_source(BoxSource::new(
+            move |_, _| {
+                Ok((0..source_parallelism)
+                    .map(|i: usize| TestSourceSplit {
+                        id: format!("{}", i).as_str().into(),
+                        properties: Default::default(),
+                        offset: "".to_string(),
+                    })
+                    .collect_vec())
+            },
+            move |_, splits, _, _, _| {
+                select_all(splits.into_iter().map(|split| {
+                    let split_id: usize = split.id.parse().unwrap();
+                    let id_list = id_lists_clone[split_id].clone();
+                    let mut offset = if split.offset.is_empty() {
+                        0
+                    } else {
+                        split.offset.parse::<usize>().unwrap() + 1
+                    };
+
+                    let mut stream: BoxStream<'static, StreamChunkWithState> = empty().boxed();
+
+                    while offset < id_list.len() {
+                        let mut chunks = Vec::new();
+                        while offset < id_list.len() && chunks.len() < pause_interval {
+                            let id = id_list[offset];
+                            let chunk = build_stream_chunk(once((id, simple_name_of_id(id))));
+                            let mut split_offset = HashMap::new();
+                            split_offset.insert(split.id.clone(), offset.to_string());
+                            let chunk_with_state = StreamChunkWithState {
+                                chunk,
+                                split_offset_mapping: Some(split_offset),
+                            };
+                            chunks.push(chunk_with_state);
+
+                            offset += 1;
+                        }
+
+                        stream = stream
+                            .chain(
+                                async move { stream::iter(chunks) }
+                                    .into_stream()
+                                    .chain(
+                                        async move {
+                                            sleep(Duration::from_millis(100)).await;
+                                            stream::iter(Vec::new())
+                                        }
+                                        .into_stream(),
+                                    )
+                                    .flatten(),
+                            )
+                            .boxed();
+                    }
+
+                    stream
+                        .chain(async { pending::<StreamChunkWithState>().await }.into_stream())
+                        .map(|chunk| Ok(chunk))
+                        .boxed()
+                }))
+                .boxed()
+            },
+        ));
+
+        Self {
+            _source_guard,
+            id_list,
+        }
+    }
+}
+
+pub async fn start_sink_test_cluster() -> Result<Cluster> {
+    let config_path = {
+        let mut file = tempfile::NamedTempFile::new().expect("failed to create temp config file");
+        file.write_all(include_bytes!("../../../../../config/ci-sim.toml"))
+            .expect("failed to write config file");
+        file.into_temp_path()
+    };
+
+    Cluster::start(Configuration {
+        config_path: ConfigPath::Temp(config_path.into()),
+        frontend_nodes: 1,
+        compute_nodes: 3,
+        meta_nodes: 1,
+        compactor_nodes: 1,
+        compute_node_cores: 2,
+        etcd_timeout_rate: 0.0,
+        etcd_data_path: None,
+    })
+    .await
+}