Skip to content

Commit 49cfb51

Browse files
committed
disconnect client by dropping module
1 parent 7e30080 commit 49cfb51

File tree

7 files changed

+48
-48
lines changed

7 files changed

+48
-48
lines changed

crates/client-api/src/routes/database.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -664,7 +664,9 @@ pub async fn publish<S: NodeDelegate + ControlStateDelegate>(
664664
)
665665
.into());
666666
}
667-
UpdateDatabaseResult::NoUpdateNeeded | UpdateDatabaseResult::UpdatePerformed => {}
667+
UpdateDatabaseResult::NoUpdateNeeded
668+
| UpdateDatabaseResult::UpdatePerformed
669+
| UpdateDatabaseResult::UpdatePerformedWithClientDisconnect => {}
668670
}
669671
}
670672

crates/core/src/db/update.rs

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use super::relational_db::RelationalDB;
22
use crate::database_logger::SystemLogger;
33
use crate::sql::parser::RowLevelExpr;
4-
use crate::subscription::module_subscription_actor::ModuleSubscriptions;
54
use spacetimedb_data_structures::map::HashMap;
65
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
76
use spacetimedb_lib::db::auth::StTableType;
@@ -24,6 +23,13 @@ impl UpdateLogger for SystemLogger {
2423
}
2524
}
2625

26+
/// The result of a database update.
27+
/// Indicates whether clients should be disconnected when the update is complete.
28+
pub enum UpdateResult {
29+
Success,
30+
RequiresClientDisconnect,
31+
}
32+
2733
/// Update the database according to the migration plan.
2834
///
2935
/// The update is performed within the transactional context `tx`.
@@ -36,12 +42,11 @@ impl UpdateLogger for SystemLogger {
3642
// drop_* become transactional.
3743
pub fn update_database(
3844
stdb: &RelationalDB,
39-
subscriptions: &ModuleSubscriptions,
4045
tx: &mut MutTxId,
4146
auth_ctx: AuthCtx,
4247
plan: MigratePlan,
4348
logger: &dyn UpdateLogger,
44-
) -> anyhow::Result<()> {
49+
) -> anyhow::Result<UpdateResult> {
4550
let existing_tables = stdb.get_all_tables_mut(tx)?;
4651

4752
// TODO: consider using `ErrorStream` here.
@@ -59,9 +64,7 @@ pub fn update_database(
5964

6065
match plan {
6166
MigratePlan::Manual(plan) => manual_migrate_database(stdb, tx, plan, logger, existing_tables),
62-
MigratePlan::Auto(plan) => {
63-
auto_migrate_database(stdb, subscriptions, tx, auth_ctx, plan, logger, existing_tables)
64-
}
67+
MigratePlan::Auto(plan) => auto_migrate_database(stdb, tx, auth_ctx, plan, logger, existing_tables),
6568
}
6669
}
6770

@@ -72,7 +75,7 @@ fn manual_migrate_database(
7275
_plan: ManualMigratePlan,
7376
_logger: &dyn UpdateLogger,
7477
_existing_tables: Vec<Arc<TableSchema>>,
75-
) -> anyhow::Result<()> {
78+
) -> anyhow::Result<UpdateResult> {
7679
unimplemented!("Manual database migrations are not yet implemented")
7780
}
7881

@@ -87,13 +90,12 @@ macro_rules! log {
8790
/// Automatically migrate a database.
8891
fn auto_migrate_database(
8992
stdb: &RelationalDB,
90-
subscriptions: &ModuleSubscriptions,
9193
tx: &mut MutTxId,
9294
auth_ctx: AuthCtx,
9395
plan: AutoMigratePlan,
9496
logger: &dyn UpdateLogger,
9597
existing_tables: Vec<Arc<TableSchema>>,
96-
) -> anyhow::Result<()> {
98+
) -> anyhow::Result<UpdateResult> {
9799
// We have already checked in `migrate_database` that `existing_tables` are compatible with the `old` definition in `plan`.
98100
// So we can look up tables in there using unwrap.
99101

@@ -132,6 +134,7 @@ fn auto_migrate_database(
132134
}
133135

134136
log::info!("Running database update steps: {}", stdb.database_identity());
137+
let mut res = UpdateResult::Success;
135138

136139
for step in plan.steps {
137140
match step {
@@ -270,16 +273,16 @@ fn auto_migrate_database(
270273
stdb.add_columns_to_table(tx, table_id, column_schemas, default_values)?;
271274
}
272275
spacetimedb_schema::auto_migrate::AutoMigrateStep::DisconnectAllUsers => {
273-
// Disconnect all clients from subscriptions.
274-
// Any dangling clients will be handled during the launch of module hosts,
275-
// which invokes `ModuleHost::call_identity_disconnected`.
276-
subscriptions.remove_all_subscribers();
276+
log!(logger, "Disconnecting all users");
277+
// It does disconnect clients right away,
278+
// but send response indicated that caller should drop clients
279+
res = UpdateResult::RequiresClientDisconnect;
277280
}
278281
}
279282
}
280283

281284
log::info!("Database update complete");
282-
Ok(())
285+
Ok(res)
283286
}
284287

285288
#[cfg(test)]
@@ -359,14 +362,7 @@ mod test {
359362
// Try to update the db.
360363
let mut tx = begin_mut_tx(&stdb);
361364
let plan = ponder_migrate(&old, &new)?;
362-
update_database(
363-
&stdb,
364-
&ModuleSubscriptions::for_test_new_runtime(Arc::new(stdb.db.clone())).0,
365-
&mut tx,
366-
auth_ctx,
367-
plan,
368-
&TestLogger,
369-
)?;
365+
update_database(&stdb, &mut tx, auth_ctx, plan, &TestLogger)?;
370366

371367
// Expect the schema change.
372368
let idx_b_id = stdb

crates/core/src/host/host_controller.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::worker_metrics::WORKER_METRICS;
1919
use anyhow::{anyhow, Context};
2020
use async_trait::async_trait;
2121
use durability::{Durability, EmptyHistory};
22-
use log::{info, trace, warn};
22+
use log::{debug, info, trace, warn};
2323
use parking_lot::Mutex;
2424
use spacetimedb_data_structures::error_stream::ErrorStream;
2525
use spacetimedb_data_structures::map::IntMap;
@@ -414,7 +414,14 @@ impl HostController {
414414
)
415415
.await?;
416416

417-
*guard = Some(host);
417+
// If hotswap is disabled, we drop the host after the update.
418+
// which will drop all connected clients.
419+
if !update_result.hotswap_disabled() {
420+
*guard = Some(host);
421+
} else {
422+
debug!("dropping host after update with hotswap disabled");
423+
}
424+
418425
Ok::<_, anyhow::Error>(update_result)
419426
})
420427
.await??;
@@ -977,7 +984,7 @@ impl Host {
977984
trace!("update result: {update_result:?}");
978985
// Only replace the module + scheduler if the update succeeded.
979986
// Otherwise, we want the database to continue running with the old state.
980-
if update_result.was_successful() {
987+
if update_result.was_successful() && !update_result.hotswap_disabled() {
981988
self.scheduler = scheduler;
982989
scheduler_starter.start(&module)?;
983990
let old_module = self.module.send_replace(module);

crates/core/src/host/module_host.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,7 @@ pub struct WeakModuleHost {
506506
pub enum UpdateDatabaseResult {
507507
NoUpdateNeeded,
508508
UpdatePerformed,
509+
UpdatePerformedWithClientDisconnect,
509510
AutoMigrateError(ErrorStream<AutoMigrateError>),
510511
ErrorExecutingMigration(anyhow::Error),
511512
}
@@ -514,9 +515,16 @@ impl UpdateDatabaseResult {
514515
pub fn was_successful(&self) -> bool {
515516
matches!(
516517
self,
517-
UpdateDatabaseResult::UpdatePerformed | UpdateDatabaseResult::NoUpdateNeeded
518+
UpdateDatabaseResult::UpdatePerformed
519+
| UpdateDatabaseResult::NoUpdateNeeded
520+
| UpdateDatabaseResult::UpdatePerformedWithClientDisconnect
518521
)
519522
}
523+
524+
/// Check if hotswap was disabled due to the update.
525+
pub fn hotswap_disabled(&self) -> bool {
526+
matches!(self, UpdateDatabaseResult::UpdatePerformedWithClientDisconnect)
527+
}
520528
}
521529

522530
#[derive(thiserror::Error, Debug)]

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -306,14 +306,7 @@ impl InstanceCommon {
306306
system_logger.info(&format!("Updated program to {program_hash}"));
307307

308308
let auth_ctx = AuthCtx::for_current(replica_ctx.database.owner_identity);
309-
let res = crate::db::update::update_database(
310-
stdb,
311-
&replica_ctx.subscriptions,
312-
&mut tx,
313-
auth_ctx,
314-
plan,
315-
system_logger,
316-
);
309+
let res = crate::db::update::update_database(stdb, &mut tx, auth_ctx, plan, system_logger);
317310

318311
match res {
319312
Err(e) => {
@@ -323,13 +316,18 @@ impl InstanceCommon {
323316
stdb.report_mut_tx_metrics(reducer, tx_metrics, None);
324317
Ok(UpdateDatabaseResult::ErrorExecutingMigration(e))
325318
}
326-
Ok(()) => {
319+
Ok(res) => {
327320
if let Some((_tx_offset, tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? {
328321
stdb.report_mut_tx_metrics(reducer, tx_metrics, Some(tx_data));
329322
}
330323
system_logger.info("Database updated");
331324
log::info!("Database updated, {}", stdb.database_identity());
332-
Ok(UpdateDatabaseResult::UpdatePerformed)
325+
match res {
326+
crate::db::update::UpdateResult::Success => Ok(UpdateDatabaseResult::UpdatePerformed),
327+
crate::db::update::UpdateResult::RequiresClientDisconnect => {
328+
Ok(UpdateDatabaseResult::UpdatePerformedWithClientDisconnect)
329+
}
330+
}
333331
}
334332
}
335333
}

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -853,11 +853,6 @@ impl ModuleSubscriptions {
853853
subscriptions.remove_all_subscriptions(&(client_id.identity, client_id.connection_id));
854854
}
855855

856-
pub fn remove_all_subscribers(&self) {
857-
let mut subscriptions = self.subscriptions.write();
858-
subscriptions.remove_all_clients();
859-
}
860-
861856
/// Commit a transaction and broadcast its ModuleEvent to all interested subscribers.
862857
///
863858
/// The returned [`ExecutionMetrics`] are reported in this method via `report_tx_metrics`.

crates/core/src/subscription/module_subscription_manager.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -779,12 +779,6 @@ impl SubscriptionManager {
779779
}
780780
}
781781

782-
pub fn remove_all_clients(&mut self) {
783-
for id in self.clients.keys().copied().collect::<Vec<_>>() {
784-
self.remove_all_subscriptions(&id);
785-
}
786-
}
787-
788782
/// Remove a single subscription for a client.
789783
/// This will return an error if the client does not have a subscription with the given query id.
790784
pub fn remove_subscription(&mut self, client_id: ClientId, query_id: ClientQueryId) -> Result<Vec<Query>, DBError> {

0 commit comments

Comments
 (0)