Skip to content

Commit c838ed5

Browse files
committed
Improve snapshot compression metrics
Report more metrics about snapshot compression, namely: - time to compress a single snapshot (histogram) - for each compression pass: - number of snapshots found to be already compressed (gauge) - number of snapshots compressed (gauge) - cumulative number of objects compressed (gauge) - cumulative number of objects hardlinked (gauge) Those metrics are collected from the `spacetimedb-snapshot` crate without imposing a prometheus dependency on it, i.e. they can be observed by the caller as ordinary Rust types. This is exploited to avoid scanning the entire snapshot repository on each pass -- only the range `(last_compressed + 1)..newest_snapshot` is visited (note that the `compress_snapshots` method now short-circuits on errors). Lastly, the snapshot worker can now be configured to disable compression. This greatly simplifies implementation of alternative post-processing strategies, e.g. involving archival, for which a more coarse-grained compression strategy may be more appropriate. Subcribers are notified of a new snapshot _after_ compression, such that any filesystem locks should be released.
1 parent c60bfe6 commit c838ed5

File tree

8 files changed

+426
-145
lines changed

8 files changed

+426
-145
lines changed

crates/core/src/db/persistence.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::{messages::control_db::Database, util::asyncify};
99

1010
use super::{
1111
relational_db::{self, Txdata},
12-
snapshot::{SnapshotDatabaseState, SnapshotWorker},
12+
snapshot::{self, SnapshotDatabaseState, SnapshotWorker},
1313
};
1414

1515
/// [spacetimedb_durability::Durability] impls with a [`Txdata`] transaction
@@ -137,7 +137,7 @@ impl PersistenceProvider for LocalPersistenceProvider {
137137
let snapshot_worker =
138138
asyncify(move || relational_db::open_snapshot_repo(snapshot_dir, database_identity, replica_id))
139139
.await
140-
.map(SnapshotWorker::new)?;
140+
.map(|repo| SnapshotWorker::new(repo, snapshot::Compression::Enabled))?;
141141

142142
tokio::spawn(relational_db::snapshot_watching_commitlog_compressor(
143143
snapshot_worker.subscribe(),

crates/core/src/db/relational_db.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1664,6 +1664,7 @@ fn default_row_count_fn(db: Identity) -> RowCountFn {
16641664

16651665
#[cfg(any(test, feature = "test"))]
16661666
pub mod tests_utils {
1667+
use crate::db::snapshot;
16671668
use crate::db::snapshot::SnapshotWorker;
16681669

16691670
use super::*;
@@ -1813,7 +1814,10 @@ pub mod tests_utils {
18131814
let (local, disk_size_fn) = rt.block_on(local_durability(root.commit_log()))?;
18141815
let history = local.clone();
18151816
let snapshots = want_snapshot_repo
1816-
.then(|| open_snapshot_repo(root.snapshots(), db_identity, replica_id).map(SnapshotWorker::new))
1817+
.then(|| {
1818+
open_snapshot_repo(root.snapshots(), db_identity, replica_id)
1819+
.map(|repo| SnapshotWorker::new(repo, snapshot::Compression::Disabled))
1820+
})
18171821
.transpose()?;
18181822

18191823
let persistence = Persistence {
@@ -1951,7 +1955,10 @@ pub mod tests_utils {
19511955
durability: local.clone(),
19521956
disk_size: disk_size_fn,
19531957
snapshots: want_snapshot_repo
1954-
.then(|| open_snapshot_repo(root.snapshots(), Identity::ZERO, 0).map(SnapshotWorker::new))
1958+
.then(|| {
1959+
open_snapshot_repo(root.snapshots(), Identity::ZERO, 0)
1960+
.map(|repo| SnapshotWorker::new(repo, snapshot::Compression::Disabled))
1961+
})
19551962
.transpose()?,
19561963
};
19571964
let db = Self::open_db(root, history, Some(persistence), None, 0)?;
@@ -2120,7 +2127,7 @@ mod tests {
21202127
system_tables, StConstraintRow, StIndexRow, StSequenceRow, StTableRow, ST_CONSTRAINT_ID, ST_INDEX_ID,
21212128
ST_SEQUENCE_ID, ST_TABLE_ID,
21222129
};
2123-
use spacetimedb_fs_utils::compression::{CompressCount, CompressType};
2130+
use spacetimedb_fs_utils::compression::CompressType;
21242131
use spacetimedb_lib::db::raw_def::v9::{btree, RawTableDefBuilder};
21252132
use spacetimedb_lib::error::ResultTest;
21262133
use spacetimedb_lib::Identity;
@@ -2129,6 +2136,7 @@ mod tests {
21292136
use spacetimedb_sats::buffer::BufReader;
21302137
use spacetimedb_sats::product;
21312138
use spacetimedb_schema::schema::RowLevelSecuritySchema;
2139+
use spacetimedb_snapshot::CompressionStats;
21322140
#[cfg(unix)]
21332141
use spacetimedb_snapshot::Snapshot;
21342142
use spacetimedb_table::read_column::ReadColumn;
@@ -3142,7 +3150,9 @@ mod tests {
31423150
assert_eq!(&offsets, &[1, 2, 3]);
31433151
// Simulate we take except the last snapshot
31443152
let last_compress = 2;
3145-
assert_eq!(repo.compress_older_snapshots(3)?, CompressCount { none: 0, zstd: 2 });
3153+
let mut stats = CompressionStats::default();
3154+
repo.compress_snapshots(&mut stats, ..3)?;
3155+
assert_eq!(stats.compressed(), 2);
31463156
let size_compress_on = repo.size_on_disk()?;
31473157
assert!(size_compress_on.total_size < size_compress_off.total_size);
31483158
// Verify we hard-linked the second snapshot
@@ -3154,7 +3164,7 @@ mod tests {
31543164
let mut hard_linked_off = 0;
31553165

31563166
let (snapshot, compress) = Snapshot::read_from_file(&snapshot_dir.snapshot_file(last_compress))?;
3157-
assert_eq!(compress, CompressType::Zstd);
3167+
assert!(compress.is_compressed());
31583168
let repo = SnapshotRepository::object_repo(&snapshot_dir)?;
31593169
for (_, path) in snapshot.files(&repo) {
31603170
match path.metadata()?.nlink() {
@@ -3204,7 +3214,7 @@ mod tests {
32043214
let out = TempDir::with_prefix("snapshot_test")?;
32053215
let dir = SnapshotsPath::from_path_unchecked(out.path());
32063216

3207-
let (_, repo) = make_snapshot(dir.clone(), Identity::ZERO, 0, CompressType::Zstd, false);
3217+
let (_, repo) = make_snapshot(dir.clone(), Identity::ZERO, 0, CompressType::zstd(), false);
32083218

32093219
stdb.take_snapshot(&repo)?;
32103220
let size = repo.size_on_disk()?;

0 commit comments

Comments
 (0)