From 5d251d63bd49ff84c8dbe3e1d2f33ad2b3b37dd6 Mon Sep 17 00:00:00 2001
From: Jan Teske
Date: Mon, 17 Nov 2025 18:22:33 +0100
Subject: [PATCH] [poc] CREATE MATERIALIZED VIEW ... REPLACING

---
 src/adapter/src/continual_task.rs             |  1 +
 src/adapter/src/coord.rs                      |  2 +-
 src/adapter/src/coord/command_handler.rs      |  1 +
 .../sequencer/inner/create_continual_task.rs  |  2 +-
 .../inner/create_materialized_view.rs         | 46 ++++++------
 src/sql-lexer/src/keywords.txt                |  1 +
 src/sql-parser/src/ast/defs/statement.rs      |  1 +
 src/sql-parser/src/parser.rs                  |  9 +++
 src/sql/src/normalize.rs                      |  4 ++
 src/sql/src/plan.rs                           |  1 +
 src/sql/src/plan/statement/ddl.rs             | 10 +++
 src/storage-client/src/controller.rs          |  8 ++-
 src/storage-client/src/storage_collections.rs | 71 +++++++++++++++++--
 src/storage-controller/src/lib.rs             | 45 ++++++++++--
 14 files changed, 167 insertions(+), 35 deletions(-)

diff --git a/src/adapter/src/continual_task.rs b/src/adapter/src/continual_task.rs
index 193d2fb32948d..9b90a2f4c46cb 100644
--- a/src/adapter/src/continual_task.rs
+++ b/src/adapter/src/continual_task.rs
@@ -43,6 +43,7 @@ pub fn ct_item_from_plan(
             compaction_window: _,
             refresh_schedule: _,
             as_of,
+            replacing: _,
         },
 } = plan;
diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs
index 200bc1f061d60..f63f6dec35905 100644
--- a/src/adapter/src/coord.rs
+++ b/src/adapter/src/coord.rs
@@ -2810,7 +2810,7 @@ impl Coordinator {
             CatalogItem::ContinualTask(ct) => {
                 let collection_desc = CollectionDescription {
                     desc: ct.desc.clone(),
-                    data_source: DataSource::Other,
+                    data_source: DataSource::Other { primary: None },
                     since: ct.initial_as_of.clone(),
                     status_collection_id: None,
                     timeline: None,
diff --git a/src/adapter/src/coord/command_handler.rs b/src/adapter/src/coord/command_handler.rs
index 549efa3cf444a..c8ddce10160ee 100644
--- a/src/adapter/src/coord/command_handler.rs
+++ b/src/adapter/src/coord/command_handler.rs
@@ -1174,6 +1174,7 @@ impl Coordinator {
                     query: cmvs.query,
                     with_options: cmvs.with_options,
                     as_of: None,
+                    replacing: cmvs.replacing,
                 });

                 // (Purifying CreateMaterializedView doesn't happen async, so no need to send
diff --git a/src/adapter/src/coord/sequencer/inner/create_continual_task.rs b/src/adapter/src/coord/sequencer/inner/create_continual_task.rs
index 1b826cd44cfdb..40e6bf9849559 100644
--- a/src/adapter/src/coord/sequencer/inner/create_continual_task.rs
+++ b/src/adapter/src/coord/sequencer/inner/create_continual_task.rs
@@ -150,7 +150,7 @@ impl Coordinator {
             global_id,
             CollectionDescription {
                 desc,
-                data_source: DataSource::Other,
+                data_source: DataSource::Other { primary: None },
                 since: Some(as_of),
                 status_collection_id: None,
                 timeline: None,
diff --git a/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs b/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs
index 333fecce291d7..27bfe169b54ca 100644
--- a/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs
+++ b/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs
@@ -46,6 +46,7 @@ use crate::error::AdapterError;
 use crate::explain::explain_dataflow;
 use crate::explain::explain_plan;
 use crate::explain::optimizer_trace::OptimizerTrace;
+use crate::optimize::OptimizerCatalog;
 use crate::optimize::dataflows::dataflow_import_id_bundle;
 use crate::optimize::{self, Optimize};
 use crate::session::Session;
@@ -565,6 +566,7 @@ impl Coordinator {
                     non_null_assertions,
                     compaction_window,
                     refresh_schedule,
+                    replacing,
                     ..
                },
            drop_ids,
@@ -663,6 +665,9 @@ impl Coordinator {
             .take(global_lir_plan.df_meta().optimizer_notices.len())
             .collect::<Vec<_>>();

+        let primary =
+            replacing.map(|id| self.catalog().get_entry_by_item_id(&id).latest_global_id());
+
         let transact_result = self
             .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, ctx| {
                 Box::pin(async move {
@@ -688,25 +693,23 @@ impl Coordinator {
                     let storage_metadata = coord.catalog.state().storage_metadata();

                     // Announce the creation of the materialized view source.
-                    coord
-                        .controller
-                        .storage
-                        .create_collections(
-                            storage_metadata,
-                            None,
-                            vec![(
-                                global_id,
-                                CollectionDescription {
-                                    desc: output_desc,
-                                    data_source: DataSource::Other,
-                                    since: Some(storage_as_of),
-                                    status_collection_id: None,
-                                    timeline: None,
-                                },
-                            )],
-                        )
-                        .await
-                        .unwrap_or_terminate("cannot fail to append");
+                    let desc = CollectionDescription {
+                        desc: output_desc,
+                        data_source: DataSource::Other { primary },
+                        since: Some(storage_as_of),
+                        status_collection_id: None,
+                        timeline: None,
+                    };
+                    if primary.is_some() {
+                        coord.controller.storage.create_alias(global_id, desc).await;
+                    } else {
+                        coord
+                            .controller
+                            .storage
+                            .create_collections(storage_metadata, None, vec![(global_id, desc)])
+                            .await
+                            .unwrap_or_terminate("cannot fail to append");
+                    }

                     coord
                         .initialize_storage_read_policies(
@@ -722,7 +725,10 @@ impl Coordinator {
                             notice_builtin_updates_fut,
                         )
                         .await;
-                    coord.allow_writes(cluster_id, global_id);
+
+                    if replacing.is_none() {
+                        coord.allow_writes(cluster_id, global_id);
+                    }
                 })
             })
             .await;
diff --git a/src/sql-lexer/src/keywords.txt b/src/sql-lexer/src/keywords.txt
index e126ccd581f50..990fcdf3a2f1c 100644
--- a/src/sql-lexer/src/keywords.txt
+++ b/src/sql-lexer/src/keywords.txt
@@ -390,6 +390,7 @@ Rename
 Reoptimize
 Repeatable
 Replace
+Replacing
 Replan
 Replica
 Replicas
diff --git a/src/sql-parser/src/ast/defs/statement.rs b/src/sql-parser/src/ast/defs/statement.rs
index 22cb27da9ea26..c709086dcb7bf 100644
--- a/src/sql-parser/src/ast/defs/statement.rs
+++ b/src/sql-parser/src/ast/defs/statement.rs
@@ -1386,6 +1386,7 @@ pub struct CreateMaterializedViewStatement<T: AstInfo> {
     pub query: Query<T>,
     pub as_of: Option<u64>,
     pub with_options: Vec<MaterializedViewOption<T>>,
+    pub replacing: Option<T::ItemName>,
 }

 impl<T: AstInfo> AstDisplay for CreateMaterializedViewStatement<T> {
diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs
index 32c71f4ac5f61..1f8be00bc15db 100644
--- a/src/sql-parser/src/parser.rs
+++ b/src/sql-parser/src/parser.rs
@@ -3861,6 +3861,14 @@ impl<'a> Parser<'a> {
         }
         let name = self.parse_item_name()?;
+
+        let replacing = if self.parse_keyword(REPLACING) {
+            let name = self.parse_raw_name()?;
+            Some(name)
+        } else {
+            None
+        };
+
         let columns = self.parse_parenthesized_column_list(Optional)?;

         let in_cluster = self.parse_optional_in_cluster()?;
@@ -3886,6 +3894,7 @@ impl<'a> Parser<'a> {
                 query,
                 as_of,
                 with_options,
+                replacing,
             },
         ))
     }
diff --git a/src/sql/src/normalize.rs b/src/sql/src/normalize.rs
index 250b47077da60..12a54e42f7f60 100644
--- a/src/sql/src/normalize.rs
+++ b/src/sql/src/normalize.rs
@@ -407,10 +407,14 @@ pub fn create_statement(
             query,
             with_options: _,
             as_of: _,
+            replacing,
         }) => {
             *name = allocate_name(name)?;
             {
                 let mut normalizer = QueryNormalizer::new();
+                if let Some(name) = replacing {
+                    normalizer.visit_item_name_mut(name);
+                }
                 normalizer.visit_query_mut(query);
                 if let Some(err) = normalizer.err {
                     return Err(err);
diff --git a/src/sql/src/plan.rs b/src/sql/src/plan.rs
index 799648108b609..d7a47d3a31c5a 100644
--- a/src/sql/src/plan.rs
+++ b/src/sql/src/plan.rs
@@ -1818,6 +1818,7 @@ pub struct MaterializedView {
     pub compaction_window: Option<CompactionWindow>,
     pub refresh_schedule: Option<RefreshSchedule>,
     pub as_of: Option<Timestamp>,
+    pub replacing: Option<CatalogItemId>,
 }

 #[derive(Clone, Debug)]
diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs
index e327b98bb148d..05ce3c5ab3db0 100644
--- a/src/sql/src/plan/statement/ddl.rs
+++ b/src/sql/src/plan/statement/ddl.rs
@@ -2720,6 +2720,14 @@ pub fn plan_create_materialized_view(
     scx: &StatementContext,
     mut stmt: CreateMaterializedViewStatement<Aug>,
 ) -> Result<Plan, PlanError> {
+    let replacing = if let Some(name) = &stmt.replacing {
+        let item = scx.get_item_by_resolved_name(name)?;
+        assert_eq!(item.item_type(), CatalogItemType::MaterializedView);
+        Some(item.id())
+    } else {
+        None
+    };
+
     let cluster_id =
         crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
     stmt.in_cluster = Some(ResolvedClusterName {
@@ -2994,6 +3002,7 @@ pub fn plan_create_materialized_view(
             compaction_window,
             refresh_schedule,
             as_of,
+            replacing,
         },
         replace,
         drop_ids,
@@ -3243,6 +3252,7 @@ pub fn plan_create_continual_task(
             compaction_window: None,
             refresh_schedule: None,
             as_of,
+            replacing: None,
         },
     }))
 }
diff --git a/src/storage-client/src/controller.rs b/src/storage-client/src/controller.rs
index 03b9eaf0eb690..b25938cff1e3c 100644
--- a/src/storage-client/src/controller.rs
+++ b/src/storage-client/src/controller.rs
@@ -132,7 +132,7 @@ pub enum DataSource<T> {
     },
     /// This source's data does not need to be managed by the storage
     /// controller, e.g. it's a materialized view or the catalog collection.
-    Other,
+    Other { primary: Option<GlobalId> },
     /// This collection is the output collection of a sink.
     Sink { desc: ExportDescription<T> },
 }
@@ -158,7 +158,7 @@ impl<T> CollectionDescription<T> {
     pub fn for_other(desc: RelationDesc, since: Option<Antichain<T>>) -> Self {
         Self {
             desc,
-            data_source: DataSource::Other,
+            data_source: DataSource::Other { primary: None },
             since,
             status_collection_id: None,
             timeline: None,
@@ -514,6 +514,8 @@ pub trait StorageController: Debug {
         register_ts: Self::Timestamp,
     ) -> Result<(), StorageError<Self::Timestamp>>;

+    async fn create_alias(&mut self, id: GlobalId, desc: CollectionDescription<Self::Timestamp>);
+
     /// Acquire an immutable reference to the export state, should it exist.
     fn export(
         &self,
@@ -737,7 +739,7 @@ impl<T> DataSource<T> {
     pub fn in_txns(&self) -> bool {
         match self {
             DataSource::Table { .. } => true,
-            DataSource::Other
+            DataSource::Other { .. }
             | DataSource::Ingestion(_)
             | DataSource::IngestionExport { .. }
             | DataSource::Introspection(_)
diff --git a/src/storage-client/src/storage_collections.rs b/src/storage-client/src/storage_collections.rs
index aec2c3eccbbb1..695201d800cf7 100644
--- a/src/storage-client/src/storage_collections.rs
+++ b/src/storage-client/src/storage_collections.rs
@@ -306,6 +306,8 @@ pub trait StorageCollections: Debug {
         expected_version: RelationVersion,
     ) -> Result<(), StorageError<Self::Timestamp>>;

+    async fn create_alias(&self, id: GlobalId, desc: CollectionDescription<Self::Timestamp>);
+
     /// Drops the read capability for the sources and allows their resources to
     /// be reclaimed.
     ///
@@ -916,9 +918,12 @@ where
             | DataSource::Webhook
             | DataSource::Table { primary: None }
             | DataSource::Progress
-            | DataSource::Other => Vec::new(),
+            | DataSource::Other { primary: None } => Vec::new(),
             DataSource::Table {
                 primary: Some(primary),
+            }
+            | DataSource::Other {
+                primary: Some(primary),
             } => vec![*primary],
             DataSource::IngestionExport {
                 ingestion_id,
@@ -1346,7 +1351,7 @@ where
             let collection = collections.get(&key).expect("must still exist");
             let should_emit_persist_compaction = !matches!(
                 collection.description.data_source,
-                DataSource::Table { primary: Some(_) }
+                DataSource::Table { primary: Some(_) } | DataSource::Other { primary: Some(_) }
             );

             if frontier.is_empty() {
@@ -1904,7 +1909,7 @@ where
                 | DataSource::Webhook
                 | DataSource::Ingestion(_)
                 | DataSource::Progress
-                | DataSource::Other => {}
+                | DataSource::Other { .. } => {}
                 DataSource::Sink { .. } => {}
                 DataSource::Table { .. } => {
                     let register_ts = register_ts.expect(
@@ -2108,7 +2113,7 @@ where
                 }
                 self_collections.insert(id, collection_state);
             }
-            DataSource::Progress | DataSource::Other => {
+            DataSource::Progress | DataSource::Other { .. } => {
                 self_collections.insert(id, collection_state);
             }
             DataSource::Ingestion(_) => {
@@ -2413,6 +2418,62 @@ where
         Ok(())
     }

+    async fn create_alias(&self, id: GlobalId, desc: CollectionDescription<T>) {
+        let primary_id = match &desc.data_source {
+            DataSource::Other { primary: Some(id) } => *id,
+            _ => panic!("invalid data source"),
+        };
+
+        let data_shard = {
+            let mut collections = self.collections.lock().unwrap();
+            let primary = collections.get(&primary_id).unwrap();
+            let data_shard = primary.collection_metadata.data_shard;
+
+            let implied_capability = primary.read_capabilities.frontier().to_owned();
+            let write_frontier = primary.write_frontier.clone();
+
+            let mut changes = ChangeBatch::new();
+            changes.extend(implied_capability.iter().map(|t| (t.clone(), 1)));
+
+            let collection_meta = CollectionMetadata {
+                persist_location: self.persist_location.clone(),
+                relation_desc: desc.desc.clone(),
+                data_shard,
+                txns_shard: None,
+            };
+
+            let collection = CollectionState::new(
+                desc.clone(),
+                implied_capability,
+                write_frontier,
+                Vec::new(),
+                collection_meta,
+            );
+            collections.insert(id, collection);
+
+            let mut updates = BTreeMap::from([(id, changes)]);
+            StorageCollectionsImpl::update_read_capabilities_inner(
+                &self.cmd_tx,
+                &mut *collections,
+                &mut updates,
+            );
+
+            data_shard
+        };
+
+        let persist_client = self
+            .persist
+            .open(self.persist_location.clone())
+            .await
+            .unwrap();
+
+        let (write_handle, since_handle) = self
+            .open_data_handles(&id, data_shard, None, desc.desc, &persist_client)
+            .await;
+
+        self.register_handles(id, true, since_handle, write_handle);
+    }
+
     fn drop_collections_unvalidated(
         &self,
         storage_metadata: &StorageMetadata,
@@ -2619,7 +2680,7 @@ where
                 result = Some(TimeDependence::default())
             }
             // Materialized views, continual tasks, etc, aren't managed by storage.
-            Other => {}
+            Other { .. } => {}
             Sink { .. } => {}
         };
     }
diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs
index 0776d4cb4ee31..d75f0d628e94e 100644
--- a/src/storage-controller/src/lib.rs
+++ b/src/storage-controller/src/lib.rs
@@ -1088,7 +1088,7 @@ where
                     );
                     table_registers.push((id, write));
                 }
-                DataSource::Progress | DataSource::Other => {
+                DataSource::Progress | DataSource::Other { .. } => {
                     debug!(
                         ?data_source, meta = ?metadata,
                         "not registering {id} with a controller persist worker",
@@ -1214,7 +1214,7 @@ where
                 | DataSource::Webhook
                 | DataSource::Table { .. }
                 | DataSource::Progress
-                | DataSource::Other => {}
+                | DataSource::Other { .. } => {}
                 DataSource::Sink { .. } => {
                     if !self.read_only {
                         self.run_export(id)?;
@@ -1462,6 +1462,38 @@ where
         Ok(())
     }

+    async fn create_alias(&mut self, id: GlobalId, desc: CollectionDescription<T>) {
+        self.storage_collections.create_alias(id, desc.clone()).await;
+
+        let primary = match &desc.data_source {
+            DataSource::Other { primary: Some(id) } => *id,
+            _ => panic!("invalid data source"),
+        };
+
+        let data_shard = self
+            .collection(primary)
+            .unwrap()
+            .collection_metadata
+            .data_shard;
+
+        let collection_meta = CollectionMetadata {
+            persist_location: self.persist_location.clone(),
+            data_shard,
+            relation_desc: desc.desc,
+            txns_shard: None,
+        };
+        let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(id, None);
+        let collection = CollectionState::new(
+            desc.data_source,
+            collection_meta,
+            CollectionStateExtra::None,
+            wallclock_lag_metrics,
+        );
+
+        self.collections.insert(id, collection);
+        self.append_shard_mappings([id].into_iter(), Diff::ONE);
+    }
+
     fn export(
         &self,
         id: GlobalId,
@@ -1884,7 +1916,7 @@ where
                     ingestions_to_drop.insert(*id);
                     source_statistics_to_drop.push(*id);
                 }
-                DataSource::Progress | DataSource::Table { .. } | DataSource::Other => {
+                DataSource::Progress | DataSource::Table { .. } | DataSource::Other { .. } => {
                     collections_to_drop.push(*id);
                 }
                 DataSource::Introspection(_) | DataSource::Sink { .. } => {
@@ -3221,9 +3253,12 @@ where
             | DataSource::Webhook
             | DataSource::Table { primary: None }
             | DataSource::Progress
-            | DataSource::Other => vec![],
+            | DataSource::Other { primary: None } => vec![],
             DataSource::Table {
                 primary: Some(primary),
+            }
+            | DataSource::Other {
+                primary: Some(primary),
             } => vec![*primary],
             DataSource::IngestionExport { ingestion_id, .. } => {
                 // Ingestion exports depend on their primary source's remap
@@ -3832,7 +3867,7 @@ impl CollectionState {
         // duplicate measurements. Collections written by other components (e.g. compute) have
         // their wallclock lags recorded by these components.
         let wallclock_lag_histogram_stash = match &data_source {
-            DataSource::Other => None,
+            DataSource::Other { .. } => None,
             _ => Some(Default::default()),
         };
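
Usage sketch (not part of the patch): going by the parser change in
src/sql-parser/src/parser.rs, REPLACING takes a single item name right after
the view name, before the optional column list and IN CLUSTER clause, and
plan_create_materialized_view in src/sql/src/plan/statement/ddl.rs requires
that name to resolve to an existing materialized view. All object names below
(winning_bids_v2, winning_bids, quickstart, bids) are hypothetical:

    -- The new view takes over the persist shard of `winning_bids`: the
    -- storage layer registers it as an alias collection
    -- (DataSource::Other { primary: Some(..) }) rather than creating a new
    -- collection, and the coordinator skips allow_writes for it.
    CREATE MATERIALIZED VIEW winning_bids_v2
        REPLACING winning_bids
        IN CLUSTER quickstart
        AS SELECT auction_id, MAX(amount) AS max_amount
           FROM bids
           GROUP BY auction_id;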