Skip to content

Commit 9846a4b

Browse files
Introduce SUBSCRIBE and PSUBSCRIBE Command (#1360)
* introduce SUBSCRIBE command * refactor event handler * finalize interface * update tests * cleanup * replace EventEmitter class for runtime compat * add: streamOptions in commandOptions * introduce PSUBSCRIBE command * introduce docs link refs * improve type coverage
1 parent a08f1ca commit 9846a4b

8 files changed

+795
-4
lines changed

pkg/commands/command.ts

+44
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,35 @@ export type CommandOptions<TResult, TData> = {
3131
*/
3232
automaticDeserialization?: boolean;
3333
latencyLogging?: boolean;
34+
/**
35+
* Additional headers to be sent with the request
36+
*/
37+
headers?: Record<string, string>;
38+
39+
/**
40+
* Path to append to the URL
41+
*/
42+
path?: string[];
43+
44+
/**
45+
* Options for streaming requests, mainly used for subscribe, monitor commands
46+
**/
47+
streamOptions?: {
48+
/**
49+
* Callback to be called when a message is received
50+
*/
51+
onMessage?: (data: string) => void;
52+
53+
/**
54+
* Whether the request is streaming
55+
*/
56+
isStreaming?: boolean;
57+
58+
/**
59+
* Signal to abort the request
60+
*/
61+
signal?: AbortSignal;
62+
};
3463
};
3564
/**
3665
* Command offers default (de)serialization and the exec method to all commands.
@@ -42,6 +71,11 @@ export class Command<TResult, TData> {
4271
public readonly command: (string | number | boolean)[];
4372
public readonly serialize: Serialize;
4473
public readonly deserialize: Deserialize<TResult, TData>;
74+
protected readonly headers?: Record<string, string>;
75+
protected readonly path?: string[];
76+
protected readonly onMessage?: (data: string) => void;
77+
protected readonly isStreaming: boolean;
78+
protected readonly signal?: AbortSignal;
4579
/**
4680
* Create a new command instance.
4781
*
@@ -58,6 +92,11 @@ export class Command<TResult, TData> {
5892
: (x) => x as unknown as TData;
5993

6094
this.command = command.map((c) => this.serialize(c));
95+
this.headers = opts?.headers;
96+
this.path = opts?.path;
97+
this.onMessage = opts?.streamOptions?.onMessage;
98+
this.isStreaming = opts?.streamOptions?.isStreaming ?? false;
99+
this.signal = opts?.streamOptions?.signal;
61100

62101
if (opts?.latencyLogging) {
63102
const originalExec = this.exec.bind(this);
@@ -83,7 +122,12 @@ export class Command<TResult, TData> {
83122
public async exec(client: Requester): Promise<TData> {
84123
const { result, error } = await client.request<TResult>({
85124
body: this.command,
125+
path: this.path,
86126
upstashSyncToken: client.upstashSyncToken,
127+
headers: this.headers,
128+
onMessage: this.onMessage,
129+
isStreaming: this.isStreaming,
130+
signal: this.signal,
87131
});
88132

89133
if (error) {

pkg/commands/psubscribe.test.ts

+169
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
import { expect, test, describe } from "bun:test";
2+
import { Redis } from "../redis";
3+
import { newHttpClient } from "../test-utils";
4+
5+
interface TestMessage {
6+
user?: string;
7+
msg?: string;
8+
message?: string;
9+
timestamp?: number;
10+
type?: string;
11+
}
12+
13+
describe("Pattern Subscriber", () => {
14+
const client = newHttpClient();
15+
const redis = new Redis(client);
16+
17+
test("receives pattern matched messages", async () => {
18+
const pattern = "user:*";
19+
const receivedMessages: TestMessage[] = [];
20+
21+
const subscriber = redis.psubscribe<TestMessage>([pattern]);
22+
subscriber.on("pmessage", ({ message }) => {
23+
receivedMessages.push(message);
24+
});
25+
26+
await new Promise((resolve) => setTimeout(resolve, 500));
27+
28+
const testMessage: TestMessage = {
29+
user: "testUser",
30+
message: "Hello, World!",
31+
timestamp: Date.now(),
32+
};
33+
34+
await redis.publish("user:123", testMessage);
35+
await redis.publish("user:456", testMessage);
36+
await redis.publish("other:789", testMessage); // Should not receive this
37+
await new Promise((resolve) => setTimeout(resolve, 500));
38+
39+
expect(receivedMessages).toHaveLength(2); // Only messages from user:* channels
40+
expect(receivedMessages[0]).toEqual(testMessage);
41+
expect(receivedMessages[1]).toEqual(testMessage);
42+
43+
await subscriber.unsubscribe();
44+
}, 10_000);
45+
46+
test("handles pattern-specific messages with channel info", async () => {
47+
const pattern = "chat:*:messages";
48+
const messages: { pattern: string; channel: string; message: TestMessage }[] = [];
49+
50+
const subscriber = redis.psubscribe<TestMessage>([pattern]);
51+
subscriber.on("pmessage", (data) => {
52+
messages.push(data);
53+
});
54+
55+
await new Promise((resolve) => setTimeout(resolve, 500));
56+
57+
await redis.publish("chat:room1:messages", { msg: "Hello Room 1" });
58+
await redis.publish("chat:room2:messages", { msg: "Hello Room 2" });
59+
await redis.publish("chat:room1:users", { msg: "User joined" }); // Should not receive this
60+
await new Promise((resolve) => setTimeout(resolve, 500));
61+
62+
expect(messages).toHaveLength(2);
63+
expect(messages[0].pattern).toBe("chat:*:messages");
64+
expect(messages[0].channel).toBe("chat:room1:messages");
65+
expect(messages[0].message).toEqual({ msg: "Hello Room 1" });
66+
expect(messages[1].channel).toBe("chat:room2:messages");
67+
expect(messages[1].message).toEqual({ msg: "Hello Room 2" });
68+
69+
await subscriber.unsubscribe();
70+
}, 10_000);
71+
72+
test("handles multiple patterns", async () => {
73+
const patterns = ["user:*", "chat:*"];
74+
const messages: Record<string, Array<{ channel: string; message: TestMessage }>> = {
75+
"user:*": [],
76+
"chat:*": [],
77+
};
78+
79+
const subscriber = redis.psubscribe<TestMessage>(patterns);
80+
subscriber.on("pmessage", ({ pattern, channel, message }) => {
81+
messages[pattern].push({ channel, message });
82+
});
83+
84+
await new Promise((resolve) => setTimeout(resolve, 500));
85+
86+
await redis.publish("user:123", { type: "user" });
87+
await redis.publish("chat:room1", { type: "chat" });
88+
await redis.publish("other:xyz", { type: "other" }); // Should not receive this
89+
await new Promise((resolve) => setTimeout(resolve, 500));
90+
91+
expect(messages["user:*"]).toHaveLength(1);
92+
expect(messages["chat:*"]).toHaveLength(1);
93+
expect(messages["user:*"][0].channel).toBe("user:123");
94+
expect(messages["chat:*"][0].channel).toBe("chat:room1");
95+
96+
await subscriber.unsubscribe();
97+
}, 10_000);
98+
99+
test("unsubscribe from specific pattern", async () => {
100+
const patterns = ["user:*", "chat:*"];
101+
const messages: Record<string, Array<{ channel: string; message: TestMessage }>> = {
102+
"user:*": [],
103+
"chat:*": [],
104+
};
105+
106+
const subscriber = redis.psubscribe<TestMessage>(patterns);
107+
subscriber.on("pmessage", ({ pattern, channel, message }) => {
108+
messages[pattern].push({ channel, message });
109+
});
110+
111+
await new Promise((resolve) => setTimeout(resolve, 500));
112+
113+
// Initial messages
114+
await redis.publish("user:123", { msg: "user1" });
115+
await redis.publish("chat:room1", { msg: "chat1" });
116+
await new Promise((resolve) => setTimeout(resolve, 500));
117+
118+
expect(messages["user:*"]).toHaveLength(1);
119+
expect(messages["chat:*"]).toHaveLength(1);
120+
121+
// Unsubscribe from user:* pattern
122+
await subscriber.unsubscribe(["user:*"]);
123+
expect(subscriber.getSubscribedChannels()).toEqual(["chat:*"]);
124+
125+
// Clear messages
126+
messages["user:*"] = [];
127+
messages["chat:*"] = [];
128+
129+
// Send more messages
130+
await redis.publish("user:123", { msg: "user2" });
131+
await redis.publish("chat:room1", { msg: "chat2" });
132+
await new Promise((resolve) => setTimeout(resolve, 500));
133+
134+
expect(messages["user:*"]).toHaveLength(0); // Should not receive any more user messages
135+
expect(messages["chat:*"]).toHaveLength(1); // Should still receive chat messages
136+
expect(messages["chat:*"][0].message.msg).toBe("chat2");
137+
138+
await subscriber.unsubscribe();
139+
}, 15_000);
140+
141+
test("pattern and regular subscriptions work together", async () => {
142+
const patternSubscriber = redis.psubscribe<TestMessage>(["user:*"]);
143+
const channelSubscriber = redis.subscribe<TestMessage>(["user:123"]);
144+
145+
const patternMessages: TestMessage[] = [];
146+
const channelMessages: TestMessage[] = [];
147+
148+
patternSubscriber.on("pmessage", ({ message }) => {
149+
patternMessages.push(message);
150+
});
151+
152+
channelSubscriber.on("message", ({ message }) => {
153+
channelMessages.push(message);
154+
});
155+
156+
await new Promise((resolve) => setTimeout(resolve, 500));
157+
158+
const testMessage: TestMessage = { msg: "Hello" };
159+
await redis.publish("user:123", testMessage);
160+
await new Promise((resolve) => setTimeout(resolve, 500));
161+
162+
expect(patternMessages).toHaveLength(1);
163+
expect(channelMessages).toHaveLength(1);
164+
expect(patternMessages[0]).toEqual(testMessage);
165+
expect(channelMessages[0]).toEqual(testMessage);
166+
167+
await Promise.all([patternSubscriber.unsubscribe(), channelSubscriber.unsubscribe()]);
168+
}, 15_000);
169+
});

pkg/commands/psubscribe.ts

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import { Command, type CommandOptions } from "./command";
2+
3+
/**
4+
* @see https://redis.io/commands/psubscribe
5+
*/
6+
export class PSubscribeCommand extends Command<number, number> {
7+
constructor(cmd: [...patterns: string[]], opts?: CommandOptions<number, number>) {
8+
const sseHeaders = {
9+
Accept: "text/event-stream",
10+
"Cache-Control": "no-cache",
11+
Connection: "keep-alive",
12+
};
13+
14+
super([], {
15+
...opts,
16+
headers: sseHeaders,
17+
path: ["psubscribe", ...cmd],
18+
streamOptions: {
19+
isStreaming: true,
20+
onMessage: opts?.streamOptions?.onMessage,
21+
signal: opts?.streamOptions?.signal,
22+
},
23+
});
24+
}
25+
}

0 commit comments

Comments
 (0)