Commit c010a82

index: kafka: stream: Commit index files in the background
Achieves better throughput when indexing, as we continue to read from the source even while committing an index file.
1 parent 224ee12 commit c010a82
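The pattern, as a minimal self-contained sketch (hypothetical names, not this repo's API): when streaming and not yet at end-of-input, the commit is spawned onto a background task so the read loop can start the next batch immediately; the final batch still commits inline so errors propagate.

```rust
use std::time::Duration;

// Stand-in for the real work: flushing segments, merging and uploading a file.
async fn commit(batch_id: u64) -> std::io::Result<()> {
    println!("committing batch {batch_id}");
    Ok(())
}

// Streaming batches are committed in the background; the last batch (EOF)
// commits inline so its error propagates to the caller.
async fn finish_batch(batch_id: u64, streaming: bool, eof: bool) -> std::io::Result<()> {
    if streaming && !eof {
        tokio::spawn(async move {
            // Fire-and-forget: a failed background commit is only logged.
            if let Err(e) = commit(batch_id).await {
                eprintln!("failed to commit batch {batch_id}: {e}");
            }
        });
        Ok(())
    } else {
        commit(batch_id).await
    }
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    finish_batch(1, true, false).await?; // returns immediately, commit runs in background
    finish_batch(2, true, true).await?; // last batch: waits for the commit
    // Give the detached commit a moment to finish; a real indexer keeps running.
    tokio::time::sleep(Duration::from_millis(50)).await;
    Ok(())
}
```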

File tree

6 files changed (+115 -37)

src/commands/index.rs (+55 -12)

```diff
@@ -11,7 +11,7 @@ use tantivy::{
 };
 use tokio::{
     fs::{create_dir_all, remove_dir_all},
-    select,
+    select, spawn,
     task::spawn_blocking,
     time::sleep,
 };
@@ -26,8 +26,8 @@ use crate::{
 };
 
 use super::{
-    dynamic_field_config, field_parser::FieldParser, get_index_config, write_unified_index,
-    DYNAMIC_FIELD_NAME,
+    dynamic_field_config, field_parser::FieldParser, get_index_config, sources::CheckpointCommiter,
+    write_unified_index, DYNAMIC_FIELD_NAME,
 };
 
 #[derive(Debug, PartialEq, Eq)]
@@ -37,6 +37,13 @@ pub enum BatchResult {
     Restart,
 }
 
+struct IndexCommiter {
+    index_name: String,
+    index_path: String,
+    pool: PgPool,
+    checkpoint_commiter: Option<Box<dyn CheckpointCommiter + Send>>,
+}
+
 pub struct IndexRunner {
     source: Box<dyn Source + Send + Sync>,
     schema: Schema,
@@ -87,7 +94,7 @@ impl IndexRunner {
         let index_dir = Path::new(&self.args.build_dir).join(&id);
         let _ = create_dir_all(&index_dir).await;
         let index = Index::open_or_create(MmapDirectory::open(&index_dir)?, self.schema.clone())?;
-        let mut index_writer: IndexWriter = index.writer(self.args.memory_budget)?;
+        let index_writer: IndexWriter = index.writer(self.args.memory_budget)?;
         index_writer.set_merge_policy(Box::new(NoMergePolicy));
 
         let mut added = 0;
@@ -160,6 +167,40 @@ impl IndexRunner {
         }
 
         info!("Commiting {added} documents");
+
+        let commiter = self.index_commiter().await;
+        if self.args.stream && result != BatchResult::Eof {
+            // Commit in the background to not interfere with next batch in stream.
+            spawn(async move {
+                if let Err(e) =
+                    Self::commit_index(commiter, &id, &index, index_writer, &index_dir).await
+                {
+                    error!("Failed to commit index of id '{}': {e}", &id);
+                }
+            });
+        } else {
+            Self::commit_index(commiter, &id, &index, index_writer, &index_dir).await?;
+        }
+
+        Ok(result)
+    }
+
+    async fn index_commiter(&mut self) -> IndexCommiter {
+        IndexCommiter {
+            index_name: self.config.name.clone(),
+            index_path: self.config.path.clone(),
+            pool: self.pool.clone(),
+            checkpoint_commiter: self.source.get_checkpoint_commiter().await,
+        }
+    }
+
+    async fn commit_index(
+        commiter: IndexCommiter,
+        id: &str,
+        index: &Index,
+        mut index_writer: IndexWriter,
+        input_dir: &Path,
+    ) -> Result<()> {
         index_writer.prepare_commit()?.commit_future().await?;
 
         let segment_ids = index.searchable_segment_ids()?;
@@ -171,18 +212,20 @@ impl IndexRunner {
         spawn_blocking(move || index_writer.wait_merging_threads()).await??;
 
         write_unified_index(
-            &id,
-            &index,
-            &index_dir,
-            &self.config.name,
-            &self.config.path,
-            &self.pool,
+            id,
+            index,
+            input_dir,
+            &commiter.index_name,
+            &commiter.index_path,
+            &commiter.pool,
         )
         .await?;
 
-        self.source.on_index_created().await?;
+        if let Some(checkpoint_commiter) = commiter.checkpoint_commiter {
+            checkpoint_commiter.commit().await?;
+        }
 
-        Ok(result)
+        Ok(())
     }
 }
```

src/commands/sources/buf_source.rs (-4)

```diff
@@ -47,8 +47,4 @@ impl Source for BufSource {
         self.line.clear();
         Ok(SourceItem::Document(map))
     }
-
-    async fn on_index_created(&mut self) -> Result<()> {
-        Ok(())
-    }
 }
```

src/commands/sources/kafka_checkpoint.rs (+32 -2)

```diff
@@ -1,15 +1,24 @@
 use std::collections::BTreeMap;
 
+use async_trait::async_trait;
 use color_eyre::Result;
 use sqlx::PgPool;
 
+use super::CheckpointCommiter;
+
 #[derive(Debug, Clone)]
-pub struct Checkpoint {
+pub struct KafkaCheckpoint {
     source_id: String,
     pool: PgPool,
 }
 
-impl Checkpoint {
+#[derive(Debug)]
+pub struct KafkaCheckpointCommiter {
+    checkpoint: KafkaCheckpoint,
+    partitions_and_offsets: Vec<(i32, i64)>,
+}
+
+impl KafkaCheckpoint {
     pub fn new(source_id: String, pool: PgPool) -> Self {
         Self { source_id, pool }
     }
@@ -74,4 +83,25 @@ impl Checkpoint {
 
         Ok(())
     }
+
+    pub fn commiter(self, partitions_and_offsets: Vec<(i32, i64)>) -> KafkaCheckpointCommiter {
+        KafkaCheckpointCommiter::new(self, partitions_and_offsets)
+    }
+}
+
+impl KafkaCheckpointCommiter {
+    pub fn new(checkpoint: KafkaCheckpoint, partitions_and_offsets: Vec<(i32, i64)>) -> Self {
+        Self {
+            checkpoint,
+            partitions_and_offsets,
+        }
+    }
+}
+
+#[async_trait]
+impl CheckpointCommiter for KafkaCheckpointCommiter {
+    async fn commit(&self) -> Result<()> {
+        self.checkpoint.save(&self.partitions_and_offsets).await?;
+        Ok(())
+    }
 }
```
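The point of the split is that `commiter()` only captures a snapshot; nothing touches the checkpoint table until `commit()` is called after the index file is safely uploaded. A rough usage sketch (assumes these types are in scope and a reachable Postgres pool; the source id and offsets are made up):

```rust
use sqlx::PgPool;

// Hypothetical driver code, not taken from the repo.
async fn example(pool: PgPool) -> color_eyre::Result<()> {
    let checkpoint = KafkaCheckpoint::new("kafka://localhost/topic".to_string(), pool);

    // Snapshot the partition/offset pairs read so far. No database write yet.
    let commiter = checkpoint.commiter(vec![(0, 42), (1, 17)]);

    // ... build, commit and upload the index file here ...

    // Only now is the snapshot persisted, via KafkaCheckpoint::save.
    commiter.commit().await?;
    Ok(())
}
```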

src/commands/sources/kafka_source.rs (+8 -13)

```diff
@@ -17,7 +17,7 @@ use tokio::{
     task::spawn_blocking,
 };
 
-use super::{kafka_checkpoint::Checkpoint, Source, SourceItem};
+use super::{kafka_checkpoint::KafkaCheckpoint, CheckpointCommiter, Source, SourceItem};
 
 pub const KAFKA_PREFIX: &str = "kafka://";
 const CONSUMER_THREAD_MESSAGES_CHANNEL_SIZE: usize = 10;
@@ -155,7 +155,7 @@ type KafkaConsumer = BaseConsumer<KafkaContext>;
 
 pub struct KafkaSource {
     messages_rx: mpsc::Receiver<Result<MessageFromConsumerThread>>,
-    checkpoint: Option<Checkpoint>,
+    checkpoint: Option<KafkaCheckpoint>,
    partition_to_offset: BTreeMap<i32, i64>,
 
     #[cfg(feature = "in-tests")]
@@ -277,7 +277,7 @@ impl KafkaSource {
 
        let checkpoint = if stream {
            // Url is not a good identifier as a source id, but we'll live with it for now.
-           Some(Checkpoint::new(url.to_string(), pool.clone()))
+           Some(KafkaCheckpoint::new(url.to_string(), pool.clone()))
        } else {
            None
        };
@@ -335,7 +335,7 @@ impl KafkaSource {
         Ok(())
     }
 
-    fn track_saved_checkpoint(&mut self, partitions_and_offsets: Vec<(i32, i64)>) {
+    fn track_saved_checkpoint(&mut self, partitions_and_offsets: &[(i32, i64)]) {
         track_saved_checkpoint_impl!(self, partitions_and_offsets);
     }
 
@@ -374,23 +374,18 @@ impl Source for KafkaSource {
         })
     }
 
-    async fn on_index_created(&mut self) -> Result<()> {
-        let Some(ref checkpoint) = self.checkpoint else {
-            return Ok(());
-        };
+    async fn get_checkpoint_commiter(&mut self) -> Option<Box<dyn CheckpointCommiter + Send>> {
+        let checkpoint = self.checkpoint.clone()?;
 
        let flat = self
            .partition_to_offset
            .iter()
            // Add 1 as we don't want to seek to the last record already read, but the next.
            .map(|(p, o)| (*p, *o + 1))
            .collect::<Vec<_>>();
-        checkpoint.save(&flat).await?;
-
-        self.track_saved_checkpoint(flat);
-
+        self.track_saved_checkpoint(&flat);
         self.partition_to_offset.clear();
 
-        Ok(())
+        Some(Box::new(checkpoint.commiter(flat)))
     }
 }
```
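Note the `*o + 1` when flattening: `partition_to_offset` tracks the last offset read, while the checkpoint must store the next offset to consume, so a restart resumes after the last indexed record rather than re-reading it. A tiny standalone illustration (no Kafka needed):

```rust
use std::collections::BTreeMap;

fn main() {
    // Last offset *read* per partition.
    let partition_to_offset: BTreeMap<i32, i64> = BTreeMap::from([(0, 41), (1, 7)]);

    // What gets checkpointed: the next offset to consume, hence the +1.
    let flat: Vec<(i32, i64)> = partition_to_offset
        .iter()
        .map(|(p, o)| (*p, *o + 1))
        .collect();

    assert_eq!(flat, vec![(0, 42), (1, 8)]);
}
```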

src/commands/sources/mod.rs (+12 -3)

```diff
@@ -29,9 +29,18 @@ pub trait Source {
     /// Get a document from the source.
     async fn get_one(&mut self) -> Result<SourceItem>;
 
-    /// Called when an index file was just created to notify the source.
-    /// Useful for implementing checkpoints for example.
-    async fn on_index_created(&mut self) -> Result<()>;
+    /// If the source supports checkpointing, it creates a checkpoint commiter that stores a
+    /// snapshot of the last read state. Once the indexer has successfuly commited and uploaded
+    /// the new index file, it tells the checkpoint commiter to commit the snapshot.
+    async fn get_checkpoint_commiter(&mut self) -> Option<Box<dyn CheckpointCommiter + Send>> {
+        None
+    }
+}
+
+#[async_trait]
+pub trait CheckpointCommiter {
+    /// Commit the stored state snapshot.
+    async fn commit(&self) -> Result<()>;
 }
 
 pub async fn connect_to_source(
```
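With the default returning `None`, non-checkpointing sources like `BufSource` above need no boilerplate, and the indexer treats "no commiter" as "nothing to do". A minimal sketch of what a custom checkpointing source might hand back (the `FileOffsetCommiter` type is hypothetical, not part of the crate; assumes the trait above is in scope):

```rust
use async_trait::async_trait;
use color_eyre::Result;

// Hypothetical commiter persisting how far into a file-backed source we got.
struct FileOffsetCommiter {
    bytes_read: u64,
}

#[async_trait]
impl CheckpointCommiter for FileOffsetCommiter {
    async fn commit(&self) -> Result<()> {
        // A real implementation would durably persist self.bytes_read.
        println!("checkpointing at byte {}", self.bytes_read);
        Ok(())
    }
}
```

A source would then return `Some(Box::new(FileOffsetCommiter { bytes_read: self.pos }))` from its `get_checkpoint_commiter` override, where `self.pos` is whatever read-state the source tracks.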

tests/kafka_indexing.rs (+8 -3)

```diff
@@ -36,7 +36,7 @@ use toshokan::{
         index::{run_index, BatchResult, IndexRunner},
         sources::{
             kafka_source::{parse_url, KafkaSource},
-            Source, SourceItem,
+            CheckpointCommiter, Source, SourceItem,
         },
     },
     config::IndexConfig,
@@ -253,8 +253,13 @@ impl Source for ArcSource {
         self.0.clone().lock_owned().await.get_one().await
     }
 
-    async fn on_index_created(&mut self) -> Result<()> {
-        self.0.clone().lock_owned().await.on_index_created().await
+    async fn get_checkpoint_commiter(&mut self) -> Option<Box<dyn CheckpointCommiter + Send>> {
+        self.0
+            .clone()
+            .lock_owned()
+            .await
+            .get_checkpoint_commiter()
+            .await
     }
 }
```
