8 changes: 4 additions & 4 deletions scylla-cdc/src/checkpoints.rs

@@ -2,8 +2,8 @@
 use crate::cdc_types::{GenerationTimestamp, StreamID};
 use anyhow;
 use async_trait::async_trait;
-use futures::future::RemoteHandle;
 use futures::FutureExt;
+use futures::future::RemoteHandle;
 use scylla::client::session::Session;
 use scylla::statement::prepared::PreparedStatement;
 use scylla::value;
@@ -220,7 +220,7 @@ impl CDCCheckpointSaver for TableBackedCheckpointSaver {
             .await?
             .into_rows_result()?
             .maybe_first_row::<(value::CqlTimestamp,)>()?
-            .map(|t| chrono::Duration::milliseconds(t.0 .0)))
+            .map(|t| chrono::Duration::milliseconds(t.0.0)))
     }
 }
 
@@ -259,9 +259,9 @@ mod tests {
             .rows_stream::<(StreamID, GenerationTimestamp, value::CqlTimestamp)>()
             .unwrap()
             .map(|res| {
-                res.map(|(id, gen, time)| Checkpoint {
+                res.map(|(id, generation, time)| Checkpoint {
                     stream_id: id,
-                    generation: gen,
+                    generation,
                     timestamp: Duration::from_millis(time.0 as u64),
                 })
            })

34 changes: 20 additions & 14 deletions scylla-cdc/src/stream_generations.rs

@@ -1,10 +1,10 @@
 use async_trait::async_trait;
-use futures::stream::StreamExt;
 use futures::FutureExt;
-use futures::{future::RemoteHandle, TryStreamExt};
+use futures::stream::StreamExt;
+use futures::{TryStreamExt, future::RemoteHandle};
 use scylla::client::session::Session;
-use scylla::statement::unprepared::Statement;
 use scylla::statement::Consistency;
+use scylla::statement::unprepared::Statement;
 use scylla::value;
 use std::sync::Arc;
 use std::time;
@@ -70,7 +70,7 @@ pub(crate) trait GenerationFetcher: Send + Sync + 'static {
                         }
                     }
                 }
-            }
+            };
         }
         _ => {
             warn!("Failed to fetch generation by timestamp");
@@ -741,7 +741,7 @@ mod tests {
     async fn test_fetch_all_generations(#[case] tablets_enabled: bool) {
         let fetcher = setup(tablets_enabled).await.unwrap().0;
 
-        let correct_gen = vec![
+        let correct_generations = vec![
            GenerationTimestamp {
                timestamp: chrono::Duration::milliseconds(GENERATION_NEW_MILLISECONDS),
            },
@@ -750,9 +750,9 @@
            },
        ];
 
-        let gen = fetcher.fetch_all_generations().await.unwrap();
+        let generations = fetcher.fetch_all_generations().await.unwrap();
 
-        assert_eq!(gen, correct_gen);
+        assert_eq!(generations, correct_generations);
    }
 
    #[rstest]
@@ -788,13 +788,13 @@ mod tests {
        for i in 0..timestamps_ms.len() {
            let timestamp = chrono::Duration::milliseconds(timestamps_ms[i]);
 
-            let gen = fetcher
+            let generations = fetcher
                .fetch_generation_by_timestamp(&timestamp)
                .await
                .unwrap();
 
            assert_eq!(
-                gen,
+                generations,
                correct_generations[i].map(|gen_ms| GenerationTimestamp {
                    timestamp: chrono::Duration::milliseconds(gen_ms)
                }),
@@ -809,12 +809,18 @@
    async fn test_get_next_generation(#[case] tablets_enabled: bool) {
        let fetcher = setup(tablets_enabled).await.unwrap().0;
 
-        let gen = fetcher.fetch_all_generations().await.unwrap();
+        let generations = fetcher.fetch_all_generations().await.unwrap();
 
-        let gen_new_next = fetcher.fetch_next_generation(&gen[0]).await.unwrap();
+        let gen_new_next = fetcher
+            .fetch_next_generation(&generations[0])
+            .await
+            .unwrap();
        assert!(gen_new_next.is_none());
 
-        let gen_old_next = fetcher.fetch_next_generation(&gen[1]).await.unwrap();
+        let gen_old_next = fetcher
+            .fetch_next_generation(&generations[1])
+            .await
+            .unwrap();
        assert_eq!(
            gen_old_next.unwrap(),
            GenerationTimestamp {
@@ -850,11 +856,11 @@ mod tests {
    async fn test_do_get_stream_ids(#[case] tablets_enabled: bool) {
        let fetcher = setup(tablets_enabled).await.unwrap().0;
 
-        let gen = GenerationTimestamp {
+        let generation = GenerationTimestamp {
            timestamp: chrono::Duration::milliseconds(GENERATION_NEW_MILLISECONDS),
        };
 
-        let stream_ids = fetcher.fetch_stream_ids(&gen).await.unwrap();
+        let stream_ids = fetcher.fetch_stream_ids(&generation).await.unwrap();
 
        let stream1 =
            StreamID::new(hex::decode(TEST_STREAM_1.strip_prefix("0x").unwrap()).unwrap());