diff --git a/Cargo.toml b/Cargo.toml
index 5965937e..a9706ddc 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,6 +30,8 @@ thiserror = "1.0.60"
 rand = "0.8.5"
 smallvec = "1.13.2"
 tokio = "1.37.0"
+async-stream = "0.3.5"
+tokio-stream = "0.1.15"
 tokio-test = "0.4.4"
 clap = "4.5.4"
 async-trait = "0.1.80"
diff --git a/ceres/Cargo.toml b/ceres/Cargo.toml
index ff56a926..39283641 100644
--- a/ceres/Cargo.toml
+++ b/ceres/Cargo.toml
@@ -18,7 +18,9 @@ mercury = { workspace = true }
 venus = { workspace = true }
 anyhow = { workspace = true }
 tokio = { workspace = true, features = ["net"] }
+tokio-stream = { workspace = true }
 axum = { workspace = true }
+async-stream = { workspace = true }
 tracing = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
diff --git a/ceres/src/http/handler.rs b/ceres/src/http/handler.rs
index b1190af5..9a5770d8 100644
--- a/ceres/src/http/handler.rs
+++ b/ceres/src/http/handler.rs
@@ -1,4 +1,5 @@
 use std::collections::HashMap;
+use std::convert::Infallible;
 
 use anyhow::Result;
 use axum::body::Body;
@@ -7,10 +8,11 @@ use axum::http::{Request, Response, StatusCode};
 use bytes::{Bytes, BytesMut};
 use futures::TryStreamExt;
 use tokio::io::AsyncReadExt;
+use tokio_stream::StreamExt;
 
 use common::model::GetParams;
 
-use crate::protocol::{smart, SmartProtocol, ServiceType};
+use crate::protocol::{smart, ServiceType, SmartProtocol};
 
 // # Discovering Reference
 // HTTP clients that support the "smart" protocol (or both the "smart" and "dumb" protocols) MUST
@@ -64,35 +66,39 @@ pub async fn git_upload_pack(
         .await
         .unwrap();
     tracing::debug!("bytes from client: {:?}", upload_request);
-    let (send_pack_data, buf) = pack_protocol
+    let (mut send_pack_data, protocol_buf) = pack_protocol
         .git_upload_pack(&mut upload_request.freeze())
         .await
         .unwrap();
-    tracing::info!("send ack/nak message buf: {:?}", buf);
+    tracing::info!("send ack/nak message buf: {:?}", protocol_buf);
     let mut res_bytes = BytesMut::new();
-    res_bytes.extend(buf);
+    res_bytes.extend(protocol_buf);
     let resp = build_res_header("application/x-git-upload-pack-result".to_owned());
     tracing::info!("send response");
-    let mut reader = send_pack_data.as_slice();
-    loop {
-        let mut temp = BytesMut::new();
-        temp.reserve(65500);
-        let length = reader.read_buf(&mut temp).await.unwrap();
-        if length == 0 {
-            let bytes_out = Bytes::from_static(smart::PKT_LINE_END_MARKER);
-            tracing::info!("send back pkt-flush line '0000', actually: {:?}", bytes_out);
-            res_bytes.extend(bytes_out);
-            break;
+    let body_stream = async_stream::stream! {
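+        // First yield the buffered negotiation reply (ACK/NAK pkt-lines), then forward pack chunks in side-band format.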
+        yield Ok::<_, Infallible>(Bytes::copy_from_slice(&res_bytes));
+        while let Some(chunk) = send_pack_data.next().await {
+            let mut reader = chunk.as_slice();
+            loop {
+                let mut temp = BytesMut::new();
+                temp.reserve(65500);
+                let length = reader.read_buf(&mut temp).await.unwrap();
+                if length == 0 {
+                    break;
+                }
+                let bytes_out = pack_protocol.build_side_band_format(temp, length);
+                // tracing::info!("send pack file: length: {:?}", bytes_out.len());
+                yield Ok::<_, Infallible>(bytes_out.freeze());
+            }
         }
-        let bytes_out = pack_protocol.build_side_band_format(temp, length);
-        tracing::info!("send pack file: length: {:?}", bytes_out.len());
-        res_bytes.extend(bytes_out);
-    }
-    let body = Body::from(res_bytes.freeze());
-    let resp = resp.body(body).unwrap();
+        let bytes_out = Bytes::from_static(smart::PKT_LINE_END_MARKER);
+        tracing::info!("send back pkt-flush line '0000', actually: {:?}", bytes_out);
+        yield Ok::<_, Infallible>(bytes_out);
+    };
+    let resp = resp.body(Body::from_stream(body_stream)).unwrap();
     Ok(resp)
 }
diff --git a/ceres/src/pack/handler.rs b/ceres/src/pack/handler.rs
index fc5618e1..9eb93354 100644
--- a/ceres/src/pack/handler.rs
+++ b/ceres/src/pack/handler.rs
@@ -3,15 +3,16 @@ use std::{
     io::Cursor,
     sync::{
         atomic::{AtomicUsize, Ordering},
-        mpsc::{self, Receiver, Sender},
+        mpsc::{self, Receiver},
     },
 };
 
 use async_trait::async_trait;
 use bytes::Bytes;
+use tokio_stream::wrappers::ReceiverStream;
 
 use callisto::raw_blob;
-use common::{config::MonoConfig, errors::MegaError, utils::ZERO_ID};
+use common::{config::PackConfig, errors::MegaError, utils::ZERO_ID};
 use mercury::internal::pack::Pack;
 use mercury::{
     errors::GitError,
@@ -49,13 +50,13 @@ pub trait PackHandler: Send + Sync {
     /// # Returns
-    /// * `Result<Vec<u8>, GitError>` - The packed binary data as a vector of bytes.
+    /// * `Result<ReceiverStream<Vec<u8>>, GitError>` - The packed binary data as a stream of byte chunks.
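+    /// The stream is fed by a background encoder task, so callers can forward chunks while encoding is still in progress.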
     ///
-    async fn full_pack(&self) -> Result<Vec<u8>, GitError>;
+    async fn full_pack(&self) -> Result<ReceiverStream<Vec<u8>>, GitError>;
 
     async fn incremental_pack(
         &self,
         want: Vec<String>,
         have: Vec<String>,
-    ) -> Result<Vec<u8>, GitError>;
+    ) -> Result<ReceiverStream<Vec<u8>>, GitError>;
 
     async fn traverse_for_count(
         &self,
@@ -87,7 +88,7 @@ pub trait PackHandler: Send + Sync {
         &self,
         tree: Tree,
         exist_objs: &mut HashSet<String>,
-        sender: Option<&Sender<Entry>>,
+        sender: Option<&tokio::sync::mpsc::Sender<Entry>>,
     ) {
         exist_objs.insert(tree.id.to_plain_str());
         let mut search_tree_ids = vec![];
@@ -108,7 +109,7 @@ pub trait PackHandler: Send + Sync {
             let blobs = self.get_blobs_by_hashes(search_blob_ids).await.unwrap();
             for b in blobs {
                 let blob: Blob = b.into();
-                sender.send(blob.into()).unwrap();
+                sender.send(blob.into()).await.unwrap();
             }
         }
         let trees = self.get_trees_by_hashes(search_tree_ids).await.unwrap();
@@ -116,7 +117,7 @@ pub trait PackHandler: Send + Sync {
             self.traverse(t, exist_objs, sender).await;
         }
         if let Some(sender) = sender {
-            sender.send(tree.into()).unwrap();
+            sender.send(tree.into()).await.unwrap();
         }
     }
@@ -133,7 +134,11 @@ pub trait PackHandler: Send + Sync {
 
     async fn check_default_branch(&self) -> bool;
 
-    fn pack_decoder(&self, mono_config: &MonoConfig, pack_file: Bytes) -> Result<Receiver<Entry>, GitError> {
+    fn pack_decoder(
+        &self,
+        pack_config: &PackConfig,
+        pack_file: Bytes,
+    ) -> Result<Receiver<Entry>, GitError> {
         // #[cfg(debug_assertions)]
         // {
         //     let datetime = chrono::Utc::now().naive_utc();
@@ -144,9 +149,9 @@ pub trait PackHandler: Send + Sync {
         let (sender, receiver) = mpsc::channel();
         let p = Pack::new(
             None,
-            Some(1024 * 1024 * 1024 * mono_config.pack_decode_mem_size),
-            Some(mono_config.pack_decode_cache_path.clone()),
-            mono_config.clean_cache_after_decode,
+            Some(1024 * 1024 * 1024 * pack_config.pack_decode_mem_size),
+            Some(pack_config.pack_decode_cache_path.clone()),
+            pack_config.clean_cache_after_decode,
         );
         p.decode_async(Cursor::new(pack_file), sender); //Pack moved here
         Ok(receiver)
diff --git a/ceres/src/pack/import_repo.rs b/ceres/src/pack/import_repo.rs
index 62afef0d..bc1fab79 100644
--- a/ceres/src/pack/import_repo.rs
+++ b/ceres/src/pack/import_repo.rs
@@ -1,7 +1,7 @@
-use std::sync::mpsc;
-
 use async_trait::async_trait;
 use bytes::Bytes;
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::ReceiverStream;
 
 use callisto::raw_blob;
 use common::errors::MegaError;
@@ -17,6 +17,7 @@ use mercury::{
         pack::entry::Entry,
     },
 };
+
 use venus::import_repo::{
     import_refs::{CommandType, RefCommand, Refs},
     repo::Repo,
@@ -44,7 +45,9 @@ impl PackHandler for ImportRepo {
     }
 
     async fn unpack(&self, pack_file: Bytes) -> Result<(), GitError> {
-        let receiver = self.pack_decoder(&self.context.config.monorepo, pack_file).unwrap();
+        let receiver = self
+            .pack_decoder(&self.context.config.pack, pack_file)
+            .unwrap();
 
         let storage = self.context.services.git_db_storage.clone();
         let mut entry_list = Vec::new();
@@ -59,35 +62,17 @@ impl PackHandler for ImportRepo {
         Ok(())
     }
 
-    async fn full_pack(&self) -> Result<Vec<u8>, GitError> {
-        let (sender, receiver) = mpsc::channel();
+    async fn full_pack(&self) -> Result<ReceiverStream<Vec<u8>>, GitError> {
+        let pack_config = &self.context.config.pack;
+        let (entry_tx, entry_rx) = mpsc::channel(pack_config.channel_message_size);
+        let (stream_tx, stream_rx) = mpsc::channel(pack_config.channel_message_size);
         let storage = self.context.services.git_db_storage.clone();
         let total = storage.get_obj_count_by_repo_id(&self.repo).await;
-        let mut encoder = PackEncoder::new(total, 0);
-
-        for m in storage
-            .get_commits_by_repo_id(&self.repo)
-            .await
-            .unwrap()
-            .into_iter()
-        {
-            let c: Commit = m.into();
-            let entry: Entry = c.into();
-            sender.send(entry).unwrap();
-        }
-
-        for m in storage
-            .get_trees_by_repo_id(&self.repo)
-            .await
-            .unwrap()
-            .into_iter()
-        {
-            let c: Tree = m.into();
-            let entry: Entry = c.into();
-            sender.send(entry).unwrap();
-        }
+        let encoder = PackEncoder::new(total, 0, stream_tx);
+        let commits = storage.get_commits_by_repo_id(&self.repo).await.unwrap();
+        let trees = storage.get_trees_by_repo_id(&self.repo).await.unwrap();
 
         let bids: Vec<String> = storage
             .get_blobs_by_repo_id(&self.repo)
             .await
@@ -95,7 +80,6 @@ impl PackHandler for ImportRepo {
             .into_iter()
             .map(|b| b.blob_id)
             .collect();
-
         let raw_blobs = batch_query_by_columns::<raw_blob::Entity, raw_blob::Column>(
             storage.get_connection(),
             raw_blob::Column::Sha1,
@@ -105,35 +89,39 @@ impl PackHandler for ImportRepo {
         )
         .await
         .unwrap();
+        let tags = storage.get_tags_by_repo_id(&self.repo).await.unwrap();
+        encoder.encode_async(entry_rx).await.unwrap();
 
+        for m in commits.into_iter() {
+            let c: Commit = m.into();
+            let entry: Entry = c.into();
+            entry_tx.send(entry).await.unwrap();
+        }
+        for m in trees.into_iter() {
+            let c: Tree = m.into();
+            let entry: Entry = c.into();
+            entry_tx.send(entry).await.unwrap();
+        }
         for m in raw_blobs {
             // todo handle storage type
             let c: Blob = m.into();
             let entry: Entry = c.into();
-            sender.send(entry).unwrap();
+            entry_tx.send(entry).await.unwrap();
         }
-
-        for m in storage
-            .get_tags_by_repo_id(&self.repo)
-            .await
-            .unwrap()
-            .into_iter()
-        {
+        for m in tags.into_iter() {
             let c: Tag = m.into();
             let entry: Entry = c.into();
-            sender.send(entry).unwrap();
+            entry_tx.send(entry).await.unwrap();
         }
-        drop(sender);
-        let data = encoder.encode(receiver).unwrap();
-
-        Ok(data)
+        drop(entry_tx);
+        Ok(ReceiverStream::new(stream_rx))
     }
 
     async fn incremental_pack(
         &self,
         _want: Vec<String>,
         _have: Vec<String>,
-    ) -> Result<Vec<u8>, GitError> {
+    ) -> Result<ReceiverStream<Vec<u8>>, GitError> {
         unimplemented!()
     }
diff --git a/ceres/src/pack/monorepo.rs b/ceres/src/pack/monorepo.rs
index e4ca6b40..cb8d7980 100644
--- a/ceres/src/pack/monorepo.rs
+++ b/ceres/src/pack/monorepo.rs
@@ -4,7 +4,7 @@ use std::{
     str::FromStr,
     sync::{
         atomic::{AtomicUsize, Ordering},
-        mpsc::{self, Receiver},
+        mpsc::Receiver,
     },
     vec,
 };
@@ -24,6 +24,8 @@ use mercury::{
         pack::entry::Entry,
     },
 };
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::ReceiverStream;
 use venus::{
     import_repo::import_refs::{RefCommand, Refs},
     monorepo::mr::MergeRequest,
@@ -115,7 +117,7 @@ impl PackHandler for MonoRepo {
 
     async fn unpack(&self, pack_file: Bytes) -> Result<(), GitError> {
         let receiver = self
-            .pack_decoder(&self.context.config.monorepo, pack_file)
+            .pack_decoder(&self.context.config.pack, pack_file)
             .unwrap();
 
         let storage = self.context.services.mega_storage.clone();
@@ -164,7 +166,8 @@ impl PackHandler for MonoRepo {
     }
 
     // monorepo full pack should follow the shallow clone command 'git clone --depth=1'
-    async fn full_pack(&self) -> Result<Vec<u8>, GitError> {
+    async fn full_pack(&self) -> Result<ReceiverStream<Vec<u8>>, GitError> {
+        let pack_config = &self.context.config.pack;
         let storage = self.context.services.mega_storage.clone();
         let obj_num = AtomicUsize::new(0);
 
@@ -190,22 +193,24 @@ impl PackHandler for MonoRepo {
 
         obj_num.fetch_add(1, Ordering::SeqCst);
 
-        let (sender, receiver) = mpsc::channel();
-        let encoder = PackEncoder::new(obj_num.into_inner(), 0);
-        let data = encoder.encode_async(receiver).unwrap();
+        let (entry_tx, entry_rx) = mpsc::channel(pack_config.channel_message_size);
+        let (stream_tx, stream_rx) = mpsc::channel(pack_config.channel_message_size);
 
-        self.traverse(tree, &mut HashSet::new(), Some(&sender))
+        let encoder = PackEncoder::new(obj_num.into_inner(), 0, stream_tx);
+        encoder.encode_async(entry_rx).await.unwrap();
+        self.traverse(tree, &mut HashSet::new(), Some(&entry_tx))
             .await;
-        sender.send(commit.into()).unwrap();
-        drop(sender);
-        Ok(data.join().unwrap())
+        entry_tx.send(commit.into()).await.unwrap();
+        drop(entry_tx);
+        Ok(ReceiverStream::new(stream_rx))
     }
 
     async fn incremental_pack(
         &self,
         mut want: Vec<String>,
         have: Vec<String>,
-    ) -> Result<Vec<u8>, GitError> {
+    ) -> Result<ReceiverStream<Vec<u8>>, GitError> {
+        let pack_config = &self.context.config.pack;
         let storage = self.context.services.mega_storage.clone();
         let obj_num = AtomicUsize::new(0);
 
@@ -271,23 +276,23 @@ impl PackHandler for MonoRepo {
             )
             .await;
         }
-
-        let (sender, receiver) = mpsc::channel();
-        let encoder = PackEncoder::new(obj_num.into_inner(), 0);
-        let data = encoder.encode_async(receiver).unwrap();
+        let (entry_tx, entry_rx) = mpsc::channel(pack_config.channel_message_size);
+        let (stream_tx, stream_rx) = mpsc::channel(pack_config.channel_message_size);
+        let encoder = PackEncoder::new(obj_num.into_inner(), 0, stream_tx);
+        encoder.encode_async(entry_rx).await.unwrap();
 
         for c in want_commits {
             self.traverse(
                 want_trees.get(&c.tree_id).unwrap().clone(),
                 &mut exist_objs,
-                Some(&sender),
+                Some(&entry_tx),
             )
             .await;
-            sender.send(c.into()).unwrap();
+            entry_tx.send(c.into()).await.unwrap();
         }
-        drop(sender);
+        drop(entry_tx);
 
-        Ok(data.join().unwrap())
+        Ok(ReceiverStream::new(stream_rx))
     }
 
     async fn get_trees_by_hashes(&self, hashes: Vec<String>) -> Result<Vec<Tree>, MegaError> {
diff --git a/ceres/src/protocol/smart.rs b/ceres/src/protocol/smart.rs
index 2e8f8b70..b6a7b1b8 100644
--- a/ceres/src/protocol/smart.rs
+++ b/ceres/src/protocol/smart.rs
@@ -1,6 +1,6 @@
-
 use anyhow::Result;
 use bytes::{Buf, BufMut, Bytes, BytesMut};
+use tokio_stream::wrappers::ReceiverStream;
 
 use callisto::db_enums::RefType;
 
@@ -85,7 +85,7 @@ impl SmartProtocol {
     pub async fn git_upload_pack(
         &mut self,
         upload_request: &mut Bytes,
-    ) -> Result<(Vec<u8>, BytesMut)> {
+    ) -> Result<(ReceiverStream<Vec<u8>>, BytesMut)> {
         let pack_handler = self.pack_handler().await;
 
         let mut want: Vec<String> = Vec::new();
@@ -135,29 +135,31 @@ impl SmartProtocol {
             self.capabilities
         );
 
-        let mut pack_data = vec![];
-        let mut buf = BytesMut::new();
+        // init an empty ReceiverStream as the default
+        let (tx, rx) = tokio::sync::mpsc::channel::<Vec<u8>>(1);
+        let mut pack_data = ReceiverStream::new(rx);
+        let mut protocol_buf = BytesMut::new();
 
         if have.is_empty() {
             pack_data = pack_handler.full_pack().await.unwrap();
-            add_pkt_line_string(&mut buf, String::from("NAK\n"));
+            add_pkt_line_string(&mut protocol_buf, String::from("NAK\n"));
         } else {
             if self.capabilities.contains(&Capability::MultiAckDetailed) {
                 // multi_ack_detailed mode, the server will differentiate the ACKs where it is signaling that
                 // it is ready to send data with ACK obj-id ready lines,
                 // and signals the identified common commits with ACK obj-id common lines
-
                 for hash in &have {
-                    if pack_handler.check_commit_exist(hash).await
-                    {
-                        add_pkt_line_string(&mut buf, format!("ACK {} common\n", hash));
+                    if pack_handler.check_commit_exist(hash).await {
+                        add_pkt_line_string(&mut protocol_buf, format!("ACK {} common\n", hash));
                         if last_common_commit.is_empty() {
                             last_common_commit = hash.to_string();
                         }
                     } else {
                         //send NAK if missing common commit
-                        add_pkt_line_string(&mut buf, String::from("NAK\n"));
-                        return Ok((pack_data, buf));
+                        add_pkt_line_string(&mut protocol_buf, String::from("NAK\n"));
+                        drop(tx);
+                        return Ok((pack_data, protocol_buf));
                     }
                 }
 
@@ -165,7 +167,7 @@ impl SmartProtocol {
                 if self.capabilities.contains(&Capability::NoDone) {
                     // If multi_ack_detailed and no-done are both present, then the sender is free to immediately send a pack
                     // following its first "ACK obj-id ready" message.
-                    add_pkt_line_string(&mut buf, format!("ACK {} ready\n", hash));
+                    add_pkt_line_string(&mut protocol_buf, format!("ACK {} ready\n", hash));
                 }
             }
 
@@ -173,9 +175,9 @@ impl SmartProtocol {
             } else {
                 tracing::error!("capability unsupported");
             }
-            add_pkt_line_string(&mut buf, format!("ACK {} \n", last_common_commit));
+            add_pkt_line_string(&mut protocol_buf, format!("ACK {} \n", last_common_commit));
         }
-        Ok((pack_data, buf))
+        Ok((pack_data, protocol_buf))
     }
 
     pub async fn git_receive_pack(&mut self, mut body_bytes: Bytes) -> Result<Bytes> {
@@ -440,7 +442,7 @@ pub mod test {
             error_msg: String::new(),
             command_type: CommandType::Create,
             ref_type: RefType::Branch,
-            default_branch: false
+            default_branch: false,
         };
         assert_eq!(result, command);
     }
diff --git a/common/src/config.rs b/common/src/config.rs
index 426f48f2..c24e45ef 100644
--- a/common/src/config.rs
+++ b/common/src/config.rs
@@ -11,6 +11,7 @@ pub struct Config {
     pub ssh: SshConfig,
     pub storage: StorageConfig,
     pub monorepo: MonoConfig,
+    pub pack: PackConfig,
 }
 
 impl Config {
@@ -109,18 +110,32 @@ impl Default for StorageConfig {
 #[derive(Serialize, Deserialize, Debug, Clone)]
 pub struct MonoConfig {
     pub import_dir: PathBuf,
+}
+
+impl Default for MonoConfig {
+    fn default() -> Self {
+        Self {
+            import_dir: PathBuf::from("/third-part"),
+        }
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct PackConfig {
     pub pack_decode_mem_size: usize,
     pub pack_decode_cache_path: PathBuf,
     pub clean_cache_after_decode: bool,
+    pub channel_message_size: usize,
 }
 
-impl Default for MonoConfig {
+impl Default for PackConfig {
     fn default() -> Self {
         Self {
-            import_dir: PathBuf::from("/third-part"),
            pack_decode_mem_size: 4,
             pack_decode_cache_path: PathBuf::from("/tmp/.mega/cache"),
             clean_cache_after_decode: true,
+            channel_message_size: 1_000_000,
         }
     }
 }
\ No newline at end of file
diff --git a/gateway/src/git_protocol/ssh.rs b/gateway/src/git_protocol/ssh.rs
index e7869240..c96fbef5 100644
--- a/gateway/src/git_protocol/ssh.rs
+++ b/gateway/src/git_protocol/ssh.rs
@@ -6,6 +6,7 @@ use std::sync::{Arc, Mutex};
 use async_trait::async_trait;
 use bytes::{Bytes, BytesMut};
 use chrono::{DateTime, Duration, Utc};
+use futures::StreamExt;
 use russh::server::{self, Auth, Msg, Response, Session};
 use russh::{Channel, ChannelId};
 use russh_keys::key;
@@ -141,6 +142,7 @@ impl server::Handler for SshServer {
         Ok(Auth::Accept)
     }
 
+    // TODO: disable password auth
     async fn auth_password(&mut self, user: &str, password: &str) -> Result<Auth, Self::Error> {
         tracing::info!("auth_password: {} / {}", user, password);
         // in this example implementation, any username/password combination is accepted
@@ -197,7 +199,7 @@ impl SshServer {
     async fn handle_upload_pack(&mut self, channel: ChannelId, data: &[u8], session: &mut Session) {
         let smart_protocol = self.smart_protocol.as_mut().unwrap();
-        let (send_pack_data, buf) = smart_protocol
+        let (mut send_pack_data, buf) = smart_protocol
             .git_upload_pack(&mut Bytes::copy_from_slice(data))
             .await
             .unwrap();
@@ -205,18 +207,20 @@ impl SshServer {
         tracing::info!("buf is {:?}", buf);
         session.data(channel, String::from_utf8(buf.to_vec()).unwrap().into());
 
-        let mut reader = send_pack_data.as_slice();
-        loop {
-            let mut temp = BytesMut::new();
-            temp.reserve(65500);
-            let length = reader.read_buf(&mut temp).await.unwrap();
-            if temp.is_empty() {
-                session.data(channel, smart::PKT_LINE_END_MARKER.to_vec().into());
-                return;
+        while let Some(chunk) = send_pack_data.next().await {
+            let mut reader = chunk.as_slice();
+            loop {
+                let mut temp = BytesMut::new();
+                temp.reserve(65500);
+                let length = reader.read_buf(&mut temp).await.unwrap();
+                if length == 0 {
+                    break;
+                }
+                let bytes_out = smart_protocol.build_side_band_format(temp, length);
+                session.data(channel, bytes_out.to_vec().into());
             }
-            let bytes_out = smart_protocol.build_side_band_format(temp, length);
-            session.data(channel, bytes_out.to_vec().into());
         }
+        session.data(channel, smart::PKT_LINE_END_MARKER.to_vec().into());
     }
 
     async fn handle_receive_pack(&mut self, channel: ChannelId, session: &mut Session) {
diff --git a/libra/src/command/push.rs b/libra/src/command/push.rs
index 37041f08..84748b4d 100644
--- a/libra/src/command/push.rs
+++ b/libra/src/command/push.rs
@@ -1,8 +1,8 @@
 use std::collections::{HashSet, VecDeque};
 use std::str::FromStr;
-use std::sync::mpsc;
 use bytes::BytesMut;
 use clap::Parser;
+use tokio::sync::mpsc;
 use url::Url;
 use ceres::protocol::ServiceType::ReceivePack;
 use ceres::protocol::smart::{add_pkt_line_string, read_pkt_line};
@@ -116,16 +116,24 @@ pub async fn execute(args: PushArgs) {
     );
     println!("Counting objects: {}", objs.len());
 
-    let mut encoder = PackEncoder::new(objs.len(), 5);
-    let (tx, rx) = mpsc::channel::<Entry>();
+    // let (tx, rx) = mpsc::channel::<Entry>();
+    let (entry_tx, entry_rx) = mpsc::channel(1_000_000);
+    let (stream_tx, mut stream_rx) = mpsc::channel(1_000_000);
+
+    let encoder = PackEncoder::new(objs.len(), 5, stream_tx);
+    encoder.encode_async(entry_rx).await.unwrap();
+
     for entry in objs {
         // TODO progress bar
-        tx.send(entry).unwrap();
+        entry_tx.send(entry).await.unwrap();
     }
-    drop(tx);
-    let pack_data = encoder.encode(rx).unwrap();
+    drop(entry_tx);
     println!("Delta compression done.");
 
+    let mut pack_data = Vec::new();
+    while let Some(chunk) = stream_rx.recv().await {
+        pack_data.extend(chunk);
+    }
     data.extend_from_slice(&pack_data);
 
     let res = client.send_pack(data.freeze(), auth).await.unwrap();
diff --git a/mega/config.toml b/mega/config.toml
index 52991683..47310ac1 100644
--- a/mega/config.toml
+++ b/mega/config.toml
@@ -52,10 +52,11 @@ obs_endpoint = "https://obs.cn-east-3.myhuaweicloud.com"
 
 [monorepo]
 ## Only the import directory supports multi-branch commits and tags; repos under regular directories support the main branch only
-## Mega treats files in that directory as import repo and other directories as monorepo
+## Mega treats files under this directory as import repo and other directories as monorepo
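+## e.g. with the default below, a repo pushed under /third-part is handled as an import repo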
 import_dir = "/third-part"
 
+[pack]
 # The maximum memory used by decode, unit: GB
 pack_decode_mem_size = 4
 
@@ -63,3 +64,6 @@ pack_decode_mem_size = 4
 pack_decode_cache_path = "/tmp/.mega/cache"
 
 clean_cache_after_decode = true
+
+# The maximum number of messages in the channel buffer while decoding
+channel_message_size = 1_000_000
\ No newline at end of file
diff --git a/mega/config.toml.example b/mega/config.toml.example
index f9627f8d..8406dc7b 100644
--- a/mega/config.toml.example
+++ b/mega/config.toml.example
@@ -52,10 +52,11 @@ obs_endpoint = "https://obs.cn-east-3.myhuaweicloud.com"
 
 [monorepo]
 ## Only the import directory supports multi-branch commits and tags; repos under regular directories support the main branch only
-## Mega treats files in that directory as import repo and other directories as monorepo
+## Mega treats files under this directory as import repo and other directories as monorepo
 import_dir = "/third-part"
 
+[pack]
 # The maximum memory used by decode, unit: GB
 pack_decode_mem_size = 4
 
@@ -63,3 +64,6 @@ pack_decode_mem_size = 4
 pack_decode_cache_path = "/tmp/.mega/cache"
 
 clean_cache_after_decode = true
+
+# The maximum number of messages in the channel buffer while decoding
+channel_message_size = 1_000_000
\ No newline at end of file
diff --git a/mercury/Cargo.toml b/mercury/Cargo.toml
index c3d2edc8..21da3115 100644
--- a/mercury/Cargo.toml
+++ b/mercury/Cargo.toml
@@ -9,7 +9,9 @@ edition = "2021"
 delta = { path = "delta" }
 common = { workspace = true }
 callisto = { workspace = true }
-flate2 = { workspace = true, features = ["zlib"] } # enable linking against the libz(C lib); better performance
+flate2 = { workspace = true, features = [
+    "zlib",
+] } # enable linking against the libz (C lib); better performance
 serde = { workspace = true, features = ["derive"] }
 bstr = { workspace = true }
 hex = { workspace = true }
@@ -19,7 +21,7 @@ sha1 = { workspace = true }
 colored = { workspace = true }
 chrono = { workspace = true }
 tracing-subscriber = { workspace = true }
-uuid = { version = "1.7.0", features = ["v4"]}
+uuid = { version = "1.7.0", features = ["v4"] }
 sha1_smol = "1.0.0"
 threadpool = "1.8.1"
 num_cpus.workspace = true
@@ -34,3 +36,4 @@ mimalloc = "0.1.39" # avoid sticking on dropping on Windows
 
 [dev-dependencies]
 tracing-test = "0.2.4"
+tokio = { workspace = true, features = ["full"] }
\ No newline at end of file
diff --git a/mercury/src/internal/object/commit.rs b/mercury/src/internal/object/commit.rs
index 10412568..1187643e 100644
--- a/mercury/src/internal/object/commit.rs
+++ b/mercury/src/internal/object/commit.rs
@@ -57,13 +57,18 @@ impl Display for Commit {
         }
         writeln!(f, "author {}", self.author)?;
         writeln!(f, "committer {}", self.committer)?;
-        writeln!(f, "\n{}", self.message)
+        writeln!(f, "{}", self.message)
     }
 }
 
 impl Commit {
-
-    pub fn new(author: Signature, committer: Signature, tree_id: SHA1, parent_commit_ids: Vec<SHA1>, message: &str) -> Commit {
+    pub fn new(
+        author: Signature,
+        committer: Signature,
+        tree_id: SHA1,
+        parent_commit_ids: Vec<SHA1>,
+        message: &str,
+    ) -> Commit {
         let mut commit = Commit {
             id: SHA1::default(),
             tree_id,
@@ -142,7 +147,7 @@ impl ObjectTrait for Commit {
         // The rest is the message
         let message = unsafe {
-            // + 2: skip the blank line between committer and message
-            String::from_utf8_unchecked(commit[commit.find_byte(0x0a).unwrap() + 2..].to_vec())
+            // + 1: the message now keeps its leading blank line
+            String::from_utf8_unchecked(commit[commit.find_byte(0x0a).unwrap() + 1..].to_vec())
         };
 
         Ok(Commit {
@@ -181,7 +186,9 @@ impl ObjectTrait for Commit {
         data.extend(&[0x0a]);
         data.extend(self.committer.to_data()?);
         data.extend(&[0x0a]);
-        data.extend(&[0x0a]); // Important! or Git Server can't parse & reply: unpack-objects abnormal exit
+        // Important! Without this blank line the Git server can't parse the commit and replies: unpack-objects abnormal exit.
+        // The required 0x0a now lives at the start of `message` instead of being appended here.
+        // data.extend(&[0x0a]);
         data.extend(self.message.as_bytes());
 
         Ok(data)
diff --git a/mercury/src/internal/pack/encode.rs b/mercury/src/internal/pack/encode.rs
index 8e9a4dc0..2b1185d9 100644
--- a/mercury/src/internal/pack/encode.rs
+++ b/mercury/src/internal/pack/encode.rs
@@ -1,9 +1,11 @@
+use std::collections::VecDeque;
+use std::io::Write;
+
 use flate2::write::ZlibEncoder;
 use sha1::{Digest, Sha1};
-use std::collections::VecDeque;
-use std::{mem, thread};
-use std::{io::Write, sync::mpsc};
+use tokio::sync::mpsc;
+use tokio::task::JoinHandle;
 
 use crate::internal::object::types::ObjectType;
 use crate::{errors::GitError, hash::SHA1, internal::pack::entry::Entry};
@@ -15,7 +17,7 @@ pub struct PackEncoder {
     process_index: usize,
     window_size: usize,
     window: VecDeque<(Entry, usize)>, // entry and offset
-    writer: Vec<u8>,
+    sender: Option<mpsc::Sender<Vec<u8>>>,
     inner_offset: usize, // offset of current entry
     inner_hash: Sha1,    // sha1::Sha1 rather than our SHA1, because the incremental `update` API is needed
     final_hash: Option<SHA1>,
@@ -57,25 +59,30 @@ fn encode_offset(mut value: usize) -> Vec<u8> {
 }
 
 impl PackEncoder {
-    pub fn new(object_number: usize, window_size: usize) -> Self {
-        let head = encode_header(object_number);
-        let mut writer = Vec::new();
-        writer.write_all(&head).unwrap();
-        let mut hash = Sha1::new();
-        hash.update(&head);
+    pub fn new(object_number: usize, window_size: usize, sender: mpsc::Sender<Vec<u8>>) -> Self {
         PackEncoder {
             object_number,
             window_size,
             process_index: 0,
             window: VecDeque::with_capacity(window_size),
-            writer,
+            sender: Some(sender),
             inner_offset: 12, // 12 bytes header
-            inner_hash: hash,
+            inner_hash: Sha1::new(),
             final_hash: None,
             start_encoding: false,
         }
     }
 
+    pub fn drop_sender(&mut self) {
+        self.sender.take(); // Take the sender out, dropping it
+    }
+
+    pub async fn send_data(&mut self, data: Vec<u8>) {
+        if let Some(sender) = &self.sender {
+            sender.send(data).await.unwrap();
+        }
+    }
+
     /// Get the hash of the pack file. If the pack file is not finished, returns `None`.
     pub fn get_hash(&self) -> Option<SHA1> {
         self.final_hash
@@ -88,7 +95,11 @@ impl PackEncoder {
     /// Returns `Ok(())` if encoding is successful, or a `GitError` in case of failure.
     /// - Returns a `GitError` if there is a failure during the encoding process.
     /// - Returns `PackEncodeError` if an encoding operation is already in progress.
-    pub fn encode(&mut self, rx: mpsc::Receiver<Entry>) -> Result<Vec<u8>, GitError> {
+    pub async fn encode(&mut self, mut entry_rx: mpsc::Receiver<Entry>) -> Result<(), GitError> {
+        let head = encode_header(self.object_number);
+        self.send_data(head.clone()).await;
+        self.inner_hash.update(&head);
+
         // ensure encode can only be invoked once
         if self.start_encoding {
             return Err(GitError::PackEncodeError(
                 "encoding operation is already in progress".to_string(),
             ));
         }
         loop {
-            match rx.recv() {
-                Ok(entry) => {
+            match entry_rx.recv().await {
+                Some(entry) => {
                     self.process_index += 1;
                     // push to window after encoding, to avoid computing a delta against itself
                     let offset = self.inner_offset;
-                    self.encode_one_object(&entry)?;
+                    self.encode_one_object(&entry).await?;
                     self.window.push_back((entry, offset));
                     if self.window.len() > self.window_size {
                         self.window.pop_front();
                     }
                 }
-                Err(_) => {
+                None => {
                     if self.process_index != self.object_number {
                         panic!("not all objects are encoded");
                     }
@@ -119,8 +130,9 @@ impl PackEncoder {
         // hash signature
         let hash_result = self.inner_hash.clone().finalize();
         self.final_hash = Some(SHA1::from_bytes(&hash_result));
-        self.writer.write_all(&hash_result).unwrap();
-        Ok(mem::take(&mut self.writer))
+        self.send_data(hash_result.to_vec()).await;
+        self.drop_sender();
+        Ok(())
     }
 
     /// Try to encode as delta using objects in window
@@ -158,14 +170,14 @@ impl PackEncoder {
     }
 
-    /// Write data to writer and update hash & offset
-    fn write_all_and_update(&mut self, data: &[u8]) {
+    /// Send data to the channel and update hash & offset
+    async fn write_all_and_update(&mut self, data: &[u8]) {
         self.inner_hash.update(data);
         self.inner_offset += data.len();
-        self.writer.write_all(data).unwrap();
+        self.send_data(data.to_vec()).await;
     }
 
     /// Encode one object, and update the hash
-    fn encode_one_object(&mut self, entry: &Entry) -> Result<(), GitError> {
+    async fn encode_one_object(&mut self, entry: &Entry) -> Result<(), GitError> {
         // try encode as delta
         let (entry, offset) = self.try_as_offset_delta(entry);
         let obj_data = entry.data;
@@ -188,12 +200,12 @@ impl PackEncoder {
         } else {
             header_data.push(0);
         }
-        self.write_all_and_update(&header_data);
+        self.write_all_and_update(&header_data).await;
 
         // **offset** encoding
         if entry.obj_type == ObjectType::OffsetDelta {
             let offset_data = encode_offset(offset.unwrap());
-            self.write_all_and_update(&offset_data);
+            self.write_all_and_update(&offset_data).await;
         } else if entry.obj_type == ObjectType::HashDelta {
             unreachable!("unsupported type")
         }
@@ -205,62 +217,72 @@ impl PackEncoder {
             .expect("zlib compress should never failed");
         inflate.flush().expect("zlib flush should never failed");
         let compressed_data = inflate.finish().expect("zlib compress should never failed");
-        self.write_all_and_update(&compressed_data);
+        self.write_all_and_update(&compressed_data).await;
 
         Ok(())
     }
 
-    /// async version of encode, result data will be returned by JoinHandle.
+    /// Async version of `encode`; the encoded bytes are delivered through the `sender` channel.
     /// It will consume PackEncoder, so you can't use it after calling this function.
-    pub fn encode_async(mut self, rx: mpsc::Receiver<Entry>) -> Result<thread::JoinHandle<Vec<u8>>, GitError> {
-        Ok(thread::spawn(move || {
-            self.encode(rx).unwrap()
-        }))
+    pub async fn encode_async(
+        mut self,
+        rx: mpsc::Receiver<Entry>,
+    ) -> Result<JoinHandle<()>, GitError> {
+        Ok(tokio::spawn(async move { self.encode(rx).await.unwrap() }))
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use std::{io::Cursor, path::PathBuf, usize};
+    use std::{io::Cursor, path::PathBuf, usize};
+
     use crate::internal::object::blob::Blob;
     use crate::internal::pack::Pack;
 
     use super::*;
 
-    #[test]
-    fn test_pack_encoder() {
-        fn encode_once(window_size: usize) -> Vec<u8> {
+    #[tokio::test]
+    async fn test_pack_encoder() {
+        async fn encode_once(window_size: usize) -> Vec<u8> {
+            let (tx, mut rx) = mpsc::channel(100);
+            let (entry_tx, entry_rx) = mpsc::channel::<Entry>(1);
+
             // make some different objects, or decode will fail
             let str_vec = vec!["hello, code,", "hello, world.", "!", "123141251251"];
-            let mut encoder = PackEncoder::new(str_vec.len(), window_size);
-            let (tx, rx) = mpsc::channel::<Entry>();
+            let encoder = PackEncoder::new(str_vec.len(), window_size, tx);
+            encoder.encode_async(entry_rx).await.unwrap();
+
             for str in str_vec {
                 let blob = Blob::from_content(str);
                 let entry: Entry = blob.into();
-                tx.send(entry).unwrap();
+                entry_tx.send(entry).await.unwrap();
             }
-            drop(tx);
-            let res = encoder.encode(rx).unwrap();
-            assert!(encoder.get_hash().is_some());
-            res
+            drop(entry_tx);
+            // assert!(encoder.get_hash().is_some());
+            let mut result = Vec::new();
+            while let Some(chunk) = rx.recv().await {
+                result.extend(chunk);
+            }
+            result
         }
+
         fn check_format(data: Vec<u8>) {
             let mut p = Pack::new(
                 None,
                 Some(1024 * 20),
                 Some(PathBuf::from("/tmp/.cache_temp")),
-                true
+                true,
             );
             let mut reader = Cursor::new(data);
-            p.decode(&mut reader, |_,_| {})
+            p.decode(&mut reader, |_, _| {})
                 .expect("pack file format error");
         }
         // without delta
-        let pack_without_delta = encode_once(0);
+        let pack_without_delta = encode_once(0).await;
         let pack_without_delta_size = pack_without_delta.len();
         check_format(pack_without_delta);
         // with delta
-        let pack_with_delta = encode_once(3);
+        let pack_with_delta = encode_once(3).await;
         assert_ne!(pack_with_delta.len(), pack_without_delta_size);
         check_format(pack_with_delta);
     }
@@ -275,37 +297,4 @@ mod tests {
         assert_eq!(data[1], 0b_0000_0101);
     }
 
-    #[test]
-    fn test_async_pack_encoder() {
-        let str_vec = vec!["hello, code,", "hello, world.", "!", "123141251251"];
-        let encoder = PackEncoder::new(str_vec.len(), 3);
-        let (tx, rx) = mpsc::channel::<Entry>();
-        let handle = encoder.encode_async(rx).unwrap();
-
-        for str in str_vec {
-            let blob = Blob::from_content(str);
-            let entry: Entry = blob.into();
-            tx.send(entry).unwrap();
-        }
-        drop(tx);
-
-        let data = handle.join().unwrap();
-
-        let correct_hash = {
-            let str_vec = vec!["hello, code,", "hello, world.", "!", "123141251251"];
-            let mut encoder = PackEncoder::new(str_vec.len(), 3);
-            let (tx, rx) = mpsc::channel::<Entry>();
-            for str in str_vec {
-                let blob = Blob::from_content(str);
-                let entry: Entry = blob.into();
-                tx.send(entry).unwrap();
-            }
-            drop(tx);
-            let _data = encoder.encode(rx).unwrap();
-            assert!(encoder.get_hash().is_some());
-            encoder.get_hash().unwrap()
-        };
-        let hash = SHA1::from_bytes(&data[data.len().saturating_sub(20)..]);
-        assert_eq!(hash, correct_hash);
-    }
 }
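
Usage note (editorial addition, not part of the patch): the sketch below shows how the reworked streaming `PackEncoder` is driven end to end, mirroring the call sites in `monorepo.rs` and `push.rs`. It assumes `PackEncoder` and `Entry` are publicly reachable at the module paths used inside the crate; the channel capacities and the `stream_pack` helper are illustrative only.

```rust
use mercury::internal::pack::encode::PackEncoder;
use mercury::internal::pack::entry::Entry;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

// Entries flow in through `entry_tx`; encoded pack bytes flow out of
// `stream_rx` while the encoder task is still running, so the full pack
// is never buffered in memory at once.
async fn stream_pack(objs: Vec<Entry>) -> ReceiverStream<Vec<u8>> {
    let (entry_tx, entry_rx) = mpsc::channel::<Entry>(1_000);
    let (stream_tx, stream_rx) = mpsc::channel::<Vec<u8>>(1_000);

    // `encode_async` consumes the encoder and spawns a tokio task that
    // reads entries from `entry_rx` and writes chunks into `stream_tx`.
    let encoder = PackEncoder::new(objs.len(), 0, stream_tx);
    encoder.encode_async(entry_rx).await.unwrap();

    tokio::spawn(async move {
        for entry in objs {
            entry_tx.send(entry).await.unwrap();
        }
        // `entry_tx` drops here, which lets the encoder finish and emit
        // the trailing SHA-1 of the pack.
    });

    ReceiverStream::new(stream_rx)
}
```

Consumers then drain the stream exactly as the patched call sites do: `handler.rs` hands it to axum via `Body::from_stream`, and `ssh.rs` iterates it with `StreamExt::next`.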