unified: Add file cache
tontinton committed May 16, 2024
1 parent 164e51c commit 5f0bcba
Showing 6 changed files with 229 additions and 22 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -25,7 +25,7 @@ log = "0.4.21"
once_cell = "1.19.0"
opendal = { version = "0.46.0", features = ["layers-blocking", "services-fs"] }
pretty_env_logger = "0.5.0"
-serde = { version = "1.0.201", features = ["derive"] }
+serde = { version = "1.0.201", features = ["derive", "rc"] }
serde_json = "1.0.117"
tantivy = "0.22.0"
tokio = { version = "1.37.0", features = ["full"] }
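Note: the "rc" feature is needed because the new file cache (src/unified_index/file_cache.rs below) stores its buffers as Arc<[u8]> inside the serialized IndexFooter, and serde only provides Serialize/Deserialize impls for Arc<T> when "rc" is enabled. A minimal sketch of the constraint; this Footer struct is illustrative, not from the repository:

    use std::sync::Arc;

    use serde::{Deserialize, Serialize};

    // Compiles only with serde = { version = "1", features = ["derive", "rc"] };
    // without "rc", serde has no Serialize/Deserialize impls for Arc<T>.
    #[derive(Serialize, Deserialize)]
    struct Footer {
        bytes: Arc<[u8]>,
    }

    fn main() {
        let footer = Footer { bytes: Arc::from(&b"cached"[..]) };
        println!("{}", serde_json::to_string(&footer).unwrap());
    }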
14 changes: 11 additions & 3 deletions src/main.rs
@@ -3,7 +3,12 @@ mod bincode;
mod opendal_file_handle;
mod unified_index;

-use std::{collections::HashSet, path::Path, sync::Arc, time::Duration};
+use std::{
+    collections::HashSet,
+    path::{Path, PathBuf},
+    sync::Arc,
+    time::Duration,
+};

use args::{IndexArgs, SearchArgs};
use color_eyre::eyre::Result;
@@ -34,7 +39,7 @@ use unified_index::unified_directory::UnifiedDirectory;
use crate::{
    args::{parse_args, SubCommand},
    opendal_file_handle::OpenDalFileHandle,
-    unified_index::writer::UnifiedIndexWriter,
+    unified_index::{file_cache::build_file_cache, writer::UnifiedIndexWriter},
};

extern crate pretty_env_logger;
@@ -102,6 +107,9 @@ async fn index(args: IndexArgs) -> Result<()> {

    spawn_blocking(move || index_writer.wait_merging_threads()).await??;

+    let build_dir_path = PathBuf::from(&args.build_dir);
+    let file_cache = spawn_blocking(move || build_file_cache(&build_dir_path)).await??;
+
    let unified_index_writer = UnifiedIndexWriter::from_file_paths(
        Path::new(&args.build_dir),
        index.directory().list_managed_files(),
@@ -126,7 +134,7 @@ async fn index(args: IndexArgs) -> Result<()> {
        .compat_write();

    info!("Writing unified index file");
-    let (total_len, footer_len) = unified_index_writer.write(&mut writer).await?;
+    let (total_len, footer_len) = unified_index_writer.write(&mut writer, file_cache).await?;
    writer.shutdown().await?;

write(
155 changes: 155 additions & 0 deletions src/unified_index/file_cache.rs
@@ -0,0 +1,155 @@
use std::{
    collections::BTreeMap,
    ops::Range,
    path::{Path, PathBuf},
    sync::{Arc, Mutex},
};

use color_eyre::eyre::Result;
use tantivy::{
    directory::{error::OpenReadError, FileHandle, MmapDirectory, OwnedBytes},
    Directory, HasLen, Index, IndexReader, ReloadPolicy,
};

use super::utils::macros::read_only_directory;

// Don't store slices that are too large in the cache, except for the term and
// store files, which are always cached.
const CACHE_SLICE_SIZE_LIMIT: usize = 10_000_000;

// For each file, the cache maps the start offset of a recorded read to its end
// offset and buffer. The start is the key and the end lives in the value
// because BTreeMap requires a key that implements Ord.
pub type RangeCache = BTreeMap<u64, (u64, Arc<[u8]>)>;
pub type FileCache = BTreeMap<PathBuf, RangeCache>;

#[derive(Debug)]
pub struct RecordingDirectory<D: Directory> {
    inner: Arc<D>,
    cache: Arc<Mutex<FileCache>>,
}

impl<D: Directory> RecordingDirectory<D> {
    pub fn wrap(directory: D) -> Self {
        RecordingDirectory {
            inner: Arc::new(directory),
            cache: Arc::new(Mutex::new(BTreeMap::new())),
        }
    }

    fn record(&self, path: &Path, bytes: &[u8], offset: u64) -> std::io::Result<()> {
        let path_str = path.to_string_lossy();
        if !path_str.ends_with("store")
            && !path_str.ends_with("term")
            && bytes.len() > CACHE_SLICE_SIZE_LIMIT
        {
            return Ok(());
        }

        let mut guard = self.cache.lock().map_err(|_| {
            std::io::Error::new(
                std::io::ErrorKind::Other,
                "Some other holder of the mutex panicked.",
            )
        })?;

        guard.entry(path.to_path_buf()).or_default().insert(
            offset,
            (
                offset + bytes.len() as u64,
                Arc::from(bytes.to_vec().into_boxed_slice()),
            ),
        );

        Ok(())
    }
}

impl<D: Directory> Clone for RecordingDirectory<D> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
            cache: self.cache.clone(),
        }
    }
}

impl<D: Directory> Directory for RecordingDirectory<D> {
    fn get_file_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>, OpenReadError> {
        let inner = self.inner.get_file_handle(path)?;

        Ok(Arc::new(RecordingFileHandle {
            inner,
            directory: self.clone(),
            path: path.to_path_buf(),
        }))
    }

    fn exists(&self, path: &Path) -> Result<bool, OpenReadError> {
        self.inner.exists(path)
    }

    fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
        let payload = self.inner.atomic_read(path)?;
        self.record(path, &payload, 0)
            .map_err(|e| OpenReadError::wrap_io_error(e, path.to_path_buf()))?;
        Ok(payload)
    }

    read_only_directory!();
}

struct RecordingFileHandle<D: Directory> {
    directory: RecordingDirectory<D>,
    inner: Arc<dyn FileHandle>,
    path: PathBuf,
}

impl<D: Directory> FileHandle for RecordingFileHandle<D> {
    fn read_bytes(&self, range: Range<usize>) -> std::io::Result<OwnedBytes> {
        let start = range.start as u64;
        let payload = self.inner.read_bytes(range)?;
        self.directory.record(&self.path, &payload, start)?;
        Ok(payload)
    }
}

impl<D: Directory> std::fmt::Debug for RecordingFileHandle<D> {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "DebugProxyFileHandle({:?})", &self.inner)
    }
}

impl<D: Directory> HasLen for RecordingFileHandle<D> {
    fn len(&self) -> usize {
        self.inner.len()
    }
}

pub fn build_file_cache(path: &Path) -> Result<FileCache> {
    let directory = MmapDirectory::open(path)?;
    let recording_directory = RecordingDirectory::wrap(directory);
    let index = Index::open(recording_directory.clone())?;
    let schema = index.schema();
    let reader: IndexReader = index
        .reader_builder()
        .reload_policy(ReloadPolicy::Manual)
        .try_into()?;
    let searcher = reader.searcher();
    for (field, field_entry) in schema.fields() {
        if !field_entry.is_indexed() {
            continue;
        }
        for reader in searcher.segment_readers() {
            reader.inverted_index(field)?;
        }
    }

    let guard = recording_directory.cache.lock().map_err(|_| {
        std::io::Error::new(
            std::io::ErrorKind::Other,
            "Some other holder of the mutex panicked.",
        )
    })?;

    Ok(guard.clone())
}
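To make the RangeCache layout concrete, here is a small self-contained sketch (illustrative, not part of the commit) of what RecordingDirectory::record stores, and why a later read is served from the cache only on an exact range match, mirroring CachedFileHandle::read_bytes in src/unified_index/unified_directory.rs below:

    use std::{collections::BTreeMap, ops::Range, sync::Arc};

    type RangeCache = BTreeMap<u64, (u64, Arc<[u8]>)>;

    // Serve a read from the cache only when both start and end match a recorded read.
    fn cached_read(cache: &RangeCache, range: Range<u64>) -> Option<Arc<[u8]>> {
        let (end, bytes) = cache.get(&range.start)?;
        (*end == range.end).then(|| bytes.clone())
    }

    fn main() {
        let mut cache = RangeCache::new();

        // What record() stores: key = start offset, value = (end offset, bytes).
        let bytes: Arc<[u8]> = Arc::from(&b"term dictionary"[..]);
        cache.insert(100, (100 + bytes.len() as u64, bytes));

        assert!(cached_read(&cache, 100..115).is_some()); // exact range: hit
        assert!(cached_read(&cache, 100..110).is_none()); // same start, shorter read: miss
        assert!(cached_read(&cache, 105..115).is_none()); // different start: miss
    }

Anything that misses falls through to the raw file handle, so the cache can only save round trips, never change what is read.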
11 changes: 7 additions & 4 deletions src/unified_index/mod.rs
@@ -1,3 +1,4 @@
+pub mod file_cache;
pub mod unified_directory;
pub mod utils;
pub mod writer;
@@ -6,20 +7,22 @@ use std::{collections::HashMap, ops::Range, path::PathBuf};

use serde::{Deserialize, Serialize};

+use self::file_cache::FileCache;

const VERSION: u32 = 1;

#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct IndexFooter {
-    cache: HashMap<(PathBuf, Range<u64>), Vec<u8>>,
+struct IndexFooter {
    file_offsets: HashMap<PathBuf, Range<u64>>,
+    cache: FileCache,
    version: u32,
}

impl IndexFooter {
-    pub fn new(file_offsets: HashMap<PathBuf, Range<u64>>) -> Self {
+    pub fn new(file_offsets: HashMap<PathBuf, Range<u64>>, cache: FileCache) -> Self {
        Self {
-            cache: HashMap::new(),
            file_offsets,
+            cache,
            version: VERSION,
        }
    }
52 changes: 42 additions & 10 deletions src/unified_index/unified_directory.rs
@@ -1,15 +1,47 @@
-use std::{path::Path, sync::Arc};
+use std::{ops::Range, path::Path, sync::Arc};

use bincode::Options;
use color_eyre::eyre::Result;
use tantivy::{
-    directory::{error::OpenReadError, FileHandle, FileSlice},
-    Directory,
+    directory::{error::OpenReadError, FileHandle, FileSlice, OwnedBytes},
+    Directory, HasLen,
};

use crate::bincode::bincode_options;

-use super::{utils::macros::read_only_directory, IndexFooter};
+use super::{file_cache::RangeCache, utils::macros::read_only_directory, IndexFooter};

+#[derive(Debug, Clone)]
+struct CachedFileHandle {
+    raw_file_handle: Arc<dyn FileHandle>,
+    cache: RangeCache,
+}
+
+impl CachedFileHandle {
+    fn new(raw_file_handle: Arc<dyn FileHandle>, cache: RangeCache) -> Self {
+        Self {
+            raw_file_handle,
+            cache,
+        }
+    }
+}
+
+impl FileHandle for CachedFileHandle {
+    fn read_bytes(&self, range: Range<usize>) -> std::io::Result<OwnedBytes> {
+        if let Some((end, bytes)) = self.cache.get(&(range.start as u64)).cloned() {
+            if end == range.end as u64 {
+                return Ok(OwnedBytes::new(bytes));
+            }
+        }
+        self.raw_file_handle.read_bytes(range)
+    }
+}
+
+impl HasLen for CachedFileHandle {
+    fn len(&self) -> usize {
+        self.raw_file_handle.len()
+    }
+}

#[derive(Debug, Clone)]
pub struct UnifiedDirectory {
@@ -28,17 +60,17 @@ impl UnifiedDirectory {

impl Directory for UnifiedDirectory {
    fn get_file_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>, OpenReadError> {
-        let file_slice = self.open_read(path)?;
-        Ok(Arc::new(file_slice))
-    }
-
-    fn open_read(&self, path: &Path) -> Result<FileSlice, OpenReadError> {
        let range = self
            .footer
            .file_offsets
            .get(path)
            .ok_or_else(|| OpenReadError::FileDoesNotExist(path.to_path_buf()))?;
-        Ok(self.slice.slice(range.start as usize..range.end as usize))
+        let slice = self.slice.slice(range.start as usize..range.end as usize);
+        if let Some(cache) = self.footer.cache.get(path).cloned() {
+            Ok(Arc::new(CachedFileHandle::new(Arc::new(slice), cache)))
+        } else {
+            Ok(Arc::new(slice))
+        }
    }

    fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
17 changes: 13 additions & 4 deletions src/unified_index/writer.rs
@@ -14,7 +14,7 @@ use tokio::{

use crate::bincode::bincode_options;

-use super::IndexFooter;
+use super::{FileCache, IndexFooter};

struct FileReader {
    reader: Box<dyn AsyncRead + Unpin>,
@@ -58,7 +58,7 @@ impl UnifiedIndexWriter {
}
}

-    pub async fn write<W>(mut self, writer: &mut W) -> Result<(u64, u64)>
+    pub async fn write<W>(mut self, writer: &mut W, cache: FileCache) -> Result<(u64, u64)>
    where
        W: AsyncWrite + Unpin,
    {
@@ -70,7 +70,8 @@ impl UnifiedIndexWriter {
            self.file_offsets.insert(file_name, start..written);
        }

-        let footer_bytes = bincode_options().serialize(&IndexFooter::new(self.file_offsets))?;
+        let footer_bytes =
+            bincode_options().serialize(&IndexFooter::new(self.file_offsets, cache))?;
        let footer_len = footer_bytes.len() as u64;

        let footer_written = tokio::io::copy(&mut Cursor::new(footer_bytes), writer).await?;
@@ -84,6 +85,14 @@

        Ok((written + footer_len, footer_len))
    }

+    #[cfg(test)]
+    pub async fn write_without_cache<W>(self, writer: &mut W) -> Result<(u64, u64)>
+    where
+        W: AsyncWrite + Unpin,
+    {
+        self.write(writer, FileCache::new()).await
+    }
}

#[cfg(test)]
@@ -119,7 +128,7 @@ mod tests {
]);

        let mut buf = vec![];
-        let (_, footer_len) = writer.write(&mut buf).await?;
+        let (_, footer_len) = writer.write_without_cache(&mut buf).await?;

        let file_slice = FileSlice::new(Arc::new(OwnedBytes::new(buf)));
        let dir = UnifiedDirectory::open_with_len(file_slice, footer_len as usize)?;
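For orientation, the unified file written by UnifiedIndexWriter::write is the concatenated index files followed by the bincode-serialized IndexFooter, which now carries the file cache; open_with_len only needs footer_len to split the two apart again. A sketch of that split (split_footer is illustrative, not from the repository):

    // Layout produced by write():
    //
    //   [ file 0 bytes | file 1 bytes | ... | bincode(IndexFooter) ]
    //                                         last footer_len bytes
    fn split_footer(buf: &[u8], footer_len: usize) -> (&[u8], &[u8]) {
        let split = buf.len() - footer_len;
        (&buf[..split], &buf[split..]) // (concatenated files, serialized footer)
    }

    fn main() {
        let unified = [b"file bytes".as_slice(), &[0u8; 8]].concat(); // pretend the footer is 8 bytes
        let (files, footer) = split_footer(&unified, 8);
        assert_eq!(files, b"file bytes");
        assert_eq!(footer.len(), 8);
    }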
