Skip to content

Commit 828fe92

Browse files
PavelPashovDEFLANDRE ALBAN
andcommitted
feat(stream): Add XDELEX command
Co-authored-by: DEFLANDRE ALBAN <[email protected]>
1 parent 95e80af commit 828fe92

File tree

4 files changed

+180
-8
lines changed

4 files changed

+180
-8
lines changed

lib/utils/RedisCommander.ts

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9502,6 +9502,87 @@ interface RedisCommander<Context extends ClientContext = { type: "default" }> {
95029502
...args: [key: RedisKey, ...ids: (string | Buffer | number)[]]
95039503
): Result<number, Context>;
95049504

9505+
/**
9506+
* Deletes one or multiple entries from the stream.
9507+
* - _group_: stream
9508+
* - _complexity_: O(1) for each single item to delete in the stream, regardless of the stream size.
9509+
* - _since_: 8.2.0
9510+
*/
9511+
xdelex(
9512+
...args: [
9513+
key: RedisKey,
9514+
idsToken: "IDS",
9515+
numids: number | string,
9516+
...ids: (string | Buffer | number)[],
9517+
callback: Callback<unknown>
9518+
]
9519+
): Result<unknown, Context>;
9520+
xdelex(
9521+
...args: [
9522+
key: RedisKey,
9523+
idsToken: "IDS",
9524+
numids: number | string,
9525+
...ids: (string | Buffer | number)[]
9526+
]
9527+
): Result<unknown, Context>;
9528+
xdelex(
9529+
...args: [
9530+
key: RedisKey,
9531+
keepref: "KEEPREF",
9532+
idsToken: "IDS",
9533+
numids: number | string,
9534+
...ids: (string | Buffer | number)[],
9535+
callback: Callback<unknown>
9536+
]
9537+
): Result<unknown, Context>;
9538+
xdelex(
9539+
...args: [
9540+
key: RedisKey,
9541+
keepref: "KEEPREF",
9542+
idsToken: "IDS",
9543+
numids: number | string,
9544+
...ids: (string | Buffer | number)[]
9545+
]
9546+
): Result<unknown, Context>;
9547+
xdelex(
9548+
...args: [
9549+
key: RedisKey,
9550+
delref: "DELREF",
9551+
idsToken: "IDS",
9552+
numids: number | string,
9553+
...ids: (string | Buffer | number)[],
9554+
callback: Callback<unknown>
9555+
]
9556+
): Result<unknown, Context>;
9557+
xdelex(
9558+
...args: [
9559+
key: RedisKey,
9560+
delref: "DELREF",
9561+
idsToken: "IDS",
9562+
numids: number | string,
9563+
...ids: (string | Buffer | number)[]
9564+
]
9565+
): Result<unknown, Context>;
9566+
xdelex(
9567+
...args: [
9568+
key: RedisKey,
9569+
acked: "ACKED",
9570+
idsToken: "IDS",
9571+
numids: number | string,
9572+
...ids: (string | Buffer | number)[],
9573+
callback: Callback<unknown>
9574+
]
9575+
): Result<unknown, Context>;
9576+
xdelex(
9577+
...args: [
9578+
key: RedisKey,
9579+
acked: "ACKED",
9580+
idsToken: "IDS",
9581+
numids: number | string,
9582+
...ids: (string | Buffer | number)[]
9583+
]
9584+
): Result<unknown, Context>;
9585+
95059586
/**
95069587
* Create a consumer group.
95079588
* - _group_: stream

package-lock.json

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
"url": "https://opencollective.com/ioredis"
4444
},
4545
"dependencies": {
46-
"@ioredis/commands": "^1.3.1",
46+
"@ioredis/commands": "1.4.0",
4747
"cluster-key-slot": "^1.1.0",
4848
"debug": "^4.3.4",
4949
"denque": "^2.1.0",

test/functional/commands/xdelex.ts

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import Redis from "../../../lib/Redis";
2+
import { expect } from "chai";
3+
4+
const STREAM_DELETION_REPLY_CODES = {
5+
NOT_FOUND: -1,
6+
DELETED: 1,
7+
DANGLING_REFS: 2,
8+
} as const;
9+
10+
// TODO unskip once we have a mechanism to run only on specific versions
11+
// TODO as these tests can only work against 8.2 or higher
12+
describe.skip("xdelex", () => {
13+
let redis: Redis;
14+
15+
beforeEach(() => {
16+
redis = new Redis();
17+
});
18+
19+
afterEach(() => {
20+
redis.disconnect();
21+
});
22+
23+
it("should handle non-existing key - without policy", async () => {
24+
const reply = await redis.xdelex("stream-key", "IDS", 1, "0-0");
25+
expect(reply).to.deep.equal([STREAM_DELETION_REPLY_CODES.NOT_FOUND]);
26+
});
27+
28+
it("should handle existing key - without policy", async () => {
29+
const streamKey = "stream-key";
30+
const messageId = await redis.xadd(streamKey, "*", "field", "value");
31+
32+
const reply = await redis.xdelex(streamKey, "IDS", 1, messageId as string);
33+
expect(reply).to.deep.equal([STREAM_DELETION_REPLY_CODES.DELETED]);
34+
});
35+
36+
it("should handle existing key - with DELREF policy", async () => {
37+
const streamKey = "stream-key";
38+
const messageId = await redis.xadd(streamKey, "*", "field", "value");
39+
40+
const reply = await redis.xdelex(
41+
streamKey,
42+
"DELREF",
43+
"IDS",
44+
1,
45+
messageId as string
46+
);
47+
expect(reply).to.deep.equal([STREAM_DELETION_REPLY_CODES.DELETED]);
48+
});
49+
50+
it("should handle ACKED policy - with consumer group", async () => {
51+
const streamKey = "stream-key";
52+
53+
// Add a message to the stream
54+
const messageId = await redis.xadd(streamKey, "*", "field", "value");
55+
56+
// Create consumer group
57+
await redis.xgroup("CREATE", streamKey, "testgroup", "0");
58+
59+
const reply = await redis.xdelex(
60+
streamKey,
61+
"ACKED",
62+
"IDS",
63+
1,
64+
messageId as string
65+
);
66+
expect(reply).to.deep.equal([STREAM_DELETION_REPLY_CODES.DANGLING_REFS]);
67+
});
68+
69+
it("should handle multiple keys", async () => {
70+
const streamKey = "stream-key";
71+
const [messageId, messageId2] = await Promise.all([
72+
redis.xadd(streamKey, "*", "field", "value1"),
73+
redis.xadd(streamKey, "*", "field", "value2"),
74+
]);
75+
76+
const reply = await redis.xdelex(
77+
streamKey,
78+
"DELREF",
79+
"IDS",
80+
3,
81+
messageId as string,
82+
messageId2 as string,
83+
"0-0"
84+
);
85+
expect(reply).to.deep.equal([
86+
STREAM_DELETION_REPLY_CODES.DELETED,
87+
STREAM_DELETION_REPLY_CODES.DELETED,
88+
STREAM_DELETION_REPLY_CODES.NOT_FOUND,
89+
]);
90+
});
91+
});

0 commit comments

Comments
 (0)