Skip to content

Commit 1a59edc

Browse files
committed
Tests and fixes for custom write checkpoints.
1 parent d29984a commit 1a59edc

File tree

5 files changed

+153
-12
lines changed

5 files changed

+153
-12
lines changed

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

-7
Original file line numberDiff line numberDiff line change
@@ -87,13 +87,6 @@ export class MongoSyncBucketStorage
8787
);
8888
}
8989

90-
createCustomWriteCheckpoint(checkpoint: storage.BatchedCustomWriteCheckpointOptions): Promise<bigint> {
91-
return this.writeCheckpointAPI.createCustomWriteCheckpoint({
92-
...checkpoint,
93-
sync_rules_id: this.group_id
94-
});
95-
}
96-
9790
createManagedWriteCheckpoint(checkpoint: storage.ManagedWriteCheckpointOptions): Promise<bigint> {
9891
return this.writeCheckpointAPI.createManagedWriteCheckpoint(checkpoint);
9992
}

modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts

+6-1
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,11 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI {
278278
lsn: null
279279
};
280280
lastId = doc.checkpoint;
281+
} else {
282+
yield {
283+
id: null,
284+
lsn: null
285+
};
281286
}
282287

283288
for await (let event of stream) {
@@ -324,7 +329,7 @@ export async function batchCreateCustomWriteCheckpoints(
324329
db: PowerSyncMongo,
325330
checkpoints: storage.CustomWriteCheckpointOptions[]
326331
): Promise<void> {
327-
if (!checkpoints.length) {
332+
if (checkpoints.length == 0) {
328333
return;
329334
}
330335

modules/module-mongodb-storage/src/storage/implementation/db.ts

+1
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ export class PowerSyncMongo {
7474
await this.instance.deleteOne({});
7575
await this.locks.deleteMany({});
7676
await this.bucket_state.deleteMany({});
77+
await this.custom_write_checkpoints.deleteMany({});
7778
}
7879

7980
/**

modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts

+14-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import * as lib_postgres from '@powersync/lib-service-postgres';
22
import * as framework from '@powersync/lib-services-framework';
3-
import { storage } from '@powersync/service-core';
4-
import { JSONBig } from '@powersync/service-jsonbig';
3+
import { storage, sync } from '@powersync/service-core';
4+
import { JSONBig, JsonContainer } from '@powersync/service-jsonbig';
55
import { models } from '../../types/types.js';
66

77
export type PostgresCheckpointAPIOptions = {
@@ -129,11 +129,22 @@ export async function batchCreateCustomWriteCheckpoints(
129129
return;
130130
}
131131

132+
// Needs to be encoded using plain JSON.stringify
133+
const mappedCheckpoints = checkpoints.map((cp) => {
134+
return {
135+
user_id: cp.user_id,
136+
// Cannot encode bigint directly using JSON.stringify.
137+
// The ::int8 in the query below will take care of casting back to a number
138+
checkpoint: String(cp.checkpoint),
139+
sync_rules_id: cp.sync_rules_id
140+
};
141+
});
142+
132143
await db.sql`
133144
WITH
134145
json_data AS (
135146
SELECT
136-
jsonb_array_elements(${{ type: 'jsonb', value: JSONBig.stringify(checkpoints) }}) AS
147+
jsonb_array_elements(${{ type: 'jsonb', value: mappedCheckpoints }}) AS
137148
CHECKPOINT
138149
)
139150
INSERT INTO

packages/service-core-tests/src/tests/register-data-storage-tests.ts

+132-1
Original file line numberDiff line numberDiff line change
@@ -1560,7 +1560,7 @@ bucket_definitions:
15601560
});
15611561
// We have to trigger a new keepalive after the checkpoint, at least to cover postgres storage.
15621562
// This is what is effetively triggered with RouteAPI.createReplicationHead().
1563-
// MongoDB storage doesn't need this.
1563+
// MongoDB storage doesn't explicitly need this anymore.
15641564
await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
15651565
await batch.keepalive('6/0');
15661566
});
@@ -1577,4 +1577,135 @@ bucket_definitions:
15771577
}
15781578
});
15791579
});
1580+
1581+
test('custom write checkpoints - checkpoint after write', async (context) => {
1582+
await using factory = await generateStorageFactory();
1583+
const r = await factory.configureSyncRules({
1584+
content: `
1585+
bucket_definitions:
1586+
mybucket:
1587+
data: []
1588+
`,
1589+
validate: false
1590+
});
1591+
const bucketStorage = factory.getInstance(r.persisted_sync_rules!);
1592+
await bucketStorage.autoActivate();
1593+
bucketStorage.setWriteCheckpointMode(storage.WriteCheckpointMode.CUSTOM);
1594+
1595+
const abortController = new AbortController();
1596+
context.onTestFinished(() => abortController.abort());
1597+
const iter = bucketStorage
1598+
.watchCheckpointChanges({ user_id: 'user1', signal: abortController.signal })
1599+
[Symbol.asyncIterator]();
1600+
1601+
await bucketStorage.batchCreateCustomWriteCheckpoints([
1602+
{
1603+
checkpoint: 5n,
1604+
user_id: 'user1'
1605+
}
1606+
]);
1607+
1608+
await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
1609+
await batch.keepalive('5/0');
1610+
});
1611+
1612+
const result = await iter.next();
1613+
expect(result).toMatchObject({
1614+
done: false,
1615+
value: {
1616+
base: {
1617+
checkpoint: 0n,
1618+
lsn: '5/0'
1619+
},
1620+
writeCheckpoint: 5n
1621+
}
1622+
});
1623+
});
1624+
1625+
test('custom write checkpoints - write after checkpoint', async (context) => {
1626+
await using factory = await generateStorageFactory();
1627+
const r = await factory.configureSyncRules({
1628+
content: `
1629+
bucket_definitions:
1630+
mybucket:
1631+
data: []
1632+
`,
1633+
validate: false
1634+
});
1635+
const bucketStorage = factory.getInstance(r.persisted_sync_rules!);
1636+
await bucketStorage.autoActivate();
1637+
bucketStorage.setWriteCheckpointMode(storage.WriteCheckpointMode.CUSTOM);
1638+
1639+
const abortController = new AbortController();
1640+
context.onTestFinished(() => abortController.abort());
1641+
const iter = bucketStorage
1642+
.watchCheckpointChanges({ user_id: 'user1', signal: abortController.signal })
1643+
[Symbol.asyncIterator]();
1644+
1645+
await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
1646+
await batch.keepalive('5/0');
1647+
});
1648+
1649+
const result = await iter.next();
1650+
expect(result).toMatchObject({
1651+
done: false,
1652+
value: {
1653+
base: {
1654+
checkpoint: 0n,
1655+
lsn: '5/0'
1656+
},
1657+
writeCheckpoint: null
1658+
}
1659+
});
1660+
1661+
await bucketStorage.batchCreateCustomWriteCheckpoints([
1662+
{
1663+
checkpoint: 6n,
1664+
user_id: 'user1'
1665+
}
1666+
]);
1667+
// We have to trigger a new keepalive after the checkpoint, at least to cover postgres storage.
1668+
// This is what is effetively triggered with RouteAPI.createReplicationHead().
1669+
// MongoDB storage doesn't explicitly need this anymore.
1670+
await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
1671+
await batch.keepalive('6/0');
1672+
});
1673+
1674+
const result2 = await iter.next();
1675+
expect(result2).toMatchObject({
1676+
done: false,
1677+
value: {
1678+
base: {
1679+
checkpoint: 0n,
1680+
lsn: '6/0'
1681+
},
1682+
writeCheckpoint: 6n
1683+
}
1684+
});
1685+
1686+
await bucketStorage.batchCreateCustomWriteCheckpoints([
1687+
{
1688+
checkpoint: 7n,
1689+
user_id: 'user1'
1690+
}
1691+
]);
1692+
// We have to trigger a new keepalive after the checkpoint, at least to cover postgres storage.
1693+
// This is what is effetively triggered with RouteAPI.createReplicationHead().
1694+
// MongoDB storage doesn't explicitly need this anymore.
1695+
await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
1696+
await batch.keepalive('7/0');
1697+
});
1698+
1699+
const result3 = await iter.next();
1700+
expect(result3).toMatchObject({
1701+
done: false,
1702+
value: {
1703+
base: {
1704+
checkpoint: 0n,
1705+
lsn: '7/0'
1706+
},
1707+
writeCheckpoint: 7n
1708+
}
1709+
});
1710+
});
15801711
}

0 commit comments

Comments
 (0)