Skip to content

Commit ce6bc8d

Browse files
committed
Add direct tests for the replica identity full issue.
1 parent 8ebaa08 commit ce6bc8d

File tree

1 file changed

+177
-0
lines changed

1 file changed

+177
-0
lines changed

packages/service-core/test/src/data_storage.test.ts

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -897,6 +897,183 @@ bucket_definitions:
897897
]);
898898
});
899899

900+
test('changed data with replica identity full', async () => {
901+
const sync_rules = SqlSyncRules.fromYaml(`
902+
bucket_definitions:
903+
global:
904+
data:
905+
- SELECT id, description FROM "test"
906+
`);
907+
const storage = (await factory()).getInstance({ id: 1, sync_rules, slot_name: 'test' });
908+
909+
const sourceTable = makeTestTable('test', ['id', 'description']);
910+
911+
// Pre-setup
912+
const result1 = await storage.startBatch({}, async (batch) => {
913+
await batch.save({
914+
sourceTable,
915+
tag: 'insert',
916+
after: {
917+
id: 'test1',
918+
description: 'test1a'
919+
}
920+
});
921+
});
922+
923+
const checkpoint1 = result1?.flushed_op ?? '0';
924+
925+
const result2 = await storage.startBatch({}, async (batch) => {
926+
// Unchanged, but has a before id
927+
await batch.save({
928+
sourceTable,
929+
tag: 'update',
930+
before: {
931+
id: 'test1',
932+
description: 'test1a'
933+
},
934+
after: {
935+
id: 'test1',
936+
description: 'test1b'
937+
}
938+
});
939+
});
940+
941+
const result3 = await storage.startBatch({}, async (batch) => {
942+
// Delete
943+
await batch.save({
944+
sourceTable,
945+
tag: 'delete',
946+
before: {
947+
id: 'test1',
948+
description: 'test1b'
949+
},
950+
after: undefined
951+
});
952+
});
953+
954+
const checkpoint3 = result3!.flushed_op;
955+
956+
const batch = await fromAsync(storage.getBucketDataBatch(checkpoint3, new Map([['global[]', checkpoint1]])));
957+
const data = batch[0].data.map((d) => {
958+
return {
959+
op: d.op,
960+
object_id: d.object_id,
961+
data: d.data,
962+
subkey: d.subkey
963+
};
964+
});
965+
966+
// Operations must be in this order
967+
expect(data).toEqual([
968+
// 2
969+
// The REMOVE is expected because the subkey changes
970+
{
971+
op: 'REMOVE',
972+
object_id: 'test1',
973+
data: null,
974+
subkey: '6544e3899293153fa7b38331/740ba9f2-8b0f-53e3-bb17-5f38a9616f0e'
975+
},
976+
{
977+
op: 'PUT',
978+
object_id: 'test1',
979+
data: JSON.stringify({ id: 'test1', description: 'test1b' }),
980+
subkey: '6544e3899293153fa7b38331/500e9b68-a2fd-51ff-9c00-313e2fb9f562'
981+
},
982+
// 3
983+
{
984+
op: 'REMOVE',
985+
object_id: 'test1',
986+
data: null,
987+
subkey: '6544e3899293153fa7b38331/500e9b68-a2fd-51ff-9c00-313e2fb9f562'
988+
}
989+
]);
990+
});
991+
992+
test('unchanged data with replica identity full', async () => {
993+
const sync_rules = SqlSyncRules.fromYaml(`
994+
bucket_definitions:
995+
global:
996+
data:
997+
- SELECT id, description FROM "test"
998+
`);
999+
const storage = (await factory()).getInstance({ id: 1, sync_rules, slot_name: 'test' });
1000+
1001+
const sourceTable = makeTestTable('test', ['id', 'description']);
1002+
1003+
// Pre-setup
1004+
const result1 = await storage.startBatch({}, async (batch) => {
1005+
await batch.save({
1006+
sourceTable,
1007+
tag: 'insert',
1008+
after: {
1009+
id: 'test1',
1010+
description: 'test1a'
1011+
}
1012+
});
1013+
});
1014+
1015+
const checkpoint1 = result1?.flushed_op ?? '0';
1016+
1017+
const result2 = await storage.startBatch({}, async (batch) => {
1018+
// Unchanged, but has a before id
1019+
await batch.save({
1020+
sourceTable,
1021+
tag: 'update',
1022+
before: {
1023+
id: 'test1',
1024+
description: 'test1a'
1025+
},
1026+
after: {
1027+
id: 'test1',
1028+
description: 'test1a'
1029+
}
1030+
});
1031+
});
1032+
1033+
const result3 = await storage.startBatch({}, async (batch) => {
1034+
// Delete
1035+
await batch.save({
1036+
sourceTable,
1037+
tag: 'delete',
1038+
before: {
1039+
id: 'test1',
1040+
description: 'test1a'
1041+
},
1042+
after: undefined
1043+
});
1044+
});
1045+
1046+
const checkpoint3 = result3!.flushed_op;
1047+
1048+
const batch = await fromAsync(storage.getBucketDataBatch(checkpoint3, new Map([['global[]', checkpoint1]])));
1049+
const data = batch[0].data.map((d) => {
1050+
return {
1051+
op: d.op,
1052+
object_id: d.object_id,
1053+
data: d.data,
1054+
subkey: d.subkey
1055+
};
1056+
});
1057+
1058+
// Operations must be in this order
1059+
expect(data).toEqual([
1060+
// 2
1061+
{
1062+
op: 'PUT',
1063+
object_id: 'test1',
1064+
data: JSON.stringify({ id: 'test1', description: 'test1a' }),
1065+
subkey: '6544e3899293153fa7b38331/740ba9f2-8b0f-53e3-bb17-5f38a9616f0e'
1066+
},
1067+
// 3
1068+
{
1069+
op: 'REMOVE',
1070+
object_id: 'test1',
1071+
data: null,
1072+
subkey: '6544e3899293153fa7b38331/740ba9f2-8b0f-53e3-bb17-5f38a9616f0e'
1073+
}
1074+
]);
1075+
});
1076+
9001077
test('large batch', async () => {
9011078
// Test syncing a batch of data that is small in count,
9021079
// but large enough in size to be split over multiple returned batches.

0 commit comments

Comments
 (0)