feat(mercury): introduce Storable #769

Open · wants to merge 10 commits into base: main
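
Note: the `Storable` trait and `DefaultFileStorageStrategy` referenced throughout this diff live in `mercury/src/utils/storable.rs`, which this excerpt does not show. The following is a minimal sketch of what the trait could look like, reconstructed only from the call sites below (`CacheObject::load(path)`, `data.store(&path)`, and the `type Location` / `type Strategy` associated types); the exact names, signatures, and bounds are assumptions, not the PR's actual definition:

```rust
use std::io;
use std::path::Path;

use serde::{de::DeserializeOwned, Serialize};

/// Assumed shape: a strategy type owns the (de)serialization logic for a
/// value type `T` and a location type `Location`.
pub trait StorageStrategy<T, Location: ?Sized> {
    fn load(location: &Location) -> io::Result<T>;
    fn store(value: &T, location: &Location) -> io::Result<()>;
}

/// Assumed default: bincode to a file, with the same ".temp" + rename
/// publish step the removed `FileLoadStore::f_save` used.
pub struct DefaultFileStorageStrategy;

impl<T: Serialize + DeserializeOwned> StorageStrategy<T, Path> for DefaultFileStorageStrategy {
    fn load(location: &Path) -> io::Result<T> {
        let data = std::fs::read(location)?;
        bincode::deserialize(&data).map_err(|e| io::Error::new(io::ErrorKind::Other, e))
    }

    fn store(value: &T, location: &Path) -> io::Result<()> {
        let data =
            bincode::serialize(value).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
        let tmp = location.with_extension("temp");
        std::fs::write(&tmp, data)?;
        std::fs::rename(&tmp, location) // atomic publish, as in the old f_save
    }
}

/// Implementors only pick the two associated types; `load`/`store` are
/// provided methods that dispatch to the strategy.
pub trait Storable: Sized {
    type Location: ?Sized;
    type Strategy: StorageStrategy<Self, Self::Location>;

    fn load(location: &Self::Location) -> io::Result<Self> {
        <Self::Strategy>::load(location)
    }

    fn store(&self, location: &Self::Location) -> io::Result<()> {
        <Self::Strategy>::store(self, location)
    }
}
```
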
libra/src/utils/client_storage.rs (5 changes: 2 additions & 3 deletions)

@@ -16,7 +16,6 @@ use mercury::internal::pack::Pack;
 use mercury::errors::GitError;
 use mercury::hash::SHA1;
 use mercury::internal::object::types::ObjectType;
-use mercury::utils::read_sha1;
 
 use crate::command;
 static PACK_OBJ_CACHE: Lazy<Mutex<LruCache<String, CacheObject>>> = Lazy::new(|| {
@@ -265,7 +264,7 @@ impl ClientStorage {
         let mut objs = Vec::new();
         for _ in 0..fanout[255] {
             let _offset = idx_file.read_u32::<BigEndian>()?;
-            let hash = read_sha1(&mut idx_file)?;
+            let hash = SHA1::from_stream(&mut idx_file)?;
 
             objs.push(hash);
         }
@@ -288,7 +287,7 @@ impl ClientStorage {
         idx_file.seek(io::SeekFrom::Start(FANOUT + 24 * start as u64))?;
         for _ in start..end {
             let offset = idx_file.read_u32::<BigEndian>()?;
-            let hash = read_sha1(&mut idx_file)?;
+            let hash = SHA1::from_stream(&mut idx_file)?;
 
             if &hash == obj_id {
                 return Ok(Some(offset as u64));
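
Both hunks above replace the free helper `mercury::utils::read_sha1` with an associated constructor on the hash type itself, so callers no longer need the `mercury::utils` module at all. `SHA1::from_stream` is not shown in this diff; presumably it reads exactly `SHA1::SIZE` (20) bytes from any reader, along the lines of this sketch (the newtype layout of `SHA1` is an assumption):

```rust
use std::io::{self, Read};

impl SHA1 {
    /// Sketch only: pull `SHA1::SIZE` bytes off the stream and wrap them.
    pub fn from_stream(stream: &mut impl Read) -> io::Result<Self> {
        let mut buf = [0u8; SHA1::SIZE]; // SHA1::SIZE == 20
        stream.read_exact(&mut buf)?;
        Ok(SHA1(buf)) // assumes SHA1 is a newtype over [u8; 20]
    }
}
```
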
mercury/src/internal/index.rs (13 changes: 7 additions & 6 deletions)

@@ -10,7 +10,6 @@ use std::os::unix::fs::MetadataExt;
 use std::path::{Path, PathBuf};
 use std::time::{SystemTime, UNIX_EPOCH};
 
-use crate::utils;
 use crate::errors::GitError;
 use crate::hash::SHA1;
 use crate::internal::pack::wrapper::Wrapper;
@@ -246,7 +245,7 @@ impl Index {
             uid: file.read_u32::<BigEndian>()?,
             gid: file.read_u32::<BigEndian>()?,
             size: file.read_u32::<BigEndian>()?,
-            hash: utils::read_sha1(file)?,
+            hash: SHA1::from_stream(file)?,
             flags: Flags::from(file.read_u16::<BigEndian>()?),
             name: String::new(),
         };
@@ -260,19 +259,21 @@ impl Index {
             // 1-8 nul bytes as necessary to pad the entry to a multiple of eight bytes
             // while keeping the name NUL-terminated. // so at least 1 byte nul
             let padding = 8 - ((22 + name_len) % 8); // 22 = sha1 + flags, others are 40 % 8 == 0
-            utils::read_bytes(file, padding)?;
+            // utils::read_bytes(file, padding)?;
+            file.read_exact(&mut vec![0; padding])?;
         }
 
         // Extensions
         while file.bytes_read() + SHA1::SIZE < total_size as usize {
             // The remaining 20 bytes must be checksum
-            let sign = utils::read_bytes(file, 4)?;
+            let mut sign = vec![0; 4];
+            file.read_exact(&mut sign)?;
             println!("{:?}", String::from_utf8(sign.clone())?);
             // If the first byte is 'A'...'Z' the extension is optional and can be ignored.
             if sign[0] >= b'A' && sign[0] <= b'Z' {
                 // Optional extension
                 let size = file.read_u32::<BigEndian>()?;
-                utils::read_bytes(file, size as usize)?; // Ignore the extension
+                file.read_exact(&mut vec![0; size as usize])?; // Ignore the extension
             } else {
                 // 'link' or 'sdir' extension
                 return Err(GitError::InvalidIndexFile(
@@ -283,7 +284,7 @@ impl Index {
 
         // check sum
         let file_hash = file.final_hash();
-        let check_sum = utils::read_sha1(file)?;
+        let check_sum = SHA1::from_stream(file)?;
         if file_hash != check_sum {
             return Err(GitError::InvalidIndexFile("Check sum failed".to_string()));
         }
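
Besides dropping `utils::read_bytes` in favor of `read_exact` into a throwaway buffer, this file leaves the padding arithmetic untouched, and it is easy to misread: an index entry is 62 fixed bytes plus the path name, padded with 1-8 NUL bytes to the next multiple of eight while guaranteeing at least one NUL terminator. Since the 40 bytes of ctime/mtime/dev/ino/mode/uid/gid/size are already a multiple of eight, only the 22 bytes of SHA-1 + flags and the name length matter. A hypothetical sanity test (not part of this PR) spelling that out:

```rust
#[test]
fn index_entry_padding_is_one_to_eight_nul_bytes() {
    for name_len in 0..64usize {
        // 22 = 20-byte SHA-1 + 2-byte flags; the other 40 fixed bytes
        // satisfy 40 % 8 == 0 and drop out of the calculation.
        let padding = 8 - ((22 + name_len) % 8);
        assert!((1..=8).contains(&padding)); // always at least one NUL
        assert_eq!((40 + 22 + name_len + padding) % 8, 0); // 8-byte aligned
    }
}
```
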
mercury/src/internal/pack/cache.rs (13 changes: 6 additions & 7 deletions)

@@ -12,7 +12,9 @@ use threadpool::ThreadPool;
 
 use crate::time_it;
 use crate::hash::SHA1;
-use crate::internal::pack::cache_object::{ArcWrapper, CacheObject, MemSizeRecorder, FileLoadStore};
+use crate::internal::pack::cache_object::{ArcWrapper, CacheObject, MemSizeRecorder};
+
+use crate::utils::storable::Storable;
 
 pub trait _Cache {
     fn new(mem_size: Option<usize>, tmp_path: PathBuf, thread_num: usize) -> Self
@@ -100,7 +102,7 @@ impl Caches {
     }
 
     fn read_from_temp(path: &Path) -> io::Result<CacheObject> {
-        let obj = CacheObject::f_load(path)?;
+        let obj = CacheObject::load(path)?;
         // Deserializing will also create an object but without Construction outside and `::new()`
         // So if you want to do sth. while Constructing, impl Deserialize trait yourself
         obj.record_mem_size();
@@ -230,15 +232,12 @@ impl _Cache for Caches {
 
 #[cfg(test)]
 mod test {
-    use std::env;
-
     use super::*;
-    use crate::{hash::SHA1, internal::{object::types::ObjectType, pack::cache_object::CacheObjectInfo}};
+    use crate::{hash::SHA1, internal::{object::types::ObjectType, pack::cache_object::CacheObjectInfo}, MERCURY_DEFAULT_TMP_DIR};
 
     #[test]
     fn test_cache_single_thread() {
-        let source = PathBuf::from(env::current_dir().unwrap().parent().unwrap());
-        let cache = Caches::new(Some(2048), source.clone().join("tests/.cache_tmp"), 1);
+        let cache = Caches::new(Some(2048), MERCURY_DEFAULT_TMP_DIR.into(), 1);
        let a_hash = SHA1::new(String::from("a").as_bytes());
         let b_hash = SHA1::new(String::from("b").as_bytes());
         let a = CacheObject {
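
With `CacheObject: Storable`, the `read_from_temp` helper above reduces to a plain `CacheObject::load(path)`. Under the trait sketch given at the top, the eviction/reload cycle the cache performs amounts to something like the following (illustrative only; the function and its use are not in the PR):

```rust
use std::path::Path;

use mercury::internal::pack::cache_object::CacheObject;
use mercury::utils::storable::Storable;

// Illustrative round-trip: mirrors what Caches does when it evicts an
// object to its tmp dir and later reads it back via read_from_temp().
fn roundtrip(obj: &CacheObject, path: &Path) -> std::io::Result<CacheObject> {
    obj.store(path)?; // serialize (bincode, per the assumed default strategy)
    CacheObject::load(path) // deserialize it back
}
```

Also note that the test now targets `MERCURY_DEFAULT_TMP_DIR` instead of a path derived from the working directory; that constant is introduced elsewhere in the crate and is not shown in this diff.
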
mercury/src/internal/pack/cache_object.rs (107 changes: 43 additions & 64 deletions)

@@ -1,54 +1,20 @@
-use std::fs::OpenOptions;
-use std::io::Write;
 use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::{fs, io};
 use std::{ops::Deref, sync::Arc};
 
 use lru_mem::{HeapSize, MemSize};
 use serde::{Deserialize, Serialize};
 use threadpool::ThreadPool;
 
-use crate::hash::SHA1;
-use crate::internal::object::types::ObjectType;
 use crate::internal::pack::entry::Entry;
 use crate::internal::pack::utils;
+use crate::{hash::SHA1, internal::object::types::ObjectType};
+use crate::utils::storable::{DefaultFileStorageStrategy, Storable};
+
 // /// record heap-size of all CacheObjects, used for memory limit.
 // static CACHE_OBJS_MEM_SIZE: AtomicUsize = AtomicUsize::new(0);
-
-/// file load&store trait
-pub trait FileLoadStore: Serialize + for<'a> Deserialize<'a> {
-    fn f_load(path: &Path) -> Result<Self, io::Error>;
-    fn f_save(&self, path: &Path) -> Result<(), io::Error>;
-}
-
-// trait alias, so that impl FileLoadStore == impl Serialize + Deserialize
-impl<T: Serialize + for<'a> Deserialize<'a>> FileLoadStore for T {
-    fn f_load(path: &Path) -> Result<T, io::Error> {
-        let data = fs::read(path)?;
-        let obj: T =
-            bincode::deserialize(&data).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
-        Ok(obj)
-    }
-    fn f_save(&self, path: &Path) -> Result<(), io::Error> {
-        if path.exists() {
-            return Ok(());
-        }
-        let data = bincode::serialize(&self).unwrap();
-        let path = path.with_extension("temp");
-        {
-            let mut file = OpenOptions::new()
-                .write(true)
-                .create_new(true)
-                .open(path.clone())?;
-            file.write_all(&data)?;
-        }
-        let final_path = path.with_extension("");
-        fs::rename(&path, final_path.clone())?;
-        Ok(())
-    }
-}
 
 /// Represents the metadata of a cache object, indicating whether it is a delta or not.
 #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
 pub(crate) enum CacheObjectInfo {
@@ -103,9 +69,9 @@ impl HeapSize for CacheObject {
 /// If a [`CacheObject`] is [`ObjectType::HashDelta`] or [`ObjectType::OffsetDelta`],
 /// it will expand to another [`CacheObject`] of other types. To prevent potential OOM,
 /// we record the size of the expanded object as well as that of the object itself.
-/// 
-/// Base objects, *i.e.*, [`ObjectType::Blob`], [`ObjectType::Tree`], [`ObjectType::Commit`], 
-/// and [`ObjectType::Tag`], will not be expanded, so the heap-size of the object is the same 
+///
+/// Base objects, *i.e.*, [`ObjectType::Blob`], [`ObjectType::Tree`], [`ObjectType::Commit`],
+/// and [`ObjectType::Tag`], will not be expanded, so the heap-size of the object is the same
 /// as the size of the data.
 ///
 /// See [Comment in PR #755](https://github.com/web3infra-foundation/mega/pull/755#issuecomment-2543100481) for more details.
@@ -141,6 +107,12 @@ impl Drop for CacheObject {
     }
 }
 
+/// Allow CacheObject to be stored and loaded to/from a file.
+impl Storable for CacheObject {
+    type Location = Path;
+    type Strategy = DefaultFileStorageStrategy;
+}
+
 /// Heap-size recorder for a class(struct)
 /// <br> You should use a static Var to record mem-size
 /// and record mem-size after construction & minus it in `drop()`
@@ -189,7 +161,7 @@ impl CacheObject {
     }
 
     /// Get the [`SHA1`] hash of the object.
-    /// 
+    ///
     /// If the object is a delta object, return [`None`].
     pub fn base_object_hash(&self) -> Option<SHA1> {
         match &self.info {
@@ -199,7 +171,7 @@ impl CacheObject {
     }
 
     /// Get the offset delta of the object.
-    /// 
+    ///
     /// If the object is not an offset delta, return [`None`].
     pub fn offset_delta(&self) -> Option<usize> {
         match &self.info {
@@ -209,7 +181,7 @@ impl CacheObject {
     }
 
     /// Get the hash delta of the object.
-    /// 
+    ///
     /// If the object is not a hash delta, return [`None`].
    pub fn hash_delta(&self) -> Option<SHA1> {
         match &self.info {
@@ -233,17 +205,10 @@ impl CacheObject {
     }
 }
 
-/// trait alias for simple use
-pub trait ArcWrapperBounds:
-    HeapSize + Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static
-{
-}
-// You must impl `Alias Trait` for all the `T` satisfying Constraints
-// Or, `T` will not satisfy `Alias Trait` even if it satisfies the Original traits
-impl<T: HeapSize + Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static> ArcWrapperBounds
-    for T
-{
-}
+/// trait alias for simplicity
+pub trait ArcWrapperBounds: HeapSize + Storable<Location = Path> + Send + Sync + 'static {}
+// Auto implement the trait alias for all types that implement the required traits
+impl<T: HeapSize + Storable<Location = Path> + Send + Sync + 'static> ArcWrapperBounds for T {}
 
 /// Implementing encapsulation of Arc to enable third-party Trait HeapSize implementation for the Arc type
 /// Because of use Arc in LruCache, the LruCache is not clear whether a pointer will drop the referenced
@@ -254,6 +219,7 @@ pub struct ArcWrapper<T: ArcWrapperBounds> {
     pool: Option<Arc<ThreadPool>>,
     pub store_path: Option<PathBuf>, // path to store when drop
 }
+
 impl<T: ArcWrapperBounds> ArcWrapper<T> {
     /// Create a new ArcWrapper
     pub fn new(data: Arc<T>, share_flag: Arc<AtomicBool>, pool: Option<Arc<ThreadPool>>) -> Self {
@@ -293,6 +259,7 @@ impl<T: ArcWrapperBounds> Deref for ArcWrapper<T> {
         &self.data
     }
 }
+
impl<T: ArcWrapperBounds> Drop for ArcWrapper<T> {
     // `drop` will be called in `lru_cache.insert()` when cache full & eject the LRU
     // `lru_cache.insert()` is protected by Mutex
@@ -311,17 +278,17 @@ impl<T: ArcWrapperBounds> Drop for ArcWrapper<T> {
                     }
                     pool.execute(move || {
                         if !complete_signal.load(Ordering::Acquire) {
-                            let res = data_copy.f_save(&path_copy);
+                            let res = data_copy.store(&path_copy);
                             if let Err(e) = res {
-                                println!("[f_save] {:?} error: {:?}", path_copy, e);
+                                eprintln!("[{}] Error while storing {:?}: {:?}", std::any::type_name::<Self>(), path_copy, e);
                             }
                         }
                     });
                 }
                 None => {
-                    let res = self.data.f_save(path);
+                    let res = self.data.store(path);
                     if let Err(e) = res {
-                        println!("[f_save] {:?} error: {:?}", path, e);
+                        eprintln!("[{}] Error while storing {:?}: {:?}", std::any::type_name::<Self>(), path, e);
                     }
                 }
             }
@@ -335,6 +302,9 @@ mod test {
 
     use lru_mem::LruCache;
 
+    use crate::utils::storable::DefaultFileStorageStrategy;
+    use crate::MERCURY_DEFAULT_TMP_DIR;
+
     use super::*;
     #[test]
     #[ignore = "only in single thread"]
@@ -372,7 +342,7 @@ mod test {
     #[test]
     fn test_cache_object_with_lru() {
         let mut cache = LruCache::new(2048);
-        
+
         let hash_a = SHA1::default();
         let hash_b = SHA1::new(b"b"); // whatever different hash
         let a = CacheObject {
@@ -408,7 +378,7 @@ mod test {
             panic!("Expected WouldEjectLru error");
         }
         let r = cache.insert(
-            hash_a.to_string(),
+            hash_b.to_string(),
             ArcWrapper::new(Arc::new(b.clone()), Arc::new(AtomicBool::new(true)), None),
         );
         assert!(r.is_ok());
@@ -434,6 +404,11 @@ mod test {
             self.a
         }
     }
+    impl Storable for Test {
+        type Location = Path;
+        type Strategy = DefaultFileStorageStrategy;
+    }
+
     #[test]
     fn test_lru_drop() {
         println!("insert a");
@@ -499,10 +474,14 @@ mod test {
 
     #[test]
     fn test_arc_wrapper_drop_store() {
-        let mut path = PathBuf::from(".cache_temp/test_arc_wrapper_drop_store");
+        let mut path = PathBuf::from(MERCURY_DEFAULT_TMP_DIR).join("test_arc_wrapper_drop_store");
         fs::create_dir_all(&path).unwrap();
         path.push("test_obj");
-        let mut a = ArcWrapper::new(Arc::new(1024), Arc::new(AtomicBool::new(false)), None);
+        let mut a = ArcWrapper::new(
+            Arc::new(Test { a: 1024 }),
+            Arc::new(AtomicBool::new(false)),
+            None,
+        );
         a.set_store_path(path.clone());
         drop(a);
 
@@ -515,7 +494,7 @@ mod test {
     /// test warpper can't correctly store the data when lru eject it
     fn test_arc_wrapper_with_lru() {
         let mut cache = LruCache::new(1500);
-        let path = PathBuf::from(".cache_temp/test_arc_wrapper_with_lru");
+        let path = PathBuf::from(MERCURY_DEFAULT_TMP_DIR).join("test_arc_wrapper_with_lru");
         let _ = fs::remove_dir_all(&path);
         fs::create_dir_all(&path).unwrap();
         let shared_flag = Arc::new(AtomicBool::new(false));
@@ -525,7 +504,7 @@ mod test {
         {
             let mut a = ArcWrapper::new(Arc::new(Test { a: 1024 }), shared_flag.clone(), None);
             a.set_store_path(a_path.clone());
-            let b = ArcWrapper::new(Arc::new(1024), shared_flag.clone(), None);
+            let b = ArcWrapper::new(Arc::new(Test { a: 1024 }), shared_flag.clone(), None);
             assert!(b.store_path.is_none());
 
             println!("insert a with heap size: {:?}", a.heap_size());
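
One design consequence worth noting: `ArcWrapperBounds` now requires `Storable<Location = Path>` instead of bare serde bounds, so arbitrary serializable values such as the `Arc::new(1024)` used by the old tests no longer qualify. A wrapped type has to opt in explicitly, which is exactly why the tests switch from a bare integer to the `Test` struct and add an `impl Storable for Test`. Under the assumptions sketched at the top, opting in a new type looks like this (the `Payload` type is hypothetical):

```rust
use std::path::Path;

use serde::{Deserialize, Serialize};

use mercury::utils::storable::{DefaultFileStorageStrategy, Storable};

// Hypothetical payload type: the serde derives satisfy the default
// strategy's bounds, and the two associated types are the entire
// opt-in surface.
#[derive(Serialize, Deserialize)]
struct Payload {
    bytes: Vec<u8>,
}

impl Storable for Payload {
    type Location = Path;
    type Strategy = DefaultFileStorageStrategy;
}
```
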