Skip to content

Commit

Permalink
storage: Improve read performance by adding a page cache
Browse files Browse the repository at this point in the history
Instead of reading from the data source on every request, fetch whole pages
into a BTreeMap that acts as an in-memory cache, so later reads of the same
page are served from memory.
  • Loading branch information
tontinton committed Jun 29, 2024
1 parent 92ae257 commit 6faa6bb
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 8 deletions.
10 changes: 7 additions & 3 deletions src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::{
FastFieldNormalizerType, FieldTokenizerType, IndexConfig,
},
opendal_file_handle::OpenDalFileHandle,
page_cache_file_handle::PageCacheFileHandle,
unified_index::{
file_cache::build_file_cache, unified_directory::UnifiedDirectory,
writer::UnifiedIndexWriter,
Expand Down Expand Up @@ -118,10 +119,13 @@ async fn open_unified_directories(
let mut directories_args = Vec::with_capacity(items.len());
for item in items {
let reader = op.reader_with(&item.file_name).await?;
let file_slice = FileSlice::new(Arc::new(OpenDalFileHandle::new(
let file_slice = FileSlice::new(Arc::new(PageCacheFileHandle::new(
handle.clone(),
reader,
item.len as usize,
Box::new(OpenDalFileHandle::new(
handle.clone(),
reader,
item.len as usize,
)),
)));
directories_args.push((item.id, file_slice, item.footer_len as usize))
}
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod commands;
pub mod config;
pub mod merge_directory;
pub mod opendal_file_handle;
pub mod page_cache_file_handle;
pub mod unified_index;

#[macro_use]
Expand Down
16 changes: 11 additions & 5 deletions src/opendal_file_handle.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::ops::Range;

use async_trait::async_trait;
use opendal::Reader;
use tantivy::{
directory::{FileHandle, OwnedBytes},
Expand All @@ -25,17 +26,22 @@ impl OpenDalFileHandle {

impl std::fmt::Debug for OpenDalFileHandle {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "FileReader({})", &self.size)
write!(f, "OpenDalFileHandle({})", &self.size)
}
}

#[async_trait]
impl FileHandle for OpenDalFileHandle {
fn read_bytes(&self, range: Range<usize>) -> std::io::Result<OwnedBytes> {
self.handle.block_on(self.read_bytes_async(range))
}

async fn read_bytes_async(&self, range: Range<usize>) -> std::io::Result<OwnedBytes> {
let mut bytes = Vec::new();
let size = self.handle.block_on(
self.reader
.read_into(&mut bytes, range.start as u64..range.end as u64),
)?;
let size = self
.reader
.read_into(&mut bytes, range.start as u64..range.end as u64)
.await?;
assert_eq!(size, bytes.len());
Ok(OwnedBytes::new(bytes))
}
Expand Down
148 changes: 148 additions & 0 deletions src/page_cache_file_handle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
use std::{collections::BTreeMap, ops::Range, sync::RwLock};

use futures::future::try_join_all;
use tantivy::{
directory::{FileHandle, OwnedBytes},
HasLen,
};
use tokio::runtime::Handle;

/// Size in bytes of a single cached page.
const PAGE_SIZE: usize = 4096;

/// A [`FileHandle`] decorator that caches fetched data in fixed-size pages,
/// so repeated reads of the same region are served from memory instead of
/// re-reading the underlying data source.
pub struct PageCacheFileHandle {
    /// Tokio runtime handle used to drive the inner handle's async reads
    /// from the synchronous `read_bytes()` path.
    handle: Handle,
    /// The underlying (uncached) file handle.
    inner: Box<dyn FileHandle>,
    /// Page index -> page bytes. The file's final page may hold fewer than
    /// `PAGE_SIZE` bytes. There is no eviction; entries live as long as
    /// this handle.
    cache: RwLock<BTreeMap<u32, OwnedBytes>>,
}

impl PageCacheFileHandle {
    /// Wrap `inner` with an initially empty page cache.
    /// `handle` is the runtime used to block on `inner`'s async reads.
    pub fn new(handle: Handle, inner: Box<dyn FileHandle>) -> Self {
        Self {
            handle,
            inner,
            cache: RwLock::new(BTreeMap::new()),
        }
    }

    /// Read the given pages from the inner handle and insert each of them
    /// into the cache. `pages` must be sorted ascending (as produced by
    /// `range_to_pages`); consecutive indices are coalesced so that each
    /// contiguous run of pages costs a single read request.
    fn fetch_pages_into_cache(&self, pages: &[u32]) -> std::io::Result<()> {
        if pages.is_empty() {
            return Ok(());
        }

        // Coalesce the sorted page indices into half-open runs
        // (start_page..end_page) of consecutive pages.
        let mut merged_ranges = Vec::new();
        let mut start_page = pages[0];
        let mut end_page = start_page + 1;

        for &page in pages.iter().skip(1) {
            if page == end_page {
                // Extends the current run.
                end_page = page + 1;
            } else {
                // Gap: close the current run and start a new one.
                merged_ranges.push((start_page, end_page));
                start_page = page;
                end_page = page + 1;
            }
        }
        merged_ranges.push((start_page, end_page));

        // One async read per run, awaited concurrently; the first failed
        // read aborts the whole fetch.
        // NOTE(review): for the file's last page the computed `end` offset
        // may point past EOF; this assumes the inner handle truncates such a
        // read instead of erroring — confirm for each FileHandle impl used.
        let read_futures = merged_ranges
            .iter()
            .map(|(start_page, end_page)| {
                let start = *start_page as usize * PAGE_SIZE;
                let end = *end_page as usize * PAGE_SIZE;
                self.inner.read_bytes_async(start..end)
            })
            .collect::<Vec<_>>();

        let results = self.handle.block_on(try_join_all(read_futures))?;

        // Slice each run's bytes back into individual pages and cache them.
        for ((start_page, end_page), bytes) in merged_ranges.into_iter().zip(results) {
            // The write lock is taken per run (not once around the loop) to
            // keep each critical section short.
            let mut cache = self.cache.write().map_err(|_| {
                std::io::Error::new(std::io::ErrorKind::Other, "Cache lock is poisoned.")
            })?;

            let page_range_len = (end_page - start_page) as usize;
            for (i, page) in (start_page..end_page).enumerate() {
                // The run's final page may be short when the read hit EOF;
                // clamp its end to the bytes actually returned.
                let page_end = if i == page_range_len - 1 && bytes.len() % PAGE_SIZE != 0 {
                    i * PAGE_SIZE + bytes.len() % PAGE_SIZE
                } else {
                    (i + 1) * PAGE_SIZE
                };
                let page_bytes = OwnedBytes::new(bytes[i * PAGE_SIZE..page_end].to_vec());
                cache.insert(page, page_bytes);
            }
        }

        Ok(())
    }
}

impl std::fmt::Debug for PageCacheFileHandle {
    /// Formats as `PageCacheFileHandle(<underlying file length>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let total_len = self.inner.len();
        f.write_fmt(format_args!("PageCacheFileHandle({total_len})"))
    }
}

/// Map a byte `range` onto the ascending list of page indices covering it.
/// The end page is rounded up, so a range ending mid-page still includes
/// that final page.
fn range_to_pages(range: &Range<usize>) -> Vec<u32> {
    let first_page = range.start / PAGE_SIZE;
    let last_page = (range.end + PAGE_SIZE - 1) / PAGE_SIZE;
    (first_page..last_page).map(|page| page as u32).collect()
}

impl FileHandle for PageCacheFileHandle {
    /// Serve a byte range out of the page cache, first fetching any pages
    /// the cache does not hold yet from the inner handle.
    fn read_bytes(&self, range: Range<usize>) -> std::io::Result<OwnedBytes> {
        let pages = range_to_pages(&range);
        let num_pages = pages.len();
        assert_ne!(num_pages, 0);

        // Under a short read lock, determine which needed pages are absent.
        let missing_pages = {
            let cache = self.cache.read().map_err(|_| {
                std::io::Error::new(std::io::ErrorKind::Other, "Cache lock is poisoned.")
            })?;

            pages
                .iter()
                .copied()
                .filter(|page| !cache.contains_key(page))
                .collect::<Vec<_>>()
        };

        self.fetch_pages_into_cache(&missing_pages)?;

        // Reacquire the read lock and stitch the requested slice together
        // from cached pages, trimming the first and last pages down to the
        // range boundaries.
        let mut bytes = Vec::with_capacity(range.len());
        {
            let cache = self.cache.read().map_err(|_| {
                std::io::Error::new(std::io::ErrorKind::Other, "Cache lock is poisoned.")
            })?;

            for (i, page) in pages.into_iter().enumerate() {
                // Pages are never evicted, so anything fetched above must be
                // present; a miss here is a bug.
                let page_bytes = cache
                    .get(&page)
                    .unwrap_or_else(|| panic!("just added page {page}, yet not found in cache?"));

                let is_first = i == 0;
                let is_last = i == num_pages - 1;

                let start = if is_first { range.start % PAGE_SIZE } else { 0 };
                let end = if is_last && range.end % PAGE_SIZE != 0 {
                    range.end % PAGE_SIZE
                } else {
                    PAGE_SIZE
                };

                bytes.extend_from_slice(&page_bytes[start..end]);
            }
        }

        Ok(OwnedBytes::new(bytes))
    }
}

impl HasLen for PageCacheFileHandle {
    /// Total length of the file in bytes; delegates to the inner handle.
    fn len(&self) -> usize {
        self.inner.len()
    }
}

0 comments on commit 6faa6bb

Please sign in to comment.