From 4f3a8207c3e3f46f29814ae1488dfd014c92bf90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Vet=C5=91?= Date: Thu, 13 Mar 2025 12:04:26 +0000 Subject: [PATCH 01/13] flush takes batch size into consideration when calling callback --- .../app/v3/dynamicFlushScheduler.server.ts | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts index f87c2a143b..7b8aa5cc69 100644 --- a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts +++ b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts @@ -3,22 +3,24 @@ import { nanoid } from "nanoid"; export type DynamicFlushSchedulerConfig = { batchSize: number; flushInterval: number; + maxConcurrency?: number; callback: (flushId: string, batch: T[]) => Promise; }; export class DynamicFlushScheduler { - private batchQueue: T[][]; // Adjust the type according to your data structure + // private batchQueue: T[][]; // Adjust the type according to your data structure private currentBatch: T[]; // Adjust the type according to your data structure private readonly BATCH_SIZE: number; private readonly FLUSH_INTERVAL: number; + private readonly MAX_CONCURRENCY: number; private flushTimer: NodeJS.Timeout | null; private readonly callback: (flushId: string, batch: T[]) => Promise; constructor(config: DynamicFlushSchedulerConfig) { - this.batchQueue = []; this.currentBatch = []; this.BATCH_SIZE = config.batchSize; this.FLUSH_INTERVAL = config.flushInterval; + this.MAX_CONCURRENCY = config.maxConcurrency || 1; this.callback = config.callback; this.flushTimer = null; this.startFlushTimer(); @@ -28,8 +30,6 @@ export class DynamicFlushScheduler { this.currentBatch.push(...items); if (this.currentBatch.length >= this.BATCH_SIZE) { - this.batchQueue.push(this.currentBatch); - this.currentBatch = []; this.flushNextBatch(); this.resetFlushTimer(); } @@ -48,21 +48,19 @@ export class DynamicFlushScheduler { private checkAndFlush(): void { if (this.currentBatch.length > 0) { - this.batchQueue.push(this.currentBatch); - this.currentBatch = []; + this.flushNextBatch(); } - this.flushNextBatch(); } private async flushNextBatch(): Promise { - if (this.batchQueue.length === 0) return; + if (this.currentBatch.length === 0) return; + + const batch = this.currentBatch.splice(0, this.BATCH_SIZE); + console.log("flushNextBatch", { batch }); - const batchToFlush = this.batchQueue.shift(); try { - await this.callback(nanoid(), batchToFlush!); - if (this.batchQueue.length > 0) { - this.flushNextBatch(); - } + await this.callback(nanoid(), batch!); + this.flushNextBatch(); } catch (error) { console.error("Error inserting batch:", error); } From 04c495047bac6825b2063fe76bcba81395c04e92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Vet=C5=91?= Date: Thu, 13 Mar 2025 12:04:41 +0000 Subject: [PATCH 02/13] add tests to DynamicFlushScheduler --- .../webapp/test/dynamicFlushScheduler.test.ts | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 apps/webapp/test/dynamicFlushScheduler.test.ts diff --git a/apps/webapp/test/dynamicFlushScheduler.test.ts b/apps/webapp/test/dynamicFlushScheduler.test.ts new file mode 100644 index 0000000000..fcf4535cd5 --- /dev/null +++ b/apps/webapp/test/dynamicFlushScheduler.test.ts @@ -0,0 +1,67 @@ +import { describe, it, expect } from "vitest"; +import { DynamicFlushScheduler } from "../app/v3/dynamicFlushScheduler.server"; + +describe("DynamicFlushScheduler", () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + it("doesn't call callback when there are no items", () => { + const callback = vi.fn(); + const dynamicFlushScheduler = new DynamicFlushScheduler({ + batchSize: 3, + flushInterval: 5000, + callback, + }); + dynamicFlushScheduler.addToBatch([]); + + expect(callback).toBeCalledTimes(0); + }); + + it("calls callback once with batchSize items", () => { + const callback = vi.fn(); + const dynamicFlushScheduler = new DynamicFlushScheduler({ + batchSize: 3, + flushInterval: 5000, + callback, + }); + const items = [1, 2, 3]; + dynamicFlushScheduler.addToBatch(items); + + expect(callback).toBeCalledTimes(1); + expect(callback).toBeCalledWith(expect.any(String), [1, 2, 3]); + }); + + it("calls callback when flush interval is reached", async () => { + const callback = vi.fn(); + const dynamicFlushScheduler = new DynamicFlushScheduler({ + batchSize: 100, + flushInterval: 3000, + callback, + }); + const items = [1, 2, 3, 4, 5]; + dynamicFlushScheduler.addToBatch(items); + + await vi.advanceTimersByTimeAsync(3000); + + expect(callback).toBeCalledTimes(1); + expect(callback).toBeCalledWith(expect.any(String), [1, 2, 3, 4, 5]); + }); + + it("calls callback multiple times with the correct batch size", async () => { + const callback = vi.fn(); + const dynamicFlushScheduler = new DynamicFlushScheduler({ + batchSize: 3, + flushInterval: 3000, + callback, + }); + const items = [1, 2, 3, 4, 5, 6]; + dynamicFlushScheduler.addToBatch(items); + + await vi.advanceTimersByTimeAsync(100); // we need to wait for the async callback to complete); + + expect(callback).toHaveBeenCalledTimes(2); + expect(callback).toHaveBeenNthCalledWith(1, expect.any(String), [1, 2, 3]); + expect(callback).toHaveBeenNthCalledWith(2, expect.any(String), [4, 5, 6]); + }); +}); From ec96dd7c90c546aba8d8df89211a3bed54b7986a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Vet=C5=91?= Date: Thu, 13 Mar 2025 12:04:59 +0000 Subject: [PATCH 03/13] install p-limit --- apps/webapp/package.json | 9 +++++---- pnpm-lock.yaml | 3 +++ 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/apps/webapp/package.json b/apps/webapp/package.json index 25cd5f3844..6f485ec7c7 100644 --- a/apps/webapp/package.json +++ b/apps/webapp/package.json @@ -45,18 +45,19 @@ "@codemirror/view": "^6.5.0", "@conform-to/react": "^0.6.1", "@conform-to/zod": "^0.6.1", - "@depot/sdk-node": "^1.0.0", "@depot/cli": "0.0.1-cli.2.80.0", + "@depot/sdk-node": "^1.0.0", "@electric-sql/react": "^0.3.5", "@headlessui/react": "^1.7.8", "@heroicons/react": "^2.0.12", - "@internal/run-engine": "workspace:*", - "@internal/zod-worker": "workspace:*", "@internal/redis": "workspace:*", "@internal/redis-worker": "workspace:*", + "@internal/run-engine": "workspace:*", + "@internal/zod-worker": "workspace:*", "@internationalized/date": "^3.5.1", "@lezer/highlight": "^1.1.6", "@opentelemetry/api": "1.9.0", + "@opentelemetry/api-logs": "0.52.1", "@opentelemetry/core": "1.25.1", "@opentelemetry/exporter-logs-otlp-http": "0.52.1", "@opentelemetry/exporter-trace-otlp-http": "0.52.1", @@ -69,7 +70,6 @@ "@opentelemetry/sdk-trace-base": "1.25.1", "@opentelemetry/sdk-trace-node": "1.25.1", "@opentelemetry/semantic-conventions": "1.25.1", - "@opentelemetry/api-logs": "0.52.1", "@popperjs/core": "^2.11.8", "@prisma/instrumentation": "^5.11.0", "@radix-ui/react-alert-dialog": "^1.0.4", @@ -145,6 +145,7 @@ "non.geist": "^1.0.2", "ohash": "^1.1.3", "openai": "^4.33.1", + "p-limit": "^6.2.0", "parse-duration": "^1.1.0", "posthog-js": "^1.93.3", "posthog-node": "^3.1.3", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 52ab355a58..e001bf94ab 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -547,6 +547,9 @@ importers: openai: specifier: ^4.33.1 version: 4.33.1 + p-limit: + specifier: ^6.2.0 + version: 6.2.0 parse-duration: specifier: ^1.1.0 version: 1.1.0 From df7a484c896e1b3a826a743f16e840dc0177a00e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Vet=C5=91?= Date: Thu, 13 Mar 2025 13:56:43 +0000 Subject: [PATCH 04/13] take concurrency limit into consideration --- .../app/v3/dynamicFlushScheduler.server.ts | 29 +++++++++++++------ .../webapp/test/dynamicFlushScheduler.test.ts | 13 +++++---- 2 files changed, 28 insertions(+), 14 deletions(-) diff --git a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts index 7b8aa5cc69..df4a917232 100644 --- a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts +++ b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts @@ -1,4 +1,5 @@ import { nanoid } from "nanoid"; +import pLimit from "p-limit"; export type DynamicFlushSchedulerConfig = { batchSize: number; @@ -13,6 +14,7 @@ export class DynamicFlushScheduler { private readonly BATCH_SIZE: number; private readonly FLUSH_INTERVAL: number; private readonly MAX_CONCURRENCY: number; + private readonly concurrencyLimiter: ReturnType; private flushTimer: NodeJS.Timeout | null; private readonly callback: (flushId: string, batch: T[]) => Promise; @@ -21,16 +23,17 @@ export class DynamicFlushScheduler { this.BATCH_SIZE = config.batchSize; this.FLUSH_INTERVAL = config.flushInterval; this.MAX_CONCURRENCY = config.maxConcurrency || 1; + this.concurrencyLimiter = pLimit(this.MAX_CONCURRENCY); this.callback = config.callback; this.flushTimer = null; this.startFlushTimer(); } - addToBatch(items: T[]): void { + async addToBatch(items: T[]): Promise { this.currentBatch.push(...items); if (this.currentBatch.length >= this.BATCH_SIZE) { - this.flushNextBatch(); + await this.flushNextBatch(); this.resetFlushTimer(); } } @@ -55,14 +58,22 @@ export class DynamicFlushScheduler { private async flushNextBatch(): Promise { if (this.currentBatch.length === 0) return; - const batch = this.currentBatch.splice(0, this.BATCH_SIZE); - console.log("flushNextBatch", { batch }); + const batches: T[][] = []; - try { - await this.callback(nanoid(), batch!); - this.flushNextBatch(); - } catch (error) { - console.error("Error inserting batch:", error); + while (this.currentBatch.length > 0) { + batches.push(this.currentBatch.splice(0, this.BATCH_SIZE)); } + + const promises = batches.map(async (batch) => + this.concurrencyLimiter(async () => { + try { + await this.callback(nanoid(), batch!); + } catch (error) { + console.error("Error inserting batch:", error); + } + }) + ); + + await Promise.all(promises); } } diff --git a/apps/webapp/test/dynamicFlushScheduler.test.ts b/apps/webapp/test/dynamicFlushScheduler.test.ts index fcf4535cd5..58ee44e5b0 100644 --- a/apps/webapp/test/dynamicFlushScheduler.test.ts +++ b/apps/webapp/test/dynamicFlushScheduler.test.ts @@ -4,6 +4,8 @@ import { DynamicFlushScheduler } from "../app/v3/dynamicFlushScheduler.server"; describe("DynamicFlushScheduler", () => { beforeEach(() => { vi.useFakeTimers(); + vi.clearAllMocks(); + vi.resetAllMocks(); }); it("doesn't call callback when there are no items", () => { @@ -18,7 +20,7 @@ describe("DynamicFlushScheduler", () => { expect(callback).toBeCalledTimes(0); }); - it("calls callback once with batchSize items", () => { + it("calls callback once with batchSize items", async () => { const callback = vi.fn(); const dynamicFlushScheduler = new DynamicFlushScheduler({ batchSize: 3, @@ -26,7 +28,7 @@ describe("DynamicFlushScheduler", () => { callback, }); const items = [1, 2, 3]; - dynamicFlushScheduler.addToBatch(items); + await dynamicFlushScheduler.addToBatch(items); expect(callback).toBeCalledTimes(1); expect(callback).toBeCalledWith(expect.any(String), [1, 2, 3]); @@ -52,13 +54,14 @@ describe("DynamicFlushScheduler", () => { const callback = vi.fn(); const dynamicFlushScheduler = new DynamicFlushScheduler({ batchSize: 3, - flushInterval: 3000, + flushInterval: 10000, callback, }); const items = [1, 2, 3, 4, 5, 6]; - dynamicFlushScheduler.addToBatch(items); + await dynamicFlushScheduler.addToBatch(items); - await vi.advanceTimersByTimeAsync(100); // we need to wait for the async callback to complete); + // comment out if `addToBatch` can't be marked as async + // await vi.advanceTimersToNextTimerAsync(); expect(callback).toHaveBeenCalledTimes(2); expect(callback).toHaveBeenNthCalledWith(1, expect.any(String), [1, 2, 3]); From 05c0803b2807b5f9565352b244ace72903d1162e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Vet=C5=91?= Date: Thu, 13 Mar 2025 14:23:06 +0000 Subject: [PATCH 05/13] make sure batches are flushed on SIGTERM --- .../app/v3/dynamicFlushScheduler.server.ts | 35 +++++++++++++++---- .../webapp/test/dynamicFlushScheduler.test.ts | 23 ++++++++++++ 2 files changed, 51 insertions(+), 7 deletions(-) diff --git a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts index df4a917232..9bb8f0b5de 100644 --- a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts +++ b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts @@ -1,5 +1,6 @@ import { nanoid } from "nanoid"; import pLimit from "p-limit"; +import { logger } from "~/services/logger.server"; export type DynamicFlushSchedulerConfig = { batchSize: number; @@ -17,6 +18,7 @@ export class DynamicFlushScheduler { private readonly concurrencyLimiter: ReturnType; private flushTimer: NodeJS.Timeout | null; private readonly callback: (flushId: string, batch: T[]) => Promise; + private isShuttingDown = false; constructor(config: DynamicFlushSchedulerConfig) { this.currentBatch = []; @@ -27,6 +29,7 @@ export class DynamicFlushScheduler { this.callback = config.callback; this.flushTimer = null; this.startFlushTimer(); + this.setupShutdownHandlers(); } async addToBatch(items: T[]): Promise { @@ -42,16 +45,31 @@ export class DynamicFlushScheduler { this.flushTimer = setInterval(() => this.checkAndFlush(), this.FLUSH_INTERVAL); } - private resetFlushTimer(): void { + private setupShutdownHandlers() { + process.on("SIGTERM", this.shutdown.bind(this)); + } + + private async shutdown(): Promise { + if (this.isShuttingDown) return; + this.isShuttingDown = true; + await this.checkAndFlush(); + this.clearTimer(); + } + + private clearTimer(): void { if (this.flushTimer) { clearInterval(this.flushTimer); } + } + + private resetFlushTimer(): void { + this.clearTimer(); this.startFlushTimer(); } - private checkAndFlush(): void { + private async checkAndFlush(): Promise { if (this.currentBatch.length > 0) { - this.flushNextBatch(); + await this.flushNextBatch(); } } @@ -59,17 +77,20 @@ export class DynamicFlushScheduler { if (this.currentBatch.length === 0) return; const batches: T[][] = []; - while (this.currentBatch.length > 0) { batches.push(this.currentBatch.splice(0, this.BATCH_SIZE)); } - const promises = batches.map(async (batch) => + const promises = batches.map((batch) => this.concurrencyLimiter(async () => { + const batchId = nanoid(); try { - await this.callback(nanoid(), batch!); + await this.callback(batchId, batch!); } catch (error) { - console.error("Error inserting batch:", error); + logger.error("Error inserting batch:", { + batchId, + error, + }); } }) ); diff --git a/apps/webapp/test/dynamicFlushScheduler.test.ts b/apps/webapp/test/dynamicFlushScheduler.test.ts index 58ee44e5b0..ada4eacdec 100644 --- a/apps/webapp/test/dynamicFlushScheduler.test.ts +++ b/apps/webapp/test/dynamicFlushScheduler.test.ts @@ -67,4 +67,27 @@ describe("DynamicFlushScheduler", () => { expect(callback).toHaveBeenNthCalledWith(1, expect.any(String), [1, 2, 3]); expect(callback).toHaveBeenNthCalledWith(2, expect.any(String), [4, 5, 6]); }); + + it("handles SIGTERM signal correctly", async () => { + const callback = vi.fn(); + + const processOnMock = vi.fn(); + process.on = processOnMock; + + const dynamicFlushScheduler = new DynamicFlushScheduler({ + batchSize: 10, + flushInterval: 5000, + callback, + }); + + const items = [1, 2, 3, 4, 5, 6]; + await dynamicFlushScheduler.addToBatch(items); + + const sigtermHandler = processOnMock.mock.calls.find((call) => call[0] === "SIGTERM")[1]; + + await sigtermHandler(); + + expect(callback).toHaveBeenCalled(); + expect(callback).toHaveBeenCalledWith(expect.any(String), items); + }); }); From ba7dca6f3681e2c77e6998022193062604a41c9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Vet=C5=91?= Date: Thu, 13 Mar 2025 15:53:13 +0000 Subject: [PATCH 06/13] report metrics to the /metrics endpoint --- .../app/v3/dynamicFlushScheduler.server.ts | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts index 9bb8f0b5de..417477e43a 100644 --- a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts +++ b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts @@ -1,5 +1,7 @@ import { nanoid } from "nanoid"; import pLimit from "p-limit"; +import { Gauge } from "prom-client"; +import { metricsRegister } from "~/metrics.server"; import { logger } from "~/services/logger.server"; export type DynamicFlushSchedulerConfig = { @@ -10,7 +12,6 @@ export type DynamicFlushSchedulerConfig = { }; export class DynamicFlushScheduler { - // private batchQueue: T[][]; // Adjust the type according to your data structure private currentBatch: T[]; // Adjust the type according to your data structure private readonly BATCH_SIZE: number; private readonly FLUSH_INTERVAL: number; @@ -30,10 +31,24 @@ export class DynamicFlushScheduler { this.flushTimer = null; this.startFlushTimer(); this.setupShutdownHandlers(); + + const scheduler = this; + new Gauge({ + name: "dynamic_flush_scheduler_batch_size", + help: "Number of items in the current dynamic flush scheduler batch", + collect() { + this.set(scheduler.currentBatch.length); + }, + registers: [metricsRegister], + }); } async addToBatch(items: T[]): Promise { this.currentBatch.push(...items); + logger.debug("Adding items to batch", { + batchSize: this.BATCH_SIZE, + newSize: this.currentBatch.length, + }); if (this.currentBatch.length >= this.BATCH_SIZE) { await this.flushNextBatch(); @@ -52,8 +67,12 @@ export class DynamicFlushScheduler { private async shutdown(): Promise { if (this.isShuttingDown) return; this.isShuttingDown = true; + logger.log("Shutting down dynamic flush scheduler..."); + await this.checkAndFlush(); this.clearTimer(); + + logger.log("All items have been flushed."); } private clearTimer(): void { From a9299743d9ea00b5f736ede10ede826237feee04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Vet=C5=91?= Date: Thu, 13 Mar 2025 16:10:28 +0000 Subject: [PATCH 07/13] comment out metric sending --- .../app/v3/dynamicFlushScheduler.server.ts | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts index 417477e43a..f4f11b1805 100644 --- a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts +++ b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts @@ -32,15 +32,17 @@ export class DynamicFlushScheduler { this.startFlushTimer(); this.setupShutdownHandlers(); - const scheduler = this; - new Gauge({ - name: "dynamic_flush_scheduler_batch_size", - help: "Number of items in the current dynamic flush scheduler batch", - collect() { - this.set(scheduler.currentBatch.length); - }, - registers: [metricsRegister], - }); + // if (process.env.NODE_ENV !== "test") { + // const scheduler = this; + // new Gauge({ + // name: "dynamic_flush_scheduler_batch_size", + // help: "Number of items in the current dynamic flush scheduler batch", + // collect() { + // this.set(scheduler.currentBatch.length); + // }, + // registers: [metricsRegister], + // }); + // } } async addToBatch(items: T[]): Promise { From 09145b375ddd9795a26991890b4b1157939479ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Vet=C5=91?= Date: Thu, 13 Mar 2025 16:10:36 +0000 Subject: [PATCH 08/13] enable SIGINT --- apps/webapp/app/v3/dynamicFlushScheduler.server.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts index f4f11b1805..ce6086bd27 100644 --- a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts +++ b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts @@ -64,6 +64,7 @@ export class DynamicFlushScheduler { private setupShutdownHandlers() { process.on("SIGTERM", this.shutdown.bind(this)); + process.on("SIGINT", this.shutdown.bind(this)); } private async shutdown(): Promise { From 6944cda99b5aea5c04a22bf63c825f049afcaec2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Vet=C5=91?= Date: Thu, 13 Mar 2025 16:40:59 +0000 Subject: [PATCH 09/13] report failedBatchesCount --- .../app/v3/dynamicFlushScheduler.server.ts | 44 +++++++++++++------ 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts index ce6086bd27..2cbeb38c38 100644 --- a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts +++ b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts @@ -19,7 +19,8 @@ export class DynamicFlushScheduler { private readonly concurrencyLimiter: ReturnType; private flushTimer: NodeJS.Timeout | null; private readonly callback: (flushId: string, batch: T[]) => Promise; - private isShuttingDown = false; + private isShuttingDown; + private failedBatchCount; constructor(config: DynamicFlushSchedulerConfig) { this.currentBatch = []; @@ -27,22 +28,33 @@ export class DynamicFlushScheduler { this.FLUSH_INTERVAL = config.flushInterval; this.MAX_CONCURRENCY = config.maxConcurrency || 1; this.concurrencyLimiter = pLimit(this.MAX_CONCURRENCY); - this.callback = config.callback; this.flushTimer = null; + this.callback = config.callback; + this.isShuttingDown = false; + this.failedBatchCount = 0; this.startFlushTimer(); this.setupShutdownHandlers(); - // if (process.env.NODE_ENV !== "test") { - // const scheduler = this; - // new Gauge({ - // name: "dynamic_flush_scheduler_batch_size", - // help: "Number of items in the current dynamic flush scheduler batch", - // collect() { - // this.set(scheduler.currentBatch.length); - // }, - // registers: [metricsRegister], - // }); - // } + if (process.env.NODE_ENV !== "test") { + const scheduler = this; + new Gauge({ + name: "dynamic_flush_scheduler_batch_size", + help: "Number of items in the current dynamic flush scheduler batch", + collect() { + this.set(scheduler.currentBatch.length); + }, + registers: [metricsRegister], + }); + + new Gauge({ + name: "dynamic_flush_scheduler_failed_batches", + help: "Number of failed batches", + collect() { + this.set(scheduler.failedBatchCount); + }, + registers: [metricsRegister], + }); + } } async addToBatch(items: T[]): Promise { @@ -113,10 +125,14 @@ export class DynamicFlushScheduler { batchId, error, }); + throw error; } }) ); - await Promise.all(promises); + const results = await Promise.allSettled(promises); + + const failedBatches = results.filter((result) => result.status === "rejected").length; + this.failedBatchCount += failedBatches; } } From 4bb1e1da9dd1670392b90bbe54feeae16e0a66fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Vet=C5=91?= Date: Thu, 13 Mar 2025 16:49:24 +0000 Subject: [PATCH 10/13] add logging --- .../app/v3/dynamicFlushScheduler.server.ts | 50 +++++++++++++++++-- 1 file changed, 45 insertions(+), 5 deletions(-) diff --git a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts index 2cbeb38c38..c620c0b2ac 100644 --- a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts +++ b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts @@ -32,6 +32,13 @@ export class DynamicFlushScheduler { this.callback = config.callback; this.isShuttingDown = false; this.failedBatchCount = 0; + + logger.info("Initializing DynamicFlushScheduler", { + batchSize: this.BATCH_SIZE, + flushInterval: this.FLUSH_INTERVAL, + maxConcurrency: this.MAX_CONCURRENCY, + }); + this.startFlushTimer(); this.setupShutdownHandlers(); @@ -60,11 +67,15 @@ export class DynamicFlushScheduler { async addToBatch(items: T[]): Promise { this.currentBatch.push(...items); logger.debug("Adding items to batch", { - batchSize: this.BATCH_SIZE, - newSize: this.currentBatch.length, + currentBatchSize: this.currentBatch.length, + itemsAdded: items.length, }); if (this.currentBatch.length >= this.BATCH_SIZE) { + logger.debug("Batch size threshold reached, initiating flush", { + batchSize: this.BATCH_SIZE, + currentSize: this.currentBatch.length, + }); await this.flushNextBatch(); this.resetFlushTimer(); } @@ -72,37 +83,48 @@ export class DynamicFlushScheduler { private startFlushTimer(): void { this.flushTimer = setInterval(() => this.checkAndFlush(), this.FLUSH_INTERVAL); + logger.debug("Started flush timer", { interval: this.FLUSH_INTERVAL }); } private setupShutdownHandlers() { process.on("SIGTERM", this.shutdown.bind(this)); process.on("SIGINT", this.shutdown.bind(this)); + logger.debug("Shutdown handlers configured"); } private async shutdown(): Promise { if (this.isShuttingDown) return; this.isShuttingDown = true; - logger.log("Shutting down dynamic flush scheduler..."); + logger.info("Initiating shutdown of dynamic flush scheduler", { + remainingItems: this.currentBatch.length, + }); await this.checkAndFlush(); this.clearTimer(); - logger.log("All items have been flushed."); + logger.info("Dynamic flush scheduler shutdown complete", { + totalFailedBatches: this.failedBatchCount, + }); } private clearTimer(): void { if (this.flushTimer) { clearInterval(this.flushTimer); + logger.debug("Flush timer cleared"); } } private resetFlushTimer(): void { this.clearTimer(); this.startFlushTimer(); + logger.debug("Flush timer reset"); } private async checkAndFlush(): Promise { if (this.currentBatch.length > 0) { + logger.debug("Periodic flush check triggered", { + currentBatchSize: this.currentBatch.length, + }); await this.flushNextBatch(); } } @@ -115,15 +137,26 @@ export class DynamicFlushScheduler { batches.push(this.currentBatch.splice(0, this.BATCH_SIZE)); } + logger.info("Starting batch flush", { + numberOfBatches: batches.length, + totalItems: batches.reduce((sum, batch) => sum + batch.length, 0), + }); + const promises = batches.map((batch) => this.concurrencyLimiter(async () => { const batchId = nanoid(); try { + logger.debug("Processing batch", { + batchId, + batchSize: batch.length, + }); await this.callback(batchId, batch!); } catch (error) { - logger.error("Error inserting batch:", { + logger.error("Error processing batch", { batchId, error, + batchSize: batch.length, + errorMessage: error instanceof Error ? error.message : "Unknown error", }); throw error; } @@ -134,5 +167,12 @@ export class DynamicFlushScheduler { const failedBatches = results.filter((result) => result.status === "rejected").length; this.failedBatchCount += failedBatches; + + logger.info("Batch flush complete", { + totalBatches: batches.length, + successfulBatches: batches.length - failedBatches, + failedBatches, + totalFailedBatches: this.failedBatchCount, + }); } } From 3b012df197eb70b70b1a3f09f066b2248da52e39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Vet=C5=91?= Date: Thu, 13 Mar 2025 16:50:56 +0000 Subject: [PATCH 11/13] report failedBatchesCount --- apps/webapp/app/v3/dynamicFlushScheduler.server.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts index c620c0b2ac..e22d21eceb 100644 --- a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts +++ b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts @@ -146,10 +146,6 @@ export class DynamicFlushScheduler { this.concurrencyLimiter(async () => { const batchId = nanoid(); try { - logger.debug("Processing batch", { - batchId, - batchSize: batch.length, - }); await this.callback(batchId, batch!); } catch (error) { logger.error("Error processing batch", { From 6ce9b836aa865a16d16f5823fce9c41a14c018ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Vet=C5=91?= Date: Thu, 13 Mar 2025 17:48:46 +0000 Subject: [PATCH 12/13] add some todo comments to the scheduler --- apps/webapp/app/v3/dynamicFlushScheduler.server.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts index e22d21eceb..96760bfa30 100644 --- a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts +++ b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts @@ -42,7 +42,7 @@ export class DynamicFlushScheduler { this.startFlushTimer(); this.setupShutdownHandlers(); - if (process.env.NODE_ENV !== "test") { + if (!process.env.VITEST) { const scheduler = this; new Gauge({ name: "dynamic_flush_scheduler_batch_size", @@ -64,7 +64,12 @@ export class DynamicFlushScheduler { } } + /** + * + * If you want to fire and forget, don't await this method. + */ async addToBatch(items: T[]): Promise { + // TODO: consider using concat. spread is not performant this.currentBatch.push(...items); logger.debug("Adding items to batch", { currentBatchSize: this.currentBatch.length, @@ -142,6 +147,7 @@ export class DynamicFlushScheduler { totalItems: batches.reduce((sum, batch) => sum + batch.length, 0), }); + // TODO: report plimit.activeCount and pLimit.pendingCount and pLimit.concurrency to /metrics const promises = batches.map((batch) => this.concurrencyLimiter(async () => { const batchId = nanoid(); From 4153e38143c5e9bd96aaeef78426e3c86353a1c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Vet=C5=91?= Date: Thu, 13 Mar 2025 17:56:04 +0000 Subject: [PATCH 13/13] remove commented out code --- apps/webapp/test/dynamicFlushScheduler.test.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/apps/webapp/test/dynamicFlushScheduler.test.ts b/apps/webapp/test/dynamicFlushScheduler.test.ts index ada4eacdec..d2a4392fdf 100644 --- a/apps/webapp/test/dynamicFlushScheduler.test.ts +++ b/apps/webapp/test/dynamicFlushScheduler.test.ts @@ -60,9 +60,6 @@ describe("DynamicFlushScheduler", () => { const items = [1, 2, 3, 4, 5, 6]; await dynamicFlushScheduler.addToBatch(items); - // comment out if `addToBatch` can't be marked as async - // await vi.advanceTimersToNextTimerAsync(); - expect(callback).toHaveBeenCalledTimes(2); expect(callback).toHaveBeenNthCalledWith(1, expect.any(String), [1, 2, 3]); expect(callback).toHaveBeenNthCalledWith(2, expect.any(String), [4, 5, 6]);