From 5aa6a80566b909f84db76a39a35b23dec3bc9395 Mon Sep 17 00:00:00 2001 From: Zach Sherbondy Date: Fri, 13 Sep 2024 10:33:37 -0400 Subject: [PATCH] xadd-maxlen-support --- lib/interfaces.ts | 6 +++++- lib/redis.client.ts | 10 +++++++++- lib/redis.server.ts | 15 ++++++++++++++- 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/lib/interfaces.ts b/lib/interfaces.ts index 6c7bdc4..2bfa140 100644 --- a/lib/interfaces.ts +++ b/lib/interfaces.ts @@ -18,7 +18,11 @@ interface RedisStreamOptionsXreadGroup { deleteMessagesAfterAck?: boolean; } -export type RedisStreamOptions = RedisStreamOptionsXreadGroup; +interface RedisStreamOptionsXadd { + maxLen?: number; +} + +export type RedisStreamOptions = RedisStreamOptionsXreadGroup & RedisStreamOptionsXadd; // [id, [key, value, key, value]] export type RawStreamMessage = [id: string, payload: string[]]; diff --git a/lib/redis.client.ts b/lib/redis.client.ts index bb8b70c..644dafc 100644 --- a/lib/redis.client.ts +++ b/lib/redis.client.ts @@ -7,6 +7,7 @@ import { RequestsMap } from './requests-map'; import { deserialize, generateCorrelationId, serialize } from './streams.utils'; import { RedisStreamContext } from './stream.context'; import { firstValueFrom, share } from 'rxjs'; +import { RedisValue } from 'ioredis'; @Injectable() export class RedisStreamClient extends ClientProxy { @@ -104,9 +105,16 @@ export class RedisStreamClient extends ClientProxy { try { if (!this.client) throw new Error('Redis client instance not found.'); + const commandArgs: RedisValue[] = []; + if(this.options.streams?.maxLen){ + commandArgs.push("MAXLEN") + commandArgs.push("~") + commandArgs.push(this.options.streams.maxLen.toString()) + } + commandArgs.push("*") let response = await this.client.xadd( stream, - '*', + ...commandArgs, ...serializedPayloadArray, ); return response; diff --git a/lib/redis.server.ts b/lib/redis.server.ts index 9648181..bd1cd91 100644 --- a/lib/redis.server.ts +++ b/lib/redis.server.ts @@ -12,6 +12,7 @@ import { CONNECT_EVENT, ERROR_EVENT } from '@nestjs/microservices/constants'; import { deserialize, serialize } from './streams.utils'; import { RedisStreamContext } from './stream.context'; import { Observable } from 'rxjs'; +import { RedisValue } from 'ioredis'; export class RedisStreamStrategy extends Server @@ -140,7 +141,19 @@ export class RedisStreamStrategy if (!this.client) throw new Error('Redis client instance not found.'); - await this.client.xadd(responseObj.stream, '*', ...serializedEntries); + const commandArgs: RedisValue[] = []; + if(this.options.streams?.maxLen){ + commandArgs.push("MAXLEN") + commandArgs.push("~") + commandArgs.push(this.options.streams.maxLen.toString()) + } + commandArgs.push("*") + + await this.client.xadd( + responseObj.stream, + ...commandArgs, + ...serializedEntries, + ); }), );