diff --git a/Cargo.lock b/Cargo.lock
index dbf5797..7f9d84d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3280,6 +3280,7 @@ dependencies = [
  "testcontainers-modules",
  "tokio",
  "tokio-util",
+ "toshokan",
  "uuid",
 ]
 
diff --git a/Cargo.toml b/Cargo.toml
index 94ef837..1afd3aa 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -12,6 +12,9 @@ lto = true
 inherits = "release"
 lto = "thin"
 
+[features]
+in-tests = []
+
 [dependencies]
 async-trait = "0.1.80"
 bincode = "1.3.3"
@@ -43,3 +46,4 @@ rstest = "0.21.0"
 tempfile = "3.10.1"
 testcontainers = "0.17.0"
 testcontainers-modules = { version = "0.5.0", features = ["postgres", "kafka"] }
+toshokan = { path = ".", features = ["in-tests"] }
diff --git a/src/commands/index.rs b/src/commands/index.rs
index 317742e..4dec5ff 100644
--- a/src/commands/index.rs
+++ b/src/commands/index.rs
@@ -49,6 +49,15 @@ pub struct IndexRunner {
 
 impl IndexRunner {
     pub async fn new(args: IndexArgs, pool: PgPool) -> Result<Self> {
+        let source = connect_to_source(args.input.as_deref(), args.stream, &pool).await?;
+        IndexRunner::new_with_source(args, pool, source).await
+    }
+
+    pub async fn new_with_source(
+        args: IndexArgs,
+        pool: PgPool,
+        source: Box<dyn Source>,
+    ) -> Result<Self> {
         let config = get_index_config(&args.name, &pool).await?;
 
         let mut schema_builder = Schema::builder();
@@ -58,8 +67,6 @@
         build_parsers_from_field_configs(&config.schema.fields, &mut schema_builder)?;
         let schema = schema_builder.build();
 
-        let source = connect_to_source(args.input.as_deref(), args.stream, &pool).await?;
-
         Ok(Self {
             source,
             schema,
diff --git a/src/commands/sources/kafka_source.rs b/src/commands/sources/kafka_source.rs
index 6f53cd9..adeb92b 100644
--- a/src/commands/sources/kafka_source.rs
+++ b/src/commands/sources/kafka_source.rs
@@ -23,6 +23,34 @@ pub const KAFKA_PREFIX: &str = "kafka://";
 const CONSUMER_THREAD_MESSAGES_CHANNEL_SIZE: usize = 10;
 const POLL_DURATION: Duration = Duration::from_secs(1);
 
+macro_rules! track_saved_checkpoint_impl {
+    ( $self:ident, $partitions_and_offsets:ident ) => {
+        #[cfg(feature = "in-tests")]
+        {
+            $self.saved_partitions_and_offsets = $partitions_and_offsets.to_vec();
+        }
+
+        #[cfg(not(feature = "in-tests"))]
+        {
+            // No-op when not in tests.
+        }
+    };
+}
+
+macro_rules! track_loaded_checkpoint_impl {
+    ( $self:ident, $partitions_and_offsets:ident ) => {
+        #[cfg(feature = "in-tests")]
+        {
+            $self.loaded_partitions_and_offsets = $partitions_and_offsets.to_vec();
+        }
+
+        #[cfg(not(feature = "in-tests"))]
+        {
+            // No-op when not in tests.
+ } + }; +} + struct PostRebalance { partitions: Vec, checkpoint_tx: oneshot::Sender)>>, @@ -129,6 +157,11 @@ pub struct KafkaSource { messages_rx: mpsc::Receiver>, checkpoint: Option, partition_to_offset: BTreeMap, + + #[cfg(feature = "in-tests")] + pub saved_partitions_and_offsets: Vec<(i32, i64)>, + #[cfg(feature = "in-tests")] + pub loaded_partitions_and_offsets: Vec<(i32, Option)>, } pub fn parse_url(url: &str) -> Result<(&str, &str)> { @@ -253,6 +286,11 @@ impl KafkaSource { messages_rx: rx, checkpoint, partition_to_offset: BTreeMap::new(), + + #[cfg(feature = "in-tests")] + saved_partitions_and_offsets: Vec::new(), + #[cfg(feature = "in-tests")] + loaded_partitions_and_offsets: Vec::new(), }; this.wait_for_assignment() @@ -285,6 +323,9 @@ impl KafkaSource { .load(&msg.partitions) .await .context("failed to load checkpoint")?; + + self.track_loaded_checkpoint(&partitions_and_offsets); + if msg.checkpoint_tx.send(partitions_and_offsets).is_err() { bail!("failed to respond with partition offsets, kafka consumer thread probably closed") } @@ -293,6 +334,14 @@ impl KafkaSource { Ok(()) } + + fn track_saved_checkpoint(&mut self, partitions_and_offsets: Vec<(i32, i64)>) { + track_saved_checkpoint_impl!(self, partitions_and_offsets); + } + + fn track_loaded_checkpoint(&mut self, partitions_and_offsets: &[(i32, Option)]) { + track_loaded_checkpoint_impl!(self, partitions_and_offsets); + } } #[async_trait] @@ -338,6 +387,8 @@ impl Source for KafkaSource { .collect::>(); checkpoint.save(&flat).await?; + self.track_saved_checkpoint(flat); + self.partition_to_offset.clear(); Ok(()) diff --git a/tests/kafka_indexing.rs b/tests/kafka_indexing.rs index e24fb7d..996e39a 100644 --- a/tests/kafka_indexing.rs +++ b/tests/kafka_indexing.rs @@ -3,9 +3,11 @@ mod common; use std::{ path::{Path, PathBuf}, str::FromStr, + sync::Arc, time::Duration, }; +use async_trait::async_trait; use clap::Parser; use color_eyre::{ eyre::{bail, Context}, @@ -13,10 +15,7 @@ use color_eyre::{ }; use ctor::ctor; use log::debug; -use notify::{ - event::{CreateKind, RemoveKind}, - recommended_watcher, Event, EventKind, RecursiveMode, Watcher, -}; +use notify::{event::CreateKind, recommended_watcher, Event, EventKind, RecursiveMode, Watcher}; use rdkafka::{ producer::{FutureProducer, FutureRecord, Producer}, ClientConfig, @@ -27,7 +26,7 @@ use testcontainers_modules::kafka::{Kafka as KafkaContainer, KAFKA_PORT}; use tokio::{ fs::{create_dir_all, remove_dir_all}, join, select, spawn, - sync::{mpsc, oneshot}, + sync::{mpsc, oneshot, Mutex}, task::JoinHandle, }; use toshokan::{ @@ -35,7 +34,10 @@ use toshokan::{ commands::{ create::run_create_from_config, index::{run_index, BatchResult, IndexRunner}, - sources::kafka_source::parse_url, + sources::{ + kafka_source::{parse_url, KafkaSource}, + Source, SourceItem, + }, }, config::IndexConfig, }; @@ -242,3 +244,81 @@ async fn test_kafka_index_stream_restart_on_rebalance() -> Result<()> { Ok(()) } + +struct ArcSource(Arc>); + +#[async_trait] +impl Source for ArcSource { + async fn get_one(&mut self) -> Result { + self.0.clone().lock_owned().await.get_one().await + } + + async fn on_index_created(&mut self) -> Result<()> { + self.0.clone().lock_owned().await.on_index_created().await + } +} + +#[tokio::test] +async fn test_kafka_index_stream_load_checkpoint() -> Result<()> { + let postgres = run_postgres().await?; + + let kafka_container = KafkaContainer::default().start().await?; + let kafka_port = kafka_container.get_host_port_ipv4(KAFKA_PORT).await?; + let kafka_url = 
+    let kafka_url = format!("kafka://127.0.0.1:{kafka_port}/test_topic");
+
+    let mut config = IndexConfig::from_str(include_str!("../example_config.yaml"))?;
+    config.path = "/tmp/toshokan_kafka_stream_load_checkpoint".to_string();
+
+    // Just in case this path already exists, remove it.
+    let _ = remove_dir_all(&config.path).await;
+    create_dir_all(&config.path).await?;
+
+    run_create_from_config(&config, &postgres.pool).await?;
+
+    produce_logs(
+        &kafka_url,
+        include_str!("test_files/hdfs-logs-multitenants-2.json"),
+    )
+    .await?;
+
+    // Save checkpoint.
+    {
+        let kafka_source = Arc::new(Mutex::new(
+            KafkaSource::from_url(&kafka_url, true, &postgres.pool).await?,
+        ));
+
+        assert_eq!(
+            kafka_source.lock().await.loaded_partitions_and_offsets,
+            vec![(0, None)]
+        );
+
+        let mut runner = IndexRunner::new_with_source(
+            IndexArgs::parse_from([
+                "",
+                &config.name,
+                &kafka_url,
+                "--stream",
+                "--commit-interval",
+                "500ms",
+            ]),
+            postgres.pool.clone(),
+            Box::new(ArcSource(kafka_source.clone())),
+        )
+        .await?;
+
+        while kafka_source.lock().await.saved_partitions_and_offsets != vec![(0, 2)] {
+            assert_eq!(runner.run_one_batch().await?, BatchResult::Timeout);
+        }
+    }
+
+    // Load checkpoint.
+    {
+        let kafka_source = KafkaSource::from_url(&kafka_url, true, &postgres.pool).await?;
+        assert_eq!(
+            kafka_source.loaded_partitions_and_offsets,
+            vec![(0, Some(2))]
+        );
+    }
+
+    Ok(())
+}