From a9cff3895a6fc344041af38d4411b2238db4d60e Mon Sep 17 00:00:00 2001 From: Iris Shi <0.0@owo.li> Date: Sun, 15 Dec 2024 00:25:32 +0800 Subject: [PATCH 1/4] fix memory monitoring --- mercury/src/internal/pack/cache_object.rs | 48 ++++++++++++++++------- mercury/src/internal/pack/decode.rs | 28 +++++++++---- 2 files changed, 54 insertions(+), 22 deletions(-) diff --git a/mercury/src/internal/pack/cache_object.rs b/mercury/src/internal/pack/cache_object.rs index 967d761a..c86445ab 100644 --- a/mercury/src/internal/pack/cache_object.rs +++ b/mercury/src/internal/pack/cache_object.rs @@ -9,9 +9,9 @@ use lru_mem::{HeapSize, MemSize}; use serde::{Deserialize, Serialize}; use threadpool::ThreadPool; +use crate::internal::pack::entry::Entry; use crate::internal::pack::utils; use crate::{hash::SHA1, internal::object::types::ObjectType}; -use crate::internal::pack::entry::Entry; // /// record heap-size of all CacheObjects, used for memory limit. // static CACHE_OBJS_MEM_SIZE: AtomicUsize = AtomicUsize::new(0); @@ -56,7 +56,14 @@ pub struct CacheObject { pub data_decompress: Vec, pub offset: usize, pub hash: SHA1, - pub mem_recorder: Option> // record mem-size of all CacheObjects of a Pack + /// If a [`CacheObject`] is an [`ObjectType::HashDelta`] or an [`ObjectType::OffsetDelta`], + /// it will expand to another [`CacheObject`] of other types. To prevent potential OOM, + /// we record the size of the expanded object as well as that of the object itself. + /// + /// See https://github.com/web3infra-foundation/mega/pull/755#issuecomment-2543100481 for more details. + #[serde(skip, default = "usize::default")] + pub final_size: usize, + pub mem_recorder: Option>, // record mem-size of all CacheObjects of a Pack } impl Clone for CacheObject { @@ -68,6 +75,7 @@ impl Clone for CacheObject { data_decompress: self.data_decompress.clone(), offset: self.offset, hash: self.hash, + final_size: self.final_size, mem_recorder: self.mem_recorder.clone(), }; obj.record_mem_size(); @@ -87,6 +95,7 @@ impl Default for CacheObject { obj_type: ObjectType::Blob, offset: 0, hash: SHA1::default(), + final_size: 0, mem_recorder: None, }; obj.record_mem_size(); @@ -98,8 +107,11 @@ impl Default for CacheObject { // ! the implementation of HeapSize is not accurate, only calculate the size of the data_decompress // Note that: mem_size == value_size + heap_size, and we only need to impl HeapSize because value_size is known impl HeapSize for CacheObject { + /// For [`ObjectType::OffsetDelta`] and [`ObjectType::HashDelta`], + /// `final_size` is the size of the expanded object; + /// for other types, `final_size` is 0. fn heap_size(&self) -> usize { - self.data_decompress.heap_size() + self.data_decompress.heap_size() + self.final_size } } @@ -111,7 +123,6 @@ impl Drop for CacheObject { if let Some(mem_recorder) = &self.mem_recorder { mem_recorder.fetch_sub((*self).mem_size(), Ordering::SeqCst); } - } } @@ -146,7 +157,7 @@ impl MemSizeRecorder for CacheObject { } impl CacheObject { - /// Create a new CacheObject witch is not offset_delta or hash_delta + /// Create a new CacheObject which is neither [`ObjectType::OffsetDelta`] nor [`ObjectType::HashDelta`]. pub fn new_for_undeltified(obj_type: ObjectType, data: Vec, offset: usize) -> Self { let hash = utils::calculate_object_hash(obj_type, &data); CacheObject { @@ -154,6 +165,7 @@ impl CacheObject { obj_type, offset, hash, + final_size: 0, // Only delta objects have final_size mem_recorder: None, ..Default::default() } @@ -162,13 +174,11 @@ impl CacheObject { /// transform the CacheObject to Entry pub fn to_entry(&self) -> Entry { match self.obj_type { - ObjectType::Blob | ObjectType::Tree | ObjectType::Commit | ObjectType::Tag => { - Entry { - obj_type: self.obj_type, - data: self.data_decompress.clone(), - hash: self.hash, - } - } + ObjectType::Blob | ObjectType::Tree | ObjectType::Commit | ObjectType::Tag => Entry { + obj_type: self.obj_type, + data: self.data_decompress.clone(), + hash: self.hash, + }, _ => { unreachable!("delta object should not persist!") } @@ -177,10 +187,16 @@ impl CacheObject { } /// trait alias for simple use -pub trait ArcWrapperBounds: HeapSize + Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static {} +pub trait ArcWrapperBounds: + HeapSize + Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static +{ +} // You must impl `Alias Trait` for all the `T` satisfying Constraints // Or, `T` will not satisfy `Alias Trait` even if it satisfies the Original traits -impl Deserialize<'a> + Send + Sync + 'static> ArcWrapperBounds for T {} +impl Deserialize<'a> + Send + Sync + 'static> ArcWrapperBounds + for T +{ +} /// Implementing encapsulation of Arc to enable third-party Trait HeapSize implementation for the Arc type /// Because of use Arc in LruCache, the LruCache is not clear whether a pointer will drop the referenced @@ -300,6 +316,7 @@ mod test { obj_type: ObjectType::Blob, offset: 0, hash: SHA1::new(&vec![0; 20]), + final_size: 0, mem_recorder: None, }; assert!(a.heap_size() == 1024); @@ -318,6 +335,7 @@ mod test { obj_type: ObjectType::Blob, offset: 0, hash: SHA1::new(&vec![0; 20]), + final_size: 0, mem_recorder: None, }; println!("a.heap_size() = {}", a.heap_size()); @@ -329,6 +347,7 @@ mod test { obj_type: ObjectType::Blob, offset: 0, hash: SHA1::new(&vec![1; 20]), + final_size: 0, mem_recorder: None, }; { @@ -433,6 +452,7 @@ mod test { obj_type: ObjectType::Blob, offset: 0, hash: SHA1::new(&vec![0; 20]), + final_size: 0, mem_recorder: None, }; let s = bincode::serialize(&a).unwrap(); diff --git a/mercury/src/internal/pack/decode.rs b/mercury/src/internal/pack/decode.rs index e5332969..918e6d7b 100644 --- a/mercury/src/internal/pack/decode.rs +++ b/mercury/src/internal/pack/decode.rs @@ -275,11 +275,16 @@ impl Pack { }) .unwrap(); + let mut reader = Cursor::new(&data); + let _ = utils::read_varint_le(&mut reader).unwrap().0; // Base size + let final_size = utils::read_varint_le(&mut reader).unwrap().0 as usize; // Final size + Ok(CacheObject { base_offset, data_decompress: data, obj_type: t, offset: init_offset, + final_size, mem_recorder: None, ..Default::default() }) @@ -293,11 +298,18 @@ impl Pack { let (data, raw_size) = self.decompress_data(pack, size)?; *offset += raw_size; + + let mut reader = Cursor::new(&data); + let _ = utils::read_varint_le(&mut reader).unwrap().0; + let final_size = utils::read_varint_le(&mut reader).unwrap().0 as usize; + + Ok(CacheObject { base_ref: ref_sha1, data_decompress: data, obj_type: t, offset: init_offset, + final_size, mem_recorder: None, ..Default::default() }) @@ -339,13 +351,12 @@ impl Pack { let mut i = 0; while i < self.number { // log per 2000&more then 1 se objects - if i%1000 == 0 { let time_now = time.elapsed().as_millis(); if time_now - last_update_time > 1000 { log_info(i, self); last_update_time = time_now; } - } + // 3 parts: Waitlist + TheadPool + Caches // hardcode the limit of the tasks of threads_pool queue, to limit memory while self.pool.queued_count() > 2000 @@ -536,16 +547,16 @@ impl Pack { let mut stream = Cursor::new(&delta_obj.data_decompress); - // Read the base object size & Result Size + // Read the base object size // (Size Encoding) - let base_size = utils::read_varint_le(&mut stream).unwrap().0; - let result_size = utils::read_varint_le(&mut stream).unwrap().0; + let base_size = utils::read_varint_le(&mut stream).unwrap().0 as usize; + let result_size = utils::read_varint_le(&mut stream).unwrap().0 as usize; //Get the base object row data let base_info = &base_obj.data_decompress; - assert_eq!(base_info.len() as u64, base_size); + assert_eq!(base_info.len(), base_size); - let mut result = Vec::with_capacity(result_size as usize); + let mut result = Vec::with_capacity(result_size); loop { // Check if the stream has ended, meaning the new object is done @@ -597,7 +608,7 @@ impl Pack { } } } - assert_eq!(result_size, result.len() as u64); + assert_eq!(result_size, result.len()); let hash = utils::calculate_object_hash(base_obj.obj_type, &result); // create new obj from `delta_obj` & `result` instead of modifying `delta_obj` for heap-size recording @@ -605,6 +616,7 @@ impl Pack { data_decompress: result, obj_type: base_obj.obj_type, // Same as the Type of base object hash, + final_size: 0, // The new object is not a delta obj, so set its final size to 0. mem_recorder: None, // This filed(Arc) can't be moved from `delta_obj` by `struct update syntax` ..delta_obj // This syntax is actually move `delta_obj` to `new_obj` } // Canonical form (Complete Object) From b615c6ba2683ab0d8d5fc755911c5313d6065163 Mon Sep 17 00:00:00 2001 From: Iris Shi <0.0@owo.li> Date: Sun, 15 Dec 2024 00:43:04 +0800 Subject: [PATCH 2/4] how --- mercury/src/internal/pack/decode.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/mercury/src/internal/pack/decode.rs b/mercury/src/internal/pack/decode.rs index 918e6d7b..6d8ad0a9 100644 --- a/mercury/src/internal/pack/decode.rs +++ b/mercury/src/internal/pack/decode.rs @@ -350,13 +350,14 @@ impl Pack { let mut offset: usize = 12; let mut i = 0; while i < self.number { - // log per 2000&more then 1 se objects + // log per 1000 objects and 1 second + if i%1000 == 0 { let time_now = time.elapsed().as_millis(); if time_now - last_update_time > 1000 { log_info(i, self); last_update_time = time_now; } - + } // 3 parts: Waitlist + TheadPool + Caches // hardcode the limit of the tasks of threads_pool queue, to limit memory while self.pool.queued_count() > 2000 From f8acd1d12ac80452b0fa14b908436807eecb8237 Mon Sep 17 00:00:00 2001 From: Iris Shi <0.0@owo.li> Date: Sun, 15 Dec 2024 11:18:18 +0800 Subject: [PATCH 3/4] refactor as per instruction --- mercury/src/internal/pack/cache_object.rs | 2 +- mercury/src/internal/pack/decode.rs | 15 +++++---------- mercury/src/internal/pack/utils.rs | 22 ++++++++++++++++++++-- 3 files changed, 26 insertions(+), 13 deletions(-) diff --git a/mercury/src/internal/pack/cache_object.rs b/mercury/src/internal/pack/cache_object.rs index c86445ab..e8b50f85 100644 --- a/mercury/src/internal/pack/cache_object.rs +++ b/mercury/src/internal/pack/cache_object.rs @@ -60,7 +60,7 @@ pub struct CacheObject { /// it will expand to another [`CacheObject`] of other types. To prevent potential OOM, /// we record the size of the expanded object as well as that of the object itself. /// - /// See https://github.com/web3infra-foundation/mega/pull/755#issuecomment-2543100481 for more details. + /// See [Comment in PR #755](https://github.com/web3infra-foundation/mega/pull/755#issuecomment-2543100481) for more details. #[serde(skip, default = "usize::default")] pub final_size: usize, pub mem_recorder: Option>, // record mem-size of all CacheObjects of a Pack diff --git a/mercury/src/internal/pack/decode.rs b/mercury/src/internal/pack/decode.rs index 6d8ad0a9..261c60a2 100644 --- a/mercury/src/internal/pack/decode.rs +++ b/mercury/src/internal/pack/decode.rs @@ -276,8 +276,7 @@ impl Pack { .unwrap(); let mut reader = Cursor::new(&data); - let _ = utils::read_varint_le(&mut reader).unwrap().0; // Base size - let final_size = utils::read_varint_le(&mut reader).unwrap().0 as usize; // Final size + let (_, final_size) = utils::read_delta_object_size(&mut reader)?; Ok(CacheObject { base_offset, @@ -297,12 +296,9 @@ impl Pack { let (data, raw_size) = self.decompress_data(pack, size)?; *offset += raw_size; - let mut reader = Cursor::new(&data); - let _ = utils::read_varint_le(&mut reader).unwrap().0; - let final_size = utils::read_varint_le(&mut reader).unwrap().0 as usize; - + let (_, final_size) = utils::read_delta_object_size(&mut reader)?; Ok(CacheObject { base_ref: ref_sha1, @@ -550,12 +546,11 @@ impl Pack { // Read the base object size // (Size Encoding) - let base_size = utils::read_varint_le(&mut stream).unwrap().0 as usize; - let result_size = utils::read_varint_le(&mut stream).unwrap().0 as usize; + let (base_size, result_size) = utils::read_delta_object_size(&mut stream).unwrap(); //Get the base object row data let base_info = &base_obj.data_decompress; - assert_eq!(base_info.len(), base_size); + assert_eq!(base_info.len(), base_size, "Base object size mismatch"); let mut result = Vec::with_capacity(result_size); @@ -609,7 +604,7 @@ impl Pack { } } } - assert_eq!(result_size, result.len()); + assert_eq!(result_size, result.len(), "Result size mismatch"); let hash = utils::calculate_object_hash(base_obj.obj_type, &result); // create new obj from `delta_obj` & `result` instead of modifying `delta_obj` for heap-size recording diff --git a/mercury/src/internal/pack/utils.rs b/mercury/src/internal/pack/utils.rs index 04b586dc..520cd6b7 100644 --- a/mercury/src/internal/pack/utils.rs +++ b/mercury/src/internal/pack/utils.rs @@ -153,8 +153,6 @@ pub fn read_varint_le(reader: &mut R) -> io::Result<(u64, usize)> { Ok((value, offset)) } - - /// The offset for an OffsetDelta object(big-endian order) /// # Arguments /// @@ -232,6 +230,26 @@ pub fn read_partial_int( Ok(value) } +/// Reads the base size and result size of a delta object from the given stream. +/// +/// **Note**: The stream MUST be positioned at the start of the delta object. +/// +/// The base size and result size are encoded as variable-length integers in little-endian order. +/// +/// The base size is the size of the base object, and the result size is the size of the result object. +/// +/// # Parameters +/// * `stream`: The stream from which the sizes are read. +/// +/// # Returns +/// Returns a tuple containing the base size and result size. +/// +pub fn read_delta_object_size(stream: &mut R) -> io::Result<(usize, usize)> { + let base_size = read_varint_le(stream)?.0 as usize; + let result_size = read_varint_le(stream)?.0 as usize; + Ok((base_size, result_size)) +} + /// Calculate the SHA1 hash of the given object. ///
"` \0`" ///
data: The decompressed content of the object From 1fb33897a67d90efbd89844947734578992b9b2a Mon Sep 17 00:00:00 2001 From: Iris Shi <0.0@owo.li> Date: Sun, 15 Dec 2024 11:38:27 +0800 Subject: [PATCH 4/4] add docs as per instruction --- mercury/src/internal/pack/cache_object.rs | 32 +++++++++++++++-------- mercury/src/internal/pack/decode.rs | 8 +++--- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/mercury/src/internal/pack/cache_object.rs b/mercury/src/internal/pack/cache_object.rs index e8b50f85..700e6a15 100644 --- a/mercury/src/internal/pack/cache_object.rs +++ b/mercury/src/internal/pack/cache_object.rs @@ -62,7 +62,7 @@ pub struct CacheObject { /// /// See [Comment in PR #755](https://github.com/web3infra-foundation/mega/pull/755#issuecomment-2543100481) for more details. #[serde(skip, default = "usize::default")] - pub final_size: usize, + pub delta_final_size: usize, pub mem_recorder: Option>, // record mem-size of all CacheObjects of a Pack } @@ -75,7 +75,7 @@ impl Clone for CacheObject { data_decompress: self.data_decompress.clone(), offset: self.offset, hash: self.hash, - final_size: self.final_size, + delta_final_size: self.delta_final_size, mem_recorder: self.mem_recorder.clone(), }; obj.record_mem_size(); @@ -95,7 +95,7 @@ impl Default for CacheObject { obj_type: ObjectType::Blob, offset: 0, hash: SHA1::default(), - final_size: 0, + delta_final_size: 0, mem_recorder: None, }; obj.record_mem_size(); @@ -108,10 +108,20 @@ impl Default for CacheObject { // Note that: mem_size == value_size + heap_size, and we only need to impl HeapSize because value_size is known impl HeapSize for CacheObject { /// For [`ObjectType::OffsetDelta`] and [`ObjectType::HashDelta`], - /// `final_size` is the size of the expanded object; - /// for other types, `final_size` is 0. + /// `delta_final_size` is the size of the expanded object; + /// for other types, `delta_final_size` is 0 as they won't expand. fn heap_size(&self) -> usize { - self.data_decompress.heap_size() + self.final_size + // To those who are concerned about why these two values are added, + // let's consider the lifetime of two `CacheObject`s, say `delta_obj` + // and `final_obj` in the function `Pack::rebuild_delta`. + // + // `delta_obj` is dropped only after `Pack::rebuild_delta` returns, + // but the space for `final_obj` is allocated in that function. + // + // Therefore, during the execution of `Pack::rebuild_delta`, both `delta_obj` + // and `final_obj` coexist. The maximum memory usage is the sum of the memory + // usage of `delta_obj` and `final_obj`. + self.data_decompress.heap_size() + self.delta_final_size } } @@ -165,7 +175,7 @@ impl CacheObject { obj_type, offset, hash, - final_size: 0, // Only delta objects have final_size + delta_final_size: 0, // Only delta objects have `delta_final_size` mem_recorder: None, ..Default::default() } @@ -316,7 +326,7 @@ mod test { obj_type: ObjectType::Blob, offset: 0, hash: SHA1::new(&vec![0; 20]), - final_size: 0, + delta_final_size: 0, mem_recorder: None, }; assert!(a.heap_size() == 1024); @@ -335,7 +345,7 @@ mod test { obj_type: ObjectType::Blob, offset: 0, hash: SHA1::new(&vec![0; 20]), - final_size: 0, + delta_final_size: 0, mem_recorder: None, }; println!("a.heap_size() = {}", a.heap_size()); @@ -347,7 +357,7 @@ mod test { obj_type: ObjectType::Blob, offset: 0, hash: SHA1::new(&vec![1; 20]), - final_size: 0, + delta_final_size: 0, mem_recorder: None, }; { @@ -452,7 +462,7 @@ mod test { obj_type: ObjectType::Blob, offset: 0, hash: SHA1::new(&vec![0; 20]), - final_size: 0, + delta_final_size: 0, mem_recorder: None, }; let s = bincode::serialize(&a).unwrap(); diff --git a/mercury/src/internal/pack/decode.rs b/mercury/src/internal/pack/decode.rs index 261c60a2..609a5585 100644 --- a/mercury/src/internal/pack/decode.rs +++ b/mercury/src/internal/pack/decode.rs @@ -283,7 +283,7 @@ impl Pack { data_decompress: data, obj_type: t, offset: init_offset, - final_size, + delta_final_size: final_size, mem_recorder: None, ..Default::default() }) @@ -305,7 +305,7 @@ impl Pack { data_decompress: data, obj_type: t, offset: init_offset, - final_size, + delta_final_size: final_size, mem_recorder: None, ..Default::default() }) @@ -314,8 +314,6 @@ impl Pack { } /// Decodes a pack file from a given Read and BufRead source and get a vec of objects. - /// - /// pub fn decode(&mut self, pack: &mut (impl BufRead + Send), callback: F) -> Result<(), GitError> where F: Fn(Entry, usize) + Sync + Send + 'static @@ -612,7 +610,7 @@ impl Pack { data_decompress: result, obj_type: base_obj.obj_type, // Same as the Type of base object hash, - final_size: 0, // The new object is not a delta obj, so set its final size to 0. + delta_final_size: 0, // The new object is not a delta obj, so set its final size to 0. mem_recorder: None, // This filed(Arc) can't be moved from `delta_obj` by `struct update syntax` ..delta_obj // This syntax is actually move `delta_obj` to `new_obj` } // Canonical form (Complete Object)