Skip to content

Commit c811296

Browse files
PavelPashovDEFLANDRE ALBAN
andcommitted
feat(stream): Add XDELEX command
Co-authored-by: DEFLANDRE ALBAN <[email protected]>
1 parent 5befe70 commit c811296

File tree

4 files changed

+180
-8
lines changed

4 files changed

+180
-8
lines changed

lib/utils/RedisCommander.ts

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9485,6 +9485,87 @@ interface RedisCommander<Context extends ClientContext = { type: "default" }> {
94859485
...args: [key: RedisKey, ...ids: (string | Buffer | number)[]]
94869486
): Result<number, Context>;
94879487

9488+
/**
9489+
* Deletes one or multiple entries from the stream.
9490+
* - _group_: stream
9491+
* - _complexity_: O(1) for each single item to delete in the stream, regardless of the stream size.
9492+
* - _since_: 8.2.0
9493+
*/
9494+
xdelex(
9495+
...args: [
9496+
key: RedisKey,
9497+
idsToken: "IDS",
9498+
numids: number | string,
9499+
...ids: (string | Buffer | number)[],
9500+
callback: Callback<unknown>
9501+
]
9502+
): Result<unknown, Context>;
9503+
xdelex(
9504+
...args: [
9505+
key: RedisKey,
9506+
idsToken: "IDS",
9507+
numids: number | string,
9508+
...ids: (string | Buffer | number)[]
9509+
]
9510+
): Result<unknown, Context>;
9511+
xdelex(
9512+
...args: [
9513+
key: RedisKey,
9514+
keepref: "KEEPREF",
9515+
idsToken: "IDS",
9516+
numids: number | string,
9517+
...ids: (string | Buffer | number)[],
9518+
callback: Callback<unknown>
9519+
]
9520+
): Result<unknown, Context>;
9521+
xdelex(
9522+
...args: [
9523+
key: RedisKey,
9524+
keepref: "KEEPREF",
9525+
idsToken: "IDS",
9526+
numids: number | string,
9527+
...ids: (string | Buffer | number)[]
9528+
]
9529+
): Result<unknown, Context>;
9530+
xdelex(
9531+
...args: [
9532+
key: RedisKey,
9533+
delref: "DELREF",
9534+
idsToken: "IDS",
9535+
numids: number | string,
9536+
...ids: (string | Buffer | number)[],
9537+
callback: Callback<unknown>
9538+
]
9539+
): Result<unknown, Context>;
9540+
xdelex(
9541+
...args: [
9542+
key: RedisKey,
9543+
delref: "DELREF",
9544+
idsToken: "IDS",
9545+
numids: number | string,
9546+
...ids: (string | Buffer | number)[]
9547+
]
9548+
): Result<unknown, Context>;
9549+
xdelex(
9550+
...args: [
9551+
key: RedisKey,
9552+
acked: "ACKED",
9553+
idsToken: "IDS",
9554+
numids: number | string,
9555+
...ids: (string | Buffer | number)[],
9556+
callback: Callback<unknown>
9557+
]
9558+
): Result<unknown, Context>;
9559+
xdelex(
9560+
...args: [
9561+
key: RedisKey,
9562+
acked: "ACKED",
9563+
idsToken: "IDS",
9564+
numids: number | string,
9565+
...ids: (string | Buffer | number)[]
9566+
]
9567+
): Result<unknown, Context>;
9568+
94889569
/**
94899570
* Create a consumer group.
94909571
* - _group_: stream

package-lock.json

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
"url": "https://opencollective.com/ioredis"
4242
},
4343
"dependencies": {
44-
"@ioredis/commands": "^1.3.0",
44+
"@ioredis/commands": "1.4.0",
4545
"cluster-key-slot": "^1.1.0",
4646
"debug": "^4.3.4",
4747
"denque": "^2.1.0",

test/functional/commands/xdelex.ts

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import Redis from "../../../lib/Redis";
2+
import { expect } from "chai";
3+
4+
const STREAM_DELETION_REPLY_CODES = {
5+
NOT_FOUND: -1,
6+
DELETED: 1,
7+
DANGLING_REFS: 2,
8+
} as const;
9+
10+
// TODO unskip once we have a mechanism to run only on specific versions
11+
// TODO as these tests can only work against 8.2 or higher
12+
describe.skip("xdelex", () => {
13+
let redis: Redis;
14+
15+
beforeEach(() => {
16+
redis = new Redis();
17+
});
18+
19+
afterEach(() => {
20+
redis.disconnect();
21+
});
22+
23+
it("should handle non-existing key - without policy", async () => {
24+
const reply = await redis.xdelex("stream-key", "IDS", 1, "0-0");
25+
expect(reply).to.deep.equal([STREAM_DELETION_REPLY_CODES.NOT_FOUND]);
26+
});
27+
28+
it("should handle existing key - without policy", async () => {
29+
const streamKey = "stream-key";
30+
const messageId = await redis.xadd(streamKey, "*", "field", "value");
31+
32+
const reply = await redis.xdelex(streamKey, "IDS", 1, messageId as string);
33+
expect(reply).to.deep.equal([STREAM_DELETION_REPLY_CODES.DELETED]);
34+
});
35+
36+
it("should handle existing key - with DELREF policy", async () => {
37+
const streamKey = "stream-key";
38+
const messageId = await redis.xadd(streamKey, "*", "field", "value");
39+
40+
const reply = await redis.xdelex(
41+
streamKey,
42+
"DELREF",
43+
"IDS",
44+
1,
45+
messageId as string
46+
);
47+
expect(reply).to.deep.equal([STREAM_DELETION_REPLY_CODES.DELETED]);
48+
});
49+
50+
it("should handle ACKED policy - with consumer group", async () => {
51+
const streamKey = "stream-key";
52+
53+
// Add a message to the stream
54+
const messageId = await redis.xadd(streamKey, "*", "field", "value");
55+
56+
// Create consumer group
57+
await redis.xgroup("CREATE", streamKey, "testgroup", "0");
58+
59+
const reply = await redis.xdelex(
60+
streamKey,
61+
"ACKED",
62+
"IDS",
63+
1,
64+
messageId as string
65+
);
66+
expect(reply).to.deep.equal([STREAM_DELETION_REPLY_CODES.DANGLING_REFS]);
67+
});
68+
69+
it("should handle multiple keys", async () => {
70+
const streamKey = "stream-key";
71+
const [messageId, messageId2] = await Promise.all([
72+
redis.xadd(streamKey, "*", "field", "value1"),
73+
redis.xadd(streamKey, "*", "field", "value2"),
74+
]);
75+
76+
const reply = await redis.xdelex(
77+
streamKey,
78+
"DELREF",
79+
"IDS",
80+
3,
81+
messageId as string,
82+
messageId2 as string,
83+
"0-0"
84+
);
85+
expect(reply).to.deep.equal([
86+
STREAM_DELETION_REPLY_CODES.DELETED,
87+
STREAM_DELETION_REPLY_CODES.DELETED,
88+
STREAM_DELETION_REPLY_CODES.NOT_FOUND,
89+
]);
90+
});
91+
});

0 commit comments

Comments
 (0)