Cache parameter queries and buckets #200

Merged
merged 30 commits into from Mar 20, 2025
Changes from all commits (30 commits)
62d40f8
POC of incremental update lookups.
rkistner Feb 12, 2025
5837e14
Cache dynamic bucket lookups.
rkistner Mar 5, 2025
39e256a
Fix sizeCalculation.
rkistner Feb 12, 2025
4e52f86
Optimization: skip checking for bucket_parameters changes if there are
rkistner Feb 12, 2025
f8582c5
Fix tests.
rkistner Feb 13, 2025
c8e63be
Fix import.
rkistner Feb 25, 2025
74e66e3
Merge remote-tracking branch 'origin/main' into optimize-bucket-looku…
rkistner Mar 11, 2025
6ebefa9
Improve parameter query filtering.
rkistner Mar 11, 2025
27c7577
Track last_op for each bucket.
rkistner Mar 11, 2025
48846ad
Fix cache size calculation.
rkistner Mar 11, 2025
1f73456
Update bucket counts after compact.
rkistner Mar 11, 2025
357477f
Merge remote-tracking branch 'origin/main' into optimize-bucket-looku…
rkistner Mar 11, 2025
8f430db
Fix some tests.
rkistner Mar 12, 2025
dfbdf09
Use an explicit ParameterLookup class for better typing.
rkistner Mar 12, 2025
71309df
Fix sync-rules tests.
rkistner Mar 12, 2025
246c3b6
Fix another test.
rkistner Mar 12, 2025
b61b0ca
Add changeset.
rkistner Mar 12, 2025
95d98ee
Merge remote-tracking branch 'origin/main' into optimize-bucket-looku…
rkistner Mar 13, 2025
4dc78b8
Merge remote-tracking branch 'origin/main' into optimize-bucket-looku…
rkistner Mar 17, 2025
bdf9361
Remove op_count from bucket_state.
rkistner Mar 18, 2025
3cdf69d
Fix compact.
rkistner Mar 18, 2025
f28eaef
Cleanup and comments.
rkistner Mar 19, 2025
f1af3a1
Simplify type guard.
rkistner Mar 19, 2025
ec45ae4
Tweaks and tests for hasIntersection.
rkistner Mar 19, 2025
1a76b2c
Use set intersection.
rkistner Mar 19, 2025
66fed0f
Fix handling of checkpoints only containing a write checkpoint update.
rkistner Mar 19, 2025
7a6cff1
Use a Symbol instead of null for INVALIDATE_ALL_BUCKETS.
rkistner Mar 19, 2025
65889dd
Fix typo.
rkistner Mar 19, 2025
8f45ca5
Add tests; fix parameter query lookup issue.
rkistner Mar 19, 2025
80fbe0b
Update snapshots for postgres.
rkistner Mar 19, 2025
9 changes: 9 additions & 0 deletions .changeset/swift-wolves-sleep.md
@@ -0,0 +1,9 @@
---
'@powersync/service-module-postgres-storage': minor
'@powersync/service-module-mongodb-storage': minor
'@powersync/service-core-tests': minor
'@powersync/service-core': minor
'@powersync/service-sync-rules': minor
---

Cache parameter queries and buckets to reduce incremental sync overhead
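The core idea, as a minimal sketch rather than the PR's actual code (`CheckpointRange`, `queryChangedBuckets`, and the wrapper below are illustrative stand-ins): key a small LRU cache by the checkpoint range, so every connection asking about the same (lastCheckpoint, nextCheckpoint) pair reuses a single storage lookup.

```ts
import { LRUCache } from 'lru-cache';

interface CheckpointRange {
  lastCheckpoint: bigint;
  nextCheckpoint: bigint;
}

// Hypothetical stand-in for the storage query that finds buckets changed in the range.
async function queryChangedBuckets(range: CheckpointRange): Promise<Set<string>> {
  // e.g. scan bucket_state for last_op in (lastCheckpoint, nextCheckpoint]
  return new Set();
}

const changedBucketCache = new LRUCache<string, Set<string>, { range: CheckpointRange }>({
  max: 50,
  fetchMethod: async (_key, _staleValue, { context }) => queryChangedBuckets(context.range)
});

async function changedBuckets(range: CheckpointRange): Promise<Set<string>> {
  // Concurrent fetches for the same key are deduplicated by lru-cache.
  const key = `${range.lastCheckpoint}_${range.nextCheckpoint}`;
  return (await changedBucketCache.fetch(key, { context: { range } }))!;
}
```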
@@ -0,0 +1,40 @@
import { migrations } from '@powersync/service-core';
import * as storage from '../../../storage/storage-index.js';
import { MongoStorageConfig } from '../../../types/types.js';

const INDEX_NAME = 'bucket_updates';

export const up: migrations.PowerSyncMigrationFunction = async (context) => {
const {
service_context: { configuration }
} = context;
const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);

try {
await db.bucket_state.createIndex(
{
'_id.g': 1,
last_op: 1
},
{ name: INDEX_NAME, unique: true }
);
} finally {
await db.client.close();
}
};

export const down: migrations.PowerSyncMigrationFunction = async (context) => {
const {
service_context: { configuration }
} = context;

const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);

try {
if (await db.bucket_state.indexExists(INDEX_NAME)) {
await db.bucket_state.dropIndex(INDEX_NAME);
}
} finally {
await db.client.close();
}
};
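For context, the `{ '_id.g': 1, last_op: 1 }` index created by this migration exists to serve the incremental change query used further down in this PR (`getDataBucketChanges`). A rough sketch of that access pattern, with the document shape and collection handle assumed here:

```ts
import type { Collection } from 'mongodb';

// Assumed document shape for bucket_state (matches the fields used in this PR).
interface BucketStateDocument {
  _id: { g: number; b: string };
  last_op: bigint;
}

// Returns the names of buckets touched since the given checkpoint.
// The (_id.g, last_op) index lets this scan only the changed range.
async function bucketsChangedSince(
  bucketState: Collection<BucketStateDocument>,
  groupId: number,
  lastCheckpoint: bigint
): Promise<string[]> {
  const docs = await bucketState
    .find(
      { '_id.g': groupId, last_op: { $gt: lastCheckpoint } },
      { projection: { '_id.b': 1 } }
    )
    .toArray();
  return docs.map((doc) => doc._id.b);
}
```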
@@ -314,10 +314,12 @@ export class MongoCompactor {
let lastOpId: BucketDataKey | null = null;
let targetOp: bigint | null = null;
let gotAnOp = false;
let numberOfOpsToClear = 0;
for await (let op of query.stream()) {
if (op.op == 'MOVE' || op.op == 'REMOVE' || op.op == 'CLEAR') {
checksum = utils.addChecksums(checksum, op.checksum);
lastOpId = op._id;
numberOfOpsToClear += 1;
if (op.op != 'CLEAR') {
gotAnOp = true;
}
@@ -337,7 +339,7 @@
return;
}

logger.info(`Flushing CLEAR at ${lastOpId?.o}`);
logger.info(`Flushing CLEAR for ${numberOfOpsToClear} ops at ${lastOpId?.o}`);
await this.db.bucket_data.deleteMany(
{
_id: {
@@ -9,27 +9,29 @@ import {
} from '@powersync/lib-services-framework';
import {
BroadcastIterable,
CHECKPOINT_INVALIDATE_ALL,
CheckpointChanges,
GetCheckpointChangesOptions,
InternalOpId,
internalToExternalOpId,
ProtocolOpId,
ReplicationCheckpoint,
SourceTable,
storage,
utils,
WatchWriteCheckpointOptions
WatchWriteCheckpointOptions,
CHECKPOINT_INVALIDATE_ALL,
deserializeParameterLookup
} from '@powersync/service-core';
import { SqliteJsonRow, SqliteJsonValue, SqlSyncRules } from '@powersync/service-sync-rules';
import { SqliteJsonRow, ParameterLookup, SqlSyncRules } from '@powersync/service-sync-rules';
import * as bson from 'bson';
import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js';
import { LRUCache } from 'lru-cache';
import * as timers from 'timers/promises';
import { MongoBucketStorage } from '../MongoBucketStorage.js';
import { PowerSyncMongo } from './db.js';
import {
BucketDataDocument,
BucketDataKey,
BucketStateDocument,
SourceKey,
SourceTableDocument,
SyncRuleCheckpointState,
@@ -39,6 +41,7 @@ import { MongoBucketBatch } from './MongoBucketBatch.js';
import { MongoCompactor } from './MongoCompactor.js';
import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js';
import { idPrefixFilter, mapOpEntry, readSingleBatch } from './util.js';
import { JSONBig } from '@powersync/service-jsonbig';

export class MongoSyncBucketStorage
extends BaseObserver<storage.SyncRulesBucketStorageListener>
@@ -154,7 +157,7 @@ export class MongoSyncBucketStorage

await callback(batch);
await batch.flush();
if (batch.last_flushed_op) {
if (batch.last_flushed_op != null) {
return { flushed_op: batch.last_flushed_op };
} else {
return null;
@@ -252,7 +255,7 @@ export class MongoSyncBucketStorage
return result!;
}

async getParameterSets(checkpoint: utils.InternalOpId, lookups: SqliteJsonValue[][]): Promise<SqliteJsonRow[]> {
async getParameterSets(checkpoint: utils.InternalOpId, lookups: ParameterLookup[]): Promise<SqliteJsonRow[]> {
const lookupFilter = lookups.map((lookup) => {
return storage.serializeLookup(lookup);
});
@@ -585,6 +588,13 @@ export class MongoSyncBucketStorage
{ maxTimeMS: lib_mongo.db.MONGO_CLEAR_OPERATION_TIMEOUT_MS }
);

await this.db.bucket_state.deleteMany(
{
_id: idPrefixFilter<BucketStateDocument['_id']>({ g: this.group_id }, ['b'])
},
{ maxTimeMS: lib_mongo.db.MONGO_CLEAR_OPERATION_TIMEOUT_MS }
);

await this.db.source_tables.deleteMany(
{
group_id: this.group_id
@@ -795,12 +805,7 @@

const updates: CheckpointChanges =
lastCheckpoint == null
? {
invalidateDataBuckets: true,
invalidateParameterBuckets: true,
updatedDataBuckets: [],
updatedParameterBucketDefinitions: []
}
? CHECKPOINT_INVALIDATE_ALL
: await this.getCheckpointChanges({
lastCheckpoint: lastCheckpoint,
nextCheckpoint: checkpoint
@@ -869,7 +874,105 @@
return pipeline;
}

private async getDataBucketChanges(
options: GetCheckpointChangesOptions
): Promise<Pick<CheckpointChanges, 'updatedDataBuckets' | 'invalidateDataBuckets'>> {
const limit = 1000;
const bucketStateUpdates = await this.db.bucket_state
.find(
{
// We have an index on (_id.g, last_op).
'_id.g': this.group_id,
last_op: { $gt: BigInt(options.lastCheckpoint) }
},
{
projection: {
'_id.b': 1
},
limit: limit + 1,
batchSize: limit + 1,
singleBatch: true
}
)
.toArray();

const buckets = bucketStateUpdates.map((doc) => doc._id.b);
const invalidateDataBuckets = buckets.length > limit;

return {
invalidateDataBuckets: invalidateDataBuckets,
updatedDataBuckets: invalidateDataBuckets ? new Set<string>() : new Set(buckets)
};
}

private async getParameterBucketChanges(
options: GetCheckpointChangesOptions
): Promise<Pick<CheckpointChanges, 'updatedParameterLookups' | 'invalidateParameterBuckets'>> {
const limit = 1000;
const parameterUpdates = await this.db.bucket_parameters
.find(
{
_id: { $gt: BigInt(options.lastCheckpoint), $lte: BigInt(options.nextCheckpoint) },
'key.g': this.group_id
},
{
projection: {
lookup: 1
},
limit: limit + 1,
batchSize: limit + 1,
singleBatch: true
}
)
.toArray();
const invalidateParameterUpdates = parameterUpdates.length > limit;

return {
invalidateParameterBuckets: invalidateParameterUpdates,
updatedParameterLookups: invalidateParameterUpdates
? new Set<string>()
: new Set<string>(parameterUpdates.map((p) => JSONBig.stringify(deserializeParameterLookup(p.lookup))))
};
}

// If we processed all connections together for each checkpoint, we could do a single lookup for all connections.
// In practice, specific connections may fall behind. So instead, we just cache the results of each specific lookup.
// TODO (later):
// We can optimize this by implementing it like ChecksumCache: We can use partial cache results to do
// more efficient lookups in some cases.
private checkpointChangesCache = new LRUCache<string, CheckpointChanges, { options: GetCheckpointChangesOptions }>({
// Limit to 50 cache entries, or 10MB, whichever comes first.
// Some rough calculations:
// If we process 10 checkpoints per second, and a connection may be 2 seconds behind, we could have
// up to 20 relevant checkpoints. That gives us 20*20 = 400 potentially-relevant cache entries.
// That is a worst-case scenario, so we don't actually store that many. In real life, the cache keys
// would likely be clustered around a few values, rather than spread over all 400 potential values.
max: 50,
maxSize: 10 * 1024 * 1024,
sizeCalculation: (value: CheckpointChanges) => {
// Estimate of memory usage
const paramSize = [...value.updatedParameterLookups].reduce<number>((a, b) => a + b.length, 0);
const bucketSize = [...value.updatedDataBuckets].reduce<number>((a, b) => a + b.length, 0);
return 100 + paramSize + bucketSize;
},
fetchMethod: async (_key, _staleValue, options) => {
return this.getCheckpointChangesInternal(options.context.options);
}
});

async getCheckpointChanges(options: GetCheckpointChangesOptions): Promise<CheckpointChanges> {
return CHECKPOINT_INVALIDATE_ALL;
const key = `${options.lastCheckpoint}_${options.nextCheckpoint}`;
const result = await this.checkpointChangesCache.fetch(key, { context: { options } });
return result!;
}

private async getCheckpointChangesInternal(options: GetCheckpointChangesOptions): Promise<CheckpointChanges> {
const dataUpdates = await this.getDataBucketChanges(options);
const parameterUpdates = await this.getParameterBucketChanges(options);

return {
...dataUpdates,
...parameterUpdates
};
}
}
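On the consuming side, the point of `invalidateDataBuckets` versus `updatedDataBuckets` is that a sync stream only needs to recompute when one of its own buckets changed. A hypothetical consumer sketch, not code from this PR (the `hasIntersection` helper mentioned in the commits plays a similar role):

```ts
interface DataBucketChanges {
  // True when there were too many changes to enumerate (or there was no previous checkpoint).
  invalidateDataBuckets: boolean;
  updatedDataBuckets: Set<string>;
}

// Check whether two sets share any element, iterating the smaller one.
function setsIntersect<T>(a: Set<T>, b: Set<T>): boolean {
  const [small, large] = a.size <= b.size ? [a, b] : [b, a];
  for (const item of small) {
    if (large.has(item)) {
      return true;
    }
  }
  return false;
}

function mustRequeryBuckets(changes: DataBucketChanges, connectionBuckets: Set<string>): boolean {
  if (changes.invalidateDataBuckets) {
    // Fall back to re-checking everything.
    return true;
  }
  return setsIntersect(changes.updatedDataBuckets, connectionBuckets);
}
```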
@@ -11,6 +11,7 @@ import { PowerSyncMongo } from './db.js';
import {
BucketDataDocument,
BucketParameterDocument,
BucketStateDocument,
CurrentBucket,
CurrentDataDocument,
SourceKey
@@ -48,6 +49,7 @@ export class PersistedBatch {
bucketData: mongo.AnyBulkWriteOperation<BucketDataDocument>[] = [];
bucketParameters: mongo.AnyBulkWriteOperation<BucketParameterDocument>[] = [];
currentData: mongo.AnyBulkWriteOperation<CurrentDataDocument>[] = [];
bucketStates: Map<string, BucketStateUpdate> = new Map();

/**
* For debug logging only.
@@ -66,6 +68,19 @@
this.currentSize = writtenSize;
}

private incrementBucket(bucket: string, op_id: InternalOpId) {
let existingState = this.bucketStates.get(bucket);
if (existingState) {
existingState.lastOp = op_id;
existingState.incrementCount += 1;
} else {
this.bucketStates.set(bucket, {
lastOp: op_id,
incrementCount: 1
});
}
}

saveBucketData(options: {
op_seq: MongoIdSequence;
sourceKey: storage.ReplicaId;
@@ -120,6 +135,7 @@
}
}
});
this.incrementBucket(k.bucket, op_id);
}

for (let bd of remaining_buckets.values()) {
@@ -147,6 +163,7 @@
}
});
this.currentSize += 200;
this.incrementBucket(bd.bucket, op_id);
}
}

@@ -277,6 +294,14 @@
});
}

if (this.bucketStates.size > 0) {
await db.bucket_state.bulkWrite(this.getBucketStateUpdates(), {
session,
// Per-bucket operation - order doesn't matter
ordered: false
});
}

const duration = performance.now() - startAt;
logger.info(
`powersync_${this.group_id} Flushed ${this.bucketData.length} + ${this.bucketParameters.length} + ${
@@ -287,7 +312,34 @@
this.bucketData = [];
this.bucketParameters = [];
this.currentData = [];
this.bucketStates.clear();
this.currentSize = 0;
this.debugLastOpId = null;
}

private getBucketStateUpdates(): mongo.AnyBulkWriteOperation<BucketStateDocument>[] {
return Array.from(this.bucketStates.entries()).map(([bucket, state]) => {
return {
updateOne: {
filter: {
_id: {
g: this.group_id,
b: bucket
}
},
update: {
$set: {
last_op: state.lastOp
}
},
upsert: true
}
} satisfies mongo.AnyBulkWriteOperation<BucketStateDocument>;
});
}
}

interface BucketStateUpdate {
lastOp: InternalOpId;
incrementCount: number;
}
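To make the per-bucket aggregation concrete, here is a standalone sketch of the pattern above (bucket names are illustrative): repeated ops against the same bucket within a batch collapse into a single `bucket_state` upsert carrying the latest `last_op`.

```ts
type InternalOpId = bigint;

interface BucketStateUpdate {
  lastOp: InternalOpId;
  incrementCount: number;
}

const bucketStates = new Map<string, BucketStateUpdate>();

function incrementBucket(bucket: string, opId: InternalOpId) {
  const existing = bucketStates.get(bucket);
  if (existing) {
    existing.lastOp = opId;
    existing.incrementCount += 1;
  } else {
    bucketStates.set(bucket, { lastOp: opId, incrementCount: 1 });
  }
}

incrementBucket('by_user["u1"]', 101n);
incrementBucket('by_user["u1"]', 102n);
incrementBucket('global[]', 103n);
// bucketStates now holds:
//   'by_user["u1"]' -> { lastOp: 102n, incrementCount: 2 }
//   'global[]'      -> { lastOp: 103n, incrementCount: 1 }
// so flush() issues one upsert per bucket instead of one write per op.
```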