From 1320371c70008d347d7d50fe01c2ffcb1dc9b999 Mon Sep 17 00:00:00 2001 From: Shane da Silva Date: Wed, 10 Jul 2024 00:34:38 -0700 Subject: [PATCH] feat(shuttle): Allow changing subscriber batch size or flush interval (#2147) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Why is this change needed? The way we were writing events to the stream was resulting in a lot more separate requests to Redis since we weren't actually batching them together. Change this so we leverage the multi-argument version of `XADD` so that throughput can be increased at higher volumes. Accompanying this is the introduction of a few more configuration options that allow us to tweak the throughput of the `HubSubscriber`. While here, do a minor version bump since we're slightly changing how events are written to the stream. ## Merge Checklist - [x] PR title adheres to the [conventional commits](https://www.conventionalcommits.org/en/v1.0.0/) standard - [x] PR has a [changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets) - [x] PR has been tagged with a change label(s) (i.e. documentation, feature, bugfix, or chore) - [x] PR includes [documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs) if necessary. --- ## PR-Codex overview This PR updates the `@farcaster/shuttle` package to version 0.5.0, introducing customization for event batch size and time between flushes. 
### Detailed summary - Updated package version to 0.5.0 - Added customization for event batch size and time between flushes - Modified event handling logic in `EventStreamHubSubscriber` > ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your question}` --- packages/shuttle/CHANGELOG.md | 6 +++ packages/shuttle/package.json | 2 +- packages/shuttle/src/shuttle/hubSubscriber.ts | 42 +++++++++++++------ 3 files changed, 36 insertions(+), 14 deletions(-) diff --git a/packages/shuttle/CHANGELOG.md b/packages/shuttle/CHANGELOG.md index 6561bbf29a..bf91d5f199 100644 --- a/packages/shuttle/CHANGELOG.md +++ b/packages/shuttle/CHANGELOG.md @@ -1,5 +1,11 @@ # @farcaster/hub-shuttle +## 0.5.0 + +### Minor Changes + +- Support customization of event batch size and time between flushes + ## 0.4.4 ### Patch Changes diff --git a/packages/shuttle/package.json b/packages/shuttle/package.json index 7273e20061..2e5e04158f 100644 --- a/packages/shuttle/package.json +++ b/packages/shuttle/package.json @@ -1,6 +1,6 @@ { "name": "@farcaster/shuttle", - "version": "0.4.4", + "version": "0.5.0", "main": "./dist/index.js", "module": "./dist/index.mjs", "types": "./dist/index.d.ts", diff --git a/packages/shuttle/src/shuttle/hubSubscriber.ts b/packages/shuttle/src/shuttle/hubSubscriber.ts index 7cb0d87d20..b596d3fdcf 100644 --- a/packages/shuttle/src/shuttle/hubSubscriber.ts +++ b/packages/shuttle/src/shuttle/hubSubscriber.ts @@ -175,8 +175,12 @@ export class EventStreamHubSubscriber extends BaseHubSubscriber { private redis: RedisClient; public readonly streamKey: string; public readonly redisKey: string; - private eventsToAdd: HubEvent[]; - private eventBatchSize = 100; + private eventsToAdd: [HubEvent, Buffer][]; + public eventBatchSize = 100; + private eventBatchLastFlushedAt = 0; + public maxTimeBetweenBatchFlushes = 200; // Millis + public maxBatchBytesBeforeForceFlush = 2 ** 20; // 1 MiB + private eventBatchBytes = 0; constructor( label: string, @@ -208,17 +212,29 @@ 
export class EventStreamHubSubscriber extends BaseHubSubscriber { } public override async processHubEvent(event: HubEvent): Promise { - this.eventsToAdd.push(event); - if (this.eventsToAdd.length >= this.eventBatchSize) { - let lastEventId: number | undefined; - for (const evt of this.eventsToAdd) { - await this.eventStream.add(this.streamKey, Buffer.from(HubEvent.encode(evt).finish())); - lastEventId = evt.id; - } - if (lastEventId) { - await this.redis.setLastProcessedEvent(this.redisKey, lastEventId); - } - this.eventsToAdd = []; + const eventBytes = Buffer.from(HubEvent.encode(event).finish()); + this.eventBatchBytes += eventBytes.length; + this.eventsToAdd.push([event, eventBytes]); + if ( + this.eventsToAdd.length >= this.eventBatchSize || + this.eventBatchBytes >= this.maxBatchBytesBeforeForceFlush || + Date.now() - this.eventBatchLastFlushedAt > this.maxTimeBetweenBatchFlushes + ) { + // Empties the current batch + const eventBatch = this.eventsToAdd.splice(0, this.eventsToAdd.length); + + // Copies the removed events to the stream + await this.eventStream.add( + this.streamKey, + eventBatch.map(([_event, eventBytes]) => eventBytes), + ); + + this.eventBatchLastFlushedAt = Date.now(); + + // biome-ignore lint/style/noNonNullAssertion: batch always has at least one event + const [evt, eventBytes] = eventBatch[eventBatch.length - 1]!; + const lastEventId = evt.id; + await this.redis.setLastProcessedEvent(this.redisKey, lastEventId); } return true;