index: kafka: stream: Commit index files in the background
Achieves better throughput when indexing, as we continue to read from the
source even while committing an index file.
tontinton committed Jun 26, 2024
1 parent 224ee12 commit ffc7031
Showing 6 changed files with 115 additions and 37 deletions.
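
The pattern is easy to see in isolation. Below is a minimal runnable sketch of the same idea (hypothetical names, not this crate's API, assuming only tokio): in stream mode the commit of a finished batch is handed to a background task so the loop can immediately read the next batch; on EOF there is no next batch, so the commit is awaited inline and its error propagated.

```rust
use tokio::task;

async fn commit(batch_id: u64) -> Result<(), String> {
    // Stand-in for the real work: flushing segments, merging, uploading.
    println!("committing batch {batch_id}");
    Ok(())
}

async fn finish_batch(batch_id: u64, streaming: bool, eof: bool) -> Result<(), String> {
    if streaming && !eof {
        // Fire-and-forget: failures are logged, not propagated, because
        // the caller has already moved on to reading the next batch.
        task::spawn(async move {
            if let Err(e) = commit(batch_id).await {
                eprintln!("failed to commit batch {batch_id}: {e}");
            }
        });
        Ok(())
    } else {
        // Last batch: nothing left to read, so wait for the commit.
        commit(batch_id).await
    }
}

#[tokio::main]
async fn main() -> Result<(), String> {
    finish_batch(1, true, false).await?; // streamed batch: commits in background
    finish_batch(2, true, true).await // EOF: commit awaited inline
}
```

The trade-off is the one the commit message describes: reading and committing overlap, so throughput improves, but in stream mode a commit failure only surfaces in the log.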
src/commands/index.rs: 55 additions & 12 deletions
@@ -11,7 +11,7 @@ use tantivy::{
 };
 use tokio::{
     fs::{create_dir_all, remove_dir_all},
-    select,
+    select, spawn,
     task::spawn_blocking,
     time::sleep,
 };
@@ -26,8 +26,8 @@ use crate::{
 };
 
 use super::{
-    dynamic_field_config, field_parser::FieldParser, get_index_config, write_unified_index,
-    DYNAMIC_FIELD_NAME,
+    dynamic_field_config, field_parser::FieldParser, get_index_config, sources::CheckpointCommiter,
+    write_unified_index, DYNAMIC_FIELD_NAME,
 };
 
 #[derive(Debug, PartialEq, Eq)]
@@ -37,6 +37,13 @@ pub enum BatchResult {
     Restart,
 }
 
+struct IndexCommiter {
+    index_name: String,
+    index_path: String,
+    pool: PgPool,
+    checkpoint_commiter: Option<Box<dyn CheckpointCommiter + Send>>,
+}
+
 pub struct IndexRunner {
     source: Box<dyn Source + Send + Sync>,
     schema: Schema,
@@ -87,7 +94,7 @@ impl IndexRunner {
         let index_dir = Path::new(&self.args.build_dir).join(&id);
         let _ = create_dir_all(&index_dir).await;
         let index = Index::open_or_create(MmapDirectory::open(&index_dir)?, self.schema.clone())?;
-        let mut index_writer: IndexWriter = index.writer(self.args.memory_budget)?;
+        let index_writer: IndexWriter = index.writer(self.args.memory_budget)?;
         index_writer.set_merge_policy(Box::new(NoMergePolicy));
 
         let mut added = 0;
@@ -160,6 +167,40 @@ impl IndexRunner {
         }
 
         info!("Commiting {added} documents");
+
+        let commiter = self.index_commiter().await;
+        if self.args.stream && result != BatchResult::Eof {
+            // Commit in the background to not interfere with next batch in stream.
+            spawn(async move {
+                if let Err(e) =
+                    Self::commit_index(commiter, &id, &index, index_writer, &index_dir).await
+                {
+                    error!("Failed to commit index of id '{}': {e}", &id);
+                }
+            });
+        } else {
+            Self::commit_index(commiter, &id, &index, index_writer, &index_dir).await?;
+        }
+
+        Ok(result)
+    }
+
+    async fn index_commiter(&mut self) -> IndexCommiter {
+        IndexCommiter {
+            index_name: self.config.name.clone(),
+            index_path: self.config.path.clone(),
+            pool: self.pool.clone(),
+            checkpoint_commiter: self.source.get_checkpoint_commiter().await,
+        }
+    }
+
+    async fn commit_index(
+        commiter: IndexCommiter,
+        id: &str,
+        index: &Index,
+        mut index_writer: IndexWriter,
+        input_dir: &Path,
+    ) -> Result<()> {
         index_writer.prepare_commit()?.commit_future().await?;
 
         let segment_ids = index.searchable_segment_ids()?;
@@ -171,18 +212,20 @@
         spawn_blocking(move || index_writer.wait_merging_threads()).await??;
 
         write_unified_index(
-            &id,
-            &index,
-            &index_dir,
-            &self.config.name,
-            &self.config.path,
-            &self.pool,
+            id,
+            index,
+            input_dir,
+            &commiter.index_name,
+            &commiter.index_path,
+            &commiter.pool,
         )
         .await?;
 
-        self.source.on_index_created().await?;
+        if let Some(checkpoint_commiter) = commiter.checkpoint_commiter {
+            checkpoint_commiter.commit().await?;
+        }
 
-        Ok(result)
+        Ok(())
     }
 }
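
One detail worth spelling out: the new IndexCommiter struct exists because tokio::spawn requires a 'static future, so the background task cannot borrow &mut self the way the old inline commit did. The runner instead clones everything the commit needs into an owned snapshot before spawning. A reduced sketch of that pattern (hypothetical Runner/CommitState types, assuming tokio):

```rust
use std::time::Duration;

struct CommitState {
    index_name: String, // owned clone of everything the commit needs
}

struct Runner {
    name: String,
}

impl Runner {
    fn commit_in_background(&mut self) -> tokio::task::JoinHandle<()> {
        // Owned snapshot: the spawned future must be 'static, so it
        // can't hold a borrow of `self` like the old inline commit did.
        let state = CommitState { index_name: self.name.clone() };
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await; // pretend work
            println!("committed {}", state.index_name); // `state` was moved in
        })
    }
}

#[tokio::main]
async fn main() {
    let mut runner = Runner { name: "docs".into() };
    let handle = runner.commit_in_background();
    // `runner` stays usable for the next batch while the commit runs.
    runner.name.push_str("-v2");
    handle.await.unwrap();
}
```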

src/commands/sources/buf_source.rs: 0 additions & 4 deletions
@@ -47,8 +47,4 @@ impl Source for BufSource {
         self.line.clear();
         Ok(SourceItem::Document(map))
     }
-
-    async fn on_index_created(&mut self) -> Result<()> {
-        Ok(())
-    }
 }
src/commands/sources/kafka_checkpoint.rs: 32 additions & 2 deletions
@@ -1,15 +1,24 @@
 use std::collections::BTreeMap;
 
+use async_trait::async_trait;
 use color_eyre::Result;
 use sqlx::PgPool;
 
+use super::CheckpointCommiter;
+
 #[derive(Debug, Clone)]
-pub struct Checkpoint {
+pub struct KafkaCheckpoint {
     source_id: String,
     pool: PgPool,
 }
 
-impl Checkpoint {
+#[derive(Debug)]
+pub struct KafkaCheckpointCommiter {
+    checkpoint: KafkaCheckpoint,
+    partitions_and_offsets: Vec<(i32, i64)>,
+}
+
+impl KafkaCheckpoint {
     pub fn new(source_id: String, pool: PgPool) -> Self {
         Self { source_id, pool }
     }
@@ -74,4 +83,25 @@ impl Checkpoint {
 
         Ok(())
     }
+
+    pub fn commiter(self, partitions_and_offsets: Vec<(i32, i64)>) -> KafkaCheckpointCommiter {
+        KafkaCheckpointCommiter::new(self, partitions_and_offsets)
+    }
 }
+
+impl KafkaCheckpointCommiter {
+    pub fn new(checkpoint: KafkaCheckpoint, partitions_and_offsets: Vec<(i32, i64)>) -> Self {
+        Self {
+            checkpoint,
+            partitions_and_offsets,
+        }
+    }
+}
+
+#[async_trait]
+impl CheckpointCommiter for KafkaCheckpointCommiter {
+    async fn commit(&self) -> Result<()> {
+        self.checkpoint.save(&self.partitions_and_offsets).await?;
+        Ok(())
+    }
+}
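
Note the two-phase shape here: commiter() consumes the checkpoint along with a frozen copy of the partition offsets, while the actual write happens only when commit() is called, after the index file is safely committed and uploaded. A condensed sketch of that flow (synchronous and in-memory, unlike the real async Postgres version):

```rust
// Two-phase checkpointing in miniature: snapshot now, persist later.
struct Checkpoint;

struct Commiter {
    partitions_and_offsets: Vec<(i32, i64)>,
}

impl Checkpoint {
    fn commiter(self, partitions_and_offsets: Vec<(i32, i64)>) -> Commiter {
        Commiter { partitions_and_offsets }
    }
}

impl Commiter {
    fn commit(&self) {
        // In the real code this is an async write to Postgres.
        for (partition, offset) in &self.partitions_and_offsets {
            println!("partition {partition} -> offset {offset}");
        }
    }
}

fn main() {
    let snapshot = Checkpoint.commiter(vec![(0, 42), (1, 8)]);
    // ... the index file gets committed and uploaded here ...
    snapshot.commit(); // offsets persisted only after the upload succeeded
}
```

If the background index commit fails, commit() is never reached, so the next run resumes from the previously persisted offsets and re-reads the batch instead of skipping it.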
src/commands/sources/kafka_source.rs: 8 additions & 13 deletions
@@ -17,7 +17,7 @@ use tokio::{
     task::spawn_blocking,
 };
 
-use super::{kafka_checkpoint::Checkpoint, Source, SourceItem};
+use super::{kafka_checkpoint::KafkaCheckpoint, CheckpointCommiter, Source, SourceItem};
 
 pub const KAFKA_PREFIX: &str = "kafka://";
 const CONSUMER_THREAD_MESSAGES_CHANNEL_SIZE: usize = 10;
@@ -155,7 +155,7 @@ type KafkaConsumer = BaseConsumer<KafkaContext>;
 
 pub struct KafkaSource {
     messages_rx: mpsc::Receiver<Result<MessageFromConsumerThread>>,
-    checkpoint: Option<Checkpoint>,
+    checkpoint: Option<KafkaCheckpoint>,
     partition_to_offset: BTreeMap<i32, i64>,
 
     #[cfg(feature = "in-tests")]
@@ -277,7 +277,7 @@ impl KafkaSource {
 
         let checkpoint = if stream {
             // Url is not a good identifier as a source id, but we'll live with it for now.
-            Some(Checkpoint::new(url.to_string(), pool.clone()))
+            Some(KafkaCheckpoint::new(url.to_string(), pool.clone()))
         } else {
             None
         };
@@ -335,7 +335,7 @@ impl KafkaSource {
         Ok(())
     }
 
-    fn track_saved_checkpoint(&mut self, partitions_and_offsets: Vec<(i32, i64)>) {
+    fn track_saved_checkpoint(&mut self, partitions_and_offsets: &[(i32, i64)]) {
         track_saved_checkpoint_impl!(self, partitions_and_offsets);
     }
 
@@ -374,23 +374,18 @@ impl Source for KafkaSource {
         })
     }
 
-    async fn on_index_created(&mut self) -> Result<()> {
-        let Some(ref checkpoint) = self.checkpoint else {
-            return Ok(());
-        };
+    async fn get_checkpoint_commiter(&mut self) -> Option<Box<dyn CheckpointCommiter + Send>> {
+        let checkpoint = self.checkpoint.clone()?;
 
         let flat = self
             .partition_to_offset
             .iter()
             // Add 1 as we don't want to seek to the last record already read, but the next.
             .map(|(p, o)| (*p, *o + 1))
             .collect::<Vec<_>>();
-        checkpoint.save(&flat).await?;
-
-        self.track_saved_checkpoint(flat);
-
+        self.track_saved_checkpoint(&flat);
         self.partition_to_offset.clear();
 
-        Ok(())
+        Some(Box::new(checkpoint.commiter(flat)))
     }
 }
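
The `*o + 1` in the map above matters: as the comment says, the checkpoint stores the offset to resume from, not the last offset read, since a Kafka consumer seek lands exactly on the offset it is given. Saving the last-read offset verbatim would re-deliver one record per partition after every restart. A tiny worked example of the transformation:

```rust
fn main() {
    // Last record read per partition: partition 0 at offset 41, partition 1 at 7.
    let partition_to_offset = vec![(0_i32, 41_i64), (1, 7)];

    // Same transformation as the diff: checkpoint the *next* offset to read.
    let flat: Vec<(i32, i64)> = partition_to_offset
        .iter()
        .map(|(p, o)| (*p, *o + 1))
        .collect();

    assert_eq!(flat, vec![(0, 42), (1, 8)]);
}
```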
src/commands/sources/mod.rs: 12 additions & 3 deletions
@@ -29,9 +29,18 @@ pub trait Source {
     /// Get a document from the source.
     async fn get_one(&mut self) -> Result<SourceItem>;
 
-    /// Called when an index file was just created to notify the source.
-    /// Useful for implementing checkpoints for example.
-    async fn on_index_created(&mut self) -> Result<()>;
+    /// If the source supports checkpointing, it creates a checkpoint commiter that stores a
+    /// snapshot of the last read state. Once the indexer has successfully committed and uploaded
+    /// the new index file, it tells the checkpoint commiter to commit the checkpoint snapshot.
+    async fn get_checkpoint_commiter(&mut self) -> Option<Box<dyn CheckpointCommiter + Send>> {
+        None
+    }
 }
 
+#[async_trait]
+pub trait CheckpointCommiter {
+    /// Commit the stored state snapshot.
+    async fn commit(&self) -> Result<()>;
+}
+
 pub async fn connect_to_source(
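
With the default method returning None, a source only opts into checkpointing by overriding get_checkpoint_commiter, which is why BufSource above could simply drop its empty on_index_created. A sketch of what a hypothetical file-based source might return (FileOffsetCommiter is invented for illustration; only the trait comes from this diff):

```rust
use async_trait::async_trait;
use color_eyre::Result;

// Trait shape copied from the diff above.
#[async_trait]
pub trait CheckpointCommiter {
    async fn commit(&self) -> Result<()>;
}

struct FileOffsetCommiter {
    // Frozen at creation time, so reads that happen while the index
    // commit is in flight don't move what gets persisted.
    byte_offset: u64,
}

#[async_trait]
impl CheckpointCommiter for FileOffsetCommiter {
    async fn commit(&self) -> Result<()> {
        // Persist self.byte_offset durably here (db row, sidecar file, ...).
        println!("checkpointing at byte {}", self.byte_offset);
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let commiter = FileOffsetCommiter { byte_offset: 4096 };
    commiter.commit().await
}
```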
tests/kafka_indexing.rs: 8 additions & 3 deletions
@@ -36,7 +36,7 @@ use toshokan::{
         index::{run_index, BatchResult, IndexRunner},
         sources::{
             kafka_source::{parse_url, KafkaSource},
-            Source, SourceItem,
+            CheckpointCommiter, Source, SourceItem,
         },
     },
     config::IndexConfig,
@@ -253,8 +253,13 @@ impl Source for ArcSource {
         self.0.clone().lock_owned().await.get_one().await
     }
 
-    async fn on_index_created(&mut self) -> Result<()> {
-        self.0.clone().lock_owned().await.on_index_created().await
+    async fn get_checkpoint_commiter(&mut self) -> Option<Box<dyn CheckpointCommiter + Send>> {
+        self.0
+            .clone()
+            .lock_owned()
+            .await
+            .get_checkpoint_commiter()
+            .await
     }
 }

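The test suite's ArcSource is a clonable handle over a shared tokio mutex (presumably around a boxed Source), letting the test keep one handle to feed data while the runner owns another; the change above just forwards the new trait method through that lock. A reduced sketch of the wrapper pattern (simplified types, assuming tokio):

```rust
use std::sync::Arc;
use tokio::sync::Mutex;

// A clonable handle: the test keeps one clone to push and inspect data,
// the index runner owns another.
#[derive(Clone)]
struct SharedSource(Arc<Mutex<Vec<String>>>);

impl SharedSource {
    async fn get_one(&mut self) -> Option<String> {
        // lock_owned() needs an owned Arc, hence the clone of self.0,
        // the same shape as the ArcSource methods in the diff.
        self.0.clone().lock_owned().await.pop()
    }
}

#[tokio::main]
async fn main() {
    let mut a = SharedSource(Arc::new(Mutex::new(vec!["doc1".into()])));
    let mut b = a.clone(); // second handle, e.g. for the runner
    assert_eq!(b.get_one().await.as_deref(), Some("doc1"));
    assert_eq!(a.get_one().await, None); // shared state is drained
}
```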
