Skip to content

Commit

Permalink
compute: make correction version configurable
Browse files Browse the repository at this point in the history
This commit adds a new dyncfg, `enable_compute_correction_v2`, that
controls whether the MV sink v2 should use the old or the new
implementation of the correction buffer.
  • Loading branch information
teskje committed Nov 27, 2024
1 parent 8e10961 commit 4356350
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 18 deletions.
1 change: 1 addition & 0 deletions misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def get_default_system_parameters(
"enable_columnation_lgalloc": "true",
"enable_comment": "true",
"enable_compute_chunked_stack": "true",
"enable_compute_correction_v2": "true",
"enable_connection_validation_syntax": "true",
"enable_continual_task_builtins": "true",
"enable_continual_task_create": "true",
Expand Down
10 changes: 10 additions & 0 deletions src/compute-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ pub const ENABLE_MATERIALIZED_VIEW_SINK_V2: Config<bool> = Config::new(
"Whether compute should use the new MV sink implementation.",
);

/// Whether rendering should use the new MV sink correction buffer implementation.
///
/// Only has an effect when `enable_compute_materialized_view_sink_v2` is enabled.
/// Defaults to `false`, so the existing (v1) correction buffer remains active unless
/// this flag is explicitly turned on.
pub const ENABLE_CORRECTION_V2: Config<bool> = Config::new(
    "enable_compute_correction_v2",
    false,
    "Whether compute should use the new MV sink correction buffer implementation.",
);

/// The yielding behavior with which linear joins should be rendered.
pub const LINEAR_JOIN_YIELDING: Config<&str> = Config::new(
"linear_join_yielding",
Expand Down Expand Up @@ -154,6 +163,7 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
configs
.add(&ENABLE_MZ_JOIN_CORE)
.add(&ENABLE_MATERIALIZED_VIEW_SINK_V2)
.add(&ENABLE_CORRECTION_V2)
.add(&LINEAR_JOIN_YIELDING)
.add(&ENABLE_COLUMNATION_LGALLOC)
.add(&ENABLE_LGALLOC_EAGER_RECLAMATION)
Expand Down
84 changes: 77 additions & 7 deletions src/compute/src/sink/correction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,87 @@ use std::collections::BTreeMap;
use std::ops::{AddAssign, Bound, SubAssign};

use differential_dataflow::consolidation::{consolidate, consolidate_updates};
use differential_dataflow::Data;
use itertools::Itertools;
use mz_ore::iter::IteratorExt;
use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
use mz_repr::{Diff, Timestamp};
use timely::progress::Antichain;
use timely::PartialOrder;

use crate::sink::correction_v2::{CorrectionV2, Data};

/// A data structure suitable for storing updates in a self-correcting persist sink.
///
/// Selects one of two correction buffer implementations. `V1` is the original simple
/// implementation that stores updates in non-spillable memory. `V2` improves on `V1` by supporting
/// spill-to-disk but is less battle-tested, so for now we want to keep the option of reverting to
/// `V1` in a pinch. The plan is to remove `V1` eventually.
///
/// Which variant is constructed is decided at creation time (see [`Correction::new`]),
/// driven by the `enable_compute_correction_v2` dyncfg.
pub(super) enum Correction<D: Data> {
    /// The original in-memory correction buffer.
    V1(CorrectionV1<D>),
    /// The new spill-to-disk-capable correction buffer.
    V2(CorrectionV2<D>),
}

impl<D: Data> Correction<D> {
    /// Construct a new `Correction` instance.
    ///
    /// `v2` selects which of the two buffer implementations backs this instance.
    pub fn new(metrics: SinkMetrics, worker_metrics: SinkWorkerMetrics, v2: bool) -> Self {
        match v2 {
            true => Self::V2(CorrectionV2::new(metrics, worker_metrics)),
            false => Self::V1(CorrectionV1::new(metrics, worker_metrics)),
        }
    }

    /// Insert a batch of updates.
    pub fn insert(&mut self, updates: Vec<(D, Timestamp, Diff)>) {
        match self {
            Self::V1(inner) => inner.insert(updates),
            Self::V2(inner) => inner.insert(updates),
        }
    }

    /// Insert a batch of updates, after negating their diffs.
    pub fn insert_negated(&mut self, updates: Vec<(D, Timestamp, Diff)>) {
        match self {
            Self::V1(inner) => inner.insert_negated(updates),
            Self::V2(inner) => inner.insert_negated(updates),
        }
    }

    /// Consolidate and return updates before the given `upper`.
    ///
    /// The two implementations return distinct iterator types, so the result is
    /// boxed to give the caller a uniform interface.
    pub fn updates_before(
        &mut self,
        upper: &Antichain<Timestamp>,
    ) -> Box<dyn Iterator<Item = (D, Timestamp, Diff)> + '_> {
        match self {
            Self::V1(inner) => Box::new(inner.updates_before(upper)),
            Self::V2(inner) => Box::new(inner.updates_before(upper)),
        }
    }

    /// Return the current since frontier.
    pub fn since(&self) -> &Antichain<Timestamp> {
        match self {
            Self::V1(inner) => inner.since(),
            Self::V2(inner) => inner.since(),
        }
    }

    /// Advance the since frontier.
    ///
    /// # Panics
    ///
    /// Panics if the given `since` is less than the current since frontier.
    pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
        match self {
            Self::V1(inner) => inner.advance_since(since),
            Self::V2(inner) => inner.advance_since(since),
        }
    }
}

/// A collection holding `persist_sink` updates.
///
/// The `Correction` data structure is purpose-built for the `persist_sink::write_batches`
/// The `CorrectionV1` data structure is purpose-built for the `persist_sink::write_batches`
/// operator:
///
/// * It stores updates by time, to enable efficient separation between updates that should
Expand All @@ -33,7 +103,7 @@ use timely::PartialOrder;
/// are removed by inserting them again, with negated diffs. Stored updates are continuously
/// consolidated to give them opportunity to cancel each other out.
/// * It provides an interface for advancing all contained updates to a given frontier.
pub(super) struct Correction<D> {
pub(super) struct CorrectionV1<D> {
/// Stashed updates by time.
updates: BTreeMap<Timestamp, ConsolidatingVec<D>>,
/// Frontier to which all update times are advanced.
Expand All @@ -49,8 +119,8 @@ pub(super) struct Correction<D> {
worker_metrics: SinkWorkerMetrics,
}

impl<D> Correction<D> {
/// Construct a new `Correction` instance.
impl<D> CorrectionV1<D> {
/// Construct a new `CorrectionV1` instance.
pub fn new(metrics: SinkMetrics, worker_metrics: SinkWorkerMetrics) -> Self {
Self {
updates: Default::default(),
Expand All @@ -75,7 +145,7 @@ impl<D> Correction<D> {
}
}

impl<D: Data> Correction<D> {
impl<D: Data> CorrectionV1<D> {
/// Insert a batch of updates.
pub fn insert(&mut self, mut updates: Vec<(D, Timestamp, Diff)>) {
let Some(since_ts) = self.since.as_option() else {
Expand Down Expand Up @@ -246,7 +316,7 @@ impl<D: Data> Correction<D> {
}
}

impl<D> Drop for Correction<D> {
impl<D> Drop for CorrectionV1<D> {
fn drop(&mut self) {
self.update_metrics(Default::default());
}
Expand Down
6 changes: 3 additions & 3 deletions src/compute/src/sink/correction_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl<D: differential_dataflow::Data + Columnation> Data for D {}
/// 2: xx
/// 3: x
///
pub(super) struct Correction<D: Data> {
pub(super) struct CorrectionV2<D: Data> {
chains: Vec<Chain<D>>,
since: Antichain<Timestamp>,
/// Global persist sink metrics.
Expand All @@ -41,8 +41,8 @@ pub(super) struct Correction<D: Data> {
_worker_metrics: SinkWorkerMetrics,
}

impl<D: Data> Correction<D> {
/// Construct a new `Correction` instance.
impl<D: Data> CorrectionV2<D> {
/// Construct a new `CorrectionV2` instance.
pub fn new(metrics: SinkMetrics, worker_metrics: SinkWorkerMetrics) -> Self {
Self {
chains: Default::default(),
Expand Down
7 changes: 4 additions & 3 deletions src/compute/src/sink/materialized_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use tracing::trace;
use crate::compute_state::ComputeState;
use crate::render::sinks::SinkRender;
use crate::render::StartSignal;
use crate::sink::correction::Correction;
use crate::sink::correction::CorrectionV1;
use crate::sink::materialized_view_v2;
use crate::sink::refresh::apply_refresh;

Expand Down Expand Up @@ -666,8 +666,9 @@ where
// Contains `desired - persist`, reflecting the updates we would like to commit
// to `persist` in order to "correct" it to track `desired`. These collections are
// only modified by updates received from either the `desired` or `persist` inputs.
let mut correction_oks = Correction::new(sink_metrics.clone(), sink_worker_metrics.clone());
let mut correction_errs = Correction::new(sink_metrics, sink_worker_metrics);
let mut correction_oks =
CorrectionV1::new(sink_metrics.clone(), sink_worker_metrics.clone());
let mut correction_errs = CorrectionV1::new(sink_metrics, sink_worker_metrics);

// Contains descriptions of batches for which we know that we can
// write data. We got these from the "centralized" operator that
Expand Down
20 changes: 15 additions & 5 deletions src/compute/src/sink/materialized_view_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ use std::sync::Arc;

use differential_dataflow::{Collection, Hashable};
use futures::StreamExt;
use mz_compute_types::dyncfgs::ENABLE_CORRECTION_V2;
use mz_ore::cast::CastFrom;
use mz_persist_client::batch::{Batch, ProtoBatch};
use mz_persist_client::cache::PersistClientCache;
Expand Down Expand Up @@ -216,8 +217,15 @@ where
);

let name = operator_name("write");
let (batches, write_token) =
write::render(name.clone(), persist_api(name), &desired, &persist, &descs);
let use_correction_v2 = ENABLE_CORRECTION_V2.get(&compute_state.worker_config);
let (batches, write_token) = write::render(
name.clone(),
persist_api(name),
&desired,
&persist,
&descs,
use_correction_v2,
);

let name = operator_name("append");
let append_token = append::render(
Expand Down Expand Up @@ -630,6 +638,7 @@ mod write {
desired: &DesiredStreams<S>,
persist: &PersistStreams<S>,
descs: &Stream<S, BatchDescription>,
use_correction_v2: bool,
) -> (BatchesStream<S>, Box<dyn Any>)
where
S: Scope<Timestamp = Timestamp>,
Expand Down Expand Up @@ -663,7 +672,7 @@ mod write {

let writer = persist_api.open_writer().await;
let sink_metrics = persist_api.open_metrics().await;
let mut state = State::new(worker_id, writer, sink_metrics);
let mut state = State::new(worker_id, writer, sink_metrics, use_correction_v2);

loop {
// Read from the inputs, extract `desired` updates as positive contributions to
Expand Down Expand Up @@ -766,15 +775,16 @@ mod write {
worker_id: usize,
persist_writer: WriteHandle<SourceData, (), Timestamp, Diff>,
metrics: SinkMetrics,
use_correction_v2: bool,
) -> Self {
let worker_metrics = metrics.for_worker(worker_id);

Self {
worker_id,
persist_writer,
corrections: OkErr::new(
Correction::new(metrics.clone(), worker_metrics.clone()),
Correction::new(metrics, worker_metrics),
Correction::new(metrics.clone(), worker_metrics.clone(), use_correction_v2),
Correction::new(metrics, worker_metrics, use_correction_v2),
),
desired_frontiers: OkErr::new_frontiers(),
persist_frontiers: OkErr::new_frontiers(),
Expand Down

0 comments on commit 4356350

Please sign in to comment.