Skip to content

Commit

Permalink
compute: wire up sink metrics with CorrectionV2
Browse files Browse the repository at this point in the history
  • Loading branch information
teskje committed Jan 17, 2025
1 parent a45d149 commit 2262dee
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 15 deletions.
15 changes: 11 additions & 4 deletions src/compute/src/sink/correction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,10 +343,17 @@ impl<D> Drop for CorrectionV1<D> {
}

/// Helper type for convenient tracking of length and capacity together.
#[derive(Clone, Copy, Default)]
struct LengthAndCapacity {
length: usize,
capacity: usize,
#[derive(Clone, Copy, Debug, Default)]
pub(super) struct LengthAndCapacity {
pub length: usize,
pub capacity: usize,
}

/// Field-wise accumulation of one [`LengthAndCapacity`] into another.
impl AddAssign<Self> for LengthAndCapacity {
    /// Add the length and the capacity of `other` onto the matching fields of `self`.
    fn add_assign(&mut self, other: Self) {
        let Self { length, capacity } = other;
        self.length += length;
        self.capacity += capacity;
    }
}

impl AddAssign<(usize, usize)> for LengthAndCapacity {
Expand Down
118 changes: 107 additions & 11 deletions src/compute/src/sink/correction_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,15 @@ use std::fmt;
use std::rc::Rc;

use differential_dataflow::trace::implementations::BatchContainer;
use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics};
use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
use mz_repr::{Diff, Timestamp};
use mz_timely_util::containers::stack::StackWrapper;
use timely::container::columnation::Columnation;
use timely::progress::Antichain;
use timely::{Container, PartialOrder};

use crate::sink::correction::LengthAndCapacity;

/// Determines the size factor of subsequent chains required by the chain invariant.
const CHAIN_PROPORTIONALITY: usize = 3;

Expand All @@ -150,10 +152,15 @@ pub(super) struct CorrectionV2<D: Data> {
chains: Vec<Chain<D>>,
/// The frontier by which all contained times are advanced.
since: Antichain<Timestamp>,

/// Total length and capacity of chunks in `chains`.
///
/// Tracked to maintain metrics.
total_size: LengthAndCapacity,
/// Global persist sink metrics.
_metrics: SinkMetrics,
metrics: SinkMetrics,
/// Per-worker persist sink metrics.
_worker_metrics: SinkWorkerMetrics,
worker_metrics: SinkWorkerMetrics,
}

impl<D: Data> CorrectionV2<D> {
Expand All @@ -162,8 +169,9 @@ impl<D: Data> CorrectionV2<D> {
Self {
chains: Default::default(),
since: Antichain::from_elem(Timestamp::MIN),
_metrics: metrics,
_worker_metrics: worker_metrics,
total_size: Default::default(),
metrics,
worker_metrics,
}
}

Expand Down Expand Up @@ -233,6 +241,8 @@ impl<D: Data> CorrectionV2<D> {
let merged = merge_chains(vec![a, b], &self.since);
self.chains.push(merged);
}

self.update_metrics();
}

/// Return consolidated updates before the given `upper`.
Expand Down Expand Up @@ -296,6 +306,8 @@ impl<D: Data> CorrectionV2<D> {
i -= 1;
}
}

self.update_metrics();
}

/// Advance the since frontier.
Expand All @@ -316,6 +328,24 @@ impl<D: Data> CorrectionV2<D> {
self.consolidate_before(&upper);
}
}

/// Update persist sink metrics.
///
/// Sums the sizes of all chains, reports the result to both metrics handles, and stores it in
/// `total_size` so the next invocation can compare against it.
fn update_metrics(&mut self) {
    // Chain sizes are cached by the chains themselves, so this is cheap unless a chain was
    // modified since the last call.
    let current = self
        .chains
        .iter_mut()
        .fold(LengthAndCapacity::default(), |mut total, chain| {
            total += chain.get_size();
            total
        });
    let previous = self.total_size;

    // The global metrics are fed `UpdateDelta`s built from the new and the previous values,
    // the per-worker metrics are fed the new totals directly.
    self.metrics.report_correction_update_deltas(
        UpdateDelta::new(current.length, previous.length),
        UpdateDelta::new(current.capacity, previous.capacity),
    );
    self.worker_metrics
        .report_correction_update_totals(current.length, current.capacity);

    self.total_size = current;
}
}

/// A chain of [`Chunk`]s containing updates.
Expand All @@ -328,12 +358,15 @@ impl<D: Data> CorrectionV2<D> {
struct Chain<D: Data> {
/// The contained chunks.
chunks: Vec<Chunk<D>>,
/// Cached value of the current chain size, for efficient updating of metrics.
///
/// `None` means the size has not been computed since the cache was last invalidated;
/// `get_size` recomputes and stores it on demand.
cached_size: Option<LengthAndCapacity>,
}

/// The default chain holds no chunks and has no cached size.
impl<D: Data> Default for Chain<D> {
    fn default() -> Self {
        let chunks = Vec::new();
        Self {
            chunks,
            cached_size: None,
        }
    }
}
Expand Down Expand Up @@ -366,6 +399,8 @@ impl<D: Data> Chain<D> {
self.push_chunk(chunk);
}
}

self.invalidate_cached_size();
}

/// Push a chunk onto the chain.
Expand All @@ -376,6 +411,7 @@ impl<D: Data> Chain<D> {
debug_assert!(self.can_accept(chunk.first()));

self.chunks.push(chunk);
self.invalidate_cached_size();
}

/// Push the updates produced by a cursor onto the chain.
Expand Down Expand Up @@ -421,7 +457,30 @@ impl<D: Data> Chain<D> {
fn iter(&self) -> impl Iterator<Item = (D, Timestamp, Diff)> + '_ {
self.chunks
.iter()
.flat_map(|c| c.0.iter().map(|(d, t, r)| (d.clone(), *t, *r)))
.flat_map(|c| c.data.iter().map(|(d, t, r)| (d.clone(), *t, *r)))
}

/// Return the size of the chain, for use in metrics.
///
/// Determining the size means visiting every chunk (and, through it, its backing region),
/// which can be expensive. The result is therefore cached and only recomputed after the cache
/// was invalidated.
fn get_size(&mut self) -> LengthAndCapacity {
    // Fast path: the cache is still valid.
    if let Some(size) = self.cached_size {
        return size;
    }

    let size = self
        .chunks
        .iter_mut()
        .fold(LengthAndCapacity::default(), |mut total, chunk| {
            total += chunk.get_size();
            total
        });
    self.cached_size = Some(size);
    size
}

/// Invalidate the cached chain size.
///
/// This method must be called every time the size of the chain changes. Clearing the cache
/// makes the next `get_size` call recompute the size from the chunks.
fn invalidate_cached_size(&mut self) {
self.cached_size = None;
}
}

Expand Down Expand Up @@ -705,12 +764,28 @@ impl<D: Data> From<Cursor<D>> for Chain<D> {
}
}

struct Chunk<D: Data>(StackWrapper<(D, Timestamp, Diff)>);
/// A non-empty chunk of updates, backed by a columnation region.
///
/// All updates in a chunk are sorted by (time, data) and consolidated.
///
/// We would like all chunks to have size [`CHUNK_SIZE_BYTES`], to make it easy for the allocator
/// to re-use chunk allocations. Unfortunately, the current `TimelyStack`/`ChunkedStack` API
/// doesn't provide a convenient way to pre-size regions, so chunks are currently only fixed-size
/// in spirit.
struct Chunk<D: Data> {
/// The contained updates.
data: StackWrapper<(D, Timestamp, Diff)>,
/// Cached value of the current chunk size, for efficient updating of metrics.
///
/// `None` means the size has not been computed since the cache was last invalidated;
/// `get_size` recomputes and stores it on demand.
cached_size: Option<LengthAndCapacity>,
}

impl<D: Data> Default for Chunk<D> {
fn default() -> Self {
let capacity = Self::capacity();
Self(StackWrapper::with_capacity(capacity))
Self {
data: StackWrapper::with_capacity(capacity),
cached_size: None,
}
}
}

Expand Down Expand Up @@ -743,7 +818,7 @@ impl<D: Data> Chunk<D> {

/// Return the number of updates in the chunk.
fn len(&self) -> usize {
Container::len(&self.0)
Container::len(&self.data)
}

/// Return the number of updates left before the chunk capacity is reached.
Expand All @@ -757,7 +832,7 @@ impl<D: Data> Chunk<D> {
///
/// Panics if the given index is not populated.
fn index(&self, idx: usize) -> (&D, Timestamp, Diff) {
let (d, t, r) = self.0.index(idx);
let (d, t, r) = self.data.index(idx);
(d, *t, *r)
}

Expand All @@ -774,7 +849,9 @@ impl<D: Data> Chunk<D> {
/// Push an update onto the chunk.
fn push<DT: Borrow<D>>(&mut self, update: (DT, Timestamp, Diff)) {
let (d, t, r) = update;
self.0.copy_destructured(d.borrow(), &t, &r);
self.data.copy_destructured(d.borrow(), &t, &r);

self.invalidate_cached_size();
}

/// Return the index of the first update at a time greater than `time`, or `None` if no such
Expand All @@ -797,6 +874,25 @@ impl<D: Data> Chunk<D> {

Some(lower)
}

/// Return the size of the chunk, for use in metrics.
///
/// The length is the number of contained updates; the capacity is the sum of the capacity
/// values that `heap_size` reports for the backing storage. The result is cached until the
/// cache is invalidated.
fn get_size(&mut self) -> LengthAndCapacity {
    // Fast path: the cache is still valid.
    if let Some(size) = self.cached_size {
        return size;
    }

    let length = Container::len(&self.data);
    let mut capacity = 0;
    self.data.heap_size(|_, cap| capacity += cap);

    let size = LengthAndCapacity { length, capacity };
    self.cached_size = Some(size);
    size
}

/// Invalidate the cached chunk size.
///
/// This method must be called every time the size of the chunk changes. Clearing the cache
/// makes the next `get_size` call recompute the length and capacity from `data`.
fn invalidate_cached_size(&mut self) {
self.cached_size = None;
}
}

/// Sort and consolidate the given list of updates.
Expand Down

0 comments on commit 2262dee

Please sign in to comment.