From 1346870da08ef0f86caf4cf25dba521a793d7474 Mon Sep 17 00:00:00 2001 From: Tony Solomonik Date: Sat, 22 Jun 2024 20:31:34 +0300 Subject: [PATCH] tests: kafka: Add index abort test on rebalancing when streaming --- src/commands/field_parser.rs | 2 +- src/commands/index.rs | 232 +++++++++++++++------------ src/commands/mod.rs | 2 +- src/commands/sources/kafka_source.rs | 81 ++++++---- src/commands/sources/mod.rs | 6 +- src/unified_index/writer.rs | 4 +- tests/kafka_indexing.rs | 151 ++++++++++++++--- 7 files changed, 316 insertions(+), 162 deletions(-) diff --git a/src/commands/field_parser.rs b/src/commands/field_parser.rs index 26ea70c..848c8f7 100644 --- a/src/commands/field_parser.rs +++ b/src/commands/field_parser.rs @@ -10,7 +10,7 @@ use crate::config::{ escaped_with_parent_name, number::NumberFieldType, FieldConfig, FieldConfigs, FieldType, }; -type ParseFn = Box Result>; +type ParseFn = Box Result + Send + Sync>; enum FieldParserVariation { Value { field: Field, parse_fn: ParseFn }, diff --git a/src/commands/index.rs b/src/commands/index.rs index 0a1bea0..3d4be8f 100644 --- a/src/commands/index.rs +++ b/src/commands/index.rs @@ -30,128 +30,154 @@ use super::{ DYNAMIC_FIELD_NAME, }; -async fn pipe_source_to_index( - source: &mut Box, +#[derive(Debug, PartialEq, Eq)] +pub enum BatchResult { + Eof, + Timeout, + Restart, +} + +pub struct IndexRunner { + source: Box, schema: Schema, - field_parsers: &[FieldParser], + field_parsers: Vec, dynamic_field: Field, - args: &IndexArgs, - config: &IndexConfig, - pool: &PgPool, -) -> Result { - let id = uuid::Uuid::now_v7().hyphenated().to_string(); - let index_dir = Path::new(&args.build_dir).join(&id); - let _ = create_dir_all(&index_dir).await; - let index = Index::open_or_create(MmapDirectory::open(&index_dir)?, schema)?; - let mut index_writer: IndexWriter = index.writer(args.memory_budget)?; - index_writer.set_merge_policy(Box::new(NoMergePolicy)); - - let mut added = 0; - let mut did_timeout = false; - - let mut commit_timeout_fut: Pin>> = if args.stream { - Box::pin(sleep(args.commit_interval)) - } else { - // Infinite timeout by waiting on a future that never resolves. - Box::pin(pending::<()>()) - }; - - debug!("Piping source -> index of id '{}'", &id); - - 'reader_loop: loop { - let item = select! { - _ = &mut commit_timeout_fut => { - did_timeout = true; - break; - } - item = source.get_one() => { - item? 
- } - }; + args: IndexArgs, + config: IndexConfig, + pool: PgPool, +} - let mut json_obj = match item { - SourceItem::Document(json_obj) => json_obj, - SourceItem::Close => { - debug!("Source closed for index of id '{}'", &id); - break; - } - SourceItem::Restart => { - debug!("Aborting index of id '{}' with {} documents", &id, added); - if let Err(e) = remove_dir_all(&index_dir).await { - warn!("Failed to remove aborted index of id '{}': {}", &id, e); - } - return Ok(true); - } +impl IndexRunner { + pub async fn new(args: IndexArgs, pool: PgPool) -> Result { + let config = get_index_config(&args.name, &pool).await?; + + let mut schema_builder = Schema::builder(); + let dynamic_field = + schema_builder.add_json_field(DYNAMIC_FIELD_NAME, dynamic_field_config()); + let field_parsers = + build_parsers_from_field_configs(&config.schema.fields, &mut schema_builder)?; + let schema = schema_builder.build(); + + let source = connect_to_source(args.input.as_deref(), args.stream, &pool).await?; + + Ok(Self { + source, + schema, + field_parsers, + dynamic_field, + args, + config, + pool, + }) + } + + /// Read documents from the source and then index them into a new index file. + /// One batch means that one index file is created when calling this function. + /// In batch mode, we simply read from the source until the end. + /// In stream mode, we read from the source until a the --commit-interval timeout is reached. + pub async fn run_one_batch(&mut self) -> Result { + let id = uuid::Uuid::now_v7().hyphenated().to_string(); + let index_dir = Path::new(&self.args.build_dir).join(&id); + let _ = create_dir_all(&index_dir).await; + let index = Index::open_or_create(MmapDirectory::open(&index_dir)?, self.schema.clone())?; + let mut index_writer: IndexWriter = index.writer(self.args.memory_budget)?; + index_writer.set_merge_policy(Box::new(NoMergePolicy)); + + let mut added = 0; + let mut result = BatchResult::Eof; + + let mut commit_timeout_fut: Pin + Send>> = if self.args.stream { + Box::pin(sleep(self.args.commit_interval)) + } else { + // Infinite timeout by waiting on a future that never resolves. + Box::pin(pending::<()>()) }; - let mut doc = TantivyDocument::new(); + debug!("Piping source -> index of id '{}'", &id); - for field_parser in field_parsers { - let name = &field_parser.name; - let Some(json_value) = json_obj.remove(name) else { - debug!("Field '{}' in schema but not found", &name); - continue; + 'reader_loop: loop { + let item = select! { + _ = &mut commit_timeout_fut => { + result = BatchResult::Timeout; + break; + } + item = self.source.get_one() => { + item? 
+ } }; - if let Err(e) = field_parser.add_parsed_field_value(&mut doc, json_value) { - error!( - "Failed to parse '{}' (on {} iteration): {}", - &name, added, e - ); - continue 'reader_loop; + let mut json_obj = match item { + SourceItem::Document(json_obj) => json_obj, + SourceItem::Close => { + debug!("Source closed for index of id '{}'", &id); + break; + } + SourceItem::Restart => { + debug!("Aborting index of id '{}' with {} documents", &id, added); + if let Err(e) = remove_dir_all(&index_dir).await { + warn!("Failed to remove aborted index of id '{}': {}", &id, e); + } + return Ok(BatchResult::Restart); + } + }; + + let mut doc = TantivyDocument::new(); + + for field_parser in &self.field_parsers { + let name = &field_parser.name; + let Some(json_value) = json_obj.remove(name) else { + debug!("Field '{}' in schema but not found", &name); + continue; + }; + + if let Err(e) = field_parser.add_parsed_field_value(&mut doc, json_value) { + error!( + "Failed to parse '{}' (on {} iteration): {}", + &name, added, e + ); + continue 'reader_loop; + } } - } - doc.add_field_value(dynamic_field, json_obj); - index_writer.add_document(doc)?; - added += 1; - } + doc.add_field_value(self.dynamic_field, json_obj); + index_writer.add_document(doc)?; + added += 1; + } - if added == 0 { - debug!("Not writing index: no documents added"); - return Ok(did_timeout); - } + if added == 0 { + debug!("Not writing index: no documents added"); + return Ok(result); + } - info!("Commiting {added} documents"); - index_writer.prepare_commit()?.commit_future().await?; + info!("Commiting {added} documents"); + index_writer.prepare_commit()?.commit_future().await?; - let segment_ids = index.searchable_segment_ids()?; - if segment_ids.len() > 1 { - info!("Merging {} segments", segment_ids.len()); - index_writer.merge(&segment_ids).await?; - } + let segment_ids = index.searchable_segment_ids()?; + if segment_ids.len() > 1 { + info!("Merging {} segments", segment_ids.len()); + index_writer.merge(&segment_ids).await?; + } - spawn_blocking(move || index_writer.wait_merging_threads()).await??; + spawn_blocking(move || index_writer.wait_merging_threads()).await??; - write_unified_index(&id, &index, &index_dir, &config.name, &config.path, pool).await?; + write_unified_index( + &id, + &index, + &index_dir, + &self.config.name, + &self.config.path, + &self.pool, + ) + .await?; - source.on_index_created().await?; + self.source.on_index_created().await?; - Ok(did_timeout) + Ok(result) + } } pub async fn run_index(args: IndexArgs, pool: &PgPool) -> Result<()> { - let config = get_index_config(&args.name, pool).await?; - - let mut schema_builder = Schema::builder(); - let dynamic_field = schema_builder.add_json_field(DYNAMIC_FIELD_NAME, dynamic_field_config()); - let field_parsers = - build_parsers_from_field_configs(&config.schema.fields, &mut schema_builder)?; - let schema = schema_builder.build(); - - let mut source = connect_to_source(args.input.as_deref(), args.stream, pool).await?; - - while pipe_source_to_index( - &mut source, - schema.clone(), - &field_parsers, - dynamic_field, - &args, - &config, - pool, - ) - .await? - {} - + let mut runner = IndexRunner::new(args, pool.clone()).await?; + while runner.run_one_batch().await? 
!= BatchResult::Eof {} Ok(()) } diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 5258a30..b053f56 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -4,7 +4,7 @@ mod field_parser; pub mod index; pub mod merge; pub mod search; -mod sources; +pub mod sources; use std::{ path::{Path, PathBuf}, diff --git a/src/commands/sources/kafka_source.rs b/src/commands/sources/kafka_source.rs index 79bec98..623ded5 100644 --- a/src/commands/sources/kafka_source.rs +++ b/src/commands/sources/kafka_source.rs @@ -23,6 +23,11 @@ pub const KAFKA_PREFIX: &str = "kafka://"; const CONSUMER_THREAD_MESSAGES_CHANNEL_SIZE: usize = 10; const POLL_DURATION: Duration = Duration::from_secs(1); +struct PostRebalance { + partitions: Vec, + checkpoint_tx: oneshot::Sender)>>, +} + enum MessageFromConsumerThread { Payload { bytes: Vec, @@ -31,10 +36,7 @@ enum MessageFromConsumerThread { }, Eof, PreRebalance, - PostRebalance { - partitions: Vec, - checkpoint_tx: oneshot::Sender)>>, - }, + PostRebalance(PostRebalance), } struct KafkaContext { @@ -52,6 +54,7 @@ impl ConsumerContext for KafkaContext { if self.ignore_rebalance { return; } + debug!("Got revoke event"); if let Err(e) = self .messages_tx @@ -64,6 +67,7 @@ impl ConsumerContext for KafkaContext { if self.ignore_rebalance { return; } + debug!("Got assignment event"); // elements() panics when tpl is empty, so we check capacity. let partitions = if tpl.capacity() > 0 { @@ -79,10 +83,10 @@ impl ConsumerContext for KafkaContext { }; let (tx, rx) = oneshot::channel(); - let msg = MessageFromConsumerThread::PostRebalance { + let msg = MessageFromConsumerThread::PostRebalance(PostRebalance { partitions, checkpoint_tx: tx, - }; + }); if let Err(e) = self.messages_tx.blocking_send(Ok(msg)) { error!("Failed to send post-rebalance event: {e}"); return; @@ -127,7 +131,7 @@ pub struct KafkaSource { partition_to_offset: BTreeMap, } -fn parse_url(url: &str) -> Result<(&str, &str)> { +pub fn parse_url(url: &str) -> Result<(&str, &str)> { if !url.starts_with(KAFKA_PREFIX) { return Err(eyre!("'{}' does not start with {}", url, KAFKA_PREFIX)); } @@ -193,7 +197,7 @@ fn run_consumer_thread( } impl KafkaSource { - pub fn from_url(url: &str, stream: bool, pool: &PgPool) -> Result { + pub async fn from_url(url: &str, stream: bool, pool: &PgPool) -> Result { let (servers, topic) = parse_url(url)?; let log_level = if cfg!(debug_assertions) { @@ -245,11 +249,49 @@ impl KafkaSource { None }; - Ok(Self { + let mut this = Self { messages_rx: rx, checkpoint, partition_to_offset: BTreeMap::new(), - }) + }; + + this.wait_for_assignment() + .await + .context("first message got is not an assignment message")?; + + Ok(this) + } + + async fn wait_for_assignment(&mut self) -> Result<()> { + let Some(msg) = self.messages_rx.recv().await else { + bail!("kafka consumer thread closed") + }; + + let MessageFromConsumerThread::PostRebalance(msg) = msg? 
else { + bail!("got a non assignment message"); + }; + + self.handle_post_rebalance_msg(msg).await?; + + Ok(()) + } + + async fn handle_post_rebalance_msg(&mut self, msg: PostRebalance) -> Result<()> { + let Some(ref checkpoint) = self.checkpoint else { + return Ok(()); + }; + + let partitions_and_offsets = checkpoint + .load(&msg.partitions) + .await + .context("failed to load checkpoint")?; + if msg.checkpoint_tx.send(partitions_and_offsets).is_err() { + bail!("failed to respond with partition offsets, kafka consumer thread probably closed") + } + + self.partition_to_offset.clear(); + + Ok(()) } } @@ -276,23 +318,8 @@ impl Source for KafkaSource { MessageFromConsumerThread::PreRebalance => { break SourceItem::Restart; } - MessageFromConsumerThread::PostRebalance { - partitions, - checkpoint_tx, - } => { - let Some(ref checkpoint) = self.checkpoint else { - continue; - }; - - let partitions_and_offsets = checkpoint - .load(&partitions) - .await - .context("failed to load checkpoint")?; - if checkpoint_tx.send(partitions_and_offsets).is_err() { - bail!("failed to respond with partition offsets, kafka consumer thread probably closed") - } - - self.partition_to_offset.clear(); + MessageFromConsumerThread::PostRebalance(msg) => { + self.handle_post_rebalance_msg(msg).await?; } } }) diff --git a/src/commands/sources/mod.rs b/src/commands/sources/mod.rs index 9d2dc1f..1cec79b 100644 --- a/src/commands/sources/mod.rs +++ b/src/commands/sources/mod.rs @@ -1,6 +1,6 @@ mod buf_source; mod kafka_checkpoint; -mod kafka_source; +pub mod kafka_source; use async_trait::async_trait; use color_eyre::{eyre::bail, Result}; @@ -38,10 +38,10 @@ pub async fn connect_to_source( input: Option<&str>, stream: bool, pool: &PgPool, -) -> Result> { +) -> Result> { Ok(match input { Some(url) if url.starts_with(KAFKA_PREFIX) => { - Box::new(KafkaSource::from_url(url, stream, pool)?) + Box::new(KafkaSource::from_url(url, stream, pool).await?) 
} Some(path) => { if stream { diff --git a/src/unified_index/writer.rs b/src/unified_index/writer.rs index 421e1d0..79930d5 100644 --- a/src/unified_index/writer.rs +++ b/src/unified_index/writer.rs @@ -18,7 +18,7 @@ use crate::bincode::bincode_options; use super::{FileCache, IndexFooter}; struct FileReader { - reader: Box, + reader: Box, file_name: PathBuf, } @@ -30,7 +30,7 @@ impl FileReader { )) } - fn new(reader: Box, file_name: PathBuf) -> Self { + fn new(reader: Box, file_name: PathBuf) -> Self { Self { reader, file_name } } } diff --git a/tests/kafka_indexing.rs b/tests/kafka_indexing.rs index 69e1479..82c8b6e 100644 --- a/tests/kafka_indexing.rs +++ b/tests/kafka_indexing.rs @@ -17,16 +17,22 @@ use rdkafka::{ producer::{FutureProducer, FutureRecord, Producer}, ClientConfig, }; +use sqlx::PgPool; use testcontainers::runners::AsyncRunner; use testcontainers_modules::kafka::{Kafka as KafkaContainer, KAFKA_PORT}; use tokio::{ fs::{create_dir_all, remove_dir_all}, - select, - sync::mpsc, + join, select, spawn, + sync::{mpsc, oneshot}, + task::JoinHandle, }; use toshokan::{ args::IndexArgs, - commands::{create::run_create_from_config, index::run_index}, + commands::{ + create::run_create_from_config, + index::{run_index, BatchResult, IndexRunner}, + sources::kafka_source::parse_url, + }, config::IndexConfig, }; @@ -37,7 +43,60 @@ fn init() { test_init(); } -pub async fn watch_for_new_file(dir: &str) -> Result { +async fn produce_logs(url: &str, logs: &str) -> Result<()> { + let (servers, topic) = parse_url(url)?; + + let producer: FutureProducer = ClientConfig::new() + .set("bootstrap.servers", servers) + .set("message.timeout.ms", "5000") + .create() + .context("producer creation error")?; + + for log in logs.trim().lines() { + producer + .send( + FutureRecord::to(topic).payload(log).key("key").partition(0), + Duration::from_secs(5), + ) + .await + .unwrap(); + } + producer.flush(Duration::from_secs(5))?; + + Ok(()) +} + +fn spawn_one_index_batch_run( + index_name: String, + kafka_url: String, + pool: PgPool, +) -> (oneshot::Receiver<()>, JoinHandle) { + let (tx, rx) = oneshot::channel(); + + let handle = spawn(async move { + let mut runner = IndexRunner::new( + IndexArgs::parse_from([ + "", + &index_name, + &kafka_url, + "--stream", + "--commit-interval", + "1m", + ]), + pool, + ) + .await + .unwrap(); + + tx.send(()).unwrap(); + + runner.run_one_batch().await.unwrap() + }); + + (rx, handle) +} + +async fn watch_for_new_file(dir: &str) -> Result { let (tx, mut rx) = mpsc::channel(1); let mut watcher = recommended_watcher(move |event| { @@ -86,6 +145,12 @@ async fn test_kafka_index_stream() -> Result<()> { run_create_from_config(&config, &postgres.pool).await?; + produce_logs( + &kafka_url, + include_str!("test_files/hdfs-logs-multitenants-2.json"), + ) + .await?; + let index_stream_fut = run_index( IndexArgs::parse_from([ "", @@ -98,27 +163,6 @@ async fn test_kafka_index_stream() -> Result<()> { &postgres.pool, ); - let producer: FutureProducer = ClientConfig::new() - .set("bootstrap.servers", format!("127.0.0.1:{kafka_port}")) - .set("message.timeout.ms", "5000") - .create() - .context("producer creation error")?; - - let logs = include_str!("test_files/hdfs-logs-multitenants-2.json"); - for log in logs.trim().lines() { - producer - .send( - FutureRecord::to("test_topic") - .payload(log) - .key("key") - .partition(0), - Duration::from_secs(5), - ) - .await - .unwrap(); - } - producer.flush(Duration::from_secs(5))?; - select! 
{ result = watch_for_new_file(&config.path) => { let file_path = result?; @@ -131,3 +175,60 @@ async fn test_kafka_index_stream() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_kafka_index_stream_restart_on_rebalance() -> Result<()> { + let postgres = run_postgres().await?; + + let kafka_container = KafkaContainer::default().start().await?; + let kafka_port = kafka_container.get_host_port_ipv4(KAFKA_PORT).await?; + let kafka_url = format!("kafka://127.0.0.1:{kafka_port}/test_topic"); + + let mut config = IndexConfig::from_str(include_str!("../example_config.yaml"))?; + config.path = "/tmp/toshokan_kafka_stream_rebalance".to_string(); + + // Just in case this path already exists, remove it. + let _ = remove_dir_all(&config.path).await; + create_dir_all(&config.path).await?; + + run_create_from_config(&config, &postgres.pool).await?; + + produce_logs( + &kafka_url, + include_str!("test_files/hdfs-logs-multitenants-2.json"), + ) + .await?; + + let (assignment_rx1, index_handle1) = spawn_one_index_batch_run( + config.name.to_string(), + kafka_url.to_string(), + postgres.pool.clone(), + ); + let (assignment_rx2, index_handle2) = spawn_one_index_batch_run( + config.name.to_string(), + kafka_url.to_string(), + postgres.pool.clone(), + ); + + let (r1, r2) = join!(assignment_rx1, assignment_rx2); + r1?; + r2?; + + let (assignment_rx3, _) = spawn_one_index_batch_run( + config.name.to_string(), + kafka_url.to_string(), + postgres.pool.clone(), + ); + assignment_rx3.await?; + + select! { + result = index_handle1 => { + assert_eq!(result?, BatchResult::Restart); + } + result = index_handle2 => { + assert_eq!(result?, BatchResult::Restart); + } + } + + Ok(()) +}
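The new IndexRunner API splits indexing into discrete batches, and run_index itself only loops until BatchResult::Eof. A caller embedding IndexRunner directly (as the rebalance test above does) would typically match on every variant; below is a minimal sketch assuming only the IndexRunner and BatchResult types added by this patch — drive_indexer is a hypothetical helper name, not something the patch introduces.

    use color_eyre::Result;
    use sqlx::PgPool;
    use toshokan::{
        args::IndexArgs,
        commands::index::{BatchResult, IndexRunner},
    };

    // Hypothetical driver: build one index file per batch until the source is
    // exhausted. Timeout (stream mode hit --commit-interval) and Restart (the
    // kafka consumer group rebalanced and the partial index was discarded)
    // both mean "start another batch"; only Eof ends the loop.
    async fn drive_indexer(args: IndexArgs, pool: PgPool) -> Result<()> {
        let mut runner = IndexRunner::new(args, pool).await?;
        loop {
            match runner.run_one_batch().await? {
                BatchResult::Eof => break,
                BatchResult::Timeout | BatchResult::Restart => continue,
            }
        }
        Ok(())
    }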
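Inside the kafka source, the PostRebalance message now carries its own struct with a oneshot::Sender, so the blocking rdkafka callback thread can hand the freshly assigned partitions to the async side and get the checkpointed offsets back before polling resumes. The following is a stripped-down sketch of that blocking-to-async handshake pattern using tokio's mpsc and oneshot channels; OffsetRequest, the i32/i64 types, and the stubbed offsets are illustrative only, not the patch's exact types.

    use tokio::sync::{mpsc, oneshot};

    // Illustrative request type: a blocking (non-async) thread bundles a
    // one-shot reply channel with the data it needs answered.
    struct OffsetRequest {
        partitions: Vec<i32>,
        reply_tx: oneshot::Sender<Vec<(i32, i64)>>,
    }

    // Runs on a plain OS thread (e.g. a consumer callback): send the request
    // and block until the async side replies. Returns None if a channel closed.
    fn request_offsets_blocking(
        tx: &mpsc::Sender<OffsetRequest>,
        partitions: Vec<i32>,
    ) -> Option<Vec<(i32, i64)>> {
        let (reply_tx, reply_rx) = oneshot::channel();
        tx.blocking_send(OffsetRequest { partitions, reply_tx }).ok()?;
        reply_rx.blocking_recv().ok()
    }

    // Async side: answer each request over its oneshot channel (offsets are
    // stubbed here; the real code loads them from the checkpoint store).
    async fn serve_offsets(mut rx: mpsc::Receiver<OffsetRequest>) {
        while let Some(req) = rx.recv().await {
            let offsets: Vec<(i32, i64)> = req.partitions.iter().map(|&p| (p, 0)).collect();
            let _ = req.reply_tx.send(offsets);
        }
    }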