Skip to content

Commit ec9f02a

Browse files
committed
update: disconnect clients
1 parent 00ff271 commit ec9f02a

File tree

4 files changed

+39
-4
lines changed

4 files changed

+39
-4
lines changed

crates/core/src/db/update.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use super::relational_db::RelationalDB;
22
use crate::database_logger::SystemLogger;
33
use crate::sql::parser::RowLevelExpr;
4+
use crate::subscription::module_subscription_actor::ModuleSubscriptions;
45
use spacetimedb_data_structures::map::HashMap;
56
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
67
use spacetimedb_lib::db::auth::StTableType;
@@ -35,6 +36,7 @@ impl UpdateLogger for SystemLogger {
3536
// drop_* become transactional.
3637
pub fn update_database(
3738
stdb: &RelationalDB,
39+
subscriptions: &ModuleSubscriptions,
3840
tx: &mut MutTxId,
3941
auth_ctx: AuthCtx,
4042
plan: MigratePlan,
@@ -57,7 +59,9 @@ pub fn update_database(
5759

5860
match plan {
5961
MigratePlan::Manual(plan) => manual_migrate_database(stdb, tx, plan, logger, existing_tables),
60-
MigratePlan::Auto(plan) => auto_migrate_database(stdb, tx, auth_ctx, plan, logger, existing_tables),
62+
MigratePlan::Auto(plan) => {
63+
auto_migrate_database(stdb, subscriptions, tx, auth_ctx, plan, logger, existing_tables)
64+
}
6165
}
6266
}
6367

@@ -83,6 +87,7 @@ macro_rules! log {
8387
/// Automatically migrate a database.
8488
fn auto_migrate_database(
8589
stdb: &RelationalDB,
90+
subscriptions: &ModuleSubscriptions,
8691
tx: &mut MutTxId,
8792
auth_ctx: AuthCtx,
8893
plan: AutoMigratePlan,
@@ -264,7 +269,12 @@ fn auto_migrate_database(
264269
.collect();
265270
stdb.add_columns_to_table(tx, table_id, column_schemas, default_values)?;
266271
}
267-
_ => anyhow::bail!("migration step not implemented: {step:?}"),
272+
spacetimedb_schema::auto_migrate::AutoMigrateStep::DisconnectAllUsers => {
273+
// Disconnect all clients from subscriptions.
274+
// Any dangling clients will be handled during the launch of module hosts,
275+
// which invokes `ModuleHost::call_identity_disconnected`.
276+
subscriptions.remove_all_subscribers();
277+
}
268278
}
269279
}
270280

@@ -349,7 +359,14 @@ mod test {
349359
// Try to update the db.
350360
let mut tx = begin_mut_tx(&stdb);
351361
let plan = ponder_migrate(&old, &new)?;
352-
update_database(&stdb, &mut tx, auth_ctx, plan, &TestLogger)?;
362+
update_database(
363+
&stdb,
364+
&ModuleSubscriptions::for_test_new_runtime(Arc::new(stdb.db.clone())).0,
365+
&mut tx,
366+
auth_ctx,
367+
plan,
368+
&TestLogger,
369+
)?;
353370

354371
// Expect the schema change.
355372
let idx_b_id = stdb

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,14 @@ impl InstanceCommon {
306306
system_logger.info(&format!("Updated program to {program_hash}"));
307307

308308
let auth_ctx = AuthCtx::for_current(replica_ctx.database.owner_identity);
309-
let res = crate::db::update::update_database(stdb, &mut tx, auth_ctx, plan, system_logger);
309+
let res = crate::db::update::update_database(
310+
stdb,
311+
&replica_ctx.subscriptions,
312+
&mut tx,
313+
auth_ctx,
314+
plan,
315+
system_logger,
316+
);
310317

311318
match res {
312319
Err(e) => {

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -853,6 +853,11 @@ impl ModuleSubscriptions {
853853
subscriptions.remove_all_subscriptions(&(client_id.identity, client_id.connection_id));
854854
}
855855

856+
pub fn remove_all_subscribers(&self) {
857+
let mut subscriptions = self.subscriptions.write();
858+
subscriptions.remove_all_clients();
859+
}
860+
856861
/// Commit a transaction and broadcast its ModuleEvent to all interested subscribers.
857862
///
858863
/// The returned [`ExecutionMetrics`] are reported in this method via `report_tx_metrics`.

crates/core/src/subscription/module_subscription_manager.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -779,6 +779,12 @@ impl SubscriptionManager {
779779
}
780780
}
781781

782+
pub fn remove_all_clients(&mut self) {
783+
for id in self.clients.keys().copied().collect::<Vec<_>>() {
784+
self.remove_all_subscriptions(&id);
785+
}
786+
}
787+
782788
/// Remove a single subscription for a client.
783789
/// This will return an error if the client does not have a subscription with the given query id.
784790
pub fn remove_subscription(&mut self, client_id: ClientId, query_id: ClientQueryId) -> Result<Vec<Query>, DBError> {

0 commit comments

Comments
 (0)