diff --git a/mercury/src/internal/pack/cache_object.rs b/mercury/src/internal/pack/cache_object.rs index beee61fd..09d90777 100644 --- a/mercury/src/internal/pack/cache_object.rs +++ b/mercury/src/internal/pack/cache_object.rs @@ -1,3 +1,5 @@ +use std::fs::{self, OpenOptions}; +use std::io::{Read, Write}; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::{ops::Deref, sync::Arc}; @@ -7,10 +9,10 @@ use serde::{Deserialize, Serialize}; use threadpool::ThreadPool; use crate::hash::SHA1; +use crate::internal::object::types::ObjectType; use crate::internal::pack::entry::Entry; use crate::internal::pack::utils; -use crate::internal::object::types::ObjectType; -use crate::utils::storable::{DefaultFileStorageStrategy, Storable}; +use crate::utils::storable::{Storable, StorageStrategy}; // /// record heap-size of all CacheObjects, used for memory limit. // static CACHE_OBJS_MEM_SIZE: AtomicUsize = AtomicUsize::new(0); @@ -69,9 +71,9 @@ impl HeapSize for CacheObject { /// If a [`CacheObject`] is [`ObjectType::HashDelta`] or [`ObjectType::OffsetDelta`], /// it will expand to another [`CacheObject`] of other types. To prevent potential OOM, /// we record the size of the expanded object as well as that of the object itself. - /// - /// Base objects, *i.e.*, [`ObjectType::Blob`], [`ObjectType::Tree`], [`ObjectType::Commit`], - /// and [`ObjectType::Tag`], will not be expanded, so the heap-size of the object is the same + /// + /// Base objects, *i.e.*, [`ObjectType::Blob`], [`ObjectType::Tree`], [`ObjectType::Commit`], + /// and [`ObjectType::Tag`], will not be expanded, so the heap-size of the object is the same /// as the size of the data. /// /// See [Comment in PR #755](https://github.com/web3infra-foundation/mega/pull/755#issuecomment-2543100481) for more details. @@ -107,9 +109,68 @@ impl Drop for CacheObject { } } +pub struct CacheObjectStorageStrategy; + +impl StorageStrategy for CacheObjectStorageStrategy { + fn load(location: &Path) -> Result { + let de_err = || std::io::Error::new(std::io::ErrorKind::Other, "Deserialize Error"); + let stream = OpenOptions::new().read(true).open(location)?; + let mut reader = std::io::BufReader::new(stream); + let info: CacheObjectInfo = bincode::deserialize_from(&mut reader).map_err(|_|de_err())?; + let offset: usize = bincode::deserialize_from(&mut reader).map_err(|_|de_err())?; + let data_len: usize = bincode::deserialize_from(&mut reader).map_err(|_|de_err())?; + let mut data_decompressed = Vec::new(); + reader.read_to_end(&mut data_decompressed).map_err(|_|de_err())?; + if data_decompressed.len() != data_len { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Data length mismatch", + )); + } + match info { + CacheObjectInfo::BaseObject(_, _) => Ok(CacheObject { + info, + offset, + data_decompressed, + mem_recorder: None, + }), + _ => unreachable!() + } + } + + fn store(obj: &CacheObject, location: &Path) -> Result<(), std::io::Error> { + if location.exists() { + return Ok(()); + } + let tmp_path = location.with_extension("temp"); + let mut file = OpenOptions::new() + .write(true) + .create_new(true) + .open(&tmp_path)?; + // When this code is written, `CacheObject` is like this: + // NOTE: this should be updated if `CacheObject` is changed + // pub struct CacheObject { + // pub(crate) info: CacheObjectInfo, + // pub offset: usize, + // pub data_decompressed: Vec, + // pub mem_recorder: Option>, + // } + let info = bincode::serialize(&obj.info).unwrap(); + let offset = bincode::serialize(&obj.offset).unwrap(); + let data_len = bincode::serialize(&(obj.data_decompressed.len())).unwrap(); + file.write_all(&info)?; + file.write_all(&offset)?; + file.write_all(&data_len)?; + file.write_all(&obj.data_decompressed)?; + drop(file); + fs::rename(tmp_path, location)?; + Ok(()) + } +} + impl Storable for CacheObject { type Location = Path; - type Strategy = DefaultFileStorageStrategy; + type Strategy = CacheObjectStorageStrategy; } /// Heap-size recorder for a class(struct) @@ -160,7 +221,7 @@ impl CacheObject { } /// Get the [`SHA1`] hash of the object. - /// + /// /// If the object is a delta object, return [`None`]. pub fn base_object_hash(&self) -> Option { match &self.info { @@ -170,7 +231,7 @@ impl CacheObject { } /// Get the offset delta of the object. - /// + /// /// If the object is not an offset delta, return [`None`]. pub fn offset_delta(&self) -> Option { match &self.info { @@ -180,7 +241,7 @@ impl CacheObject { } /// Get the hash delta of the object. - /// + /// /// If the object is not a hash delta, return [`None`]. pub fn hash_delta(&self) -> Option { match &self.info { @@ -205,16 +266,10 @@ impl CacheObject { } /// trait alias for simple use -pub trait ArcWrapperBounds: - HeapSize + Storable + Send + Sync + 'static -{ -} +pub trait ArcWrapperBounds: HeapSize + Storable + Send + Sync + 'static {} // You must impl `Alias Trait` for all the `T` satisfying Constraints // Or, `T` will not satisfy `Alias Trait` even if it satisfies the Original traits -impl + Send + Sync + 'static> ArcWrapperBounds - for T -{ -} +impl + Send + Sync + 'static> ArcWrapperBounds for T {} /// Implementing encapsulation of Arc to enable third-party Trait HeapSize implementation for the Arc type /// Because of use Arc in LruCache, the LruCache is not clear whether a pointer will drop the referenced @@ -307,6 +362,7 @@ mod test { use lru_mem::LruCache; + use crate::utils::storable::DefaultFileStorageStrategy; use crate::MERCURY_DEFAULT_TMP_DIR; use super::*; @@ -346,7 +402,7 @@ mod test { #[test] fn test_cache_object_with_lru() { let mut cache = LruCache::new(2048); - + let hash_a = SHA1::default(); let hash_b = SHA1::new(b"b"); // whatever different hash let a = CacheObject { @@ -481,7 +537,11 @@ mod test { let mut path = PathBuf::from(MERCURY_DEFAULT_TMP_DIR).join("test_arc_wrapper_drop_store"); fs::create_dir_all(&path).unwrap(); path.push("test_obj"); - let mut a = ArcWrapper::new(Arc::new(Test { a: 1024 }), Arc::new(AtomicBool::new(false)), None); + let mut a = ArcWrapper::new( + Arc::new(Test { a: 1024 }), + Arc::new(AtomicBool::new(false)), + None, + ); a.set_store_path(path.clone()); drop(a); diff --git a/mercury/src/utils/storable.rs b/mercury/src/utils/storable.rs index d97959e2..6150e2cb 100644 --- a/mercury/src/utils/storable.rs +++ b/mercury/src/utils/storable.rs @@ -260,14 +260,14 @@ impl StorageStrategy for DefaultFileStorageStrategy where T: Serialize + for<'de> Deserialize<'de> + Storable, { - fn load(location: &T::Location) -> Result { + fn load(location: &Path) -> Result { let data = fs::read(location)?; let obj: T = bincode::deserialize(&data).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; Ok(obj) } - fn store(obj: &T, location: &T::Location) -> Result<(), io::Error> { + fn store(obj: &T, location: &Path) -> Result<(), io::Error> { if location.exists() { return Ok(()); }