|
| 1 | +use std::{io, sync::Arc}; |
| 2 | + |
| 3 | +use async_trait::async_trait; |
| 4 | +use spacetimedb_durability::{DurabilityExited, TxOffset}; |
| 5 | +use spacetimedb_paths::server::ServerDataDir; |
| 6 | +use spacetimedb_snapshot::SnapshotRepository; |
| 7 | + |
| 8 | +use crate::{messages::control_db::Database, util::asyncify}; |
| 9 | + |
| 10 | +use super::{ |
| 11 | + relational_db::{self, Txdata}, |
| 12 | + snapshot::{SnapshotDatabaseState, SnapshotWorker}, |
| 13 | +}; |
| 14 | + |
/// [spacetimedb_durability::Durability] impls with a [`Txdata`] transaction
/// payload, suitable for use in the [`relational_db::RelationalDB`].
///
/// Kept as a `dyn` alias so callers can hold any durability backend behind
/// an `Arc<Durability>` without naming the concrete impl.
pub type Durability = dyn spacetimedb_durability::Durability<TxData = Txdata>;
| 18 | + |
/// A function to determine the size on disk of the durable state of the
/// local database instance. This is used for metrics and energy accounting
/// purposes.
///
/// It is not part of the [`Durability`] trait because it must report disk
/// usage of the local instance only, even if exclusively remote durability is
/// configured or the database is in follower state.
///
/// `Send + Sync` because the closure is shared across threads via `Arc`.
pub type DiskSizeFn = Arc<dyn Fn() -> io::Result<u64> + Send + Sync>;
| 27 | + |
/// Persistence services for a database.
///
/// Bundles everything a [`relational_db::RelationalDB`] needs to persist
/// its state: transaction durability, disk accounting, and (optionally)
/// snapshotting.
pub struct Persistence {
    /// The [Durability] to use, for persisting transactions.
    pub durability: Arc<Durability>,
    /// The [DiskSizeFn].
    ///
    /// Currently the expectation is that the reported size is the commitlog
    /// size only.
    pub disk_size: DiskSizeFn,
    /// An optional [SnapshotWorker].
    ///
    /// The current expectation is that snapshots are only enabled for
    /// persistent (as opposed to in-memory) databases. This is enforced by
    /// this type.
    pub snapshots: Option<SnapshotWorker>,
}
| 44 | + |
| 45 | +impl Persistence { |
| 46 | + /// Convenience constructor of a [Persistence] that handles boxing. |
| 47 | + pub fn new( |
| 48 | + durability: impl spacetimedb_durability::Durability<TxData = Txdata> + 'static, |
| 49 | + disk_size: impl Fn() -> io::Result<u64> + Send + Sync + 'static, |
| 50 | + snapshots: Option<SnapshotWorker>, |
| 51 | + ) -> Self { |
| 52 | + Self { |
| 53 | + durability: Arc::new(durability), |
| 54 | + disk_size: Arc::new(disk_size), |
| 55 | + snapshots, |
| 56 | + } |
| 57 | + } |
| 58 | + |
| 59 | + /// If snapshots are enabled, get the [SnapshotRepository] they are stored in. |
| 60 | + pub fn snapshot_repo(&self) -> Option<&SnapshotRepository> { |
| 61 | + self.snapshots.as_ref().map(|worker| worker.repo()) |
| 62 | + } |
| 63 | + |
| 64 | + /// Get the [TxOffset] reported as durable by the [Durability] impl. |
| 65 | + /// |
| 66 | + /// Returns `Ok(None)` if no offset is durable yet, and `Err(DurabilityExited)` |
| 67 | + /// if the [Durability] has shut down already. |
| 68 | + pub fn durable_tx_offset(&self) -> Result<Option<TxOffset>, DurabilityExited> { |
| 69 | + self.durability.durable_tx_offset().get() |
| 70 | + } |
| 71 | + |
| 72 | + /// Initialize the [SnapshotWorker], no-op if snapshots are not enabled. |
| 73 | + pub(super) fn set_snapshot_state(&self, state: SnapshotDatabaseState) { |
| 74 | + if let Some(worker) = &self.snapshots { |
| 75 | + worker.start(state) |
| 76 | + } |
| 77 | + } |
| 78 | + |
| 79 | + /// Convenience to deconstruct an [Option<Self>] into parts. |
| 80 | + /// |
| 81 | + /// Returns `(Some(durability), Some(disk_size), Option<SnapshotWorker>)` |
| 82 | + /// if `this` is `Some`, and `(None, None, None)` if `this` is `None`. |
| 83 | + pub(super) fn unzip(this: Option<Self>) -> (Option<Arc<Durability>>, Option<DiskSizeFn>, Option<SnapshotWorker>) { |
| 84 | + this.map( |
| 85 | + |Self { |
| 86 | + durability, |
| 87 | + disk_size, |
| 88 | + snapshots, |
| 89 | + }| (Some(durability), Some(disk_size), snapshots), |
| 90 | + ) |
| 91 | + .unwrap_or_default() |
| 92 | + } |
| 93 | +} |
| 94 | + |
/// A persistence provider is a "factory" of sorts that can produce [Persistence]
/// services for a given replica.
///
/// The [crate::host::HostController] uses this to obtain [Persistence]s from
/// an external source, and construct [relational_db::RelationalDB]s with it.
///
/// This is an `async_trait` to allow it to be used as a trait object.
#[async_trait]
pub trait PersistenceProvider: Send + Sync {
    /// Produce the [Persistence] services for the replica `replica_id` of
    /// the given `database`.
    async fn persistence(&self, database: &Database, replica_id: u64) -> anyhow::Result<Persistence>;
}
| 106 | + |
/// The standard [PersistenceProvider] for non-replicated databases.
///
/// [Persistence] services are provided for the local [ServerDataDir].
///
/// Note that its [PersistenceProvider::persistence] impl will spawn a
/// background task that [compresses] older commitlog segments whenever a
/// snapshot is taken.
///
/// [compresses]: relational_db::snapshot_watching_commitlog_compressor
pub struct LocalPersistenceProvider {
    // Server data directory; per-replica state is resolved beneath it via
    // `data_dir.replica(replica_id)`.
    data_dir: Arc<ServerDataDir>,
}
| 119 | + |
| 120 | +impl LocalPersistenceProvider { |
| 121 | + pub fn new(data_dir: impl Into<Arc<ServerDataDir>>) -> Self { |
| 122 | + Self { |
| 123 | + data_dir: data_dir.into(), |
| 124 | + } |
| 125 | + } |
| 126 | +} |
| 127 | + |
| 128 | +#[async_trait] |
| 129 | +impl PersistenceProvider for LocalPersistenceProvider { |
| 130 | + async fn persistence(&self, database: &Database, replica_id: u64) -> anyhow::Result<Persistence> { |
| 131 | + let replica_dir = self.data_dir.replica(replica_id); |
| 132 | + let commitlog_dir = replica_dir.commit_log(); |
| 133 | + let snapshot_dir = replica_dir.snapshots(); |
| 134 | + |
| 135 | + let (durability, disk_size) = relational_db::local_durability(commitlog_dir).await?; |
| 136 | + let database_identity = database.database_identity; |
| 137 | + let snapshot_worker = |
| 138 | + asyncify(move || relational_db::open_snapshot_repo(snapshot_dir, database_identity, replica_id)) |
| 139 | + .await |
| 140 | + .map(SnapshotWorker::new)?; |
| 141 | + |
| 142 | + tokio::spawn(relational_db::snapshot_watching_commitlog_compressor( |
| 143 | + snapshot_worker.subscribe(), |
| 144 | + None, |
| 145 | + None, |
| 146 | + durability.clone(), |
| 147 | + )); |
| 148 | + |
| 149 | + Ok(Persistence { |
| 150 | + durability, |
| 151 | + disk_size, |
| 152 | + snapshots: Some(snapshot_worker), |
| 153 | + }) |
| 154 | + } |
| 155 | +} |
0 commit comments