9 changes: 9 additions & 0 deletions .changeset/dry-pots-leave.md
@@ -0,0 +1,9 @@
---
'@powersync/service-module-postgres': patch
'@powersync/service-module-mongodb': patch
'@powersync/service-core': patch
'@powersync/service-module-mysql': patch
'@powersync/service-sync-rules': patch
---

Correctly handle custom types in primary keys.
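
For context, a minimal sketch of the pattern this patch introduces (the types and the applyRowContext signature below are simplified stand-ins, not the real @powersync/service-sync-rules API): each replication stream now applies the sync rules' row context to the raw input row, primary key columns included, before the row reaches the storage batch, which previously applied it itself.

// Simplified stand-in types; the real ones live in @powersync/service-sync-rules.
type SqliteValue = string | number | bigint | Uint8Array | null;
type SqliteInputRow = Record<string, unknown>;
type SqliteRow = Record<string, SqliteValue>;

interface SyncRulesLike {
  // Normalizes custom/domain-typed input values (for example a TEXT domain used
  // as a primary key) into plain SQLite-compatible values.
  applyRowContext(row: SqliteInputRow): SqliteRow;
}

// Pattern followed by ChangeStream, BinLogStream and WalStream after this change:
// build the raw input row, then apply the row context once, up front, so the
// storage batches receive already-normalized rows and no longer call
// applyRowContext themselves.
function normalizeAfterRecord(syncRules: SyncRulesLike, raw: SqliteInputRow): SqliteRow {
  return syncRules.applyRowContext(raw);
}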
@@ -319,8 +319,7 @@ export class MongoBucketBatch
const record = operation.record;
const beforeId = operation.beforeId;
const afterId = operation.afterId;
let sourceAfter = record.after;
let after = sourceAfter && this.sync_rules.applyRowContext(sourceAfter);
let after = record.after;
const sourceTable = record.sourceTable;

let existing_buckets: CurrentBucket[] = [];
11 changes: 8 additions & 3 deletions modules/module-mongodb/src/replication/ChangeStream.ts
@@ -481,7 +481,7 @@ export class ChangeStream {
// Pre-fetch next batch, so that we can read and write concurrently
nextChunkPromise = query.nextChunk();
for (let document of docBatch) {
const record = constructAfterRecord(document);
const record = this.constructAfterRecord(document);

// This auto-flushes when the batch reaches its size limit
await batch.save({
@@ -619,6 +619,11 @@ export class ChangeStream {
return result.table;
}

private constructAfterRecord(document: mongo.Document): SqliteRow {
const inputRow = constructAfterRecord(document);
return this.sync_rules.applyRowContext<never>(inputRow);
}

async writeChange(
batch: storage.BucketStorageBatch,
table: storage.SourceTable,
@@ -631,7 +636,7 @@

this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
if (change.operationType == 'insert') {
const baseRecord = constructAfterRecord(change.fullDocument);
const baseRecord = this.constructAfterRecord(change.fullDocument);
return await batch.save({
tag: SaveOperationTag.INSERT,
sourceTable: table,
@@ -650,7 +655,7 @@
beforeReplicaId: change.documentKey._id
});
}
const after = constructAfterRecord(change.fullDocument!);
const after = this.constructAfterRecord(change.fullDocument!);
return await batch.save({
tag: SaveOperationTag.UPDATE,
sourceTable: table,
5 changes: 1 addition & 4 deletions modules/module-mongodb/src/replication/MongoRelation.ts
@@ -1,16 +1,13 @@
import { mongo } from '@powersync/lib-service-mongodb';
import { storage } from '@powersync/service-core';
import { JSONBig, JsonContainer } from '@powersync/service-jsonbig';
import { JsonContainer } from '@powersync/service-jsonbig';
import {
CompatibilityContext,
CustomArray,
CustomObject,
CustomSqliteValue,
DatabaseInputValue,
SqliteInputRow,
SqliteInputValue,
SqliteRow,
SqliteValue,
DateTimeValue
} from '@powersync/service-sync-rules';

15 changes: 10 additions & 5 deletions modules/module-mysql/src/replication/BinLogStream.ts
@@ -329,7 +329,7 @@ export class BinLogStream {
throw new ReplicationAssertionError(`No 'fields' event emitted`);
}

const record = common.toSQLiteRow(row, columns!);
const record = this.toSQLiteRow(row, columns!);
await batch.save({
tag: storage.SaveOperationTag.INSERT,
sourceTable: table,
@@ -596,14 +596,19 @@
return null;
}

private toSQLiteRow(row: Record<string, any>, columns: Map<string, ColumnDescriptor>): sync_rules.SqliteRow {
const inputRecord = common.toSQLiteRow(row, columns);
return this.syncRules.applyRowContext<never>(inputRecord);
}

private async writeChange(
batch: storage.BucketStorageBatch,
payload: WriteChangePayload
): Promise<storage.FlushedResult | null> {
switch (payload.type) {
case storage.SaveOperationTag.INSERT:
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
const record = common.toSQLiteRow(payload.row, payload.columns);
const record = this.toSQLiteRow(payload.row, payload.columns);
return await batch.save({
tag: storage.SaveOperationTag.INSERT,
sourceTable: payload.sourceTable,
@@ -617,9 +622,9 @@
// The previous row may be null if the replica id columns are unchanged.
// It's fine to treat that the same as an insert.
const beforeUpdated = payload.previous_row
? common.toSQLiteRow(payload.previous_row, payload.columns)
? this.toSQLiteRow(payload.previous_row, payload.columns)
: undefined;
const after = common.toSQLiteRow(payload.row, payload.columns);
const after = this.toSQLiteRow(payload.row, payload.columns);

return await batch.save({
tag: storage.SaveOperationTag.UPDATE,
@@ -634,7 +639,7 @@

case storage.SaveOperationTag.DELETE:
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
const beforeDeleted = common.toSQLiteRow(payload.row, payload.columns);
const beforeDeleted = this.toSQLiteRow(payload.row, payload.columns);

return await batch.save({
tag: storage.SaveOperationTag.DELETE,
@@ -687,8 +687,7 @@ export class PostgresBucketBatch
// We store bytea columns for source keys
const beforeId = operation.beforeId;
const afterId = operation.afterId;
let sourceAfter = record.after;
let after = sourceAfter && this.sync_rules.applyRowContext(sourceAfter);
let after = record.after;
const sourceTable = record.sourceTable;

let existingBuckets: CurrentBucket[] = [];
28 changes: 23 additions & 5 deletions modules/module-postgres/src/replication/WalStream.ts
@@ -25,8 +25,11 @@ import {
CompatibilityContext,
DatabaseInputRow,
SqliteInputRow,
SqliteInputValue,
SqliteRow,
SqlSyncRules,
TablePattern,
ToastableSqliteRow,
toSyncRulesRow
} from '@powersync/service-sync-rules';

@@ -635,7 +638,8 @@ WHERE oid = $1::regclass`,
hasRemainingData = true;
}

for (const record of WalStream.getQueryData(rows)) {
for (const inputRecord of WalStream.getQueryData(rows)) {
const record = this.syncRulesRecord(inputRecord);
// This auto-flushes when the batch reaches its size limit
await batch.save({
tag: storage.SaveOperationTag.INSERT,
@@ -787,6 +791,20 @@ WHERE oid = $1::regclass`,
return table;
}

private syncRulesRecord(row: SqliteInputRow): SqliteRow;
private syncRulesRecord(row: SqliteInputRow | undefined): SqliteRow | undefined;

private syncRulesRecord(row: SqliteInputRow | undefined): SqliteRow | undefined {
if (row == null) {
return undefined;
}
return this.sync_rules.applyRowContext<never>(row);
}

private toastableSyncRulesRecord(row: ToastableSqliteRow<SqliteInputValue>): ToastableSqliteRow {
return this.sync_rules.applyRowContext(row);
}

async writeChange(
batch: storage.BucketStorageBatch,
msg: pgwire.PgoutputMessage
@@ -803,7 +821,7 @@

if (msg.tag == 'insert') {
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
const baseRecord = this.connections.types.constructAfterRecord(msg);
const baseRecord = this.syncRulesRecord(this.connections.types.constructAfterRecord(msg));
return await batch.save({
tag: storage.SaveOperationTag.INSERT,
sourceTable: table,
@@ -816,8 +834,8 @@
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
// "before" may be null if the replica id columns are unchanged
// It's fine to treat that the same as an insert.
const before = this.connections.types.constructBeforeRecord(msg);
const after = this.connections.types.constructAfterRecord(msg);
const before = this.syncRulesRecord(this.connections.types.constructBeforeRecord(msg));
const after = this.toastableSyncRulesRecord(this.connections.types.constructAfterRecord(msg));
return await batch.save({
tag: storage.SaveOperationTag.UPDATE,
sourceTable: table,
@@ -828,7 +846,7 @@
});
} else if (msg.tag == 'delete') {
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
const before = this.connections.types.constructBeforeRecord(msg)!;
const before = this.syncRulesRecord(this.connections.types.constructBeforeRecord(msg)!);

return await batch.save({
tag: storage.SaveOperationTag.DELETE,
101 changes: 101 additions & 0 deletions modules/module-postgres/test/src/wal_stream.test.ts
@@ -325,6 +325,42 @@ bucket_definitions:
}
});

test('old date format', async () => {
await using context = await WalStreamTestContext.open(factory);
await context.updateSyncRules(BASIC_SYNC_RULES);

const { pool } = context;
await pool.query(`DROP TABLE IF EXISTS test_data`);
await pool.query(`CREATE TABLE test_data(id text primary key, description timestamptz);`);

await context.initializeReplication();
await pool.query(`INSERT INTO test_data(id, description) VALUES ('t1', '2025-09-10 15:17:14+02')`);

let data = await context.getBucketData('global[]');
expect(data).toMatchObject([putOp('test_data', { id: 't1', description: '2025-09-10 13:17:14Z' })]);
});

test('new date format', async () => {
await using context = await WalStreamTestContext.open(factory);
await context.updateSyncRules(`
streams:
  stream:
    query: SELECT id, * FROM "test_data"

config:
  edition: 2
`);
const { pool } = context;
await pool.query(`DROP TABLE IF EXISTS test_data`);
await pool.query(`CREATE TABLE test_data(id text primary key, description timestamptz);`);

await context.initializeReplication();
await pool.query(`INSERT INTO test_data(id, description) VALUES ('t1', '2025-09-10 15:17:14+02')`);

const data = await context.getBucketData('1#stream|0[]');
expect(data).toMatchObject([putOp('test_data', { id: 't1', description: '2025-09-10T13:17:14.000000Z' })]);
});

test('custom types', async () => {
await using context = await WalStreamTestContext.open(factory);

Expand All @@ -348,4 +384,69 @@ config:
const data = await context.getBucketData('1#stream|0[]');
expect(data).toMatchObject([putOp('test_data', { id: 't1', description: '{"foo":1,"bar":2}' })]);
});

test('custom types in primary key', async () => {
await using context = await WalStreamTestContext.open(factory);

await context.updateSyncRules(`
streams:
  stream:
    query: SELECT id, * FROM "test_data"

config:
  edition: 2
`);

const { pool } = context;
await pool.query(`DROP TABLE IF EXISTS test_data`);
await pool.query(`CREATE DOMAIN test_id AS TEXT;`);
await pool.query(`CREATE TABLE test_data(id test_id primary key);`);

await context.initializeReplication();
await pool.query(`INSERT INTO test_data(id) VALUES ('t1')`);

const data = await context.getBucketData('1#stream|0[]');
expect(data).toMatchObject([putOp('test_data', { id: 't1' })]);
});

test('replica identity handling', async () => {
// This specifically tests a case of timestamps being used as part of the replica identity.
// There was a regression in versions 1.15.0-1.15.5, which this tests for.
await using context = await WalStreamTestContext.open(factory);
const { pool } = context;
await context.updateSyncRules(BASIC_SYNC_RULES);

await pool.query(`DROP TABLE IF EXISTS test_data`);
await pool.query(`CREATE TABLE test_data(id uuid primary key, description text, ts timestamptz)`);
await pool.query(`ALTER TABLE test_data REPLICA IDENTITY FULL`);

const test_id = `a9798b07-84de-4297-9a8e-aafb4dd0282f`;

await pool.query(
`INSERT INTO test_data(id, description, ts) VALUES('${test_id}', 'test1', '2025-01-01T00:00:00Z') returning id as test_id`
);

await context.replicateSnapshot();
await context.startStreaming();

await pool.query(`UPDATE test_data SET description = 'test2' WHERE id = '${test_id}'`);

const data = await context.getBucketData('global[]');
// For replica identity full, each update changes the id, making it a REMOVE+PUT
expect(data).toMatchObject([
// Initial insert
putOp('test_data', { id: test_id, description: 'test1' }),
// Update
removeOp('test_data', test_id),
putOp('test_data', { id: test_id, description: 'test2' })
]);

// subkey contains `${table id}/${replica identity}`.
// table id changes from run to run, but replica identity should always stay constant.
// This should not change if we make changes to the implementation
// (unless specifically opting in to new behavior)
expect(data[0].subkey).toContain('/c7b3f1a3-ec4d-5d44-b295-c7f2a32bb056');
expect(data[1].subkey).toContain('/c7b3f1a3-ec4d-5d44-b295-c7f2a32bb056');
expect(data[2].subkey).toContain('/984d457a-69f0-559a-a2f9-a511c28b968d');
});
}