diff --git a/misc/python/materialize/mzcompose/__init__.py b/misc/python/materialize/mzcompose/__init__.py index 0b08765e73403..9fbc90c183c92 100644 --- a/misc/python/materialize/mzcompose/__init__.py +++ b/misc/python/materialize/mzcompose/__init__.py @@ -84,6 +84,7 @@ def get_default_system_parameters( "compute_dataflow_max_inflight_bytes": "134217728", # 128 MiB "compute_hydration_concurrency": "2", "compute_replica_expiration_offset": "3d", + "compute_apply_column_demands": "true", "disk_cluster_replicas_default": "true", "enable_0dt_deployment": "true" if zero_downtime else "false", "enable_0dt_deployment_panic_after_timeout": "true", diff --git a/misc/python/materialize/parallel_workload/action.py b/misc/python/materialize/parallel_workload/action.py index 8824a081d4d05..5d49b1604c119 100644 --- a/misc/python/materialize/parallel_workload/action.py +++ b/misc/python/materialize/parallel_workload/action.py @@ -1047,6 +1047,7 @@ def __init__( self.flags_with_values["persist_encoding_enable_dictionary"] = ( BOOLEAN_FLAG_VALUES ) + self.flags_with_values["compute_apply_column_demands"] = BOOLEAN_FLAG_VALUES def run(self, exe: Executor) -> bool: flag_name = self.rng.choice(list(self.flags_with_values.keys())) diff --git a/src/compute-types/src/dyncfgs.rs b/src/compute-types/src/dyncfgs.rs index 2450228accf48..d862efadcae86 100644 --- a/src/compute-types/src/dyncfgs.rs +++ b/src/compute-types/src/dyncfgs.rs @@ -149,6 +149,15 @@ pub const COMPUTE_REPLICA_EXPIRATION_OFFSET: Config = Config::new( "The expiration time offset for replicas. Zero disables expiration.", ); +/// When enabled, applies the column demands from a MapFilterProject onto the RelationDesc used to +/// read out of Persist. This allows Persist to prune unneeded columns as a performance +/// optimization. +pub const COMPUTE_APPLY_COLUMN_DEMANDS: Config = Config::new( + "compute_apply_column_demands", + false, + "When enabled, passes applys column demands to the RelationDesc used to read out of Persist.", +); + /// Adds the full set of all compute `Config`s. pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { configs @@ -169,4 +178,5 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { .add(©_TO_S3_MULTIPART_PART_SIZE_BYTES) .add(&ENABLE_COMPUTE_REPLICA_EXPIRATION) .add(&COMPUTE_REPLICA_EXPIRATION_OFFSET) + .add(&COMPUTE_APPLY_COLUMN_DEMANDS) } diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index 568bd5f2af264..2a346b79f27f9 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -119,6 +119,7 @@ use differential_dataflow::{AsCollection, Collection, Data}; use futures::channel::oneshot; use futures::FutureExt; use mz_compute_types::dataflows::{DataflowDescription, IndexDesc}; +use mz_compute_types::dyncfgs::COMPUTE_APPLY_COLUMN_DEMANDS; use mz_compute_types::plan::render_plan::{ self, BindStage, LetBind, LetFreePlan, RecBind, RenderPlan, }; @@ -204,6 +205,7 @@ pub fn build_compute_dataflow( .collect::>(); let worker_logging = timely_worker.log_register().get("timely"); + let apply_demands = COMPUTE_APPLY_COLUMN_DEMANDS.get(&compute_state.worker_config); let name = format!("Dataflow: {}", &dataflow.debug_name); let input_name = format!("InputRegion: {}", &dataflow.debug_name); @@ -224,7 +226,24 @@ pub fn build_compute_dataflow( // Import declared sources into the rendering context. for (source_id, (source, _monotonic)) in dataflow.source_imports.iter() { region.region_named(&format!("Source({:?})", source_id), |inner| { - let mut mfp = source.arguments.operators.clone().map(|ops| { + let mut read_schema = None; + let mut mfp = source.arguments.operators.clone().map(|mut ops| { + // If enabled, we read from Persist with a `RelationDesc` that + // omits uneeded columns. + if apply_demands { + let demands = ops.demand(); + let new_desc = + source.storage_metadata.relation_desc.apply_demand(&demands); + let new_arity = demands.len(); + let remap: BTreeMap<_, _> = demands + .into_iter() + .enumerate() + .map(|(new, old)| (old, new)) + .collect(); + ops.permute_fn(|old_idx| remap[&old_idx], new_arity); + read_schema = Some(new_desc); + } + mz_expr::MfpPlan::create_from(ops) .expect("Linear operators should always be valid") }); @@ -247,6 +266,7 @@ pub fn build_compute_dataflow( &compute_state.txns_ctx, &compute_state.worker_config, source.storage_metadata.clone(), + read_schema, dataflow.as_of.clone(), snapshot_mode, until.clone(), diff --git a/src/compute/src/sink/materialized_view.rs b/src/compute/src/sink/materialized_view.rs index 2c52bd063eb5c..f4f71e1fc91d2 100644 --- a/src/compute/src/sink/materialized_view.rs +++ b/src/compute/src/sink/materialized_view.rs @@ -133,6 +133,7 @@ where &compute_state.txns_ctx, &compute_state.worker_config, target.clone(), + None, source_as_of, SnapshotMode::Include, Antichain::new(), // we want all updates diff --git a/src/compute/src/sink/materialized_view_v2.rs b/src/compute/src/sink/materialized_view_v2.rs index b409fa92d671a..7e865fe8915b7 100644 --- a/src/compute/src/sink/materialized_view_v2.rs +++ b/src/compute/src/sink/materialized_view_v2.rs @@ -337,6 +337,7 @@ where &compute_state.txns_ctx, &compute_state.worker_config, target, + None, as_of, SnapshotMode::Include, until, diff --git a/src/persist-client/src/fetch.rs b/src/persist-client/src/fetch.rs index 1bef18ae2b976..3e00456ee9052 100644 --- a/src/persist-client/src/fetch.rs +++ b/src/persist-client/src/fetch.rs @@ -29,6 +29,7 @@ use mz_persist::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredEx use mz_persist::indexed::encoding::{BlobTraceBatchPart, BlobTraceUpdates}; use mz_persist::location::{Blob, SeqNo}; use mz_persist::metrics::ColumnarMetrics; +use mz_persist_types::arrow::ArrayOrd; use mz_persist_types::columnar::{ColumnDecoder, Schema2}; use mz_persist_types::stats::PartStats; use mz_persist_types::{Codec, Codec64}; @@ -838,10 +839,23 @@ impl FetchedPart ( - decode::("key", &*both.key, &structured.key), - decode::("val", &*both.val, &structured.val), - ), + PartMigration::SameSchema { both } => { + let key_size_before = ArrayOrd::new(&structured.key).goodbytes(); + + let key = decode::("key", &*both.key, &structured.key); + let val = decode::("val", &*both.val, &structured.val); + + if let Some(key_decoder) = key.as_ref() { + let key_size_after = key_decoder.goodbytes(); + let key_diff = key_size_before.saturating_sub(key_size_after); + metrics + .pushdown + .parts_projection_trimmed_bytes + .inc_by(u64::cast_from(key_diff)); + } + + (key, val) + } PartMigration::Codec { .. } => (None, None), PartMigration::Either { _write, @@ -857,6 +871,14 @@ impl FetchedPart("key", &*read.key, &key), decode::("val", &*read.val, &val), diff --git a/src/persist-client/src/internal/metrics.rs b/src/persist-client/src/internal/metrics.rs index 79bae1903760f..9362c31766b89 100644 --- a/src/persist-client/src/internal/metrics.rs +++ b/src/persist-client/src/internal/metrics.rs @@ -2377,6 +2377,7 @@ pub struct PushdownMetrics { pub(crate) parts_faked_bytes: IntCounter, pub(crate) parts_stats_trimmed_count: IntCounter, pub(crate) parts_stats_trimmed_bytes: IntCounter, + pub(crate) parts_projection_trimmed_bytes: IntCounter, pub part_stats: PartStatsMetrics, } @@ -2431,6 +2432,10 @@ impl PushdownMetrics { name: "mz_persist_pushdown_parts_stats_trimmed_bytes", help: "total bytes trimmed from part stats", )), + parts_projection_trimmed_bytes: registry.register(metric!( + name: "mz_persist_pushdown_parts_projection_trimmed_bytes", + help: "total bytes trimmed from columnar data because of projection pushdown", + )), part_stats: PartStatsMetrics::new(registry), } } diff --git a/src/persist-client/src/schema.rs b/src/persist-client/src/schema.rs index 5335033c3e9bd..d7e1be8635a77 100644 --- a/src/persist-client/src/schema.rs +++ b/src/persist-client/src/schema.rs @@ -501,6 +501,7 @@ mod tests { use mz_dyncfg::ConfigUpdates; use mz_ore::error::ErrorExt; use mz_ore::{assert_contains, assert_err, assert_ok}; + use mz_persist_types::arrow::ArrayOrd; use mz_persist_types::codec_impls::UnitSchema; use mz_persist_types::columnar::{ColumnDecoder, ColumnEncoder, Schema2}; use mz_persist_types::stats::{NoneStats, StructStats}; @@ -614,6 +615,12 @@ mod tests { fn is_null(&self, _: usize) -> bool { false } + fn goodbytes(&self) -> usize { + self.0 + .iter() + .map(|val| ArrayOrd::String(val.clone()).goodbytes()) + .sum() + } fn stats(&self) -> StructStats { StructStats { len: self.0[0].len(), diff --git a/src/persist-types/src/codec_impls.rs b/src/persist-types/src/codec_impls.rs index 75a5a27628765..6a51799cfc9d5 100644 --- a/src/persist-types/src/codec_impls.rs +++ b/src/persist-types/src/codec_impls.rs @@ -20,6 +20,7 @@ use arrow::array::{ use bytes::{BufMut, Bytes}; use timely::order::Product; +use crate::arrow::ArrayOrd; use crate::columnar::{ColumnDecoder, ColumnEncoder, Schema2}; use crate::stats::{ColumnStatKinds, ColumnarStats, NoneStats, StructStats}; use crate::{Codec, Codec64, Opaque, ShardId}; @@ -94,6 +95,10 @@ impl ColumnDecoder<()> for UnitColumnar { } } + fn goodbytes(&self) -> usize { + 0 + } + fn stats(&self) -> StructStats { StructStats { len: self.len, @@ -282,6 +287,9 @@ impl ColumnDecoder for SimpleColumnarDecoder { fn is_null(&self, idx: usize) -> bool { self.0.is_null(idx) } + fn goodbytes(&self) -> usize { + ArrayOrd::new(&self.0).goodbytes() + } fn stats(&self) -> StructStats { ColumnarStats::one_column_struct(self.0.len(), ColumnStatKinds::None) @@ -602,6 +610,10 @@ impl ColumnDecoder for TodoColumnarDecoder { panic!("TODO") } + fn goodbytes(&self) -> usize { + panic!("TODO") + } + fn stats(&self) -> StructStats { panic!("TODO") } diff --git a/src/persist-types/src/columnar.rs b/src/persist-types/src/columnar.rs index cd76a66731563..7be4476d024b7 100644 --- a/src/persist-types/src/columnar.rs +++ b/src/persist-types/src/columnar.rs @@ -70,6 +70,9 @@ pub trait ColumnDecoder { /// Returns if the value at `idx` is null. fn is_null(&self, idx: usize) -> bool; + /// Returns the number of bytes used by this decoder. + fn goodbytes(&self) -> usize; + /// Returns statistics for the column. This structure is defined by Persist, /// but the contents are determined by the client; Persist will preserve /// them in the part metadata and make them available to readers. diff --git a/src/persist-types/src/schema.rs b/src/persist-types/src/schema.rs index 0ce80d84a3055..b98d7c9ddb83d 100644 --- a/src/persist-types/src/schema.rs +++ b/src/persist-types/src/schema.rs @@ -12,7 +12,7 @@ use std::str::FromStr; use std::sync::Arc; -use arrow::array::{new_null_array, Array, AsArray, ListArray, StructArray}; +use arrow::array::{new_null_array, Array, AsArray, ListArray, NullArray, StructArray}; use arrow::datatypes::{DataType, Field, FieldRef, Fields, SchemaBuilder}; use itertools::Itertools; use mz_ore::cast::CastFrom; @@ -108,9 +108,17 @@ pub(crate) enum StructArrayMigration { name: String, typ: DataType, }, + /// Drop the field of the provided name. DropField { name: String, }, + /// Replace the field with a NullArray. + /// + /// Special case for projecting away all of the columns in a StructArray. + MakeNull { + name: String, + }, + /// Make the field of the provided name nullable. AlterFieldNullable { name: String, }, @@ -159,6 +167,8 @@ impl ArrayMigration { for migration in migrations { migration.migrate(len, &mut fields, &mut arrays); } + assert_eq!(fields.len(), arrays.len(), "invalid migration"); + Arc::new(StructArray::new(fields, arrays, nulls)) } List(field, entry_migration) => { @@ -183,7 +193,7 @@ impl StructArrayMigration { use StructArrayMigration::*; match self { AddFieldNullableAtEnd { .. } => false, - DropField { .. } => true, + DropField { .. } | MakeNull { .. } => true, AlterFieldNullable { .. } => false, Recurse { migration, .. } => migration.contains_drop(), } @@ -207,6 +217,20 @@ impl StructArrayMigration { f.remove(idx); *fields = f.finish().fields; } + MakeNull { name } => { + let (idx, _) = fields + .find(name) + .unwrap_or_else(|| panic!("expected to find field {} in {:?}", name, fields)); + let array_len = arrays + .get(idx) + .expect("checked above that this exists") + .len(); + arrays[idx] = Arc::new(NullArray::new(array_len)); + let mut f = SchemaBuilder::from(&*fields); + let field = f.field_mut(idx); + *field = Arc::new(Field::new(name.clone(), DataType::Null, true)); + *fields = f.finish().fields; + } AlterFieldNullable { name } => { let (idx, _) = fields .find(name) @@ -334,6 +358,22 @@ fn backward_compatible_struct(old: &Fields, new: &Fields) -> Option Option true, List(_field, child) => recursively_all_nullable(child), Struct(children) => children.iter().all(|child| match child { - AddFieldNullableAtEnd { .. } | DropField { .. } => false, + AddFieldNullableAtEnd { .. } | DropField { .. } | MakeNull { .. } => { + false + } AlterFieldNullable { .. } => true, Recurse { migration, .. } => recursively_all_nullable(migration), }), diff --git a/src/repr/src/lib.rs b/src/repr/src/lib.rs index 3311395cff298..b3c9471219b97 100644 --- a/src/repr/src/lib.rs +++ b/src/repr/src/lib.rs @@ -53,10 +53,10 @@ pub use crate::datum_vec::{DatumVec, DatumVecBorrow}; pub use crate::diff::Diff; pub use crate::global_id::GlobalId; pub use crate::relation::{ - arb_relation_desc_diff, arb_row_for_relation, ColumnName, ColumnType, NotNullViolation, - PropRelationDescDiff, ProtoColumnName, ProtoColumnType, ProtoRelationDesc, ProtoRelationType, - RelationDesc, RelationDescBuilder, RelationType, RelationVersion, RelationVersionSelector, - VersionedRelationDesc, + arb_relation_desc_diff, arb_relation_desc_projection, arb_row_for_relation, ColumnIndex, + ColumnName, ColumnType, NotNullViolation, PropRelationDescDiff, ProtoColumnName, + ProtoColumnType, ProtoRelationDesc, ProtoRelationType, RelationDesc, RelationDescBuilder, + RelationType, RelationVersion, RelationVersionSelector, VersionedRelationDesc, }; pub use crate::row::encode::{preserves_order, RowColumnarDecoder, RowColumnarEncoder}; pub use crate::row::iter::{IntoRowIterator, RowIterator}; diff --git a/src/repr/src/relation.rs b/src/repr/src/relation.rs index 3d8b679312233..a7936d64f3ec4 100644 --- a/src/repr/src/relation.rs +++ b/src/repr/src/relation.rs @@ -367,6 +367,21 @@ pub struct ColumnIndex(usize); static_assertions::assert_not_impl_all!(ColumnIndex: Arbitrary); +impl ColumnIndex { + /// Returns a stable identifier for this [`ColumnIndex`]. + pub fn to_stable_name(&self) -> String { + self.0.to_string() + } + + pub fn to_raw(&self) -> usize { + self.0 + } + + pub fn from_raw(val: usize) -> Self { + ColumnIndex(val) + } +} + /// The version a given column was added at. #[derive( Clone, @@ -451,7 +466,8 @@ struct ColumnMetadata { /// A description of the shape of a relation. /// -/// It bundles a [`RelationType`] with the name of each column in the relation. +/// It bundles a [`RelationType`] with `ColumnMetadata` for each column in +/// the relation. /// /// # Examples /// @@ -482,6 +498,39 @@ struct ColumnMetadata { /// }); /// let desc = RelationDesc::new(relation_type, names); /// ``` +/// +/// Next to the [`RelationType`] we maintain a map of `ColumnIndex` to +/// `ColumnMetadata`, where [`ColumnIndex`] is a stable identifier for a +/// column throughout the lifetime of the relation. This allows a +/// [`RelationDesc`] to represent a projection over a version of itself. +/// +/// ``` +/// use std::collections::BTreeSet; +/// use mz_repr::{ColumnIndex, RelationDesc, ScalarType}; +/// +/// let desc = RelationDesc::builder() +/// .with_column("name", ScalarType::String.nullable(false)) +/// .with_column("email", ScalarType::String.nullable(false)) +/// .finish(); +/// +/// // Project away the second column. +/// let demands = BTreeSet::from([1]); +/// let proj = desc.apply_demand(&demands); +/// +/// // We projected away the first column. +/// assert!(!proj.contains_index(&ColumnIndex::from_raw(0))); +/// // But retained the second. +/// assert!(proj.contains_index(&ColumnIndex::from_raw(1))); +/// +/// // The underlying `RelationType` also contains a single column. +/// assert_eq!(proj.typ().arity(), 1); +/// ``` +/// +/// To maintain this stable mapping and track the lifetime of a column (e.g. +/// when adding or dropping a column) we use `ColumnMetadata`. It maintains +/// the index in [`RelationType`] that corresponds to a given column, and the +/// version at which this column was added or dropped. +/// #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)] pub struct RelationDesc { typ: RelationType, @@ -582,6 +631,11 @@ impl RelationDesc { self == &Self::empty() } + /// Returns the number of columns in this [`RelationDesc`]. + pub fn len(&self) -> usize { + self.typ().column_types.len() + } + /// Constructs a new `RelationDesc` from a `RelationType` and an iterator /// over column names. /// @@ -709,7 +763,10 @@ impl RelationDesc { /// Returns an iterator over the columns in this relation. pub fn iter(&self) -> impl Iterator { - self.iter_names().zip(self.iter_types()) + self.metadata.values().map(|meta| { + let typ = &self.typ.columns()[meta.typ_idx]; + (&meta.name, typ) + }) } /// Returns an iterator over the types of the columns in this relation. @@ -722,6 +779,14 @@ impl RelationDesc { self.metadata.values().map(|meta| &meta.name) } + /// Returns an iterator over the columns in this relation, with all their metadata. + pub fn iter_all(&self) -> impl Iterator { + self.metadata.iter().map(|(col_idx, metadata)| { + let col_typ = &self.typ.columns()[metadata.typ_idx]; + (col_idx, &metadata.name, col_typ) + }) + } + /// Returns an iterator over the names of the columns in this relation that are "similar" to /// the provided `name`. pub fn iter_similar_names<'a>( @@ -731,6 +796,11 @@ impl RelationDesc { self.iter_names().filter(|n| n.is_similar(name)) } + /// Returns whether this [`RelationDesc`] contains a column at the specified index. + pub fn contains_index(&self, idx: &ColumnIndex) -> bool { + self.metadata.contains_key(idx) + } + /// Finds a column by name. /// /// Returns the index and type of the column named `name`. If no column with @@ -747,13 +817,20 @@ impl RelationDesc { /// # Panics /// /// Panics if `i` is not a valid column index. + /// + /// TODO(parkmycar): Migrate all uses of this to [`RelationDesc::get_name_idx`]. pub fn get_name(&self, i: usize) -> &ColumnName { // TODO(parkmycar): Refactor this to use `ColumnIndex`. - &self - .metadata - .get(&ColumnIndex(i)) - .expect("should exist") - .name + self.get_name_idx(&ColumnIndex(i)) + } + + /// Gets the name of the column at `idx`. + /// + /// # Panics + /// + /// Panics if no column exists at `idx`. + pub fn get_name_idx(&self, idx: &ColumnIndex) -> &ColumnName { + &self.metadata.get(idx).expect("should exist").name } /// Mutably gets the name of the `i`th column. @@ -770,6 +847,16 @@ impl RelationDesc { .name } + /// Gets the [`ColumnType`] of the column at `idx`. + /// + /// # Panics + /// + /// Panics if no column exists at `idx`. + pub fn get_type(&self, idx: &ColumnIndex) -> &ColumnType { + let typ_idx = self.metadata.get(idx).expect("should exist").typ_idx; + &self.typ.column_types[typ_idx] + } + /// Gets the name of the `i`th column if that column name is unambiguous. /// /// If at least one other column has the same name as the `i`th column, @@ -800,6 +887,33 @@ impl RelationDesc { Ok(()) } } + + /// Creates a new [`RelationDesc`] retaining only the columns specified in `demands`. + pub fn apply_demand(&self, demands: &BTreeSet) -> RelationDesc { + let mut new_desc = self.clone(); + + // Update ColumnMetadata. + let mut removed = 0; + new_desc.metadata.retain(|idx, metadata| { + let retain = demands.contains(&idx.0); + if !retain { + removed += 1; + } else { + metadata.typ_idx -= removed; + } + retain + }); + + // Update ColumnType. + let mut idx = 0; + new_desc.typ.column_types.retain(|_| { + let keep = demands.contains(&idx); + idx += 1; + keep + }); + + new_desc + } } impl Arbitrary for RelationDesc { @@ -829,6 +943,19 @@ pub fn arb_relation_desc(num_cols: std::ops::Range) -> impl Strategy impl Strategy { + let mask: Vec<_> = (0..desc.len()).map(|_| any::()).collect(); + mask.prop_map(move |mask| { + let demands: BTreeSet<_> = mask + .into_iter() + .enumerate() + .filter_map(|(idx, keep)| keep.then_some(idx)) + .collect(); + desc.apply_demand(&demands) + }) +} + impl IntoIterator for RelationDesc { type Item = (ColumnName, ColumnType); type IntoIter = Box>; @@ -1656,6 +1783,28 @@ mod tests { "###); } + #[mz_ore::test] + #[cfg_attr(miri, ignore)] + fn apply_demand() { + let desc = RelationDesc::builder() + .with_column("a", ScalarType::String.nullable(true)) + .with_column("b", ScalarType::Int64.nullable(false)) + .with_column("c", ScalarType::Time.nullable(false)) + .finish(); + let desc = desc.apply_demand(&BTreeSet::from([0, 2])); + assert_eq!(desc.arity(), 2); + // TODO(parkmycar): Move validate onto RelationDesc. + VersionedRelationDesc::new(desc).validate(); + } + + #[mz_ore::test] + #[cfg_attr(miri, ignore)] + fn smoketest_column_index_stable_ident() { + let idx_a = ColumnIndex(42); + // Note(parkmycar): This should never change. + assert_eq!(idx_a.to_stable_name(), "42"); + } + #[mz_ore::test] #[cfg_attr(miri, ignore)] // too slow fn proptest_relation_desc_roundtrips() { diff --git a/src/repr/src/row.rs b/src/repr/src/row.rs index 127d7609230e1..8769ed2be5766 100644 --- a/src/repr/src/row.rs +++ b/src/repr/src/row.rs @@ -121,30 +121,18 @@ impl Row { proto: &ProtoRow, desc: &RelationDesc, ) -> Result<(), String> { - let mut col_idx = 0; let mut packer = self.packer(); - for d in proto.datums.iter() { + for (col_idx, _, _) in desc.iter_all() { + let d = match proto.datums.get(col_idx.to_raw()) { + Some(x) => x, + None => { + packer.push(Datum::Null); + continue; + } + }; packer.try_push_proto(d)?; - col_idx += 1; } - let num_columns = desc.typ().column_types.len(); - if col_idx < num_columns { - let missing_columns = col_idx..num_columns; - for _ in missing_columns { - packer.push(Datum::Null); - col_idx += 1; - } - } - - mz_ore::soft_assert_eq_or_log!( - col_idx, - num_columns, - "wrong number of columns when decoding a Row!, got {row:?}, expected {desc:?}", - row = self, - desc = desc, - ); - Ok(()) } diff --git a/src/repr/src/row/encode.rs b/src/repr/src/row/encode.rs index 084a9c2beebf8..4418b3f3cc2a5 100644 --- a/src/repr/src/row/encode.rs +++ b/src/repr/src/row/encode.rs @@ -32,6 +32,7 @@ use dec::{Context, Decimal, OrderedDecimal}; use itertools::{EitherOrBoth, Itertools}; use mz_ore::assert_none; use mz_ore::cast::CastFrom; +use mz_persist_types::arrow::ArrayOrd; use mz_persist_types::columnar::{ColumnDecoder, ColumnEncoder, FixedSizeCodec, Schema2}; use mz_persist_types::stats::{ ColumnNullStats, ColumnStatKinds, ColumnarStats, FixedSizeBytesStatsKind, OptionStats, @@ -1215,6 +1216,44 @@ impl DatumColumnDecoder { | DatumColumnDecoder::RecordEmpty(_) => ColumnStatKinds::None, } } + + fn goodbytes(&self) -> usize { + match self { + DatumColumnDecoder::Bool(a) => ArrayOrd::Bool(a.clone()).goodbytes(), + DatumColumnDecoder::U8(a) => ArrayOrd::UInt8(a.clone()).goodbytes(), + DatumColumnDecoder::U16(a) => ArrayOrd::UInt16(a.clone()).goodbytes(), + DatumColumnDecoder::U32(a) => ArrayOrd::UInt32(a.clone()).goodbytes(), + DatumColumnDecoder::U64(a) => ArrayOrd::UInt64(a.clone()).goodbytes(), + DatumColumnDecoder::I16(a) => ArrayOrd::Int16(a.clone()).goodbytes(), + DatumColumnDecoder::I32(a) => ArrayOrd::Int32(a.clone()).goodbytes(), + DatumColumnDecoder::I64(a) => ArrayOrd::Int64(a.clone()).goodbytes(), + DatumColumnDecoder::F32(a) => ArrayOrd::Float32(a.clone()).goodbytes(), + DatumColumnDecoder::F64(a) => ArrayOrd::Float64(a.clone()).goodbytes(), + DatumColumnDecoder::Numeric(a) => ArrayOrd::Binary(a.clone()).goodbytes(), + DatumColumnDecoder::String(a) => ArrayOrd::String(a.clone()).goodbytes(), + DatumColumnDecoder::Bytes(a) => ArrayOrd::Binary(a.clone()).goodbytes(), + DatumColumnDecoder::Date(a) => ArrayOrd::Int32(a.clone()).goodbytes(), + DatumColumnDecoder::Time(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(), + DatumColumnDecoder::Timestamp(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(), + DatumColumnDecoder::TimestampTz(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(), + DatumColumnDecoder::MzTimestamp(a) => ArrayOrd::UInt64(a.clone()).goodbytes(), + DatumColumnDecoder::Interval(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(), + DatumColumnDecoder::Uuid(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(), + DatumColumnDecoder::AclItem(a) => ArrayOrd::FixedSizeBinary(a.clone()).goodbytes(), + DatumColumnDecoder::MzAclItem(a) => ArrayOrd::Binary(a.clone()).goodbytes(), + DatumColumnDecoder::Range(a) => ArrayOrd::Binary(a.clone()).goodbytes(), + DatumColumnDecoder::Json(a) => ArrayOrd::String(a.clone()).goodbytes(), + DatumColumnDecoder::Array { dims, vals, .. } => { + (dims.len() * PackedArrayDimension::SIZE) + vals.goodbytes() + } + DatumColumnDecoder::List { values, .. } => values.goodbytes(), + DatumColumnDecoder::Map { keys, vals, .. } => { + ArrayOrd::String(keys.clone()).goodbytes() + vals.goodbytes() + } + DatumColumnDecoder::Record { fields, .. } => fields.iter().map(|f| f.goodbytes()).sum(), + DatumColumnDecoder::RecordEmpty(a) => ArrayOrd::Bool(a.clone()).goodbytes(), + } + } } impl Schema2 for RelationDesc { @@ -1275,9 +1314,9 @@ impl RowColumnarDecoder { let inner_columns = col.columns(); let desc_columns = desc.typ().columns(); - if inner_columns.len() != desc_columns.len() { + if desc_columns.len() > inner_columns.len() { anyhow::bail!( - "provided array has {inner_columns:?}, relation desc has {desc_columns:?}" + "provided array has too few columns! {desc_columns:?} > {inner_columns:?}" ); } @@ -1287,8 +1326,8 @@ impl RowColumnarDecoder { let null_mask = col.nulls(); // The columns of the `StructArray` are named with their column index. - for (col_idx, (col_name, col_type)) in desc.iter().enumerate() { - let field_name = col_idx.to_string(); + for (col_idx, col_name, col_type) in desc.iter_all() { + let field_name = col_idx.to_stable_name(); let column = col.column_by_name(&field_name).ok_or_else(|| { anyhow::anyhow!( "StructArray did not contain column name {field_name}, found {:?}", @@ -1332,6 +1371,21 @@ impl ColumnDecoder for RowColumnarDecoder { nullability.is_null(idx) } + fn goodbytes(&self) -> usize { + let decoders_size: usize = self + .decoders + .iter() + .map(|(_name, _null_count, decoder)| decoder.goodbytes()) + .sum(); + + decoders_size + + self + .nullability + .as_ref() + .map(|nulls| nulls.inner().inner().len()) + .unwrap_or(0) + } + fn stats(&self) -> StructStats { StructStats { len: self.len, @@ -1377,9 +1431,8 @@ impl RowColumnarEncoder { } let (col_names, encoders): (Vec<_>, Vec<_>) = desc - .iter() - .enumerate() - .map(|(idx, (col_name, col_type))| { + .iter_all() + .map(|(col_idx, col_name, col_type)| { let encoder = scalar_type_to_encoder(&col_type.scalar_type) .expect("failed to create encoder"); let encoder = DatumEncoder { @@ -1389,7 +1442,7 @@ impl RowColumnarEncoder { // We name the Fields in Parquet with the column index, but for // backwards compat use the column name for stats. - let name = (idx, col_name.as_str().into()); + let name = (col_idx.to_raw(), col_name.as_str().into()); (name, encoder) }) @@ -2157,6 +2210,8 @@ impl RustType for Row { #[cfg(test)] mod tests { + use std::collections::BTreeSet; + use arrow::array::{make_array, ArrayData}; use arrow::compute::SortOptions; use arrow::datatypes::ArrowNativeType; @@ -2666,4 +2721,47 @@ mod tests { let encoded = row.encode_to_vec(); assert_eq!(Row::decode(&encoded, &desc), Ok(row)); } + + #[mz_ore::test] + fn smoketest_projection() { + let desc = RelationDesc::builder() + .with_column("a", ScalarType::Int64.nullable(true)) + .with_column("b", ScalarType::String.nullable(true)) + .with_column("c", ScalarType::Bool.nullable(true)) + .finish(); + let mut encoder = >::encoder(&desc).unwrap(); + + let mut og_row = Row::default(); + { + let mut packer = og_row.packer(); + packer.push(Datum::Int64(100)); + packer.push(Datum::String("hello world")); + packer.push(Datum::True); + } + let mut og_row_2 = Row::default(); + { + let mut packer = og_row_2.packer(); + packer.push(Datum::Null); + packer.push(Datum::Null); + packer.push(Datum::Null); + } + + encoder.append(&og_row); + encoder.append(&og_row_2); + let col = encoder.finish(); + + let projected_desc = desc.apply_demand(&BTreeSet::from([0, 2])); + + let decoder = >::decoder(&projected_desc, col).unwrap(); + + let mut rnd_row = Row::default(); + decoder.decode(0, &mut rnd_row); + let expected_row = Row::pack_slice(&[Datum::Int64(100), Datum::True]); + assert_eq!(expected_row, rnd_row); + + let mut rnd_row = Row::default(); + decoder.decode(1, &mut rnd_row); + let expected_row = Row::pack_slice(&[Datum::Null, Datum::Null]); + assert_eq!(expected_row, rnd_row); + } } diff --git a/src/storage-operators/src/persist_source.rs b/src/storage-operators/src/persist_source.rs index 48619d8d2f220..f6063630e9d1b 100644 --- a/src/storage-operators/src/persist_source.rs +++ b/src/storage-operators/src/persist_source.rs @@ -33,7 +33,7 @@ use mz_persist_client::fetch::{SerdeLeasedBatchPart, ShardSourcePart}; use mz_persist_client::operators::shard_source::{shard_source, SnapshotMode}; use mz_persist_types::codec_impls::UnitSchema; use mz_persist_types::{Codec, Codec64}; -use mz_repr::{Datum, DatumVec, Diff, GlobalId, RelationType, Row, RowArena, Timestamp}; +use mz_repr::{Datum, DatumVec, Diff, GlobalId, RelationDesc, Row, RowArena, Timestamp}; use mz_storage_types::controller::{CollectionMetadata, TxnsCodecRow}; use mz_storage_types::errors::DataflowError; use mz_storage_types::sources::SourceData; @@ -141,6 +141,7 @@ pub fn persist_source( // dataflow. worker_dyncfgs: &ConfigSet, metadata: CollectionMetadata, + read_schema: Option, as_of: Option>, snapshot_mode: SnapshotMode, until: Antichain, @@ -204,6 +205,7 @@ where source_id, Arc::clone(&persist_clients), metadata.clone(), + read_schema, as_of.clone(), snapshot_mode, until.clone(), @@ -274,6 +276,7 @@ pub fn persist_source_core<'g, G>( source_id: GlobalId, persist_clients: Arc, metadata: CollectionMetadata, + read_schema: Option, as_of: Option>, snapshot_mode: SnapshotMode, until: Antichain, @@ -299,7 +302,6 @@ where { let cfg = persist_clients.cfg().clone(); let name = source_id.to_string(); - let desc = metadata.relation_desc.clone(); let ignores_data = map_filter_project .as_ref() .map_or(false, |x| x.ignores_input()); @@ -314,6 +316,12 @@ where }; let filter_plan = map_filter_project.as_ref().map(|p| (*p).clone()); + // N.B. `read_schema` may be a subset of the total columns for this shard. + let read_desc = match read_schema { + Some(desc) => desc, + None => metadata.relation_desc, + }; + let desc_transformer = match flow_control { Some(flow_control) => Some(move |mut scope: _, descs: &Stream<_, _>, chosen_worker| { let (stream, token) = backpressure( @@ -350,7 +358,7 @@ where snapshot_mode, until.clone(), desc_transformer, - Arc::new(metadata.relation_desc), + Arc::new(read_desc.clone()), Arc::new(UnitSchema), move |stats, frontier| { let Some(lower) = frontier.as_option().copied() else { @@ -369,8 +377,8 @@ where ResultSpec::value_between(Datum::MzTimestamp(lower), Datum::MzTimestamp(upper)); if let Some(plan) = &filter_plan { let metrics = &metrics.pushdown.part_stats; - let stats = RelationPartStats::new(&filter_name, metrics, &desc, stats); - filter_may_match(desc.typ(), time_range, stats, plan) + let stats = RelationPartStats::new(&filter_name, metrics, &read_desc, stats); + filter_may_match(&read_desc, time_range, stats, plan) } else { true } @@ -385,13 +393,13 @@ where } fn filter_may_match( - relation_type: &RelationType, + relation_desc: &RelationDesc, time_range: ResultSpec, stats: RelationPartStats, plan: &MfpPlan, ) -> bool { let arena = RowArena::new(); - let mut ranges = ColumnSpecs::new(relation_type, &arena); + let mut ranges = ColumnSpecs::new(relation_desc.typ(), &arena); ranges.push_unmaterializable(UnmaterializableFunc::MzNow, time_range); if stats.err_count().into_iter().any(|count| count > 0) { @@ -399,9 +407,11 @@ fn filter_may_match( return true; } - for (id, _) in relation_type.column_types.iter().enumerate() { - let result_spec = stats.col_stats(id, &arena); - ranges.push_column(id, result_spec); + // N.B. We may have pushed down column "demands" into Persist, so this + // relation desc may have a different set of columns than the stats. + for (pos, (idx, _, _)) in relation_desc.iter_all().enumerate() { + let result_spec = stats.col_stats(idx, &arena); + ranges.push_column(pos, result_spec); } let result = ranges.mfp_plan_filter(plan).range; result.may_contain(Datum::True) || result.may_fail() @@ -603,6 +613,9 @@ impl PendingWork { |time| !until.less_equal(time), row_builder, ) { + // Earlier we decided this Part doesn't need to be fetched, but to + // audit our logic we fetched it any way. If the MFP returned data it + // means our earlier decision to not fetch this part was incorrect. if let Some(_stats) = &is_filter_pushdown_audit { // NB: The tag added by this scope is used for alerting. The panic // message may be changed arbitrarily, but the tag key and val must diff --git a/src/storage-types/src/sources.rs b/src/storage-types/src/sources.rs index e59183e3c912b..56f2e1ca5dab7 100644 --- a/src/storage-types/src/sources.rs +++ b/src/storage-types/src/sources.rs @@ -26,6 +26,7 @@ use itertools::Itertools; use kafka::KafkaSourceExportDetails; use load_generator::{LoadGeneratorOutput, LoadGeneratorSourceExportDetails}; use mz_ore::assert_none; +use mz_persist_types::arrow::ArrayOrd; use mz_persist_types::columnar::{ColumnDecoder, ColumnEncoder, Schema2}; use mz_persist_types::stats::{ ColumnNullStats, ColumnStatKinds, ColumnarStats, PrimitiveStats, StructStats, @@ -1664,6 +1665,13 @@ impl SourceDataRowColumnarDecoder { } } } + + pub fn goodbytes(&self) -> usize { + match self { + SourceDataRowColumnarDecoder::Row(decoder) => decoder.goodbytes(), + SourceDataRowColumnarDecoder::EmptyRow => 0, + } + } } #[derive(Debug)] @@ -1754,6 +1762,10 @@ impl ColumnDecoder for SourceDataColumnarDecoder { false } + fn goodbytes(&self) -> usize { + self.row_decoder.goodbytes() + ArrayOrd::Binary(self.err_decoder.clone()).goodbytes() + } + fn stats(&self) -> StructStats { let len = self.err_decoder.len(); let err_stats = ColumnarStats { @@ -1943,17 +1955,21 @@ mod tests { use bytes::Bytes; use mz_expr::EvalError; use mz_ore::assert_err; + use mz_ore::metrics::MetricsRegistry; use mz_persist::indexed::columnar::arrow::{realloc_any, realloc_array}; use mz_persist::metrics::ColumnarMetrics; use mz_persist_types::parquet::EncodingConfig; use mz_persist_types::schema::{backward_compatible, Migration}; + use mz_persist_types::stats::{PartStats, PartStatsMetrics}; use mz_repr::{ - arb_relation_desc_diff, PropRelationDescDiff, ProtoRelationDesc, RelationDescBuilder, - ScalarType, + arb_relation_desc_diff, arb_relation_desc_projection, ColumnIndex, DatumVec, + PropRelationDescDiff, ProtoRelationDesc, RelationDescBuilder, RowArena, ScalarType, }; use proptest::prelude::*; use proptest::strategy::{Union, ValueTree}; + use crate::stats::RelationPartStats; + use super::*; #[mz_ore::test] @@ -1970,14 +1986,22 @@ mod tests { } #[track_caller] - fn roundtrip_source_data(desc: RelationDesc, datas: Vec, config: &EncodingConfig) { + fn roundtrip_source_data( + desc: &RelationDesc, + datas: Vec, + read_desc: &RelationDesc, + config: &EncodingConfig, + ) { let metrics = ColumnarMetrics::disconnected(); - let mut encoder = >::encoder(&desc).unwrap(); + let mut encoder = >::encoder(desc).unwrap(); for data in &datas { encoder.append(data); } let col = encoder.finish(); + // The top-level StructArray for SourceData should always be non-nullable. + assert!(!col.is_nullable()); + // Reallocate our arrays with lgalloc. let col = realloc_array(&col, &metrics); @@ -2024,23 +2048,68 @@ mod tests { .clone(); // Try generating stats for the data, just to make sure we don't panic. - let _ = >::decoder_any(&desc, &rnd_col) + let stats = >::decoder_any(desc, &rnd_col) .expect("valid decoder") .stats(); // Read back all of our data and assert it roundtrips. let mut rnd_data = SourceData(Ok(Row::default())); - let decoder = >::decoder(&desc, rnd_col).unwrap(); - for (idx, og_data) in datas.into_iter().enumerate() { + let decoder = + >::decoder(desc, rnd_col.clone()).unwrap(); + for (idx, og_data) in datas.iter().enumerate() { decoder.decode(idx, &mut rnd_data); - assert_eq!(og_data, rnd_data); + assert_eq!(og_data, &rnd_data); + } + + // Read back all of our data a second time with a projection applied, and make sure the + // stats are valid. + let stats_metrics = PartStatsMetrics::new(&MetricsRegistry::new()); + let stats = RelationPartStats { + name: "test", + metrics: &stats_metrics, + stats: &PartStats { key: stats }, + desc: read_desc, + }; + let mut datum_vec = DatumVec::new(); + let arena = RowArena::default(); + let decoder = >::decoder(read_desc, rnd_col).unwrap(); + + for (idx, og_data) in datas.iter().enumerate() { + decoder.decode(idx, &mut rnd_data); + match (&og_data.0, &rnd_data.0) { + (Ok(og_row), Ok(rnd_row)) => { + // Filter down to just the Datums in the projection schema. + { + let datums = datum_vec.borrow_with(og_row); + let projected_datums = + datums.iter().enumerate().filter_map(|(idx, datum)| { + read_desc + .contains_index(&ColumnIndex::from_raw(idx)) + .then_some(datum) + }); + let og_projected_row = Row::pack(projected_datums); + assert_eq!(&og_projected_row, rnd_row); + } + + // Validate the stats for all of our projected columns. + { + let proj_datums = datum_vec.borrow_with(rnd_row); + for (pos, (idx, _, _)) in read_desc.iter_all().enumerate() { + let spec = stats.col_stats(idx, &arena); + assert!(spec.may_contain(proj_datums[pos])); + } + } + } + (Err(_), Err(_)) => assert_eq!(og_data, &rnd_data), + (_, _) => panic!("decoded to a different type? {og_data:?} {rnd_data:?}"), + } } // Verify that the RelationDesc itself roundtrips through // {encode,decode}_schema. - let encoded_schema = SourceData::encode_schema(&desc); + let encoded_schema = SourceData::encode_schema(desc); let roundtrip_desc = SourceData::decode_schema(&encoded_schema); - assert_eq!(desc, roundtrip_desc); + assert_eq!(desc, &roundtrip_desc); // Verify that the RelationDesc is backward compatible with itself (this // mostly checks for `unimplemented!` type panics). @@ -2066,13 +2135,19 @@ mod tests { } let num_rows = Union::new_weighted(weights); - let strat = (any::(), num_rows).prop_flat_map(|(desc, num_rows)| { - proptest::collection::vec(arb_source_data_for_relation_desc(&desc), num_rows) - .prop_map(move |datas| (desc.clone(), datas)) - }); + // TODO(parkmycar): There are so many clones going on here, and maybe we can avoid them? + let strat = (any::(), num_rows) + .prop_flat_map(|(desc, num_rows)| { + arb_relation_desc_projection(desc.clone()) + .prop_map(move |read_desc| (desc.clone(), read_desc, num_rows.clone())) + }) + .prop_flat_map(|(desc, read_desc, num_rows)| { + proptest::collection::vec(arb_source_data_for_relation_desc(&desc), num_rows) + .prop_map(move |datas| (desc.clone(), datas, read_desc.clone())) + }); - proptest!(|((config, (desc, source_datas)) in (any::(), strat))| { - roundtrip_source_data(desc, source_datas, &config); + proptest!(|((config, (desc, source_datas, read_desc)) in (any::(), strat))| { + roundtrip_source_data(&desc, source_datas, &read_desc, &config); }); } @@ -2088,7 +2163,7 @@ mod tests { EvalError::DateOutOfRange.into(), )))]; let config = EncodingConfig::default(); - roundtrip_source_data(desc, source_datas, &config); + roundtrip_source_data(&desc, source_datas, &desc, &config); } fn is_sorted(array: &dyn Array) -> bool { @@ -2150,9 +2225,21 @@ mod tests { assert!(migration.is_some()); } + #[mz_ore::test] + fn backward_compatible_project_away_all() { + let old = RelationDesc::from_names_and_types([("a", ScalarType::Bool.nullable(true))]); + let new = RelationDesc::empty(); + + let old_data_type = get_data_type(&old); + let new_data_type = get_data_type(&new); + + let migration = backward_compatible(&old_data_type, &new_data_type); + assert!(migration.is_some()); + } + #[mz_ore::test] #[cfg_attr(miri, ignore)] - fn backward_compatible_migrate1() { + fn backward_compatible_migrate() { let strat = (any::(), any::()).prop_flat_map(|(old, new)| { proptest::collection::vec(arb_source_data_for_relation_desc(&old), 2) .prop_map(move |datas| (old.clone(), new.clone(), datas)) @@ -2180,8 +2267,7 @@ mod tests { typ: ColumnType { nullable, .. }, .. } => *nullable, - // TODO(parkmycar): Re-enable DropColumn. - // PropRelationDescDiff::DropColumn { .. } => true, + PropRelationDescDiff::DropColumn { .. } => true, _ => false, }); @@ -2226,7 +2312,7 @@ mod tests { // Note: This case should be covered by the `all_source_data_roundtrips` test above, but // it's a special case that we explicitly want to exercise. proptest!(|((config, (desc, source_datas)) in (any::(), rows))| { - roundtrip_source_data(desc, source_datas, &config); + roundtrip_source_data(&desc, source_datas, &desc, &config); }); } diff --git a/src/storage-types/src/stats.rs b/src/storage-types/src/stats.rs index 0ff48f8d15244..44c198d88cabe 100644 --- a/src/storage-types/src/stats.rs +++ b/src/storage-types/src/stats.rs @@ -13,7 +13,7 @@ use mz_expr::{ColumnSpecs, Interpreter, MapFilterProject, ResultSpec, Unmaterial use mz_persist_types::stats::{ BytesStats, ColumnStatKinds, JsonStats, PartStats, PartStatsMetrics, }; -use mz_repr::{ColumnType, Datum, RelationDesc, RowArena, ScalarType}; +use mz_repr::{ColumnIndex, ColumnType, Datum, RelationDesc, RowArena, ScalarType}; /// Bundles together a relation desc with the stats for a specific part, and translates between /// Persist's stats representation and the `ResultSpec`s that are used for eg. filter pushdown. @@ -52,9 +52,9 @@ impl RelationPartStats<'_> { return true; } - for (id, _) in self.desc.typ().column_types.iter().enumerate() { - let result_spec = self.col_stats(id, &arena); - ranges.push_column(id, result_spec); + for (pos, (idx, _name, _typ)) in self.desc.iter_all().enumerate() { + let result_spec = self.col_stats(idx, &arena); + ranges.push_column(pos, result_spec); } let result = ranges.mfp_filter(mfp).range; result.may_contain(Datum::True) || result.may_fail() @@ -100,21 +100,21 @@ impl RelationPartStats<'_> { } } - pub fn col_stats<'a>(&'a self, id: usize, arena: &'a RowArena) -> ResultSpec<'a> { - let value_range = match self.col_values(id, arena) { + pub fn col_stats<'a>(&'a self, idx: &ColumnIndex, arena: &'a RowArena) -> ResultSpec<'a> { + let value_range = match self.col_values(idx, arena) { Some(spec) => spec, None => ResultSpec::anything(), }; - let json_range = self.col_json(id, arena).unwrap_or(ResultSpec::anything()); + let json_range = self.col_json(idx, arena).unwrap_or(ResultSpec::anything()); // If this is not a JSON column or we don't have JSON stats, json_range is // [ResultSpec::anything] and this is a noop. value_range.intersect(json_range) } - fn col_json<'a>(&'a self, idx: usize, arena: &'a RowArena) -> Option> { - let name = self.desc.get_name(idx); - let typ = &self.desc.typ().column_types[idx]; + fn col_json<'a>(&'a self, idx: &ColumnIndex, arena: &'a RowArena) -> Option> { + let name = self.desc.get_name_idx(idx); + let typ = &self.desc.get_type(idx); let ok_stats = self.stats.key.col("ok")?; let ok_stats = ok_stats @@ -185,9 +185,9 @@ impl RelationPartStats<'_> { num_oks.map(|num_oks| num_results - num_oks) } - fn col_values<'a>(&'a self, idx: usize, arena: &'a RowArena) -> Option> { - let name = self.desc.get_name(idx); - let typ = &self.desc.typ().column_types[idx]; + fn col_values<'a>(&'a self, idx: &ColumnIndex, arena: &'a RowArena) -> Option> { + let name = self.desc.get_name_idx(idx); + let typ = self.desc.get_type(idx); let ok_stats = self.stats.key.cols.get("ok")?; let ColumnStatKinds::Struct(ok_stats) = &ok_stats.values else { @@ -259,7 +259,7 @@ mod tests { // Validate that the stats would include all of the provided datums. for datum in datums { - let spec = stats.col_stats(0, &arena); + let spec = stats.col_stats(&ColumnIndex::from_raw(0), &arena); assert!(spec.may_contain(*datum)); } diff --git a/src/storage/src/render/sinks.rs b/src/storage/src/render/sinks.rs index 8bdd935014454..f5351b6fdfd55 100644 --- a/src/storage/src/render/sinks.rs +++ b/src/storage/src/render/sinks.rs @@ -63,6 +63,7 @@ pub(crate) fn render_sink<'g, G: Scope>( &storage_state.txns_ctx, storage_state.storage_configuration.config_set(), sink.from_storage_metadata.clone(), + None, Some(sink.as_of.clone()), snapshot_mode, timely::progress::Antichain::new(), diff --git a/src/storage/src/render/sources.rs b/src/storage/src/render/sources.rs index 74fe3f45a6b93..279b9160cd5cf 100644 --- a/src/storage/src/render/sources.rs +++ b/src/storage/src/render/sources.rs @@ -281,6 +281,7 @@ where export_id, persist_clients, storage_metadata, + None, Some(as_of), SnapshotMode::Include, Antichain::new(),