Skip to content

Commit

Permalink
compute: wire up sink metrics with CorrectionV2
Browse files Browse the repository at this point in the history
  • Loading branch information
teskje committed Jan 3, 2025
1 parent c7ee9d1 commit e95c32f
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 15 deletions.
15 changes: 11 additions & 4 deletions src/compute/src/sink/correction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,10 +348,17 @@ impl<D> Drop for CorrectionV1<D> {
}

/// Helper type for convenient tracking of length and capacity together.
#[derive(Clone, Copy, Default)]
struct LengthAndCapacity {
length: usize,
capacity: usize,
#[derive(Clone, Copy, Debug, Default)]
pub(super) struct LengthAndCapacity {
/// Total number of contained updates (see `Chunk::get_size`, which uses `Container::len`).
pub length: usize,
/// Total capacity of the underlying allocations.
/// NOTE(review): accumulated from `heap_size` callbacks — presumably bytes; confirm.
pub capacity: usize,
}

impl AddAssign<Self> for LengthAndCapacity {
    /// Accumulate another size into this one, component-wise.
    fn add_assign(&mut self, other: Self) {
        let Self { length, capacity } = other;
        self.length += length;
        self.capacity += capacity;
    }
}

impl AddAssign<(usize, usize)> for LengthAndCapacity {
Expand Down
97 changes: 86 additions & 11 deletions src/compute/src/sink/correction_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ use std::fmt;
use std::rc::Rc;

use differential_dataflow::trace::implementations::BatchContainer;
use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics};
use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
use mz_repr::{Diff, Timestamp};
use mz_timely_util::containers::stack::StackWrapper;
use timely::container::columnation::Columnation;
use timely::progress::Antichain;
use timely::{Container, PartialOrder};

use crate::sink::correction::LengthAndCapacity;

const CHUNK_SIZE_BYTES: usize = 8 << 10;

pub trait Data: differential_dataflow::Data + Columnation {}
Expand All @@ -30,10 +32,15 @@ impl<D: differential_dataflow::Data + Columnation> Data for D {}
pub(super) struct CorrectionV2<D: Data> {
/// The contained updates, organized as chains that are merged and consolidated on demand.
chains: Vec<Chain<D>>,
/// Frontier below which times may be advanced; only moves forward
/// (`set_since` asserts the new value is `>=` the old one).
since: Antichain<Timestamp>,

/// Total length and capacity of chunks in `chains`.
///
/// Tracked to maintain metrics.
total_size: LengthAndCapacity,
/// Global persist sink metrics.
// NOTE(review): rendered diff — `_metrics` is the removed line, `metrics` the added one.
_metrics: SinkMetrics,
metrics: SinkMetrics,
/// Per-worker persist sink metrics.
// NOTE(review): rendered diff — `_worker_metrics` removed, `worker_metrics` added.
_worker_metrics: SinkWorkerMetrics,
worker_metrics: SinkWorkerMetrics,
}

impl<D: Data> CorrectionV2<D> {
Expand All @@ -42,8 +49,9 @@ impl<D: Data> CorrectionV2<D> {
Self {
chains: Default::default(),
since: Antichain::from_elem(Timestamp::MIN),
_metrics: metrics,
_worker_metrics: worker_metrics,
total_size: Default::default(),
metrics,
worker_metrics,
}
}

Expand Down Expand Up @@ -105,6 +113,8 @@ impl<D: Data> CorrectionV2<D> {
let merged = merge_chains(vec![a, b], &self.since);
self.chains.push(merged);
}

self.update_metrics();
}

/// Return consolidated updates before the given `upper`.
Expand All @@ -124,12 +134,15 @@ impl<D: Data> CorrectionV2<D> {
self.chains = remains;

if merged.is_empty() {
self.update_metrics();
return empty_result();
}

self.chains.push(merged);
self.chains.sort_unstable_by_key(|c| c.len());

self.update_metrics();

let merged = self
.chains
.iter()
Expand All @@ -152,17 +165,37 @@ impl<D: Data> CorrectionV2<D> {
assert!(PartialOrder::less_equal(&self.since, &since));
self.since = since;
}

/// Recompute the total size of all chains and report the change to the
/// persist sink metrics.
///
/// Reports deltas against the previously recorded `total_size` to the global
/// metrics, and absolute totals to the per-worker metrics, then remembers the
/// new totals for the next call.
fn update_metrics(&mut self) {
    // Sum the (possibly cached) sizes of all chains, in order.
    let new_size = self
        .chains
        .iter_mut()
        .fold(LengthAndCapacity::default(), |mut acc, chain| {
            acc += chain.get_size();
            acc
        });

    let old_size = self.total_size;
    self.metrics.report_correction_update_deltas(
        UpdateDelta::new(new_size.length, old_size.length),
        UpdateDelta::new(new_size.capacity, old_size.capacity),
    );
    self.worker_metrics
        .report_correction_update_totals(new_size.length, new_size.capacity);

    self.total_size = new_size;
}
}

#[derive(Debug)]
struct Chain<D: Data> {
/// The chunks that make up this chain.
chunks: Vec<Chunk<D>>,
/// Cached total size of `chunks`; `None` when stale.
///
/// Lazily recomputed by `get_size` and reset by `invalidate_cached_size`
/// whenever the chain is mutated.
cached_size: Option<LengthAndCapacity>,
}

// Manual impl: deriving `Default` would impose an unnecessary `D: Default` bound.
impl<D: Data> Default for Chain<D> {
    fn default() -> Self {
        Self {
            chunks: Vec::new(),
            cached_size: None,
        }
    }
}
Expand Down Expand Up @@ -190,10 +223,13 @@ impl<D: Data> Chain<D> {
self.push_chunk(chunk);
}
}

self.invalidate_cached_size();
}

/// Append a chunk to the end of this chain.
fn push_chunk(&mut self, new_chunk: Chunk<D>) {
    // Any mutation makes the cached size stale.
    self.invalidate_cached_size();
    self.chunks.push(new_chunk);
}

fn push_cursor(&mut self, cursor: Cursor<D>) {
Expand Down Expand Up @@ -224,7 +260,23 @@ impl<D: Data> Chain<D> {
/// Iterate over the updates of all chunks in this chain, cloning each one.
// NOTE(review): rendered diff — the `.0` line is removed, the `.data` line added
// (the tuple struct was converted to named fields in this commit).
fn iter(&self) -> impl Iterator<Item = (D, Timestamp, Diff)> + '_ {
self.chunks
.iter()
.flat_map(|c| c.0.iter().map(|(d, t, r)| (d.clone(), *t, *r)))
.flat_map(|c| c.data.iter().map(|(d, t, r)| (d.clone(), *t, *r)))
}

/// Return the total size of this chain, recomputing and caching it if stale.
fn get_size(&mut self) -> LengthAndCapacity {
    match self.cached_size {
        Some(size) => size,
        None => {
            // Sum the (possibly cached) sizes of all chunks.
            let mut total = LengthAndCapacity::default();
            for chunk in self.chunks.iter_mut() {
                total += chunk.get_size();
            }
            self.cached_size = Some(total);
            total
        }
    }
}

/// Mark the cached size as stale, forcing recomputation on the next `get_size`.
fn invalidate_cached_size(&mut self) {
self.cached_size = None;
}
}

Expand Down Expand Up @@ -448,12 +500,18 @@ impl<D: Data> From<Cursor<D>> for Chain<D> {
}
}

struct Chunk<D: Data>(StackWrapper<(D, Timestamp, Diff)>);
struct Chunk<D: Data> {
/// The updates stored in this chunk.
data: StackWrapper<(D, Timestamp, Diff)>,
/// Cached size of `data`; `None` when stale.
///
/// Lazily recomputed by `get_size` and reset by `invalidate_cached_size` on mutation.
cached_size: Option<LengthAndCapacity>,
}

impl<D: Data> Default for Chunk<D> {
fn default() -> Self {
// Pre-size the backing storage to the fixed chunk capacity.
let capacity = Self::capacity();
// NOTE(review): rendered diff — the tuple-struct initializer is removed,
// the named-field initializer added.
Self(StackWrapper::with_capacity(capacity))
Self {
data: StackWrapper::with_capacity(capacity),
cached_size: None,
}
}
}

Expand All @@ -470,15 +528,15 @@ impl<D: Data> Chunk<D> {
}

/// The number of updates contained in this chunk.
// NOTE(review): rendered diff — `.0` line removed, `.data` line added.
fn len(&self) -> usize {
Container::len(&self.0)
Container::len(&self.data)
}

/// Whether this chunk can accept more updates without exceeding its fixed capacity.
fn capacity_left(&self) -> bool {
self.len() < Self::capacity()
}

/// Return the update at `idx`, borrowing the data and copying time and diff.
// NOTE(review): rendered diff — `.0` line removed, `.data` line added.
fn index(&self, idx: usize) -> (&D, Timestamp, Diff) {
let (d, t, r) = self.0.index(idx);
let (d, t, r) = self.data.index(idx);
(d, *t, *r)
}

Expand All @@ -488,7 +546,9 @@ impl<D: Data> Chunk<D> {

/// Append a single update to this chunk.
///
/// Accepts owned or borrowed data via the `Borrow` bound; the update is
/// copied into the chunk's storage.
// NOTE(review): rendered diff — `.0` line removed, `.data` line added.
fn push<DT: Borrow<D>>(&mut self, update: (DT, Timestamp, Diff)) {
let (d, t, r) = update;
self.0.copy_destructured(d.borrow(), &t, &r);
self.data.copy_destructured(d.borrow(), &t, &r);

// The chunk's contents changed, so the cached size is stale.
self.invalidate_cached_size();
}

fn find_time_greater_than(&self, time: Timestamp) -> Option<usize> {
Expand All @@ -509,6 +569,21 @@ impl<D: Data> Chunk<D> {

Some(lower)
}

/// Return the size of this chunk, recomputing and caching it if stale.
fn get_size(&mut self) -> LengthAndCapacity {
    if let Some(size) = self.cached_size {
        return size;
    }

    let length = Container::len(&self.data);
    // Accumulate the capacities reported for the chunk's heap allocations.
    let mut capacity = 0;
    self.data.heap_size(|_len, cap| capacity += cap);

    let size = LengthAndCapacity { length, capacity };
    self.cached_size = Some(size);
    size
}

/// Mark the cached size as stale, forcing recomputation on the next `get_size`.
fn invalidate_cached_size(&mut self) {
self.cached_size = None;
}
}

fn consolidate<D: Data>(updates: &mut Vec<(D, Timestamp, Diff)>) {
Expand Down

0 comments on commit e95c32f

Please sign in to comment.