Skip to content

Commit 98da657

Browse files
committed
Split out "snapshot done" check.
1 parent 7e3b0d1 commit 98da657

File tree

7 files changed

+54
-22
lines changed

7 files changed

+54
-22
lines changed

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import {
1919
isCompleteRow,
2020
SaveOperationTag,
2121
storage,
22+
SyncRuleState,
2223
utils
2324
} from '@powersync/service-core';
2425
import * as timers from 'node:timers/promises';
@@ -713,7 +714,6 @@ export class MongoBucketBatch
713714
last_checkpoint_lsn: lsn,
714715
last_checkpoint_ts: now,
715716
last_keepalive_ts: now,
716-
snapshot_done: true,
717717
last_fatal_error: null,
718718
keepalive_op: null
719719
};
@@ -744,8 +744,7 @@ export class MongoBucketBatch
744744
_id: this.group_id
745745
},
746746
{
747-
$set: update,
748-
$unset: { snapshot_lsn: 1 }
747+
$set: update
749748
},
750749
{ session: this.session }
751750
);
@@ -773,7 +772,7 @@ export class MongoBucketBatch
773772
let activated = false;
774773
await session.withTransaction(async () => {
775774
const doc = await this.db.sync_rules.findOne({ _id: this.group_id }, { session });
776-
if (doc && doc.state == 'PROCESSING') {
775+
if (doc && doc.state == SyncRuleState.PROCESSING && doc.snapshot_done && doc.last_checkpoint != null) {
777776
await this.db.sync_rules.updateOne(
778777
{
779778
_id: this.group_id
@@ -799,13 +798,15 @@ export class MongoBucketBatch
799798
{ session }
800799
);
801800
activated = true;
801+
} else if (doc?.state != SyncRuleState.PROCESSING) {
802+
this.needsActivation = false;
802803
}
803804
});
804805
if (activated) {
805806
this.logger.info(`Activated new sync rules at ${lsn}`);
806807
await this.db.notifyCheckpoint();
808+
this.needsActivation = false;
807809
}
808-
this.needsActivation = false;
809810
}
810811

811812
async keepalive(lsn: string): Promise<boolean> {
@@ -857,11 +858,9 @@ export class MongoBucketBatch
857858
{
858859
$set: {
859860
last_checkpoint_lsn: lsn,
860-
snapshot_done: true,
861861
last_fatal_error: null,
862862
last_keepalive_ts: new Date()
863-
},
864-
$unset: { snapshot_lsn: 1 }
863+
}
865864
},
866865
{ session: this.session }
867866
);
@@ -1044,7 +1043,31 @@ export class MongoBucketBatch
10441043
return copy;
10451044
}
10461045

1047-
async markSnapshotDone(tables: storage.SourceTable[], no_checkpoint_before_lsn: string) {
1046+
async markAllSnapshotDone(no_checkpoint_before_lsn: string) {
1047+
if (this.no_checkpoint_before_lsn == null || no_checkpoint_before_lsn > this.no_checkpoint_before_lsn) {
1048+
this.no_checkpoint_before_lsn = no_checkpoint_before_lsn;
1049+
}
1050+
await this.db.sync_rules.updateOne(
1051+
{
1052+
_id: this.group_id
1053+
},
1054+
{
1055+
$set: {
1056+
snapshot_done: true,
1057+
last_keepalive_ts: new Date()
1058+
},
1059+
$max: {
1060+
no_checkpoint_before: no_checkpoint_before_lsn
1061+
},
1062+
$unset: {
1063+
snapshot_lsn: 1
1064+
}
1065+
},
1066+
{ session: this.session }
1067+
);
1068+
}
1069+
1070+
async markTableSnapshotDone(tables: storage.SourceTable[], no_checkpoint_before_lsn?: string) {
10481071
const session = this.session;
10491072
const ids = tables.map((table) => table.id);
10501073

@@ -1062,17 +1085,20 @@ export class MongoBucketBatch
10621085
{ session }
10631086
);
10641087

1065-
if (no_checkpoint_before_lsn > this.no_checkpoint_before_lsn) {
1066-
this.no_checkpoint_before_lsn = no_checkpoint_before_lsn;
1067-
1088+
if (no_checkpoint_before_lsn != null) {
1089+
if (this.no_checkpoint_before_lsn == null || no_checkpoint_before_lsn > this.no_checkpoint_before_lsn) {
1090+
this.no_checkpoint_before_lsn = no_checkpoint_before_lsn;
1091+
}
10681092
await this.db.sync_rules.updateOne(
10691093
{
10701094
_id: this.group_id
10711095
},
10721096
{
10731097
$set: {
1074-
no_checkpoint_before: no_checkpoint_before_lsn,
10751098
last_keepalive_ts: new Date()
1099+
},
1100+
$max: {
1101+
no_checkpoint_before: no_checkpoint_before_lsn
10761102
}
10771103
},
10781104
{ session: this.session }

modules/module-mongodb/src/replication/ChangeStream.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,7 @@ export class ChangeStream {
366366

367367
for (let table of tablesWithStatus) {
368368
await this.snapshotTable(batch, table);
369-
await batch.markSnapshotDone([table], MongoLSN.ZERO.comparable);
369+
await batch.markTableSnapshotDone([table]);
370370

371371
this.touch();
372372
}
@@ -375,7 +375,7 @@ export class ChangeStream {
375375
// point before the data can be considered consistent.
376376
// We could do this for each individual table, but may as well just do it once for the entire snapshot.
377377
const checkpoint = await createCheckpoint(this.client, this.defaultDb, STANDALONE_CHECKPOINT_ID);
378-
await batch.markSnapshotDone([], checkpoint);
378+
await batch.markAllSnapshotDone(checkpoint);
379379

380380
// This will not create a consistent checkpoint yet, but will persist the op.
381381
// Actual checkpoint will be created when streaming replication caught up.
@@ -612,7 +612,7 @@ export class ChangeStream {
612612
await this.snapshotTable(batch, result.table);
613613
const no_checkpoint_before_lsn = await createCheckpoint(this.client, this.defaultDb, STANDALONE_CHECKPOINT_ID);
614614

615-
const [table] = await batch.markSnapshotDone([result.table], no_checkpoint_before_lsn);
615+
const [table] = await batch.markTableSnapshotDone([result.table], no_checkpoint_before_lsn);
616616
return table;
617617
}
618618

modules/module-mysql/src/replication/BinLogStream.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ export class BinLogStream {
170170
} finally {
171171
connection.release();
172172
}
173-
const [table] = await batch.markSnapshotDone([result.table], gtid.comparable);
173+
const [table] = await batch.markTableSnapshotDone([result.table], gtid.comparable);
174174
return table;
175175
}
176176

@@ -275,10 +275,12 @@ export class BinLogStream {
275275
const tables = await this.getQualifiedTableNames(batch, tablePattern);
276276
for (let table of tables) {
277277
await this.snapshotTable(connection as mysql.Connection, batch, table);
278-
await batch.markSnapshotDone([table], headGTID.comparable);
278+
await batch.markTableSnapshotDone([table], headGTID.comparable);
279279
await framework.container.probes.touch();
280280
}
281281
}
282+
const snapshotDoneGtid = await common.readExecutedGtid(promiseConnection);
283+
await batch.markAllSnapshotDone(snapshotDoneGtid.comparable);
282284
await batch.commit(headGTID.comparable);
283285
}
284286
);

modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ export class PostgresBucketBatch
430430
`.execute();
431431
}
432432

433-
async markSnapshotDone(
433+
async markTableSnapshotDone(
434434
tables: storage.SourceTable[],
435435
no_checkpoint_before_lsn: string
436436
): Promise<storage.SourceTable[]> {

modules/module-postgres/src/replication/PostgresSnapshotter.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,9 @@ export class PostgresSnapshotter {
355355
// This makes sure we don't skip any changes applied before starting this snapshot,
356356
// in the case of snapshot retries.
357357
// We could alternatively commit at the replication slot LSN.
358+
const rs = await db.query(`select pg_current_wal_lsn() as lsn`);
359+
const globalLsnNotBefore = rs.rows[0][0];
360+
await batch.markAllSnapshotDone(globalLsnNotBefore);
358361
await batch.commit(ZERO_LSN);
359362
}
360363
);
@@ -416,7 +419,7 @@ export class PostgresSnapshotter {
416419
tableLsnNotBefore = rs.rows[0][0];
417420
// Side note: A ROLLBACK would probably also be fine here, since we only read in this transaction.
418421
await db.query('COMMIT');
419-
const [resultTable] = await batch.markSnapshotDone([table], tableLsnNotBefore);
422+
const [resultTable] = await batch.markTableSnapshotDone([table], tableLsnNotBefore);
420423
this.relationCache.update(resultTable);
421424
return resultTable;
422425
} catch (e) {

modules/module-postgres/src/replication/WalStream.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,7 @@ export class WalStream {
481481
await this.handleRelation({
482482
batch,
483483
descriptor: getPgOutputRelation(msg),
484-
snapshot: true,
484+
snapshot: false,
485485
referencedTypeIds: referencedColumnTypeIds(msg)
486486
});
487487
} else if (msg.tag == 'begin') {

packages/service-core/src/storage/BucketStorageBatch.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ export interface BucketStorageBatch extends ObserverClient<BucketBatchStorageLis
8383
*/
8484
resumeFromLsn: string | null;
8585

86-
markSnapshotDone(tables: SourceTable[], no_checkpoint_before_lsn: string): Promise<SourceTable[]>;
86+
markTableSnapshotDone(tables: SourceTable[], no_checkpoint_before_lsn?: string): Promise<SourceTable[]>;
87+
markAllSnapshotDone(no_checkpoint_before_lsn: string): Promise<void>;
8788

8889
updateTableProgress(table: SourceTable, progress: Partial<TableSnapshotStatus>): Promise<SourceTable>;
8990

0 commit comments

Comments
 (0)