Skip to content

Commit 0bd274d

Browse files
committed
index: Read from Kafka forever when --stream is passed
1 parent 5a382ed commit 0bd274d

File tree

6 files changed

+92
-26
lines changed

6 files changed

+92
-26
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1-3
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,6 @@ version = "0.1.0"
44
edition = "2021"
55
authors = ["Tony Solomonik @ [email protected]"]
66

7-
[profile.dev.package.backtrace]
8-
opt-level = 3
9-
107
[profile.release]
118
strip = true
129
lto = true
@@ -22,6 +19,7 @@ clap = { version = "4.5.4", features = ["derive"] }
2219
color-eyre = { version = "0.6.3", default-features = false }
2320
dotenvy = "0.15.7"
2421
futures = "0.3.30"
22+
humantime = "2.1.0"
2523
log = "0.4.21"
2624
once_cell = "1.19.0"
2725
opendal = { version = "0.46.0", features = ["services-fs"] }

src/args.rs

+25
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,11 @@
1+
use std::time::Duration;
2+
13
use clap::Parser;
4+
use humantime::parse_duration;
5+
6+
fn parse_humantime(s: &str) -> Result<Duration, String> {
7+
parse_duration(s).map_err(|e| format!("Failed to parse duration: {}. Please refer to https://docs.rs/humantime/2.1.0/humantime/fn.parse_duration.html", e))
8+
}
29

310
#[derive(Parser, Debug, Clone)]
411
#[command(author, version, about, long_about = None)]
@@ -53,6 +60,24 @@ pub struct IndexArgs {
5360
Read from stdin by not providing any file path.")]
5461
pub input: Option<String>,
5562

63+
#[clap(
64+
short,
65+
long,
66+
help = "Whether to stream from the source without terminating. Will stop only once the source is closed.",
67+
default_value = "false"
68+
)]
69+
pub stream: bool,
70+
71+
#[clap(
72+
long,
73+
help = "How much time to collect docs from the source until an index file should be generated.
74+
Only used when streaming.
75+
Examples: '5s 500ms', '2m 10s'.",
76+
default_value = "30s",
77+
value_parser = parse_humantime
78+
)]
79+
pub commit_interval: Duration,
80+
5681
#[clap(
5782
short,
5883
long,

src/commands/index.rs

+48-15
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use tantivy::{
88
schema::{Field, Schema},
99
Index, IndexWriter, TantivyDocument,
1010
};
11-
use tokio::{fs::create_dir_all, task::spawn_blocking};
11+
use tokio::{fs::create_dir_all, select, task::spawn_blocking, time::sleep};
1212

1313
use crate::{
1414
args::IndexArgs,
@@ -42,9 +42,27 @@ async fn pipe_source_to_index(
4242

4343
let mut added = 0;
4444

45+
let commit_timeout = sleep(args.commit_interval);
46+
tokio::pin!(commit_timeout);
47+
4548
'reader_loop: loop {
46-
let Some(mut json_obj) = source.get_one().await? else {
47-
break;
49+
let mut json_obj = if args.stream {
50+
select! {
51+
_ = &mut commit_timeout => {
52+
break;
53+
}
54+
maybe_json_obj = source.get_one() => {
55+
let Some(json_obj) = maybe_json_obj? else {
56+
break;
57+
};
58+
json_obj
59+
}
60+
}
61+
} else {
62+
let Some(json_obj) = source.get_one().await? else {
63+
break;
64+
};
65+
json_obj
4866
};
4967

5068
let mut doc = TantivyDocument::new();
@@ -100,18 +118,33 @@ pub async fn run_index(args: IndexArgs, pool: &PgPool) -> Result<()> {
100118
build_parsers_from_field_configs(&config.schema.fields, &mut schema_builder)?;
101119
let schema = schema_builder.build();
102120

103-
let mut source = connect_to_source(args.input.as_deref()).await?;
104-
105-
pipe_source_to_index(
106-
&mut source,
107-
schema,
108-
&field_parsers,
109-
dynamic_field,
110-
&args,
111-
&config,
112-
pool,
113-
)
114-
.await?;
121+
let mut source = connect_to_source(args.input.as_deref(), args.stream).await?;
122+
123+
if args.stream {
124+
loop {
125+
pipe_source_to_index(
126+
&mut source,
127+
schema.clone(),
128+
&field_parsers,
129+
dynamic_field,
130+
&args,
131+
&config,
132+
pool,
133+
)
134+
.await?;
135+
}
136+
} else {
137+
pipe_source_to_index(
138+
&mut source,
139+
schema,
140+
&field_parsers,
141+
dynamic_field,
142+
&args,
143+
&config,
144+
pool,
145+
)
146+
.await?;
147+
}
115148

116149
Ok(())
117150
}

src/commands/sources/kafka_source.rs

+7-3
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ fn run_consumer_thread(consumer: KafkaConsumer, tx: mpsc::Sender<Result<Option<V
9090
}
9191

9292
impl KafkaSource {
93-
pub fn from_url(url: &str) -> Result<Self> {
93+
pub fn from_url(url: &str, stream: bool) -> Result<Self> {
9494
let (servers, topic) = parse_url(url)?;
9595

9696
let log_level = if cfg!(debug_assertions) {
@@ -102,9 +102,13 @@ impl KafkaSource {
102102
let consumer: KafkaConsumer = ClientConfig::new()
103103
.set("bootstrap.servers", servers)
104104
.set("session.timeout.ms", "6000") // Minimum allowed timeout.
105-
.set("auto.offset.reset", "earliest")
105+
.set(
106+
"auto.offset.reset",
107+
// Stream will seek to offset saved in checkpoint in the future.
108+
if stream { "latest" } else { "earliest" },
109+
)
106110
.set("enable.auto.commit", "false")
107-
.set("enable.partition.eof", "true")
111+
.set("enable.partition.eof", (!stream).to_string())
108112
.set("group.id", topic) // Consumer group per topic for now.
109113
.set_log_level(log_level)
110114
.create_with_context(KafkaContext)

src/commands/sources/mod.rs

+10-5
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,29 @@ mod buf_source;
22
mod kafka_source;
33

44
use async_trait::async_trait;
5-
use color_eyre::Result;
5+
use color_eyre::{eyre::bail, Result};
66

77
use self::{
88
buf_source::BufSource,
99
kafka_source::{KafkaSource, KAFKA_PREFIX},
1010
};
1111

12-
type JsonMap = serde_json::Map<String, serde_json::Value>;
12+
pub type JsonMap = serde_json::Map<String, serde_json::Value>;
1313

1414
#[async_trait]
1515
pub trait Source {
1616
async fn get_one(&mut self) -> Result<Option<JsonMap>>;
1717
}
1818

19-
pub async fn connect_to_source(input: Option<&str>) -> Result<Box<dyn Source>> {
19+
pub async fn connect_to_source(input: Option<&str>, stream: bool) -> Result<Box<dyn Source>> {
2020
Ok(match input {
21-
Some(url) if url.starts_with(KAFKA_PREFIX) => Box::new(KafkaSource::from_url(url)?),
22-
Some(path) => Box::new(BufSource::from_path(path).await?),
21+
Some(url) if url.starts_with(KAFKA_PREFIX) => Box::new(KafkaSource::from_url(url, stream)?),
22+
Some(path) => {
23+
if stream {
24+
bail!("Streaming from a file is not currently supported.");
25+
}
26+
Box::new(BufSource::from_path(path).await?)
27+
}
2328
None => Box::new(BufSource::from_stdin()),
2429
})
2530
}

0 commit comments

Comments (0)