tests: kafka: Add index abort test on rebalancing when streaming
tontinton committed Jun 22, 2024 · 1 parent 8d29c56 · commit 1346870
Showing 7 changed files with 316 additions and 162 deletions.
2 changes: 1 addition & 1 deletion src/commands/field_parser.rs
@@ -10,7 +10,7 @@ use crate::config::{
     escaped_with_parent_name, number::NumberFieldType, FieldConfig, FieldConfigs, FieldType,
 };
 
-type ParseFn = Box<dyn Fn(serde_json::Value) -> Result<OwnedValue>>;
+type ParseFn = Box<dyn Fn(serde_json::Value) -> Result<OwnedValue> + Send + Sync>;
 
 enum FieldParserVariation {
     Value { field: Field, parse_fn: ParseFn },
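The widened `ParseFn` alias is what lets a `FieldParser` (which stores one of these boxed closures) live inside the new `IndexRunner` below and be moved across task boundaries. A minimal standalone sketch of why the `+ Send + Sync` bound matters, assuming tokio with the `macros` feature; none of this is the project's code:

// Without `+ Send`, a boxed closure cannot be moved into a spawned task,
// because `Box<dyn Fn(u32) -> u32>` is `!Send` by default.
type Shareable = Box<dyn Fn(u32) -> u32 + Send + Sync>;

#[tokio::main]
async fn main() {
    let f: Shareable = Box::new(|x| x + 1);
    // Moving `f` into the task requires `f: Send`; with a plain
    // `Box<dyn Fn(u32) -> u32>` this fails to compile with
    // "cannot be sent between threads safely".
    let handle = tokio::spawn(async move { f(41) });
    assert_eq!(handle.await.unwrap(), 42);
}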
232 changes: 129 additions & 103 deletions src/commands/index.rs
@@ -30,128 +30,154 @@ use super::{
     DYNAMIC_FIELD_NAME,
 };
 
-async fn pipe_source_to_index(
-    source: &mut Box<dyn Source>,
-    schema: Schema,
-    field_parsers: &[FieldParser],
-    dynamic_field: Field,
-    args: &IndexArgs,
-    config: &IndexConfig,
-    pool: &PgPool,
-) -> Result<bool> {
-    let id = uuid::Uuid::now_v7().hyphenated().to_string();
-    let index_dir = Path::new(&args.build_dir).join(&id);
-    let _ = create_dir_all(&index_dir).await;
-    let index = Index::open_or_create(MmapDirectory::open(&index_dir)?, schema)?;
-    let mut index_writer: IndexWriter = index.writer(args.memory_budget)?;
-    index_writer.set_merge_policy(Box::new(NoMergePolicy));
-
-    let mut added = 0;
-    let mut did_timeout = false;
-
-    let mut commit_timeout_fut: Pin<Box<dyn Future<Output = ()>>> = if args.stream {
-        Box::pin(sleep(args.commit_interval))
-    } else {
-        // Infinite timeout by waiting on a future that never resolves.
-        Box::pin(pending::<()>())
-    };
-
-    debug!("Piping source -> index of id '{}'", &id);
-
-    'reader_loop: loop {
-        let item = select! {
-            _ = &mut commit_timeout_fut => {
-                did_timeout = true;
-                break;
-            }
-            item = source.get_one() => {
-                item?
-            }
-        };
-
-        let mut json_obj = match item {
-            SourceItem::Document(json_obj) => json_obj,
-            SourceItem::Close => {
-                debug!("Source closed for index of id '{}'", &id);
-                break;
-            }
-            SourceItem::Restart => {
-                debug!("Aborting index of id '{}' with {} documents", &id, added);
-                if let Err(e) = remove_dir_all(&index_dir).await {
-                    warn!("Failed to remove aborted index of id '{}': {}", &id, e);
-                }
-                return Ok(true);
-            }
-        };
-
-        let mut doc = TantivyDocument::new();
-
-        for field_parser in field_parsers {
-            let name = &field_parser.name;
-            let Some(json_value) = json_obj.remove(name) else {
-                debug!("Field '{}' in schema but not found", &name);
-                continue;
-            };
-
-            if let Err(e) = field_parser.add_parsed_field_value(&mut doc, json_value) {
-                error!(
-                    "Failed to parse '{}' (on {} iteration): {}",
-                    &name, added, e
-                );
-                continue 'reader_loop;
-            }
-        }
-
-        doc.add_field_value(dynamic_field, json_obj);
-        index_writer.add_document(doc)?;
-        added += 1;
-    }
-
-    if added == 0 {
-        debug!("Not writing index: no documents added");
-        return Ok(did_timeout);
-    }
-
-    info!("Committing {added} documents");
-    index_writer.prepare_commit()?.commit_future().await?;
-
-    let segment_ids = index.searchable_segment_ids()?;
-    if segment_ids.len() > 1 {
-        info!("Merging {} segments", segment_ids.len());
-        index_writer.merge(&segment_ids).await?;
-    }
-
-    spawn_blocking(move || index_writer.wait_merging_threads()).await??;
-
-    write_unified_index(&id, &index, &index_dir, &config.name, &config.path, pool).await?;
-
-    source.on_index_created().await?;
-
-    Ok(did_timeout)
-}
+#[derive(Debug, PartialEq, Eq)]
+pub enum BatchResult {
+    Eof,
+    Timeout,
+    Restart,
+}
+
+pub struct IndexRunner {
+    source: Box<dyn Source + Send + Sync>,
+    schema: Schema,
+    field_parsers: Vec<FieldParser>,
+    dynamic_field: Field,
+    args: IndexArgs,
+    config: IndexConfig,
+    pool: PgPool,
+}
+
+impl IndexRunner {
+    pub async fn new(args: IndexArgs, pool: PgPool) -> Result<Self> {
+        let config = get_index_config(&args.name, &pool).await?;
+
+        let mut schema_builder = Schema::builder();
+        let dynamic_field =
+            schema_builder.add_json_field(DYNAMIC_FIELD_NAME, dynamic_field_config());
+        let field_parsers =
+            build_parsers_from_field_configs(&config.schema.fields, &mut schema_builder)?;
+        let schema = schema_builder.build();
+
+        let source = connect_to_source(args.input.as_deref(), args.stream, &pool).await?;
+
+        Ok(Self {
+            source,
+            schema,
+            field_parsers,
+            dynamic_field,
+            args,
+            config,
+            pool,
+        })
+    }
+
+    /// Read documents from the source and then index them into a new index file.
+    /// One batch means that one index file is created when calling this function.
+    /// In batch mode, we simply read from the source until the end.
+    /// In stream mode, we read from the source until the --commit-interval timeout is reached.
+    pub async fn run_one_batch(&mut self) -> Result<BatchResult> {
+        let id = uuid::Uuid::now_v7().hyphenated().to_string();
+        let index_dir = Path::new(&self.args.build_dir).join(&id);
+        let _ = create_dir_all(&index_dir).await;
+        let index = Index::open_or_create(MmapDirectory::open(&index_dir)?, self.schema.clone())?;
+        let mut index_writer: IndexWriter = index.writer(self.args.memory_budget)?;
+        index_writer.set_merge_policy(Box::new(NoMergePolicy));
+
+        let mut added = 0;
+        let mut result = BatchResult::Eof;
+
+        let mut commit_timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>> = if self.args.stream {
+            Box::pin(sleep(self.args.commit_interval))
+        } else {
+            // Infinite timeout by waiting on a future that never resolves.
+            Box::pin(pending::<()>())
+        };
+
+        debug!("Piping source -> index of id '{}'", &id);
+
+        'reader_loop: loop {
+            let item = select! {
+                _ = &mut commit_timeout_fut => {
+                    result = BatchResult::Timeout;
+                    break;
+                }
+                item = self.source.get_one() => {
+                    item?
+                }
+            };
+
+            let mut json_obj = match item {
+                SourceItem::Document(json_obj) => json_obj,
+                SourceItem::Close => {
+                    debug!("Source closed for index of id '{}'", &id);
+                    break;
+                }
+                SourceItem::Restart => {
+                    debug!("Aborting index of id '{}' with {} documents", &id, added);
+                    if let Err(e) = remove_dir_all(&index_dir).await {
+                        warn!("Failed to remove aborted index of id '{}': {}", &id, e);
+                    }
+                    return Ok(BatchResult::Restart);
+                }
+            };
+
+            let mut doc = TantivyDocument::new();
+
+            for field_parser in &self.field_parsers {
+                let name = &field_parser.name;
+                let Some(json_value) = json_obj.remove(name) else {
+                    debug!("Field '{}' in schema but not found", &name);
+                    continue;
+                };
+
+                if let Err(e) = field_parser.add_parsed_field_value(&mut doc, json_value) {
+                    error!(
+                        "Failed to parse '{}' (on {} iteration): {}",
+                        &name, added, e
+                    );
+                    continue 'reader_loop;
+                }
+            }
+
+            doc.add_field_value(self.dynamic_field, json_obj);
+            index_writer.add_document(doc)?;
+            added += 1;
+        }
+
+        if added == 0 {
+            debug!("Not writing index: no documents added");
+            return Ok(result);
+        }
+
+        info!("Committing {added} documents");
+        index_writer.prepare_commit()?.commit_future().await?;
+
+        let segment_ids = index.searchable_segment_ids()?;
+        if segment_ids.len() > 1 {
+            info!("Merging {} segments", segment_ids.len());
+            index_writer.merge(&segment_ids).await?;
+        }
+
+        spawn_blocking(move || index_writer.wait_merging_threads()).await??;
+
+        write_unified_index(
+            &id,
+            &index,
+            &index_dir,
+            &self.config.name,
+            &self.config.path,
+            &self.pool,
+        )
+        .await?;
+
+        self.source.on_index_created().await?;
+
+        Ok(result)
+    }
+}
 
 pub async fn run_index(args: IndexArgs, pool: &PgPool) -> Result<()> {
-    let config = get_index_config(&args.name, pool).await?;
-
-    let mut schema_builder = Schema::builder();
-    let dynamic_field = schema_builder.add_json_field(DYNAMIC_FIELD_NAME, dynamic_field_config());
-    let field_parsers =
-        build_parsers_from_field_configs(&config.schema.fields, &mut schema_builder)?;
-    let schema = schema_builder.build();
-
-    let mut source = connect_to_source(args.input.as_deref(), args.stream, pool).await?;
-
-    while pipe_source_to_index(
-        &mut source,
-        schema.clone(),
-        &field_parsers,
-        dynamic_field,
-        &args,
-        &config,
-        pool,
-    )
-    .await?
-    {}
-
+    let mut runner = IndexRunner::new(args, pool.clone()).await?;
+    while runner.run_one_batch().await? != BatchResult::Eof {}
     Ok(())
 }
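The old `Ok(bool)` return ("true" meaning "timed out, keep streaming") is replaced by the three-valued `BatchResult`, so a caller can tell EOF, a --commit-interval tick, and a source-initiated restart (such as a Kafka consumer-group rebalance) apart. A hedged sketch of a caller handling each case explicitly; the crate path `toshokan::commands`, the `IndexArgs` location, and the `anyhow::Result` error type are assumptions here, not shown by this diff:

use anyhow::Result; // error type assumed
use sqlx::PgPool;
use toshokan::commands::index::{BatchResult, IndexRunner}; // paths assumed
use toshokan::commands::IndexArgs;

async fn drive_index(args: IndexArgs, pool: PgPool) -> Result<()> {
    let mut runner = IndexRunner::new(args, pool).await?;
    loop {
        match runner.run_one_batch().await? {
            // Source exhausted (batch mode): we are done.
            BatchResult::Eof => return Ok(()),
            // --commit-interval elapsed (stream mode): one index file was
            // committed, start the next batch.
            BatchResult::Timeout => continue,
            // The source asked for a restart (e.g. a Kafka rebalance): the
            // partial index dir was already removed, so just start over and
            // no document gets indexed twice.
            BatchResult::Restart => continue,
        }
    }
}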
2 changes: 1 addition & 1 deletion src/commands/mod.rs
@@ -4,7 +4,7 @@ mod field_parser;
 pub mod index;
 pub mod merge;
 pub mod search;
-mod sources;
+pub mod sources;
 
 use std::{
     path::{Path, PathBuf},
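Making `sources` public presumably lets the new rebalancing test exercise the abort path from outside the module. A hedged sketch of a test-side stub source; the exact `Source` trait shape (async methods via `async_trait`, these signatures) is only inferred from the calls in index.rs above and is an assumption:

use anyhow::Result; // error type assumed
use async_trait::async_trait;
use toshokan::commands::sources::{Source, SourceItem}; // reachable now that the module is pub

struct AlwaysRestart;

#[async_trait]
impl Source for AlwaysRestart {
    // Immediately simulate a rebalance, driving run_one_batch()
    // into its abort-and-cleanup branch.
    async fn get_one(&mut self) -> Result<SourceItem> {
        Ok(SourceItem::Restart)
    }

    async fn on_index_created(&mut self) -> Result<()> {
        Ok(())
    }
}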
[Diffs for the remaining 4 changed files were not loaded in this view.]
