diff --git a/Cargo.toml b/Cargo.toml index 4e5f9485..8463520b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -resolver = "2" +resolver = "3" members = [ "scylla-cdc", diff --git a/scylla-cdc-printer/Cargo.toml b/scylla-cdc-printer/Cargo.toml index 48761aed..265a7b14 100644 --- a/scylla-cdc-printer/Cargo.toml +++ b/scylla-cdc-printer/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "scylla-cdc-printer" version = "0.1.4" -edition = "2021" +edition = "2024" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/scylla-cdc-printer/src/printer.rs b/scylla-cdc-printer/src/printer.rs index a78db359..738f6d23 100644 --- a/scylla-cdc-printer/src/printer.rs +++ b/scylla-cdc-printer/src/printer.rs @@ -115,7 +115,7 @@ mod tests { use std::time; use scylla_cdc::log_reader::CDCLogReaderBuilder; - use scylla_cdc_test_utils::{now, populate_simple_db_with_pk, prepare_simple_db, TEST_TABLE}; + use scylla_cdc_test_utils::{TEST_TABLE, now, populate_simple_db_with_pk, prepare_simple_db}; use super::*; diff --git a/scylla-cdc-replicator/Cargo.toml b/scylla-cdc-replicator/Cargo.toml index 2a9fae46..67777965 100644 --- a/scylla-cdc-replicator/Cargo.toml +++ b/scylla-cdc-replicator/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "scylla-cdc-replicator" version = "0.1.4" -edition = "2021" +edition = "2024" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/scylla-cdc-test-utils/Cargo.toml b/scylla-cdc-test-utils/Cargo.toml index 8a23b30f..4f928404 100644 --- a/scylla-cdc-test-utils/Cargo.toml +++ b/scylla-cdc-test-utils/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "scylla-cdc-test-utils" version = "0.4.1" -edition = "2021" +edition = "2024" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/scylla-cdc-test-utils/src/lib.rs b/scylla-cdc-test-utils/src/lib.rs index 27d53913..b7667e43 100644 --- a/scylla-cdc-test-utils/src/lib.rs +++ b/scylla-cdc-test-utils/src/lib.rs @@ -1,11 +1,11 @@ use std::fmt; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; use scylla::client::session::Session; use scylla::client::session_builder::SessionBuilder; -use scylla::statement::unprepared::Statement; use scylla::statement::Consistency; +use scylla::statement::unprepared::Statement; pub const TEST_TABLE: &str = "t"; static UNIQUE_COUNTER: AtomicUsize = AtomicUsize::new(0); @@ -22,7 +22,9 @@ pub fn unique_name() -> String { } fn get_create_table_query() -> String { - format!("CREATE TABLE IF NOT EXISTS {TEST_TABLE} (pk int, t int, v text, s text, PRIMARY KEY (pk, t)) WITH cdc = {{'enabled':true}};") + format!( + "CREATE TABLE IF NOT EXISTS {TEST_TABLE} (pk int, t int, v text, s text, PRIMARY KEY (pk, t)) WITH cdc = {{'enabled':true}};" + ) } fn get_create_keyspace_query( diff --git a/scylla-cdc/Cargo.toml b/scylla-cdc/Cargo.toml index 5c919df7..e8fa54d8 100644 --- a/scylla-cdc/Cargo.toml +++ b/scylla-cdc/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "scylla-cdc" version = "0.4.1" -edition = "2021" +edition = "2024" description = "Library for consuming ScyllaDB CDC log for Rust" repository = "https://github.com/scylladb/scylla-cdc-rust" readme = "../README.md" diff --git a/scylla-cdc/src/cdc_types.rs b/scylla-cdc/src/cdc_types.rs index aced4c51..96319579 100644 --- a/scylla-cdc/src/cdc_types.rs +++ b/scylla-cdc/src/cdc_types.rs @@ -1,9 +1,9 @@ //! A module containing types related to CDC internal structure. use scylla::deserialize::value::DeserializeValue; use scylla::frame::response::result::ColumnType; +use scylla::serialize::SerializationError; use scylla::serialize::value::SerializeValue; use scylla::serialize::writers::{CellWriter, WrittenCellProof}; -use scylla::serialize::SerializationError; use scylla::value::CqlTimestamp; use std::fmt; diff --git a/scylla-cdc/src/checkpoints.rs b/scylla-cdc/src/checkpoints.rs index a20c1bd4..7193eca2 100644 --- a/scylla-cdc/src/checkpoints.rs +++ b/scylla-cdc/src/checkpoints.rs @@ -2,8 +2,8 @@ use crate::cdc_types::{GenerationTimestamp, StreamID}; use anyhow; use async_trait::async_trait; -use futures::future::RemoteHandle; use futures::FutureExt; +use futures::future::RemoteHandle; use scylla::client::session::Session; use scylla::statement::prepared::PreparedStatement; use scylla::value; @@ -220,7 +220,7 @@ impl CDCCheckpointSaver for TableBackedCheckpointSaver { .await? .into_rows_result()? .maybe_first_row::<(value::CqlTimestamp,)>()? - .map(|t| chrono::Duration::milliseconds(t.0 .0))) + .map(|t| chrono::Duration::milliseconds(t.0.0))) } } @@ -259,9 +259,9 @@ mod tests { .rows_stream::<(StreamID, GenerationTimestamp, value::CqlTimestamp)>() .unwrap() .map(|res| { - res.map(|(id, gen, time)| Checkpoint { + res.map(|(id, generation, time)| Checkpoint { stream_id: id, - generation: gen, + generation, timestamp: Duration::from_millis(time.0 as u64), }) }) diff --git a/scylla-cdc/src/e2e_tests.rs b/scylla-cdc/src/e2e_tests.rs index ee176b40..f5ed0fc9 100644 --- a/scylla-cdc/src/e2e_tests.rs +++ b/scylla-cdc/src/e2e_tests.rs @@ -7,16 +7,16 @@ mod tests { use std::sync::Arc; use std::time; - use anyhow::{bail, Result}; + use anyhow::{Result, bail}; use async_trait::async_trait; use futures::future::RemoteHandle; - use itertools::{repeat_n, Itertools}; + use itertools::{Itertools, repeat_n}; use rstest::rstest; use scylla::client::session::Session; use scylla::frame::response::result::ColumnType; + use scylla::serialize::SerializationError; use scylla::serialize::value::SerializeValue; use scylla::serialize::writers::{CellWriter, WrittenCellProof}; - use scylla::serialize::SerializationError; use scylla::statement::prepared::PreparedStatement; use scylla::value::CqlValue; use scylla_cdc_test_utils::{now, prepare_db, skip_if_not_supported}; @@ -173,9 +173,11 @@ mod tests { .join(" AND "); ( - format!("CREATE TABLE {table_name} ({pk_definitions}, ck int, v int, primary key (({primary_key_tuple}), ck)) WITH cdc = {{'enabled' : true}}"), + format!( + "CREATE TABLE {table_name} ({pk_definitions}, ck int, v int, primary key (({primary_key_tuple}), ck)) WITH cdc = {{'enabled' : true}}" + ), format!("INSERT INTO {table_name} (v, {primary_key_tuple}, ck) VALUES ({binds}, ?, ?)"), - format!("UPDATE {table_name} SET v = ? WHERE {pk_conditions} AND ck = ?") + format!("UPDATE {table_name} SET v = ? WHERE {pk_conditions} AND ck = ?"), ) } diff --git a/scylla-cdc/src/log_reader.rs b/scylla-cdc/src/log_reader.rs index 7ee7dd7d..5825accb 100644 --- a/scylla-cdc/src/log_reader.rs +++ b/scylla-cdc/src/log_reader.rs @@ -18,9 +18,9 @@ use std::time; use std::time::SystemTime; use anyhow; +use futures::FutureExt; use futures::future::RemoteHandle; use futures::stream::{FusedStream, FuturesUnordered, StreamExt}; -use futures::FutureExt; use scylla::client::session::Session; use tracing::warn; @@ -72,10 +72,10 @@ impl CDCLogReader { }; let rows_result = result.into_rows_result()?; let value = rows_result.maybe_first_row::<(Option,)>()?; - if let Some((tablets_val,)) = value { - if tablets_val.is_some() { - return Ok(true); - } + if let Some((tablets_val,)) = value + && tablets_val.is_some() + { + return Ok(true); } Ok(false) } @@ -205,7 +205,9 @@ impl CDCReaderWorker { } } else { // Current generation's next generation is not available - warn!("Next generation is not available, some rows in the CDC log table may be unread."); + warn!( + "Next generation is not available, some rows in the CDC log table may be unread." + ); return Ok(()); } } else { @@ -468,8 +470,8 @@ impl CDCLogReaderBuilder { let readers = vec![]; let mut start_timestamp = self.start_timestamp; - if self.should_load_progress { - if let Some(generation) = self + if self.should_load_progress + && let Some(generation) = self .checkpoint_saver .as_ref() .ok_or_else(|| { @@ -477,9 +479,8 @@ impl CDCLogReaderBuilder { })? .load_last_generation() .await? - { - start_timestamp = std::cmp::max(generation.timestamp, start_timestamp); - } + { + start_timestamp = std::cmp::max(generation.timestamp, start_timestamp); } let uses_tablets = CDCLogReader::uses_tablets(&session, &keyspace).await?; @@ -524,16 +525,16 @@ impl Default for CDCLogReaderBuilder { #[cfg(test)] mod tests { + use std::sync::Arc; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::Relaxed; - use std::sync::Arc; use std::time::Duration; use crate::consumer::{CDCRow, Consumer, ConsumerFactory}; use crate::log_reader::CDCLogReaderBuilder; use anyhow::anyhow; use async_trait::async_trait; - use scylla_cdc_test_utils::{now, populate_simple_db_with_pk, prepare_simple_db, TEST_TABLE}; + use scylla_cdc_test_utils::{TEST_TABLE, now, populate_simple_db_with_pk, prepare_simple_db}; struct ErrorConsumer { id: usize, diff --git a/scylla-cdc/src/stream_generations.rs b/scylla-cdc/src/stream_generations.rs index 0e01e6a0..52e62d91 100644 --- a/scylla-cdc/src/stream_generations.rs +++ b/scylla-cdc/src/stream_generations.rs @@ -1,10 +1,10 @@ use async_trait::async_trait; -use futures::stream::StreamExt; use futures::FutureExt; -use futures::{future::RemoteHandle, TryStreamExt}; +use futures::stream::StreamExt; +use futures::{TryStreamExt, future::RemoteHandle}; use scylla::client::session::Session; -use scylla::statement::unprepared::Statement; use scylla::statement::Consistency; +use scylla::statement::unprepared::Statement; use scylla::value; use std::sync::Arc; use std::time; @@ -70,7 +70,7 @@ pub(crate) trait GenerationFetcher: Send + Sync + 'static { } } } - } + }; } _ => { warn!("Failed to fetch generation by timestamp"); @@ -741,7 +741,7 @@ mod tests { async fn test_fetch_all_generations(#[case] tablets_enabled: bool) { let fetcher = setup(tablets_enabled).await.unwrap().0; - let correct_gen = vec![ + let correct_generations = vec![ GenerationTimestamp { timestamp: chrono::Duration::milliseconds(GENERATION_NEW_MILLISECONDS), }, @@ -750,9 +750,9 @@ mod tests { }, ]; - let gen = fetcher.fetch_all_generations().await.unwrap(); + let generations = fetcher.fetch_all_generations().await.unwrap(); - assert_eq!(gen, correct_gen); + assert_eq!(generations, correct_generations); } #[rstest] @@ -788,13 +788,13 @@ mod tests { for i in 0..timestamps_ms.len() { let timestamp = chrono::Duration::milliseconds(timestamps_ms[i]); - let gen = fetcher + let generations = fetcher .fetch_generation_by_timestamp(×tamp) .await .unwrap(); assert_eq!( - gen, + generations, correct_generations[i].map(|gen_ms| GenerationTimestamp { timestamp: chrono::Duration::milliseconds(gen_ms) }), @@ -809,12 +809,18 @@ mod tests { async fn test_get_next_generation(#[case] tablets_enabled: bool) { let fetcher = setup(tablets_enabled).await.unwrap().0; - let gen = fetcher.fetch_all_generations().await.unwrap(); + let generations = fetcher.fetch_all_generations().await.unwrap(); - let gen_new_next = fetcher.fetch_next_generation(&gen[0]).await.unwrap(); + let gen_new_next = fetcher + .fetch_next_generation(&generations[0]) + .await + .unwrap(); assert!(gen_new_next.is_none()); - let gen_old_next = fetcher.fetch_next_generation(&gen[1]).await.unwrap(); + let gen_old_next = fetcher + .fetch_next_generation(&generations[1]) + .await + .unwrap(); assert_eq!( gen_old_next.unwrap(), GenerationTimestamp { @@ -850,11 +856,11 @@ mod tests { async fn test_do_get_stream_ids(#[case] tablets_enabled: bool) { let fetcher = setup(tablets_enabled).await.unwrap().0; - let gen = GenerationTimestamp { + let generation = GenerationTimestamp { timestamp: chrono::Duration::milliseconds(GENERATION_NEW_MILLISECONDS), }; - let stream_ids = fetcher.fetch_stream_ids(&gen).await.unwrap(); + let stream_ids = fetcher.fetch_stream_ids(&generation).await.unwrap(); let stream1 = StreamID::new(hex::decode(TEST_STREAM_1.strip_prefix("0x").unwrap()).unwrap()); diff --git a/scylla-cdc/src/stream_reader.rs b/scylla-cdc/src/stream_reader.rs index d20e3bc2..fe738f40 100644 --- a/scylla-cdc/src/stream_reader.rs +++ b/scylla-cdc/src/stream_reader.rs @@ -18,7 +18,7 @@ use tokio::time::sleep; use tracing::{enabled, warn}; use crate::cdc_types::{GenerationTimestamp, StreamID}; -use crate::checkpoints::{start_saving_checkpoints, CDCCheckpointSaver, Checkpoint}; +use crate::checkpoints::{CDCCheckpointSaver, Checkpoint, start_saving_checkpoints}; use crate::consumer::{CDCRow, CDCRowSchema, Consumer}; const BASIC_TIMEOUT_SLEEP_MS: u128 = 10; @@ -171,10 +171,10 @@ impl StreamReader { ) .await?; - if let Some(timestamp_to_stop) = self.upper_timestamp.lock().await.as_ref() { - if window_end >= *timestamp_to_stop { - break; - } + if let Some(timestamp_to_stop) = self.upper_timestamp.lock().await.as_ref() + && window_end >= *timestamp_to_stop + { + break; } window_begin = window_end; @@ -300,7 +300,7 @@ mod tests { use scylla::errors::{ExecutionError, PrepareError, RequestAttemptError}; use scylla::statement::unprepared::Statement; use scylla_cdc_test_utils::{ - now, populate_simple_db_with_pk, prepare_simple_db, skip_if_not_supported, TEST_TABLE, + TEST_TABLE, now, populate_simple_db_with_pk, prepare_simple_db, skip_if_not_supported, }; use std::sync::atomic::AtomicIsize; use std::sync::atomic::Ordering::Relaxed;