feat(shuttle): Allow changing subscriber batch size or flush interval (farcasterxyz#2147)

## Why is this change needed?

The way we were writing events to the stream was resulting in a lot more
separate requests to Redis since we weren't actually batching them
together.

Change this to use the multi-argument version of `XADD`, so that
throughput can be increased at higher volumes. Accompanying this is the
introduction of a few more configuration options that let us tune
the throughput of the `HubSubscriber`.

While here, do a minor version bump since we're slightly changing how
events are written to the stream.
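
To make the batching argument above concrete, here is a minimal sketch of why passing many payloads in one call reduces round trips. `CountingStream`, `writeOneByOne`, and `writeBatched` are hypothetical names introduced for illustration; the stand-in is in-memory and synchronous, and is not the shuttle event stream or a Redis client. It only counts how many calls each strategy makes.

```typescript
// Hypothetical in-memory stand-in for a Redis-backed stream (an assumption for
// illustration; a real client is asynchronous and talks to Redis).
class CountingStream {
  public requests = 0;
  public entries: Uint8Array[] = [];

  // Accepts one payload or an array of payloads; each call counts as one request.
  add(_key: string, data: Uint8Array | Uint8Array[]): void {
    this.requests += 1;
    if (Array.isArray(data)) {
      this.entries.push(...data);
    } else {
      this.entries.push(data);
    }
  }
}

// Previous approach: one request per buffered event.
function writeOneByOne(stream: CountingStream, key: string, payloads: Uint8Array[]): void {
  for (const payload of payloads) {
    stream.add(key, payload);
  }
}

// Batched approach: the whole batch goes out in a single request.
function writeBatched(stream: CountingStream, key: string, payloads: Uint8Array[]): void {
  stream.add(key, payloads);
}

const payloads = [1, 2, 3].map((n) => new Uint8Array([n]));

const perEvent = new CountingStream();
writeOneByOne(perEvent, "hub:events", payloads);

const batched = new CountingStream();
writeBatched(batched, "hub:events", payloads);

console.log(perEvent.requests, batched.requests); // prints "3 1"
```

Both strategies store the same three entries; only the number of calls differs, which is the cost the change is meant to reduce.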

## Merge Checklist

- [x] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [x] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [x] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [x] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.

<!-- start pr-codex -->

---

## PR-Codex overview
This PR updates the `@farcaster/shuttle` package to version 0.5.0,
introducing customization for event batch size and time between flushes.

### Detailed summary
- Updated package version to 0.5.0
- Added customization for event batch size and time between flushes
- Modified event handling logic in `EventStreamHubSubscriber`

<!-- end pr-codex -->
sds authored Jul 10, 2024
1 parent 17a81b7 commit 1320371
Showing 3 changed files with 36 additions and 14 deletions.
6 changes: 6 additions & 0 deletions packages/shuttle/CHANGELOG.md
@@ -1,5 +1,11 @@
 # @farcaster/hub-shuttle

+## 0.5.0
+
+### Minor Changes
+
+- Support customization of event batch size and time between flushes
+
 ## 0.4.4

 ### Patch Changes
2 changes: 1 addition & 1 deletion packages/shuttle/package.json
@@ -1,6 +1,6 @@
 {
   "name": "@farcaster/shuttle",
-  "version": "0.4.4",
+  "version": "0.5.0",
   "main": "./dist/index.js",
   "module": "./dist/index.mjs",
   "types": "./dist/index.d.ts",
42 changes: 29 additions & 13 deletions packages/shuttle/src/shuttle/hubSubscriber.ts
@@ -175,8 +175,12 @@ export class EventStreamHubSubscriber extends BaseHubSubscriber {
   private redis: RedisClient;
   public readonly streamKey: string;
   public readonly redisKey: string;
-  private eventsToAdd: HubEvent[];
-  private eventBatchSize = 100;
+  private eventsToAdd: [HubEvent, Buffer][];
+  public eventBatchSize = 100;
+  private eventBatchLastFlushedAt = 0;
+  public maxTimeBetweenBatchFlushes = 200; // Millis
+  public maxBatchBytesBeforeForceFlush = 2 ** 20; // 1 MiB
+  private eventBatchBytes = 0;

   constructor(
     label: string,
@@ -208,17 +212,29 @@ export class EventStreamHubSubscriber extends BaseHubSubscriber {
   }

   public override async processHubEvent(event: HubEvent): Promise<boolean> {
-    this.eventsToAdd.push(event);
-    if (this.eventsToAdd.length >= this.eventBatchSize) {
-      let lastEventId: number | undefined;
-      for (const evt of this.eventsToAdd) {
-        await this.eventStream.add(this.streamKey, Buffer.from(HubEvent.encode(evt).finish()));
-        lastEventId = evt.id;
-      }
-      if (lastEventId) {
-        await this.redis.setLastProcessedEvent(this.redisKey, lastEventId);
-      }
-      this.eventsToAdd = [];
+    const eventBytes = Buffer.from(HubEvent.encode(event).finish());
+    this.eventBatchBytes += eventBytes.length;
+    this.eventsToAdd.push([event, eventBytes]);
+    if (
+      this.eventsToAdd.length >= this.eventBatchSize ||
+      this.eventBatchBytes >= this.maxBatchBytesBeforeForceFlush ||
+      Date.now() - this.eventBatchLastFlushedAt > this.maxTimeBetweenBatchFlushes
+    ) {
+      // Empties the current batch
+      const eventBatch = this.eventsToAdd.splice(0, this.eventsToAdd.length);
+
+      // Copies the removed events to the stream
+      await this.eventStream.add(
+        this.streamKey,
+        eventBatch.map(([_event, eventBytes]) => eventBytes),
+      );
+
+      this.eventBatchLastFlushedAt = Date.now();
+
+      // biome-ignore lint/style/noNonNullAssertion: batch always has at least one event
+      const [evt, eventBytes] = eventBatch[eventBatch.length - 1]!;
+      const lastEventId = evt.id;
+      await this.redis.setLastProcessedEvent(this.redisKey, lastEventId);
     }

     return true;
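
The three flush triggers in the `processHubEvent` change (event count, accumulated bytes, time since the last flush) can be sketched in isolation as follows. `BatchBuffer`, `FlushPolicy`, and the injectable `now` clock are hypothetical names introduced for illustration; only the field names inside `FlushPolicy` are taken from the diff. This sketch resets its byte counter when it flushes, which is a choice made here and does not appear in the hunk.

```typescript
// Illustrative sketch only; not the shuttle implementation.
interface FlushPolicy {
  eventBatchSize: number; // flush once this many events are buffered
  maxBatchBytesBeforeForceFlush: number; // flush once this many bytes are buffered
  maxTimeBetweenBatchFlushes: number; // flush if more than this many ms have passed
}

class BatchBuffer {
  private items: Uint8Array[] = [];
  private bytes = 0;
  private lastFlushedAt = 0;
  private policy: FlushPolicy;
  private now: () => number;

  constructor(policy: FlushPolicy, now: () => number = () => Date.now()) {
    this.policy = policy;
    this.now = now;
  }

  // Buffers a payload; returns the batch to write if any trigger fired, else null.
  push(payload: Uint8Array): Uint8Array[] | null {
    this.items.push(payload);
    this.bytes += payload.length;

    const due =
      this.items.length >= this.policy.eventBatchSize ||
      this.bytes >= this.policy.maxBatchBytesBeforeForceFlush ||
      this.now() - this.lastFlushedAt > this.policy.maxTimeBetweenBatchFlushes;
    if (!due) return null;

    // splice(0, length) empties the buffer in place and returns the removed items.
    const batch = this.items.splice(0, this.items.length);
    this.bytes = 0; // assumption of this sketch: restart byte accounting per batch
    this.lastFlushedAt = this.now();
    return batch;
  }
}

// Usage with a fake clock so the time trigger is deterministic.
let clock = 1000;
const buffer = new BatchBuffer(
  { eventBatchSize: 3, maxBatchBytesBeforeForceFlush: 1024, maxTimeBetweenBatchFlushes: 200 },
  () => clock,
);

const first = buffer.push(new Uint8Array(10)); // 1000 ms since "last flush" at 0: flushes
const second = buffer.push(new Uint8Array(10)); // buffered
clock += 50;
const third = buffer.push(new Uint8Array(10)); // buffered (2 items, 50 ms elapsed)
clock += 200;
const fourth = buffer.push(new Uint8Array(10)); // 3 items buffered: flushes
console.log(first?.length, second, third, fourth?.length); // prints "1 null null 3"
```

Because the last-flush timestamp starts at 0, the first event is flushed on its own in this sketch. Lowering `eventBatchSize` or `maxTimeBetweenBatchFlushes` trades throughput for lower latency, while raising them does the opposite.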
