Skip to content

Commit

Permalink
index: Read from kafka forever when --stream is passed
Browse files Browse the repository at this point in the history
  • Loading branch information
tontinton committed Jun 15, 2024
1 parent b20751c commit df33082
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 27 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ version = "0.1.0"
edition = "2021"
authors = ["Tony Solomonik @ [email protected]"]

[profile.dev.package.backtrace]
opt-level = 3

[profile.release]
strip = true
lto = true
Expand All @@ -22,6 +19,7 @@ clap = { version = "4.5.4", features = ["derive"] }
color-eyre = { version = "0.6.3", default-features = false }
dotenvy = "0.15.7"
futures = "0.3.30"
humantime = "2.1.0"
log = "0.4.21"
once_cell = "1.19.0"
opendal = { version = "0.46.0", features = ["services-fs"] }
Expand Down
25 changes: 25 additions & 0 deletions src/args.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
use std::time::Duration;

use clap::Parser;
use humantime::parse_duration;

/// Clap value parser: turn a human-readable duration string (e.g. "30s",
/// "2m 10s") into a `Duration`, mapping the humantime error into a
/// user-facing message with a link to the accepted syntax.
fn parse_humantime(s: &str) -> Result<Duration, String> {
    match parse_duration(s) {
        Ok(duration) => Ok(duration),
        Err(e) => Err(format!("Failed to parse duration: {}. Please refer to https://docs.rs/humantime/2.1.0/humantime/fn.parse_duration.html", e)),
    }
}

#[derive(Parser, Debug, Clone)]
#[command(author, version, about, long_about = None)]
Expand Down Expand Up @@ -53,6 +60,24 @@ pub struct IndexArgs {
Read from stdin by not providing any file path.")]
pub input: Option<String>,

#[clap(
short,
long,
help = "Whether to stream from the source without terminating. Will stop only once the source is closed.",
default_value = "false"
)]
pub stream: bool,

#[clap(
long,
help = "How much time to collect docs from the source until an index file should be generated.
Only used when streaming.
Examples: '5s 500ms', '2m 10s'.",
default_value = "30s",
value_parser = parse_humantime
)]
pub commit_interval: Duration,

#[clap(
short,
long,
Expand Down
63 changes: 48 additions & 15 deletions src/commands/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tantivy::{
schema::{Field, Schema},
Index, IndexWriter, TantivyDocument,
};
use tokio::{fs::create_dir_all, task::spawn_blocking};
use tokio::{fs::create_dir_all, select, task::spawn_blocking, time::sleep};

use crate::{
args::IndexArgs,
Expand Down Expand Up @@ -42,9 +42,27 @@ async fn pipe_source_to_index(

let mut added = 0;

let commit_timeout = sleep(args.commit_interval);
tokio::pin!(commit_timeout);

'reader_loop: loop {
let Some(mut json_obj) = source.get_one().await? else {
break;
let mut json_obj = if args.stream {
select! {
_ = &mut commit_timeout => {
break;
}
maybe_json_obj = source.get_one() => {
let Some(json_obj) = maybe_json_obj? else {
break;
};
json_obj
}
}
} else {
let Some(json_obj) = source.get_one().await? else {
break;
};
json_obj
};

let mut doc = TantivyDocument::new();
Expand Down Expand Up @@ -100,18 +118,33 @@ pub async fn run_index(args: IndexArgs, pool: &PgPool) -> Result<()> {
build_parsers_from_field_configs(&config.schema.fields, &mut schema_builder)?;
let schema = schema_builder.build();

let mut source = connect_to_source(args.input.as_deref()).await?;

pipe_source_to_index(
&mut source,
schema,
&field_parsers,
dynamic_field,
&args,
&config,
pool,
)
.await?;
let mut source = connect_to_source(args.input.as_deref(), args.stream).await?;

if args.stream {
loop {
pipe_source_to_index(
&mut source,
schema.clone(),
&field_parsers,
dynamic_field,
&args,
&config,
pool,
)
.await?;
}
} else {
pipe_source_to_index(
&mut source,
schema,
&field_parsers,
dynamic_field,
&args,
&config,
pool,
)
.await?;
}

Ok(())
}
10 changes: 7 additions & 3 deletions src/commands/sources/kafka_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ fn run_consumer_thread(consumer: KafkaConsumer, tx: mpsc::Sender<Result<Option<V
}

impl KafkaSource {
pub fn from_url(url: &str) -> Result<Self> {
pub fn from_url(url: &str, stream: bool) -> Result<Self> {
let (servers, topic) = parse_url(url)?;

let log_level = if cfg!(debug_assertions) {
Expand All @@ -102,9 +102,13 @@ impl KafkaSource {
let consumer: KafkaConsumer = ClientConfig::new()
.set("bootstrap.servers", servers)
.set("session.timeout.ms", "6000") // Minimum allowed timeout.
.set("auto.offset.reset", "earliest")
.set(
"auto.offset.reset",
// Stream will seek to offset saved in checkpoint in the future.
if stream { "latest" } else { "earliest" },
)
.set("enable.auto.commit", "false")
.set("enable.partition.eof", "true")
.set("enable.partition.eof", (!stream).to_string())
.set("group.id", topic) // Consumer group per topic for now.
.set_log_level(log_level)
.create_with_context(KafkaContext)
Expand Down
22 changes: 16 additions & 6 deletions src/commands/sources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,34 @@ mod buf_source;
mod kafka_source;

use async_trait::async_trait;
use color_eyre::Result;
use color_eyre::{eyre::bail, Result};

use self::{
buf_source::BufSource,
kafka_source::{KafkaSource, KAFKA_PREFIX},
};

type JsonMap = serde_json::Map<String, serde_json::Value>;
/// A parsed JSON object: string keys mapped to arbitrary `serde_json` values.
pub type JsonMap = serde_json::Map<String, serde_json::Value>;

#[async_trait]
pub trait Source {
/// Pull the next document from the source.
///
/// Returns `Ok(None)` when the source is exhausted/closed — callers
/// (see `pipe_source_to_index`) treat `None` as the signal to stop reading.
async fn get_one(&mut self) -> Result<Option<JsonMap>>;
}

pub async fn connect_to_source(input: Option<&str>) -> Result<Box<dyn Source>> {
/// Construct the document source described by `input`.
///
/// - A URL starting with the kafka prefix becomes a `KafkaSource`.
/// - Any other string is treated as a file path (`BufSource`).
/// - `None` reads from stdin.
///
/// `stream` (read continuously until the source closes) is only supported
/// for Kafka; requesting it for a file or stdin is an error.
pub async fn connect_to_source(input: Option<&str>, stream: bool) -> Result<Box<dyn Source>> {
    let is_kafka = matches!(input, Some(url) if url.starts_with(KAFKA_PREFIX));

    // Reject --stream up front for sources that cannot provide it.
    if stream && !is_kafka {
        match input {
            Some(_) => bail!("Streaming from a file is not currently supported."),
            None => bail!("Streaming from stdin is not supported."),
        }
    }

    let source: Box<dyn Source> = match input {
        Some(url) if is_kafka => Box::new(KafkaSource::from_url(url, stream)?),
        Some(path) => Box::new(BufSource::from_path(path).await?),
        None => Box::new(BufSource::from_stdin()),
    };
    Ok(source)
}

0 comments on commit df33082

Please sign in to comment.