
Commit 221289d

simolus3 and rkistner authored
Move input conversion to source stream (#363)
* Move input conversion to source stream
* Small type improvement
* Migrate tests
* Add test for replica identity handling

---------

Co-authored-by: Ralf Kistner <[email protected]>
1 parent 92bed80 commit 221289d
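The change follows the same pattern in each connector below: the replication source stream now converts the raw input row with applyRowContext before handing it to batch.save, so the storage-side bucket batches (MongoBucketBatch, PostgresBucketBatch) no longer perform that conversion themselves. A minimal TypeScript sketch of that shape, assuming sync_rules is the SqlSyncRules instance exposing applyRowContext as used in the diffs; the SourceStreamSketch class and the simplified save signature are illustrative only, not part of the actual modules:

import { SqliteInputRow, SqliteRow, SqlSyncRules } from '@powersync/service-sync-rules';

// Illustrative sketch only; not the actual connector code in this commit.
class SourceStreamSketch {
  constructor(private sync_rules: SqlSyncRules) {}

  // Input conversion now happens in the source stream...
  private toSyncRulesRecord(row: SqliteInputRow): SqliteRow {
    return this.sync_rules.applyRowContext<never>(row);
  }

  // ...so storage receives an already-converted SqliteRow
  // (hypothetical, simplified save signature for illustration).
  async writeChange(batch: { save(after: SqliteRow): Promise<void> }, input: SqliteInputRow): Promise<void> {
    await batch.save(this.toSyncRulesRecord(input));
  }
}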

File tree

17 files changed: +181 additions, -129 deletions


.changeset/dry-pots-leave.md

Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
+---
+'@powersync/service-module-postgres': patch
+'@powersync/service-module-mongodb': patch
+'@powersync/service-core': patch
+'@powersync/service-module-mysql': patch
+'@powersync/service-sync-rules': patch
+---
+
+Correctly handle custom types in primary keys.

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 1 addition & 2 deletions
@@ -319,8 +319,7 @@ export class MongoBucketBatch
       const record = operation.record;
       const beforeId = operation.beforeId;
       const afterId = operation.afterId;
-      let sourceAfter = record.after;
-      let after = sourceAfter && this.sync_rules.applyRowContext(sourceAfter);
+      let after = record.after;
       const sourceTable = record.sourceTable;

       let existing_buckets: CurrentBucket[] = [];

modules/module-mongodb/src/replication/ChangeStream.ts

Lines changed: 8 additions & 3 deletions
@@ -481,7 +481,7 @@ export class ChangeStream {
       // Pre-fetch next batch, so that we can read and write concurrently
       nextChunkPromise = query.nextChunk();
       for (let document of docBatch) {
-        const record = constructAfterRecord(document);
+        const record = this.constructAfterRecord(document);

         // This auto-flushes when the batch reaches its size limit
         await batch.save({
@@ -619,6 +619,11 @@ export class ChangeStream {
     return result.table;
   }

+  private constructAfterRecord(document: mongo.Document): SqliteRow {
+    const inputRow = constructAfterRecord(document);
+    return this.sync_rules.applyRowContext<never>(inputRow);
+  }
+
   async writeChange(
     batch: storage.BucketStorageBatch,
     table: storage.SourceTable,
@@ -631,7 +636,7 @@

     this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
     if (change.operationType == 'insert') {
-      const baseRecord = constructAfterRecord(change.fullDocument);
+      const baseRecord = this.constructAfterRecord(change.fullDocument);
       return await batch.save({
         tag: SaveOperationTag.INSERT,
         sourceTable: table,
@@ -650,7 +655,7 @@
        beforeReplicaId: change.documentKey._id
      });
    }
-    const after = constructAfterRecord(change.fullDocument!);
+    const after = this.constructAfterRecord(change.fullDocument!);
    return await batch.save({
      tag: SaveOperationTag.UPDATE,
      sourceTable: table,

modules/module-mongodb/src/replication/MongoRelation.ts

Lines changed: 1 addition & 4 deletions
@@ -1,16 +1,13 @@
 import { mongo } from '@powersync/lib-service-mongodb';
 import { storage } from '@powersync/service-core';
-import { JSONBig, JsonContainer } from '@powersync/service-jsonbig';
+import { JsonContainer } from '@powersync/service-jsonbig';
 import {
   CompatibilityContext,
   CustomArray,
   CustomObject,
   CustomSqliteValue,
-  DatabaseInputValue,
   SqliteInputRow,
   SqliteInputValue,
-  SqliteRow,
-  SqliteValue,
   DateTimeValue
 } from '@powersync/service-sync-rules';

modules/module-mysql/src/replication/BinLogStream.ts

Lines changed: 10 additions & 5 deletions
@@ -329,7 +329,7 @@ export class BinLogStream {
       throw new ReplicationAssertionError(`No 'fields' event emitted`);
     }

-    const record = common.toSQLiteRow(row, columns!);
+    const record = this.toSQLiteRow(row, columns!);
     await batch.save({
       tag: storage.SaveOperationTag.INSERT,
       sourceTable: table,
@@ -596,14 +596,19 @@
     return null;
   }

+  private toSQLiteRow(row: Record<string, any>, columns: Map<string, ColumnDescriptor>): sync_rules.SqliteRow {
+    const inputRecord = common.toSQLiteRow(row, columns);
+    return this.syncRules.applyRowContext<never>(inputRecord);
+  }
+
   private async writeChange(
     batch: storage.BucketStorageBatch,
     payload: WriteChangePayload
   ): Promise<storage.FlushedResult | null> {
     switch (payload.type) {
       case storage.SaveOperationTag.INSERT:
         this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
-        const record = common.toSQLiteRow(payload.row, payload.columns);
+        const record = this.toSQLiteRow(payload.row, payload.columns);
         return await batch.save({
           tag: storage.SaveOperationTag.INSERT,
           sourceTable: payload.sourceTable,
@@ -617,9 +622,9 @@
         // The previous row may be null if the replica id columns are unchanged.
         // It's fine to treat that the same as an insert.
         const beforeUpdated = payload.previous_row
-          ? common.toSQLiteRow(payload.previous_row, payload.columns)
+          ? this.toSQLiteRow(payload.previous_row, payload.columns)
           : undefined;
-        const after = common.toSQLiteRow(payload.row, payload.columns);
+        const after = this.toSQLiteRow(payload.row, payload.columns);

         return await batch.save({
           tag: storage.SaveOperationTag.UPDATE,
@@ -634,7 +639,7 @@

       case storage.SaveOperationTag.DELETE:
         this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
-        const beforeDeleted = common.toSQLiteRow(payload.row, payload.columns);
+        const beforeDeleted = this.toSQLiteRow(payload.row, payload.columns);

         return await batch.save({
           tag: storage.SaveOperationTag.DELETE,

modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts

Lines changed: 1 addition & 2 deletions
@@ -687,8 +687,7 @@ export class PostgresBucketBatch
       // We store bytea colums for source keys
       const beforeId = operation.beforeId;
       const afterId = operation.afterId;
-      let sourceAfter = record.after;
-      let after = sourceAfter && this.sync_rules.applyRowContext(sourceAfter);
+      let after = record.after;
       const sourceTable = record.sourceTable;

       let existingBuckets: CurrentBucket[] = [];

modules/module-postgres/src/replication/WalStream.ts

Lines changed: 23 additions & 5 deletions
@@ -25,8 +25,11 @@ import {
   CompatibilityContext,
   DatabaseInputRow,
   SqliteInputRow,
+  SqliteInputValue,
+  SqliteRow,
   SqlSyncRules,
   TablePattern,
+  ToastableSqliteRow,
   toSyncRulesRow
 } from '@powersync/service-sync-rules';

@@ -635,7 +638,8 @@ WHERE oid = $1::regclass`,
       hasRemainingData = true;
     }

-    for (const record of WalStream.getQueryData(rows)) {
+    for (const inputRecord of WalStream.getQueryData(rows)) {
+      const record = this.syncRulesRecord(inputRecord);
       // This auto-flushes when the batch reaches its size limit
       await batch.save({
         tag: storage.SaveOperationTag.INSERT,
@@ -787,6 +791,20 @@ WHERE oid = $1::regclass`,
     return table;
   }

+  private syncRulesRecord(row: SqliteInputRow): SqliteRow;
+  private syncRulesRecord(row: SqliteInputRow | undefined): SqliteRow | undefined;
+
+  private syncRulesRecord(row: SqliteInputRow | undefined): SqliteRow | undefined {
+    if (row == null) {
+      return undefined;
+    }
+    return this.sync_rules.applyRowContext<never>(row);
+  }
+
+  private toastableSyncRulesRecord(row: ToastableSqliteRow<SqliteInputValue>): ToastableSqliteRow {
+    return this.sync_rules.applyRowContext(row);
+  }
+
   async writeChange(
     batch: storage.BucketStorageBatch,
     msg: pgwire.PgoutputMessage
@@ -803,7 +821,7 @@

     if (msg.tag == 'insert') {
       this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
-      const baseRecord = this.connections.types.constructAfterRecord(msg);
+      const baseRecord = this.syncRulesRecord(this.connections.types.constructAfterRecord(msg));
       return await batch.save({
         tag: storage.SaveOperationTag.INSERT,
         sourceTable: table,
@@ -816,8 +834,8 @@
       this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
       // "before" may be null if the replica id columns are unchanged
       // It's fine to treat that the same as an insert.
-      const before = this.connections.types.constructBeforeRecord(msg);
-      const after = this.connections.types.constructAfterRecord(msg);
+      const before = this.syncRulesRecord(this.connections.types.constructBeforeRecord(msg));
+      const after = this.toastableSyncRulesRecord(this.connections.types.constructAfterRecord(msg));
       return await batch.save({
         tag: storage.SaveOperationTag.UPDATE,
         sourceTable: table,
@@ -828,7 +846,7 @@
       });
     } else if (msg.tag == 'delete') {
       this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
-      const before = this.connections.types.constructBeforeRecord(msg)!;
+      const before = this.syncRulesRecord(this.connections.types.constructBeforeRecord(msg)!);

       return await batch.save({
         tag: storage.SaveOperationTag.DELETE,

modules/module-postgres/test/src/wal_stream.test.ts

Lines changed: 101 additions & 0 deletions
@@ -325,6 +325,42 @@ bucket_definitions:
     }
   });

+  test('old date format', async () => {
+    await using context = await WalStreamTestContext.open(factory);
+    await context.updateSyncRules(BASIC_SYNC_RULES);
+
+    const { pool } = context;
+    await pool.query(`DROP TABLE IF EXISTS test_data`);
+    await pool.query(`CREATE TABLE test_data(id text primary key, description timestamptz);`);
+
+    await context.initializeReplication();
+    await pool.query(`INSERT INTO test_data(id, description) VALUES ('t1', '2025-09-10 15:17:14+02')`);
+
+    let data = await context.getBucketData('global[]');
+    expect(data).toMatchObject([putOp('test_data', { id: 't1', description: '2025-09-10 13:17:14Z' })]);
+  });
+
+  test('new date format', async () => {
+    await using context = await WalStreamTestContext.open(factory);
+    await context.updateSyncRules(`
+streams:
+  stream:
+    query: SELECT id, * FROM "test_data"
+
+config:
+  edition: 2
+`);
+    const { pool } = context;
+    await pool.query(`DROP TABLE IF EXISTS test_data`);
+    await pool.query(`CREATE TABLE test_data(id text primary key, description timestamptz);`);
+
+    await context.initializeReplication();
+    await pool.query(`INSERT INTO test_data(id, description) VALUES ('t1', '2025-09-10 15:17:14+02')`);
+
+    const data = await context.getBucketData('1#stream|0[]');
+    expect(data).toMatchObject([putOp('test_data', { id: 't1', description: '2025-09-10T13:17:14.000000Z' })]);
+  });
+
   test('custom types', async () => {
     await using context = await WalStreamTestContext.open(factory);

@@ -348,4 +384,69 @@ config:
     const data = await context.getBucketData('1#stream|0[]');
     expect(data).toMatchObject([putOp('test_data', { id: 't1', description: '{"foo":1,"bar":2}' })]);
   });
+
+  test('custom types in primary key', async () => {
+    await using context = await WalStreamTestContext.open(factory);
+
+    await context.updateSyncRules(`
+streams:
+  stream:
+    query: SELECT id, * FROM "test_data"
+
+config:
+  edition: 2
+`);
+
+    const { pool } = context;
+    await pool.query(`DROP TABLE IF EXISTS test_data`);
+    await pool.query(`CREATE DOMAIN test_id AS TEXT;`);
+    await pool.query(`CREATE TABLE test_data(id test_id primary key);`);
+
+    await context.initializeReplication();
+    await pool.query(`INSERT INTO test_data(id) VALUES ('t1')`);
+
+    const data = await context.getBucketData('1#stream|0[]');
+    expect(data).toMatchObject([putOp('test_data', { id: 't1' })]);
+  });
+
+  test('replica identity handling', async () => {
+    // This specifically tests a case of timestamps being used as part of the replica identity.
+    // There was a regression in versions 1.15.0-1.15.5, which this tests for.
+    await using context = await WalStreamTestContext.open(factory);
+    const { pool } = context;
+    await context.updateSyncRules(BASIC_SYNC_RULES);
+
+    await pool.query(`DROP TABLE IF EXISTS test_data`);
+    await pool.query(`CREATE TABLE test_data(id uuid primary key, description text, ts timestamptz)`);
+    await pool.query(`ALTER TABLE test_data REPLICA IDENTITY FULL`);
+
+    const test_id = `a9798b07-84de-4297-9a8e-aafb4dd0282f`;
+
+    await pool.query(
+      `INSERT INTO test_data(id, description, ts) VALUES('${test_id}', 'test1', '2025-01-01T00:00:00Z') returning id as test_id`
+    );
+
+    await context.replicateSnapshot();
+    await context.startStreaming();
+
+    await pool.query(`UPDATE test_data SET description = 'test2' WHERE id = '${test_id}'`);
+
+    const data = await context.getBucketData('global[]');
+    // For replica identity full, each change changes the id, making it a REMOVE+PUT
+    expect(data).toMatchObject([
+      // Initial insert
+      putOp('test_data', { id: test_id, description: 'test1' }),
+      // Update
+      removeOp('test_data', test_id),
+      putOp('test_data', { id: test_id, description: 'test2' })
+    ]);
+
+    // subkey contains `${table id}/${replica identity}`.
+    // table id changes from run to run, but replica identity should always stay constant.
+    // This should not change if we make changes to the implementation
+    // (unless specifically opting in to new behavior)
+    expect(data[0].subkey).toContain('/c7b3f1a3-ec4d-5d44-b295-c7f2a32bb056');
+    expect(data[1].subkey).toContain('/c7b3f1a3-ec4d-5d44-b295-c7f2a32bb056');
+    expect(data[2].subkey).toContain('/984d457a-69f0-559a-a2f9-a511c28b968d');
+  });
 }
