diff --git a/packages/shuttle/CHANGELOG.md b/packages/shuttle/CHANGELOG.md index 6561bbf29a..bf91d5f199 100644 --- a/packages/shuttle/CHANGELOG.md +++ b/packages/shuttle/CHANGELOG.md @@ -1,5 +1,11 @@ # @farcaster/hub-shuttle +## 0.5.0 + +### Minor Changes + +- Support customization of event batch size and time between flushes + ## 0.4.4 ### Patch Changes diff --git a/packages/shuttle/package.json b/packages/shuttle/package.json index 7273e20061..2e5e04158f 100644 --- a/packages/shuttle/package.json +++ b/packages/shuttle/package.json @@ -1,6 +1,6 @@ { "name": "@farcaster/shuttle", - "version": "0.4.4", + "version": "0.5.0", "main": "./dist/index.js", "module": "./dist/index.mjs", "types": "./dist/index.d.ts", diff --git a/packages/shuttle/src/shuttle/hubSubscriber.ts b/packages/shuttle/src/shuttle/hubSubscriber.ts index 7cb0d87d20..b596d3fdcf 100644 --- a/packages/shuttle/src/shuttle/hubSubscriber.ts +++ b/packages/shuttle/src/shuttle/hubSubscriber.ts @@ -175,8 +175,12 @@ export class EventStreamHubSubscriber extends BaseHubSubscriber { private redis: RedisClient; public readonly streamKey: string; public readonly redisKey: string; - private eventsToAdd: HubEvent[]; - private eventBatchSize = 100; + private eventsToAdd: [HubEvent, Buffer][]; + public eventBatchSize = 100; + private eventBatchLastFlushedAt = 0; + public maxTimeBetweenBatchFlushes = 200; // Millis + public maxBatchBytesBeforeForceFlush = 2 ** 20; // 2 MiB + private eventBatchBytes = 0; constructor( label: string, @@ -208,17 +212,29 @@ export class EventStreamHubSubscriber extends BaseHubSubscriber { } public override async processHubEvent(event: HubEvent): Promise { - this.eventsToAdd.push(event); - if (this.eventsToAdd.length >= this.eventBatchSize) { - let lastEventId: number | undefined; - for (const evt of this.eventsToAdd) { - await this.eventStream.add(this.streamKey, Buffer.from(HubEvent.encode(evt).finish())); - lastEventId = evt.id; - } - if (lastEventId) { - await this.redis.setLastProcessedEvent(this.redisKey, lastEventId); - } - this.eventsToAdd = []; + const eventBytes = Buffer.from(HubEvent.encode(event).finish()); + this.eventBatchBytes += eventBytes.length; + this.eventsToAdd.push([event, eventBytes]); + if ( + this.eventsToAdd.length >= this.eventBatchSize || + this.eventBatchBytes >= this.maxBatchBytesBeforeForceFlush || + Date.now() - this.eventBatchLastFlushedAt > this.maxTimeBetweenBatchFlushes + ) { + // Empties the current batch + const eventBatch = this.eventsToAdd.splice(0, this.eventsToAdd.length); + + // Copies the removed events to the stream + await this.eventStream.add( + this.streamKey, + eventBatch.map(([_event, eventBytes]) => eventBytes), + ); + + this.eventBatchLastFlushedAt = Date.now(); + + // biome-ignore lint/style/noNonNullAssertion: batch always has at least one event + const [evt, eventBytes] = eventBatch[eventBatch.length - 1]!; + const lastEventId = evt.id; + await this.redis.setLastProcessedEvent(this.redisKey, lastEventId); } return true;