Optimize write checkpoints lookups #230
🦋 Changeset detected. Latest commit: 92a856f. The changes in this PR will be included in the next version bump. This PR includes changesets to release 9 packages.
1d972e7 to de6c1ac
batchCreateCustomWriteCheckpoints.
Pull Request Overview
This PR optimizes write checkpoint lookups by switching from polling to change-stream-based updates, and adjusts the related APIs and tests.
- Introduces a new Demultiplexer for managing checkpoint events.
- Updates naming from watchWriteCheckpoint to watchCheckpointChanges and refactors stream methods accordingly.
- Adjusts tests and changeset files to reflect the new behavior.
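To illustrate the demultiplexing idea in the list above, here is a minimal sketch of routing events from one shared source to per-key listeners. It is not the `Demultiplexer` class added in this PR: the class name `SimpleDemultiplexer`, its methods, and the use of a string key (for example a user id) are all assumptions made for illustration.

```typescript
// Hypothetical sketch only; the real Demultiplexer in this PR may differ.
type Listener<T> = (value: T) => void;

class SimpleDemultiplexer<T> {
  // One set of listeners per key (for example, one key per user).
  private listeners = new Map<string, Set<Listener<T>>>();

  /** Register a listener for one key. Returns an unsubscribe function. */
  subscribe(key: string, listener: Listener<T>): () => void {
    const existing = this.listeners.get(key);
    const set = existing ?? new Set<Listener<T>>();
    if (existing === undefined) {
      this.listeners.set(key, set);
    }
    set.add(listener);

    return () => {
      set.delete(listener);
      // Only drop the map entry if it still points at this (now empty) set.
      if (set.size === 0 && this.listeners.get(key) === set) {
        this.listeners.delete(key);
      }
    };
  }

  /** Route one event from the shared source to the listeners of its key. */
  dispatch(key: string, value: T): number {
    const set = this.listeners.get(key);
    if (set === undefined) {
      return 0;
    }
    const count = set.size;
    set.forEach((listener) => listener(value));
    return count;
  }

  /** Number of keys that currently have at least one listener. */
  get activeKeys(): number {
    return this.listeners.size;
  }
}
```

With this shape, the shared source is consumed once and each connection only pays for the keys it subscribed to; keys without listeners cost a single map lookup per event.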
Reviewed Changes
Copilot reviewed 22 out of 22 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| packages/service-core/src/streams/Demultiplexer.ts | Adds a Demultiplexer class to handle multiplexed streams for checkpoint updates. |
| .changeset/short-experts-fetch.md | Updates changeset description with a typo in the message. |
| packages/service-core-tests/src/tests/register-data-storage-tests.ts | Adds new tests for managed and custom write checkpoints. |
| modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts | Introduces watchUserWriteCheckpoint with change-stream-based logic. |
| modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts | Refactors checkpoint streaming logic and updates error handling. |
| modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts | Removes the createCustomWriteCheckpoint method and updates batch creation. |
| packages/service-core/src/storage/WriteCheckpointAPI.ts | Updates interface definitions to include watchUserWriteCheckpoint and new WriteCheckpointResult. |
| modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts | Refactors checkpoint stream handling methods (renamed and method calls updated). |
| packages/service-core/src/sync/sync.ts | Updates import paths and renames watchWriteCheckpoint calls. |
| packages/service-core/src/streams/LastValueSink.ts | Renames methods (next -> write, complete -> end) to standardize the API. |
| Packages and tests across the codebase | Adjusts import paths, method names, and minor error handling. |
Looks good to me. Thanks for fixing the custom write checkpoint issue in the Postgres storage.
Builds on #200.
This now uses a change stream to get write checkpoint updates, instead of polling. This is the last big remaining optimization required to efficiently scale incremental updates on a large number of concurrent connections.
Write checkpoints are different from bucket and parameter data, since they do not follow any op_id sequence, so we use a different approach for them. Write checkpoint write load is also expected to be much lower than bucket and parameter data, so the additional change stream should not add much overhead.
This shares a single change stream between all connections, similar to sync rules. This cannot use the same change stream as sync rules, since we need `fullDocument: 'updateLookup'` here, and don't want that overhead for sync rules updates. Note that change streams can be multiplexed over the same connection, so this should not increase our overall connection count.

This optimization is currently only implemented for MongoDB storage, not for Postgres storage.
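As a rough illustration of why `fullDocument: 'updateLookup'` matters for the write checkpoint stream: without it, an update event only describes the changed fields, whereas with it the event carries the current document, so a single shared stream can serve every user. The sketch below is not the code from this PR. The document shape (`user_id`, `checkpoint`), the event subset, and the function name `applyCheckpointEvent` are assumptions for illustration; in practice the events would come from the MongoDB driver's `collection.watch(pipeline, { fullDocument: 'updateLookup' })`.

```typescript
// Hypothetical document shape; the field names are assumptions.
interface WriteCheckpointDoc {
  user_id: string;
  checkpoint: number;
}

// Minimal subset of a change event, as delivered when the stream is opened
// with { fullDocument: 'updateLookup' }. The looked-up document can be null,
// for example if it was deleted before the lookup ran.
interface CheckpointChangeEvent {
  operationType: 'insert' | 'update' | 'replace' | 'delete';
  fullDocument?: WriteCheckpointDoc | null;
}

/**
 * Apply one change event to an in-memory map of user id -> latest checkpoint.
 * Returns true if the map was updated, false if the event had no document.
 */
function applyCheckpointEvent(
  latest: Map<string, number>,
  event: CheckpointChangeEvent
): boolean {
  const doc = event.fullDocument;
  if (doc == null) {
    // Deletes, or updates whose lookup found nothing, carry no document.
    return false;
  }
  latest.set(doc.user_id, doc.checkpoint);
  return true;
}
```

A consumer of the shared stream could call this for each event and then notify only the connections subscribed to the affected user.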
This also fixes an issue with custom write checkpoints with Postgres storage (custom write checkpoints failed to persist).