Skip to content

Commit

Permalink
compute: make correction version configurable
Browse files Browse the repository at this point in the history
This commit adds a new dyncfg, `enable_compute_correction_v2`, that
controls whether the MV sink v2 should use the old or the new
implementation of the correction buffer.
  • Loading branch information
teskje committed Jan 15, 2025
1 parent 1400139 commit 56c1a52
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 16 deletions.
1 change: 1 addition & 0 deletions misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def get_default_system_parameters(
"enable_alter_swap": "true",
"enable_columnation_lgalloc": "true",
"enable_compute_chunked_stack": "true",
"enable_compute_correction_v2": "true",
"enable_connection_validation_syntax": "true",
"enable_continual_task_builtins": (
"true" if version > MzVersion.parse_mz("v0.127.0-dev") else "false"
Expand Down
10 changes: 10 additions & 0 deletions src/compute-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ pub const ENABLE_MATERIALIZED_VIEW_SINK_V2: Config<bool> = Config::new(
"Whether compute should use the new MV sink implementation.",
);

/// Whether rendering should use the new MV sink correction buffer implementation.
///
/// Only has an effect when `enable_compute_materialized_view_sink_v2` is enabled.
/// Defaults to `false`, i.e. the original v1 correction buffer.
pub const ENABLE_CORRECTION_V2: Config<bool> = Config::new(
    "enable_compute_correction_v2",
    false,
    "Whether compute should use the new MV sink correction buffer implementation.",
);

/// The yielding behavior with which linear joins should be rendered.
pub const LINEAR_JOIN_YIELDING: Config<&str> = Config::new(
"linear_join_yielding",
Expand Down Expand Up @@ -163,6 +172,7 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
configs
.add(&ENABLE_MZ_JOIN_CORE)
.add(&ENABLE_MATERIALIZED_VIEW_SINK_V2)
.add(&ENABLE_CORRECTION_V2)
.add(&LINEAR_JOIN_YIELDING)
.add(&ENABLE_COLUMNATION_LGALLOC)
.add(&ENABLE_LGALLOC_EAGER_RECLAMATION)
Expand Down
84 changes: 77 additions & 7 deletions src/compute/src/sink/correction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,87 @@ use std::collections::BTreeMap;
use std::ops::{AddAssign, Bound, RangeBounds, SubAssign};

use differential_dataflow::consolidation::{consolidate, consolidate_updates};
use differential_dataflow::Data;
use itertools::Itertools;
use mz_ore::iter::IteratorExt;
use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
use mz_repr::{Diff, Timestamp};
use timely::progress::Antichain;
use timely::PartialOrder;

use crate::sink::correction_v2::{CorrectionV2, Data};

/// A data structure suitable for storing updates in a self-correcting persist sink.
///
/// Selects one of two correction buffer implementations. `V1` is the original simple
/// implementation that stores updates in non-spillable memory. `V2` improves on `V1` by supporting
/// spill-to-disk but is less battle-tested so for now we want to keep the option of reverting to
/// `V1` in a pinch. The plan is to remove `V1` eventually.
pub(super) enum Correction<D: Data> {
    /// The original implementation, holding all updates in memory.
    V1(CorrectionV1<D>),
    /// The new implementation, with support for spilling updates to disk.
    V2(CorrectionV2<D>),
}

impl<D: Data> Correction<D> {
    /// Construct a new `Correction` instance.
    ///
    /// The `v2` flag selects the implementation: `true` wraps a [`CorrectionV2`],
    /// `false` a [`CorrectionV1`]. Both variants receive the same metrics handles.
    pub fn new(metrics: SinkMetrics, worker_metrics: SinkWorkerMetrics, v2: bool) -> Self {
        if v2 {
            Self::V2(CorrectionV2::new(metrics, worker_metrics))
        } else {
            Self::V1(CorrectionV1::new(metrics, worker_metrics))
        }
    }

    /// Insert a batch of updates.
    pub fn insert(&mut self, updates: Vec<(D, Timestamp, Diff)>) {
        match self {
            Self::V1(c) => c.insert(updates),
            Self::V2(c) => c.insert(updates),
        }
    }

    /// Insert a batch of updates, after negating their diffs.
    pub fn insert_negated(&mut self, updates: Vec<(D, Timestamp, Diff)>) {
        match self {
            Self::V1(c) => c.insert_negated(updates),
            Self::V2(c) => c.insert_negated(updates),
        }
    }

    /// Consolidate and return updates before the given `upper`.
    ///
    /// The iterator is boxed because the two implementations return different
    /// concrete iterator types.
    pub fn updates_before(
        &mut self,
        upper: &Antichain<Timestamp>,
    ) -> Box<dyn Iterator<Item = (D, Timestamp, Diff)> + '_> {
        match self {
            Self::V1(c) => Box::new(c.updates_before(upper)),
            Self::V2(c) => Box::new(c.updates_before(upper)),
        }
    }

    /// Advance the since frontier.
    ///
    /// # Panics
    ///
    /// Panics if the given `since` is less than the current since frontier.
    pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
        match self {
            Self::V1(c) => c.advance_since(since),
            Self::V2(c) => c.advance_since(since),
        }
    }

    /// Consolidate all updates at the current `since`.
    pub fn consolidate_at_since(&mut self) {
        match self {
            Self::V1(c) => c.consolidate_at_since(),
            Self::V2(c) => c.consolidate_at_since(),
        }
    }
}

/// A collection holding `persist_sink` updates.
///
/// The `Correction` data structure is purpose-built for the `persist_sink::write_batches`
/// The `CorrectionV1` data structure is purpose-built for the `persist_sink::write_batches`
/// operator:
///
/// * It stores updates by time, to enable efficient separation between updates that should
Expand All @@ -33,7 +103,7 @@ use timely::PartialOrder;
/// are removed by inserting them again, with negated diffs. Stored updates are continuously
/// consolidated to give them opportunity to cancel each other out.
/// * It provides an interface for advancing all contained updates to a given frontier.
pub(super) struct Correction<D> {
pub(super) struct CorrectionV1<D> {
/// Stashed updates by time.
updates: BTreeMap<Timestamp, ConsolidatingVec<D>>,
/// Frontier to which all update times are advanced.
Expand All @@ -49,8 +119,8 @@ pub(super) struct Correction<D> {
worker_metrics: SinkWorkerMetrics,
}

impl<D> Correction<D> {
/// Construct a new `Correction` instance.
impl<D> CorrectionV1<D> {
/// Construct a new `CorrectionV1` instance.
pub fn new(metrics: SinkMetrics, worker_metrics: SinkWorkerMetrics) -> Self {
Self {
updates: Default::default(),
Expand All @@ -75,7 +145,7 @@ impl<D> Correction<D> {
}
}

impl<D: Data> Correction<D> {
impl<D: Data> CorrectionV1<D> {
/// Insert a batch of updates.
pub fn insert(&mut self, mut updates: Vec<(D, Timestamp, Diff)>) {
let Some(since_ts) = self.since.as_option() else {
Expand Down Expand Up @@ -266,7 +336,7 @@ impl<D: Data> Correction<D> {
}
}

impl<D> Drop for Correction<D> {
impl<D> Drop for CorrectionV1<D> {
fn drop(&mut self) {
self.update_metrics(Default::default());
}
Expand Down
6 changes: 3 additions & 3 deletions src/compute/src/sink/correction_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl<D: differential_dataflow::Data + Columnation> Data for D {}
/// In contrast to `CorrectionV1`, this implementation stores updates in columnation regions,
/// allowing their memory to be transparently spilled to disk.
#[derive(Debug)]
pub(super) struct Correction<D: Data> {
pub(super) struct CorrectionV2<D: Data> {
/// Chains containing sorted updates.
chains: Vec<Chain<D>>,
/// The frontier by which all contained times are advanced.
Expand All @@ -156,8 +156,8 @@ pub(super) struct Correction<D: Data> {
_worker_metrics: SinkWorkerMetrics,
}

impl<D: Data> Correction<D> {
/// Construct a new [`Correction`] instance.
impl<D: Data> CorrectionV2<D> {
/// Construct a new [`CorrectionV2`] instance.
pub fn new(metrics: SinkMetrics, worker_metrics: SinkWorkerMetrics) -> Self {
Self {
chains: Default::default(),
Expand Down
7 changes: 4 additions & 3 deletions src/compute/src/sink/materialized_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use tracing::trace;
use crate::compute_state::ComputeState;
use crate::render::sinks::SinkRender;
use crate::render::StartSignal;
use crate::sink::correction::Correction;
use crate::sink::correction::CorrectionV1;
use crate::sink::materialized_view_v2;
use crate::sink::refresh::apply_refresh;

Expand Down Expand Up @@ -667,8 +667,9 @@ where
// Contains `desired - persist`, reflecting the updates we would like to commit
// to `persist` in order to "correct" it to track `desired`. These collections are
// only modified by updates received from either the `desired` or `persist` inputs.
let mut correction_oks = Correction::new(sink_metrics.clone(), sink_worker_metrics.clone());
let mut correction_errs = Correction::new(sink_metrics, sink_worker_metrics);
let mut correction_oks =
CorrectionV1::new(sink_metrics.clone(), sink_worker_metrics.clone());
let mut correction_errs = CorrectionV1::new(sink_metrics, sink_worker_metrics);

// Contains descriptions of batches for which we know that we can
// write data. We got these from the "centralized" operator that
Expand Down
17 changes: 14 additions & 3 deletions src/compute/src/sink/materialized_view_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ use std::sync::Arc;

use differential_dataflow::{Collection, Hashable};
use futures::StreamExt;
use mz_compute_types::dyncfgs::ENABLE_CORRECTION_V2;
use mz_ore::cast::CastFrom;
use mz_persist_client::batch::{Batch, ProtoBatch};
use mz_persist_client::cache::PersistClientCache;
Expand Down Expand Up @@ -211,6 +212,7 @@ where
&desired,
&persist,
&descs,
ENABLE_CORRECTION_V2.get(&compute_state.worker_config),
);

let append_token = append::render(sink_id, persist_api, active_worker_id, &descs, &batches);
Expand Down Expand Up @@ -668,6 +670,7 @@ mod write {
desired: &DesiredStreams<S>,
persist: &PersistStreams<S>,
descs: &Stream<S, BatchDescription>,
use_correction_v2: bool,
) -> (BatchesStream<S>, PressOnDropButton)
where
S: Scope<Timestamp = Timestamp>,
Expand Down Expand Up @@ -702,7 +705,14 @@ mod write {

let writer = persist_api.open_writer().await;
let sink_metrics = persist_api.open_metrics().await;
let mut state = State::new(sink_id, worker_id, writer, sink_metrics, as_of);
let mut state = State::new(
sink_id,
worker_id,
writer,
sink_metrics,
as_of,
use_correction_v2,
);

loop {
// Read from the inputs, extract `desired` updates as positive contributions to
Expand Down Expand Up @@ -821,6 +831,7 @@ mod write {
persist_writer: WriteHandle<SourceData, (), Timestamp, Diff>,
metrics: SinkMetrics,
as_of: Antichain<Timestamp>,
use_correction_v2: bool,
) -> Self {
let worker_metrics = metrics.for_worker(worker_id);

Expand All @@ -833,8 +844,8 @@ mod write {
worker_id,
persist_writer,
corrections: OkErr::new(
Correction::new(metrics.clone(), worker_metrics.clone()),
Correction::new(metrics, worker_metrics),
Correction::new(metrics.clone(), worker_metrics.clone(), use_correction_v2),
Correction::new(metrics, worker_metrics, use_correction_v2),
),
desired_frontiers: OkErr::new_frontiers(),
persist_frontiers: OkErr::new_frontiers(),
Expand Down

0 comments on commit 56c1a52

Please sign in to comment.