Commit

Add merge command
tontinton committed May 18, 2024
1 parent 06ed1fd commit 8d25ee0
Showing 3 changed files with 163 additions and 2 deletions.
17 changes: 17 additions & 0 deletions src/args.rs
@@ -12,6 +12,9 @@ pub enum SubCommand {
#[clap(name = "index")]
Index(IndexArgs),

#[clap(name = "merge")]
Merge(MergeArgs),

#[clap(name = "search")]
Search(SearchArgs),
}
@@ -41,6 +44,20 @@ The memory is split evenly between all indexing threads, once a thread reaches i
pub memory_budget: usize,
}

#[derive(Parser, Debug, Clone)]
pub struct MergeArgs {
#[clap(help = "Path to the index dir.")]
pub index_dir: String,

#[clap(
short,
long,
help = "Path to the dir to merge in the inverted indexes.",
default_value = "/tmp/toshokan_merge"
)]
pub merge_dir: String,
}
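
// Example invocation of the new subcommand (the binary name `toshokan` is
// assumed from the repo name; --merge-dir falls back to the default above):
//
//   toshokan merge /path/to/index/dir
//   toshokan merge /path/to/index/dir --merge-dir /tmp/toshokan_merge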

#[derive(Parser, Debug, Clone)]
pub struct SearchArgs {
#[clap(help = "Path to the index dir.")]
36 changes: 34 additions & 2 deletions src/main.rs
@@ -1,5 +1,6 @@
mod args;
mod bincode;
mod merge_directory;
mod opendal_file_handle;
mod unified_index;

@@ -10,14 +11,14 @@ use std::{
time::Duration,
};

-use args::{IndexArgs, SearchArgs};
+use args::{IndexArgs, MergeArgs, SearchArgs};
use color_eyre::eyre::Result;
use futures::future::{try_join, try_join_all};
use opendal::{layers::LoggingLayer, BlockingOperator, Operator};
use pretty_env_logger::formatted_timed_builder;
use tantivy::{
collector::TopDocs,
-    directory::{FileSlice, MmapDirectory},
+    directory::{DirectoryClone, FileSlice, MmapDirectory},
indexer::NoMergePolicy,
query::QueryParser,
schema::{
@@ -38,6 +39,7 @@ use unified_index::unified_directory::UnifiedDirectory;

use crate::{
args::{parse_args, SubCommand},
    merge_directory::MergeDirectory,
opendal_file_handle::OpenDalFileHandle,
unified_index::{file_cache::build_file_cache, writer::UnifiedIndexWriter},
};
@@ -205,6 +207,33 @@ async fn index(args: IndexArgs) -> Result<()> {
Ok(())
}

/// Merge all segments of the unified indexes under `index_dir` into a single
/// segment, using `merge_dir` as local scratch space for the merged output.
async fn merge(args: MergeArgs) -> Result<()> {
    // Create the scratch dir if it doesn't exist yet; ignore "already exists" errors.
    let _ = create_dir(&args.merge_dir).await;
    let output_dir = MmapDirectory::open(&args.merge_dir)?;

    let directories = open_unified_directories(&args.index_dir)
        .await?
        .into_iter()
        .map(|x| x.box_clone())
        .collect::<Vec<_>>();

    // Open an index that sees every input segment at once. NoMergePolicy
    // disables automatic merges; the only merge is the explicit one below.
    let index = Index::open(MergeDirectory::new(directories, output_dir.box_clone())?)?;
    let mut index_writer: IndexWriter = index.writer_with_num_threads(1, 15_000_000)?;
    index_writer.set_merge_policy(Box::new(NoMergePolicy));

    let segment_ids = index.searchable_segment_ids()?;
    if segment_ids.len() > 1 {
        info!("Merging {} segments", segment_ids.len());
        index_writer.merge(&segment_ids).await?;
    }

    // wait_merging_threads() blocks, so run it off the async runtime.
    spawn_blocking(move || index_writer.wait_merging_threads()).await??;

    write_unified_index(index, &args.merge_dir, &args.index_dir).await?;

    Ok(())
}

async fn search(args: SearchArgs) -> Result<()> {
if args.limit == 0 {
return Ok(());
@@ -294,6 +323,9 @@ async fn async_main() -> Result<()> {
SubCommand::Index(index_args) => {
index(index_args).await?;
}
SubCommand::Merge(merge_args) => {
            merge(merge_args).await?;
}
SubCommand::Search(search_args) => {
search(search_args).await?;
}
112 changes: 112 additions & 0 deletions src/merge_directory.rs
@@ -0,0 +1,112 @@
use std::{path::Path, sync::Arc};

use color_eyre::eyre::Result;
use tantivy::{
directory::{
error::{DeleteError, LockError, OpenReadError, OpenWriteError},
DirectoryLock, FileHandle, Lock, RamDirectory, WatchHandle,
},
Directory, Index, IndexMeta,
};

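/// Combine the `meta.json` of every input directory into one `IndexMeta`
/// whose segment list is the union of all the inputs' segments.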
fn build_combined_meta(mut dirs: Vec<Box<dyn Directory>>) -> tantivy::Result<IndexMeta> {
assert!(!dirs.is_empty());
let mut combined_meta = Index::open(dirs.pop().unwrap())?.load_metas()?;
while let Some(dir) = dirs.pop() {
let meta = Index::open(dir)?.load_metas()?;
combined_meta.segments.extend(meta.segments);
}
Ok(combined_meta)
}

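/// A tantivy `Directory` that reads from a set of input directories (plus an
/// in-memory directory holding the combined `meta.json`) while routing all
/// writes to a single output directory.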
#[derive(Debug, Clone)]
pub struct MergeDirectory {
input_dirs: Vec<Box<dyn Directory>>,
output_dir: Box<dyn Directory>,
}

impl MergeDirectory {
pub fn new(
input_dirs: Vec<Box<dyn Directory>>,
output_dir: Box<dyn Directory>,
) -> Result<Self> {
if input_dirs.is_empty() {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"Merge requires at least 1 input directory.",
)
.into());
}

let meta = build_combined_meta(input_dirs.clone())?;
let meta_dir = Box::new(RamDirectory::create());
meta_dir.atomic_write(
Path::new("meta.json"),
serde_json::to_string_pretty(&meta)?.as_bytes(),
)?;

// Always read the output dir and combined meta dir before any of the input dir meta files.
let mut input_dirs_with_meta: Vec<Box<dyn Directory>> =
Vec::with_capacity(input_dirs.len() + 2);
input_dirs_with_meta.push(output_dir.clone());
input_dirs_with_meta.push(meta_dir);
input_dirs_with_meta.extend(input_dirs);

Ok(Self {
input_dirs: input_dirs_with_meta,
output_dir,
})
}

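    /// Return the first directory that contains `path`, checking the output
    /// dir first, then the combined meta dir, then the input dirs.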
fn get_directory_containing_path(&self, path: &Path) -> Result<&dyn Directory, OpenReadError> {
for dir in &self.input_dirs {
if dir.exists(path)? {
return Ok(dir.as_ref());
}
}
Err(OpenReadError::FileDoesNotExist(path.to_path_buf()))
}
}

impl Directory for MergeDirectory {
fn get_file_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>, OpenReadError> {
self.get_directory_containing_path(path)?
.get_file_handle(path)
}

fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
self.get_directory_containing_path(path)?.atomic_read(path)
}

fn exists(&self, path: &Path) -> Result<bool, OpenReadError> {
match self.get_directory_containing_path(path) {
Ok(_) => Ok(true),
Err(OpenReadError::FileDoesNotExist(_)) => Ok(false),
Err(err) => Err(err),
}
}

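    // Deletes are ignored: the input directories are treated as read-only,
    // so segment files merged away are left in place rather than removed.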
fn delete(&self, _path: &Path) -> Result<(), DeleteError> {
Ok(())
}

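    // All writes (segment files and meta.json updates) go to the output directory.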
fn open_write(&self, path: &Path) -> Result<tantivy::directory::WritePtr, OpenWriteError> {
self.output_dir.open_write(path)
}

fn atomic_write(&self, path: &Path, data: &[u8]) -> std::io::Result<()> {
self.output_dir.atomic_write(path, data)
}

fn watch(&self, callback: tantivy::directory::WatchCallback) -> tantivy::Result<WatchHandle> {
self.output_dir.watch(callback)
}

fn sync_directory(&self) -> std::io::Result<()> {
self.output_dir.sync_directory()
}

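    // Locking is a no-op: the merge flow only ever uses a single writer, so
    // there is no concurrent access to guard against.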
fn acquire_lock(&self, _lock: &Lock) -> Result<DirectoryLock, LockError> {
Ok(DirectoryLock::from(Box::new(|| {})))
}
}
