Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[workspace]
resolver = "2"
resolver = "3"

members = [
"scylla-cdc",
Expand Down
2 changes: 1 addition & 1 deletion scylla-cdc-printer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "scylla-cdc-printer"
version = "0.1.4"
edition = "2021"
edition = "2024"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand Down
2 changes: 1 addition & 1 deletion scylla-cdc-printer/src/printer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ mod tests {
use std::time;

use scylla_cdc::log_reader::CDCLogReaderBuilder;
use scylla_cdc_test_utils::{now, populate_simple_db_with_pk, prepare_simple_db, TEST_TABLE};
use scylla_cdc_test_utils::{TEST_TABLE, now, populate_simple_db_with_pk, prepare_simple_db};

use super::*;

Expand Down
2 changes: 1 addition & 1 deletion scylla-cdc-replicator/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "scylla-cdc-replicator"
version = "0.1.4"
edition = "2021"
edition = "2024"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand Down
2 changes: 1 addition & 1 deletion scylla-cdc-test-utils/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "scylla-cdc-test-utils"
version = "0.4.1"
edition = "2021"
edition = "2024"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand Down
8 changes: 5 additions & 3 deletions scylla-cdc-test-utils/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use scylla::client::session::Session;
use scylla::client::session_builder::SessionBuilder;
use scylla::statement::unprepared::Statement;
use scylla::statement::Consistency;
use scylla::statement::unprepared::Statement;

pub const TEST_TABLE: &str = "t";
static UNIQUE_COUNTER: AtomicUsize = AtomicUsize::new(0);
Expand All @@ -22,7 +22,9 @@ pub fn unique_name() -> String {
}

fn get_create_table_query() -> String {
format!("CREATE TABLE IF NOT EXISTS {TEST_TABLE} (pk int, t int, v text, s text, PRIMARY KEY (pk, t)) WITH cdc = {{'enabled':true}};")
format!(
"CREATE TABLE IF NOT EXISTS {TEST_TABLE} (pk int, t int, v text, s text, PRIMARY KEY (pk, t)) WITH cdc = {{'enabled':true}};"
)
}

fn get_create_keyspace_query(
Expand Down
2 changes: 1 addition & 1 deletion scylla-cdc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "scylla-cdc"
version = "0.4.1"
edition = "2021"
edition = "2024"
description = "Library for consuming ScyllaDB CDC log for Rust"
repository = "https://github.com/scylladb/scylla-cdc-rust"
readme = "../README.md"
Expand Down
2 changes: 1 addition & 1 deletion scylla-cdc/src/cdc_types.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! A module containing types related to CDC internal structure.
use scylla::deserialize::value::DeserializeValue;
use scylla::frame::response::result::ColumnType;
use scylla::serialize::SerializationError;
use scylla::serialize::value::SerializeValue;
use scylla::serialize::writers::{CellWriter, WrittenCellProof};
use scylla::serialize::SerializationError;
use scylla::value::CqlTimestamp;
use std::fmt;

Expand Down
8 changes: 4 additions & 4 deletions scylla-cdc/src/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
use crate::cdc_types::{GenerationTimestamp, StreamID};
use anyhow;
use async_trait::async_trait;
use futures::future::RemoteHandle;
use futures::FutureExt;
use futures::future::RemoteHandle;
use scylla::client::session::Session;
use scylla::statement::prepared::PreparedStatement;
use scylla::value;
Expand Down Expand Up @@ -220,7 +220,7 @@ impl CDCCheckpointSaver for TableBackedCheckpointSaver {
.await?
.into_rows_result()?
.maybe_first_row::<(value::CqlTimestamp,)>()?
.map(|t| chrono::Duration::milliseconds(t.0 .0)))
.map(|t| chrono::Duration::milliseconds(t.0.0)))
}
}

Expand Down Expand Up @@ -259,9 +259,9 @@ mod tests {
.rows_stream::<(StreamID, GenerationTimestamp, value::CqlTimestamp)>()
.unwrap()
.map(|res| {
res.map(|(id, gen, time)| Checkpoint {
res.map(|(id, generation, time)| Checkpoint {
stream_id: id,
generation: gen,
generation,
timestamp: Duration::from_millis(time.0 as u64),
})
})
Expand Down
12 changes: 7 additions & 5 deletions scylla-cdc/src/e2e_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ mod tests {
use std::sync::Arc;
use std::time;

use anyhow::{bail, Result};
use anyhow::{Result, bail};
use async_trait::async_trait;
use futures::future::RemoteHandle;
use itertools::{repeat_n, Itertools};
use itertools::{Itertools, repeat_n};
use rstest::rstest;
use scylla::client::session::Session;
use scylla::frame::response::result::ColumnType;
use scylla::serialize::SerializationError;
use scylla::serialize::value::SerializeValue;
use scylla::serialize::writers::{CellWriter, WrittenCellProof};
use scylla::serialize::SerializationError;
use scylla::statement::prepared::PreparedStatement;
use scylla::value::CqlValue;
use scylla_cdc_test_utils::{now, prepare_db, skip_if_not_supported};
Expand Down Expand Up @@ -173,9 +173,11 @@ mod tests {
.join(" AND ");

(
format!("CREATE TABLE {table_name} ({pk_definitions}, ck int, v int, primary key (({primary_key_tuple}), ck)) WITH cdc = {{'enabled' : true}}"),
format!(
"CREATE TABLE {table_name} ({pk_definitions}, ck int, v int, primary key (({primary_key_tuple}), ck)) WITH cdc = {{'enabled' : true}}"
),
format!("INSERT INTO {table_name} (v, {primary_key_tuple}, ck) VALUES ({binds}, ?, ?)"),
format!("UPDATE {table_name} SET v = ? WHERE {pk_conditions} AND ck = ?")
format!("UPDATE {table_name} SET v = ? WHERE {pk_conditions} AND ck = ?"),
)
}

Expand Down
27 changes: 14 additions & 13 deletions scylla-cdc/src/log_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ use std::time;
use std::time::SystemTime;

use anyhow;
use futures::FutureExt;
use futures::future::RemoteHandle;
use futures::stream::{FusedStream, FuturesUnordered, StreamExt};
use futures::FutureExt;
use scylla::client::session::Session;
use tracing::warn;

Expand Down Expand Up @@ -72,10 +72,10 @@ impl CDCLogReader {
};
let rows_result = result.into_rows_result()?;
let value = rows_result.maybe_first_row::<(Option<i32>,)>()?;
if let Some((tablets_val,)) = value {
if tablets_val.is_some() {
return Ok(true);
}
if let Some((tablets_val,)) = value
&& tablets_val.is_some()
{
return Ok(true);
}
Ok(false)
}
Expand Down Expand Up @@ -205,7 +205,9 @@ impl CDCReaderWorker {
}
} else {
// Current generation's next generation is not available
warn!("Next generation is not available, some rows in the CDC log table may be unread.");
warn!(
"Next generation is not available, some rows in the CDC log table may be unread."
);
return Ok(());
}
} else {
Expand Down Expand Up @@ -468,18 +470,17 @@ impl CDCLogReaderBuilder {
let readers = vec![];

let mut start_timestamp = self.start_timestamp;
if self.should_load_progress {
if let Some(generation) = self
if self.should_load_progress
&& let Some(generation) = self
.checkpoint_saver
.as_ref()
.ok_or_else(|| {
anyhow::anyhow!("checkpoint_saver is not set, while should_load is true")
})?
.load_last_generation()
.await?
{
start_timestamp = std::cmp::max(generation.timestamp, start_timestamp);
}
{
start_timestamp = std::cmp::max(generation.timestamp, start_timestamp);
}

let uses_tablets = CDCLogReader::uses_tablets(&session, &keyspace).await?;
Expand Down Expand Up @@ -524,16 +525,16 @@ impl Default for CDCLogReaderBuilder {

#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::Arc;
use std::time::Duration;

use crate::consumer::{CDCRow, Consumer, ConsumerFactory};
use crate::log_reader::CDCLogReaderBuilder;
use anyhow::anyhow;
use async_trait::async_trait;
use scylla_cdc_test_utils::{now, populate_simple_db_with_pk, prepare_simple_db, TEST_TABLE};
use scylla_cdc_test_utils::{TEST_TABLE, now, populate_simple_db_with_pk, prepare_simple_db};

struct ErrorConsumer {
id: usize,
Expand Down
34 changes: 20 additions & 14 deletions scylla-cdc/src/stream_generations.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use async_trait::async_trait;
use futures::stream::StreamExt;
use futures::FutureExt;
use futures::{future::RemoteHandle, TryStreamExt};
use futures::stream::StreamExt;
use futures::{TryStreamExt, future::RemoteHandle};
use scylla::client::session::Session;
use scylla::statement::unprepared::Statement;
use scylla::statement::Consistency;
use scylla::statement::unprepared::Statement;
use scylla::value;
use std::sync::Arc;
use std::time;
Expand Down Expand Up @@ -70,7 +70,7 @@ pub(crate) trait GenerationFetcher: Send + Sync + 'static {
}
}
}
}
};
}
_ => {
warn!("Failed to fetch generation by timestamp");
Expand Down Expand Up @@ -741,7 +741,7 @@ mod tests {
async fn test_fetch_all_generations(#[case] tablets_enabled: bool) {
let fetcher = setup(tablets_enabled).await.unwrap().0;

let correct_gen = vec![
let correct_generations = vec![
GenerationTimestamp {
timestamp: chrono::Duration::milliseconds(GENERATION_NEW_MILLISECONDS),
},
Expand All @@ -750,9 +750,9 @@ mod tests {
},
];

let gen = fetcher.fetch_all_generations().await.unwrap();
let generations = fetcher.fetch_all_generations().await.unwrap();

assert_eq!(gen, correct_gen);
assert_eq!(generations, correct_generations);
}

#[rstest]
Expand Down Expand Up @@ -788,13 +788,13 @@ mod tests {
for i in 0..timestamps_ms.len() {
let timestamp = chrono::Duration::milliseconds(timestamps_ms[i]);

let gen = fetcher
let generations = fetcher
.fetch_generation_by_timestamp(&timestamp)
.await
.unwrap();

assert_eq!(
gen,
generations,
correct_generations[i].map(|gen_ms| GenerationTimestamp {
timestamp: chrono::Duration::milliseconds(gen_ms)
}),
Expand All @@ -809,12 +809,18 @@ mod tests {
async fn test_get_next_generation(#[case] tablets_enabled: bool) {
let fetcher = setup(tablets_enabled).await.unwrap().0;

let gen = fetcher.fetch_all_generations().await.unwrap();
let generations = fetcher.fetch_all_generations().await.unwrap();

let gen_new_next = fetcher.fetch_next_generation(&gen[0]).await.unwrap();
let gen_new_next = fetcher
.fetch_next_generation(&generations[0])
.await
.unwrap();
assert!(gen_new_next.is_none());

let gen_old_next = fetcher.fetch_next_generation(&gen[1]).await.unwrap();
let gen_old_next = fetcher
.fetch_next_generation(&generations[1])
.await
.unwrap();
assert_eq!(
gen_old_next.unwrap(),
GenerationTimestamp {
Expand Down Expand Up @@ -850,11 +856,11 @@ mod tests {
async fn test_do_get_stream_ids(#[case] tablets_enabled: bool) {
let fetcher = setup(tablets_enabled).await.unwrap().0;

let gen = GenerationTimestamp {
let generation = GenerationTimestamp {
timestamp: chrono::Duration::milliseconds(GENERATION_NEW_MILLISECONDS),
};

let stream_ids = fetcher.fetch_stream_ids(&gen).await.unwrap();
let stream_ids = fetcher.fetch_stream_ids(&generation).await.unwrap();

let stream1 =
StreamID::new(hex::decode(TEST_STREAM_1.strip_prefix("0x").unwrap()).unwrap());
Expand Down
12 changes: 6 additions & 6 deletions scylla-cdc/src/stream_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tokio::time::sleep;
use tracing::{enabled, warn};

use crate::cdc_types::{GenerationTimestamp, StreamID};
use crate::checkpoints::{start_saving_checkpoints, CDCCheckpointSaver, Checkpoint};
use crate::checkpoints::{CDCCheckpointSaver, Checkpoint, start_saving_checkpoints};
use crate::consumer::{CDCRow, CDCRowSchema, Consumer};

const BASIC_TIMEOUT_SLEEP_MS: u128 = 10;
Expand Down Expand Up @@ -171,10 +171,10 @@ impl StreamReader {
)
.await?;

if let Some(timestamp_to_stop) = self.upper_timestamp.lock().await.as_ref() {
if window_end >= *timestamp_to_stop {
break;
}
if let Some(timestamp_to_stop) = self.upper_timestamp.lock().await.as_ref()
&& window_end >= *timestamp_to_stop
{
break;
}

window_begin = window_end;
Expand Down Expand Up @@ -300,7 +300,7 @@ mod tests {
use scylla::errors::{ExecutionError, PrepareError, RequestAttemptError};
use scylla::statement::unprepared::Statement;
use scylla_cdc_test_utils::{
now, populate_simple_db_with_pk, prepare_simple_db, skip_if_not_supported, TEST_TABLE,
TEST_TABLE, now, populate_simple_db_with_pk, prepare_simple_db, skip_if_not_supported,
};
use std::sync::atomic::AtomicIsize;
use std::sync::atomic::Ordering::Relaxed;
Expand Down