Skip to content

Commit

Permalink
[service-utils] Expose more kafka configs, set max in-flight requests (
Browse files Browse the repository at this point in the history
  • Loading branch information
arcoraven authored Feb 6, 2025
1 parent 4e703fd commit dfd64d2
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 14 deletions.
5 changes: 5 additions & 0 deletions .changeset/quiet-goats-know.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@thirdweb-dev/service-utils": patch
---

[service-utils] Expose maxInFlightRequests in Kafka producer
25 changes: 17 additions & 8 deletions packages/service-utils/src/node/kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,20 @@ import { compress, decompress } from "lz4js";
import KafkaJS from "kafkajs";
const { CompressionCodecs } = KafkaJS;

/**
* Reference: https://kafka.js.org/docs/producing#producing-messages
*/
export interface KafkaProducerSendOptions {
// Per-message settings.
acks?: number;
timeout?: number;

// Per-producer settings.
retries?: number;
allowAutoTopicCreation?: boolean;
maxInFlightRequests?: number;
}

/**
* Creates a KafkaProducer which opens a persistent TCP connection.
* This class is thread-safe so your service should re-use one instance.
Expand Down Expand Up @@ -98,18 +112,13 @@ export class KafkaProducer {
async send(
topic: string,
messages: Record<string, unknown>[],
/**
* Reference: https://kafka.js.org/docs/producing#producing-messages
*/
options?: {
acks?: number;
timeout?: number;
allowAutoTopicCreation?: boolean;
},
options?: KafkaProducerSendOptions,
): Promise<void> {
if (!this.producer) {
this.producer = this.kafka.producer({
allowAutoTopicCreation: options?.allowAutoTopicCreation ?? false,
maxInFlightRequests: options?.maxInFlightRequests ?? 2000,
retry: { retries: options?.retries ?? 5 },
});
await this.producer.connect();
}
Expand Down
8 changes: 2 additions & 6 deletions packages/service-utils/src/node/usageV2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
type UsageV2Source,
getTopicName,
} from "../core/usageV2.js";
import { KafkaProducer } from "./kafka.js";
import { KafkaProducer, type KafkaProducerSendOptions } from "./kafka.js";

/**
* Creates a UsageV2Producer which opens a persistent TCP connection.
Expand Down Expand Up @@ -63,11 +63,7 @@ export class UsageV2Producer {
/**
* Reference: https://kafka.js.org/docs/producing#producing-messages
*/
options?: {
acks?: number;
timeout?: number;
allowAutoTopicCreation?: boolean;
},
options?: KafkaProducerSendOptions,
): Promise<void> {
const parsedEvents = events.map((event) => ({
...event,
Expand Down

0 comments on commit dfd64d2

Please sign in to comment.