diff --git a/scylla-cdc-printer/src/main.rs b/scylla-cdc-printer/src/main.rs index f4e704e5..f62b70ee 100644 --- a/scylla-cdc-printer/src/main.rs +++ b/scylla-cdc-printer/src/main.rs @@ -20,7 +20,7 @@ struct Args { table: String, /// Address of a node in source cluster - #[clap(short, long, action = clap::ArgAction::Set)] + #[clap(long, action = clap::ArgAction::Set)] hostname: String, /// Window size in seconds diff --git a/scylla-cdc-printer/src/printer.rs b/scylla-cdc-printer/src/printer.rs index 1dd651ee..a78db359 100644 --- a/scylla-cdc-printer/src/printer.rs +++ b/scylla-cdc-printer/src/printer.rs @@ -129,7 +129,7 @@ mod tests { let start = now(); let end = start + chrono::Duration::seconds(2); - let (shared_session, ks) = prepare_simple_db().await.unwrap(); + let (shared_session, ks) = prepare_simple_db(false).await.unwrap(); let partition_key_1 = 0; let partition_key_2 = 1; diff --git a/scylla-cdc-replicator/Cargo.toml b/scylla-cdc-replicator/Cargo.toml index b8df98ac..2a9fae46 100644 --- a/scylla-cdc-replicator/Cargo.toml +++ b/scylla-cdc-replicator/Cargo.toml @@ -21,3 +21,4 @@ thiserror = "2.0" [dev-dependencies] scylla-cdc-test-utils = { path = "../scylla-cdc-test-utils" } +rstest = "0.26.1" diff --git a/scylla-cdc-replicator/src/replication_tests.rs b/scylla-cdc-replicator/src/replication_tests.rs index fb4aa6c0..51014ad0 100644 --- a/scylla-cdc-replicator/src/replication_tests.rs +++ b/scylla-cdc-replicator/src/replication_tests.rs @@ -4,11 +4,12 @@ mod tests { use anyhow::anyhow; use futures_util::{FutureExt, StreamExt, TryStreamExt}; use itertools::Itertools; + use rstest::rstest; use scylla::client::session::Session; use scylla::value::CqlValue::{Boolean, Int, Text, UserDefinedType}; use scylla::value::{CqlValue, Row}; use scylla_cdc::consumer::{CDCRow, CDCRowSchema, Consumer}; - use scylla_cdc_test_utils::prepare_db; + use scylla_cdc_test_utils::{prepare_db, skip_if_not_supported}; use std::sync::Arc; /// Tuple representing a column in the table that will be replicated. @@ -295,8 +296,9 @@ mod tests { async fn test_replication( schema: TestTableSchema<'_>, operations: Vec>, + tablets_enabled: bool, ) -> anyhow::Result<()> { - test_replication_with_udt(schema, vec![], operations).await?; + test_replication_with_udt(schema, vec![], operations, tablets_enabled).await?; Ok(()) } @@ -307,6 +309,7 @@ mod tests { table_schema: TestTableSchema<'_>, udt_schemas: Vec>, operations: Vec>, + tablets_enabled: bool, ) -> anyhow::Result<(Arc, String, String)> { let mut schema_queries = get_udt_queries(udt_schemas); let create_dst_table_query = get_table_create_query(&table_schema); @@ -315,9 +318,9 @@ mod tests { let len = schema_queries.len(); schema_queries.push(create_src_table_query); - let (session, ks_src) = prepare_db(&schema_queries, 1).await?; + let (session, ks_src) = prepare_db(&schema_queries, 1, tablets_enabled).await?; schema_queries[len] = create_dst_table_query; - let (_, ks_dst) = prepare_db(&schema_queries, 1).await?; + let (_, ks_dst) = prepare_db(&schema_queries, 1, tablets_enabled).await?; session.refresh_metadata().await?; let mut last_read = (0, 0); @@ -446,8 +449,11 @@ mod tests { assert!(!is_equal); } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn simple_insert_test() { + async fn simple_insert_test(#[case] tablets_enabled: bool) { let schema = TestTableSchema { name: "SIMPLE_INSERT".to_string(), partition_key: vec![("pk", "int")], @@ -460,11 +466,14 @@ mod tests { "INSERT INTO SIMPLE_INSERT (pk, ck, v1, v2) VALUES (3, 2, 1, false)", ]; - test_replication(schema, operations).await.unwrap(); + skip_if_not_supported!(test_replication(schema, operations, tablets_enabled)); } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn simple_update_test() { + async fn simple_update_test(#[case] tablets_enabled: bool) { let schema = TestTableSchema { name: "SIMPLE_UPDATE".to_string(), partition_key: vec![("pk", "int")], @@ -478,11 +487,14 @@ mod tests { "DELETE v1 FROM SIMPLE_UPDATE WHERE pk = 1 AND ck = 2", ]; - test_replication(schema, operations).await.unwrap(); + skip_if_not_supported!(test_replication(schema, operations, tablets_enabled)); } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn simple_frozen_udt_test() { + async fn simple_frozen_udt_test(#[case] tablets_enabled: bool) { let table_schema = TestTableSchema { name: "SIMPLE_UDT_TEST".to_string(), partition_key: vec![("pk", "int")], @@ -500,13 +512,19 @@ mod tests { "UPDATE SIMPLE_UDT_TEST SET ut_col = null WHERE pk = 0 AND ck = 0", ]; - test_replication_with_udt(table_schema, udt_schemas, operations) - .await - .unwrap(); + skip_if_not_supported!(test_replication_with_udt( + table_schema, + udt_schemas, + operations, + tablets_enabled + )); } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn test_map_insert() { + async fn test_map_insert(#[case] tablets_enabled: bool) { let schema = TestTableSchema { name: "MAPS_INSERT".to_string(), partition_key: vec![("pk", "int")], @@ -521,11 +539,14 @@ mod tests { "INSERT INTO MAPS_INSERT (pk, ck, v1, v2) VALUES (5, 6, {100: 100, 200: 200, 300: 300}, {400: true, 500: false})", ]; - test_replication(schema, operations).await.unwrap(); + skip_if_not_supported!(test_replication(schema, operations, tablets_enabled)); } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn test_map_update() { + async fn test_map_update(#[case] tablets_enabled: bool) { let schema = TestTableSchema { name: "MAPS_UPDATE".to_string(), partition_key: vec![("pk", "int")], @@ -539,11 +560,14 @@ mod tests { "DELETE v1 FROM MAPS_UPDATE WHERE pk = 1 AND ck = 2", ]; - test_replication(schema, operations).await.unwrap(); + skip_if_not_supported!(test_replication(schema, operations, tablets_enabled)); } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn test_map_elements_update() { + async fn test_map_elements_update(#[case] tablets_enabled: bool) { let schema = TestTableSchema { name: "MAP_ELEMENTS_UPDATE".to_string(), partition_key: vec![("pk", "int")], @@ -558,11 +582,14 @@ mod tests { "UPDATE MAP_ELEMENTS_UPDATE SET v1 = v1 - {10} WHERE pk = 10 AND ck = 20", "UPDATE MAP_ELEMENTS_UPDATE SET v1 = v1 - {1}, v1 = v1 + {2137: -2137} WHERE pk = 1 AND ck = 2", ]; - test_replication(schema, operations).await.unwrap(); + skip_if_not_supported!(test_replication(schema, operations, tablets_enabled)); } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn test_row_delete() { + async fn test_row_delete(#[case] tablets_enabled: bool) { let schema = TestTableSchema { name: "ROW_DELETE".to_string(), partition_key: vec![("pk", "int")], @@ -579,11 +606,14 @@ mod tests { "INSERT INTO ROW_DELETE (pk, ck, v1, v2) VALUES (-1, -2, 30, true)", ]; - test_replication(schema, operations).await.unwrap(); + skip_if_not_supported!(test_replication(schema, operations, tablets_enabled)); } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn test_set_insert() { + async fn test_set_insert(#[case] tablets_enabled: bool) { let schema = TestTableSchema { name: "SET_TEST".to_string(), partition_key: vec![("pk", "int")], @@ -597,11 +627,14 @@ mod tests { "INSERT INTO SET_TEST (pk, ck, v) VALUES (3, 4, {1, 1})", ]; - test_replication(schema, operations).await.unwrap(); + skip_if_not_supported!(test_replication(schema, operations, tablets_enabled)); } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn test_set_overwrite() { + async fn test_set_overwrite(#[case] tablets_enabled: bool) { let schema = TestTableSchema { name: "SET_TEST".to_string(), partition_key: vec![("pk", "int")], @@ -614,11 +647,14 @@ mod tests { "UPDATE SET_TEST SET v = {1, 2} WHERE pk = 0 AND ck = 1", ]; - test_replication(schema, operations).await.unwrap(); + skip_if_not_supported!(test_replication(schema, operations, tablets_enabled)); } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn test_set_delete() { + async fn test_set_delete(#[case] tablets_enabled: bool) { let schema = TestTableSchema { name: "SET_TEST".to_string(), partition_key: vec![("pk", "int")], @@ -631,11 +667,14 @@ mod tests { "DELETE v FROM SET_TEST WHERE pk = 0 AND ck = 1", ]; - test_replication(schema, operations).await.unwrap(); + skip_if_not_supported!(test_replication(schema, operations, tablets_enabled)); } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn test_set_update() { + async fn test_set_update(#[case] tablets_enabled: bool) { let schema = TestTableSchema { name: "SET_TEST".to_string(), partition_key: vec![("pk", "int")], @@ -650,11 +689,14 @@ mod tests { "UPDATE SET_TEST SET v = v - {10}, v = v + {200} WHERE pk = 0 AND ck = 1", ]; - test_replication(schema, operations).await.unwrap(); + skip_if_not_supported!(test_replication(schema, operations, tablets_enabled)); } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn test_partition_delete() { + async fn test_partition_delete(#[case] tablets_enabled: bool) { let schema = TestTableSchema { name: "PARTITION_DELETE".to_string(), partition_key: vec![("pk", "int")], @@ -668,11 +710,14 @@ mod tests { "DELETE FROM PARTITION_DELETE WHERE pk = 0", ]; - test_replication(schema, operations).await.unwrap(); + skip_if_not_supported!(test_replication(schema, operations, tablets_enabled)); } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn test_udt_insert() { + async fn test_udt_insert(#[case] tablets_enabled: bool) { let schema = TestTableSchema { name: "TEST_UDT_INSERT".to_string(), partition_key: vec![("pk", "int")], @@ -691,13 +736,19 @@ mod tests { "INSERT INTO TEST_UDT_INSERT (pk, ck, v) VALUES (3, 4, {int_val: 3, bool_val: true})", ]; - test_replication_with_udt(schema, udt_schemas, operations) - .await - .unwrap(); + skip_if_not_supported!(test_replication_with_udt( + schema, + udt_schemas, + operations, + tablets_enabled + )); } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn test_partition_delete_with_multiple_pk() { + async fn test_partition_delete_with_multiple_pk(#[case] tablets_enabled: bool) { let schema = TestTableSchema { name: "PARTITION_DELETE_MULT_PK".to_string(), partition_key: vec![("pk1", "int"), ("pk2", "int")], @@ -711,11 +762,14 @@ mod tests { "DELETE FROM PARTITION_DELETE_MULT_PK WHERE pk1 = 0 AND pk2 = 2", ]; - test_replication(schema, operations).await.unwrap(); + skip_if_not_supported!(test_replication(schema, operations, tablets_enabled)); } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn test_list_update() { + async fn test_list_update(#[case] tablets_enabled: bool) { let schema = TestTableSchema { name: "LIST_ELEMENTS_UPDATE".to_string(), partition_key: vec![("pk", "int")], @@ -729,11 +783,14 @@ mod tests { "UPDATE LIST_ELEMENTS_UPDATE SET v = v - [1, 5] WHERE pk = 1 AND ck = 2", ]; - test_replication(schema, operations).await.unwrap(); + skip_if_not_supported!(test_replication(schema, operations, tablets_enabled)); } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn test_udt_update() { + async fn test_udt_update(#[case] tablets_enabled: bool) { let schema = TestTableSchema { name: "TEST_UDT_UPDATE".to_string(), partition_key: vec![("pk", "int")], @@ -752,13 +809,19 @@ mod tests { "UPDATE TEST_UDT_UPDATE SET v = null WHERE pk = 0 AND ck = 1", ]; - test_replication_with_udt(schema, udt_schemas, operations) - .await - .unwrap(); + skip_if_not_supported!(test_replication_with_udt( + schema, + udt_schemas, + operations, + tablets_enabled + )); } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn test_list_replace() { + async fn test_list_replace(#[case] tablets_enabled: bool) { let schema = TestTableSchema { name: "LIST_REPLACE".to_string(), partition_key: vec![("pk", "int")], @@ -771,11 +834,14 @@ mod tests { "UPDATE LIST_REPLACE SET v = [2, 4, 6, 8] WHERE pk = 1 AND ck = 2", ]; - test_replication(schema, operations).await.unwrap(); + skip_if_not_supported!(test_replication(schema, operations, tablets_enabled)); } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn test_checking_timestamps() { + async fn test_checking_timestamps(#[case] tablets_enabled: bool) { let schema = TestTableSchema { name: "COMPARE_TIME".to_string(), partition_key: vec![("pk", "int")], @@ -788,10 +854,12 @@ mod tests { "UPDATE COMPARE_TIME SET v2 = false WHERE pk = 1 AND ck = 2", ]; - let (session, ks_src, ks_dst) = - test_replication_with_udt(schema.clone(), vec![], operations) - .await - .unwrap(); + let (session, ks_src, ks_dst) = skip_if_not_supported!(test_replication_with_udt( + schema.clone(), + vec![], + operations, + tablets_enabled + )); // We update timestamps for v2 column in src. session @@ -810,8 +878,11 @@ mod tests { assert!(result.is_err()); } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn test_compare_time_for_complicated_types() { + async fn test_compare_time_for_complicated_types(#[case] tablets_enabled: bool) { let schema = TestTableSchema { name: "COMPARE_TIME".to_string(), partition_key: vec![("pk", "int")], @@ -839,11 +910,14 @@ mod tests { "DELETE v4 FROM COMPARE_TIME WHERE pk = 4 AND CK = 4", ]; - test_replication(schema, operations).await.unwrap(); + skip_if_not_supported!(test_replication(schema, operations, tablets_enabled)); } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn test_udt_fields_update() { + async fn test_udt_fields_update(#[case] tablets_enabled: bool) { let schema = TestTableSchema { name: "TEST_UDT_ELEMENTS_UPDATE".to_string(), partition_key: vec![("pk", "int")], @@ -866,13 +940,19 @@ mod tests { "UPDATE TEST_UDT_ELEMENTS_UPDATE SET v.int_val = null WHERE pk = 0 AND ck = 1", ]; - test_replication_with_udt(schema, udt_schemas, operations) - .await - .unwrap(); + skip_if_not_supported!(test_replication_with_udt( + schema, + udt_schemas, + operations, + tablets_enabled + )); } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn test_range_delete() { + async fn test_range_delete(#[case] tablets_enabled: bool) { let schema = TestTableSchema { name: "RANGE_DELETE".to_string(), partition_key: vec![("pk1", "int"), ("pk2", "int")], @@ -893,6 +973,6 @@ mod tests { "DELETE FROM RANGE_DELETE WHERE pk1 = 0 AND pk2 = 0 AND (ck1, ck2, ck3) > (3, 3, 3)", ]); - test_replication(schema, operations).await.unwrap(); + skip_if_not_supported!(test_replication(schema, operations, tablets_enabled)); } } diff --git a/scylla-cdc-test-utils/src/lib.rs b/scylla-cdc-test-utils/src/lib.rs index 39f64e90..27d53913 100644 --- a/scylla-cdc-test-utils/src/lib.rs +++ b/scylla-cdc-test-utils/src/lib.rs @@ -1,3 +1,4 @@ +use std::fmt; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -24,14 +25,33 @@ fn get_create_table_query() -> String { format!("CREATE TABLE IF NOT EXISTS {TEST_TABLE} (pk int, t int, v text, s text, PRIMARY KEY (pk, t)) WITH cdc = {{'enabled':true}};") } +fn get_create_keyspace_query( + keyspace: &str, + replication_factor: u8, + tablets_enabled: bool, +) -> String { + let tablets_clause = if tablets_enabled { + " AND tablets={'enabled': true}" + } else { + " AND tablets={'enabled': false}" + }; + + format!( + "CREATE KEYSPACE IF NOT EXISTS {keyspace} WITH REPLICATION = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {replication_factor}}}{tablets_clause};" + ) +} + pub async fn create_test_db( session: &Arc, schema: &[String], replication_factor: u8, + tablets_enabled: bool, ) -> anyhow::Result { let ks = unique_name(); - let mut create_keyspace_query = Statement::new(format!( - "CREATE KEYSPACE IF NOT EXISTS {ks} WITH REPLICATION = {{'class': 'SimpleStrategy', 'replication_factor': {replication_factor}}};" + let mut create_keyspace_query = Statement::new(get_create_keyspace_query( + &ks, + replication_factor, + tablets_enabled, )); create_keyspace_query.set_consistency(Consistency::All); @@ -65,18 +85,59 @@ fn get_uri() -> String { std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string()) } +#[derive(Debug)] +pub struct CdcWithTabletsNotSupported(pub String); + +impl fmt::Display for CdcWithTabletsNotSupported { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "CDC with tablets is not supported: {}", self.0) + } +} + +impl std::error::Error for CdcWithTabletsNotSupported {} + pub async fn prepare_db( schema: &[String], replication_factor: u8, + tablets_enabled: bool, ) -> anyhow::Result<(Arc, String)> { let uri = get_uri(); let session = SessionBuilder::new().known_node(uri).build().await.unwrap(); let shared_session = Arc::new(session); - let ks = create_test_db(&shared_session, schema, replication_factor).await?; - Ok((shared_session, ks)) + match create_test_db(&shared_session, schema, replication_factor, tablets_enabled).await { + Ok(ks) => Ok((shared_session, ks)), + Err(e) => { + let msg = e.to_string(); + if tablets_enabled && msg.contains("issue #16317") { + Err(anyhow::Error::new(CdcWithTabletsNotSupported(msg))) + } else { + Err(e) + } + } + } +} + +pub async fn prepare_simple_db(tablets_enabled: bool) -> anyhow::Result<(Arc, String)> { + prepare_db(&[get_create_table_query()], 1, tablets_enabled).await } -pub async fn prepare_simple_db() -> anyhow::Result<(Arc, String)> { - prepare_db(&[get_create_table_query()], 1).await +#[macro_export] +macro_rules! skip_if_not_supported { + ($expr:expr) => { + match $expr.await { + Ok(val) => val, + Err(e) => { + if let Some(src) = e + .root_cause() + .downcast_ref::() + { + eprintln!("Skipping test: {}", src); + return; + } else { + panic!("failed: {e}"); + } + } + } + }; } diff --git a/scylla-cdc/Cargo.toml b/scylla-cdc/Cargo.toml index 38b6379b..5c919df7 100644 --- a/scylla-cdc/Cargo.toml +++ b/scylla-cdc/Cargo.toml @@ -45,3 +45,4 @@ tokio = { version = "1.1.0", features = [ "sync", "rt-multi-thread", ] } +rstest = "0.26.1" diff --git a/scylla-cdc/src/checkpoints.rs b/scylla-cdc/src/checkpoints.rs index 3cabe955..a20c1bd4 100644 --- a/scylla-cdc/src/checkpoints.rs +++ b/scylla-cdc/src/checkpoints.rs @@ -239,7 +239,7 @@ mod tests { async fn setup() -> (Arc, String, Arc) { const DEFAULT_TTL: i64 = 300; - let (session, ks) = prepare_db(&[], 1).await.unwrap(); + let (session, ks) = prepare_db(&[], 1, false).await.unwrap(); let table_name = unique_name(); let cp_saver = Arc::new( diff --git a/scylla-cdc/src/consumer.rs b/scylla-cdc/src/consumer.rs index d5b1340f..fba722a9 100644 --- a/scylla-cdc/src/consumer.rs +++ b/scylla-cdc/src/consumer.rs @@ -372,6 +372,7 @@ mod tests { construct_single_collection_table_query(), ], 1, + false, ) .await? .0; diff --git a/scylla-cdc/src/e2e_tests.rs b/scylla-cdc/src/e2e_tests.rs index 6c442bc4..ee176b40 100644 --- a/scylla-cdc/src/e2e_tests.rs +++ b/scylla-cdc/src/e2e_tests.rs @@ -11,6 +11,7 @@ mod tests { use async_trait::async_trait; use futures::future::RemoteHandle; use itertools::{repeat_n, Itertools}; + use rstest::rstest; use scylla::client::session::Session; use scylla::frame::response::result::ColumnType; use scylla::serialize::value::SerializeValue; @@ -18,7 +19,7 @@ mod tests { use scylla::serialize::SerializationError; use scylla::statement::prepared::PreparedStatement; use scylla::value::CqlValue; - use scylla_cdc_test_utils::{now, prepare_db}; + use scylla_cdc_test_utils::{now, prepare_db, skip_if_not_supported}; use tokio::sync::Mutex; use crate::checkpoints::TableBackedCheckpointSaver; @@ -188,10 +189,14 @@ mod tests { } impl Test { - async fn new(table_name: &str, pk_type_names: Vec<&str>) -> Result { + async fn new( + table_name: &str, + pk_type_names: Vec<&str>, + tablets_enabled: bool, + ) -> Result { let (create_query, insert_query, update_query) = get_queries(table_name, pk_type_names); - let (session, keyspace) = prepare_db(&[create_query], 1).await?; + let (session, keyspace) = prepare_db(&[create_query], 1, tablets_enabled).await?; let insert_query = session.prepare(insert_query).await?; let update_query = session.prepare(update_query).await?; @@ -322,9 +327,13 @@ mod tests { } } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn e2e_test_small() { - let mut test = Test::new("int_small_test", vec!["int"]).await.unwrap(); + async fn e2e_test_small(#[case] tablets_enabled: bool) { + let mut test = + skip_if_not_supported!(Test::new("int_small_test", vec!["int"], tablets_enabled)); let start = now(); for i in 0..10 { @@ -346,9 +355,12 @@ mod tests { test.test_cdc(start).await.unwrap(); } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn e2e_test_int_pk() { - let mut test = Test::new("int_test", vec!["int"]).await.unwrap(); + async fn e2e_test_int_pk(#[case] tablets_enabled: bool) { + let mut test = skip_if_not_supported!(Test::new("int_test", vec!["int"], tablets_enabled)); let start = now(); for i in 0..100 { @@ -370,11 +382,16 @@ mod tests { test.test_cdc(start).await.unwrap(); } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn e2e_test_int_string_pk() { - let mut test = Test::new("int_string_test", vec!["int", "text"]) - .await - .unwrap(); + async fn e2e_test_int_string_pk(#[case] tablets_enabled: bool) { + let mut test = skip_if_not_supported!(Test::new( + "int_string_test", + vec!["int", "text"], + tablets_enabled + )); let strings = ["blep".to_string(), "nghu".to_string(), "pkeee".to_string()]; let start = now(); @@ -459,13 +476,16 @@ mod tests { } } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn e2e_test_saving_progress_complex() { + async fn e2e_test_saving_progress_complex(#[case] tablets_enabled: bool) { const N: i32 = 5; let table_name = "test_saving_progress"; let start = now(); - let mut test = Test::new(table_name, vec!["int"]).await.unwrap(); + let mut test = skip_if_not_supported!(Test::new(table_name, vec!["int"], tablets_enabled)); let results = Arc::new(Mutex::new(HashMap::new())); let factory = Arc::new(TestConsumerFactory::new(Arc::clone(&results))); diff --git a/scylla-cdc/src/log_reader.rs b/scylla-cdc/src/log_reader.rs index 53d5d6ae..7ee7dd7d 100644 --- a/scylla-cdc/src/log_reader.rs +++ b/scylla-cdc/src/log_reader.rs @@ -27,7 +27,7 @@ use tracing::warn; use crate::cdc_types::GenerationTimestamp; use crate::checkpoints::CDCCheckpointSaver; use crate::consumer::ConsumerFactory; -use crate::stream_generations::GenerationFetcher; +use crate::stream_generations::get_generation_fetcher; use crate::stream_reader::{CDCReaderConfig, StreamReader}; const SECOND_IN_MILLIS: i64 = 1_000; @@ -58,6 +58,27 @@ impl CDCLogReader { pub fn stop(&mut self) { self.stop_at(chrono::Duration::MIN); } + + /// Checks if the given keyspace uses tablets by querying system_schema.scylla_keyspaces. + async fn uses_tablets(session: &Session, keyspace: &str) -> anyhow::Result { + let query = + "SELECT initial_tablets FROM system_schema.scylla_keyspaces WHERE keyspace_name = ?" + .to_string(); + // the query may fail on old scylla versions that don't have this table. in this case return + // false because it doesn't support tablets. + let result = match session.query_unpaged(query, (keyspace,)).await { + Ok(r) => r, + Err(_) => return Ok(false), + }; + let rows_result = result.into_rows_result()?; + let value = rows_result.maybe_first_row::<(Option,)>()?; + if let Some((tablets_val,)) = value { + if tablets_val.is_some() { + return Ok(true); + } + } + Ok(false) + } } struct CDCReaderWorker { @@ -69,11 +90,17 @@ struct CDCReaderWorker { end_timestamp_receiver: tokio::sync::watch::Receiver, consumer_factory: Arc, config: CDCReaderConfig, + uses_tablets: bool, } impl CDCReaderWorker { pub async fn run(&mut self) -> anyhow::Result<()> { - let fetcher = Arc::new(GenerationFetcher::new(&self.session)); + let fetcher = get_generation_fetcher( + &self.session, + &self.keyspace, + &self.table_name, + self.uses_tablets, + ); let (mut generation_receiver, _future_handle) = fetcher .clone() .fetch_generations_continuously(self.config.lower_timestamp, self.config.sleep_interval) @@ -455,6 +482,8 @@ impl CDCLogReaderBuilder { } } + let uses_tablets = CDCLogReader::uses_tablets(&session, &keyspace).await?; + let config = CDCReaderConfig { lower_timestamp: start_timestamp, window_size: self.window_size, @@ -475,6 +504,7 @@ impl CDCLogReaderBuilder { end_timestamp_receiver, consumer_factory, config, + uses_tablets, }; let (fut, handle) = async move { cdc_reader_worker.run().await }.remote_handle(); @@ -543,7 +573,7 @@ mod tests { next_id: Arc::new(AtomicUsize::new(0)), condition, }); - let (session, ks) = prepare_simple_db().await.unwrap(); + let (session, ks) = prepare_simple_db(false).await.unwrap(); let start = now(); for i in 0..pk_count { populate_simple_db_with_pk(&session, i).await.unwrap(); diff --git a/scylla-cdc/src/stream_generations.rs b/scylla-cdc/src/stream_generations.rs index aa73081d..0e01e6a0 100644 --- a/scylla-cdc/src/stream_generations.rs +++ b/scylla-cdc/src/stream_generations.rs @@ -1,6 +1,7 @@ -use futures::future::RemoteHandle; +use async_trait::async_trait; use futures::stream::StreamExt; use futures::FutureExt; +use futures::{future::RemoteHandle, TryStreamExt}; use scylla::client::session::Session; use scylla::statement::unprepared::Statement; use scylla::statement::Consistency; @@ -14,7 +15,97 @@ use tracing::warn; use crate::cdc_types::{GenerationTimestamp, StreamID}; /// Component responsible for managing stream generations. -pub struct GenerationFetcher { +#[async_trait] +pub(crate) trait GenerationFetcher: Send + Sync + 'static { + /// In case of a success returns a vector containing all the generations in the database. + /// Propagates all errors. + async fn fetch_all_generations(&self) -> anyhow::Result>; + + /// Given a timestamp of an operation fetch generation that was operating when this operation was performed. + /// If no such generation exists, returns `None`. + /// Propagates errors. + async fn fetch_generation_by_timestamp( + &self, + time: &chrono::Duration, + ) -> anyhow::Result>; + + /// Given a generation returns the next generation. + /// If given generation is currently operating, returns `None`. + /// Propagates errors. + async fn fetch_next_generation( + &self, + generation: &GenerationTimestamp, + ) -> anyhow::Result>; + + /// Given a generation return grouped identifiers of all streams of this generation. + async fn fetch_stream_ids( + &self, + generation: &GenerationTimestamp, + ) -> anyhow::Result>>; + + /// Continuously monitors and fetches new generations as they become available. + /// Returns a receiver for new generations and a handle to the background task. + async fn fetch_generations_continuously( + self: Arc, + start_timestamp: chrono::Duration, + sleep_interval: time::Duration, + ) -> anyhow::Result<(mpsc::Receiver, RemoteHandle<()>)> { + let (generation_sender, generation_receiver) = mpsc::channel(1); + + let (future, future_handle) = async move { + let mut generation = loop { + match self.fetch_generation_by_timestamp(&start_timestamp).await { + Ok(Some(generation)) => break generation, + Ok(None) => { + break { + loop { + match self.fetch_all_generations().await { + Ok(vectors) => match vectors.last() { + None => sleep(sleep_interval).await, + Some(generation) => break generation.clone(), + }, + _ => { + warn!("Failed to fetch all generations"); + sleep(sleep_interval).await + } + } + } + } + } + _ => { + warn!("Failed to fetch generation by timestamp"); + sleep(sleep_interval).await + } + } + }; + if generation_sender.send(generation.clone()).await.is_err() { + return; + } + + loop { + generation = loop { + match self.fetch_next_generation(&generation).await { + Ok(Some(generation)) => break generation, + Ok(None) => sleep(sleep_interval).await, + _ => { + warn!("Failed to fetch next generation"); + sleep(sleep_interval).await + } + } + }; + if generation_sender.send(generation.clone()).await.is_err() { + break; + } + } + } + .remote_handle(); + tokio::spawn(future); + Ok((generation_receiver, future_handle)) + } +} + +/// implementation of GenerationFetcher for vnodes-based keyspaces. +pub struct VnodeGenerationFetcher { generations_table_name: String, streams_table_name: String, session: Arc, @@ -23,11 +114,14 @@ pub struct GenerationFetcher { // Number taken from: https://www.scylladb.com/2017/11/17/7-rules-planning-queries-maximum-performance/. const DEFAULT_PAGE_SIZE: i32 = 5000; -impl GenerationFetcher { - pub fn new(session: &Arc) -> GenerationFetcher { - GenerationFetcher { - generations_table_name: "system_distributed.cdc_generation_timestamps".to_string(), - streams_table_name: "system_distributed.cdc_streams_descriptions_v2".to_string(), +const VNODE_GENERATIONS_TABLE: &str = "system_distributed.cdc_generation_timestamps"; +const VNODE_STREAMS_TABLE: &str = "system_distributed.cdc_streams_descriptions_v2"; + +impl VnodeGenerationFetcher { + pub fn new(session: &Arc) -> VnodeGenerationFetcher { + VnodeGenerationFetcher { + generations_table_name: VNODE_GENERATIONS_TABLE.to_string(), + streams_table_name: VNODE_STREAMS_TABLE.to_string(), session: Arc::clone(session), } } @@ -36,19 +130,54 @@ impl GenerationFetcher { fn get_all_stream_generations_query(&self) -> String { format!( " - SELECT time - FROM {} - WHERE key = 'timestamps'; - ", + SELECT time + FROM {} + WHERE key = 'timestamps';", self.generations_table_name ) } - /// In case of a success returns a vector containing all the generations in the database. - /// Propagates all errors. - pub async fn fetch_all_generations(&self) -> anyhow::Result> { - let mut generations = Vec::new(); + fn get_generation_by_timestamp_query(&self) -> String { + format!( + " + SELECT time + FROM {} + WHERE key = 'timestamps' + AND time <= ? + ORDER BY time DESC + LIMIT 1;", + self.generations_table_name + ) + } + + fn get_next_generation_query(&self) -> String { + format!( + " + SELECT time + FROM {} + WHERE key = 'timestamps' + AND time > ? + ORDER BY time ASC + LIMIT 1;", + self.generations_table_name + ) + } + + fn get_stream_ids_by_time_query(&self) -> String { + format!( + " + SELECT streams + FROM {} + WHERE time = ?;", + self.streams_table_name + ) + } +} +#[async_trait] +impl GenerationFetcher for VnodeGenerationFetcher { + async fn fetch_all_generations(&self) -> anyhow::Result> { + let mut generations = Vec::new(); let mut query = new_distributed_system_query(self.get_all_stream_generations_query(), &self.session) .await?; @@ -67,25 +196,7 @@ impl GenerationFetcher { Ok(generations) } - // Function instead of constant for testing purposes. - fn get_generation_by_timestamp_query(&self) -> String { - format!( - " - SELECT time - FROM {} - WHERE key = 'timestamps' - AND time <= ? - ORDER BY time DESC - LIMIT 1; - ", - self.generations_table_name - ) - } - - /// Given a timestamp of an operation fetch generation that was operating when this operation was performed. - /// If no such generation exists, returns `None`. - /// Propagates errors. - pub async fn fetch_generation_by_timestamp( + async fn fetch_generation_by_timestamp( &self, time: &chrono::Duration, ) -> anyhow::Result> { @@ -104,25 +215,7 @@ impl GenerationFetcher { Ok(result) } - // Function instead of constant for testing purposes. - fn get_next_generation_query(&self) -> String { - format!( - " - SELECT time - FROM {} - WHERE key = 'timestamps' - AND time > ? - ORDER BY time ASC - LIMIT 1; - ", - self.generations_table_name - ) - } - - /// Given a generation returns the next generation. - /// If given generation is currently operating, returns `None`. - /// Propagates errors. - pub async fn fetch_next_generation( + async fn fetch_next_generation( &self, generation: &GenerationTimestamp, ) -> anyhow::Result> { @@ -140,21 +233,8 @@ impl GenerationFetcher { Ok(result) } - // Function instead of constant for testing purposes. - fn get_stream_ids_by_time_query(&self) -> String { - format!( - " - SELECT streams - FROM {} - WHERE time = ?; - ", - self.streams_table_name - ) - } - - /// Given a generation return identifiers of all streams of this generation. - /// Streams are grouped by vnodes. - pub async fn fetch_stream_ids( + // Streams are grouped by vnodes + async fn fetch_stream_ids( &self, generation: &GenerationTimestamp, ) -> anyhow::Result>> { @@ -178,63 +258,179 @@ impl GenerationFetcher { Ok(result_vec) } +} - pub async fn fetch_generations_continuously( - self: Arc, - start_timestamp: chrono::Duration, - sleep_interval: time::Duration, - ) -> anyhow::Result<(mpsc::Receiver, RemoteHandle<()>)> { - let (generation_sender, generation_receiver) = mpsc::channel(1); +/// implementation of GenerationFetcher for tablets-based keyspaces. +pub struct TabletsGenerationFetcher { + timestamps_table_name: String, + streams_table_name: String, + keyspace_name: String, + table_name: String, + session: Arc, +} - let (future, future_handle) = async move { - let mut generation = loop { - match self.fetch_generation_by_timestamp(&start_timestamp).await { - Ok(Some(generation)) => break generation, - Ok(None) => { - break { - loop { - match self.fetch_all_generations().await { - Ok(vectors) => match vectors.last() { - None => sleep(sleep_interval).await, - Some(generation) => break generation.clone(), - }, - _ => { - warn!("Failed to fetch all generations"); - sleep(sleep_interval).await - } - } - } - } - } - _ => { - warn!("Failed to fetch generation by timestamp"); - sleep(sleep_interval).await - } - } - }; - if generation_sender.send(generation.clone()).await.is_err() { - return; - } +// follows stream_state in system.cdc_streams +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(i8)] +enum StreamState { + Current = 0, + #[allow(dead_code)] + Closed = 1, + #[allow(dead_code)] + Opened = 2, +} - loop { - generation = loop { - match self.fetch_next_generation(&generation).await { - Ok(Some(generation)) => break generation, - Ok(None) => sleep(sleep_interval).await, - _ => { - warn!("Failed to fetch next generation"); - sleep(sleep_interval).await - } - } - }; - if generation_sender.send(generation.clone()).await.is_err() { - break; - } - } +const TABLETS_TIMESTAMPS_TABLE: &str = "system.cdc_timestamps"; +const TABLETS_STREAMS_TABLE: &str = "system.cdc_streams"; + +impl TabletsGenerationFetcher { + pub fn new( + session: &Arc, + keyspace_name: String, + table_name: String, + ) -> TabletsGenerationFetcher { + TabletsGenerationFetcher { + timestamps_table_name: TABLETS_TIMESTAMPS_TABLE.to_string(), + streams_table_name: TABLETS_STREAMS_TABLE.to_string(), + keyspace_name, + table_name, + session: Arc::clone(session), } - .remote_handle(); - tokio::spawn(future); - Ok((generation_receiver, future_handle)) + } + + fn get_all_stream_generations_query(&self) -> String { + format!( + r#" + SELECT timestamp + FROM {} + WHERE keyspace_name = ? AND table_name = ?; + "#, + self.timestamps_table_name + ) + } + + fn get_generation_by_timestamp_query(&self) -> String { + format!( + r#" + SELECT timestamp + FROM {} + WHERE keyspace_name = ? AND table_name = ? AND timestamp <= ? + ORDER BY timestamp DESC + LIMIT 1; + "#, + self.timestamps_table_name + ) + } + + fn get_next_generation_query(&self) -> String { + format!( + r#" + SELECT timestamp + FROM {} + WHERE keyspace_name = ? AND table_name = ? AND timestamp > ? + ORDER BY timestamp ASC + LIMIT 1; + "#, + self.timestamps_table_name + ) + } + + fn get_stream_ids_by_time_query(&self) -> String { + format!( + r#" + SELECT stream_id + FROM {} + WHERE keyspace_name = ? AND table_name = ? AND timestamp = ? AND stream_state = ?; + "#, + self.streams_table_name + ) + } +} + +#[async_trait] +impl GenerationFetcher for TabletsGenerationFetcher { + async fn fetch_all_generations(&self) -> anyhow::Result> { + let mut query = Statement::new(self.get_all_stream_generations_query()); + query.set_page_size(DEFAULT_PAGE_SIZE); + + let generations = self + .session + .query_iter(query, (&self.keyspace_name, &self.table_name)) + .await? + .rows_stream::<(GenerationTimestamp,)>()? + .map(|r| r.map(|(ts,)| ts)) + .try_collect::>() + .await?; + + Ok(generations) + } + + async fn fetch_generation_by_timestamp( + &self, + time: &chrono::Duration, + ) -> anyhow::Result> { + let query = Statement::new(self.get_generation_by_timestamp_query()); + + let result = self + .session + .query_unpaged( + query, + ( + &self.keyspace_name, + &self.table_name, + value::CqlTimestamp(time.num_milliseconds()), + ), + ) + .await? + .into_rows_result()? + .maybe_first_row::<(GenerationTimestamp,)>()? + .map(|(ts,)| ts); + + Ok(result) + } + + async fn fetch_next_generation( + &self, + generation: &GenerationTimestamp, + ) -> anyhow::Result> { + let query = Statement::new(self.get_next_generation_query()); + + let result = self + .session + .query_unpaged(query, (&self.keyspace_name, &self.table_name, generation)) + .await? + .into_rows_result()? + .maybe_first_row::<(GenerationTimestamp,)>()? + .map(|(ts,)| ts); + + Ok(result) + } + + async fn fetch_stream_ids( + &self, + generation: &GenerationTimestamp, + ) -> anyhow::Result>> { + let mut query = Statement::new(self.get_stream_ids_by_time_query()); + query.set_page_size(DEFAULT_PAGE_SIZE); + + let result = self + .session + .query_iter( + query, + ( + &self.keyspace_name, + &self.table_name, + generation, + StreamState::Current as i8, + ), + ) + .await? + .rows_stream::<(StreamID,)>()? + .map(|r| r.map(|(id,)| vec![id])) + .try_collect::>() + .await?; + + Ok(result) } } @@ -272,58 +468,80 @@ async fn new_distributed_system_query( Ok(query) } +pub fn get_generation_fetcher( + session: &Arc, + keyspace: &str, + table_name: &str, + uses_tablets: bool, +) -> Arc { + if uses_tablets { + Arc::new(TabletsGenerationFetcher::new( + session, + keyspace.to_string(), + table_name.to_string(), + )) + } else { + Arc::new(VnodeGenerationFetcher::new(session)) + } +} + #[cfg(test)] mod tests { + use rstest::rstest; use scylla_cdc_test_utils::prepare_db; use super::*; - const TEST_STREAM_TABLE: &str = "cdc_streams_descriptions_v2"; - const TEST_GENERATION_TABLE: &str = "cdc_generation_timestamps"; const GENERATION_NEW_MILLISECONDS: i64 = 1635882326384; const GENERATION_OLD_MILLISECONDS: i64 = 1635882224341; const TEST_STREAM_1: &str = "0x7fb9f781956cea08c651295720000001"; const TEST_STREAM_2: &str = "0x7fc0000000000000c298b9f168000001"; - impl GenerationFetcher { - // Constructor intended for testing purposes. - fn test_new(session: &Arc) -> GenerationFetcher { - GenerationFetcher { - streams_table_name: TEST_STREAM_TABLE.to_string(), - generations_table_name: TEST_GENERATION_TABLE.to_string(), - session: Arc::clone(session), + mod vnode_tests { + use super::*; + + const TEST_STREAM_TABLE: &str = "cdc_streams_descriptions_v2"; + const TEST_GENERATION_TABLE: &str = "cdc_generation_timestamps"; + + impl VnodeGenerationFetcher { + // Constructor intended for testing purposes. + fn test_new(session: &Arc) -> VnodeGenerationFetcher { + VnodeGenerationFetcher { + streams_table_name: TEST_STREAM_TABLE.to_string(), + generations_table_name: TEST_GENERATION_TABLE.to_string(), + session: Arc::clone(session), + } } } - } - // Constructs mock table with the same schema as the original one's. - fn construct_generation_table_query() -> String { - format!( - " + // Constructs mock table with the same schema as the original one's. + fn construct_generation_table_query() -> String { + format!( + " CREATE TABLE IF NOT EXISTS {TEST_GENERATION_TABLE}( key text, time timestamp, expired timestamp, PRIMARY KEY (key, time) ) WITH CLUSTERING ORDER BY (time DESC);" - ) - } + ) + } - // Constructs mock table with the same schema as the original one's. - fn construct_stream_table_query() -> String { - format!( - " + // Constructs mock table with the same schema as the original one's. + fn construct_stream_table_query() -> String { + format!( + " CREATE TABLE IF NOT EXISTS {TEST_STREAM_TABLE} ( time timestamp, range_end bigint, streams frozen>, PRIMARY KEY (time, range_end) ) WITH CLUSTERING ORDER BY (range_end ASC);", - ) - } + ) + } - async fn insert_generation_timestamp(session: &Session, generation: i64) { - let query = new_distributed_system_query( + pub(super) async fn insert_generation_timestamp(session: &Session, generation: i64) { + let query = new_distributed_system_query( format!( "INSERT INTO {TEST_GENERATION_TABLE} (key, time, expired) VALUES ('timestamps', ?, NULL);", ), @@ -332,56 +550,196 @@ mod tests { .await .unwrap(); - session - .query_unpaged(query, (value::CqlTimestamp(generation),)) - .await - .unwrap(); - } + session + .query_unpaged(query, (value::CqlTimestamp(generation),)) + .await + .unwrap(); + } - // Populate test tables with given data. - async fn populate_test_db(session: &Session) { - let stream_generation = value::CqlTimestamp(GENERATION_NEW_MILLISECONDS); + // Populate test tables with given data. + async fn populate_test_db(session: &Session) { + let stream_generation = value::CqlTimestamp(GENERATION_NEW_MILLISECONDS); - for generation in &[GENERATION_NEW_MILLISECONDS, GENERATION_OLD_MILLISECONDS] { - insert_generation_timestamp(session, *generation).await; - } + for generation in &[GENERATION_NEW_MILLISECONDS, GENERATION_OLD_MILLISECONDS] { + insert_generation_timestamp(session, *generation).await; + } - let query = new_distributed_system_query( + let query = new_distributed_system_query( format!( "INSERT INTO {TEST_STREAM_TABLE}(time, range_end, streams) VALUES (?, -1, {{{TEST_STREAM_1}, {TEST_STREAM_2}}});" ), session, - ) - .await - .unwrap(); - - session - .query_unpaged(query, (stream_generation,)) + ) .await .unwrap(); + + session + .query_unpaged(query, (stream_generation,)) + .await + .unwrap(); + } + + // Create setup for tests. + pub(super) async fn setup() -> anyhow::Result { + let session = prepare_db( + &[ + construct_generation_table_query(), + construct_stream_table_query(), + ], + 1, + false, + ) + .await? + .0; + populate_test_db(&session).await; + + let generation_fetcher = VnodeGenerationFetcher::test_new(&session); + + Ok(generation_fetcher) + } } - // Create setup for tests. - async fn setup() -> anyhow::Result { - let session = prepare_db( - &[ - construct_generation_table_query(), - construct_stream_table_query(), - ], - 3, - ) - .await? - .0; - populate_test_db(&session).await; + mod tablets_tests { + use super::*; + + const TEST_KEYSPACE: &str = "test_tablets_ks"; + const TEST_TABLE: &str = "test_tablets_table"; + + impl TabletsGenerationFetcher { + // Constructor intended for testing purposes. + fn test_new( + session: &Arc, + keyspace: &str, + table: &str, + ) -> TabletsGenerationFetcher { + TabletsGenerationFetcher { + timestamps_table_name: "cdc_timestamps".to_string(), + streams_table_name: "cdc_streams".to_string(), + keyspace_name: keyspace.to_string(), + table_name: table.to_string(), + session: Arc::clone(session), + } + } + } + + pub(super) async fn insert_generation_timestamp(session: &Session, timestamp: i64) { + let query = new_distributed_system_query( + "INSERT INTO cdc_timestamps (keyspace_name, table_name, timestamp) VALUES (?, ?, ?);".to_string(), session) + .await + .unwrap(); + + session + .query_unpaged( + query, + (TEST_KEYSPACE, TEST_TABLE, value::CqlTimestamp(timestamp)), + ) + .await + .unwrap(); + } + + // TabletsGenerationFetcher tests + pub(super) async fn setup() -> anyhow::Result { + let session = Arc::new(prepare_db(&[], 1, false).await?.0); + // Create CDC tables for tablets in the current keyspace + session + .query_unpaged( + "CREATE TABLE IF NOT EXISTS cdc_timestamps ( + keyspace_name text, + table_name text, + timestamp timestamp, + PRIMARY KEY ((keyspace_name, table_name), timestamp) + ) WITH CLUSTERING ORDER BY (timestamp DESC);", + &[], + ) + .await?; + session + .query_unpaged( + "CREATE TABLE IF NOT EXISTS cdc_streams ( + keyspace_name text, + table_name text, + timestamp timestamp, + stream_state tinyint, + stream_id blob, + PRIMARY KEY ((keyspace_name, table_name), timestamp, stream_state, stream_id) + ) WITH CLUSTERING ORDER BY (timestamp ASC, stream_state ASC, stream_id ASC);", + &[], + ) + .await?; + + // Insert generations + for ts in &[GENERATION_NEW_MILLISECONDS, GENERATION_OLD_MILLISECONDS] { + insert_generation_timestamp(&session, *ts).await; + } - let generation_fetcher = GenerationFetcher::test_new(&session); + // Insert streams + for sid in &[TEST_STREAM_1, TEST_STREAM_2] { + let stream_id = hex::decode(sid.strip_prefix("0x").unwrap()).unwrap(); + let query = new_distributed_system_query( + "INSERT INTO cdc_streams (keyspace_name, table_name, timestamp, stream_state, stream_id) VALUES (?, ?, ?, ?, ?);".to_string(), + &session, + ) + .await + .unwrap(); + + for st in &[StreamState::Current, StreamState::Opened] { + session + .query_unpaged( + query.clone(), + ( + TEST_KEYSPACE, + TEST_TABLE, + value::CqlTimestamp(GENERATION_NEW_MILLISECONDS), + *st as i8, + stream_id.clone(), + ), + ) + .await + .unwrap(); + } + } - Ok(generation_fetcher) + Ok(TabletsGenerationFetcher::test_new( + &session, + TEST_KEYSPACE, + TEST_TABLE, + )) + } + } + + // helper function to setup and get appropriate GenerationFetcher and Session for tests + async fn setup( + tablets_enabled: bool, + ) -> anyhow::Result<(Arc, Arc)> { + if tablets_enabled { + let fetcher = tablets_tests::setup().await?; + let session = Arc::clone(&fetcher.session); + Ok((Arc::new(fetcher), session)) + } else { + let fetcher = vnode_tests::setup().await?; + let session = Arc::clone(&fetcher.session); + Ok((Arc::new(fetcher), session)) + } + } + + // helper function to insert a new generation + async fn insert_generation_timestamp( + session: &Session, + tablets_enabled: bool, + generation: i64, + ) { + if tablets_enabled { + tablets_tests::insert_generation_timestamp(session, generation).await; + } else { + vnode_tests::insert_generation_timestamp(session, generation).await; + } } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn test_fetch_all_generations() { - let fetcher = setup().await.unwrap(); + async fn test_fetch_all_generations(#[case] tablets_enabled: bool) { + let fetcher = setup(tablets_enabled).await.unwrap().0; let correct_gen = vec![ GenerationTimestamp { @@ -397,9 +755,12 @@ mod tests { assert_eq!(gen, correct_gen); } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn test_get_generation_by_timestamp() { - let fetcher = setup().await.unwrap(); + async fn test_get_generation_by_timestamp(#[case] tablets_enabled: bool) { + let fetcher = setup(tablets_enabled).await.unwrap().0; // Input. let timestamps_ms = [ @@ -441,9 +802,12 @@ mod tests { } } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn test_get_next_generation() { - let fetcher = setup().await.unwrap(); + async fn test_get_next_generation(#[case] tablets_enabled: bool) { + let fetcher = setup(tablets_enabled).await.unwrap().0; let gen = fetcher.fetch_all_generations().await.unwrap(); @@ -459,9 +823,12 @@ mod tests { ); } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn test_get_next_generation_correct_order() { - let fetcher = setup().await.unwrap(); + async fn test_get_next_generation_correct_order(#[case] tablets_enabled: bool) { + let fetcher = setup(tablets_enabled).await.unwrap().0; let gen_before_all_others = GenerationTimestamp { timestamp: chrono::Duration::milliseconds(GENERATION_OLD_MILLISECONDS - 1), @@ -476,9 +843,12 @@ mod tests { assert_eq!(gen_before_others_next.unwrap(), first_gen); } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn test_do_get_stream_ids() { - let fetcher = setup().await.unwrap(); + async fn test_do_get_stream_ids(#[case] tablets_enabled: bool) { + let fetcher = setup(tablets_enabled).await.unwrap().0; let gen = GenerationTimestamp { timestamp: chrono::Duration::milliseconds(GENERATION_NEW_MILLISECONDS), @@ -486,27 +856,26 @@ mod tests { let stream_ids = fetcher.fetch_stream_ids(&gen).await.unwrap(); - let correct_stream_ids: Vec> = [[TEST_STREAM_1, TEST_STREAM_2]] - .iter() - .map(|stream_vec| { - stream_vec - .iter() - .map(|stream| StreamID { - id: hex::decode(stream.strip_prefix("0x").unwrap()).unwrap(), - }) - .collect() - }) - .collect(); - - assert_eq!(stream_ids, correct_stream_ids); + let stream1 = + StreamID::new(hex::decode(TEST_STREAM_1.strip_prefix("0x").unwrap()).unwrap()); + let stream2 = + StreamID::new(hex::decode(TEST_STREAM_2.strip_prefix("0x").unwrap()).unwrap()); + + if tablets_enabled { + assert_eq!(stream_ids, vec![[stream1], [stream2]]); + } else { + assert_eq!(stream_ids, vec![vec![stream1, stream2]]); + } } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn test_get_generations_continuously() { - let fetcher = setup().await.unwrap(); - let session = fetcher.session.clone(); + async fn test_get_generations_continuously(#[case] tablets_enabled: bool) { + let (fetcher, session) = setup(tablets_enabled).await.unwrap(); - let (mut generation_receiver, _future) = Arc::new(fetcher) + let (mut generation_receiver, _future) = fetcher .fetch_generations_continuously( chrono::Duration::milliseconds(GENERATION_OLD_MILLISECONDS - 1), time::Duration::from_millis(100), @@ -532,7 +901,9 @@ mod tests { timestamp: chrono::Duration::milliseconds(GENERATION_NEW_MILLISECONDS + 100), }; - insert_generation_timestamp(&session, GENERATION_NEW_MILLISECONDS + 100).await; + insert_generation_timestamp(&session, tablets_enabled, GENERATION_NEW_MILLISECONDS + 100) + .await; + let generation = generation_receiver.recv().await.unwrap(); assert_eq!(generation, new_gen); } diff --git a/scylla-cdc/src/stream_reader.rs b/scylla-cdc/src/stream_reader.rs index 90963f69..d20e3bc2 100644 --- a/scylla-cdc/src/stream_reader.rs +++ b/scylla-cdc/src/stream_reader.rs @@ -296,9 +296,12 @@ impl StreamReader { mod tests { use async_trait::async_trait; use futures::stream::StreamExt; + use rstest::rstest; use scylla::errors::{ExecutionError, PrepareError, RequestAttemptError}; use scylla::statement::unprepared::Statement; - use scylla_cdc_test_utils::{now, populate_simple_db_with_pk, prepare_simple_db, TEST_TABLE}; + use scylla_cdc_test_utils::{ + now, populate_simple_db_with_pk, prepare_simple_db, skip_if_not_supported, TEST_TABLE, + }; use std::sync::atomic::AtomicIsize; use std::sync::atomic::Ordering::Relaxed; use tokio::sync::Mutex; @@ -440,9 +443,12 @@ mod tests { } } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn check_fetch_cdc_with_multiple_stream_id() { - let (shared_session, ks) = prepare_simple_db().await.unwrap(); + async fn check_fetch_cdc_with_multiple_stream_id(#[case] tablets_enabled: bool) { + let (shared_session, ks) = skip_if_not_supported!(prepare_simple_db(tablets_enabled)); let partition_key_1 = 0; let partition_key_2 = 1; @@ -496,9 +502,12 @@ mod tests { assert_eq!(row_count_with_pk1, 3); } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn check_fetch_cdc_with_one_stream_id() { - let (shared_session, ks) = prepare_simple_db().await.unwrap(); + async fn check_fetch_cdc_with_one_stream_id(#[case] tablets_enabled: bool) { + let (shared_session, ks) = skip_if_not_supported!(prepare_simple_db(tablets_enabled)); let partition_key = 0; populate_simple_db_with_pk(&shared_session, partition_key) @@ -528,9 +537,12 @@ mod tests { } } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn check_set_upper_timestamp_in_fetch_cdc() { - let (shared_session, ks) = prepare_simple_db().await.unwrap(); + async fn check_set_upper_timestamp_in_fetch_cdc(#[case] tablets_enabled: bool) { + let (shared_session, ks) = skip_if_not_supported!(prepare_simple_db(tablets_enabled)); let mut insert_before_upper_timestamp_query = Statement::new(format!( "INSERT INTO {} (pk, t, v, s) VALUES ({}, {}, '{}', '{}');", @@ -578,9 +590,12 @@ mod tests { } } + #[rstest] + #[case::vnodes(false)] + #[case::tablets(true)] #[tokio::test] - async fn timeout_retry_test() { - let (shared_session, ks) = prepare_simple_db().await.unwrap(); + async fn timeout_retry_test(#[case] tablets_enabled: bool) { + let (shared_session, ks) = skip_if_not_supported!(prepare_simple_db(tablets_enabled)); let partition_key = 0; populate_simple_db_with_pk(&shared_session, partition_key)