1 change: 1 addition & 0 deletions src/adapter/src/continual_task.rs
@@ -43,6 +43,7 @@ pub fn ct_item_from_plan(
compaction_window: _,
refresh_schedule: _,
as_of,
+ replacing: _,
},
} = plan;

2 changes: 1 addition & 1 deletion src/adapter/src/coord.rs
@@ -2810,7 +2810,7 @@ impl Coordinator {
CatalogItem::ContinualTask(ct) => {
let collection_desc = CollectionDescription {
desc: ct.desc.clone(),
- data_source: DataSource::Other,
+ data_source: DataSource::Other { primary: None },
since: ct.initial_as_of.clone(),
status_collection_id: None,
timeline: None,
1 change: 1 addition & 0 deletions src/adapter/src/coord/command_handler.rs
@@ -1174,6 +1174,7 @@ impl Coordinator {
query: cmvs.query,
with_options: cmvs.with_options,
as_of: None,
+ replacing: cmvs.replacing,
});

// (Purifying CreateMaterializedView doesn't happen async, so no need to send
@@ -150,7 +150,7 @@ impl Coordinator {
global_id,
CollectionDescription {
desc,
- data_source: DataSource::Other,
+ data_source: DataSource::Other { primary: None },
since: Some(as_of),
status_collection_id: None,
timeline: None,
46 changes: 26 additions & 20 deletions src/adapter/src/coord/sequencer/inner/create_materialized_view.rs
@@ -46,6 +46,7 @@ use crate::error::AdapterError;
use crate::explain::explain_dataflow;
use crate::explain::explain_plan;
use crate::explain::optimizer_trace::OptimizerTrace;
+ use crate::optimize::OptimizerCatalog;
use crate::optimize::dataflows::dataflow_import_id_bundle;
use crate::optimize::{self, Optimize};
use crate::session::Session;
@@ -565,6 +566,7 @@
non_null_assertions,
compaction_window,
refresh_schedule,
+ replacing,
..
},
drop_ids,
@@ -663,6 +665,9 @@
.take(global_lir_plan.df_meta().optimizer_notices.len())
.collect::<Vec<_>>();

+ let primary =
+     replacing.map(|id| self.catalog().get_entry_by_item_id(&id).latest_global_id());

let transact_result = self
.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, ctx| {
Box::pin(async move {
@@ -688,25 +693,23 @@
let storage_metadata = coord.catalog.state().storage_metadata();

// Announce the creation of the materialized view source.
- coord
-     .controller
-     .storage
-     .create_collections(
-         storage_metadata,
-         None,
-         vec![(
-             global_id,
-             CollectionDescription {
-                 desc: output_desc,
-                 data_source: DataSource::Other,
-                 since: Some(storage_as_of),
-                 status_collection_id: None,
-                 timeline: None,
-             },
-         )],
-     )
-     .await
-     .unwrap_or_terminate("cannot fail to append");
+ let desc = CollectionDescription {
+     desc: output_desc,
+     data_source: DataSource::Other { primary },
+     since: Some(storage_as_of),
+     status_collection_id: None,
+     timeline: None,
+ };
+ if primary.is_some() {
+     coord.controller.storage.create_alias(global_id, desc).await;
+ } else {
+     coord
+         .controller
+         .storage
+         .create_collections(storage_metadata, None, vec![(global_id, desc)])
+         .await
+         .unwrap_or_terminate("cannot fail to append");
+ }

coord
.initialize_storage_read_policies(
@@ -722,7 +725,10 @@
notice_builtin_updates_fut,
)
.await;
- coord.allow_writes(cluster_id, global_id);

+ if replacing.is_none() {
+     coord.allow_writes(cluster_id, global_id);
+ }
})
})
.await;
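The hunk above is the heart of the sequencing change: a REPLACING materialized view is registered as an alias of the replaced view's primary collection, and its writes stay gated. A self-contained sketch of that control flow, with all Materialize types stubbed out (only the branching is taken from the diff; everything else here is illustrative):

```rust
// Stand-ins for Coordinator/StorageController/GlobalId; async plumbing elided.
#[derive(Clone, Copy)]
struct GlobalId(u64);

struct Storage;

impl Storage {
    fn create_alias(&mut self, id: GlobalId) {
        println!("alias u{} onto its primary's persist shard", id.0);
    }
    fn create_collections(&mut self, id: GlobalId) {
        println!("fresh persist-backed collection u{}", id.0);
    }
}

fn announce_mv(storage: &mut Storage, id: GlobalId, primary: Option<GlobalId>) {
    if primary.is_some() {
        // REPLACING path: share the replaced view's shard.
        storage.create_alias(id);
    } else {
        // Plain CREATE MATERIALIZED VIEW path.
        storage.create_collections(id);
    }
    // As in the hunk above, writes are only enabled immediately when
    // nothing is being replaced.
    if primary.is_none() {
        println!("allow_writes(u{})", id.0);
    }
}

fn main() {
    let mut storage = Storage;
    announce_mv(&mut storage, GlobalId(2), Some(GlobalId(1))); // REPLACING
    announce_mv(&mut storage, GlobalId(3), None); // ordinary create
}
```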
1 change: 1 addition & 0 deletions src/sql-lexer/src/keywords.txt
@@ -390,6 +390,7 @@ Rename
Reoptimize
Repeatable
Replace
+ Replacing
Replan
Replica
Replicas
1 change: 1 addition & 0 deletions src/sql-parser/src/ast/defs/statement.rs
@@ -1386,6 +1386,7 @@ pub struct CreateMaterializedViewStatement<T: AstInfo> {
pub query: Query<T>,
pub as_of: Option<u64>,
pub with_options: Vec<MaterializedViewOption<T>>,
+ pub replacing: Option<T::ItemName>,
}

impl<T: AstInfo> AstDisplay for CreateMaterializedViewStatement<T> {
9 changes: 9 additions & 0 deletions src/sql-parser/src/parser.rs
@@ -3861,6 +3861,14 @@ impl<'a> Parser<'a> {
}

let name = self.parse_item_name()?;

+ let replacing = if self.parse_keyword(REPLACING) {
+     let name = self.parse_raw_name()?;
+     Some(name)
+ } else {
+     None
+ };

let columns = self.parse_parenthesized_column_list(Optional)?;
let in_cluster = self.parse_optional_in_cluster()?;

@@ -3886,6 +3894,7 @@
query,
as_of,
with_options,
+ replacing,
},
))
}
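For orientation, here is a made-up statement exercising the new grammar. Per the hunk above, REPLACING is accepted right after the new view's name, before the optional column list and IN CLUSTER clause, and the target lands in the statement's `replacing` field (object names below are hypothetical):

```rust
fn main() {
    let sql = "CREATE MATERIALIZED VIEW winning_bids_v2 \
               REPLACING winning_bids \
               IN CLUSTER quickstart AS \
               SELECT auction_id, max(amount) AS amount FROM bids GROUP BY auction_id";
    // Parsing this should produce CreateMaterializedViewStatement {
    //     name: winning_bids_v2, replacing: Some(winning_bids), ..
    // } per the parser and AST changes in this PR.
    println!("{sql}");
}
```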
4 changes: 4 additions & 0 deletions src/sql/src/normalize.rs
@@ -407,10 +407,14 @@ pub fn create_statement(
query,
with_options: _,
as_of: _,
+ replacing,
}) => {
*name = allocate_name(name)?;
{
let mut normalizer = QueryNormalizer::new();
+ if let Some(name) = replacing {
+     normalizer.visit_item_name_mut(name);
+ }
normalizer.visit_query_mut(query);
if let Some(err) = normalizer.err {
return Err(err);
1 change: 1 addition & 0 deletions src/sql/src/plan.rs
@@ -1818,6 +1818,7 @@ pub struct MaterializedView {
pub compaction_window: Option<CompactionWindow>,
pub refresh_schedule: Option<RefreshSchedule>,
pub as_of: Option<Timestamp>,
+ pub replacing: Option<CatalogItemId>,
}

#[derive(Clone, Debug)]
10 changes: 10 additions & 0 deletions src/sql/src/plan/statement/ddl.rs
@@ -2720,6 +2720,14 @@ pub fn plan_create_materialized_view(
scx: &StatementContext,
mut stmt: CreateMaterializedViewStatement<Aug>,
) -> Result<Plan, PlanError> {
+ let replacing = if let Some(name) = &stmt.replacing {
+     let item = scx.get_item_by_resolved_name(name)?;
+     assert_eq!(item.item_type(), CatalogItemType::MaterializedView);
+     Some(item.id())
+ } else {
+     None
+ };

let cluster_id =
crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
stmt.in_cluster = Some(ResolvedClusterName {
@@ -2994,6 +3002,7 @@ pub fn plan_create_materialized_view(
compaction_window,
refresh_schedule,
as_of,
+ replacing,
},
replace,
drop_ids,
@@ -3243,6 +3252,7 @@ pub fn plan_create_continual_task(
compaction_window: None,
refresh_schedule: None,
as_of,
+ replacing: None,
},
}))
}
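Note that the resolution above is prototype-strict: a REPLACING target that is not a materialized view trips the `assert_eq!`, i.e. a panic rather than a planning error. A self-contained sketch of the check with the error path one would presumably want eventually (types stubbed here, not Materialize's):

```rust
#[derive(Debug, PartialEq)]
enum CatalogItemType {
    Table,
    MaterializedView,
}

#[derive(Debug)]
struct PlanError(String); // stand-in for the real PlanError

fn check_replacing_target(item_type: CatalogItemType) -> Result<(), PlanError> {
    if item_type != CatalogItemType::MaterializedView {
        return Err(PlanError(
            "REPLACING target must be a materialized view".into(),
        ));
    }
    Ok(())
}

fn main() {
    assert!(check_replacing_target(CatalogItemType::MaterializedView).is_ok());
    assert!(check_replacing_target(CatalogItemType::Table).is_err());
}
```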
8 changes: 5 additions & 3 deletions src/storage-client/src/controller.rs
@@ -132,7 +132,7 @@ pub enum DataSource<T> {
},
/// This source's data does not need to be managed by the storage
/// controller, e.g. it's a materialized view or the catalog collection.
- Other,
+ Other { primary: Option<GlobalId> },
/// This collection is the output collection of a sink.
Sink { desc: ExportDescription<T> },
}
Expand All @@ -158,7 +158,7 @@ impl<T> CollectionDescription<T> {
pub fn for_other(desc: RelationDesc, since: Option<Antichain<T>>) -> Self {
Self {
desc,
- data_source: DataSource::Other,
+ data_source: DataSource::Other { primary: None },
since,
status_collection_id: None,
timeline: None,
@@ -514,6 +514,8 @@ pub trait StorageController: Debug {
register_ts: Self::Timestamp,
) -> Result<(), StorageError<Self::Timestamp>>;

+ async fn create_alias(&mut self, id: GlobalId, desc: CollectionDescription<Self::Timestamp>);

/// Acquire an immutable reference to the export state, should it exist.
fn export(
&self,
@@ -737,7 +739,7 @@ impl<T> DataSource<T> {
pub fn in_txns(&self) -> bool {
match self {
DataSource::Table { .. } => true,
- DataSource::Other
+ DataSource::Other { .. }
| DataSource::Ingestion(_)
| DataSource::IngestionExport { .. }
| DataSource::Introspection(_)
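Because `Other` is now a struct variant, every exhaustive match on `DataSource` has to be touched, which accounts for most of the churn in storage_collections.rs below. A standalone sketch of the dependency rule the new field encodes, mirroring the existing `Table { primary }` convention (reduced enum and stubbed `GlobalId`, not the real types):

```rust
#[derive(Debug, PartialEq)]
struct GlobalId(u64);

// Reduced DataSource: `primary: Some(..)` marks an alias whose data lives
// in another collection's persist shard.
enum DataSource {
    Table { primary: Option<GlobalId> },
    Other { primary: Option<GlobalId> },
}

fn source_dependencies(ds: &DataSource) -> Vec<&GlobalId> {
    match ds {
        DataSource::Table { primary: Some(p) }
        | DataSource::Other { primary: Some(p) } => vec![p],
        DataSource::Table { primary: None }
        | DataSource::Other { primary: None } => Vec::new(),
    }
}

fn main() {
    let alias = DataSource::Other { primary: Some(GlobalId(1)) };
    assert_eq!(source_dependencies(&alias), vec![&GlobalId(1)]);
}
```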
71 changes: 66 additions & 5 deletions src/storage-client/src/storage_collections.rs
@@ -306,6 +306,8 @@ pub trait StorageCollections: Debug {
expected_version: RelationVersion,
) -> Result<(), StorageError<Self::Timestamp>>;

+ async fn create_alias(&self, id: GlobalId, desc: CollectionDescription<Self::Timestamp>);

/// Drops the read capability for the sources and allows their resources to
/// be reclaimed.
///
@@ -916,9 +918,12 @@
| DataSource::Webhook
| DataSource::Table { primary: None }
| DataSource::Progress
- | DataSource::Other => Vec::new(),
+ | DataSource::Other { primary: None } => Vec::new(),
DataSource::Table {
primary: Some(primary),
+ }
+ | DataSource::Other {
+     primary: Some(primary),
} => vec![*primary],
DataSource::IngestionExport {
ingestion_id,
@@ -1346,7 +1351,7 @@
let collection = collections.get(&key).expect("must still exist");
let should_emit_persist_compaction = !matches!(
collection.description.data_source,
- DataSource::Table { primary: Some(_) }
+ DataSource::Table { primary: Some(_) } | DataSource::Other { primary: Some(_) }
);

if frontier.is_empty() {
@@ -1904,7 +1909,7 @@
| DataSource::Webhook
| DataSource::Ingestion(_)
| DataSource::Progress
- | DataSource::Other => {}
+ | DataSource::Other { .. } => {}
DataSource::Sink { .. } => {}
DataSource::Table { .. } => {
let register_ts = register_ts.expect(
@@ -2108,7 +2113,7 @@
}
self_collections.insert(id, collection_state);
}
- DataSource::Progress | DataSource::Other => {
+ DataSource::Progress | DataSource::Other { .. } => {
self_collections.insert(id, collection_state);
}
DataSource::Ingestion(_) => {
@@ -2413,6 +2418,62 @@
Ok(())
}

+ async fn create_alias(&self, id: GlobalId, desc: CollectionDescription<Self::Timestamp>) {
+     let primary_id = match &desc.data_source {
+         DataSource::Other { primary: Some(id) } => *id,
+         _ => panic!("invalid data source"),
+     };
+
+     let data_shard = {
+         let mut collections = self.collections.lock().unwrap();
+         let primary = collections.get(&primary_id).unwrap();
+         let data_shard = primary.collection_metadata.data_shard;
+
+         let implied_capability = primary.read_capabilities.frontier().to_owned();
+         let write_frontier = primary.write_frontier.clone();
+
+         let mut changes = ChangeBatch::new();
+         changes.extend(implied_capability.iter().map(|t| (t.clone(), 1)));
+
+         let collection_meta = CollectionMetadata {
+             persist_location: self.persist_location.clone(),
+             relation_desc: desc.desc.clone(),
+             data_shard,
+             txns_shard: None,
+         };
+
+         let collection = CollectionState::new(
+             desc.clone(),
+             implied_capability,
+             write_frontier,
+             Vec::new(),
+             collection_meta,
+         );
+         collections.insert(id, collection);
+
+         let mut updates = BTreeMap::from([(id, changes)]);
+         StorageCollectionsImpl::update_read_capabilities_inner(
+             &self.cmd_tx,
+             &mut *collections,
+             &mut updates,
+         );
+
+         data_shard
+     };
+
+     let persist_client = self
+         .persist
+         .open(self.persist_location.clone())
+         .await
+         .unwrap();
+
+     let (write_handle, since_handle) = self
+         .open_data_handles(&id, data_shard, None, desc.desc, &persist_client)
+         .await;
+
+     self.register_handles(id, true, since_handle, write_handle);
+ }

fn drop_collections_unvalidated(
&self,
storage_metadata: &StorageMetadata,
@@ -2619,7 +2680,7 @@
result = Some(TimeDependence::default())
}
// Materialized views, continual tasks, etc, aren't managed by storage.
- Other => {}
+ Other { .. } => {}
Sink { .. } => {}
};
}
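create_alias above registers a second collection over the primary's existing data_shard and seeds it with a copy of the primary's implied read capability, so both ids independently hold back compaction of the one shared shard. A toy model of that seeding (the real bookkeeping is the ChangeBatch routed through update_read_capabilities_inner; the numbers here are made up):

```rust
use std::collections::BTreeMap;

fn main() {
    // Read holds per collection id; u1 and u2 share one persist shard.
    let mut holds: BTreeMap<&str, u64> = BTreeMap::new();
    holds.insert("u1 (primary mv)", 42);

    // create_alias copies the primary's frontier as the alias's capability.
    let seeded = holds["u1 (primary mv)"];
    holds.insert("u2 (REPLACING alias)", seeded);

    // The shard's since can only advance to the minimum hold across ids.
    let shard_since = holds.values().min().copied().unwrap();
    assert_eq!(shard_since, 42);
}
```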