
Commit 8f8abce

Merge pull request #25 from powersync-ja/fix-full-replication-id
Fix replication issue with REPLICA IDENTITY FULL
2 parents: cbf2683 + 0c1ec44

5 files changed, +244 -8 lines

.changeset/fluffy-pandas-eat.md

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+---
+'@powersync/service-core': patch
+'powersync-open-service': patch
+---
+
+Fix replication issue with REPLICA IDENTITY FULL (#27).
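
Background, not part of the changeset: with REPLICA IDENTITY FULL, Postgres writes the complete old row into the WAL for every UPDATE and DELETE instead of only the primary key, so logical replication receives a full "before" image. The slow test in this commit enables it with the same statements sketched below; the loosely typed `pool` parameter is an assumption standing in for the pgwire pool used in the tests.

// Sketch only - `pool` stands in for any Postgres client exposing query(sql).
async function setupReplicaIdentityFull(pool: { query(sql: string): Promise<unknown> }) {
  await pool.query(`CREATE TABLE IF NOT EXISTS test_data(id uuid primary key default uuid_generate_v4(), description text)`);
  // From here on, every UPDATE/DELETE WAL record for test_data carries the complete old row,
  // so the replication stream sees all columns of the previous version, not just the id.
  await pool.query(`ALTER TABLE test_data REPLICA IDENTITY FULL`);
}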

packages/service-core/src/storage/BucketStorage.ts

Lines changed: 5 additions & 0 deletions
@@ -367,7 +367,12 @@ export interface SaveInsert {
 export interface SaveUpdate {
   tag: 'update';
   sourceTable: SourceTable;
+
+  /**
+   * This is only present when the id has changed, and will only contain replica identity columns.
+   */
   before?: SqliteRow;
+
   /**
    * A null value means null column.
    *
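
Illustration, not part of the diff: under REPLICA IDENTITY FULL the replica identity spans every column, so the before image carries the whole old row. A minimal sketch of the SaveUpdate record shape that the new tests in this commit pass to batch.save; the values follow the test data, and SaveUpdate, SqliteRow, SourceTable and makeTestTable come from service-core and its test helpers.

// Sketch assuming the test helper makeTestTable('test', ['id', 'description']).
const sourceTable = makeTestTable('test', ['id', 'description']);

const update: SaveUpdate = {
  tag: 'update',
  sourceTable,
  // Full before image: all replica identity columns, which here means the whole row.
  before: { id: 'test1', description: 'test1a' },
  after: { id: 'test1', description: 'test1b' }
};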

packages/service-core/src/storage/mongo/MongoBucketBatch.ts

Lines changed: 11 additions & 2 deletions
@@ -187,6 +187,7 @@ export class MongoBucketBatch implements BucketStorageBatch {
       }
       const currentData = current_data_lookup.get(op.internalBeforeKey) ?? null;
       if (currentData != null) {
+        // If it will be used again later, it will be set again using nextData below
         current_data_lookup.delete(op.internalBeforeKey);
       }
       const nextData = this.saveOperation(persistedBatch!, op, currentData, op_seq);
@@ -242,6 +243,10 @@ export class MongoBucketBatch implements BucketStorageBatch {
         // Not an error if we re-apply a transaction
         existing_buckets = [];
         existing_lookups = [];
+        // Log to help with debugging if there was a consistency issue
+        logger.warn(
+          `Cannot find previous record for update on ${record.sourceTable.qualifiedName}: ${beforeId} / ${record.before?.id}`
+        );
       } else {
         const data = bson.deserialize((result.data as mongo.Binary).buffer, BSON_DESERIALIZE_OPTIONS) as SqliteRow;
         existing_buckets = result.buckets;
@@ -254,6 +259,10 @@ export class MongoBucketBatch implements BucketStorageBatch {
         // Not an error if we re-apply a transaction
         existing_buckets = [];
         existing_lookups = [];
+        // Log to help with debugging if there was a consistency issue
+        logger.warn(
+          `Cannot find previous record for delete on ${record.sourceTable.qualifiedName}: ${beforeId} / ${record.before?.id}`
+        );
       } else {
         existing_buckets = result.buckets;
         existing_lookups = result.lookups;
@@ -292,7 +301,7 @@ export class MongoBucketBatch implements BucketStorageBatch {
     }

     // 2. Save bucket data
-    if (beforeId != null && beforeId != afterId) {
+    if (beforeId != null && (afterId == null || !beforeId.equals(afterId))) {
       // Source ID updated
       if (sourceTable.syncData) {
         // Delete old record
@@ -422,7 +431,7 @@ export class MongoBucketBatch implements BucketStorageBatch {
       };
     }

-    if (beforeId != afterId) {
+    if (afterId == null || !beforeId.equals(afterId)) {
       // Either a delete (afterId == null), or replaced the old replication id
       batch.deleteCurrentData(before_key);
     }
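
Note, not part of the diff: the core of the fix is replacing JavaScript's reference comparison (!=) with a value comparison. When the replication ids are object values rather than primitives, two ids built from the same bytes are still distinct references, so `beforeId != afterId` reports a change even for an identical replica identity. A minimal sketch of that behaviour, assuming the ids behave like bson UUID values with an equals method, as the .equals(...) call in the diff suggests:

import { UUID } from 'bson';

// Two ids constructed from the same value: distinct objects, equal contents.
const beforeId = new UUID('740ba9f2-8b0f-53e3-bb17-5f38a9616f0e');
const afterId = new UUID('740ba9f2-8b0f-53e3-bb17-5f38a9616f0e');

console.log(beforeId != afterId);       // true  - compares object references
console.log(!beforeId.equals(afterId)); // false - compares the underlying value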

packages/service-core/test/src/data_storage.test.ts

Lines changed: 177 additions & 0 deletions
@@ -897,6 +897,183 @@ bucket_definitions:
 ]);
 });

+test('changed data with replica identity full', async () => {
+  const sync_rules = SqlSyncRules.fromYaml(`
+bucket_definitions:
+  global:
+    data:
+      - SELECT id, description FROM "test"
+`);
+  const storage = (await factory()).getInstance({ id: 1, sync_rules, slot_name: 'test' });
+
+  const sourceTable = makeTestTable('test', ['id', 'description']);
+
+  // Pre-setup
+  const result1 = await storage.startBatch({}, async (batch) => {
+    await batch.save({
+      sourceTable,
+      tag: 'insert',
+      after: {
+        id: 'test1',
+        description: 'test1a'
+      }
+    });
+  });
+
+  const checkpoint1 = result1?.flushed_op ?? '0';
+
+  const result2 = await storage.startBatch({}, async (batch) => {
+    // Unchanged, but has a before id
+    await batch.save({
+      sourceTable,
+      tag: 'update',
+      before: {
+        id: 'test1',
+        description: 'test1a'
+      },
+      after: {
+        id: 'test1',
+        description: 'test1b'
+      }
+    });
+  });
+
+  const result3 = await storage.startBatch({}, async (batch) => {
+    // Delete
+    await batch.save({
+      sourceTable,
+      tag: 'delete',
+      before: {
+        id: 'test1',
+        description: 'test1b'
+      },
+      after: undefined
+    });
+  });
+
+  const checkpoint3 = result3!.flushed_op;
+
+  const batch = await fromAsync(storage.getBucketDataBatch(checkpoint3, new Map([['global[]', checkpoint1]])));
+  const data = batch[0].data.map((d) => {
+    return {
+      op: d.op,
+      object_id: d.object_id,
+      data: d.data,
+      subkey: d.subkey
+    };
+  });
+
+  // Operations must be in this order
+  expect(data).toEqual([
+    // 2
+    // The REMOVE is expected because the subkey changes
+    {
+      op: 'REMOVE',
+      object_id: 'test1',
+      data: null,
+      subkey: '6544e3899293153fa7b38331/740ba9f2-8b0f-53e3-bb17-5f38a9616f0e'
+    },
+    {
+      op: 'PUT',
+      object_id: 'test1',
+      data: JSON.stringify({ id: 'test1', description: 'test1b' }),
+      subkey: '6544e3899293153fa7b38331/500e9b68-a2fd-51ff-9c00-313e2fb9f562'
+    },
+    // 3
+    {
+      op: 'REMOVE',
+      object_id: 'test1',
+      data: null,
+      subkey: '6544e3899293153fa7b38331/500e9b68-a2fd-51ff-9c00-313e2fb9f562'
+    }
+  ]);
+});
+
+test('unchanged data with replica identity full', async () => {
+  const sync_rules = SqlSyncRules.fromYaml(`
+bucket_definitions:
+  global:
+    data:
+      - SELECT id, description FROM "test"
+`);
+  const storage = (await factory()).getInstance({ id: 1, sync_rules, slot_name: 'test' });
+
+  const sourceTable = makeTestTable('test', ['id', 'description']);
+
+  // Pre-setup
+  const result1 = await storage.startBatch({}, async (batch) => {
+    await batch.save({
+      sourceTable,
+      tag: 'insert',
+      after: {
+        id: 'test1',
+        description: 'test1a'
+      }
+    });
+  });
+
+  const checkpoint1 = result1?.flushed_op ?? '0';
+
+  const result2 = await storage.startBatch({}, async (batch) => {
+    // Unchanged, but has a before id
+    await batch.save({
+      sourceTable,
+      tag: 'update',
+      before: {
+        id: 'test1',
+        description: 'test1a'
+      },
+      after: {
+        id: 'test1',
+        description: 'test1a'
+      }
+    });
+  });
+
+  const result3 = await storage.startBatch({}, async (batch) => {
+    // Delete
+    await batch.save({
+      sourceTable,
+      tag: 'delete',
+      before: {
+        id: 'test1',
+        description: 'test1a'
+      },
+      after: undefined
+    });
+  });
+
+  const checkpoint3 = result3!.flushed_op;
+
+  const batch = await fromAsync(storage.getBucketDataBatch(checkpoint3, new Map([['global[]', checkpoint1]])));
+  const data = batch[0].data.map((d) => {
+    return {
+      op: d.op,
+      object_id: d.object_id,
+      data: d.data,
+      subkey: d.subkey
+    };
+  });
+
+  // Operations must be in this order
+  expect(data).toEqual([
+    // 2
+    {
+      op: 'PUT',
+      object_id: 'test1',
+      data: JSON.stringify({ id: 'test1', description: 'test1a' }),
+      subkey: '6544e3899293153fa7b38331/740ba9f2-8b0f-53e3-bb17-5f38a9616f0e'
+    },
+    // 3
+    {
+      op: 'REMOVE',
+      object_id: 'test1',
+      data: null,
+      subkey: '6544e3899293153fa7b38331/740ba9f2-8b0f-53e3-bb17-5f38a9616f0e'
+    }
+  ]);
+});
+
 test('large batch', async () => {
   // Test syncing a batch of data that is small in count,
   // but large enough in size to be split over multiple returned batches.

packages/service-core/test/src/slow_tests.test.ts

Lines changed: 45 additions & 6 deletions
@@ -62,7 +62,7 @@ function defineSlowTests(factory: StorageFactory) {
 bucket_definitions:
   global:
     data:
-      - SELECT id, description FROM "test_data"
+      - SELECT * FROM "test_data"
 `;
   const syncRules = await f.updateSyncRules({ content: syncRuleContent });
   const storage = f.getInstance(syncRules.parsed());
@@ -76,7 +76,10 @@ bucket_definitions:
   walStream = new WalStream(options);

   await pool.query(`DROP TABLE IF EXISTS test_data`);
-  await pool.query(`CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)`);
+  await pool.query(
+    `CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text, num decimal)`
+  );
+  await pool.query(`ALTER TABLE test_data REPLICA IDENTITY FULL`);

   await walStream.initReplication(replicationConnection);
   await storage.autoActivate();
@@ -88,14 +91,17 @@ bucket_definitions:

   while (!abort && Date.now() - start < TEST_DURATION_MS) {
     const bg = async () => {
-      for (let j = 0; j < 5 && !abort; j++) {
-        const n = Math.floor(Math.random() * 50);
+      for (let j = 0; j < 1 && !abort; j++) {
+        const n = 1;
         let statements: pgwire.Statement[] = [];
         for (let i = 0; i < n; i++) {
           const description = `test${i}`;
           statements.push({
-            statement: `INSERT INTO test_data(description) VALUES($1) returning id as test_id`,
-            params: [{ type: 'varchar', value: description }]
+            statement: `INSERT INTO test_data(description, num) VALUES($1, $2) returning id as test_id`,
+            params: [
+              { type: 'varchar', value: description },
+              { type: 'float8', value: Math.random() }
+            ]
           });
         }
         const results = await pool.query(...statements);
@@ -104,6 +110,24 @@ bucket_definitions:
         });
         await new Promise((resolve) => setTimeout(resolve, Math.random() * 30));

+        if (Math.random() > 0.5) {
+          const updateStatements: pgwire.Statement[] = ids.map((id) => {
+            return {
+              statement: `UPDATE test_data SET num = $2 WHERE id = $1`,
+              params: [
+                { type: 'uuid', value: id },
+                { type: 'float8', value: Math.random() }
+              ]
+            };
+          });
+
+          await pool.query(...updateStatements);
+          if (Math.random() > 0.5) {
+            // Special case - an update that doesn't change data
+            await pool.query(...updateStatements);
+          }
+        }
+
         const deleteStatements: pgwire.Statement[] = ids.map((id) => {
           return {
             statement: `DELETE FROM test_data WHERE id = $1`,
@@ -129,6 +153,21 @@ bucket_definitions:
       return bson.deserialize((doc.data as mongo.Binary).buffer) as SqliteRow;
     });
     expect(transformed).toEqual([]);
+
+    // Check that each PUT has a REMOVE
+    const ops = await f.db.bucket_data.find().sort({ _id: 1 }).toArray();
+    let active = new Set<string>();
+    for (let op of ops) {
+      const key = op.source_key.toHexString();
+      if (op.op == 'PUT') {
+        active.add(key);
+      } else if (op.op == 'REMOVE') {
+        active.delete(key);
+      }
+    }
+    if (active.size > 0) {
+      throw new Error(`${active.size} rows not removed`);
+    }
   }

   abortController.abort();
