Skip to content

Commit

Permalink
Fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
jkosh44 committed Jan 2, 2025
1 parent c4d89a9 commit dc960f3
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 4 deletions.
16 changes: 12 additions & 4 deletions src/adapter/src/catalog/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1885,6 +1885,8 @@ fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
// Partition items into groups s.t. each item in one group has a predefined order with all
// items in other groups. For example, all sinks are ordered greater than all tables.
let mut types = Vec::new();
// N.B. Functions can depend on system tables, but not user tables.
let mut funcs = Vec::new();
let mut secrets = Vec::new();
let mut connections = Vec::new();
let mut sources = Vec::new();
Expand All @@ -1896,14 +1898,14 @@ fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
for update in item_updates {
match update.0.item_type() {
CatalogItemType::Type => types.push(update),
CatalogItemType::Func => funcs.push(update),
CatalogItemType::Secret => secrets.push(update),
CatalogItemType::Connection => connections.push(update),
CatalogItemType::Source => sources.push(update),
CatalogItemType::Table => tables.push(update),
CatalogItemType::View
| CatalogItemType::MaterializedView
| CatalogItemType::Index
| CatalogItemType::Func => derived_items.push(update),
| CatalogItemType::Index => derived_items.push(update),
CatalogItemType::Sink => sinks.push(update),
CatalogItemType::ContinualTask => continual_tasks.push(update),
}
Expand All @@ -1912,6 +1914,7 @@ fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
// Within each group, sort by ID.
for group in [
&mut types,
&mut funcs,
&mut secrets,
&mut connections,
&mut sources,
Expand All @@ -1925,6 +1928,7 @@ fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {

iter::empty()
.chain(types)
.chain(funcs)
.chain(secrets)
.chain(connections)
.chain(sources)
Expand All @@ -1945,6 +1949,8 @@ fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
// Partition items into groups s.t. each item in one group has a predefined order with all
// items in other groups. For example, all sinks are ordered greater than all tables.
let mut types = Vec::new();
// N.B. Functions can depend on system tables, but not user tables.
let mut funcs = Vec::new();
let mut secrets = Vec::new();
let mut connections = Vec::new();
let mut sources = Vec::new();
Expand All @@ -1956,14 +1962,14 @@ fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
for update in temp_item_updates {
match update.0.item.typ() {
CatalogItemType::Type => types.push(update),
CatalogItemType::Func => funcs.push(update),
CatalogItemType::Secret => secrets.push(update),
CatalogItemType::Connection => connections.push(update),
CatalogItemType::Source => sources.push(update),
CatalogItemType::Table => tables.push(update),
CatalogItemType::View
| CatalogItemType::MaterializedView
| CatalogItemType::Index
| CatalogItemType::Func => derived_items.push(update),
| CatalogItemType::Index => derived_items.push(update),
CatalogItemType::Sink => sinks.push(update),
CatalogItemType::ContinualTask => continual_tasks.push(update),
}
Expand All @@ -1972,6 +1978,7 @@ fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
// Within each group, sort by ID.
for group in [
&mut types,
&mut funcs,
&mut secrets,
&mut connections,
&mut sources,
Expand All @@ -1985,6 +1992,7 @@ fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {

iter::empty()
.chain(types)
.chain(funcs)
.chain(secrets)
.chain(connections)
.chain(sources)
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ impl Catalog {
// Include any post-item-updates generated by migrations, and then consolidate
// them to ensure diffs are all positive.
post_item_updates.extend(migrate_result.post_item_updates);
// Push everything to the same timestamp so it consolidates cleanly.
if let Some(max_ts) = post_item_updates.iter().map(|(_, ts, _)| ts).max().cloned() {
for (_, ts, _) in &mut post_item_updates {
*ts = max_ts;
Expand Down

0 comments on commit dc960f3

Please sign in to comment.