Collect telemetry for upload path #34

Open · wants to merge 1 commit into main
1 change: 1 addition & 0 deletions .gitignore
@@ -17,3 +17,4 @@ debug/
.vscode
venv
**/*.env
+**/uv.lock
4 changes: 2 additions & 2 deletions cas_client/src/caching_client.rs
@@ -36,7 +36,7 @@ impl<T: Client + Reconstructable + Send + Sync> UploadClient for CachingClient<T

#[async_trait]
impl<T: Client + Reconstructable + Send + Sync> ReconstructionClient for CachingClient<T> {
-    async fn get_file(&self, hash: &MerkleHash, writer: &mut Box<dyn Write + Send>) -> Result<()> {
+    async fn get_file(&self, hash: &MerkleHash, writer: &mut Box<dyn Write + Send>) -> Result<u64> {
/*
let file_info = self.reconstruct(hash, None).await?;

@@ -60,7 +60,7 @@ impl<T: Client + Reconstructable + Send + Sync> ReconstructionClient for Caching
offset: u64,
length: u64,
writer: &mut Box<dyn Write + Send>,
-    ) -> Result<()> {
+    ) -> Result<u64> {
todo!()
}
}
4 changes: 2 additions & 2 deletions cas_client/src/interface.rs
@@ -42,7 +42,7 @@ pub trait UploadClient {
#[async_trait]
pub trait ReconstructionClient {
/// Get an entire file by file hash.
-    async fn get_file(&self, hash: &MerkleHash, writer: &mut Box<dyn Write + Send>) -> Result<()>;
+    async fn get_file(&self, hash: &MerkleHash, writer: &mut Box<dyn Write + Send>) -> Result<u64>;

Contributor: add a comment for what this return value is; presumably the number of bytes written to writer.
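A minimal sketch of the requested doc comment (wording is a suggestion, not the author's):

```rust
/// Get an entire file by file hash, writing its contents to `writer`.
///
/// Returns the number of bytes written to `writer`.
async fn get_file(&self, hash: &MerkleHash, writer: &mut Box<dyn Write + Send>) -> Result<u64>;
```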
/// Get an entire file by file hash at a specific byte range.
async fn get_file_byte_range(
@@ -51,7 +51,7 @@ pub trait ReconstructionClient {
offset: u64,
length: u64,
writer: &mut Box<dyn Write + Send>,
-    ) -> Result<()>;
+    ) -> Result<u64>;
}

pub trait Client: UploadClient + ReconstructionClient {}
13 changes: 7 additions & 6 deletions cas_client/src/remote_client.rs
@@ -102,13 +102,13 @@ impl UploadClient for RemoteClient

#[async_trait]
impl ReconstructionClient for RemoteClient {
-    async fn get_file(&self, hash: &MerkleHash, writer: &mut Box<dyn Write + Send>) -> Result<()> {
+    async fn get_file(&self, hash: &MerkleHash, writer: &mut Box<dyn Write + Send>) -> Result<u64> {
// get manifest of xorbs to download
let manifest = self.reconstruct(hash, None).await?;

-        self.get_ranges(manifest, None, writer).await?;
+        let bytes_downloaded = self.get_ranges(manifest, None, writer).await?;

-        Ok(())
+        Ok(bytes_downloaded)
}

#[allow(unused_variables)]
@@ -118,7 +118,7 @@ impl ReconstructionClient for RemoteClient {
offset: u64,
length: u64,
writer: &mut Box<dyn Write + Send>,
-    ) -> Result<()> {
+    ) -> Result<u64> {
todo!()
}
}
@@ -203,7 +203,7 @@ impl RemoteClient {
reconstruction_response: QueryReconstructionResponse,
_byte_range: Option<(u64, u64)>,
writer: &mut Box<dyn Write + Send>,
-    ) -> Result<usize> {
+    ) -> Result<u64> {
let info = reconstruction_response.reconstruction;
let total_len = info.iter().fold(0, |acc, x| acc + x.unpacked_length);
let futs = info
@@ -215,7 +215,8 @@ impl RemoteClient {
.map_err(|e| CasClientError::InternalError(anyhow!("join error {e}")))??;
writer.write_all(&piece)?;
}
-        Ok(total_len as usize)
+        // Todo: return the bytes which were read from the cache for telemetry
+        Ok(total_len as u64)

Contributor: this total_len is the uncompressed length of the section, not the number of bytes sent over the network, which is what I think you're attempting to track. That number should be the difference between url_range end and start for each piece.

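A minimal sketch of the reviewer's suggestion, assuming each reconstruction term in `info` exposes a `url_range` with `start`/`end` offsets (those field names are an assumption, not confirmed by this diff):

```rust
// Hypothetical: sum the byte spans actually fetched over the network
// instead of the unpacked (uncompressed) length of the reconstruction.
// Assumes each term carries a `url_range` with `start`/`end` offsets,
// as the reviewer describes.
let network_bytes: u64 = info
    .iter()
    .map(|term| term.url_range.end - term.url_range.start)
    .sum();
Ok(network_bytes)
```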
}
}

4 changes: 2 additions & 2 deletions data/src/bin/example.rs
@@ -213,7 +213,7 @@ async fn smudge(mut reader: impl Read, writer: &mut Box<dyn Write + Send>) -> Re
let mut input = String::new();
reader.read_to_string(&mut input)?;

-    let pointer_file = PointerFile::init_from_string(&input, "");
+    let mut pointer_file = PointerFile::init_from_string(&input, "");

// not a pointer file, leave it as it is.
if !pointer_file.is_valid() {
@@ -223,7 +223,7 @@ async fn smudge(mut reader: impl Read, writer: &mut Box<dyn Write + Send>) -> Re
let translator = PointerFileTranslator::new(default_smudge_config()?).await?;

translator
-        .smudge_file_from_pointer(&pointer_file, writer, None)
+        .smudge_file_from_pointer(&mut pointer_file, writer, None)
.await?;

Ok(())
25 changes: 18 additions & 7 deletions data/src/clean.rs
@@ -21,6 +21,7 @@ use std::mem::take;
use std::ops::DerefMut;
use std::path::{Path, PathBuf};
use std::sync::Arc;
+use std::time::Instant;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::Mutex;
@@ -76,6 +77,9 @@ pub struct Cleaner {

// Auxiliary info
file_name: Option<PathBuf>,

+    // Telemetry
+    start: Instant,
}

impl Cleaner {
@@ -113,6 +117,7 @@ impl Cleaner {
tracking_info: Mutex::new(Default::default()),
small_file_buffer: Mutex::new(Some(Vec::with_capacity(small_file_threshold))),
file_name: file_name.map(|f| f.to_owned()),
+            start: Instant::now(),
});

Self::run(cleaner.clone(), chunk_c).await;
@@ -239,8 +244,9 @@ impl Cleaner {
Ok(false)
}

-    async fn dedup(&self, chunks: &[ChunkYieldType]) -> Result<()> {
+    async fn dedup(&self, chunks: &[ChunkYieldType]) -> Result<u64> {
info!("Dedup {} chunks", chunks.len());
+        let mut total_compressed_bytes = 0;
let mut tracking_info = self.tracking_info.lock().await;

let enable_global_dedup = self.enable_global_dedup_queries;
@@ -463,13 +469,14 @@ impl Cleaner {
tracking_info.cas_data.data.extend(bytes);

if tracking_info.cas_data.data.len() > TARGET_CAS_BLOCK_SIZE {
-            let cas_hash = register_new_cas_block(
+            let (cas_hash, compressed_bytes) = register_new_cas_block(
&mut tracking_info.cas_data,
&self.shard_manager,
&self.cas,
&self.cas_prefix,
)
.await?;
+            total_compressed_bytes += compressed_bytes;

for i in take(&mut tracking_info.current_cas_file_info_indices) {
tracking_info.file_info[i].cas_hash = cas_hash;
@@ -483,7 +490,7 @@ impl Cleaner {
}
}

-        Ok(())
+        Ok(total_compressed_bytes)
}

async fn finish(&self) -> Result<()> {
@@ -516,7 +523,8 @@ impl Cleaner {
Ok(())
}

-    async fn summarize_dedup_info(&self) -> Result<(MerkleHash, u64)> {
+    async fn summarize_dedup_info(&self) -> Result<(MerkleHash, u64, u64)> {
+        let mut total_compressed_bytes = 0;
let mut tracking_info = self.tracking_info.lock().await;

let file_hash = file_node_hash(
@@ -577,13 +585,14 @@
if cas_data_accumulator.data.len() >= TARGET_CAS_BLOCK_SIZE {
let mut new_cas_data = take(cas_data_accumulator.deref_mut());
drop(cas_data_accumulator); // Release the lock.
-            register_new_cas_block(
+            let (_cas_hash, compressed_bytes) = register_new_cas_block(

Collaborator: This compressed_bytes

  • doesn't include all bytes produced by this file after dedup: bytes are also produced in fn dedup(...). I noticed you modified that function, but the return value is not used?
  • data in this cas_data_accumulator may also include bytes produced by other files cleaned before this file using the same PointerFileTranslator.
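A sketch of one way to address the first bullet; the accumulator field and its wiring are hypothetical, not part of this diff:

```rust
// Hypothetical: accumulate the value dedup() now returns so those
// compressed bytes also reach the per-file total. Assumes a new
// `dedup_compressed_bytes: AtomicU64` field on Cleaner.
use std::sync::atomic::Ordering;

let compressed = self.dedup(&chunks).await?;
self.dedup_compressed_bytes
    .fetch_add(compressed, Ordering::Relaxed);
```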
&mut new_cas_data,
&self.shard_manager,
&self.cas,
&self.cas_prefix,
)
.await?;
+            total_compressed_bytes += compressed_bytes;
} else {
drop(cas_data_accumulator);
}
@@ -593,11 +602,11 @@

*tracking_info = Default::default();

-        Ok((file_hash, file_size))
+        Ok((file_hash, file_size, total_compressed_bytes))
}

async fn to_pointer_file(&self) -> Result<String> {
-        let (hash, filesize) = self.summarize_dedup_info().await?;
+        let (hash, filesize, compressed_size) = self.summarize_dedup_info().await?;
let pointer_file = PointerFile::init_from_info(
&self
.file_name
@@ -606,6 +615,8 @@
.unwrap_or_default(),
&hash.hex(),
filesize,
+            compressed_size,
+            self.start.elapsed(),
);
Ok(pointer_file.to_string())
}
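
The call above implies an extended PointerFile::init_from_info. A hypothetical signature inferred from the argument list (pointer_file.rs is not part of this diff, so parameter names and types are assumptions):

```rust
use std::time::Duration;

impl PointerFile {
    /// Signature inferred from the call site in `to_pointer_file()`;
    /// the real definition in pointer_file.rs may differ.
    pub fn init_from_info(
        path: &str,
        hash: &str,
        filesize: u64,
        compressed_size: u64,
        latency: Duration,
    ) -> PointerFile {
        unimplemented!()
    }
}
```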
33 changes: 23 additions & 10 deletions data/src/data_processing.rs
@@ -5,7 +5,7 @@ use crate::errors::*;
use crate::metrics::FILTER_CAS_BYTES_PRODUCED;
use crate::remote_shard_interface::RemoteShardInterface;
use crate::shard_interface::create_shard_manager;
-use crate::PointerFile;
+use crate::{PointerFile, PointerFileTelemetry};
use cas_client::Client;
use mdb_shard::cas_structs::{CASChunkSequenceEntry, CASChunkSequenceHeader, MDBCASInfo};
use mdb_shard::file_structs::MDBFileInfo;
@@ -17,6 +17,7 @@ use std::mem::take;
use std::ops::DerefMut;
use std::path::Path;
use std::sync::Arc;
+use std::time::Instant;
use tokio::sync::Mutex;

#[derive(Default, Debug)]
@@ -59,6 +60,9 @@ pub struct PointerFileTranslator {

/* ----- Deduped data shared across files ----- */
global_cas_data: Arc<Mutex<CASDataAggregator>>,
+    // Telemetry
+    /* ----- Telemetry ----- */
+    pub start: Instant,
}

// Constructors
@@ -97,6 +101,7 @@ impl PointerFileTranslator {
remote_shards,
cas: cas_client,
global_cas_data: Default::default(),
+            start: Instant::now(),
})
}
}
@@ -208,7 +213,7 @@ pub(crate) async fn register_new_cas_block(
shard_manager: &Arc<ShardFileManager>,
cas: &Arc<dyn Client + Send + Sync>,
cas_prefix: &str,
-) -> Result<MerkleHash> {
+) -> Result<(MerkleHash, u64)> {
let cas_hash = cas_node_hash(&cas_data.chunks[..]);

let raw_bytes_len = cas_data.data.len();
@@ -283,29 +288,37 @@
cas_data.chunks.clear();
cas_data.pending_file_info.clear();

-    Ok(cas_hash)
+    Ok((cas_hash, compressed_bytes_len as u64))

Collaborator: nit: using compressed_bytes_len from the client side is unreliable; people can inject any number here, and that does not impact clean-process correctness.
}

/// Smudge operations
impl PointerFileTranslator {
pub async fn smudge_file_from_pointer(
&self,
-        pointer: &PointerFile,
+        pointer_file: &mut PointerFile,
writer: &mut Box<dyn Write + Send>,
range: Option<(usize, usize)>,
) -> Result<()> {
-        self.smudge_file_from_hash(&pointer.hash()?, writer, range)
-            .await
+        let start = Instant::now();
+        let bytes_downloaded = self
+            .smudge_file_from_hash(&pointer_file.hash()?, writer, range)
+            .await?;
+
+        pointer_file.telemetry = Some(PointerFileTelemetry {
+            latency: Some(start.elapsed()),
+            network_bytes: Some(bytes_downloaded),
+        });
+
+        Ok(())
}

pub async fn smudge_file_from_hash(
&self,
file_id: &MerkleHash,
writer: &mut Box<dyn Write + Send>,
_range: Option<(usize, usize)>,
-    ) -> Result<()> {
-        self.cas.get_file(file_id, writer).await?;
-
-        Ok(())
+    ) -> Result<u64> {
+        let bytes_downloaded = self.cas.get_file(file_id, writer).await?;
+        Ok(bytes_downloaded)
}
}
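
From its construction in smudge_file_from_pointer above, the newly exported PointerFileTelemetry plausibly has a shape like the following; the actual definition lives in pointer_file.rs, which this diff does not show:

```rust
use std::time::Duration;

/// Sketch inferred from the fields set above (`latency`, `network_bytes`);
/// field types are assumptions.
#[derive(Debug, Default, Clone)]
pub struct PointerFileTelemetry {
    pub latency: Option<Duration>,
    pub network_bytes: Option<u64>,
}
```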
2 changes: 1 addition & 1 deletion data/src/lib.rs
@@ -17,4 +17,4 @@ mod test_utils;

pub use constants::SMALL_FILE_THRESHOLD;
pub use data_processing::PointerFileTranslator;
-pub use pointer_file::PointerFile;
+pub use pointer_file::{PointerFile, PointerFileTelemetry};