diff --git a/misc/python/materialize/mzcompose/__init__.py b/misc/python/materialize/mzcompose/__init__.py
index dcc325ba862cf..de6e2438bd907 100644
--- a/misc/python/materialize/mzcompose/__init__.py
+++ b/misc/python/materialize/mzcompose/__init__.py
@@ -95,6 +95,7 @@ def get_default_system_parameters(
         "enable_columnation_lgalloc": "true",
         "enable_comment": "true",
         "enable_compute_chunked_stack": "true",
+        "enable_compute_correction_v2": "true",
         "enable_connection_validation_syntax": "true",
         "enable_continual_task_builtins": "true",
         "enable_continual_task_create": "true",
diff --git a/src/compute-types/src/dyncfgs.rs b/src/compute-types/src/dyncfgs.rs
index 2450228accf48..e53987d6afdba 100644
--- a/src/compute-types/src/dyncfgs.rs
+++ b/src/compute-types/src/dyncfgs.rs
@@ -28,6 +28,15 @@ pub const ENABLE_MATERIALIZED_VIEW_SINK_V2: Config<bool> = Config::new(
     "Whether compute should use the new MV sink implementation.",
 );
 
+/// Whether rendering should use the new MV sink correction buffer implementation.
+///
+/// Only has an effect when `enable_compute_materialized_view_sink_v2` is enabled.
+pub const ENABLE_CORRECTION_V2: Config<bool> = Config::new(
+    "enable_compute_correction_v2",
+    false,
+    "Whether compute should use the new MV sink correction buffer implementation.",
+);
+
 /// The yielding behavior with which linear joins should be rendered.
 pub const LINEAR_JOIN_YIELDING: Config<&str> = Config::new(
     "linear_join_yielding",
@@ -154,6 +163,7 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
     configs
         .add(&ENABLE_MZ_JOIN_CORE)
         .add(&ENABLE_MATERIALIZED_VIEW_SINK_V2)
+        .add(&ENABLE_CORRECTION_V2)
         .add(&LINEAR_JOIN_YIELDING)
         .add(&ENABLE_COLUMNATION_LGALLOC)
         .add(&ENABLE_LGALLOC_EAGER_RECLAMATION)
diff --git a/src/compute/src/sink/correction.rs b/src/compute/src/sink/correction.rs
index c4e4770698c3e..0d73390025760 100644
--- a/src/compute/src/sink/correction.rs
+++ b/src/compute/src/sink/correction.rs
@@ -14,7 +14,6 @@ use std::collections::BTreeMap;
 use std::ops::{AddAssign, Bound, SubAssign};
 
 use differential_dataflow::consolidation::{consolidate, consolidate_updates};
-use differential_dataflow::Data;
 use itertools::Itertools;
 use mz_ore::iter::IteratorExt;
 use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
@@ -22,9 +21,80 @@ use mz_repr::{Diff, Timestamp};
 use timely::progress::Antichain;
 use timely::PartialOrder;
 
+use crate::sink::correction_v2::{CorrectionV2, Data};
+
+/// A data structure suitable for storing updates in a self-correcting persist sink.
+///
+/// Selects one of two correction buffer implementations. `V1` is the original simple
+/// implementation that stores updates in non-spillable memory. `V2` improves on `V1` by
+/// supporting spill-to-disk, but is less battle-tested, so for now we want to keep the option
+/// of reverting to `V1` in a pinch. The plan is to remove `V1` eventually.
+pub(super) enum Correction<D: Data> {
+    V1(CorrectionV1<D>),
+    V2(CorrectionV2<D>),
+}
+
+impl<D: Data> Correction<D> {
+    /// Construct a new `Correction` instance.
+    pub fn new(metrics: SinkMetrics, worker_metrics: SinkWorkerMetrics, v2: bool) -> Self {
+        if v2 {
+            Self::V2(CorrectionV2::new(metrics, worker_metrics))
+        } else {
+            Self::V1(CorrectionV1::new(metrics, worker_metrics))
+        }
+    }
+
+    /// Insert a batch of updates.
+    pub fn insert(&mut self, updates: Vec<(D, Timestamp, Diff)>) {
+        match self {
+            Self::V1(c) => c.insert(updates),
+            Self::V2(c) => c.insert(updates),
+        }
+    }
+
+    /// Insert a batch of updates, after negating their diffs.
+    pub fn insert_negated(&mut self, updates: Vec<(D, Timestamp, Diff)>) {
+        match self {
+            Self::V1(c) => c.insert_negated(updates),
+            Self::V2(c) => c.insert_negated(updates),
+        }
+    }
+
+    /// Consolidate and return updates before the given `upper`.
+    pub fn updates_before(
+        &mut self,
+        upper: &Antichain<Timestamp>,
+    ) -> Box<dyn Iterator<Item = (D, Timestamp, Diff)> + '_> {
+        match self {
+            Self::V1(c) => Box::new(c.updates_before(upper)),
+            Self::V2(c) => Box::new(c.updates_before(upper)),
+        }
+    }
+
+    /// Return the current since frontier.
+    pub fn since(&self) -> &Antichain<Timestamp> {
+        match self {
+            Self::V1(c) => c.since(),
+            Self::V2(c) => c.since(),
+        }
+    }
+
+    /// Advance the since frontier.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the given `since` is less than the current since frontier.
+    pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
+        match self {
+            Self::V1(c) => c.advance_since(since),
+            Self::V2(c) => c.advance_since(since),
+        }
+    }
+}
+
 /// A collection holding `persist_sink` updates.
 ///
-/// The `Correction` data structure is purpose-built for the `persist_sink::write_batches`
+/// The `CorrectionV1` data structure is purpose-built for the `persist_sink::write_batches`
 /// operator:
 ///
 /// * It stores updates by time, to enable efficient separation between updates that should
@@ -33,7 +103,7 @@ use timely::PartialOrder;
 ///   are removed by inserting them again, with negated diffs. Stored updates are continuously
 ///   consolidated to give them opportunity to cancel each other out.
 /// * It provides an interface for advancing all contained updates to a given frontier.
-pub(super) struct Correction<D> {
+pub(super) struct CorrectionV1<D> {
     /// Stashed updates by time.
     updates: BTreeMap<Timestamp, ConsolidatingVec<D>>,
     /// Frontier to which all update times are advanced.
@@ -49,8 +119,8 @@ pub(super) struct Correction<D> {
     worker_metrics: SinkWorkerMetrics,
 }
 
-impl<D> Correction<D> {
-    /// Construct a new `Correction` instance.
+impl<D> CorrectionV1<D> {
+    /// Construct a new `CorrectionV1` instance.
     pub fn new(metrics: SinkMetrics, worker_metrics: SinkWorkerMetrics) -> Self {
         Self {
             updates: Default::default(),
@@ -75,7 +145,7 @@ impl<D> Correction<D> {
     }
 }
 
-impl<D: Data> Correction<D> {
+impl<D: Data> CorrectionV1<D> {
     /// Insert a batch of updates.
     pub fn insert(&mut self, mut updates: Vec<(D, Timestamp, Diff)>) {
         let Some(since_ts) = self.since.as_option() else {
@@ -246,7 +316,7 @@ impl<D: Data> Correction<D> {
     }
 }
 
-impl<D> Drop for Correction<D> {
+impl<D> Drop for CorrectionV1<D> {
     fn drop(&mut self) {
         self.update_metrics(Default::default());
     }
diff --git a/src/compute/src/sink/correction_v2.rs b/src/compute/src/sink/correction_v2.rs
index 4e0d78b52d52a..e18d36e1b15ca 100644
--- a/src/compute/src/sink/correction_v2.rs
+++ b/src/compute/src/sink/correction_v2.rs
@@ -27,7 +27,7 @@ pub trait Data: differential_dataflow::Data + Columnation {}
 impl<D: differential_dataflow::Data + Columnation> Data for D {}
 
 #[derive(Debug)]
-pub(super) struct Correction<D: Data> {
+pub(super) struct CorrectionV2<D: Data> {
     chains: Vec<Chain<D>>,
     since: Antichain<Timestamp>,
     /// Global persist sink metrics.
@@ -36,8 +36,8 @@ pub(super) struct Correction<D: Data> {
     _worker_metrics: SinkWorkerMetrics,
 }
 
-impl<D: Data> Correction<D> {
-    /// Construct a new `Correction` instance.
+impl<D: Data> CorrectionV2<D> {
+    /// Construct a new `CorrectionV2` instance.
     pub fn new(metrics: SinkMetrics, worker_metrics: SinkWorkerMetrics) -> Self {
         Self {
             chains: Default::default(),
diff --git a/src/compute/src/sink/materialized_view.rs b/src/compute/src/sink/materialized_view.rs
index 2c52bd063eb5c..ae57b257728c8 100644
--- a/src/compute/src/sink/materialized_view.rs
+++ b/src/compute/src/sink/materialized_view.rs
@@ -42,7 +42,7 @@ use tracing::trace;
 use crate::compute_state::ComputeState;
 use crate::render::sinks::SinkRender;
 use crate::render::StartSignal;
-use crate::sink::correction::Correction;
+use crate::sink::correction::CorrectionV1;
 use crate::sink::materialized_view_v2;
 use crate::sink::refresh::apply_refresh;
 
@@ -666,8 +666,9 @@ where
         // Contains `desired - persist`, reflecting the updates we would like to commit
        // to `persist` in order to "correct" it to track `desired`. These collections are
         // only modified by updates received from either the `desired` or `persist` inputs.
-        let mut correction_oks = Correction::new(sink_metrics.clone(), sink_worker_metrics.clone());
-        let mut correction_errs = Correction::new(sink_metrics, sink_worker_metrics);
+        let mut correction_oks =
+            CorrectionV1::new(sink_metrics.clone(), sink_worker_metrics.clone());
+        let mut correction_errs = CorrectionV1::new(sink_metrics, sink_worker_metrics);
 
         // Contains descriptions of batches for which we know that we can
         // write data. We got these from the "centralized" operator that
diff --git a/src/compute/src/sink/materialized_view_v2.rs b/src/compute/src/sink/materialized_view_v2.rs
index 051ae6f8887db..c2ce97292e2e5 100644
--- a/src/compute/src/sink/materialized_view_v2.rs
+++ b/src/compute/src/sink/materialized_view_v2.rs
@@ -115,6 +115,7 @@ use std::sync::Arc;
 
 use differential_dataflow::{Collection, Hashable};
 use futures::StreamExt;
+use mz_compute_types::dyncfgs::ENABLE_CORRECTION_V2;
 use mz_ore::cast::CastFrom;
 use mz_persist_client::batch::{Batch, ProtoBatch};
 use mz_persist_client::cache::PersistClientCache;
@@ -216,8 +217,15 @@ where
     );
 
     let name = operator_name("write");
-    let (batches, write_token) =
-        write::render(name.clone(), persist_api(name), &desired, &persist, &descs);
+    let use_correction_v2 = ENABLE_CORRECTION_V2.get(&compute_state.worker_config);
+    let (batches, write_token) = write::render(
+        name.clone(),
+        persist_api(name),
+        &desired,
+        &persist,
+        &descs,
+        use_correction_v2,
+    );
 
     let name = operator_name("append");
     let append_token = append::render(
@@ -630,6 +638,7 @@ mod write {
         desired: &DesiredStreams<S>,
         persist: &PersistStreams<S>,
         descs: &Stream<S, BatchDescription>,
+        use_correction_v2: bool,
     ) -> (BatchesStream<S>, Box<dyn Any>)
     where
         S: Scope<Timestamp = Timestamp>,
@@ -663,7 +672,7 @@ mod write {
             let writer = persist_api.open_writer().await;
             let sink_metrics = persist_api.open_metrics().await;
 
-            let mut state = State::new(worker_id, writer, sink_metrics);
+            let mut state = State::new(worker_id, writer, sink_metrics, use_correction_v2);
 
             loop {
                 // Read from the inputs, extract `desired` updates as positive contributions to
@@ -766,6 +775,7 @@ mod write {
             worker_id: usize,
             persist_writer: WriteHandle<SourceData, (), Timestamp, Diff>,
             metrics: SinkMetrics,
+            use_correction_v2: bool,
         ) -> Self {
             let worker_metrics = metrics.for_worker(worker_id);
 
@@ -773,8 +783,8 @@ mod write {
                 worker_id,
                 persist_writer,
                 corrections: OkErr::new(
-                    Correction::new(metrics.clone(), worker_metrics.clone()),
-                    Correction::new(metrics, worker_metrics),
+                    Correction::new(metrics.clone(), worker_metrics.clone(), use_correction_v2),
+                    Correction::new(metrics, worker_metrics, use_correction_v2),
                 ),
                 desired_frontiers: OkErr::new_frontiers(),
                 persist_frontiers: OkErr::new_frontiers(),
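
The new `Correction` wrapper in correction.rs is plain enum dispatch: the `enable_compute_correction_v2` flag is consulted once, at construction time, and every other call site simply forwards to whichever variant was chosen, so callers are identical for V1 and V2. (In the real code, `updates_before` additionally boxes its return value because the two variants yield different concrete iterator types.) The following is a minimal, self-contained sketch of that pattern; `BufferV1`, `BufferV2`, and the `(String, u64, i64)` update type are simplified stand-ins for illustration, not the actual `CorrectionV1`/`CorrectionV2` code:

/// Stand-in for the original in-memory correction buffer.
#[derive(Default)]
struct BufferV1 {
    updates: Vec<(String, u64, i64)>,
}

/// Stand-in for the new buffer; imagine this one can spill to disk.
#[derive(Default)]
struct BufferV2 {
    updates: Vec<(String, u64, i64)>,
}

impl BufferV1 {
    fn insert(&mut self, updates: Vec<(String, u64, i64)>) {
        self.updates.extend(updates);
    }
}

impl BufferV2 {
    fn insert(&mut self, updates: Vec<(String, u64, i64)>) {
        self.updates.extend(updates);
    }
}

/// Wrapper that selects one implementation, once, based on a boolean flag.
enum Buffer {
    V1(BufferV1),
    V2(BufferV2),
}

impl Buffer {
    fn new(use_v2: bool) -> Self {
        if use_v2 {
            Self::V2(BufferV2::default())
        } else {
            Self::V1(BufferV1::default())
        }
    }

    /// Call sites stay unchanged: every method forwards to the active variant.
    fn insert(&mut self, updates: Vec<(String, u64, i64)>) {
        match self {
            Self::V1(b) => b.insert(updates),
            Self::V2(b) => b.insert(updates),
        }
    }
}

fn main() {
    // In the sink, this flag comes from the `enable_compute_correction_v2` dyncfg.
    let use_v2 = true;
    let mut buffer = Buffer::new(use_v2);
    buffer.insert(vec![("hello".into(), 1, 1)]);
}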