Skip to content

Commit

Permalink
Fix seek-rewind when using nanosecond timestamps
Browse files Browse the repository at this point in the history
  • Loading branch information
tyt2y3 committed Oct 22, 2024
1 parent 1310558 commit a01e95c
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 3 deletions.
12 changes: 10 additions & 2 deletions sea-streamer-redis/src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ struct ConsumerConfig {
consumer_id: Option<ConsumerId>,
auto_ack: bool,
pre_fetch: bool,
timestamp_format: TimestampFormat,
}

/// More constants used throughout SeaStreamer Redis.
Expand All @@ -86,8 +87,15 @@ impl Consumer for RedisConsumer {

#[inline]
async fn seek(&mut self, ts: Timestamp) -> RedisResult<()> {
self.seek_to(((ts.unix_timestamp_nanos() / 1_000_000) as u64, u16::MAX))
.await
self.seek_to((
(match self.config.timestamp_format {
TimestampFormat::UnixTimestampMillis => ts.unix_timestamp_nanos() / 1_000_000,
#[cfg(feature = "nanosecond-timestamp")]
TimestampFormat::UnixTimestampNanos => ts.unix_timestamp_nanos(),
}) as u64,
u16::MAX,
))
.await
}

#[inline]
Expand Down
1 change: 1 addition & 0 deletions sea-streamer-redis/src/consumer/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl From<&RedisConsumerOptions> for ConsumerConfig {
consumer_id: options.consumer_id().cloned(),
auto_ack: options.auto_commit() == &AutoCommit::Delayed,
pre_fetch: options.pre_fetch(),
timestamp_format: options.timestamp_format,
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion sea-streamer-redis/tests/seek-rewind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ async fn main() -> anyhow::Result<()> {
async fn test(consumer_mode: ConsumerMode, batch_size: usize) -> anyhow::Result<()> {
println!("ConsumerMode = {consumer_mode:?}; batch_size = {batch_size} ...");

let options = RedisConnectOptions::default();
let mut options = RedisConnectOptions::default();
#[cfg(feature = "nanosecond-timestamp")]
options.set_timestamp_format(sea_streamer_redis::TimestampFormat::UnixTimestampNanos);
let streamer = RedisStreamer::connect(
std::env::var("BROKERS_URL")
.unwrap_or_else(|_| "redis://localhost".to_owned())
Expand Down

0 comments on commit a01e95c

Please sign in to comment.