Skip to content

Commit 7024aac

Browse files
committed
add graph
1 parent 45b09a9 commit 7024aac

File tree

3 files changed

+34
-1
lines changed

3 files changed

+34
-1
lines changed

dashboard/lib/api/streaming.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,10 @@ export function relationType(x: Relation) {
6767
return "SINK"
6868
} else if ((x as Source).info !== undefined) {
6969
return "SOURCE"
70-
} else {
70+
} else if ((x as Subscription).dependentTableId !== undefined) {
71+
return "SUBSCRIPTION"
72+
}
73+
else {
7174
return "UNKNOWN"
7275
}
7376
}

src/meta/src/controller/catalog.rs

+23
Original file line numberDiff line numberDiff line change
@@ -581,6 +581,29 @@ impl CatalogController {
581581
}
582582
}));
583583

584+
let subscription_dependencies: Vec<(SubscriptionId, TableId)> = Sink::find()
585+
.select_only()
586+
.columns([
587+
subscription::Column::SubscriptionId,
588+
subscription::Column::DependentTableId,
589+
])
590+
.join(JoinType::InnerJoin, sink::Relation::Object.def())
591+
.filter(
592+
subscription::Column::SubscriptionState
593+
.eq(Into::<i32>::into(SubscriptionState::Created))
594+
.and(subscription::Column::DependentTableId.is_not_null()),
595+
)
596+
.into_tuple()
597+
.all(&inner.db)
598+
.await?;
599+
600+
obj_dependencies.extend(subscription_dependencies.into_iter().map(
601+
|(sink_id, table_id)| PbObjectDependencies {
602+
object_id: table_id as _,
603+
referenced_object_id: sink_id as _,
604+
},
605+
));
606+
584607
Ok(obj_dependencies)
585608
}
586609

src/meta/src/manager/catalog/mod.rs

+7
Original file line numberDiff line numberDiff line change
@@ -3698,6 +3698,13 @@ impl CatalogManager {
36983698
}
36993699
}
37003700

3701+
for subscription in core.subscriptions.values() {
3702+
dependencies.push(PbObjectDependencies {
3703+
object_id: subscription.id,
3704+
referenced_object_id: subscription.dependent_table_id,
3705+
});
3706+
}
3707+
37013708
dependencies
37023709
}
37033710

0 commit comments

Comments
 (0)