diff --git a/src/adapter/src/catalog/apply.rs b/src/adapter/src/catalog/apply.rs index 7f0b835c2ab91..0914607cae824 100644 --- a/src/adapter/src/catalog/apply.rs +++ b/src/adapter/src/catalog/apply.rs @@ -1885,6 +1885,8 @@ fn sort_updates_inner(updates: Vec) -> Vec { // Partition items into groups s.t. each item in one group has a predefined order with all // items in other groups. For example, all sinks are ordered greater than all tables. let mut types = Vec::new(); + // N.B. Functions can depend on system tables, but not user tables. + let mut funcs = Vec::new(); let mut secrets = Vec::new(); let mut connections = Vec::new(); let mut sources = Vec::new(); @@ -1896,14 +1898,14 @@ fn sort_updates_inner(updates: Vec) -> Vec { for update in item_updates { match update.0.item_type() { CatalogItemType::Type => types.push(update), + CatalogItemType::Func => funcs.push(update), CatalogItemType::Secret => secrets.push(update), CatalogItemType::Connection => connections.push(update), CatalogItemType::Source => sources.push(update), CatalogItemType::Table => tables.push(update), CatalogItemType::View | CatalogItemType::MaterializedView - | CatalogItemType::Index - | CatalogItemType::Func => derived_items.push(update), + | CatalogItemType::Index => derived_items.push(update), CatalogItemType::Sink => sinks.push(update), CatalogItemType::ContinualTask => continual_tasks.push(update), } @@ -1912,6 +1914,7 @@ fn sort_updates_inner(updates: Vec) -> Vec { // Within each group, sort by ID. for group in [ &mut types, + &mut funcs, &mut secrets, &mut connections, &mut sources, @@ -1925,6 +1928,7 @@ fn sort_updates_inner(updates: Vec) -> Vec { iter::empty() .chain(types) + .chain(funcs) .chain(secrets) .chain(connections) .chain(sources) @@ -1945,6 +1949,8 @@ fn sort_updates_inner(updates: Vec) -> Vec { // Partition items into groups s.t. each item in one group has a predefined order with all // items in other groups. For example, all sinks are ordered greater than all tables. let mut types = Vec::new(); + // N.B. Functions can depend on system tables, but not user tables. + let mut funcs = Vec::new(); let mut secrets = Vec::new(); let mut connections = Vec::new(); let mut sources = Vec::new(); @@ -1956,14 +1962,14 @@ fn sort_updates_inner(updates: Vec) -> Vec { for update in temp_item_updates { match update.0.item.typ() { CatalogItemType::Type => types.push(update), + CatalogItemType::Func => funcs.push(update), CatalogItemType::Secret => secrets.push(update), CatalogItemType::Connection => connections.push(update), CatalogItemType::Source => sources.push(update), CatalogItemType::Table => tables.push(update), CatalogItemType::View | CatalogItemType::MaterializedView - | CatalogItemType::Index - | CatalogItemType::Func => derived_items.push(update), + | CatalogItemType::Index => derived_items.push(update), CatalogItemType::Sink => sinks.push(update), CatalogItemType::ContinualTask => continual_tasks.push(update), } @@ -1972,6 +1978,7 @@ fn sort_updates_inner(updates: Vec) -> Vec { // Within each group, sort by ID. for group in [ &mut types, + &mut funcs, &mut secrets, &mut connections, &mut sources, @@ -1985,6 +1992,7 @@ fn sort_updates_inner(updates: Vec) -> Vec { iter::empty() .chain(types) + .chain(funcs) .chain(secrets) .chain(connections) .chain(sources) diff --git a/src/adapter/src/catalog/open.rs b/src/adapter/src/catalog/open.rs index fd9a014d879b3..bfffb75ed53da 100644 --- a/src/adapter/src/catalog/open.rs +++ b/src/adapter/src/catalog/open.rs @@ -518,6 +518,7 @@ impl Catalog { // Include any post-item-updates generated by migrations, and then consolidate // them to ensure diffs are all positive. post_item_updates.extend(migrate_result.post_item_updates); + // Push everything to the same timestamp so it consolidates cleanly. if let Some(max_ts) = post_item_updates.iter().map(|(_, ts, _)| ts).max().cloned() { for (_, ts, _) in &mut post_item_updates { *ts = max_ts;