persist/storage: communicate projection pushdown via shard_source #30764

Merged
10 commits merged on Jan 7, 2025
1 change: 1 addition & 0 deletions misc/python/materialize/mzcompose/__init__.py
@@ -84,6 +84,7 @@ def get_default_system_parameters(
"compute_dataflow_max_inflight_bytes": "134217728", # 128 MiB
"compute_hydration_concurrency": "2",
"compute_replica_expiration_offset": "3d",
"compute_apply_column_demands": "true",
"disk_cluster_replicas_default": "true",
"enable_0dt_deployment": "true" if zero_downtime else "false",
"enable_0dt_deployment_panic_after_timeout": "true",
1 change: 1 addition & 0 deletions misc/python/materialize/parallel_workload/action.py
@@ -1047,6 +1047,7 @@ def __init__(
self.flags_with_values["persist_encoding_enable_dictionary"] = (
BOOLEAN_FLAG_VALUES
)
self.flags_with_values["compute_apply_column_demands"] = BOOLEAN_FLAG_VALUES

def run(self, exe: Executor) -> bool:
flag_name = self.rng.choice(list(self.flags_with_values.keys()))
10 changes: 10 additions & 0 deletions src/compute-types/src/dyncfgs.rs
@@ -149,6 +149,15 @@ pub const COMPUTE_REPLICA_EXPIRATION_OFFSET: Config<Duration> = Config::new(
"The expiration time offset for replicas. Zero disables expiration.",
);

/// When enabled, applies the column demands from a MapFilterProject onto the RelationDesc used to
/// read out of Persist. This allows Persist to prune unneeded columns as a performance
/// optimization.
pub const COMPUTE_APPLY_COLUMN_DEMANDS: Config<bool> = Config::new(
"compute_apply_column_demands",
false,
"When enabled, passes applys column demands to the RelationDesc used to read out of Persist.",
);

/// Adds the full set of all compute `Config`s.
pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
configs
@@ -169,4 +178,5 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&COPY_TO_S3_MULTIPART_PART_SIZE_BYTES)
.add(&ENABLE_COMPUTE_REPLICA_EXPIRATION)
.add(&COMPUTE_REPLICA_EXPIRATION_OFFSET)
.add(&COMPUTE_APPLY_COLUMN_DEMANDS)
}
22 changes: 21 additions & 1 deletion src/compute/src/render.rs
@@ -119,6 +119,7 @@ use differential_dataflow::{AsCollection, Collection, Data};
use futures::channel::oneshot;
use futures::FutureExt;
use mz_compute_types::dataflows::{DataflowDescription, IndexDesc};
use mz_compute_types::dyncfgs::COMPUTE_APPLY_COLUMN_DEMANDS;
use mz_compute_types::plan::render_plan::{
self, BindStage, LetBind, LetFreePlan, RecBind, RenderPlan,
};
@@ -204,6 +205,7 @@ pub fn build_compute_dataflow<A: Allocate>(
.collect::<Vec<_>>();

let worker_logging = timely_worker.log_register().get("timely");
let apply_demands = COMPUTE_APPLY_COLUMN_DEMANDS.get(&compute_state.worker_config);

let name = format!("Dataflow: {}", &dataflow.debug_name);
let input_name = format!("InputRegion: {}", &dataflow.debug_name);
@@ -224,7 +226,24 @@ pub fn build_compute_dataflow<A: Allocate>(
// Import declared sources into the rendering context.
for (source_id, (source, _monotonic)) in dataflow.source_imports.iter() {
region.region_named(&format!("Source({:?})", source_id), |inner| {
let mut mfp = source.arguments.operators.clone().map(|ops| {
let mut read_schema = None;
let mut mfp = source.arguments.operators.clone().map(|mut ops| {
// If enabled, we read from Persist with a `RelationDesc` that
// omits unneeded columns.
if apply_demands {
let demands = ops.demand();
let new_desc =
source.storage_metadata.relation_desc.apply_demand(&demands);
let new_arity = demands.len();
let remap: BTreeMap<_, _> = demands
.into_iter()
.enumerate()
.map(|(new, old)| (old, new))
.collect();
ops.permute_fn(|old_idx| remap[&old_idx], new_arity);
read_schema = Some(new_desc);
}
Review comment from a Contributor:
FWIW, some things I think might make this a little cleaner:

  • The demand + permute could be a new helper on the MFP itself... pre_project or something? May help clarify intent.
  • This logic could live inside the persist_source itself... though I guess you'd need to move the MfpPlan::create_from in there too. My feeling is that (rollout aside) you'd never not want to apply this projection, so better to not make it the caller's responsibility.

Neither suggestion blocking!

Reply from the Member Author:
I played around with adding a demand + permute helper on the MFP itself, but couldn't get something that I really liked, mainly because applying the demands on a RelationDesc returns a new RelationDesc instead of modifying in place, which I think requires the helper to be able to return the new RelationDesc. It could also be a case of morning brain; if you have something specific you're thinking of here, let me know and I'm happy to circle back on it!
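
As a rough illustration of the reviewer's suggestion, a demand + permute helper might look something like the sketch below. This is not code from the PR: the name `pre_project`, the free-function form (it would presumably live on `MapFilterProject` itself), and the exact signature are assumptions; it only packages the `demand` / `apply_demand` / `permute_fn` steps that the render change above performs inline, returning the pruned `RelationDesc` so the caller can hand it to Persist.

```rust
// Hypothetical sketch only; name and signature are assumed for illustration.
use std::collections::BTreeMap;

use mz_expr::MapFilterProject;
use mz_repr::RelationDesc;

/// Projects `desc` down to the columns demanded by `mfp` and permutes `mfp` to
/// match the narrower arity, returning the pruned `RelationDesc` to read with.
fn pre_project(mfp: &mut MapFilterProject, desc: &RelationDesc) -> RelationDesc {
    let demands = mfp.demand();
    let pruned_desc = desc.apply_demand(&demands);
    let new_arity = demands.len();
    // Map old column indices to their positions in the pruned schema.
    let remap: BTreeMap<_, _> = demands
        .into_iter()
        .enumerate()
        .map(|(new, old)| (old, new))
        .collect();
    mfp.permute_fn(|old_idx| remap[&old_idx], new_arity);
    pruned_desc
}
```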


mz_expr::MfpPlan::create_from(ops)
.expect("Linear operators should always be valid")
});
@@ -247,6 +266,7 @@ pub fn build_compute_dataflow<A: Allocate>(
&compute_state.txns_ctx,
&compute_state.worker_config,
source.storage_metadata.clone(),
read_schema,
dataflow.as_of.clone(),
snapshot_mode,
until.clone(),
1 change: 1 addition & 0 deletions src/compute/src/sink/materialized_view.rs
@@ -133,6 +133,7 @@
&compute_state.txns_ctx,
&compute_state.worker_config,
target.clone(),
None,
source_as_of,
SnapshotMode::Include,
Antichain::new(), // we want all updates
1 change: 1 addition & 0 deletions src/compute/src/sink/materialized_view_v2.rs
@@ -337,6 +337,7 @@
&compute_state.txns_ctx,
&compute_state.worker_config,
target,
None,
as_of,
SnapshotMode::Include,
until,
30 changes: 26 additions & 4 deletions src/persist-client/src/fetch.rs
@@ -29,6 +29,7 @@ use mz_persist::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredEx
use mz_persist::indexed::encoding::{BlobTraceBatchPart, BlobTraceUpdates};
use mz_persist::location::{Blob, SeqNo};
use mz_persist::metrics::ColumnarMetrics;
use mz_persist_types::arrow::ArrayOrd;
use mz_persist_types::columnar::{ColumnDecoder, Schema2};
use mz_persist_types::stats::PartStats;
use mz_persist_types::{Codec, Codec64};
@@ -838,10 +839,23 @@ impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedPart<K, V,
}
}
match &migration {
PartMigration::SameSchema { both } => (
decode::<K>("key", &*both.key, &structured.key),
decode::<V>("val", &*both.val, &structured.val),
),
PartMigration::SameSchema { both } => {
let key_size_before = ArrayOrd::new(&structured.key).goodbytes();

let key = decode::<K>("key", &*both.key, &structured.key);
let val = decode::<V>("val", &*both.val, &structured.val);

if let Some(key_decoder) = key.as_ref() {
let key_size_after = key_decoder.goodbytes();
let key_diff = key_size_before.saturating_sub(key_size_after);
metrics
.pushdown
.parts_projection_trimmed_bytes
.inc_by(u64::cast_from(key_diff));
}

(key, val)
}
PartMigration::Codec { .. } => (None, None),
PartMigration::Either {
_write,
@@ -857,6 +871,14 @@ impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedPart<K, V,
.migration_migrate_seconds
.inc_by(start.elapsed().as_secs_f64());

let key_before_size = ArrayOrd::new(&structured.key).goodbytes();
let key_after_size = ArrayOrd::new(&key).goodbytes();
let key_diff = key_before_size.saturating_sub(key_after_size);
metrics
.pushdown
.parts_projection_trimmed_bytes
.inc_by(u64::cast_from(key_diff));

(
decode::<K>("key", &*read.key, &key),
decode::<V>("val", &*read.val, &val),
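For context on the metric bumps above: the fetch path measures the key column's "goodbytes" (its logical payload size, via `ArrayOrd`) before and after the part is brought to the read schema, and credits any shrinkage to projection pushdown. A simplified sketch of that accounting, with the helper name and signature assumed for illustration:

```rust
// Simplified sketch of the accounting above; not the PR's code verbatim.
use arrow::array::ArrayRef;
use mz_persist_types::arrow::ArrayOrd;

/// Bytes trimmed from a column by projection pushdown. `saturating_sub` keeps the
/// counter increment non-negative if the migrated column is somehow not smaller.
fn projection_trimmed_bytes(before: &ArrayRef, after: &ArrayRef) -> u64 {
    let before = ArrayOrd::new(before).goodbytes();
    let after = ArrayOrd::new(after).goodbytes();
    u64::try_from(before.saturating_sub(after)).expect("usize fits in u64")
}
```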
5 changes: 5 additions & 0 deletions src/persist-client/src/internal/metrics.rs
@@ -2377,6 +2377,7 @@ pub struct PushdownMetrics {
pub(crate) parts_faked_bytes: IntCounter,
pub(crate) parts_stats_trimmed_count: IntCounter,
pub(crate) parts_stats_trimmed_bytes: IntCounter,
pub(crate) parts_projection_trimmed_bytes: IntCounter,
pub part_stats: PartStatsMetrics,
}

@@ -2431,6 +2432,10 @@ impl PushdownMetrics {
name: "mz_persist_pushdown_parts_stats_trimmed_bytes",
help: "total bytes trimmed from part stats",
)),
parts_projection_trimmed_bytes: registry.register(metric!(
name: "mz_persist_pushdown_parts_projection_trimmed_bytes",
help: "total bytes trimmed from columnar data because of projection pushdown",
)),
part_stats: PartStatsMetrics::new(registry),
}
}
7 changes: 7 additions & 0 deletions src/persist-client/src/schema.rs
@@ -501,6 +501,7 @@ mod tests {
use mz_dyncfg::ConfigUpdates;
use mz_ore::error::ErrorExt;
use mz_ore::{assert_contains, assert_err, assert_ok};
use mz_persist_types::arrow::ArrayOrd;
use mz_persist_types::codec_impls::UnitSchema;
use mz_persist_types::columnar::{ColumnDecoder, ColumnEncoder, Schema2};
use mz_persist_types::stats::{NoneStats, StructStats};
@@ -614,6 +615,12 @@
fn is_null(&self, _: usize) -> bool {
false
}
fn goodbytes(&self) -> usize {
self.0
.iter()
.map(|val| ArrayOrd::String(val.clone()).goodbytes())
.sum()
}
fn stats(&self) -> StructStats {
StructStats {
len: self.0[0].len(),
12 changes: 12 additions & 0 deletions src/persist-types/src/codec_impls.rs
@@ -20,6 +20,7 @@ use arrow::array::{
use bytes::{BufMut, Bytes};
use timely::order::Product;

use crate::arrow::ArrayOrd;
use crate::columnar::{ColumnDecoder, ColumnEncoder, Schema2};
use crate::stats::{ColumnStatKinds, ColumnarStats, NoneStats, StructStats};
use crate::{Codec, Codec64, Opaque, ShardId};
@@ -94,6 +95,10 @@ impl ColumnDecoder<()> for UnitColumnar {
}
}

fn goodbytes(&self) -> usize {
0
}

fn stats(&self) -> StructStats {
StructStats {
len: self.len,
@@ -282,6 +287,9 @@ impl<T: SimpleColumnarData> ColumnDecoder<T> for SimpleColumnarDecoder<T> {
fn is_null(&self, idx: usize) -> bool {
self.0.is_null(idx)
}
fn goodbytes(&self) -> usize {
ArrayOrd::new(&self.0).goodbytes()
}

fn stats(&self) -> StructStats {
ColumnarStats::one_column_struct(self.0.len(), ColumnStatKinds::None)
@@ -602,6 +610,10 @@ impl<T> ColumnDecoder<T> for TodoColumnarDecoder<T> {
panic!("TODO")
}

fn goodbytes(&self) -> usize {
panic!("TODO")
}

fn stats(&self) -> StructStats {
panic!("TODO")
}
3 changes: 3 additions & 0 deletions src/persist-types/src/columnar.rs
@@ -70,6 +70,9 @@ pub trait ColumnDecoder<T> {
/// Returns if the value at `idx` is null.
fn is_null(&self, idx: usize) -> bool;

/// Returns the number of bytes used by this decoder.
fn goodbytes(&self) -> usize;

/// Returns statistics for the column. This structure is defined by Persist,
/// but the contents are determined by the client; Persist will preserve
/// them in the part metadata and make them available to readers.
48 changes: 45 additions & 3 deletions src/persist-types/src/schema.rs
@@ -12,7 +12,7 @@
use std::str::FromStr;
use std::sync::Arc;

use arrow::array::{new_null_array, Array, AsArray, ListArray, StructArray};
use arrow::array::{new_null_array, Array, AsArray, ListArray, NullArray, StructArray};
use arrow::datatypes::{DataType, Field, FieldRef, Fields, SchemaBuilder};
use itertools::Itertools;
use mz_ore::cast::CastFrom;
@@ -108,9 +108,17 @@ pub(crate) enum StructArrayMigration {
name: String,
typ: DataType,
},
/// Drop the field of the provided name.
DropField {
name: String,
},
/// Replace the field with a NullArray.
///
/// Special case for projecting away all of the columns in a StructArray.
MakeNull {
name: String,
},
/// Make the field of the provided name nullable.
AlterFieldNullable {
name: String,
},
@@ -159,6 +167,8 @@ impl ArrayMigration {
for migration in migrations {
migration.migrate(len, &mut fields, &mut arrays);
}
assert_eq!(fields.len(), arrays.len(), "invalid migration");

Arc::new(StructArray::new(fields, arrays, nulls))
}
List(field, entry_migration) => {
@@ -183,7 +193,7 @@ impl StructArrayMigration {
use StructArrayMigration::*;
match self {
AddFieldNullableAtEnd { .. } => false,
DropField { .. } => true,
DropField { .. } | MakeNull { .. } => true,
AlterFieldNullable { .. } => false,
Recurse { migration, .. } => migration.contains_drop(),
}
@@ -207,6 +217,20 @@ impl StructArrayMigration {
f.remove(idx);
*fields = f.finish().fields;
}
MakeNull { name } => {
let (idx, _) = fields
.find(name)
.unwrap_or_else(|| panic!("expected to find field {} in {:?}", name, fields));
let array_len = arrays
.get(idx)
.expect("checked above that this exists")
.len();
arrays[idx] = Arc::new(NullArray::new(array_len));
let mut f = SchemaBuilder::from(&*fields);
let field = f.field_mut(idx);
*field = Arc::new(Field::new(name.clone(), DataType::Null, true));
*fields = f.finish().fields;
}
AlterFieldNullable { name } => {
let (idx, _) = fields
.find(name)
@@ -334,6 +358,22 @@ fn backward_compatible_struct(old: &Fields, new: &Fields) -> Option<ArrayMigrati
if o.is_nullable() && !n.is_nullable() {
return None;
}

// Special case, dropping all of the fields in a StructArray.
//
// Note: In the SourceDataColumnarEncoder we model empty Rows as a
// NullArray and use the validity bitmask on the 'err' column to
// determine whether or not a field is actually null.
if matches!(o.data_type(), DataType::Struct(_))
&& o.is_nullable()
&& n.data_type().is_null()
{
field_migrations.push(MakeNull {
name: n.name().clone(),
});
continue;
}

// However, allowed to make a non-nullable field nullable.
let make_nullable = !o.is_nullable() && n.is_nullable();

@@ -352,7 +392,9 @@ fn backward_compatible_struct(old: &Fields, new: &Fields) -> Option<ArrayMigrati
NoOp => true,
List(_field, child) => recursively_all_nullable(child),
Struct(children) => children.iter().all(|child| match child {
AddFieldNullableAtEnd { .. } | DropField { .. } => false,
AddFieldNullableAtEnd { .. } | DropField { .. } | MakeNull { .. } => {
false
}
AlterFieldNullable { .. } => true,
Recurse { migration, .. } => recursively_all_nullable(migration),
}),
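To make the `MakeNull` case concrete, here is a minimal standalone sketch using plain arrow APIs (an illustration only, not Materialize code): when every field of a nullable struct column is projected away, the only information left to carry is the row count, so the migrated column becomes a `NullArray` of the same length.

```rust
// Illustration of the MakeNull migration with plain arrow-rs; assumes only the
// `arrow` crate. Projecting away every field of a nullable struct column leaves
// just its length, which is preserved as a NullArray.
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int64Array, NullArray, StructArray};
use arrow::datatypes::{DataType, Field};

fn main() {
    // The "old" column: a nullable struct with a single Int64 field.
    let values: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
    let field = Arc::new(Field::new("a", DataType::Int64, true));
    let old = StructArray::from(vec![(field, values)]);

    // The "new" schema demands none of the struct's fields, so the migrated
    // column is a NullArray of the same length; the row count survives.
    let migrated: ArrayRef = Arc::new(NullArray::new(old.len()));
    assert_eq!(migrated.len(), old.len());
    assert_eq!(migrated.data_type(), &DataType::Null);
}
```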
8 changes: 4 additions & 4 deletions src/repr/src/lib.rs
@@ -53,10 +53,10 @@ pub use crate::datum_vec::{DatumVec, DatumVecBorrow};
pub use crate::diff::Diff;
pub use crate::global_id::GlobalId;
pub use crate::relation::{
arb_relation_desc_diff, arb_row_for_relation, ColumnName, ColumnType, NotNullViolation,
PropRelationDescDiff, ProtoColumnName, ProtoColumnType, ProtoRelationDesc, ProtoRelationType,
RelationDesc, RelationDescBuilder, RelationType, RelationVersion, RelationVersionSelector,
VersionedRelationDesc,
arb_relation_desc_diff, arb_relation_desc_projection, arb_row_for_relation, ColumnIndex,
ColumnName, ColumnType, NotNullViolation, PropRelationDescDiff, ProtoColumnName,
ProtoColumnType, ProtoRelationDesc, ProtoRelationType, RelationDesc, RelationDescBuilder,
RelationType, RelationVersion, RelationVersionSelector, VersionedRelationDesc,
};
pub use crate::row::encode::{preserves_order, RowColumnarDecoder, RowColumnarEncoder};
pub use crate::row::iter::{IntoRowIterator, RowIterator};