
Commit 0666dd6

Merge pull request #37 from powersync-ja/compacting-buckets
Compacting buckets
2 parents 613b52e + 42d540b commit 0666dd6

23 files changed: +1858 -223 lines

.changeset/dry-schools-rush.md

Lines changed: 6 additions & 0 deletions
---
'@powersync/service-core': minor
'@powersync/service-image': minor
---

Implement a compact command

docs/README.md

Lines changed: 5 additions & 0 deletions
# Technical Documentation

This folder contains technical documentation regarding the implementation of PowerSync.

For documentation on using PowerSync, see [docs.powersync.com](https://docs.powersync.com/).

docs/bucket-properties.md

Lines changed: 95 additions & 0 deletions
# Formal Bucket Properties

This document describes buckets as a set of operations, along with the properties we guarantee for the sync protocol. These are the properties used to ensure that each client ends up with the same bucket state when the same bucket has been downloaded.

For a broader overview of the protocol, see [sync-protocol.md](./sync-protocol.md). For a high-level description of the compact implementation, see [compacting-operations.md](./compacting-operations.md).

## Buckets

A bucket $B$ is defined as a set of operations:

```math
B = \{ op_1, op_2, \ldots, op_n \}
```

Each operation $op_i$ is a tuple of:

```math
(opid_i, type_i, rowid_i, data_i, checksum_i)
```

$opid_i \in \mathbb{N}$ is a strictly incrementing series of numbers identifying the operation within the bucket. $type_i$ is one of PUT, REMOVE, MOVE or CLEAR. $rowid_i$ is an identifier uniquely identifying a single row of data within the bucket. $data_i$ contains the row contents (present for PUT operations). $checksum_i$ is a checksum over all the other fields in the tuple.

We define the shorthand syntax for a sub-sequence of bucket $B$ as:

```math
B_{[a..b]} = \{ op_i \mid a \le opid_i \le b \}
```

```math
B_{[..b]} = B_{[opid_1..b]}
```

## Reducing buckets

We define the function "reduce" on a bucket $B$, written $r(B)$. This operation is run client-side, after downloading data for a bucket, to get the final state for each unique row. The reduced state:

1. Discards MOVE operations, only keeping track of their checksum.
2. Discards superseded operations, only keeping the last operation for each unique row (but keeps track of the checksum of prior operations).
3. Discards REMOVE operations (keeping their checksums), but only after all prior operations for those rows have been superseded.

In other words, it keeps the latest state of each row, in addition to the checksum of other discarded operations. We define two reduced buckets as equal if they have exactly the same set of PUT operations, and the remaining checksum is equal.

The algorithm to compute $r(B)$ is as follows:

1. Start with the initial state containing a special CLEAR operation $R = \\{ (0, CLEAR, checksum_t = 0) \\}$.
2. Iterate through the operations $op_i$ in $B$ in sequence, ordered by $opid$.
3. If $type_i = PUT$:
   1. Add $op_i$ to $R$.
   2. Remove any prior operations $op_j$ from $R$ where $rowid_j = rowid_i$ and $opid_j < opid_i$ (always true since we're iterating in sequence).
   3. Add $checksum_j$ to $checksum_t$ for any $op_j$ we removed.
4. If $type_i = REMOVE$:
   1. Remove any prior operations $op_j$ from $R$ where $rowid_j = rowid_i$ and $opid_j < opid_i$ (always true since we're iterating in sequence).
   2. Add $checksum_i$ to $checksum_t$ in the initial CLEAR operation, as well as adding $checksum_j$ for any $op_j$ we removed.
5. If $type_i = MOVE$:
   1. Add $checksum_i$ to $checksum_t$ in the initial CLEAR operation.
6. If $type_i = CLEAR$:
   1. Remove ALL operations from $R$.
   2. Set $R = \\{ (0, CLEAR, checksum_t = checksum_i) \\}$.
7. After iterating through all operations in $B$, return $R$ as the result.
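
As an illustration, here is a minimal TypeScript sketch of this algorithm. The types and the 32-bit wrapping checksum addition are assumptions for the sketch, not the actual client implementation:

```ts
type OpType = 'PUT' | 'REMOVE' | 'MOVE' | 'CLEAR';

interface Op {
  opId: number;
  type: OpType;
  rowId?: string; // present for PUT/REMOVE
  data?: string;  // present for PUT
  checksum: number;
}

interface ReducedBucket {
  checksumT: number;     // checksum_t of the synthetic CLEAR at opid 0
  rows: Map<string, Op>; // rowid -> surviving PUT operation
}

// Checksum combination, assumed here to be 32-bit wrapping addition.
const addChecksum = (a: number, b: number): number => (a + b) | 0;

// r(B): `ops` must be ordered by ascending opId. Passing a previous result
// as `initial` computes the incremental form r(r(B1) ∪ B2) discussed below.
function reduce(ops: Op[], initial?: ReducedBucket): ReducedBucket {
  const rows = new Map(initial?.rows ?? []);
  let checksumT = initial?.checksumT ?? 0;

  for (const op of ops) {
    switch (op.type) {
      case 'PUT': {
        const prior = rows.get(op.rowId!);
        if (prior) checksumT = addChecksum(checksumT, prior.checksum);
        rows.set(op.rowId!, op); // keep only the latest PUT per row
        break;
      }
      case 'REMOVE': {
        const prior = rows.get(op.rowId!);
        if (prior) checksumT = addChecksum(checksumT, prior.checksum);
        rows.delete(op.rowId!);
        checksumT = addChecksum(checksumT, op.checksum); // the REMOVE itself is discarded
        break;
      }
      case 'MOVE':
        checksumT = addChecksum(checksumT, op.checksum);
        break;
      case 'CLEAR':
        rows.clear();
        checksumT = op.checksum; // CLEAR's checksum covers everything it replaced
        break;
    }
  }
  return { checksumT, rows };
}
```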

$r(r(B_1) \cup B_2)$ is a common operation that takes an already-reduced bucket $r(B_1)$, adds all operations from $B_2$, and reduces the result again. This models a bucket that has already been downloaded and reduced on the client: the client downloads new operations for the bucket and adds them to the previously reduced result.

The function $r(B)$ must always satisfy this property:

```math
r(B_{[..opid_n]}) = r(r(B_{[..c_i]}) \cup B_{[c_i+1..opid_n]}) \quad \textrm{for all} \quad c_i \in B
```

That is, a client must be able to sync to any checkpoint $c_i$, persist the reduced set, then continue syncing the remainder of the data, and end up with the same result as syncing it all in one go. The iterative nature of the algorithm makes the above always true.
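
Using the `reduce` sketch above, this property can be spot-checked for any split point (a test sketch, not part of the protocol):

```ts
// Splitting the download at any checkpoint must give the same reduced
// state as downloading everything in one go.
function checkSplitProperty(ops: Op[], splitIndex: number): boolean {
  const whole = reduce(ops);
  const split = reduce(ops.slice(splitIndex), reduce(ops.slice(0, splitIndex)));
  return (
    whole.checksumT === split.checksumT &&
    whole.rows.size === split.rows.size &&
    [...whole.rows].every(([rowId, op]) => split.rows.get(rowId)?.opId === op.opId)
  );
}
```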

## Compacting

We define a server-side compact operation that reduces the amount of data that needs to be downloaded by clients, while still producing the same results on each client.

When we compact the bucket $B$ to $B' = compact(B)$, the following conditions must hold:

```math
r(B) = r(B')
```

```math
r(B) = r(r(B_{[..c_i]}) \cup B'_{[c_i+1..opid_n]}) \quad \textrm{for all} \quad c_i \in B
```

Note that this is the same as the standard equation for buckets above, but with $B$ replaced by the compacted bucket $B'$ in one place, representing newer data downloaded by clients.

The compact operation is defined in [compacting-operations.md](./compacting-operations.md); a formal description is pending.

The above easily holds when compacting operations into MOVE operations: since MOVE operations are not present in $r(B)$ apart from their checksum, and the checksum is not changed in $B'$, this has no effect on the outcome.

A proof of compacting into CLEAR operations is pending.

### Consistency during sync

Suppose a client starts downloading at checkpoint $c_1$, operations $B_{[..c_1]}$. Right after, operations are added - checkpoint $c_2$, operations $B_{[c_1+1..c_2]}$. Then, the bucket is compacted to $B'_{[..c_2]}$. While we are guaranteed that $r(B_{[..c_2]}) = r(B'_{[..c_2]})$, there is no guarantee that $r(B_{[..c_1]}) = r(B'_{[..c_1]})$. In fact, it is quite possible for MOVE operations to cause $r(B_{[..c_1]}) \neq r(B'_{[..c_1]})$. Therefore, we invalidate the checkpoint $c_1$ - the client will only have consistent data once it has fully synced up to $c_2$.

docs/compacting-operations.md

Lines changed: 102 additions & 0 deletions
# Compacting Operations

This document describes the internal process of compacting operations: what happens behind the scenes when running the `compact` CLI command.

By default, each change in the source database becomes a new operation in the ever-growing bucket history. To keep this history from growing indefinitely, we "compact" the buckets.

## Previous Workaround

A workaround is to deploy a sync rules change, which re-creates all buckets from scratch, containing only the latest version of each row. This reduces the size of buckets for new clients performing a sync from scratch, but requires existing clients to completely re-sync the buckets.

# Compacting Processes

The compacting process can be split into three distinct processes:

1. Convert operations into MOVE operations.
2. Convert operations into CLEAR operations.
3. Defragment buckets.

## 1. MOVE operations

Any operation on a row may be converted into a MOVE operation if there is another PUT or REMOVE operation later in the bucket for the same row.

Two rows are considered the same if the combination of `(object_type, object_id, subkey)` is the same.

A MOVE operation may contain internal metadata of `{target_op: op_id}`. This indicates that the operation was "moved" to the target, and no checkpoint before that op_id will be valid. A previous protocol revision included this in the operation data, and let clients invalidate the checkpoint. Now, this is used purely server-side, and the server omits the `CheckpointComplete` message if the current checkpoint has been invalidated by such an operation. The same applies to CLEAR operations below.

When converting an operation to a MOVE operation, the bucket, op_id and checksum remain the same. The data, object_type, object_id and subkey fields must be cleared, reducing the size of the operation.

By itself, converting operations into MOVE operations does not reduce the number of operations synced, but may reduce the total size of the operations synced. It has no effect on clients that are already up-to-date.
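
A sketch of the conversion in TypeScript (the `StoredOp` shape is an assumption loosely based on the operation formats in [sync-protocol.md](./sync-protocol.md)):

```ts
interface StoredOp {
  op_id: string;
  op: 'PUT' | 'REMOVE' | 'MOVE' | 'CLEAR';
  object_type?: string;
  object_id?: string;
  subkey?: string;
  data?: string;
  checksum: number;
  target_op?: string; // internal metadata; not sent to clients
}

// Convert an operation superseded by a later PUT/REMOVE (`targetOpId`)
// into a MOVE: op_id and checksum are preserved, while row identity and
// data are dropped, shrinking the stored operation.
function toMoveOp(op: StoredOp, targetOpId: string): StoredOp {
  return {
    op_id: op.op_id,
    op: 'MOVE',
    checksum: op.checksum,
    target_op: targetOpId, // checkpoints before this op_id are invalid
  };
}
```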

## 2. CLEAR operations

A CLEAR operation in a bucket indicates that all preceding operations in the bucket must be deleted. It is typically the first operation in a bucket, but the client may receive it at any later point.

If the client has active PUT operations before the CLEAR operation, those are effectively converted into REMOVE operations. This will remove the data unless there is another PUT operation for the relevant rows later in the bucket.

The compact process involves:

1. Find the largest sequence of REMOVE, MOVE and/or CLEAR operations at the start of the bucket.
2. Replace all of those with a single CLEAR operation.

The op_id of the CLEAR operation is the latest op_id of the operations being replaced, and the checksum is the combination of those operations' checksums.
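
A sketch of this step, reusing the `StoredOp` shape from the MOVE sketch above (checksum combination again assumed to be 32-bit wrapping addition):

```ts
// Replace the longest REMOVE/MOVE/CLEAR prefix of a bucket (ops ordered
// by ascending op_id) with a single CLEAR operation.
function compactToClear(ops: StoredOp[]): StoredOp[] {
  let i = 0;
  let checksum = 0;
  while (i < ops.length && ops[i].op !== 'PUT') {
    checksum = (checksum + ops[i].checksum) | 0; // combine replaced checksums
    i++;
  }
  if (i === 0) return ops; // a PUT at the very start: nothing to compact

  const clear: StoredOp = {
    op_id: ops[i - 1].op_id, // latest op_id of the replaced prefix
    op: 'CLEAR',
    checksum,
  };
  return [clear, ...ops.slice(i)];
}
```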

Compacting to CLEAR operations can reduce the number of operations in a bucket. However, it is not effective if there is a PUT operation near the start of the bucket. This compacting step has no effect on clients that are already up-to-date.

The MOVE compact step above should typically be run before the CLEAR compact step, to ensure maximum effectiveness.

## 3. Defragmentation

Even after the MOVE and CLEAR compact processes have run, a bucket may still be fragmented with many MOVE and REMOVE operations. In the worst case, a bucket may start with a single PUT operation, followed by thousands of MOVE and REMOVE operations. Only a single row (the PUT operation) still exists, but new clients must sync all the MOVE and REMOVE operations.

To handle these cases, we can "defragment" the data.

Defragmentation does not involve any new operations. Instead, it just moves PUT operations for active rows from the start of the bucket to the end of the bucket, allowing the MOVE and CLEAR compact processes above to efficiently compact the bucket.

The disadvantage here is that these rows will be re-synced by existing clients.

# Implementation

## MOVE + CLEAR

This is a process that compacts all buckets by iterating through all operations. This process can be run periodically, for example once a day, or after bulk data modifications.

The process iterates through all operations in reverse order. This effectively processes one bucket at a time, in reverse order of operations.

We track each row we've seen in a bucket, along with the last PUT/REMOVE operation we've seen for the row. Whenever we see the same row again, we replace that operation with a MOVE operation, using the PUT/REMOVE op_id as the target.

To avoid indefinite memory growth, we place a limit on the memory used by the set of rows we're tracking. Once we reach this limit, we stop tracking additional rows for the bucket. We should be able to effectively compact buckets on the order of 4M unique rows using 1GB of memory, and only lose some compacting gains for larger buckets.

The second part is compacting to CLEAR operations. For each bucket, we keep track of the last PUT operation we've seen (last meaning the smallest op_id, since we're iterating in reverse). We then replace all the operations before that with a single CLEAR operation.
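
A simplified, single-bucket sketch of both parts, reusing the helpers from the sketches above (the row limit stands in for the real memory-based limit, and the names are illustrative):

```ts
const MAX_TRACKED_ROWS = 4_000_000; // stand-in for the memory limit

// Compact one bucket; ops must be ordered by ascending op_id.
function compactBucket(ops: StoredOp[]): StoredOp[] {
  // row key -> op_id of the latest PUT/REMOVE seen for that row
  const seen = new Map<string, string>();
  const out = [...ops];

  // Part 1: reverse scan, converting superseded PUT/REMOVE ops to MOVE.
  for (let i = out.length - 1; i >= 0; i--) {
    const op = out[i];
    if (op.op !== 'PUT' && op.op !== 'REMOVE') continue;
    const key = `${op.object_type}/${op.object_id}/${op.subkey ?? ''}`;
    const target = seen.get(key);
    if (target !== undefined) {
      out[i] = toMoveOp(op, target); // a later op supersedes this one
    } else if (seen.size < MAX_TRACKED_ROWS) {
      seen.set(key, op.op_id);
    }
    // Past the limit, untracked rows simply miss some compacting gains.
  }

  // Part 2: collapse the leading REMOVE/MOVE/CLEAR prefix into one CLEAR.
  return compactToClear(out);
}
```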

## Defragmentation

For an initial workaround, defragmenting can be performed outside PowerSync by touching all rows in a bucket:

```sql
update mytable set id = id
-- Repeat the above for other tables in the same bucket if relevant
```

After this, the normal MOVE + CLEAR compacting will compact the bucket to only have a single operation per active row.

This would cause existing clients to re-sync every row, while reducing the number of operations for new clients.

If an `updated_at` column or similar is present, we can use it to defragment more incrementally:

```sql
update mytable set id = id where updated_at < now() - interval '1 week'
```

This version avoids unnecessary defragmentation of rows modified recently.

In the future, we can implement defragmentation inside PowerSync, using heuristics around the spread of operations within a bucket.

# Future additions

Improvements that may be implemented in the future:

1. Keeping track of buckets that need compacting would allow us to compact those as soon as needed, without the overhead of compacting buckets where it won't have an effect.
2. Together with the above, we can implement a lightweight compacting process inside the replication worker, to compact operations as soon as modifications come in. This can help to quickly compact in cases where multiple modifications are made to the same rows in a short time span.
3. Implement automatic defragmentation inside PowerSync as described above.

docs/sync-protocol.md

Lines changed: 172 additions & 0 deletions
# Sync Protocol

This document is a work in progress - more details to be added over time.

# Sync stream

A sync stream contains 5 possible messages:

## StreamingSyncCheckpoint

Format:

```ts
{
  checkpoint: {
    last_op_id: OpId;
    write_checkpoint?: OpId;
    buckets: BucketChecksum[];
  }
}
```

This indicates a new checkpoint is available, along with checksums for each bucket in the checkpoint.

This is typically (but not necessarily) the first message in the response, and is often followed by StreamingSyncData and StreamingSyncCheckpointComplete messages (both optional).

## StreamingSyncCheckpointDiff

This has the same conceptual meaning as a StreamingSyncCheckpoint. It is an optimization to only send details for buckets that changed, instead of sending the entire checkpoint over.

Format:

```ts
{
  checkpoint_diff: {
    last_op_id: OpId;
    write_checkpoint?: OpId;
    updated_buckets: BucketChecksum[];
    removed_buckets: string[];
  }
}
```

Typically a sync stream contains a single StreamingSyncCheckpoint, followed by only StreamingSyncCheckpointDiff messages. However, the server may send any number of StreamingSyncCheckpoint messages after that, each time "resetting" the checkpoint state.

## StreamingSyncData

This message contains data for a single bucket.

Format:

```ts
{
  data: SyncBucketData;
}
```

While this data is typically associated with the last checkpoint sent, it may be sent at any other time, and may contain data outside the last checkpoint boundaries.

## StreamingSyncCheckpointComplete

This message indicates that all data has been sent for the last checkpoint.

Format:

```ts
{
  checkpoint_complete: {
    last_op_id: OpId;
  }
}
```

`last_op_id` here _must_ match the `last_op_id` of the last `StreamingSyncCheckpoint` or `StreamingSyncCheckpointDiff` message sent.

It is not required to have one `StreamingSyncCheckpointComplete` message for every `StreamingSyncCheckpoint` or `StreamingSyncCheckpointDiff`. In some cases, a checkpoint may be invalidated while its data is being sent, in which case no matching `StreamingSyncCheckpointComplete` will be sent. Instead, another `StreamingSyncCheckpoint` or `StreamingSyncCheckpointDiff` will be sent with a new checkpoint.

## StreamingSyncKeepalive

Serves as both a general keepalive message to keep the stream open, and an indication of how long before the current token expires.

Format:

```ts
{
  token_expires_in: number;
}
```
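
To illustrate how these five messages fit together, here is a sketch of a client-side stream handler. The `BucketChecksum` shape, the `SyncHooks` interface, and the handler logic are assumptions for illustration; only the message formats above come from the protocol:

```ts
type OpId = string;

interface BucketChecksum { bucket: string; checksum: number } // assumed shape
interface Checkpoint { last_op_id: OpId; write_checkpoint?: OpId; buckets: BucketChecksum[] }
interface CheckpointDiff { last_op_id: OpId; write_checkpoint?: OpId; updated_buckets: BucketChecksum[]; removed_buckets: string[] }
type SyncBucketData = unknown; // bucket payload; shape omitted here

type SyncLine =
  | { checkpoint: Checkpoint }
  | { checkpoint_diff: CheckpointDiff }
  | { data: SyncBucketData }
  | { checkpoint_complete: { last_op_id: OpId } }
  | { token_expires_in: number };

// Hypothetical persistence hooks provided by the client implementation.
interface SyncHooks {
  persistBucketData(data: SyncBucketData): Promise<void>;
  markConsistent(lastOpId: OpId): Promise<void>;
  refreshToken(): Promise<void>;
}

async function handleSyncStream(stream: AsyncIterable<SyncLine>, hooks: SyncHooks) {
  let pending: Checkpoint | undefined;

  for await (const line of stream) {
    if ('checkpoint' in line) {
      pending = line.checkpoint; // resets the full checkpoint state
    } else if ('checkpoint_diff' in line) {
      // Apply the diff to the previously received checkpoint state.
      const buckets = new Map<string, BucketChecksum>();
      for (const b of pending!.buckets) buckets.set(b.bucket, b);
      for (const b of line.checkpoint_diff.updated_buckets) buckets.set(b.bucket, b);
      for (const name of line.checkpoint_diff.removed_buckets) buckets.delete(name);
      pending = {
        last_op_id: line.checkpoint_diff.last_op_id,
        write_checkpoint: line.checkpoint_diff.write_checkpoint,
        buckets: [...buckets.values()],
      };
    } else if ('data' in line) {
      await hooks.persistBucketData(line.data); // may arrive at any time
    } else if ('checkpoint_complete' in line) {
      // All data for the pending checkpoint has arrived; it is now safe
      // to make this checkpoint visible to the application.
      await hooks.markConsistent(line.checkpoint_complete.last_op_id);
    } else if ('token_expires_in' in line) {
      if (line.token_expires_in < 60) await hooks.refreshToken();
    }
  }
}
```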

# Bucket operations

## PUT

Includes the contents of an entire row.

The row is uniquely identified using the combination of `(object_type, object_id, subkey)`.

Another PUT or REMOVE operation with the same `(object_type, object_id, subkey)` _replaces_ this operation.

Another PUT or REMOVE operation with the same `object_type` and `object_id` but a different `subkey` _does not_ directly replace this one, and may exist concurrently.

Format:

```ts
{
  op_id: OpId;
  op: 'PUT';
  object_type: string;
  object_id: string;
  data: string;
  checksum: number;
  subkey?: string;
}
```

## REMOVE

Indicates that a row is removed from a bucket.

The row is uniquely identified using the combination of `(object_type, object_id, subkey)`.

Another PUT or REMOVE operation with the same `(object_type, object_id, subkey)` _replaces_ this operation.

Format:

```ts
{
  op_id: OpId;
  op: 'REMOVE';
  object_type: string;
  object_id: string;
  checksum: number;
  subkey?: string;
}
```

## MOVE

Indicates a "tombstone" operation - an operation that has been replaced by another one. The checksum of the operation is unchanged.

Format:

```ts
{
  op_id: OpId;
  op: 'MOVE';
  checksum: number;
}
```

A previous iteration of the protocol included an optional `data: '{"target": OpId}'` field. This indicated that a checkpoint is not valid unless its op_id is greater than or equal to the target. Now, the server is responsible for invalidating checkpoints in those cases, by not sending a `CheckpointComplete` message.

## CLEAR

Indicates a "reset point" of a bucket. All operations prior to this operation must be removed.

If the client has any active PUT operations prior to a CLEAR operation, those should effectively be converted into REMOVE operations.

The checksum of this operation is equal to the combined checksum of all prior operations it replaced.

Format:

```ts
{
  op_id: OpId;
  op: 'CLEAR';
  checksum: number;
}
```
