Skip to content

Commit

Permalink
Merge pull request #398 from benjamin-747/main
Browse files Browse the repository at this point in the history
Refactor: Using ReceiverStream in pack encode
  • Loading branch information
genedna authored May 22, 2024
2 parents 1ff6e00 + 794f494 commit d4d3d96
Show file tree
Hide file tree
Showing 15 changed files with 257 additions and 213 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ thiserror = "1.0.60"
rand = "0.8.5"
smallvec = "1.13.2"
tokio = "1.37.0"
async-stream = "0.3.5"
tokio-stream = "0.1.15"
tokio-test = "0.4.4"
clap = "4.5.4"
async-trait = "0.1.80"
Expand Down
2 changes: 2 additions & 0 deletions ceres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ mercury = { workspace = true }
venus = { workspace = true }
anyhow = { workspace = true }
tokio = { workspace = true, features = ["net"] }
tokio-stream = { workspace = true }
axum = { workspace = true }
async-stream = { workspace = true }
tracing = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
46 changes: 26 additions & 20 deletions ceres/src/http/handler.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::convert::Infallible;

use anyhow::Result;
use axum::body::Body;
Expand All @@ -7,10 +8,11 @@ use axum::http::{Request, Response, StatusCode};
use bytes::{Bytes, BytesMut};
use futures::TryStreamExt;
use tokio::io::AsyncReadExt;
use tokio_stream::StreamExt;

use common::model::GetParams;

use crate::protocol::{smart, SmartProtocol, ServiceType};
use crate::protocol::{smart, ServiceType, SmartProtocol};

// # Discovering Reference
// HTTP clients that support the "smart" protocol (or both the "smart" and "dumb" protocols) MUST
Expand Down Expand Up @@ -64,35 +66,39 @@ pub async fn git_upload_pack(
.await
.unwrap();
tracing::debug!("bytes from client: {:?}", upload_request);
let (send_pack_data, buf) = pack_protocol
let (mut send_pack_data, protocol_buf) = pack_protocol
.git_upload_pack(&mut upload_request.freeze())
.await
.unwrap();
tracing::info!("send ack/nak message buf: {:?}", buf);
tracing::info!("send ack/nak message buf: {:?}", protocol_buf);
let mut res_bytes = BytesMut::new();
res_bytes.extend(buf);
res_bytes.extend(protocol_buf);

let resp = build_res_header("application/x-git-upload-pack-result".to_owned());

tracing::info!("send response");

let mut reader = send_pack_data.as_slice();
loop {
let mut temp = BytesMut::new();
temp.reserve(65500);
let length = reader.read_buf(&mut temp).await.unwrap();
if length == 0 {
let bytes_out = Bytes::from_static(smart::PKT_LINE_END_MARKER);
tracing::info!("send back pkt-flush line '0000', actually: {:?}", bytes_out);
res_bytes.extend(bytes_out);
break;
let body_stream = async_stream::stream! {
yield Ok::<_, Infallible>(Bytes::copy_from_slice(&res_bytes));
while let Some(chunk) = send_pack_data.next().await {
let mut reader = chunk.as_slice();
loop {
let mut temp = BytesMut::new();
temp.reserve(65500);
let length = reader.read_buf(&mut temp).await.unwrap();
if length == 0 {
break;
}
let bytes_out = pack_protocol.build_side_band_format(temp, length);
// tracing::info!("send pack file: length: {:?}", bytes_out.len());
yield Ok::<_, Infallible>(bytes_out.freeze());
}
}
let bytes_out = pack_protocol.build_side_band_format(temp, length);
tracing::info!("send pack file: length: {:?}", bytes_out.len());
res_bytes.extend(bytes_out);
}
let body = Body::from(res_bytes.freeze());
let resp = resp.body(body).unwrap();
let bytes_out = Bytes::from_static(smart::PKT_LINE_END_MARKER);
tracing::info!("send back pkt-flush line '0000', actually: {:?}", bytes_out);
yield Ok::<_, Infallible>(bytes_out);
};
let resp = resp.body(Body::from_stream(body_stream)).unwrap();
Ok(resp)
}

Expand Down
27 changes: 16 additions & 11 deletions ceres/src/pack/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ use std::{
io::Cursor,
sync::{
atomic::{AtomicUsize, Ordering},
mpsc::{self, Receiver, Sender},
mpsc::{self, Receiver},
},
};

use async_trait::async_trait;
use bytes::Bytes;
use tokio_stream::wrappers::ReceiverStream;

use callisto::raw_blob;
use common::{config::MonoConfig, errors::MegaError, utils::ZERO_ID};
use common::{config::PackConfig, errors::MegaError, utils::ZERO_ID};
use mercury::internal::pack::Pack;
use mercury::{
errors::GitError,
Expand Down Expand Up @@ -49,13 +50,13 @@ pub trait PackHandler: Send + Sync {
/// # Returns
/// * `Result<Vec<u8>, GitError>` - The packed binary data as a vector of bytes.
///
async fn full_pack(&self) -> Result<Vec<u8>, GitError>;
async fn full_pack(&self) -> Result<ReceiverStream<Vec<u8>>, GitError>;

async fn incremental_pack(
&self,
want: Vec<String>,
have: Vec<String>,
) -> Result<Vec<u8>, GitError>;
) -> Result<ReceiverStream<Vec<u8>>, GitError>;

async fn traverse_for_count(
&self,
Expand Down Expand Up @@ -87,7 +88,7 @@ pub trait PackHandler: Send + Sync {
&self,
tree: Tree,
exist_objs: &mut HashSet<String>,
sender: Option<&Sender<Entry>>,
sender: Option<&tokio::sync::mpsc::Sender<Entry>>,
) {
exist_objs.insert(tree.id.to_plain_str());
let mut search_tree_ids = vec![];
Expand All @@ -108,15 +109,15 @@ pub trait PackHandler: Send + Sync {
let blobs = self.get_blobs_by_hashes(search_blob_ids).await.unwrap();
for b in blobs {
let blob: Blob = b.into();
sender.send(blob.into()).unwrap();
sender.send(blob.into()).await.unwrap();
}
}
let trees = self.get_trees_by_hashes(search_tree_ids).await.unwrap();
for t in trees {
self.traverse(t, exist_objs, sender).await;
}
if let Some(sender) = sender {
sender.send(tree.into()).unwrap();
sender.send(tree.into()).await.unwrap();
}
}

Expand All @@ -133,7 +134,11 @@ pub trait PackHandler: Send + Sync {

async fn check_default_branch(&self) -> bool;

fn pack_decoder(&self, mono_config: &MonoConfig, pack_file: Bytes) -> Result<Receiver<Entry>, GitError> {
fn pack_decoder(
&self,
pack_config: &PackConfig,
pack_file: Bytes,
) -> Result<Receiver<Entry>, GitError> {
// #[cfg(debug_assertions)]
// {
// let datetime = chrono::Utc::now().naive_utc();
Expand All @@ -144,9 +149,9 @@ pub trait PackHandler: Send + Sync {
let (sender, receiver) = mpsc::channel();
let p = Pack::new(
None,
Some(1024 * 1024 * 1024 * mono_config.pack_decode_mem_size),
Some(mono_config.pack_decode_cache_path.clone()),
mono_config.clean_cache_after_decode,
Some(1024 * 1024 * 1024 * pack_config.pack_decode_mem_size),
Some(pack_config.pack_decode_cache_path.clone()),
pack_config.clean_cache_after_decode,
);
p.decode_async(Cursor::new(pack_file), sender); //Pack moved here
Ok(receiver)
Expand Down
74 changes: 31 additions & 43 deletions ceres/src/pack/import_repo.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::mpsc;

use async_trait::async_trait;
use bytes::Bytes;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

use callisto::raw_blob;
use common::errors::MegaError;
Expand All @@ -17,6 +17,7 @@ use mercury::{
pack::entry::Entry,
},
};

use venus::import_repo::{
import_refs::{CommandType, RefCommand, Refs},
repo::Repo,
Expand Down Expand Up @@ -44,7 +45,9 @@ impl PackHandler for ImportRepo {
}

async fn unpack(&self, pack_file: Bytes) -> Result<(), GitError> {
let receiver = self.pack_decoder(&self.context.config.monorepo, pack_file).unwrap();
let receiver = self
.pack_decoder(&self.context.config.pack, pack_file)
.unwrap();

let storage = self.context.services.git_db_storage.clone();
let mut entry_list = Vec::new();
Expand All @@ -59,43 +62,24 @@ impl PackHandler for ImportRepo {
Ok(())
}

async fn full_pack(&self) -> Result<Vec<u8>, GitError> {
let (sender, receiver) = mpsc::channel();
async fn full_pack(&self) -> Result<ReceiverStream<Vec<u8>>, GitError> {
let pack_config = &self.context.config.pack;
let (entry_tx, entry_rx) = mpsc::channel(pack_config.channel_message_size);
let (stream_tx, stream_rx) = mpsc::channel(pack_config.channel_message_size);

let storage = self.context.services.git_db_storage.clone();
let total = storage.get_obj_count_by_repo_id(&self.repo).await;
let mut encoder = PackEncoder::new(total, 0);

for m in storage
.get_commits_by_repo_id(&self.repo)
.await
.unwrap()
.into_iter()
{
let c: Commit = m.into();
let entry: Entry = c.into();
sender.send(entry).unwrap();
}

for m in storage
.get_trees_by_repo_id(&self.repo)
.await
.unwrap()
.into_iter()
{
let c: Tree = m.into();
let entry: Entry = c.into();
sender.send(entry).unwrap();
}
let encoder = PackEncoder::new(total, 0, stream_tx);

let commits = storage.get_commits_by_repo_id(&self.repo).await.unwrap();
let trees = storage.get_trees_by_repo_id(&self.repo).await.unwrap();
let bids: Vec<String> = storage
.get_blobs_by_repo_id(&self.repo)
.await
.unwrap()
.into_iter()
.map(|b| b.blob_id)
.collect();

let raw_blobs = batch_query_by_columns::<raw_blob::Entity, raw_blob::Column>(
storage.get_connection(),
raw_blob::Column::Sha1,
Expand All @@ -105,35 +89,39 @@ impl PackHandler for ImportRepo {
)
.await
.unwrap();
let tags = storage.get_tags_by_repo_id(&self.repo).await.unwrap();

encoder.encode_async(entry_rx).await.unwrap();
for m in commits.into_iter() {
let c: Commit = m.into();
let entry: Entry = c.into();
entry_tx.send(entry).await.unwrap();
}
for m in trees.into_iter() {
let c: Tree = m.into();
let entry: Entry = c.into();
entry_tx.send(entry).await.unwrap();
}
for m in raw_blobs {
// todo handle storage type
let c: Blob = m.into();
let entry: Entry = c.into();
sender.send(entry).unwrap();
entry_tx.send(entry).await.unwrap();
}

for m in storage
.get_tags_by_repo_id(&self.repo)
.await
.unwrap()
.into_iter()
{
for m in tags.into_iter() {
let c: Tag = m.into();
let entry: Entry = c.into();
sender.send(entry).unwrap();
entry_tx.send(entry).await.unwrap();
}
drop(sender);
let data = encoder.encode(receiver).unwrap();

Ok(data)
drop(entry_tx);
Ok(ReceiverStream::new(stream_rx))
}

async fn incremental_pack(
&self,
_want: Vec<String>,
_have: Vec<String>,
) -> Result<Vec<u8>, GitError> {
) -> Result<ReceiverStream<Vec<u8>>, GitError> {
unimplemented!()
}

Expand Down
Loading

1 comment on commit d4d3d96

@vercel
Copy link

@vercel vercel bot commented on d4d3d96 May 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

mega – ./

mega-gitmono.vercel.app
gitmega.dev
www.gitmega.dev
mega-git-main-gitmono.vercel.app

Please sign in to comment.