From df3308202c4a56a6e7f6624b1b6b6158545fd10a Mon Sep 17 00:00:00 2001
From: Tony Solomonik <tony.solomonik@gmail.com>
Date: Sat, 15 Jun 2024 20:18:38 +0300
Subject: [PATCH] index: Read from kafka forever when `--stream` is passed

---
 Cargo.lock                           |  1 +
 Cargo.toml                           |  4 +-
 src/args.rs                          | 25 +++++++++++
 src/commands/index.rs                | 63 +++++++++++++++++++++-------
 src/commands/sources/kafka_source.rs | 10 +++--
 src/commands/sources/mod.rs          | 22 +++++++---
 6 files changed, 98 insertions(+), 27 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 748e671..d89450b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3171,6 +3171,7 @@ dependencies = [
  "ctor",
  "dotenvy",
  "futures",
+ "humantime",
  "log",
  "once_cell",
  "opendal",
diff --git a/Cargo.toml b/Cargo.toml
index 414e260..854b9f5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -4,9 +4,6 @@ version = "0.1.0"
 edition = "2021"
 authors = ["Tony Solomonik @ tony.solomonik@gmail.com"]
 
-[profile.dev.package.backtrace]
-opt-level = 3
-
 [profile.release]
 strip = true
 lto = true
@@ -22,6 +19,7 @@ clap = { version = "4.5.4", features = ["derive"] }
 color-eyre = { version = "0.6.3", default-features = false }
 dotenvy = "0.15.7"
 futures = "0.3.30"
+humantime = "2.1.0"
 log = "0.4.21"
 once_cell = "1.19.0"
 opendal = { version = "0.46.0", features = ["services-fs"] }
diff --git a/src/args.rs b/src/args.rs
index a72e27a..06a3e9c 100644
--- a/src/args.rs
+++ b/src/args.rs
@@ -1,4 +1,11 @@
+use std::time::Duration;
+
 use clap::Parser;
+use humantime::parse_duration;
+
+fn parse_humantime(s: &str) -> Result<Duration, String> {
+    parse_duration(s).map_err(|e| format!("Failed to parse duration: {}. Please refer to https://docs.rs/humantime/2.1.0/humantime/fn.parse_duration.html", e))
+}
 
 #[derive(Parser, Debug, Clone)]
 #[command(author, version, about, long_about = None)]
@@ -53,6 +60,24 @@
 Read from stdin by not providing any file path.")]
     pub input: Option<String>,
 
+    #[clap(
+        short,
+        long,
+        help = "Whether to stream from the source without terminating. Will stop only once the source is closed.",
+        default_value = "false"
+    )]
+    pub stream: bool,
+
+    #[clap(
+        long,
+        help = "How much time to collect docs from the source until an index file should be generated.
+Only used when streaming.
+Examples: '5s 500ms', '2m 10s'.",
+        default_value = "30s",
+        value_parser = parse_humantime
+    )]
+    pub commit_interval: Duration,
+
     #[clap(
         short,
         long,
diff --git a/src/commands/index.rs b/src/commands/index.rs
index d7571ac..4a374c0 100644
--- a/src/commands/index.rs
+++ b/src/commands/index.rs
@@ -8,7 +8,7 @@ use tantivy::{
     schema::{Field, Schema},
     Index, IndexWriter, TantivyDocument,
 };
-use tokio::{fs::create_dir_all, task::spawn_blocking};
+use tokio::{fs::create_dir_all, select, task::spawn_blocking, time::sleep};
 
 use crate::{
     args::IndexArgs,
@@ -42,9 +42,27 @@ async fn pipe_source_to_index(
 
     let mut added = 0;
 
+    let commit_timeout = sleep(args.commit_interval);
+    tokio::pin!(commit_timeout);
+
     'reader_loop: loop {
-        let Some(mut json_obj) = source.get_one().await? else {
-            break;
+        let mut json_obj = if args.stream {
+            select! {
+                _ = &mut commit_timeout => {
+                    break;
+                }
+                maybe_json_obj = source.get_one() => {
+                    let Some(json_obj) = maybe_json_obj? else {
+                        break;
+                    };
+                    json_obj
+                }
+            }
+        } else {
+            let Some(json_obj) = source.get_one().await? else {
+                break;
+            };
+            json_obj
         };
 
         let mut doc = TantivyDocument::new();
@@ -100,18 +118,33 @@ pub async fn run_index(args: IndexArgs, pool: &PgPool) -> Result<()> {
     build_parsers_from_field_configs(&config.schema.fields, &mut schema_builder)?;
     let schema = schema_builder.build();
 
-    let mut source = connect_to_source(args.input.as_deref()).await?;
-
-    pipe_source_to_index(
-        &mut source,
-        schema,
-        &field_parsers,
-        dynamic_field,
-        &args,
-        &config,
-        pool,
-    )
-    .await?;
+    let mut source = connect_to_source(args.input.as_deref(), args.stream).await?;
+
+    if args.stream {
+        loop {
+            pipe_source_to_index(
+                &mut source,
+                schema.clone(),
+                &field_parsers,
+                dynamic_field,
+                &args,
+                &config,
+                pool,
+            )
+            .await?;
+        }
+    } else {
+        pipe_source_to_index(
+            &mut source,
+            schema,
+            &field_parsers,
+            dynamic_field,
+            &args,
+            &config,
+            pool,
+        )
+        .await?;
+    }
 
     Ok(())
 }
diff --git a/src/commands/sources/kafka_source.rs b/src/commands/sources/kafka_source.rs
index abf4f74..7324ee1 100644
--- a/src/commands/sources/kafka_source.rs
+++ b/src/commands/sources/kafka_source.rs
@@ -90,7 +90,7 @@ fn run_consumer_thread(consumer: KafkaConsumer, tx: mpsc::Sender<Result<Option<
 }
 
 impl KafkaSource {
-    pub fn from_url(url: &str) -> Result<Self> {
+    pub fn from_url(url: &str, stream: bool) -> Result<Self> {
         let (servers, topic) = parse_url(url)?;
 
         let log_level = if cfg!(debug_assertions) {
@@ -102,9 +102,13 @@ impl KafkaSource {
         let consumer: KafkaConsumer = ClientConfig::new()
             .set("bootstrap.servers", servers)
             .set("session.timeout.ms", "6000") // Minimum allowed timeout.
-            .set("auto.offset.reset", "earliest")
+            .set(
+                "auto.offset.reset",
+                // Stream will seek to offset saved in checkpoint in the future.
+                if stream { "latest" } else { "earliest" },
+            )
             .set("enable.auto.commit", "false")
-            .set("enable.partition.eof", "true")
+            .set("enable.partition.eof", (!stream).to_string())
             .set("group.id", topic) // Consumer group per topic for now.
             .set_log_level(log_level)
             .create_with_context(KafkaContext)
diff --git a/src/commands/sources/mod.rs b/src/commands/sources/mod.rs
index 9eaf866..f654b40 100644
--- a/src/commands/sources/mod.rs
+++ b/src/commands/sources/mod.rs
@@ -2,24 +2,34 @@ mod buf_source;
 mod kafka_source;
 
 use async_trait::async_trait;
-use color_eyre::Result;
+use color_eyre::{eyre::bail, Result};
 
 use self::{
     buf_source::BufSource,
     kafka_source::{KafkaSource, KAFKA_PREFIX},
 };
 
-type JsonMap = serde_json::Map<String, serde_json::Value>;
+pub type JsonMap = serde_json::Map<String, serde_json::Value>;
 
 #[async_trait]
 pub trait Source {
     async fn get_one(&mut self) -> Result<Option<JsonMap>>;
 }
 
-pub async fn connect_to_source(input: Option<&str>) -> Result<Box<dyn Source>> {
+pub async fn connect_to_source(input: Option<&str>, stream: bool) -> Result<Box<dyn Source>> {
     Ok(match input {
-        Some(url) if url.starts_with(KAFKA_PREFIX) => Box::new(KafkaSource::from_url(url)?),
-        Some(path) => Box::new(BufSource::from_path(path).await?),
-        None => Box::new(BufSource::from_stdin()),
+        Some(url) if url.starts_with(KAFKA_PREFIX) => Box::new(KafkaSource::from_url(url, stream)?),
+        Some(path) => {
+            if stream {
+                bail!("Streaming from a file is not currently supported.");
+            }
+            Box::new(BufSource::from_path(path).await?)
+        }
+        None => {
+            if stream {
+                bail!("Streaming from stdin is not supported.");
+            }
+            Box::new(BufSource::from_stdin())
+        }
     })
 }
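
Note (not part of the patch): the new `--commit-interval` flag is parsed by the `parse_humantime` helper above, a thin wrapper over `humantime::parse_duration`. A minimal sketch of what the duration strings cited in the help text evaluate to, assuming only humantime 2.1.0 as a dependency:

    use humantime::parse_duration;

    fn main() {
        // Formats cited in the --commit-interval help text.
        assert_eq!(parse_duration("5s 500ms").unwrap().as_millis(), 5_500);
        assert_eq!(parse_duration("2m 10s").unwrap().as_secs(), 130);
        // The flag's default value parses to a 30-second Duration.
        assert_eq!(parse_duration("30s").unwrap().as_secs(), 30);
    }

Strings that `parse_duration` rejects are surfaced by clap as invalid-value errors carrying the message built in `parse_humantime`.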