compute: spillable MV correction buffer #30083

Merged · 5 commits · Jan 28, 2025
10 changes: 10 additions & 0 deletions src/compute-types/src/dyncfgs.rs
@@ -28,6 +28,15 @@ pub const ENABLE_MATERIALIZED_VIEW_SINK_V2: Config<bool> = Config::new(
"Whether compute should use the new MV sink implementation.",
);

/// Whether rendering should use the new MV sink correction buffer implementation.
///
/// Only has an effect when `enable_compute_materialized_view_sink_v2` is enabled.
pub const ENABLE_CORRECTION_V2: Config<bool> = Config::new(
"enable_compute_correction_v2",
false,
"Whether compute should use the new MV sink correction buffer implementation.",
);
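
As a hedged illustration of the doc comment above (not part of this diff): the new flag only takes effect on the v2 sink path, so a caller consulting both flags would combine them roughly like this, where `config_set` is an assumed `&ConfigSet` in scope:

let use_sink_v2 = ENABLE_MATERIALIZED_VIEW_SINK_V2.get(config_set);
let use_correction_v2 = use_sink_v2 && ENABLE_CORRECTION_V2.get(config_set);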

/// The yielding behavior with which linear joins should be rendered.
pub const LINEAR_JOIN_YIELDING: Config<&str> = Config::new(
"linear_join_yielding",
@@ -171,6 +180,7 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
configs
.add(&ENABLE_MZ_JOIN_CORE)
.add(&ENABLE_MATERIALIZED_VIEW_SINK_V2)
.add(&ENABLE_CORRECTION_V2)
.add(&LINEAR_JOIN_YIELDING)
.add(&ENABLE_COLUMNATION_LGALLOC)
.add(&ENABLE_LGALLOC_EAGER_RECLAMATION)
1 change: 1 addition & 0 deletions src/compute/src/sink.rs
@@ -9,6 +9,7 @@

mod copy_to_s3_oneshot;
mod correction;
mod correction_v2;
mod materialized_view;
mod materialized_view_v2;
mod refresh;
121 changes: 103 additions & 18 deletions src/compute/src/sink/correction.rs
@@ -14,17 +14,94 @@ use std::collections::BTreeMap;
use std::ops::{AddAssign, Bound, RangeBounds, SubAssign};

use differential_dataflow::consolidation::{consolidate, consolidate_updates};
use differential_dataflow::Data;
use itertools::Itertools;
use mz_compute_types::dyncfgs::{CONSOLIDATING_VEC_GROWTH_DAMPENER, ENABLE_CORRECTION_V2};
use mz_dyncfg::ConfigSet;
use mz_ore::iter::IteratorExt;
use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
use mz_repr::{Diff, Timestamp};
use timely::progress::Antichain;
use timely::PartialOrder;

use crate::sink::correction_v2::{CorrectionV2, Data};

/// A data structure suitable for storing updates in a self-correcting persist sink.
///
/// Selects one of two correction buffer implementations. `V1` is the original simple
/// implementation that stores updates in non-spillable memory. `V2` improves on `V1` by supporting
/// spill-to-disk but is less battle-tested so for now we want to keep the option of reverting to
/// `V1` in a pinch. The plan is to remove `V1` eventually.
pub(super) enum Correction<D: Data> {
V1(CorrectionV1<D>),
V2(CorrectionV2<D>),
}

impl<D: Data> Correction<D> {
/// Construct a new `Correction` instance.
pub fn new(
metrics: SinkMetrics,
worker_metrics: SinkWorkerMetrics,
config: &ConfigSet,
) -> Self {
if ENABLE_CORRECTION_V2.get(config) {
Self::V2(CorrectionV2::new(metrics, worker_metrics))
} else {
let growth_dampener = CONSOLIDATING_VEC_GROWTH_DAMPENER.get(config);
Self::V1(CorrectionV1::new(metrics, worker_metrics, growth_dampener))
}
}

/// Insert a batch of updates.
pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
match self {
Self::V1(c) => c.insert(updates),
Self::V2(c) => c.insert(updates),
}
}

/// Insert a batch of updates, after negating their diffs.
pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
match self {
Self::V1(c) => c.insert_negated(updates),
Self::V2(c) => c.insert_negated(updates),
}
}

/// Consolidate and return updates before the given `upper`.
pub fn updates_before(
&mut self,
upper: &Antichain<Timestamp>,
) -> Box<dyn Iterator<Item = (D, Timestamp, Diff)> + '_> {
match self {
Self::V1(c) => Box::new(c.updates_before(upper)),
Self::V2(c) => Box::new(c.updates_before(upper)),
}
}

/// Advance the since frontier.
///
/// # Panics
///
/// Panics if the given `since` is less than the current since frontier.
pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
match self {
Self::V1(c) => c.advance_since(since),
Self::V2(c) => c.advance_since(since),
}
}

/// Consolidate all updates at the current `since`.
pub fn consolidate_at_since(&mut self) {
match self {
Self::V1(c) => c.consolidate_at_since(),
Self::V2(c) => c.consolidate_at_since(),
}
}
}
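
To make the dispatching wrapper's API concrete, here is a minimal usage sketch (not part of this diff) of how a sink worker might drive it; `apply_corrections` and its parameters are invented for illustration, and `D: Data` stands in for the row type the real sink uses:

// Hypothetical helper, assuming the imports already present in this file.
fn apply_corrections<D: Data>(
    correction: &mut Correction<D>,
    mut desired: Vec<(D, Timestamp, Diff)>,
    mut stale: Vec<(D, Timestamp, Diff)>,
    since: Antichain<Timestamp>,
    upper: &Antichain<Timestamp>,
) -> Vec<(D, Timestamp, Diff)> {
    // Stage new updates; retract previously written ones via negated diffs.
    correction.insert(&mut desired);
    correction.insert_negated(&mut stale);
    // Advance stored times to `since`; this panics if `since` regresses.
    correction.advance_since(since);
    // Drain the consolidated updates at times before `upper`.
    correction.updates_before(upper).collect()
}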

/// A collection holding `persist_sink` updates.
///
/// The `Correction` data structure is purpose-built for the `persist_sink::write_batches`
/// The `CorrectionV1` data structure is purpose-built for the `persist_sink::write_batches`
/// operator:
///
/// * It stores updates by time, to enable efficient separation between updates that should
@@ -33,7 +110,7 @@ use timely::PartialOrder;
/// are removed by inserting them again, with negated diffs. Stored updates are continuously
/// consolidated to give them opportunity to cancel each other out.
/// * It provides an interface for advancing all contained updates to a given frontier.
pub(super) struct Correction<D> {
pub(super) struct CorrectionV1<D> {
/// Stashed updates by time.
updates: BTreeMap<Timestamp, ConsolidatingVec<D>>,
/// Frontier to which all update times are advanced.
@@ -51,8 +128,8 @@
growth_dampener: usize,
}

impl<D> Correction<D> {
/// Construct a new `Correction` instance.
impl<D> CorrectionV1<D> {
/// Construct a new `CorrectionV1` instance.
pub fn new(
metrics: SinkMetrics,
worker_metrics: SinkWorkerMetrics,
@@ -82,28 +159,29 @@ impl<D> Correction<D> {
}
}

impl<D: Data> Correction<D> {
impl<D: Data> CorrectionV1<D> {
/// Insert a batch of updates.
pub fn insert(&mut self, mut updates: Vec<(D, Timestamp, Diff)>) {
pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
let Some(since_ts) = self.since.as_option() else {
// If the since frontier is empty, discard all updates.
return;
};

for (_, time, _) in &mut updates {
for (_, time, _) in &mut *updates {
Review comment (Member): Isn't this equivalent to just using `updates` directly?

Reply (Contributor, Author): For some reason it's not. If you try that you get an error with the helpful hint that you should use a reborrow instead 🤷
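
A hedged illustration (not from the PR) of the compiler behavior the reply refers to: `updates` is a `&mut Vec<_>`, and a plain `for .. in updates` moves the mutable reference into the loop, so the later `self.insert_inner(updates)` call could no longer use it; `&mut *updates` reborrows it only for the loop's duration. The function names below are made up for illustration.

// Hypothetical standalone example of the same pattern.
fn advance_times(updates: &mut Vec<(u64, i64)>, floor: u64) {
    // Writing `for (time, _) in updates` here would move the `&mut` reference
    // into the loop, so the `consolidate` call below would fail to compile
    // (E0382), with rustc suggesting a fresh reborrow.
    for (time, _diff) in &mut *updates {
        *time = std::cmp::max(*time, floor);
    }
    consolidate(updates); // still usable thanks to the reborrow
}

fn consolidate(updates: &mut Vec<(u64, i64)>) {
    // Stand-in for the real consolidation; just orders the updates here.
    updates.sort_unstable();
}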

*time = std::cmp::max(*time, *since_ts);
}
self.insert_inner(updates);
}

/// Insert a batch of updates, after negating their diffs.
pub fn insert_negated(&mut self, mut updates: Vec<(D, Timestamp, Diff)>) {
pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
let Some(since_ts) = self.since.as_option() else {
// If the since frontier is empty, discard all updates.
updates.clear();
return;
};

for (_, time, diff) in &mut updates {
for (_, time, diff) in &mut *updates {
*time = std::cmp::max(*time, *since_ts);
*diff = -*diff;
}
@@ -113,12 +191,12 @@ impl<D: Data> Correction<D> {
/// Insert a batch of updates.
///
/// The given `updates` must all have been advanced by `self.since`.
fn insert_inner(&mut self, mut updates: Vec<(D, Timestamp, Diff)>) {
consolidate_updates(&mut updates);
fn insert_inner(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
consolidate_updates(updates);
updates.sort_unstable_by_key(|(_, time, _)| *time);

let mut new_size = self.total_size;
let mut updates = updates.into_iter().peekable();
let mut updates = updates.drain(..).peekable();
while let Some(&(_, time, _)) = updates.peek() {
debug_assert!(
self.since.less_equal(&time),
@@ -274,17 +352,24 @@ impl<D: Data> Correction<D> {
}
}

impl<D> Drop for Correction<D> {
impl<D> Drop for CorrectionV1<D> {
fn drop(&mut self) {
self.update_metrics(Default::default());
}
}

/// Helper type for convenient tracking of length and capacity together.
#[derive(Clone, Copy, Default)]
struct LengthAndCapacity {
length: usize,
capacity: usize,
#[derive(Clone, Copy, Debug, Default)]
pub(super) struct LengthAndCapacity {
pub length: usize,
pub capacity: usize,
}

impl AddAssign<Self> for LengthAndCapacity {
fn add_assign(&mut self, size: Self) {
self.length += size.length;
self.capacity += size.capacity;
}
}

impl AddAssign<(usize, usize)> for LengthAndCapacity {