Skip to content

Commit 983bb41

Browse files
authored
Redis client factory (error handling and good defaults) (#1761)
* @internal/redis package for creating Redis clients with sensible defaults and error handling * Added @internal/redis to webapp
1 parent 4358e2d commit 983bb41

File tree

18 files changed

+223
-119
lines changed

18 files changed

+223
-119
lines changed

apps/webapp/package.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
"@heroicons/react": "^2.0.12",
5353
"@internal/run-engine": "workspace:*",
5454
"@internal/zod-worker": "workspace:*",
55+
"@internal/redis": "workspace:*",
5556
"@internal/redis-worker": "workspace:*",
5657
"@internationalized/date": "^3.5.1",
5758
"@lezer/highlight": "^1.1.6",
@@ -258,4 +259,4 @@
258259
"engines": {
259260
"node": ">=16.0.0"
260261
}
261-
}
262+
}

apps/webapp/tsconfig.json

+3-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@
3838
"@internal/run-engine": ["../../internal-packages/run-engine/src/index"],
3939
"@internal/run-engine/*": ["../../internal-packages/run-engine/src/*"],
4040
"@internal/redis-worker": ["../../internal-packages/redis-worker/src/index"],
41-
"@internal/redis-worker/*": ["../../internal-packages/redis-worker/src/*"]
41+
"@internal/redis-worker/*": ["../../internal-packages/redis-worker/src/*"],
42+
"@internal/redis": ["../../internal-packages/redis/src/index"],
43+
"@internal/redis/*": ["../../internal-packages/redis/src/*"]
4244
},
4345
"noEmit": true
4446
}

internal-packages/redis-worker/package.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
"type": "module",
88
"dependencies": {
99
"@opentelemetry/api": "^1.9.0",
10+
"@internal/redis": "workspace:*",
1011
"@trigger.dev/core": "workspace:*",
1112
"ioredis": "^5.3.2",
1213
"lodash.omit": "^4.5.0",
@@ -23,4 +24,4 @@
2324
"typecheck": "tsc --noEmit",
2425
"test": "vitest --no-file-parallelism"
2526
}
26-
}
27+
}

internal-packages/redis-worker/src/queue.ts

+21-26
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { createRedisClient } from "@internal/redis";
12
import { Logger } from "@trigger.dev/core/logger";
23
import Redis, { type Callback, type RedisOptions, type Result } from "ioredis";
34
import { nanoid } from "nanoid";
@@ -50,35 +51,29 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
5051
logger?: Logger;
5152
}) {
5253
this.name = name;
53-
this.redis = new Redis({
54-
...redisOptions,
55-
keyPrefix: `${redisOptions.keyPrefix ?? ""}{queue:${name}:}`,
56-
retryStrategy(times) {
57-
const delay = Math.min(times * 50, 1000);
58-
return delay;
54+
this.logger = logger ?? new Logger("SimpleQueue", "debug");
55+
56+
this.redis = createRedisClient(
57+
{
58+
...redisOptions,
59+
keyPrefix: `${redisOptions.keyPrefix ?? ""}{queue:${name}:}`,
60+
retryStrategy(times) {
61+
const delay = Math.min(times * 50, 1000);
62+
return delay;
63+
},
64+
maxRetriesPerRequest: 20,
5965
},
60-
maxRetriesPerRequest: 20,
61-
});
66+
{
67+
onError: (error) => {
68+
this.logger.error(`RedisWorker queue redis client error:`, {
69+
error,
70+
keyPrefix: redisOptions.keyPrefix,
71+
});
72+
},
73+
}
74+
);
6275
this.#registerCommands();
6376
this.schema = schema;
64-
65-
this.logger = logger ?? new Logger("SimpleQueue", "debug");
66-
67-
this.redis.on("error", (error) => {
68-
this.logger.error(`Redis Error for queue ${this.name}:`, { queue: this.name, error });
69-
});
70-
71-
this.redis.on("connect", () => {
72-
this.logger.log(`Redis connected for queue ${this.name}`);
73-
});
74-
75-
this.redis.on("reconnecting", () => {
76-
this.logger.warn(`Redis reconnecting for queue ${this.name}`);
77-
});
78-
79-
this.redis.on("close", () => {
80-
this.logger.warn(`Redis connection closed for queue ${this.name}`);
81-
});
8277
}
8378

8479
async enqueue({

internal-packages/redis-worker/src/worker.test.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { expect } from "vitest";
55
import { z } from "zod";
66
import { Worker } from "./worker.js";
77
import Redis from "ioredis";
8+
import { createRedisClient } from "@internal/redis";
89

910
describe("Worker", () => {
1011
redisTest("Process items that don't throw", { timeout: 30_000 }, async ({ redisContainer }) => {
@@ -241,7 +242,7 @@ describe("Worker", () => {
241242
expect(dlqSize).toBe(1);
242243

243244
// Create a Redis client to publish the redrive message
244-
const redisClient = new Redis({
245+
const redisClient = createRedisClient({
245246
host: redisContainer.getHost(),
246247
port: redisContainer.getPort(),
247248
password: redisContainer.getPassword(),

internal-packages/redis-worker/src/worker.ts

+9-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import Redis from "ioredis";
99
import { nanoid } from "nanoid";
1010
import { startSpan } from "./telemetry.js";
1111
import pLimit from "p-limit";
12+
import { createRedisClient } from "@internal/redis";
1213

1314
export type WorkerCatalog = {
1415
[key: string]: {
@@ -108,7 +109,14 @@ class Worker<TCatalog extends WorkerCatalog> {
108109

109110
this.setupShutdownHandlers();
110111

111-
this.subscriber = new Redis(this.options.redisOptions);
112+
this.subscriber = createRedisClient(this.options.redisOptions, {
113+
onError: (error) => {
114+
this.logger.error(`RedisWorker subscriber redis client error:`, {
115+
error,
116+
keyPrefix: this.options.redisOptions.keyPrefix,
117+
});
118+
},
119+
});
112120
this.setupSubscriber();
113121

114122
return this;

internal-packages/redis-worker/tsconfig.json

+3-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
"@internal/testcontainers": ["../../internal-packages/testcontainers/src/index"],
1919
"@internal/testcontainers/*": ["../../internal-packages/testcontainers/src/*"],
2020
"@trigger.dev/core": ["../../packages/core/src/index"],
21-
"@trigger.dev/core/*": ["../../packages/core/src/*"]
21+
"@trigger.dev/core/*": ["../../packages/core/src/*"],
22+
"@internal/redis": ["../../internal-packages/redis/src/index"],
23+
"@internal/redis/*": ["../../internal-packages/redis/src/*"]
2224
}
2325
},
2426
"exclude": ["node_modules"]

internal-packages/redis/README.md

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# Redis
2+
3+
This is a simple package that is used to return a valid Redis client and provides an error callback. It will log and swallow errors if they're not handled.

internal-packages/redis/package.json

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
{
2+
"name": "@internal/redis",
3+
"private": true,
4+
"version": "0.0.1",
5+
"main": "./src/index.ts",
6+
"types": "./src/index.ts",
7+
"type": "module",
8+
"dependencies": {
9+
"ioredis": "^5.3.2",
10+
"@trigger.dev/core": "workspace:*"
11+
},
12+
"devDependencies": {
13+
"vitest": "^1.4.0"
14+
},
15+
"scripts": {
16+
"typecheck": "tsc --noEmit"
17+
}
18+
}

internal-packages/redis/src/index.ts

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import { Redis, RedisOptions } from "ioredis";
2+
import { Logger } from "@trigger.dev/core/logger";
3+
4+
const defaultOptions: Partial<RedisOptions> = {
5+
retryStrategy: (times: number) => {
6+
const delay = Math.min(times * 50, 1000);
7+
return delay;
8+
},
9+
maxRetriesPerRequest: 20,
10+
};
11+
12+
const logger = new Logger("Redis", "debug");
13+
14+
export function createRedisClient(
15+
options: RedisOptions,
16+
handlers?: { onError?: (err: Error) => void }
17+
): Redis {
18+
const client = new Redis({
19+
...defaultOptions,
20+
...options,
21+
});
22+
23+
// Skip error handling setup if running in Vitest
24+
if (process.env.VITEST) {
25+
client.on("error", (error) => {
26+
// swallow errors
27+
});
28+
return client;
29+
}
30+
31+
client.on("error", (error) => {
32+
if (handlers?.onError) {
33+
handlers.onError(error);
34+
} else {
35+
logger.error(`Redis client error:`, { error, keyPrefix: options.keyPrefix });
36+
}
37+
});
38+
39+
return client;
40+
}

internal-packages/redis/tsconfig.json

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
{
2+
"compilerOptions": {
3+
"target": "ES2019",
4+
"lib": ["ES2019", "DOM", "DOM.Iterable", "DOM.AsyncIterable"],
5+
"module": "CommonJS",
6+
"moduleResolution": "Node",
7+
"moduleDetection": "force",
8+
"verbatimModuleSyntax": false,
9+
"types": ["vitest/globals"],
10+
"esModuleInterop": true,
11+
"forceConsistentCasingInFileNames": true,
12+
"isolatedModules": true,
13+
"preserveWatchOutput": true,
14+
"skipLibCheck": true,
15+
"noEmit": true,
16+
"strict": true,
17+
"paths": {
18+
"@trigger.dev/core": ["../../packages/core/src/index"],
19+
"@trigger.dev/core/*": ["../../packages/core/src/*"]
20+
}
21+
},
22+
"exclude": ["node_modules"]
23+
}

internal-packages/run-engine/package.json

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
"main": "./src/index.ts",
66
"types": "./src/index.ts",
77
"dependencies": {
8+
"@internal/redis": "workspace:*",
89
"@internal/redis-worker": "workspace:*",
910
"@opentelemetry/api": "^1.9.0",
1011
"@opentelemetry/semantic-conventions": "^1.27.0",

internal-packages/run-engine/src/engine/index.ts

+15-4
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { createRedisClient } from "@internal/redis";
12
import { Worker } from "@internal/redis-worker";
23
import { Attributes, Span, SpanKind, trace, Tracer } from "@opentelemetry/api";
34
import { assertExhaustive } from "@trigger.dev/core";
@@ -135,10 +136,20 @@ export class RunEngine {
135136

136137
constructor(private readonly options: RunEngineOptions) {
137138
this.prisma = options.prisma;
138-
this.runLockRedis = new Redis({
139-
...options.runLock.redis,
140-
keyPrefix: `${options.runLock.redis.keyPrefix}runlock:`,
141-
});
139+
this.runLockRedis = createRedisClient(
140+
{
141+
...options.runLock.redis,
142+
keyPrefix: `${options.runLock.redis.keyPrefix}runlock:`,
143+
},
144+
{
145+
onError: (error) => {
146+
this.logger.error(`RunLock redis client error:`, {
147+
error,
148+
keyPrefix: options.runLock.redis.keyPrefix,
149+
});
150+
},
151+
}
152+
);
142153
this.runLock = new RunLocker({ redis: this.runLockRedis });
143154

144155
this.runQueue = new RunQueue({

internal-packages/run-engine/src/engine/locking.test.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1+
import { createRedisClient } from "@internal/redis";
12
import { redisTest } from "@internal/testcontainers";
23
import { expect } from "vitest";
34
import { RunLocker } from "./locking.js";
4-
import Redis from "ioredis";
55

66
describe("RunLocker", () => {
77
redisTest("Test acquiring a lock works", { timeout: 15_000 }, async ({ redisOptions }) => {
8-
const redis = new Redis(redisOptions);
8+
const redis = createRedisClient(redisOptions);
99
try {
1010
const runLock = new RunLocker({ redis });
1111

@@ -23,7 +23,7 @@ describe("RunLocker", () => {
2323
});
2424

2525
redisTest("Test double locking works", { timeout: 15_000 }, async ({ redisOptions }) => {
26-
const redis = new Redis(redisOptions);
26+
const redis = createRedisClient(redisOptions);
2727
try {
2828
const runLock = new RunLocker({ redis });
2929

internal-packages/run-engine/src/run-queue/index.test.ts

+6-5
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { setTimeout } from "node:timers/promises";
77
import { RunQueue } from "./index.js";
88
import { SimpleWeightedChoiceStrategy } from "./simpleWeightedPriorityStrategy.js";
99
import { InputPayload } from "./types.js";
10+
import { createRedisClient } from "@internal/redis";
1011

1112
const testOptions = {
1213
name: "rq",
@@ -468,7 +469,7 @@ describe("RunQueue", () => {
468469
},
469470
});
470471

471-
const redis = new Redis({ ...redisOptions, keyPrefix: "runqueue:test:" });
472+
const redis = createRedisClient({ ...redisOptions, keyPrefix: "runqueue:test:" });
472473

473474
try {
474475
await queue.enqueueMessage({
@@ -598,7 +599,7 @@ describe("RunQueue", () => {
598599
},
599600
});
600601

601-
const redis = new Redis({ ...redisOptions, keyPrefix: "runqueue:test:" });
602+
const redis = createRedisClient({ ...redisOptions, keyPrefix: "runqueue:test:" });
602603

603604
try {
604605
await queue.enqueueMessage({
@@ -689,7 +690,7 @@ describe("RunQueue", () => {
689690
},
690691
});
691692

692-
const redis = new Redis({ ...redisOptions, keyPrefix: "runqueue:test:" });
693+
const redis = createRedisClient({ ...redisOptions, keyPrefix: "runqueue:test:" });
693694

694695
try {
695696
await queue.enqueueMessage({
@@ -803,7 +804,7 @@ describe("RunQueue", () => {
803804
},
804805
});
805806

806-
const redis = new Redis({ ...redisOptions, keyPrefix: "runqueue:test:" });
807+
const redis = createRedisClient({ ...redisOptions, keyPrefix: "runqueue:test:" });
807808

808809
try {
809810
await queue.enqueueMessage({
@@ -858,7 +859,7 @@ describe("RunQueue", () => {
858859
expect(dlqMembers).toContain(messageProd.runId);
859860

860861
//redrive
861-
const redisClient = new Redis({
862+
const redisClient = createRedisClient({
862863
host: redisContainer.getHost(),
863864
port: redisContainer.getPort(),
864865
password: redisContainer.getPassword(),

internal-packages/run-engine/src/run-queue/index.ts

+17-2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import {
2121
RunQueueKeyProducer,
2222
RunQueuePriorityStrategy,
2323
} from "./types.js";
24+
import { createRedisClient } from "@internal/redis";
2425

2526
const SemanticAttributes = {
2627
QUEUE: "runqueue.queue",
@@ -71,13 +72,27 @@ export class RunQueue {
7172

7273
constructor(private readonly options: RunQueueOptions) {
7374
this.retryOptions = options.retryOptions ?? defaultRetrySettings;
74-
this.redis = new Redis(options.redis);
75+
this.redis = createRedisClient(options.redis, {
76+
onError: (error) => {
77+
this.logger.error(`RunQueue redis client error:`, {
78+
error,
79+
keyPrefix: options.redis.keyPrefix,
80+
});
81+
},
82+
});
7583
this.logger = options.logger;
7684

7785
this.keys = new RunQueueShortKeyProducer("rq:");
7886
this.queuePriorityStrategy = options.queuePriorityStrategy;
7987

80-
this.subscriber = new Redis(options.redis);
88+
this.subscriber = createRedisClient(options.redis, {
89+
onError: (error) => {
90+
this.logger.error(`RunQueue subscriber redis client error:`, {
91+
error,
92+
keyPrefix: options.redis.keyPrefix,
93+
});
94+
},
95+
});
8196
this.#setupSubscriber();
8297

8398
this.#registerCommands();

0 commit comments

Comments
 (0)