From 8d25ee01ffc97322672212ba8a978070ce07d377 Mon Sep 17 00:00:00 2001 From: Tony Solomonik Date: Sat, 18 May 2024 19:18:04 +0300 Subject: [PATCH] Add merge command --- src/args.rs | 17 +++++++ src/main.rs | 36 ++++++++++++- src/merge_directory.rs | 112 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 163 insertions(+), 2 deletions(-) create mode 100644 src/merge_directory.rs diff --git a/src/args.rs b/src/args.rs index 7ef30f7..65bfb77 100644 --- a/src/args.rs +++ b/src/args.rs @@ -12,6 +12,9 @@ pub enum SubCommand { #[clap(name = "index")] Index(IndexArgs), + #[clap(name = "merge")] + Merge(MergeArgs), + #[clap(name = "search")] Search(SearchArgs), } @@ -41,6 +44,20 @@ The memory is split evenly between all indexing threads, once a thread reaches i pub memory_budget: usize, } +#[derive(Parser, Debug, Clone)] +pub struct MergeArgs { + #[clap(help = "Path to the index dir.")] + pub index_dir: String, + + #[clap( + short, + long, + help = "Path to the dir to merge in the inverted indexes.", + default_value = "/tmp/toshokan_merge" + )] + pub merge_dir: String, +} + #[derive(Parser, Debug, Clone)] pub struct SearchArgs { #[clap(help = "Path to the index dir.")] diff --git a/src/main.rs b/src/main.rs index 403a348..666c3de 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ mod args; mod bincode; +mod merge_directory; mod opendal_file_handle; mod unified_index; @@ -10,14 +11,14 @@ use std::{ time::Duration, }; -use args::{IndexArgs, SearchArgs}; +use args::{IndexArgs, MergeArgs, SearchArgs}; use color_eyre::eyre::Result; use futures::future::{try_join, try_join_all}; use opendal::{layers::LoggingLayer, BlockingOperator, Operator}; use pretty_env_logger::formatted_timed_builder; use tantivy::{ collector::TopDocs, - directory::{FileSlice, MmapDirectory}, + directory::{DirectoryClone, FileSlice, MmapDirectory}, indexer::NoMergePolicy, query::QueryParser, schema::{ @@ -38,6 +39,7 @@ use unified_index::unified_directory::UnifiedDirectory; use crate::{ 
args::{parse_args, SubCommand}, + merge_directory::MergeDirectory, opendal_file_handle::OpenDalFileHandle, unified_index::{file_cache::build_file_cache, writer::UnifiedIndexWriter}, }; @@ -205,6 +207,33 @@ async fn index(args: IndexArgs) -> Result<()> { Ok(()) } +async fn merge(args: MergeArgs) -> Result<()> { + let _ = create_dir(&args.merge_dir).await; + let output_dir = MmapDirectory::open(&args.merge_dir)?; + + let directories = open_unified_directories(&args.index_dir) + .await? + .into_iter() + .map(|x| x.box_clone()) + .collect::<Vec<_>>(); + + let index = Index::open(MergeDirectory::new(directories, output_dir.box_clone())?)?; + let mut index_writer: IndexWriter = index.writer_with_num_threads(1, 15_000_000)?; + index_writer.set_merge_policy(Box::new(NoMergePolicy)); + + let segment_ids = index.searchable_segment_ids()?; + if segment_ids.len() > 1 { + info!("Merging {} segments", segment_ids.len()); + index_writer.merge(&segment_ids).await?; + } + + spawn_blocking(move || index_writer.wait_merging_threads()).await??; + + write_unified_index(index, &args.merge_dir, &args.index_dir).await?; + + Ok(()) +} + async fn search(args: SearchArgs) -> Result<()> { if args.limit == 0 { return Ok(()); } @@ -294,6 +323,9 @@ async fn async_main() -> Result<()> { SubCommand::Index(index_args) => { index(index_args).await?; } + SubCommand::Merge(merge_args) => { + merge(merge_args).await?; + } SubCommand::Search(search_args) => { search(search_args).await?; } diff --git a/src/merge_directory.rs b/src/merge_directory.rs new file mode 100644 index 0000000..1f0c9d2 --- /dev/null +++ b/src/merge_directory.rs @@ -0,0 +1,112 @@ +use std::{path::Path, sync::Arc}; + +use color_eyre::eyre::Result; +use tantivy::{ + directory::{ + error::{DeleteError, LockError, OpenReadError, OpenWriteError}, + DirectoryLock, FileHandle, Lock, RamDirectory, WatchHandle, + }, + Directory, Index, IndexMeta, +}; + +fn build_combined_meta(mut dirs: Vec<Box<dyn Directory>>) -> 
tantivy::Result<IndexMeta> { + assert!(!dirs.is_empty()); + let mut combined_meta = Index::open(dirs.pop().unwrap())?.load_metas()?; + while let Some(dir) = dirs.pop() { + let meta = Index::open(dir)?.load_metas()?; + combined_meta.segments.extend(meta.segments); + } + Ok(combined_meta) + } + +#[derive(Debug, Clone)] +pub struct MergeDirectory { + input_dirs: Vec<Box<dyn Directory>>, + output_dir: Box<dyn Directory>, +} + +impl MergeDirectory { + pub fn new( + input_dirs: Vec<Box<dyn Directory>>, + output_dir: Box<dyn Directory>, + ) -> Result<Self> { + if input_dirs.is_empty() { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Merge requires at least 1 input directory.", + ) + .into()); + } + + let meta = build_combined_meta(input_dirs.clone())?; + let meta_dir = Box::new(RamDirectory::create()); + meta_dir.atomic_write( + Path::new("meta.json"), + serde_json::to_string_pretty(&meta)?.as_bytes(), + )?; + + // Always read the output dir and combined meta dir before any of the input dir meta files. + let mut input_dirs_with_meta: Vec<Box<dyn Directory>> = + Vec::with_capacity(input_dirs.len() + 2); + input_dirs_with_meta.push(output_dir.clone()); + input_dirs_with_meta.push(meta_dir); + input_dirs_with_meta.extend(input_dirs); + + Ok(Self { + input_dirs: input_dirs_with_meta, + output_dir, + }) + } + + fn get_directory_containing_path(&self, path: &Path) -> Result<&dyn Directory, OpenReadError> { + for dir in &self.input_dirs { + if dir.exists(path)? { + return Ok(dir.as_ref()); + } + } + Err(OpenReadError::FileDoesNotExist(path.to_path_buf())) + } +} + +impl Directory for MergeDirectory { + fn get_file_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>, OpenReadError> { + self.get_directory_containing_path(path)? 
+ .get_file_handle(path) + } + + fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> { + self.get_directory_containing_path(path)?.atomic_read(path) + } + + fn exists(&self, path: &Path) -> Result<bool, OpenReadError> { + match self.get_directory_containing_path(path) { + Ok(_) => Ok(true), + Err(OpenReadError::FileDoesNotExist(_)) => Ok(false), + Err(err) => Err(err), + } + } + + fn delete(&self, _path: &Path) -> Result<(), DeleteError> { + Ok(()) + } + + fn open_write(&self, path: &Path) -> Result<tantivy::directory::WritePtr, OpenWriteError> { + self.output_dir.open_write(path) + } + + fn atomic_write(&self, path: &Path, data: &[u8]) -> std::io::Result<()> { + self.output_dir.atomic_write(path, data) + } + + fn watch(&self, callback: tantivy::directory::WatchCallback) -> tantivy::Result<WatchHandle> { + self.output_dir.watch(callback) + } + + fn sync_directory(&self) -> std::io::Result<()> { + self.output_dir.sync_directory() + } + + fn acquire_lock(&self, _lock: &Lock) -> Result<DirectoryLock, LockError> { + Ok(DirectoryLock::from(Box::new(|| {}))) + } +}