Skip to content

Commit

Permalink
Update item sorting to only sort within item groups
Browse files Browse the repository at this point in the history
  • Loading branch information
jkosh44 committed Dec 17, 2024
1 parent 846b8da commit 396368e
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 165 deletions.
209 changes: 104 additions & 105 deletions src/adapter/src/catalog/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use mz_compute_client::controller::ComputeReplicaConfig;
use mz_controller::clusters::{ReplicaConfig, ReplicaLogging};
use mz_controller_types::ClusterId;
use mz_expr::MirScalarExpr;
use mz_ore::collections::{CollectionExt, HashMap};
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{instrument, soft_assert_no_log};
use mz_pgrepr::oid::INVALID_OID;
Expand Down Expand Up @@ -101,7 +100,7 @@ impl CatalogState {
local_expression_cache: &mut LocalExpressionCache,
) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
let mut builtin_table_updates = Vec::with_capacity(updates.len());
let updates = sort_updates(updates, self);
let updates = sort_updates(updates);

let mut groups: Vec<Vec<_>> = Vec::new();
for (_, updates) in &updates.into_iter().group_by(|update| update.ts) {
Expand Down Expand Up @@ -143,7 +142,7 @@ impl CatalogState {
updates: Vec<StateUpdate>,
) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, CatalogError> {
let mut builtin_table_updates = Vec::with_capacity(updates.len());
let updates = sort_updates(updates, self);
let updates = sort_updates(updates);

for (_, updates) in &updates.into_iter().group_by(|update| update.ts) {
let mut retractions = InProgressRetractions::default();
Expand Down Expand Up @@ -1733,20 +1732,20 @@ impl CatalogState {
}

/// Sort [`StateUpdate`]s in timestamp then dependency order
fn sort_updates(mut updates: Vec<StateUpdate>, state: &CatalogState) -> Vec<StateUpdate> {
fn sort_updates(mut updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
let mut sorted_updates = Vec::with_capacity(updates.len());

updates.sort_by_key(|update| update.ts);
for (_, updates) in &updates.into_iter().group_by(|update| update.ts) {
let sorted_ts_updates = sort_updates_inner(updates.collect(), state);
let sorted_ts_updates = sort_updates_inner(updates.collect());
sorted_updates.extend(sorted_ts_updates);
}

sorted_updates
}

/// Sort [`StateUpdate`]s in dependency order for a single timestamp.
fn sort_updates_inner(updates: Vec<StateUpdate>, state: &CatalogState) -> Vec<StateUpdate> {
fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
fn push_update<T>(
update: T,
diff: StateDiff,
Expand Down Expand Up @@ -1882,117 +1881,117 @@ fn sort_updates_inner(updates: Vec<StateUpdate>, state: &CatalogState) -> Vec<St
/// this set of updates and then performing a topological sort.
fn sort_item_updates(
item_updates: Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
state: &CatalogState,
) -> VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)> {
let mut item_id_lookup: HashMap<_, HashMap<_, _>> = HashMap::new();
for (item, _, _) in &item_updates {
item_id_lookup
.entry(item.schema_id)
.or_insert_with(HashMap::new)
.insert(item.name.clone(), item.global_id);
}

let mut items_with_dependencies = item_updates
.into_iter()
.map(|(item, ts, diff)| {
let parsed = mz_sql::parse::parse(&item.create_sql)
.expect("failed to parse persisted SQL")
.into_element()
.ast;
let (mut id_deps, name_deps) = mz_sql::names::raw_item_dependency_ids(&parsed);

// Convert named deps into ID deps. Ideally this is empty and all dependencies are
// specified by ID. However, this is not yet the case; fixing it requires some changes
// and a migration.
for name in name_deps {
let (db, schema, item) = match name.0.len() {
3 => (
Some(name.0[0].as_str()),
name.0[1].as_str(),
name.0[2].as_str(),
),
2 => (None, name.0[0].as_str(), name.0[1].as_str()),
// This must be a CTE.
_ => continue,
};
let schema = state
.resolve_schema(None, db, schema, &SYSTEM_CONN_ID)
.expect("schema must be loaded before an item");
// If `name` is not also being applied in this batch then the relative order of
// `item` and `name` doesn't matter, so we can ignore it.
let schema_id = match schema.id {
SchemaSpecifier::Id(id) => id,
SchemaSpecifier::Temporary => {
panic!("temporary item {name:?} persisted as dependency of {item:?}")
}
};
if let Some(ids) = item_id_lookup.get(&schema_id) {
if let Some(id) = ids.get(item) {
id_deps.insert(*id);
}
}
}

(item.global_id, (id_deps, (item, ts, diff)))
})
.collect::<BTreeMap<_, _>>();
let mut visited = BTreeSet::new();
let mut sorted = Vec::new();
fn dfs(
id: GlobalId,
visited: &mut BTreeSet<GlobalId>,
sorted: &mut Vec<GlobalId>,
items_with_dependencies: &BTreeMap<
GlobalId,
(
BTreeSet<GlobalId>,
(mz_catalog::durable::Item, Timestamp, StateDiff),
),
>,
) {
visited.insert(id);
let deps = items_with_dependencies
.get(&id)
.map(|(deps, _)| deps)
.expect("item should be in the map");
for dep in deps {
// We only want to visit dependencies that are in the current set of updates.
if !visited.contains(dep) && items_with_dependencies.contains_key(dep) {
dfs(*dep, visited, sorted, items_with_dependencies);
}
// Partition items into groups s.t. each item in one group has a predefined order with all
// items in other groups. For example, all sinks are ordered greater than all tables.
let mut types = Vec::new();
let mut secrets = Vec::new();
let mut connections = Vec::new();
let mut sources = Vec::new();
let mut tables = Vec::new();
let mut derived_items = Vec::new();
let mut sinks = Vec::new();
let mut continual_tasks = Vec::new();

for update in item_updates {
match update.0.item_type() {
CatalogItemType::Type => types.push(update),
CatalogItemType::Secret => secrets.push(update),
CatalogItemType::Connection => connections.push(update),
CatalogItemType::Source => sources.push(update),
CatalogItemType::Table => tables.push(update),
CatalogItemType::View
| CatalogItemType::MaterializedView
| CatalogItemType::Index
| CatalogItemType::Func => derived_items.push(update),
CatalogItemType::Sink => sinks.push(update),
CatalogItemType::ContinualTask => continual_tasks.push(update),
}
sorted.push(id);
}
for id in items_with_dependencies.keys() {
if !visited.contains(id) {
dfs(*id, &mut visited, &mut sorted, &items_with_dependencies);
}

// Within each group, sort by ID.
for group in [
&mut types,
&mut secrets,
&mut connections,
&mut sources,
&mut tables,
&mut derived_items,
&mut sinks,
&mut continual_tasks,
] {
group.sort_by_key(|(item, _, _)| item.id);
}
// return the values from items_with_dependencies in the order of sorted
sorted
.into_iter()
.filter_map(|id| items_with_dependencies.remove(&id))
.map(|item| item.1)

iter::empty()
.chain(types)
.chain(secrets)
.chain(connections)
.chain(sources)
.chain(tables)
.chain(derived_items)
.chain(sinks)
.chain(continual_tasks)
.collect()
}

let item_retractions = sort_item_updates(item_retractions, state);
let item_additions = sort_item_updates(item_additions, state);
let item_retractions = sort_item_updates(item_retractions);
let item_additions = sort_item_updates(item_additions);

/// Sort temporary item updates by GlobalId.
fn sort_temp_item_updates(
temp_item_updates: Vec<(TemporaryItem, Timestamp, StateDiff)>,
) -> VecDeque<(TemporaryItem, Timestamp, StateDiff)> {
temp_item_updates
.into_iter()
// HACK: due to `ALTER SINK`, sinks can appear before the objects they
// depend upon. Fortunately, because sinks can never have dependencies
// and can never depend upon one another, to fix the topological sort,
// we can just always move sinks to the end.
.sorted_by_key(|(item, _ts, _diff)| match item.item.typ() {
CatalogItemType::Sink => CatalogItemId::User(u64::MAX),
_ => item.id,
})
// Partition items into groups s.t. each item in one group has a predefined order with all
// items in other groups. For example, all sinks are ordered greater than all tables.
let mut types = Vec::new();
let mut secrets = Vec::new();
let mut connections = Vec::new();
let mut sources = Vec::new();
let mut tables = Vec::new();
let mut derived_items = Vec::new();
let mut sinks = Vec::new();
let mut continual_tasks = Vec::new();

for update in temp_item_updates {
match update.0.item.typ() {
CatalogItemType::Type => types.push(update),
CatalogItemType::Secret => secrets.push(update),
CatalogItemType::Connection => connections.push(update),
CatalogItemType::Source => sources.push(update),
CatalogItemType::Table => tables.push(update),
CatalogItemType::View
| CatalogItemType::MaterializedView
| CatalogItemType::Index
| CatalogItemType::Func => derived_items.push(update),
CatalogItemType::Sink => sinks.push(update),
CatalogItemType::ContinualTask => continual_tasks.push(update),
}
}

// Within each group, sort by ID.
for group in [
&mut types,
&mut secrets,
&mut connections,
&mut sources,
&mut tables,
&mut derived_items,
&mut sinks,
&mut continual_tasks,
] {
group.sort_by_key(|(item, _, _)| item.id);
}

iter::empty()
.chain(types)
.chain(secrets)
.chain(connections)
.chain(sources)
.chain(tables)
.chain(derived_items)
.chain(sinks)
.chain(continual_tasks)
.collect()
}
let temp_item_retractions = sort_temp_item_updates(temp_item_retractions);
Expand Down
60 changes: 35 additions & 25 deletions src/catalog/src/durable/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,12 @@ pub struct Item {
pub extra_versions: BTreeMap<RelationVersion, GlobalId>,
}

impl Item {
    /// Returns the [`CatalogItemType`] of this item, as determined from the
    /// leading keywords of its `create_sql`.
    pub fn item_type(&self) -> CatalogItemType {
        let create_sql: &str = &self.create_sql;
        item_type(create_sql)
    }
}

impl DurableType for Item {
type Key = ItemKey;
type Value = ItemValue;
Expand Down Expand Up @@ -1289,32 +1295,36 @@ pub struct ItemValue {
}

impl ItemValue {
pub(crate) fn item_type(&self) -> CatalogItemType {
// NOTE(benesch): the implementation of this method is hideous, but is
// there a better alternative? Storing the object type alongside the
// `create_sql` would introduce the possibility of skew.
let mut tokens = self.create_sql.split_whitespace();
assert_eq!(tokens.next(), Some("CREATE"));
match tokens.next() {
Some("TABLE") => CatalogItemType::Table,
Some("SOURCE") | Some("SUBSOURCE") => CatalogItemType::Source,
Some("SINK") => CatalogItemType::Sink,
Some("VIEW") => CatalogItemType::View,
Some("MATERIALIZED") => {
assert_eq!(tokens.next(), Some("VIEW"));
CatalogItemType::MaterializedView
}
Some("CONTINUAL") => {
assert_eq!(tokens.next(), Some("TASK"));
CatalogItemType::ContinualTask
}
Some("INDEX") => CatalogItemType::Index,
Some("TYPE") => CatalogItemType::Type,
Some("FUNCTION") => CatalogItemType::Func,
Some("SECRET") => CatalogItemType::Secret,
Some("CONNECTION") => CatalogItemType::Connection,
_ => panic!("unexpected create sql: {}", self.create_sql),
pub fn item_type(&self) -> CatalogItemType {
item_type(&self.create_sql)
}
}

/// Determines the [`CatalogItemType`] of an item from the leading keywords of
/// its `create_sql`.
///
/// The statement is split on whitespace; the first token must be `CREATE` and
/// the following one (or two, for `MATERIALIZED VIEW` and `CONTINUAL TASK`)
/// selects the item type.
///
/// # Panics
///
/// Panics if the first token is not `CREATE`, if `MATERIALIZED` is not
/// followed by `VIEW`, if `CONTINUAL` is not followed by `TASK`, or if the
/// token after `CREATE` is missing or not one of the recognized keywords.
fn item_type(create_sql: &str) -> CatalogItemType {
    // NOTE(benesch): the implementation of this method is hideous, but is
    // there a better alternative? Storing the object type alongside the
    // `create_sql` would introduce the possibility of skew.
    let mut keywords = create_sql.split_whitespace();
    let leading = keywords.next();
    assert_eq!(leading, Some("CREATE"));

    // A statement that ends right after `CREATE` is as unexpected as one
    // with an unknown keyword, so both report the same panic message.
    let Some(kind) = keywords.next() else {
        panic!("unexpected create sql: {}", create_sql)
    };

    match kind {
        "TYPE" => CatalogItemType::Type,
        "SECRET" => CatalogItemType::Secret,
        "CONNECTION" => CatalogItemType::Connection,
        "SOURCE" | "SUBSOURCE" => CatalogItemType::Source,
        "TABLE" => CatalogItemType::Table,
        "VIEW" => CatalogItemType::View,
        "MATERIALIZED" => {
            assert_eq!(keywords.next(), Some("VIEW"));
            CatalogItemType::MaterializedView
        }
        "INDEX" => CatalogItemType::Index,
        "FUNCTION" => CatalogItemType::Func,
        "SINK" => CatalogItemType::Sink,
        "CONTINUAL" => {
            assert_eq!(keywords.next(), Some("TASK"));
            CatalogItemType::ContinualTask
        }
        _ => panic!("unexpected create sql: {}", create_sql),
    }
}

Expand Down
35 changes: 0 additions & 35 deletions src/sql/src/names.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2352,41 +2352,6 @@ where
ResolvedIds::new(visitor.ids)
}

#[derive(Debug)]
struct RawItemDependencyIds<'a> {
ids: BTreeSet<GlobalId>,
names: BTreeSet<&'a UnresolvedItemName>,
}

impl<'ast> Visit<'ast, Raw> for RawItemDependencyIds<'ast> {
fn visit_item_name(&mut self, item_name: &'ast RawItemName) {
match item_name {
RawItemName::Name(name) => {
self.names.insert(name);
}
RawItemName::Id(id, _, _) => {
let parsed_id = id.parse::<GlobalId>().unwrap();
self.ids.insert(parsed_id);
}
}
}
}

/// Collects the item dependencies referenced by the provided raw AST node.
///
/// Returns a pair `(ids, names)`: the dependencies that the node references
/// by ID, and the dependencies that it references by name. The names borrow
/// from `node`.
pub fn raw_item_dependency_ids<'ast, N>(
    node: &'ast N,
) -> (BTreeSet<GlobalId>, BTreeSet<&'ast UnresolvedItemName>)
where
    N: VisitNode<'ast, Raw>,
{
    let mut collector = RawItemDependencyIds {
        names: BTreeSet::new(),
        ids: BTreeSet::new(),
    };
    node.visit(&mut collector);

    let RawItemDependencyIds { ids, names } = collector;
    (ids, names)
}

#[derive(Debug)]
pub struct ItemDependencyModifier<'a> {
pub modified: bool,
Expand Down

0 comments on commit 396368e

Please sign in to comment.