diff --git a/Cargo.lock b/Cargo.lock
index 2657322314fa2..afb4089d16dac 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1436,9 +1436,9 @@ dependencies = [

 [[package]]
 name = "bytemuck"
-version = "1.20.0"
+version = "1.21.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8b37c88a63ffd85d15b406896cc343916d7cf57838a847b3a6f2ca5d39a5695a"
+checksum = "ef657dfab802224e671f5818e9a4935f9b1957ed18e58292690cc39e7a4092a3"

 [[package]]
 name = "byteorder"
@@ -1754,9 +1754,9 @@ checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990"

 [[package]]
 name = "columnar"
-version = "0.2.0"
+version = "0.2.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d754936b0b004c01c338b3d81c39e0a81d2ead5b6ee9fa64bfa140e6c430b80d"
+checksum = "d58a4c12223e2d2140bbf4be9fb38b3a612804230c91388dfa4e56a8a6464bf3"
 dependencies = [
  "bincode",
  "bytemuck",
@@ -1768,9 +1768,9 @@

 [[package]]
 name = "columnar_derive"
-version = "0.2.0"
+version = "0.2.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "174af3249fb00e9845597cb3a8259f05ff62b4060650b6af6adb4e05df277f7a"
+checksum = "087b1ac5c4ecad28348b6a9957e3dbd44ac7d041d267370acfdbbfa66b514c7d"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -5113,6 +5113,7 @@ dependencies = [
  "anyhow",
  "async-stream",
  "bytesize",
+ "columnar",
  "core_affinity",
  "crossbeam-channel",
  "dec",
@@ -5199,6 +5200,7 @@
 name = "mz-compute-types"
 version = "0.0.0"
 dependencies = [
+ "columnar",
  "columnation",
  "differential-dataflow",
  "itertools 0.12.1",
@@ -7309,6 +7311,8 @@ version = "0.0.0"
 dependencies = [
  "ahash",
  "bincode",
+ "bytemuck",
+ "columnar",
  "columnation",
  "differential-dataflow",
  "either",
@@ -10369,9 +10373,9 @@ dependencies = [

 [[package]]
 name = "timely"
-version = "0.16.0"
+version = "0.16.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b134806db44c8452ba2a9b18e6112f3d5e0a0185c6e58524269a08ca2891149c"
+checksum = "0ace21eb2a22c1b80b0b9b5be0627eb0f95a128a5751e866aefc90a85e3007d3"
 dependencies = [
  "bincode",
  "byteorder",
@@ -10394,9 +10398,9 @@ checksum = "99223f9594ab7d4dd36b55b1d7eb9bd2cd205f4304b5cc5381d5cdd417ec06f2"

 [[package]]
 name = "timely_communication"
-version = "0.16.1"
+version = "0.16.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f00d5c7f7bbb132a32220890240e5d951fc9582f82fb94452c011a8033c39f7a"
+checksum = "de7b73d69bd229dae4ea989beb7e48e652c0a576774134e6fed59e29f2a6f52c"
 dependencies = [
  "byteorder",
  "columnar",
@@ -10410,9 +10414,9 @@ dependencies = [

 [[package]]
 name = "timely_container"
-version = "0.13.1"
+version = "0.14.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "92b763064e76ba4f650dcdb035d82bcaad09986971fab002276e0b4cb10b3aa5"
+checksum = "b8bb051808b458a14146900968293da6166b1e6df5dab7c4d1d163f4734b7431"
 dependencies = [
  "columnation",
  "flatcontainer",
@@ -10421,9 +10425,9 @@ dependencies = [

 [[package]]
 name = "timely_logging"
-version = "0.13.1"
+version = "0.13.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d8300263e23bebac4ec9fbbeee65954a25402037609b7716d20191ed35829b14"
+checksum = "e6760517ac4ffa29d00841b0fccf513ece5da896f159045a43970fdb228726d9"
 dependencies = [
  "timely_container",
 ]
diff --git a/src/compute-types/Cargo.toml b/src/compute-types/Cargo.toml
index a26797e591820..a82db2a67b378 100644
--- a/src/compute-types/Cargo.toml
+++ b/src/compute-types/Cargo.toml
@@ -10,6 +10,7 @@ publish = false
 workspace = true

 [dependencies]
+columnar = "0.2.2"
 columnation = "0.1.0"
 differential-dataflow = "0.13.3"
 itertools = "0.12.1"
diff --git a/src/compute-types/src/plan.rs b/src/compute-types/src/plan.rs
index 88260444c32f7..abd1eee3afb17 100644
--- a/src/compute-types/src/plan.rs
+++ b/src/compute-types/src/plan.rs
@@ -14,6 +14,7 @@ use std::collections::{BTreeMap, BTreeSet};
 use std::num::NonZeroU64;

+use columnar::Columnar;
 use mz_expr::{
     CollectionPlan, EvalError, Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr,
     OptimizedMirRelationExpr, TableFunc,
 };
@@ -161,14 +162,12 @@ impl AvailableCollections {
 }

 /// An identifier for an LIR node.
-///
-/// LirIds start at 1, not 0, which let's us get a better struct packing in `ComputeEvent::LirMapping`.
-#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
-pub struct LirId(NonZeroU64);
+#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize, Columnar)]
+pub struct LirId(u64);

 impl LirId {
     fn as_u64(&self) -> u64 {
-        self.0.into()
+        self.0
     }
 }
@@ -186,11 +185,11 @@ impl std::fmt::Display for LirId {

 impl RustType<u64> for LirId {
     fn into_proto(&self) -> u64 {
-        u64::from(self.0)
+        self.0
     }

     fn from_proto(proto: u64) -> Result<Self, TryFromProtoError> {
-        Ok(Self(proto.try_into()?))
+        Ok(Self(proto))
     }
 }
@@ -513,7 +512,7 @@ impl Arbitrary for LirId {
     type Parameters = ();

     fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
-        let lir_id = NonZeroU64::arbitrary();
+        let lir_id = u64::arbitrary();
         lir_id.prop_map(LirId).boxed()
     }
 }
@@ -1132,16 +1131,6 @@ mod tests {

     use super::*;

-    #[mz_ore::test]
-    fn test_option_lirid_fits_in_usize() {
-        let option_lirid_size = std::mem::size_of::<Option<LirId>>();
-        let usize_size = std::mem::size_of::<usize>();
-        assert!(
-            option_lirid_size <= usize_size,
-            "Option<LirId> (size {option_lirid_size}) should fit in usize (size {usize_size})"
-        );
-    }
-
     proptest! {
         #![proptest_config(ProptestConfig::with_cases(10))]
         #[mz_ore::test]
diff --git a/src/compute-types/src/plan/lowering.rs b/src/compute-types/src/plan/lowering.rs
index 2fa49f3585d59..7b9d2d1a693c3 100644
--- a/src/compute-types/src/plan/lowering.rs
+++ b/src/compute-types/src/plan/lowering.rs
@@ -43,7 +43,7 @@ impl Context {
     pub fn new(debug_name: String, features: &OptimizerFeatures) -> Self {
         Self {
             arrangements: Default::default(),
-            next_lir_id: LirId(std::num::NonZero::<u64>::MIN),
+            next_lir_id: LirId(1),
             debug_info: LirDebugInfo {
                 debug_name,
                 id: GlobalId::Transient(0),
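The `LirId` change above trades layout for derivability: `u64` works with `#[derive(Columnar)]`, but it lacks the zero niche of `NonZeroU64`, so `Option<LirId>` no longer packs into a single word — which is why `test_option_lirid_fits_in_usize` is deleted rather than updated. A minimal, std-only sketch of the size trade-off (not part of the patch):

```rust
use std::mem::size_of;
use std::num::NonZeroU64;

fn main() {
    // `NonZeroU64` reserves 0 as a niche, so `Option<NonZeroU64>` encodes
    // `None` as the forbidden value 0 and stays 8 bytes.
    assert_eq!(size_of::<Option<NonZeroU64>>(), 8);

    // Plain `u64` has no niche: `Option<u64>` needs a discriminant and,
    // after alignment, doubles to 16 bytes.
    assert_eq!(size_of::<Option<u64>>(), 16);
}
```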
diff --git a/src/compute/Cargo.toml b/src/compute/Cargo.toml
index 5f459f9edeb53..3f9ba4d140a3d 100644
--- a/src/compute/Cargo.toml
+++ b/src/compute/Cargo.toml
@@ -13,6 +13,7 @@ workspace = true
 anyhow = "1.0.66"
 async-stream = "0.3.3"
 bytesize = "1.1.0"
+columnar = "0.2.2"
 crossbeam-channel = "0.5.8"
 dec = { version = "0.4.8", features = ["serde"] }
 differential-dataflow = "0.13.3"
diff --git a/src/compute/src/compute_state.rs b/src/compute/src/compute_state.rs
index 70aa3b9f419dc..1e419838c64a2 100644
--- a/src/compute/src/compute_state.rs
+++ b/src/compute/src/compute_state.rs
@@ -597,7 +597,7 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {

         // Log the receipt of the peek.
         if let Some(logger) = self.compute_state.compute_logger.as_mut() {
-            logger.log(pending.as_log_event(true));
+            logger.log(&pending.as_log_event(true));
         }

         self.process_peek(&mut Antichain::new(), pending);
@@ -891,7 +891,7 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {

         // Log responding to the peek request.
         if let Some(logger) = self.compute_state.compute_logger.as_mut() {
-            logger.log(log_event);
+            logger.log(&log_event);
         }
     }
diff --git a/src/compute/src/extensions/arrange.rs b/src/compute/src/extensions/arrange.rs
index 6409d44244d7d..555bc69a0271e 100644
--- a/src/compute/src/extensions/arrange.rs
+++ b/src/compute/src/extensions/arrange.rs
@@ -263,10 +263,10 @@ where
         .stream
         .unary(Pipeline, "ArrangementSize", |_cap, info| {
             let address = info.address;
-            logger.log(ComputeEvent::ArrangementHeapSizeOperator(
+            logger.log(&ComputeEvent::ArrangementHeapSizeOperator(
                 ArrangementHeapSizeOperator {
                     operator_id,
-                    address,
+                    address: address.to_vec(),
                 },
             ));
             move |input, output| {
@@ -281,7 +281,7 @@

                 let size = size.try_into().expect("must fit");
                 if size != old_size {
-                    logger.log(ComputeEvent::ArrangementHeapSize(ArrangementHeapSize {
+                    logger.log(&ComputeEvent::ArrangementHeapSize(ArrangementHeapSize {
                         operator_id,
                         delta_size: size - old_size,
                     }));
@@ -289,7 +289,7 @@

                 let capacity = capacity.try_into().expect("must fit");
                 if capacity != old_capacity {
-                    logger.log(ComputeEvent::ArrangementHeapCapacity(
+                    logger.log(&ComputeEvent::ArrangementHeapCapacity(
                         ArrangementHeapCapacity {
                             operator_id,
                             delta_capacity: capacity - old_capacity,
@@ -299,7 +299,7 @@

                 let allocations = allocations.try_into().expect("must fit");
                 if allocations != old_allocations {
-                    logger.log(ComputeEvent::ArrangementHeapAllocations(
+                    logger.log(&ComputeEvent::ArrangementHeapAllocations(
                         ArrangementHeapAllocations {
                             operator_id,
                             delta_allocations: allocations - old_allocations,
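The `logger.log(event)` to `logger.log(&event)` changes above (together with `address: address.to_vec()`) suggest the logger now borrows events and copies what it needs into per-field columnar storage rather than taking ownership. A rough sketch of that shape, with hypothetical names (`ColumnarEventSink`, `push_event`, `SizeEvent`) that are not part of this patch:

```rust
/// Hypothetical stand-in for a compute event with a variable-length field.
struct SizeEvent {
    operator_id: u64,
    address: Vec<usize>,
}

/// Hypothetical columnar sink: one growable column per event field.
#[derive(Default)]
struct ColumnarEventSink {
    operator_ids: Vec<u64>,
    addresses: Vec<usize>,
    address_ends: Vec<usize>, // offset where each event's address slice ends
}

impl ColumnarEventSink {
    /// Borrowing the event lets the caller keep using it (and its heap
    /// allocations); the sink copies the fields into its columns.
    fn push_event(&mut self, event: &SizeEvent) {
        self.operator_ids.push(event.operator_id);
        self.addresses.extend_from_slice(&event.address);
        self.address_ends.push(self.addresses.len());
    }
}
```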
diff --git a/src/compute/src/logging.rs b/src/compute/src/logging.rs
index 1aafb9847697e..17c9d04279a49 100644
--- a/src/compute/src/logging.rs
+++ b/src/compute/src/logging.rs
@@ -21,14 +21,22 @@ use std::marker::PhantomData;
 use std::rc::Rc;
 use std::time::Duration;

+use ::timely::container::ContainerBuilder;
+use ::timely::dataflow::channels::pact::Pipeline;
+use ::timely::dataflow::channels::pushers::buffer::Session;
+use ::timely::dataflow::channels::pushers::{Counter, Tee};
 use ::timely::dataflow::operators::capture::{Event, EventLink, EventPusher};
+use ::timely::dataflow::operators::Operator;
+use ::timely::dataflow::StreamCore;
 use ::timely::progress::Timestamp as TimelyTimestamp;
 use ::timely::scheduling::Activator;
 use ::timely::Container;
+use differential_dataflow::trace::Batcher;
 use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
 use mz_expr::{permutation_for_arrangement, MirScalarExpr};
-use mz_repr::{Datum, Diff, Row, RowPacker, SharedRow, Timestamp};
+use mz_repr::{Datum, Diff, Row, RowPacker, RowRef, Timestamp};
 use mz_timely_util::activator::RcActivator;
+use mz_timely_util::operator::consolidate_pact;

 use crate::logging::compute::Logger as ComputeLogger;
 use crate::typedefs::RowRowAgent;
@@ -64,30 +72,35 @@ where
             _marker: PhantomData,
         }
     }
+}
+
+impl<C, P> BatchLogger<C, P>
+where
+    P: EventPusher<Timestamp, C>,
+    C: Container,
+{
+    /// Publishes a batch of logged events.
+    fn publish_batch(&mut self, data: C) {
+        self.event_pusher.push(Event::Messages(self.time_ms, data));
+    }

-    /// Indicate progress up to a specific `time`.
-    fn report_progress(&mut self, time: &Duration) {
+    /// Indicate progress up to `time`, advancing the capability.
+    ///
+    /// Returns `true` if the capability was advanced.
+    fn report_progress(&mut self, time: Duration) -> bool {
         let time_ms = ((time.as_millis() / self.interval_ms) + 1) * self.interval_ms;
         let new_time_ms: Timestamp = time_ms.try_into().expect("must fit");
         if self.time_ms < new_time_ms {
             self.event_pusher
                 .push(Event::Progress(vec![(new_time_ms, 1), (self.time_ms, -1)]));
             self.time_ms = new_time_ms;
+            true
+        } else {
+            false
         }
     }
 }

-impl<C, P> BatchLogger<C, P>
-where
-    P: EventPusher<Timestamp, C>,
-{
-    /// Publishes a batch of logged events and advances the capability.
-    fn publish_batch(&mut self, time: &Duration, data: C) {
-        self.event_pusher.push(Event::Messages(self.time_ms, data));
-        self.report_progress(time);
-    }
-}
-
 impl<C, P> Drop for BatchLogger<C, P>
 where
     P: EventPusher<Timestamp, C>,
@@ -132,6 +145,8 @@ struct SharedLoggingState {
 pub(crate) struct PermutedRowPacker {
     key: Vec<usize>,
     value: Vec<usize>,
+    key_row: Row,
+    value_row: Row,
 }

 impl PermutedRowPacker {
@@ -146,32 +161,45 @@
             .collect::<Vec<_>>(),
             variant.desc().arity(),
         );
-        Self { key, value }
+        Self {
+            key,
+            value,
+            key_row: Row::default(),
+            value_row: Row::default(),
+        }
     }

     /// Pack a slice of datums suitable for the key columns in the log variant.
-    pub(crate) fn pack_slice(&self, datums: &[Datum]) -> (Row, Row) {
+    pub(crate) fn pack_slice(&mut self, datums: &[Datum]) -> (&RowRef, &RowRef) {
         self.pack_by_index(|packer, index| packer.push(datums[index]))
     }

-    /// Pack using a callback suitable for the key columns in the log variant.
-    pub(crate) fn pack_by_index<F: Fn(&mut RowPacker, usize)>(&self, logic: F) -> (Row, Row) {
-        let binding = SharedRow::get();
-        let mut row_builder = binding.borrow_mut();
+    /// Pack a slice of datums suitable for the key columns in the log variant, returning owned
+    /// rows.
+    ///
+    /// This is equivalent to calling [`PermutedRowPacker::pack_slice`] and then calling `to_owned`
+    /// on the returned rows.
+    pub(crate) fn pack_slice_owned(&mut self, datums: &[Datum]) -> (Row, Row) {
+        let (key, value) = self.pack_slice(datums);
+        (key.to_owned(), value.to_owned())
+    }

-        let mut packer = row_builder.packer();
+    /// Pack using a callback suitable for the key columns in the log variant.
+    pub(crate) fn pack_by_index<F: Fn(&mut RowPacker, usize)>(
+        &mut self,
+        logic: F,
+    ) -> (&RowRef, &RowRef) {
+        let mut packer = self.key_row.packer();
         for index in &self.key {
             logic(&mut packer, *index);
         }
-        let key_row = row_builder.clone();

-        let mut packer = row_builder.packer();
+        let mut packer = self.value_row.packer();
         for index in &self.value {
             logic(&mut packer, *index);
         }
-        let value_row = row_builder.clone();

-        (key_row, value_row)
+        (&self.key_row, &self.value_row)
     }
 }
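`PermutedRowPacker` now keeps two long-lived `Row` buffers instead of borrowing the thread-local `SharedRow`, so repeated packing reuses the same allocations, and callers that need ownership opt in explicitly via `pack_slice_owned`. A minimal sketch of the reuse pattern, assuming only the `Row`/`RowRef` API already used above (`ReusedPacker` is a hypothetical stand-in, not part of the patch):

```rust
use mz_repr::{Datum, Row, RowRef};

/// Hypothetical reduced form of the packer: two `Row` buffers live as long as
/// the struct and are cleared and refilled on every call, so steady-state
/// packing performs no allocation.
#[derive(Default)]
struct ReusedPacker {
    key_row: Row,
    value_row: Row,
}

impl ReusedPacker {
    fn pack_pair(&mut self, key: Datum, value: Datum) -> (&RowRef, &RowRef) {
        // `Row::packer` clears the row and returns a packer over its buffer.
        self.key_row.packer().push(key);
        self.value_row.packer().push(value);
        // `Row` derefs to `RowRef`, as in `pack_by_index` above.
        (&self.key_row, &self.value_row)
    }
}

// Callers that need ownership copy explicitly, mirroring `pack_slice_owned`:
// let (k, v) = packer.pack_pair(key, value);
// let owned = (k.to_owned(), v.to_owned());
```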
@@ -184,3 +212,49 @@ struct LogCollection {
     /// Index of the dataflow exporting this collection.
     dataflow_index: usize,
 }
+
+pub(super) type Pusher<C> = Counter<Timestamp, C, Tee<Timestamp, C>>;
+pub(super) type OutputSession<'a, CB> =
+    Session<'a, Timestamp, CB, Pusher<<CB as ContainerBuilder>::Container>>;
+
+/// A single-purpose function to consolidate and pack updates for log collection.
+///
+/// The function first consolidates worker-local updates using the [`Pipeline`] pact, then converts
+/// the updates into `(Row, Row)` pairs using the provided logic function. It is crucial that the
+/// data is not exchanged between workers, as the consolidation would not function as desired
+/// otherwise.
+pub(super) fn consolidate_and_pack<G, B, CB, L, F>(
+    input: &StreamCore<G, B::Input>,
+    log: L,
+    mut logic: F,
+) -> StreamCore<G, CB::Container>
+where
+    G: ::timely::dataflow::Scope<Timestamp = Timestamp>,
+    B: Batcher