diff --git a/mercury/src/internal/pack/cache_object.rs b/mercury/src/internal/pack/cache_object.rs
index 967d761a..700e6a15 100644
--- a/mercury/src/internal/pack/cache_object.rs
+++ b/mercury/src/internal/pack/cache_object.rs
@@ -9,9 +9,9 @@ use lru_mem::{HeapSize, MemSize};
 use serde::{Deserialize, Serialize};
 use threadpool::ThreadPool;
 
+use crate::internal::pack::entry::Entry;
 use crate::internal::pack::utils;
 use crate::{hash::SHA1, internal::object::types::ObjectType};
-use crate::internal::pack::entry::Entry;
 
 // /// record heap-size of all CacheObjects, used for memory limit.
 // static CACHE_OBJS_MEM_SIZE: AtomicUsize = AtomicUsize::new(0);
@@ -56,7 +56,14 @@ pub struct CacheObject {
     pub data_decompress: Vec<u8>,
     pub offset: usize,
     pub hash: SHA1,
-    pub mem_recorder: Option<Arc<AtomicUsize>> // record mem-size of all CacheObjects of a Pack
+    /// If a [`CacheObject`] is an [`ObjectType::HashDelta`] or an [`ObjectType::OffsetDelta`],
+    /// it will expand to a [`CacheObject`] of another type. To prevent potential OOM,
+    /// we record the size of the expanded object as well as that of the object itself.
+    ///
+    /// See [Comment in PR #755](https://github.com/web3infra-foundation/mega/pull/755#issuecomment-2543100481) for more details.
+    #[serde(skip, default = "usize::default")]
+    pub delta_final_size: usize,
+    pub mem_recorder: Option<Arc<AtomicUsize>>, // record mem-size of all CacheObjects of a Pack
 }
 
 impl Clone for CacheObject {
@@ -68,6 +75,7 @@ impl Clone for CacheObject {
             data_decompress: self.data_decompress.clone(),
             offset: self.offset,
             hash: self.hash,
+            delta_final_size: self.delta_final_size,
             mem_recorder: self.mem_recorder.clone(),
         };
         obj.record_mem_size();
@@ -87,6 +95,7 @@ impl Default for CacheObject {
             obj_type: ObjectType::Blob,
             offset: 0,
             hash: SHA1::default(),
+            delta_final_size: 0,
             mem_recorder: None,
         };
         obj.record_mem_size();
@@ -98,8 +107,21 @@ impl Default for CacheObject {
 // ! the implementation of HeapSize is not accurate, only calculate the size of the data_decompress
 // Note that: mem_size == value_size + heap_size, and we only need to impl HeapSize because value_size is known
 impl HeapSize for CacheObject {
+    /// For [`ObjectType::OffsetDelta`] and [`ObjectType::HashDelta`],
+    /// `delta_final_size` is the size of the expanded object;
+    /// for other types, `delta_final_size` is 0 as they won't expand.
     fn heap_size(&self) -> usize {
-        self.data_decompress.heap_size()
+        // To see why these two values are added, consider the lifetimes of
+        // two `CacheObject`s, say `delta_obj` and `final_obj`,
+        // in the function `Pack::rebuild_delta`.
+        //
+        // `delta_obj` is dropped only after `Pack::rebuild_delta` returns,
+        // but the space for `final_obj` is allocated in that function.
+        //
+        // Therefore, during the execution of `Pack::rebuild_delta`, both `delta_obj`
+        // and `final_obj` coexist. The maximum memory usage is the sum of the memory
+        // usage of `delta_obj` and `final_obj`.
+        self.data_decompress.heap_size() + self.delta_final_size
     }
 }
 
@@ -111,7 +133,6 @@ impl Drop for CacheObject {
         if let Some(mem_recorder) = &self.mem_recorder {
             mem_recorder.fetch_sub((*self).mem_size(), Ordering::SeqCst);
         }
-
     }
 }
 
@@ -146,7 +167,7 @@ impl MemSizeRecorder for CacheObject {
 }
 
 impl CacheObject {
-    /// Create a new CacheObject witch is not offset_delta or hash_delta
+    /// Create a new CacheObject which is neither [`ObjectType::OffsetDelta`] nor [`ObjectType::HashDelta`].
     pub fn new_for_undeltified(obj_type: ObjectType, data: Vec<u8>, offset: usize) -> Self {
         let hash = utils::calculate_object_hash(obj_type, &data);
         CacheObject {
@@ -154,6 +175,7 @@ impl CacheObject {
             obj_type,
             offset,
             hash,
+            delta_final_size: 0, // Only delta objects have `delta_final_size`
             mem_recorder: None,
             ..Default::default()
         }
@@ -162,13 +184,11 @@ impl CacheObject {
     /// transform the CacheObject to Entry
     pub fn to_entry(&self) -> Entry {
         match self.obj_type {
-            ObjectType::Blob | ObjectType::Tree | ObjectType::Commit | ObjectType::Tag => {
-                Entry {
-                    obj_type: self.obj_type,
-                    data: self.data_decompress.clone(),
-                    hash: self.hash,
-                }
-            }
+            ObjectType::Blob | ObjectType::Tree | ObjectType::Commit | ObjectType::Tag => Entry {
+                obj_type: self.obj_type,
+                data: self.data_decompress.clone(),
+                hash: self.hash,
+            },
             _ => {
                 unreachable!("delta object should not persist!")
             }
@@ -177,10 +197,16 @@ impl CacheObject {
 }
 
 /// trait alias for simple use
-pub trait ArcWrapperBounds: HeapSize + Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static {}
+pub trait ArcWrapperBounds:
+    HeapSize + Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static
+{
+}
 // You must impl `Alias Trait` for all the `T` satisfying Constraints
 // Or, `T` will not satisfy `Alias Trait` even if it satisfies the Original traits
-impl<T: HeapSize + Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static> ArcWrapperBounds for T {}
+impl<T: HeapSize + Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static> ArcWrapperBounds
+    for T
+{
+}
 
 /// Implementing encapsulation of Arc to enable third-party Trait HeapSize implementation for the Arc type
 /// Because of use Arc in LruCache, the LruCache is not clear whether a pointer will drop the referenced
@@ -300,6 +326,7 @@ mod test {
             obj_type: ObjectType::Blob,
             offset: 0,
             hash: SHA1::new(&vec![0; 20]),
+            delta_final_size: 0,
             mem_recorder: None,
         };
         assert!(a.heap_size() == 1024);
@@ -318,6 +345,7 @@ mod test {
             obj_type: ObjectType::Blob,
             offset: 0,
             hash: SHA1::new(&vec![0; 20]),
+            delta_final_size: 0,
             mem_recorder: None,
         };
         println!("a.heap_size() = {}", a.heap_size());
@@ -329,6 +357,7 @@ mod test {
             obj_type: ObjectType::Blob,
             offset: 0,
             hash: SHA1::new(&vec![1; 20]),
+            delta_final_size: 0,
             mem_recorder: None,
         };
         {
@@ -433,6 +462,7 @@ mod test {
             obj_type: ObjectType::Blob,
             offset: 0,
             hash: SHA1::new(&vec![0; 20]),
+            delta_final_size: 0,
             mem_recorder: None,
         };
         let s = bincode::serialize(&a).unwrap();
diff --git a/mercury/src/internal/pack/decode.rs b/mercury/src/internal/pack/decode.rs
index e5332969..609a5585 100644
--- a/mercury/src/internal/pack/decode.rs
+++ b/mercury/src/internal/pack/decode.rs
@@ -275,11 +275,15 @@ impl Pack {
                 })
                 .unwrap();
 
+            let mut reader = Cursor::new(&data);
+            let (_, final_size) = utils::read_delta_object_size(&mut reader)?;
+
             Ok(CacheObject {
                 base_offset,
                 data_decompress: data,
                 obj_type: t,
                 offset: init_offset,
+                delta_final_size: final_size,
                 mem_recorder: None,
                 ..Default::default()
             })
@@ -292,12 +296,16 @@ impl Pack {
                 let (data, raw_size) = self.decompress_data(pack, size)?;
                 *offset += raw_size;
+
+                let mut reader = Cursor::new(&data);
+                let (_, final_size) = utils::read_delta_object_size(&mut reader)?;
 
                 Ok(CacheObject {
                     base_ref: ref_sha1,
                     data_decompress: data,
                     obj_type: t,
                     offset: init_offset,
+                    delta_final_size: final_size,
                     mem_recorder: None,
                     ..Default::default()
                 })
@@ -306,8 +314,6 @@
     }
 
     /// Decodes a pack file from a given Read and BufRead source and get a vec of objects.
-    ///
-    ///
     pub fn decode<F>(&mut self, pack: &mut (impl BufRead + Send), callback: F) -> Result<(), GitError>
     where
         F: Fn(Entry, usize) + Sync + Send + 'static,
@@ -338,7 +344,7 @@ impl Pack {
         let mut offset: usize = 12;
         let mut i = 0;
         while i < self.number {
-            // log per 2000&more then 1 se objects
+            // log every 1000 objects, at most once per second
            if i%1000 == 0 {
                let time_now = time.elapsed().as_millis();
                if time_now - last_update_time > 1000 {
@@ -536,16 +542,15 @@ impl Pack {
 
         let mut stream = Cursor::new(&delta_obj.data_decompress);
 
-        // Read the base object size & Result Size
+        // Read the base object size & result size
         // (Size Encoding)
-        let base_size = utils::read_varint_le(&mut stream).unwrap().0;
-        let result_size = utils::read_varint_le(&mut stream).unwrap().0;
+        let (base_size, result_size) = utils::read_delta_object_size(&mut stream).unwrap();
 
         //Get the base object row data
         let base_info = &base_obj.data_decompress;
-        assert_eq!(base_info.len() as u64, base_size);
+        assert_eq!(base_info.len(), base_size, "Base object size mismatch");
 
-        let mut result = Vec::with_capacity(result_size as usize);
+        let mut result = Vec::with_capacity(result_size);
 
         loop {
             // Check if the stream has ended, meaning the new object is done
@@ -597,7 +602,7 @@ impl Pack {
                 }
             }
         }
-        assert_eq!(result_size, result.len() as u64);
+        assert_eq!(result_size, result.len(), "Result size mismatch");
 
         let hash = utils::calculate_object_hash(base_obj.obj_type, &result);
         // create new obj from `delta_obj` & `result` instead of modifying `delta_obj` for heap-size recording
@@ -605,6 +610,7 @@
             data_decompress: result,
             obj_type: base_obj.obj_type, // Same as the Type of base object
             hash,
+            delta_final_size: 0, // The new object is not a delta obj, so set its final size to 0.
             mem_recorder: None, // This filed(Arc) can't be moved from `delta_obj` by `struct update syntax`
             ..delta_obj // This syntax is actually move `delta_obj` to `new_obj`
         } // Canonical form (Complete Object)
diff --git a/mercury/src/internal/pack/utils.rs b/mercury/src/internal/pack/utils.rs
index 04b586dc..520cd6b7 100644
--- a/mercury/src/internal/pack/utils.rs
+++ b/mercury/src/internal/pack/utils.rs
@@ -153,8 +153,6 @@ pub fn read_varint_le<R: Read>(reader: &mut R) -> io::Result<(u64, usize)> {
     Ok((value, offset))
 }
 
-
-
 /// The offset for an OffsetDelta object(big-endian order)
 /// # Arguments
 ///
@@ -232,6 +230,26 @@ pub fn read_partial_int<R: Read>(
     Ok(value)
 }
 
+/// Reads the base size and result size of a delta object from the given stream.
+///
+/// **Note**: The stream MUST be positioned at the start of the delta object.
+///
+/// The base size and result size are encoded as variable-length integers in little-endian order.
+///
+/// The base size is the size of the base object, and the result size is the size of the result object.
+///
+/// # Parameters
+/// * `stream`: The stream from which the sizes are read.
+///
+/// # Returns
+/// Returns a tuple containing the base size and result size.
+///
+pub fn read_delta_object_size<R: Read>(stream: &mut R) -> io::Result<(usize, usize)> {
+    let base_size = read_varint_le(stream)?.0 as usize;
+    let result_size = read_varint_le(stream)?.0 as usize;
+    Ok((base_size, result_size))
+}
+
 /// Calculate the SHA1 hash of the given object.
"` \0`" ///
data: The decompressed content of the object
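
A note on the `heap_size` accounting above: while `Pack::rebuild_delta` runs, the delta's instruction stream and the fully expanded object coexist, which is why the two sizes are summed. Below is a minimal, self-contained mock (simplified stand-in names, not the real `CacheObject`) that shows the peak-memory arithmetic the sum encodes:

```rust
// Mock of the accounting in `impl HeapSize for CacheObject`; the real type
// lives in cache_object.rs and also tracks offset, hash, etc.
struct MockCacheObject {
    data_decompress: Vec<u8>, // decompressed delta instructions
    delta_final_size: usize,  // expanded size read from the delta header
}

impl MockCacheObject {
    // Budget for the worst case: instruction stream + expanded object alive at once.
    fn heap_size(&self) -> usize {
        self.data_decompress.len() + self.delta_final_size
    }
}

fn main() {
    // E.g. 1 KiB of delta instructions that expand into a 10 MiB blob:
    let delta = MockCacheObject {
        data_decompress: vec![0; 1024],
        delta_final_size: 10 * 1024 * 1024,
    };
    // Peak usage during `rebuild_delta`: 1 KiB + 10 MiB, not just 1 KiB.
    assert_eq!(delta.heap_size(), 1024 + 10 * 1024 * 1024);
}
```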
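And a runnable sketch of the size header that `read_delta_object_size` parses. The layout mirrors `read_varint_le` in utils.rs (7 data bits per byte, MSB as continuation flag, least-significant group first); the byte values in `main` are made-up sample data, not taken from a real pack:

```rust
use std::io::{self, Cursor, Read};

// Little-endian varint: each byte contributes 7 data bits, and a set MSB
// means another byte follows (same scheme as `read_varint_le` above).
fn read_varint_le<R: Read>(reader: &mut R) -> io::Result<(u64, usize)> {
    let (mut value, mut shift, mut offset) = (0u64, 0u32, 0usize);
    loop {
        let mut byte = [0u8; 1];
        reader.read_exact(&mut byte)?;
        offset += 1;
        value |= u64::from(byte[0] & 0x7f) << shift;
        if byte[0] & 0x80 == 0 {
            break;
        }
        shift += 7;
    }
    Ok((value, offset))
}

// A delta object's data begins with two varints: base size, then result size.
fn read_delta_object_size<R: Read>(stream: &mut R) -> io::Result<(usize, usize)> {
    let base_size = read_varint_le(stream)?.0 as usize;
    let result_size = read_varint_le(stream)?.0 as usize;
    Ok((base_size, result_size))
}

fn main() -> io::Result<()> {
    // 0xac 0x02 => 44 + (2 << 7) = 300 (base size);
    // 0x96 0x01 => 22 + (1 << 7) = 150 (result size).
    let delta_header = [0xacu8, 0x02, 0x96, 0x01];
    let mut cursor = Cursor::new(&delta_header[..]);
    let (base, result) = read_delta_object_size(&mut cursor)?;
    assert_eq!((base, result), (300, 150));
    // The cursor now sits at the first delta instruction, exactly where
    // `Pack::rebuild_delta` resumes reading after the size header.
    println!("base = {base}, result = {result}");
    Ok(())
}
```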