Skip to content

Commit bd328fd

Browse files
committed
Collect telemetry for upload and download path
Get the telemetry data to the py layer. This will be sent over the telemetry endpoint later. Related to STO-7
1 parent 92abd75 commit bd328fd

14 files changed

+160
-42
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,4 @@ debug/
1717
.vscode
1818
venv
1919
**/*.env
20+
**/uv.lock

cas_client/src/caching_client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ impl<T: Client + Reconstructable + Send + Sync> UploadClient for CachingClient<T
3636

3737
#[async_trait]
3838
impl<T: Client + Reconstructable + Send + Sync> ReconstructionClient for CachingClient<T> {
39-
async fn get_file(&self, hash: &MerkleHash, writer: &mut Box<dyn Write + Send>) -> Result<()> {
39+
async fn get_file(&self, hash: &MerkleHash, writer: &mut Box<dyn Write + Send>) -> Result<u64> {
4040
/*
4141
let file_info = self.reconstruct(hash, None).await?;
4242
@@ -60,7 +60,7 @@ impl<T: Client + Reconstructable + Send + Sync> ReconstructionClient for Caching
6060
offset: u64,
6161
length: u64,
6262
writer: &mut Box<dyn Write + Send>,
63-
) -> Result<()> {
63+
) -> Result<u64> {
6464
todo!()
6565
}
6666
}

cas_client/src/interface.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ pub trait UploadClient {
4242
#[async_trait]
4343
pub trait ReconstructionClient {
4444
/// Get a entire file by file hash.
45-
async fn get_file(&self, hash: &MerkleHash, writer: &mut Box<dyn Write + Send>) -> Result<()>;
45+
async fn get_file(&self, hash: &MerkleHash, writer: &mut Box<dyn Write + Send>) -> Result<u64>;
4646

4747
/// Get a entire file by file hash at a specific bytes range.
4848
async fn get_file_byte_range(
@@ -51,7 +51,7 @@ pub trait ReconstructionClient {
5151
offset: u64,
5252
length: u64,
5353
writer: &mut Box<dyn Write + Send>,
54-
) -> Result<()>;
54+
) -> Result<u64>;
5555
}
5656

5757
pub trait Client: UploadClient + ReconstructionClient {}

cas_client/src/remote_client.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,13 +102,13 @@ impl UploadClient for RemoteClient {
102102

103103
#[async_trait]
104104
impl ReconstructionClient for RemoteClient {
105-
async fn get_file(&self, hash: &MerkleHash, writer: &mut Box<dyn Write + Send>) -> Result<()> {
105+
async fn get_file(&self, hash: &MerkleHash, writer: &mut Box<dyn Write + Send>) -> Result<u64> {
106106
// get manifest of xorbs to download
107107
let manifest = self.reconstruct(hash, None).await?;
108108

109-
self.get_ranges(manifest, None, writer).await?;
109+
let bytes_downloaded = self.get_ranges(manifest, None, writer).await?;
110110

111-
Ok(())
111+
Ok(bytes_downloaded)
112112
}
113113

114114
#[allow(unused_variables)]
@@ -118,7 +118,7 @@ impl ReconstructionClient for RemoteClient {
118118
offset: u64,
119119
length: u64,
120120
writer: &mut Box<dyn Write + Send>,
121-
) -> Result<()> {
121+
) -> Result<u64> {
122122
todo!()
123123
}
124124
}
@@ -203,7 +203,7 @@ impl RemoteClient {
203203
reconstruction_response: QueryReconstructionResponse,
204204
_byte_range: Option<(u64, u64)>,
205205
writer: &mut Box<dyn Write + Send>,
206-
) -> Result<usize> {
206+
) -> Result<u64> {
207207
let info = reconstruction_response.reconstruction;
208208
let total_len = info.iter().fold(0, |acc, x| acc + x.unpacked_length);
209209
let futs = info
@@ -215,7 +215,8 @@ impl RemoteClient {
215215
.map_err(|e| CasClientError::InternalError(anyhow!("join error {e}")))??;
216216
writer.write_all(&piece)?;
217217
}
218-
Ok(total_len as usize)
218+
// Todo: return the bytes which were read from the cache for telemetry
219+
Ok(total_len as u64)
219220
}
220221
}
221222

data/src/bin/example.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ async fn smudge(mut reader: impl Read, writer: &mut Box<dyn Write + Send>) -> Re
213213
let mut input = String::new();
214214
reader.read_to_string(&mut input)?;
215215

216-
let pointer_file = PointerFile::init_from_string(&input, "");
216+
let mut pointer_file = PointerFile::init_from_string(&input, "");
217217

218218
// not a pointer file, leave it as it is.
219219
if !pointer_file.is_valid() {
@@ -223,7 +223,7 @@ async fn smudge(mut reader: impl Read, writer: &mut Box<dyn Write + Send>) -> Re
223223
let translator = PointerFileTranslator::new(default_smudge_config()?).await?;
224224

225225
translator
226-
.smudge_file_from_pointer(&pointer_file, writer, None)
226+
.smudge_file_from_pointer(&mut pointer_file, writer, None)
227227
.await?;
228228

229229
Ok(())

data/src/clean.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::mem::take;
2121
use std::ops::DerefMut;
2222
use std::path::{Path, PathBuf};
2323
use std::sync::Arc;
24+
use std::time::Instant;
2425
use tokio::sync::mpsc::error::TryRecvError;
2526
use tokio::sync::mpsc::{channel, Receiver, Sender};
2627
use tokio::sync::Mutex;
@@ -76,6 +77,9 @@ pub struct Cleaner {
7677

7778
// Auxiliary info
7879
file_name: Option<PathBuf>,
80+
81+
// Telemetry
82+
start: Instant,
7983
}
8084

8185
impl Cleaner {
@@ -113,6 +117,7 @@ impl Cleaner {
113117
tracking_info: Mutex::new(Default::default()),
114118
small_file_buffer: Mutex::new(Some(Vec::with_capacity(small_file_threshold))),
115119
file_name: file_name.map(|f| f.to_owned()),
120+
start: Instant::now(),
116121
});
117122

118123
Self::run(cleaner.clone(), chunk_c).await;
@@ -239,8 +244,9 @@ impl Cleaner {
239244
Ok(false)
240245
}
241246

242-
async fn dedup(&self, chunks: &[ChunkYieldType]) -> Result<()> {
247+
async fn dedup(&self, chunks: &[ChunkYieldType]) -> Result<u64> {
243248
info!("Dedup {} chunks", chunks.len());
249+
let mut total_compressed_bytes = 0;
244250
let mut tracking_info = self.tracking_info.lock().await;
245251

246252
let enable_global_dedup = self.enable_global_dedup_queries;
@@ -463,13 +469,14 @@ impl Cleaner {
463469
tracking_info.cas_data.data.extend(bytes);
464470

465471
if tracking_info.cas_data.data.len() > TARGET_CAS_BLOCK_SIZE {
466-
let cas_hash = register_new_cas_block(
472+
let (cas_hash, compressed_bytes) = register_new_cas_block(
467473
&mut tracking_info.cas_data,
468474
&self.shard_manager,
469475
&self.cas,
470476
&self.cas_prefix,
471477
)
472478
.await?;
479+
total_compressed_bytes += compressed_bytes;
473480

474481
for i in take(&mut tracking_info.current_cas_file_info_indices) {
475482
tracking_info.file_info[i].cas_hash = cas_hash;
@@ -483,7 +490,7 @@ impl Cleaner {
483490
}
484491
}
485492

486-
Ok(())
493+
Ok(total_compressed_bytes)
487494
}
488495

489496
async fn finish(&self) -> Result<()> {
@@ -516,7 +523,8 @@ impl Cleaner {
516523
Ok(())
517524
}
518525

519-
async fn summarize_dedup_info(&self) -> Result<(MerkleHash, u64)> {
526+
async fn summarize_dedup_info(&self) -> Result<(MerkleHash, u64, u64)> {
527+
let mut total_compressed_bytes = 0;
520528
let mut tracking_info = self.tracking_info.lock().await;
521529

522530
let file_hash = file_node_hash(
@@ -577,13 +585,14 @@ impl Cleaner {
577585
if cas_data_accumulator.data.len() >= TARGET_CAS_BLOCK_SIZE {
578586
let mut new_cas_data = take(cas_data_accumulator.deref_mut());
579587
drop(cas_data_accumulator); // Release the lock.
580-
register_new_cas_block(
588+
let (_cas_hash, compressed_bytes) = register_new_cas_block(
581589
&mut new_cas_data,
582590
&self.shard_manager,
583591
&self.cas,
584592
&self.cas_prefix,
585593
)
586594
.await?;
595+
total_compressed_bytes += compressed_bytes;
587596
} else {
588597
drop(cas_data_accumulator);
589598
}
@@ -593,11 +602,11 @@ impl Cleaner {
593602

594603
*tracking_info = Default::default();
595604

596-
Ok((file_hash, file_size))
605+
Ok((file_hash, file_size, total_compressed_bytes))
597606
}
598607

599608
async fn to_pointer_file(&self) -> Result<String> {
600-
let (hash, filesize) = self.summarize_dedup_info().await?;
609+
let (hash, filesize, compressed_size) = self.summarize_dedup_info().await?;
601610
let pointer_file = PointerFile::init_from_info(
602611
&self
603612
.file_name
@@ -606,6 +615,8 @@ impl Cleaner {
606615
.unwrap_or_default(),
607616
&hash.hex(),
608617
filesize,
618+
compressed_size,
619+
self.start.elapsed(),
609620
);
610621
Ok(pointer_file.to_string())
611622
}

data/src/data_processing.rs

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::errors::*;
55
use crate::metrics::FILTER_CAS_BYTES_PRODUCED;
66
use crate::remote_shard_interface::RemoteShardInterface;
77
use crate::shard_interface::create_shard_manager;
8-
use crate::PointerFile;
8+
use crate::{PointerFile, PointerFileTelemetry};
99
use cas_client::Client;
1010
use mdb_shard::cas_structs::{CASChunkSequenceEntry, CASChunkSequenceHeader, MDBCASInfo};
1111
use mdb_shard::file_structs::MDBFileInfo;
@@ -17,6 +17,7 @@ use std::mem::take;
1717
use std::ops::DerefMut;
1818
use std::path::Path;
1919
use std::sync::Arc;
20+
use std::time::Instant;
2021
use tokio::sync::Mutex;
2122

2223
#[derive(Default, Debug)]
@@ -59,6 +60,9 @@ pub struct PointerFileTranslator {
5960

6061
/* ----- Deduped data shared across files ----- */
6162
global_cas_data: Arc<Mutex<CASDataAggregator>>,
63+
// Telemetry
64+
/* ----- Telemetry ----- */
65+
pub start: Instant,
6266
}
6367

6468
// Constructors
@@ -97,6 +101,7 @@ impl PointerFileTranslator {
97101
remote_shards,
98102
cas: cas_client,
99103
global_cas_data: Default::default(),
104+
start: Instant::now(),
100105
})
101106
}
102107
}
@@ -208,7 +213,7 @@ pub(crate) async fn register_new_cas_block(
208213
shard_manager: &Arc<ShardFileManager>,
209214
cas: &Arc<dyn Client + Send + Sync>,
210215
cas_prefix: &str,
211-
) -> Result<MerkleHash> {
216+
) -> Result<(MerkleHash, u64)> {
212217
let cas_hash = cas_node_hash(&cas_data.chunks[..]);
213218

214219
let raw_bytes_len = cas_data.data.len();
@@ -283,29 +288,37 @@ pub(crate) async fn register_new_cas_block(
283288
cas_data.chunks.clear();
284289
cas_data.pending_file_info.clear();
285290

286-
Ok(cas_hash)
291+
Ok((cas_hash, compressed_bytes_len as u64))
287292
}
288293

289294
/// Smudge operations
290295
impl PointerFileTranslator {
291296
pub async fn smudge_file_from_pointer(
292297
&self,
293-
pointer: &PointerFile,
298+
pointer_file: &mut PointerFile,
294299
writer: &mut Box<dyn Write + Send>,
295300
range: Option<(usize, usize)>,
296301
) -> Result<()> {
297-
self.smudge_file_from_hash(&pointer.hash()?, writer, range)
298-
.await
302+
let start = Instant::now();
303+
let bytes_downloaded = self
304+
.smudge_file_from_hash(&pointer_file.hash()?, writer, range)
305+
.await?;
306+
307+
pointer_file.telemetry = Some(PointerFileTelemetry {
308+
latency: Some(start.elapsed()),
309+
network_bytes: Some(bytes_downloaded),
310+
});
311+
312+
Ok(())
299313
}
300314

301315
pub async fn smudge_file_from_hash(
302316
&self,
303317
file_id: &MerkleHash,
304318
writer: &mut Box<dyn Write + Send>,
305319
_range: Option<(usize, usize)>,
306-
) -> Result<()> {
307-
self.cas.get_file(file_id, writer).await?;
308-
309-
Ok(())
320+
) -> Result<u64> {
321+
let bytes_downloaded = self.cas.get_file(file_id, writer).await?;
322+
Ok(bytes_downloaded)
310323
}
311324
}

data/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,4 @@ mod test_utils;
1717

1818
pub use constants::SMALL_FILE_THRESHOLD;
1919
pub use data_processing::PointerFileTranslator;
20-
pub use pointer_file::PointerFile;
20+
pub use pointer_file::{PointerFile, PointerFileTelemetry};

0 commit comments

Comments
 (0)