Skip to content

Commit 7aa52e0

Browse files
author
DominicGBauer
committed
chore: add bucket classes and break out into compaction manager
1 parent 1122798 commit 7aa52e0

File tree

3 files changed

+105
-0
lines changed

3 files changed

+105
-0
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import Foundation
2+
3+
struct Checkpoint: Codable, Equatable {
4+
let lastOpId: String
5+
let checksums: [BucketChecksum]
6+
let writeCheckpoint: String?
7+
8+
enum CodingKeys: String, CodingKey {
9+
case lastOpId = "last_op_id"
10+
case checksums = "buckets"
11+
case writeCheckpoint = "write_checkpoint"
12+
}
13+
14+
func clone() -> Checkpoint {
15+
return Checkpoint(lastOpId: self.lastOpId, checksums: self.checksums, writeCheckpoint: self.writeCheckpoint)
16+
}
17+
}
18+
19+
extension Checkpoint: CustomStringConvertible {
20+
var description: String {
21+
return "Checkpoint<lastOpId:\(lastOpId), checksums:\(checksums), writeCheckpoint:\(writeCheckpoint ?? "nil")>"
22+
}
23+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import Foundation
2+
3+
struct ChecksumCache: Codable, Equatable {
4+
let lastOpId: String
5+
let checksums: [String: BucketChecksum]
6+
7+
enum CodingKeys: String, CodingKey {
8+
case lastOpId = "last_op_id"
9+
case checksums
10+
}
11+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import Foundation
2+
import OSLog
3+
4+
actor CompactionManager {
5+
private let db: PowerSyncDatabaseProtocol
6+
private let logger: Logger
7+
private var compactCounter: Int
8+
private let pendingBucketDeletes: SharedPendingDeletesActor
9+
10+
private enum Constants {
11+
static let compactOperationInterval = 1_000
12+
}
13+
14+
init(db: PowerSyncDatabaseProtocol, logger: Logger, pendingBucketDeletes: SharedPendingDeletesActor) {
15+
self.db = db
16+
self.logger = logger
17+
self.pendingBucketDeletes = pendingBucketDeletes
18+
self.compactCounter = Constants.compactOperationInterval
19+
}
20+
21+
func incrementCounter(_ amount: Int) {
22+
compactCounter += amount
23+
}
24+
25+
func resetCounter() {
26+
compactCounter = Constants.compactOperationInterval
27+
}
28+
29+
func forceCompact() async throws {
30+
resetCounter()
31+
await pendingBucketDeletes.setPendingBucketDeletes(true)
32+
try await autoCompact()
33+
}
34+
35+
func autoCompact() async throws {
36+
// 1. Delete buckets
37+
try await deletePendingBuckets()
38+
39+
// 2. Clear REMOVE operations, only keeping PUT ones
40+
try await clearRemoveOps()
41+
}
42+
43+
private func deletePendingBuckets() async throws {
44+
guard await pendingBucketDeletes.getPendingBucketDeletes() else { return }
45+
46+
// TODO: Fix transactions and change this back
47+
// try await db.writeTransaction { transaction in
48+
_ = try await db.execute(
49+
"INSERT INTO powersync_operations(op, data) VALUES (?, ?)",
50+
["delete_pending_buckets", ""]
51+
)
52+
53+
// Executed once after start-up, and again when there are pending deletes.
54+
await pendingBucketDeletes.setPendingBucketDeletes(false)
55+
// }
56+
}
57+
58+
private func clearRemoveOps() async throws {
59+
guard compactCounter >= Constants.compactOperationInterval else { return }
60+
61+
// TODO: Fix transactions and change this back
62+
// _ = try await db.writeTransaction { transaction in
63+
_ = try await db.execute(
64+
"INSERT INTO powersync_operations(op, data) VALUES (?, ?)",
65+
["clear_remove_ops", ""]
66+
)
67+
// }
68+
69+
compactCounter = 0
70+
}
71+
}

0 commit comments

Comments
 (0)