Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: powersync-ja/powersync-service
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 1d972e77ebd100e8682af7baa7f6915263d4e83a
Choose a base ref
..
head repository: powersync-ja/powersync-service
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: de6c1ac7621e8d28f659614af99dc6f12772ed26
Choose a head ref
Original file line number Diff line number Diff line change
@@ -364,22 +364,6 @@ export class MongoCompactor {
},
{ session }
);

// Note: This does not update anything if there is no existing state
await this.db.bucket_state.updateOne(
{
_id: {
g: this.group_id,
b: bucket
}
},
{
$inc: {
op_count: 1 - numberOfOpsToClear
}
},
{ session }
);
},
{
writeConcern: { w: 'majority' },
Original file line number Diff line number Diff line change
@@ -910,6 +910,7 @@ export class MongoSyncBucketStorage
private async getDataBucketChanges(
options: GetCheckpointChangesOptions
): Promise<Pick<CheckpointChanges, 'updatedDataBuckets' | 'invalidateDataBuckets'>> {
const limit = 1000;
const bucketStateUpdates = await this.db.bucket_state
.find(
{
@@ -921,43 +922,43 @@ export class MongoSyncBucketStorage
projection: {
'_id.b': 1
},
limit: 1001,
batchSize: 1001,
limit: limit + 1,
batchSize: limit + 1,
singleBatch: true
}
)
.toArray();

const buckets = bucketStateUpdates.map((doc) => doc._id.b);
const invalidateDataBuckets = buckets.length > 1000;
const invalidateDataBuckets = buckets.length > limit;

return {
invalidateDataBuckets: invalidateDataBuckets,
updatedDataBuckets: invalidateDataBuckets ? [] : buckets
updatedDataBuckets: invalidateDataBuckets ? new Set<string>() : new Set(buckets)
};
}

private async getParameterBucketChanges(
options: GetCheckpointChangesOptions
): Promise<Pick<CheckpointChanges, 'updatedParameterLookups' | 'invalidateParameterBuckets'>> {
// TODO: limit max query running time
const limit = 1000;
const parameterUpdates = await this.db.bucket_parameters
.find(
{
_id: { $gt: BigInt(options.lastCheckpoint), $lt: BigInt(options.nextCheckpoint) },
_id: { $gt: BigInt(options.lastCheckpoint), $lte: BigInt(options.nextCheckpoint) },
'key.g': this.group_id
},
{
projection: {
lookup: 1
},
limit: 1001,
batchSize: 1001,
limit: limit + 1,
batchSize: limit + 1,
singleBatch: true
}
)
.toArray();
const invalidateParameterUpdates = parameterUpdates.length > 1000;
const invalidateParameterUpdates = parameterUpdates.length > limit;

return {
invalidateParameterBuckets: invalidateParameterUpdates,
@@ -967,13 +968,22 @@ export class MongoSyncBucketStorage
};
}

// TODO:
// If we processed all connections together for each checkpoint, we could do a single lookup for all connections.
// In practice, specific connections may fall behind. So instead, we just cache the results of each specific lookup.
// TODO (later):
// We can optimize this by implementing it like ChecksumCache: We can use partial cache results to do
// more efficient lookups in some cases.
private checkpointChangesCache = new LRUCache<string, CheckpointChanges, { options: GetCheckpointChangesOptions }>({
// Limit to 50 cache entries, or 10MB, whichever comes first.
// Some rough calculations:
// If we process 10 checkpoints per second, and a connection may be 2 seconds behind, we could have
// up to 20 relevant checkpoints. That gives us 20*20 = 400 potentially-relevant cache entries.
// That is a worst-case scenario, so we don't actually store that many. In real life, the cache keys
// would likely be clustered around a few values, rather than spread over all 400 potential values.
max: 50,
maxSize: 10 * 1024 * 1024,
sizeCalculation: (value: CheckpointChanges) => {
// Estimate of memory usage
const paramSize = [...value.updatedParameterLookups].reduce<number>((a, b) => a + b.length, 0);
const bucketSize = [...value.updatedDataBuckets].reduce<number>((a, b) => a + b.length, 0);
return 100 + paramSize + bucketSize;
@@ -983,31 +993,7 @@ export class MongoSyncBucketStorage
}
});

private _hasDynamicBucketsCached: boolean | undefined = undefined;

private hasDynamicBucketQueries(): boolean {
if (this._hasDynamicBucketsCached != null) {
return this._hasDynamicBucketsCached;
}
const syncRules = this.getParsedSyncRules({
defaultSchema: 'default' // n/a
});
const hasDynamicBuckets = syncRules.hasDynamicBucketQueries();
this._hasDynamicBucketsCached = hasDynamicBuckets;
return hasDynamicBuckets;
}

async getCheckpointChanges(options: GetCheckpointChangesOptions): Promise<CheckpointChanges> {
if (!this.hasDynamicBucketQueries()) {
// Special case when we have no dynamic parameter queries.
// In this case, we can avoid doing any queries.
return {
invalidateDataBuckets: true,
updatedDataBuckets: [],
invalidateParameterBuckets: false,
updatedParameterLookups: new Set<string>()
};
}
const key = `${options.lastCheckpoint}_${options.nextCheckpoint}`;
const result = await this.checkpointChangesCache.fetch(key, { context: { options } });
return result!;
Original file line number Diff line number Diff line change
@@ -330,9 +330,6 @@ export class PersistedBatch {
update: {
$set: {
last_op: state.lastOp
},
$inc: {
op_count: state.incrementCount
}
},
upsert: true
Original file line number Diff line number Diff line change
@@ -75,13 +75,22 @@ export interface SourceTableDocument {
snapshot_done: boolean | undefined;
}

/**
* Record the state of each bucket.
*
* Right now, this is just used to track when buckets are updated, for efficient incremental sync.
* In the future, this could be used to track operation counts, both for diagnostic purposes, and for
* determining when a compact and/or defragment could be beneficial.
*
* Note: There is currently no migration to populate this collection from existing data - it is only
* populated by new updates.
*/
export interface BucketStateDocument {
_id: {
g: number;
b: string;
};
last_op: bigint;
op_count: number;
}

export interface IdSequenceDocument {
Original file line number Diff line number Diff line change
@@ -357,6 +357,74 @@ exports[`sync - mongodb > sync legacy non-raw data 1`] = `
]
`;

exports[`sync - mongodb > sync updates to data query only 1`] = `
[
{
"checkpoint": {
"buckets": [
{
"bucket": "by_user["user1"]",
"checksum": 0,
"count": 0,
"priority": 3,
},
],
"last_op_id": "1",
"write_checkpoint": undefined,
},
},
{
"checkpoint_complete": {
"last_op_id": "1",
},
},
]
`;

exports[`sync - mongodb > sync updates to data query only 2`] = `
[
{
"checkpoint_diff": {
"last_op_id": "2",
"removed_buckets": [],
"updated_buckets": [
{
"bucket": "by_user["user1"]",
"checksum": 1418351250,
"count": 1,
"priority": 3,
},
],
"write_checkpoint": undefined,
},
},
{
"data": {
"after": "0",
"bucket": "by_user["user1"]",
"data": [
{
"checksum": 1418351250n,
"data": "{"id":"list1","user_id":"user1","name":"User 1"}",
"object_id": "list1",
"object_type": "lists",
"op": "PUT",
"op_id": "2",
"subkey": "0ffb7b58-d14d-5efa-be6c-c8eda74ab7a8",
},
],
"has_more": false,
"next_after": "2",
},
},
{
"checkpoint_complete": {
"last_op_id": "2",
},
},
]
`;

exports[`sync - mongodb > sync updates to global data 1`] = `
[
{
@@ -468,3 +536,106 @@ exports[`sync - mongodb > sync updates to global data 3`] = `
},
]
`;

exports[`sync - mongodb > sync updates to parameter query + data 1`] = `
[
{
"checkpoint": {
"buckets": [],
"last_op_id": "0",
"write_checkpoint": undefined,
},
},
{
"checkpoint_complete": {
"last_op_id": "0",
},
},
]
`;

exports[`sync - mongodb > sync updates to parameter query + data 2`] = `
[
{
"checkpoint_diff": {
"last_op_id": "2",
"removed_buckets": [],
"updated_buckets": [
{
"bucket": "by_user["user1"]",
"checksum": 1418351250,
"count": 1,
"priority": 3,
},
],
"write_checkpoint": undefined,
},
},
{
"data": {
"after": "0",
"bucket": "by_user["user1"]",
"data": [
{
"checksum": 1418351250n,
"data": "{"id":"list1","user_id":"user1","name":"User 1"}",
"object_id": "list1",
"object_type": "lists",
"op": "PUT",
"op_id": "1",
"subkey": "0ffb7b58-d14d-5efa-be6c-c8eda74ab7a8",
},
],
"has_more": false,
"next_after": "1",
},
},
{
"checkpoint_complete": {
"last_op_id": "2",
},
},
]
`;

exports[`sync - mongodb > sync updates to parameter query only 1`] = `
[
{
"checkpoint": {
"buckets": [],
"last_op_id": "0",
"write_checkpoint": undefined,
},
},
{
"checkpoint_complete": {
"last_op_id": "0",
},
},
]
`;

exports[`sync - mongodb > sync updates to parameter query only 2`] = `
[
{
"checkpoint_diff": {
"last_op_id": "1",
"removed_buckets": [],
"updated_buckets": [
{
"bucket": "by_user["user1"]",
"checksum": 0,
"count": 0,
"priority": 3,
},
],
"write_checkpoint": undefined,
},
},
{
"checkpoint_complete": {
"last_op_id": "1",
},
},
]
`;
Loading