Skip to content

Commit df0d4cb

Browse files
committed
Refactor for snapshotting.
1 parent b7a5d5f commit df0d4cb

File tree

8 files changed

+183
-177
lines changed

8 files changed

+183
-177
lines changed

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 68 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ export interface MongoBucketBatchOptions {
5050
slotName: string;
5151
lastCheckpointLsn: string | null;
5252
keepaliveOp: InternalOpId | null;
53-
noCheckpointBeforeLsn: string;
5453
resumeFromLsn: string | null;
5554
storeCurrentData: boolean;
5655
/**
@@ -93,8 +92,6 @@ export class MongoBucketBatch
9392
*/
9493
private last_checkpoint_lsn: string | null = null;
9594

96-
private no_checkpoint_before_lsn: string;
97-
9895
private persisted_op: InternalOpId | null = null;
9996

10097
/**
@@ -123,7 +120,6 @@ export class MongoBucketBatch
123120
this.db = options.db;
124121
this.group_id = options.groupId;
125122
this.last_checkpoint_lsn = options.lastCheckpointLsn;
126-
this.no_checkpoint_before_lsn = options.noCheckpointBeforeLsn;
127123
this.resumeFromLsn = options.resumeFromLsn;
128124
this.session = this.client.startSession();
129125
this.slot_name = options.slotName;
@@ -672,36 +668,6 @@ export class MongoBucketBatch
672668
// Cannot create a checkpoint yet - return false
673669
return false;
674670
}
675-
if (lsn < this.no_checkpoint_before_lsn) {
676-
if (Date.now() - this.lastWaitingLogThottled > 5_000) {
677-
this.logger.info(
678-
`Waiting until ${this.no_checkpoint_before_lsn} before creating checkpoint, currently at ${lsn}. Persisted op: ${this.persisted_op}`
679-
);
680-
this.lastWaitingLogThottled = Date.now();
681-
}
682-
683-
// Edge case: During initial replication, we have a no_checkpoint_before_lsn set,
684-
// and don't actually commit the snapshot.
685-
// The first commit can happen from an implicit keepalive message.
686-
// That needs the persisted_op to get an accurate checkpoint, so
687-
// we persist that in keepalive_op.
688-
689-
await this.db.sync_rules.updateOne(
690-
{
691-
_id: this.group_id
692-
},
693-
{
694-
$set: {
695-
keepalive_op: this.persisted_op == null ? null : String(this.persisted_op)
696-
}
697-
},
698-
{ session: this.session }
699-
);
700-
await this.db.notifyCheckpoint();
701-
702-
// Cannot create a checkpoint yet - return false
703-
return false;
704-
}
705671

706672
if (!createEmptyCheckpoints && this.persisted_op == null) {
707673
// Nothing to commit - also return true
@@ -739,19 +705,63 @@ export class MongoBucketBatch
739705
}
740706
);
741707

742-
await this.db.sync_rules.updateOne(
708+
const updateResult = await this.db.sync_rules.updateOne(
743709
{
744-
_id: this.group_id
745-
},
746-
{
747-
$set: update
710+
_id: this.group_id,
711+
snapshot_done: true,
712+
$or: [{ no_checkpoint_before: null }, { no_checkpoint_before: { $lte: lsn } }]
748713
},
714+
[
715+
{
716+
$set: {
717+
last_checkpoint_lsn: { $literal: lsn },
718+
last_checkpoint_ts: { $literal: now },
719+
last_keepalive_ts: { $literal: now },
720+
last_fatal_error: { $literal: null },
721+
keepalive_op: { $literal: null },
722+
last_checkpoint: {
723+
$max: ['$last_checkpoint', { $literal: this.persisted_op }, { $toLong: '$keepalive_op' }]
724+
}
725+
}
726+
}
727+
],
749728
{ session: this.session }
750729
);
751-
await this.autoActivate(lsn);
752-
await this.db.notifyCheckpoint();
753-
this.persisted_op = null;
754-
this.last_checkpoint_lsn = lsn;
730+
if (updateResult.matchedCount == 0) {
731+
// Failed on snapshot_done or no_checkpoint_before.
732+
if (Date.now() - this.lastWaitingLogThottled > 5_000) {
733+
this.logger.info(
734+
`Waiting until before creating checkpoint, currently at ${lsn}. Persisted op: ${this.persisted_op}`
735+
);
736+
this.lastWaitingLogThottled = Date.now();
737+
}
738+
739+
if (this.persisted_op != null) {
740+
await this.db.sync_rules.updateOne(
741+
{
742+
_id: this.group_id
743+
},
744+
[
745+
// KLUDGE: the string format is a pain here, not sure why we ever had it as a string
746+
{
747+
$set: {
748+
keepalive_op: {
749+
$toString: {
750+
$max: [{ $toLong: '$keepalive_op' }, { $literal: this.persisted_op }]
751+
}
752+
}
753+
}
754+
}
755+
],
756+
{ session: this.session }
757+
);
758+
}
759+
} else {
760+
await this.autoActivate(lsn);
761+
await this.db.notifyCheckpoint();
762+
this.persisted_op = null;
763+
this.last_checkpoint_lsn = lsn;
764+
}
755765
return true;
756766
}
757767

@@ -810,65 +820,7 @@ export class MongoBucketBatch
810820
}
811821

812822
async keepalive(lsn: string): Promise<boolean> {
813-
if (this.last_checkpoint_lsn != null && lsn < this.last_checkpoint_lsn) {
814-
// No-op
815-
return false;
816-
}
817-
818-
if (lsn < this.no_checkpoint_before_lsn) {
819-
return false;
820-
}
821-
822-
if (this.persisted_op == null) {
823-
// FIXME: Double-check the logic here, and avoid the lookup
824-
const doc = await this.db.sync_rules.findOne({
825-
_id: this.group_id
826-
});
827-
if (doc?.keepalive_op != null) {
828-
this.persisted_op = BigInt(doc!.keepalive_op);
829-
}
830-
}
831-
832-
if (this.persisted_op != null) {
833-
// The commit may have been skipped due to "no_checkpoint_before_lsn".
834-
// Apply it now if relevant
835-
this.logger.info(`Commit due to keepalive at ${lsn} / ${this.persisted_op}`);
836-
return await this.commit(lsn);
837-
}
838-
839-
await this.db.write_checkpoints.updateMany(
840-
{
841-
processed_at_lsn: null,
842-
'lsns.1': { $lte: lsn }
843-
},
844-
{
845-
$set: {
846-
processed_at_lsn: lsn
847-
}
848-
},
849-
{
850-
session: this.session
851-
}
852-
);
853-
854-
await this.db.sync_rules.updateOne(
855-
{
856-
_id: this.group_id
857-
},
858-
{
859-
$set: {
860-
last_checkpoint_lsn: lsn,
861-
last_fatal_error: null,
862-
last_keepalive_ts: new Date()
863-
}
864-
},
865-
{ session: this.session }
866-
);
867-
await this.autoActivate(lsn);
868-
await this.db.notifyCheckpoint();
869-
this.last_checkpoint_lsn = lsn;
870-
871-
return true;
823+
return await this.commit(lsn);
872824
}
873825

874826
async setResumeLsn(lsn: string): Promise<void> {
@@ -1044,9 +996,6 @@ export class MongoBucketBatch
1044996
}
1045997

1046998
async markAllSnapshotDone(no_checkpoint_before_lsn: string) {
1047-
if (this.no_checkpoint_before_lsn == null || no_checkpoint_before_lsn > this.no_checkpoint_before_lsn) {
1048-
this.no_checkpoint_before_lsn = no_checkpoint_before_lsn;
1049-
}
1050999
await this.db.sync_rules.updateOne(
10511000
{
10521001
_id: this.group_id
@@ -1067,6 +1016,20 @@ export class MongoBucketBatch
10671016
);
10681017
}
10691018

1019+
async markTableSnapshotRequired(table: storage.SourceTable): Promise<void> {
1020+
await this.db.sync_rules.updateOne(
1021+
{
1022+
_id: this.group_id
1023+
},
1024+
{
1025+
$set: {
1026+
snapshot_done: false
1027+
}
1028+
},
1029+
{ session: this.session }
1030+
);
1031+
}
1032+
10701033
async markTableSnapshotDone(tables: storage.SourceTable[], no_checkpoint_before_lsn?: string) {
10711034
const session = this.session;
10721035
const ids = tables.map((table) => table.id);
@@ -1086,9 +1049,6 @@ export class MongoBucketBatch
10861049
);
10871050

10881051
if (no_checkpoint_before_lsn != null) {
1089-
if (this.no_checkpoint_before_lsn == null || no_checkpoint_before_lsn > this.no_checkpoint_before_lsn) {
1090-
this.no_checkpoint_before_lsn = no_checkpoint_before_lsn;
1091-
}
10921052
await this.db.sync_rules.updateOne(
10931053
{
10941054
_id: this.group_id

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ import { MongoParameterCompactor } from './MongoParameterCompactor.js';
3939
import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js';
4040
import { idPrefixFilter, mapOpEntry, readSingleBatch, setSessionSnapshotTime } from '../../utils/util.js';
4141

42-
4342
export interface MongoSyncBucketStorageOptions {
4443
checksumOptions?: MongoChecksumOptions;
4544
}
@@ -176,7 +175,6 @@ export class MongoSyncBucketStorage
176175
slotName: this.slot_name,
177176
lastCheckpointLsn: checkpoint_lsn,
178177
resumeFromLsn: maxLsn(checkpoint_lsn, doc?.snapshot_lsn),
179-
noCheckpointBeforeLsn: doc?.no_checkpoint_before ?? options.zeroLSN,
180178
keepaliveOp: doc?.keepalive_op ? BigInt(doc.keepalive_op) : null,
181179
storeCurrentData: options.storeCurrentData,
182180
skipExistingRows: options.skipExistingRows ?? false,

modules/module-postgres/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,17 @@
3636
"@powersync/service-sync-rules": "workspace:*",
3737
"@powersync/service-types": "workspace:*",
3838
"jose": "^4.15.1",
39+
"p-defer": "^4.0.1",
3940
"pgwire": "github:kagis/pgwire#f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87",
4041
"semver": "^7.5.4",
4142
"ts-codec": "^1.3.0",
4243
"uri-js": "^4.4.1",
4344
"uuid": "^11.1.0"
4445
},
4546
"devDependencies": {
47+
"@powersync/lib-service-postgres": "workspace:*",
4648
"@powersync/service-core-tests": "workspace:*",
4749
"@powersync/service-module-mongodb-storage": "workspace:*",
48-
"@powersync/lib-service-postgres": "workspace:*",
4950
"@powersync/service-module-postgres-storage": "workspace:*",
5051
"@types/semver": "^7.5.4"
5152
}

0 commit comments

Comments
 (0)