Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(mercury): introduce CacheObjectStorageStrategy
Browse files Browse the repository at this point in the history
el-ev committed Dec 19, 2024
1 parent d1e7479 commit 2a973ce
Showing 2 changed files with 81 additions and 21 deletions.
98 changes: 79 additions & 19 deletions mercury/src/internal/pack/cache_object.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::fs::{self, OpenOptions};
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::{ops::Deref, sync::Arc};
@@ -7,10 +9,10 @@ use serde::{Deserialize, Serialize};
use threadpool::ThreadPool;

use crate::hash::SHA1;
use crate::internal::object::types::ObjectType;
use crate::internal::pack::entry::Entry;
use crate::internal::pack::utils;
use crate::internal::object::types::ObjectType;
use crate::utils::storable::{DefaultFileStorageStrategy, Storable};
use crate::utils::storable::{Storable, StorageStrategy};

// /// record heap-size of all CacheObjects, used for memory limit.
// static CACHE_OBJS_MEM_SIZE: AtomicUsize = AtomicUsize::new(0);
@@ -69,9 +71,9 @@ impl HeapSize for CacheObject {
/// If a [`CacheObject`] is [`ObjectType::HashDelta`] or [`ObjectType::OffsetDelta`],
/// it will expand to another [`CacheObject`] of other types. To prevent potential OOM,
/// we record the size of the expanded object as well as that of the object itself.
///
/// Base objects, *i.e.*, [`ObjectType::Blob`], [`ObjectType::Tree`], [`ObjectType::Commit`],
/// and [`ObjectType::Tag`], will not be expanded, so the heap-size of the object is the same
///
/// Base objects, *i.e.*, [`ObjectType::Blob`], [`ObjectType::Tree`], [`ObjectType::Commit`],
/// and [`ObjectType::Tag`], will not be expanded, so the heap-size of the object is the same
/// as the size of the data.
///
/// See [Comment in PR #755](https://github.com/web3infra-foundation/mega/pull/755#issuecomment-2543100481) for more details.
@@ -107,9 +109,68 @@ impl Drop for CacheObject {
}
}

/// Marker type that carries the [`StorageStrategy`] implementation for [`CacheObject`].
///
/// It holds no data; it is only used at the type level to select how a
/// [`CacheObject`] is loaded from and stored to a [`Path`].
pub struct CacheObjectStorageStrategy;

/// Wraps a bincode (de)serialization failure in an [`std::io::Error`] while keeping its cause.
fn cache_object_codec_error<E>(err: E) -> std::io::Error
where
    E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    std::io::Error::new(std::io::ErrorKind::Other, err)
}

/// File persistence for [`CacheObject`] using a hand-written layout.
///
/// The file consists of, in order: the bincode-encoded `info`, the bincode-encoded
/// `offset`, the bincode-encoded length of `data_decompressed`, and finally the raw
/// bytes of `data_decompressed`. `mem_recorder` is not persisted.
impl StorageStrategy<CacheObject> for CacheObjectStorageStrategy {
    /// Reads a [`CacheObject`] back from the file at `location`.
    ///
    /// The returned object always has `mem_recorder` set to `None`.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or read, if any header field
    /// fails to deserialize, if the payload length does not match the recorded
    /// length, or if the stored `info` is not a [`CacheObjectInfo::BaseObject`].
    fn load(location: &Path) -> Result<CacheObject, std::io::Error> {
        let stream = OpenOptions::new().read(true).open(location)?;
        let mut reader = std::io::BufReader::new(stream);

        // Header fields are read back in exactly the order `store` wrote them.
        let info: CacheObjectInfo =
            bincode::deserialize_from(&mut reader).map_err(cache_object_codec_error)?;
        let offset: usize =
            bincode::deserialize_from(&mut reader).map_err(cache_object_codec_error)?;
        let data_len: usize =
            bincode::deserialize_from(&mut reader).map_err(cache_object_codec_error)?;

        // Everything after the header is the raw payload.
        let mut data_decompressed = Vec::new();
        reader.read_to_end(&mut data_decompressed)?;
        if data_decompressed.len() != data_len {
            return Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                "Data length mismatch",
            ));
        }

        match info {
            CacheObjectInfo::BaseObject(_, _) => Ok(CacheObject {
                info,
                offset,
                data_decompressed,
                mem_recorder: None,
            }),
            // `store` does not reject other variants, so such a file can exist on disk;
            // report it to the caller instead of panicking.
            _ => Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "Stored CacheObject is not a base object",
            )),
        }
    }

    /// Writes `obj` to `location`, doing nothing if `location` already exists.
    ///
    /// The data is first written to a sibling file with the `temp` extension and then
    /// renamed to `location`, so `location` never holds a partially written object.
    ///
    /// # Errors
    ///
    /// Returns an error if serialization fails, if the temp file already exists or
    /// cannot be created, or if writing or renaming fails. When writing or renaming
    /// fails, the temp file created by this call is removed (best effort) so that a
    /// later attempt is not blocked by `create_new`.
    fn store(obj: &CacheObject, location: &Path) -> Result<(), std::io::Error> {
        if location.exists() {
            return Ok(());
        }

        // When this code is written, `CacheObject` is like this:
        // NOTE: this should be updated if `CacheObject` is changed
        // pub struct CacheObject {
        //     pub(crate) info: CacheObjectInfo,
        //     pub offset: usize,
        //     pub data_decompressed: Vec<u8>,
        //     pub mem_recorder: Option<Arc<AtomicUsize>>,
        // }
        //
        // Encode the header before touching the file system, so an encoding failure
        // cannot leave a temp file behind.
        let info = bincode::serialize(&obj.info).map_err(cache_object_codec_error)?;
        let offset = bincode::serialize(&obj.offset).map_err(cache_object_codec_error)?;
        let data_len = bincode::serialize(&obj.data_decompressed.len())
            .map_err(cache_object_codec_error)?;

        let tmp_path = location.with_extension("temp");
        // `create_new` fails if the temp file exists; in that case we return before the
        // clean-up below, so a temp file owned by someone else is never removed.
        let mut file = OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(&tmp_path)?;

        let written = [
            &info[..],
            &offset[..],
            &data_len[..],
            &obj.data_decompressed[..],
        ]
        .iter()
        .try_for_each(|chunk| file.write_all(chunk));
        // Close the handle before renaming or removing the temp file.
        drop(file);

        let result = written.and_then(|()| fs::rename(&tmp_path, location));
        if result.is_err() {
            // Best effort: a leftover temp file would make every retry fail.
            let _ = fs::remove_file(&tmp_path);
        }
        result
    }
}

/// Makes [`CacheObject`] storable, using a [`Path`] as its location type.
impl Storable for CacheObject {
type Location = Path;
// NOTE(review): two `Strategy` items appear here. Presumably this is a diff rendering in
// which the `DefaultFileStorageStrategy` line is the removed one and the
// `CacheObjectStorageStrategy` line is its replacement — confirm against the actual file.
type Strategy = DefaultFileStorageStrategy;
type Strategy = CacheObjectStorageStrategy;
}

/// Heap-size recorder for a class(struct)
@@ -160,7 +221,7 @@ impl CacheObject {
}

/// Get the [`SHA1`] hash of the object.
///
///
/// If the object is a delta object, return [`None`].
pub fn base_object_hash(&self) -> Option<SHA1> {
match &self.info {
@@ -170,7 +231,7 @@ impl CacheObject {
}

/// Get the offset delta of the object.
///
///
/// If the object is not an offset delta, return [`None`].
pub fn offset_delta(&self) -> Option<usize> {
match &self.info {
@@ -180,7 +241,7 @@ impl CacheObject {
}

/// Get the hash delta of the object.
///
///
/// If the object is not a hash delta, return [`None`].
pub fn hash_delta(&self) -> Option<SHA1> {
match &self.info {
@@ -205,16 +266,10 @@ impl CacheObject {
}

/// Trait alias bundling the bounds `HeapSize + Storable<Location = Path> + Send + Sync + 'static`
/// under a single name, so they do not have to be repeated at every use site.
pub trait ArcWrapperBounds:
HeapSize + Storable<Location = Path> + Send + Sync + 'static
{
}
pub trait ArcWrapperBounds: HeapSize + Storable<Location = Path> + Send + Sync + 'static {}
// The alias trait must be implemented for every `T` that satisfies the bounds.
// Without this blanket impl, `T` would not satisfy the alias trait even though it
// satisfies each of the original traits.
impl<T: HeapSize + Storable<Location = Path> + Send + Sync + 'static> ArcWrapperBounds
for T
{
}
impl<T: HeapSize + Storable<Location = Path> + Send + Sync + 'static> ArcWrapperBounds for T {}

/// A wrapper around `Arc` that allows the third-party `HeapSize` trait to be implemented for it.
/// Because `Arc` is used in `LruCache`, the `LruCache` cannot tell whether dropping a pointer will also drop the referenced
@@ -307,6 +362,7 @@ mod test {

use lru_mem::LruCache;

use crate::utils::storable::DefaultFileStorageStrategy;
use crate::MERCURY_DEFAULT_TMP_DIR;

use super::*;
@@ -346,7 +402,7 @@ mod test {
#[test]
fn test_cache_object_with_lru() {
let mut cache = LruCache::new(2048);

let hash_a = SHA1::default();
let hash_b = SHA1::new(b"b"); // whatever different hash
let a = CacheObject {
@@ -481,7 +537,11 @@ mod test {
let mut path = PathBuf::from(MERCURY_DEFAULT_TMP_DIR).join("test_arc_wrapper_drop_store");
fs::create_dir_all(&path).unwrap();
path.push("test_obj");
let mut a = ArcWrapper::new(Arc::new(Test { a: 1024 }), Arc::new(AtomicBool::new(false)), None);
let mut a = ArcWrapper::new(
Arc::new(Test { a: 1024 }),
Arc::new(AtomicBool::new(false)),
None,
);
a.set_store_path(path.clone());
drop(a);

4 changes: 2 additions & 2 deletions mercury/src/utils/storable.rs
Original file line number Diff line number Diff line change
@@ -260,14 +260,14 @@ impl<T> StorageStrategy<T> for DefaultFileStorageStrategy
where
T: Serialize + for<'de> Deserialize<'de> + Storable<Location = Path>,
{
fn load(location: &T::Location) -> Result<T, io::Error> {
fn load(location: &Path) -> Result<T, io::Error> {
let data = fs::read(location)?;
let obj: T =
bincode::deserialize(&data).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
Ok(obj)
}

fn store(obj: &T, location: &T::Location) -> Result<(), io::Error> {
fn store(obj: &T, location: &Path) -> Result<(), io::Error> {
if location.exists() {
return Ok(());
}

0 comments on commit 2a973ce

Please sign in to comment.