From 5f0bcbad56df95cef64bf53c65d94835e4aaf497 Mon Sep 17 00:00:00 2001
From: Tony Solomonik
Date: Fri, 17 May 2024 00:46:23 +0300
Subject: [PATCH] unified: Add file cache

---
 Cargo.toml                             |   2 +-
 src/main.rs                            |  14 ++-
 src/unified_index/file_cache.rs        | 155 +++++++++++++++++++++++++
 src/unified_index/mod.rs               |  11 +-
 src/unified_index/unified_directory.rs |  52 +++++++--
 src/unified_index/writer.rs            |  17 ++-
 6 files changed, 229 insertions(+), 22 deletions(-)
 create mode 100644 src/unified_index/file_cache.rs

diff --git a/Cargo.toml b/Cargo.toml
index 0297f1f..214e14c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -25,7 +25,7 @@ log = "0.4.21"
 once_cell = "1.19.0"
 opendal = { version = "0.46.0", features = ["layers-blocking", "services-fs"] }
 pretty_env_logger = "0.5.0"
-serde = { version = "1.0.201", features = ["derive"] }
+serde = { version = "1.0.201", features = ["derive", "rc"] }
 serde_json = "1.0.117"
 tantivy = "0.22.0"
 tokio = { version = "1.37.0", features = ["full"] }
diff --git a/src/main.rs b/src/main.rs
index 0798d8c..cf7621b 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,7 +3,12 @@ mod bincode;
 mod opendal_file_handle;
 mod unified_index;
 
-use std::{collections::HashSet, path::Path, sync::Arc, time::Duration};
+use std::{
+    collections::HashSet,
+    path::{Path, PathBuf},
+    sync::Arc,
+    time::Duration,
+};
 
 use args::{IndexArgs, SearchArgs};
 use color_eyre::eyre::Result;
@@ -34,7 +39,7 @@ use unified_index::unified_directory::UnifiedDirectory;
 use crate::{
     args::{parse_args, SubCommand},
     opendal_file_handle::OpenDalFileHandle,
-    unified_index::writer::UnifiedIndexWriter,
+    unified_index::{file_cache::build_file_cache, writer::UnifiedIndexWriter},
 };
 
 extern crate pretty_env_logger;
@@ -102,6 +107,9 @@ async fn index(args: IndexArgs) -> Result<()> {
 
     spawn_blocking(move || index_writer.wait_merging_threads()).await??;
 
+    let build_dir_path = PathBuf::from(&args.build_dir);
+    let file_cache = spawn_blocking(move || build_file_cache(&build_dir_path)).await??;
+
     let unified_index_writer = UnifiedIndexWriter::from_file_paths(
         Path::new(&args.build_dir),
         index.directory().list_managed_files(),
@@ -126,7 +134,7 @@ async fn index(args: IndexArgs) -> Result<()> {
         .compat_write();
 
     info!("Writing unified index file");
-    let (total_len, footer_len) = unified_index_writer.write(&mut writer).await?;
+    let (total_len, footer_len) = unified_index_writer.write(&mut writer, file_cache).await?;
     writer.shutdown().await?;
 
     write(
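The serde feature change in Cargo.toml above is load-bearing: the cache introduced below stores its buffers as `Arc<[u8]>`, and serde only provides `Serialize`/`Deserialize` impls for `Arc` when the `rc` feature is enabled. A minimal sketch of what that unlocks (the `CachedSlice` struct is illustrative, not part of the patch):

```rust
use std::sync::Arc;

use serde::{Deserialize, Serialize};

// Derives compile only with serde = { features = ["derive", "rc"] }:
// "rc" supplies the Serialize/Deserialize impls for Arc<T>.
#[derive(Serialize, Deserialize)]
struct CachedSlice {
    end: u64,         // exclusive end offset of the cached range
    bytes: Arc<[u8]>, // the cached buffer, shared without copying
}
```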
diff --git a/src/unified_index/file_cache.rs b/src/unified_index/file_cache.rs
new file mode 100644
index 0000000..e5651f1
--- /dev/null
+++ b/src/unified_index/file_cache.rs
@@ -0,0 +1,155 @@
+use std::{
+    collections::BTreeMap,
+    ops::Range,
+    path::{Path, PathBuf},
+    sync::{Arc, Mutex},
+};
+
+use color_eyre::eyre::Result;
+use tantivy::{
+    directory::{error::OpenReadError, FileHandle, MmapDirectory, OwnedBytes},
+    Directory, HasLen, Index, IndexReader, ReloadPolicy,
+};
+
+use super::utils::macros::read_only_directory;
+
+// Don't store slices that are too large in the cache, except the term and the store index.
+const CACHE_SLICE_SIZE_LIMIT: usize = 10_000_000;
+
+// For each file, there are byte ranges stored as the value.
+// Storing start of range as key, and end in value together with the buffer,
+// as BTreeMap requires a key that implements Ord.
+pub type RangeCache = BTreeMap<u64, (u64, Arc<[u8]>)>;
+pub type FileCache = BTreeMap<PathBuf, RangeCache>;
+
+#[derive(Debug)]
+pub struct RecordingDirectory<D: Directory> {
+    inner: Arc<D>,
+    cache: Arc<Mutex<FileCache>>,
+}
+
+impl<D: Directory> RecordingDirectory<D> {
+    pub fn wrap(directory: D) -> Self {
+        RecordingDirectory {
+            inner: Arc::new(directory),
+            cache: Arc::new(Mutex::new(BTreeMap::new())),
+        }
+    }
+
+    fn record(&self, path: &Path, bytes: &[u8], offset: u64) -> std::io::Result<()> {
+        let path_str = path.to_string_lossy();
+        if !path_str.ends_with("store")
+            && !path_str.ends_with("term")
+            && bytes.len() > CACHE_SLICE_SIZE_LIMIT
+        {
+            return Ok(());
+        }
+
+        let mut guard = self.cache.lock().map_err(|_| {
+            std::io::Error::new(
+                std::io::ErrorKind::Other,
+                "Some other holder of the mutex panicked.",
+            )
+        })?;
+
+        guard.entry(path.to_path_buf()).or_default().insert(
+            offset,
+            (
+                offset + bytes.len() as u64,
+                Arc::from(bytes.to_vec().into_boxed_slice()),
+            ),
+        );
+
+        Ok(())
+    }
+}
+
+impl<D: Directory> Clone for RecordingDirectory<D> {
+    fn clone(&self) -> Self {
+        Self {
+            inner: self.inner.clone(),
+            cache: self.cache.clone(),
+        }
+    }
+}
+
+impl<D: Directory> Directory for RecordingDirectory<D> {
+    fn get_file_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>, OpenReadError> {
+        let inner = self.inner.get_file_handle(path)?;
+
+        Ok(Arc::new(RecordingFileHandle {
+            inner,
+            directory: self.clone(),
+            path: path.to_path_buf(),
+        }))
+    }
+
+    fn exists(&self, path: &Path) -> Result<bool, OpenReadError> {
+        self.inner.exists(path)
+    }
+
+    fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
+        let payload = self.inner.atomic_read(path)?;
+        self.record(path, &payload, 0)
+            .map_err(|e| OpenReadError::wrap_io_error(e, path.to_path_buf()))?;
+        Ok(payload)
+    }
+
+    read_only_directory!();
+}
+
+struct RecordingFileHandle<D: Directory> {
+    directory: RecordingDirectory<D>,
+    inner: Arc<dyn FileHandle>,
+    path: PathBuf,
+}
+
+impl<D: Directory> FileHandle for RecordingFileHandle<D> {
+    fn read_bytes(&self, range: Range<usize>) -> std::io::Result<OwnedBytes> {
+        let start = range.start as u64;
+        let payload = self.inner.read_bytes(range)?;
+        self.directory.record(&self.path, &payload, start)?;
+        Ok(payload)
+    }
+}
+
+impl<D: Directory> std::fmt::Debug for RecordingFileHandle<D> {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "DebugProxyFileHandle({:?})", &self.inner)
+    }
+}
+
+impl<D: Directory> HasLen for RecordingFileHandle<D> {
+    fn len(&self) -> usize {
+        self.inner.len()
+    }
+}
+
+pub fn build_file_cache(path: &Path) -> Result<FileCache> {
+    let directory = MmapDirectory::open(path)?;
+    let recording_directory = RecordingDirectory::wrap(directory);
+    let index = Index::open(recording_directory.clone())?;
+    let schema = index.schema();
+    let reader: IndexReader = index
+        .reader_builder()
+        .reload_policy(ReloadPolicy::Manual)
+        .try_into()?;
+    let searcher = reader.searcher();
+    for (field, field_entry) in schema.fields() {
+        if !field_entry.is_indexed() {
+            continue;
+        }
+        for reader in searcher.segment_readers() {
+            reader.inverted_index(field)?;
+        }
+    }
+
+    let guard = recording_directory.cache.lock().map_err(|_| {
+        std::io::Error::new(
+            std::io::ErrorKind::Other,
+            "Some other holder of the mutex panicked.",
+        )
+    })?;
+
+    Ok(guard.clone())
+}
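The recording side above is used like this (a sketch, not code from the patch: the `RamDirectory` setup and the `warm_sketch` name are illustrative; `build_file_cache` itself wraps an `MmapDirectory` and snapshots the cache module-internally, since the `cache` field is private):

```rust
use std::path::Path;

use color_eyre::eyre::Result;
use tantivy::directory::{FileHandle, RamDirectory};
use tantivy::Directory;

use crate::unified_index::file_cache::RecordingDirectory;

fn warm_sketch() -> Result<()> {
    // Any read-only Directory can be wrapped.
    let inner = RamDirectory::create();
    inner.atomic_write(Path::new("example.store"), b"hello world")?;

    let recording = RecordingDirectory::wrap(inner);

    // Reads that go through the wrapper are recorded: this stores bytes 0..5
    // of "example.store" in the cache, keyed by file path and start offset.
    let handle = recording.get_file_handle(Path::new("example.store"))?;
    let _bytes = handle.read_bytes(0..5)?;
    Ok(())
}
```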
diff --git a/src/unified_index/mod.rs b/src/unified_index/mod.rs
index 9665685..a42a54c 100644
--- a/src/unified_index/mod.rs
+++ b/src/unified_index/mod.rs
@@ -1,3 +1,4 @@
+pub mod file_cache;
 pub mod unified_directory;
 pub mod utils;
 pub mod writer;
@@ -6,20 +7,22 @@ use std::{collections::HashMap, ops::Range, path::PathBuf};
 
 use serde::{Deserialize, Serialize};
 
+use self::file_cache::FileCache;
+
 const VERSION: u32 = 1;
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct IndexFooter {
-    cache: HashMap<(PathBuf, Range<u64>), Vec<u8>>,
+struct IndexFooter {
     file_offsets: HashMap<PathBuf, Range<u64>>,
+    cache: FileCache,
     version: u32,
 }
 
 impl IndexFooter {
-    pub fn new(file_offsets: HashMap<PathBuf, Range<u64>>) -> Self {
+    pub fn new(file_offsets: HashMap<PathBuf, Range<u64>>, cache: FileCache) -> Self {
         Self {
-            cache: HashMap::new(),
             file_offsets,
+            cache,
             version: VERSION,
         }
     }
diff --git a/src/unified_index/unified_directory.rs b/src/unified_index/unified_directory.rs
index 322da4f..b57ef35 100644
--- a/src/unified_index/unified_directory.rs
+++ b/src/unified_index/unified_directory.rs
@@ -1,15 +1,47 @@
-use std::{path::Path, sync::Arc};
+use std::{ops::Range, path::Path, sync::Arc};
 
 use bincode::Options;
 use color_eyre::eyre::Result;
 use tantivy::{
-    directory::{error::OpenReadError, FileHandle, FileSlice},
-    Directory,
+    directory::{error::OpenReadError, FileHandle, FileSlice, OwnedBytes},
+    Directory, HasLen,
 };
 
 use crate::bincode::bincode_options;
 
-use super::{utils::macros::read_only_directory, IndexFooter};
+use super::{file_cache::RangeCache, utils::macros::read_only_directory, IndexFooter};
+
+#[derive(Debug, Clone)]
+struct CachedFileHandle {
+    raw_file_handle: Arc<dyn FileHandle>,
+    cache: RangeCache,
+}
+
+impl CachedFileHandle {
+    fn new(raw_file_handle: Arc<dyn FileHandle>, cache: RangeCache) -> Self {
+        Self {
+            raw_file_handle,
+            cache,
+        }
+    }
+}
+
+impl FileHandle for CachedFileHandle {
+    fn read_bytes(&self, range: Range<usize>) -> std::io::Result<OwnedBytes> {
+        if let Some((end, bytes)) = self.cache.get(&(range.start as u64)).cloned() {
+            if end == range.end as u64 {
+                return Ok(OwnedBytes::new(bytes));
+            }
+        }
+        self.raw_file_handle.read_bytes(range)
+    }
+}
+
+impl HasLen for CachedFileHandle {
+    fn len(&self) -> usize {
+        self.raw_file_handle.len()
+    }
+}
 
 #[derive(Debug, Clone)]
 pub struct UnifiedDirectory {
@@ -28,17 +60,17 @@ impl UnifiedDirectory {
 
 impl Directory for UnifiedDirectory {
     fn get_file_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>, OpenReadError> {
-        let file_slice = self.open_read(path)?;
-        Ok(Arc::new(file_slice))
-    }
-
-    fn open_read(&self, path: &Path) -> Result<FileSlice, OpenReadError> {
         let range = self
             .footer
             .file_offsets
             .get(path)
             .ok_or_else(|| OpenReadError::FileDoesNotExist(path.to_path_buf()))?;
-        Ok(self.slice.slice(range.start as usize..range.end as usize))
+        let slice = self.slice.slice(range.start as usize..range.end as usize);
+        if let Some(cache) = self.footer.cache.get(path).cloned() {
+            Ok(Arc::new(CachedFileHandle::new(Arc::new(slice), cache)))
+        } else {
+            Ok(Arc::new(slice))
+        }
    }
 
     fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
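Note the lookup rule in `CachedFileHandle::read_bytes`: a read only hits the cache when it matches a recorded range exactly, since the map is keyed by start offset and the stored end must equal `range.end`. Overlapping or partial requests fall through to the raw handle. A small self-contained illustration of that exact-match rule (offsets hypothetical, type alias copied from file_cache.rs):

```rust
use std::{collections::BTreeMap, sync::Arc};

// Mirrors RangeCache from file_cache.rs: start offset -> (end offset, bytes).
type RangeCache = BTreeMap<u64, (u64, Arc<[u8]>)>;

fn main() {
    let mut cache: RangeCache = BTreeMap::new();
    cache.insert(10, (20, Arc::from(vec![0u8; 10].into_boxed_slice())));

    // A read of 10..20 is an exact hit; anything else goes to storage.
    assert!(matches!(cache.get(&10), Some((20, _)))); // hit
    assert!(cache.get(&12).is_none()); // different start: miss
    // Even with start 10, a read of 10..15 misses because the stored end is 20.
}
```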
diff --git a/src/unified_index/writer.rs b/src/unified_index/writer.rs
index 0bfa4de..a6f0a19 100644
--- a/src/unified_index/writer.rs
+++ b/src/unified_index/writer.rs
@@ -14,7 +14,7 @@ use tokio::{
 
 use crate::bincode::bincode_options;
 
-use super::IndexFooter;
+use super::{FileCache, IndexFooter};
 
 struct FileReader {
     reader: Box<dyn AsyncRead + Unpin>,
@@ -58,7 +58,7 @@ impl UnifiedIndexWriter {
         }
     }
 
-    pub async fn write<W>(mut self, writer: &mut W) -> Result<(u64, u64)>
+    pub async fn write<W>(mut self, writer: &mut W, cache: FileCache) -> Result<(u64, u64)>
     where
         W: AsyncWrite + Unpin,
     {
@@ -70,7 +70,8 @@ impl UnifiedIndexWriter {
             self.file_offsets.insert(file_name, start..written);
         }
 
-        let footer_bytes = bincode_options().serialize(&IndexFooter::new(self.file_offsets))?;
+        let footer_bytes =
+            bincode_options().serialize(&IndexFooter::new(self.file_offsets, cache))?;
         let footer_len = footer_bytes.len() as u64;
         let footer_written = tokio::io::copy(&mut Cursor::new(footer_bytes), writer).await?;
 
@@ -84,6 +85,14 @@ impl UnifiedIndexWriter {
 
         Ok((written + footer_len, footer_len))
     }
+
+    #[cfg(test)]
+    pub async fn write_without_cache<W>(self, writer: &mut W) -> Result<(u64, u64)>
+    where
+        W: AsyncWrite + Unpin,
+    {
+        self.write(writer, FileCache::new()).await
+    }
 }
 
 #[cfg(test)]
@@ -119,7 +128,7 @@ mod tests {
         ]);
 
         let mut buf = vec![];
-        let (_, footer_len) = writer.write(&mut buf).await?;
+        let (_, footer_len) = writer.write_without_cache(&mut buf).await?;
 
         let file_slice = FileSlice::new(Arc::new(OwnedBytes::new(buf)));
         let dir = UnifiedDirectory::open_with_len(file_slice, footer_len as usize)?;
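End to end, the read path the patch enables looks roughly like this (a sketch following the test above; the `open_unified` name is hypothetical, and `buf`/`footer_len` are assumed to come from a prior `write`):

```rust
use std::sync::Arc;

use color_eyre::eyre::Result;
use tantivy::directory::{FileSlice, OwnedBytes};
use tantivy::Index;

use crate::unified_index::unified_directory::UnifiedDirectory;

fn open_unified(buf: Vec<u8>, footer_len: u64) -> Result<Index> {
    // Same shape as the test: the unified file is one contiguous slice with
    // the bincode-encoded IndexFooter (file offsets + file cache) at the end.
    let file_slice = FileSlice::new(Arc::new(OwnedBytes::new(buf)));
    let dir = UnifiedDirectory::open_with_len(file_slice, footer_len as usize)?;
    // Reads whose exact ranges were warmed at build time are now served from
    // the footer cache; everything else falls back to the underlying slice.
    Ok(Index::open(dir)?)
}
```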