Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Current package versions:

## Unreleased

- Support `XREADGROUP CLAIM` ([#2972 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2972))
- Support `MSETEX` (Redis 8.4.0) for multi-key operations with expiration ([#2977 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2977))

## 2.9.32
Expand Down
25 changes: 25 additions & 0 deletions src/StackExchange.Redis/APITypes/StreamEntry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,19 @@ public StreamEntry(RedisValue id, NameValueEntry[] values)
{
Id = id;
Values = values;
IdleTime = null;
DeliveryCount = 0;
}

/// <summary>
/// Creates a stream entry.
/// </summary>
public StreamEntry(RedisValue id, NameValueEntry[] values, TimeSpan? idleTime, int deliveryCount)
{
Id = id;
Values = values;
IdleTime = idleTime;
DeliveryCount = deliveryCount;
}

/// <summary>
Expand Down Expand Up @@ -51,6 +64,18 @@ public RedisValue this[RedisValue fieldName]
}
}

/// <summary>
/// Delivery count - the number of times this entry has been delivered: 0 for new messages that haven't been delivered before,
/// 1+ for claimed messages (previously unacknowledged entries).
/// </summary>
public int DeliveryCount { get; }

/// <summary>
/// Idle time in milliseconds - the number of milliseconds elapsed since this entry was last delivered to a consumer.
/// </summary>
/// <remarks>This member is populated when using <c>XREADGROUP</c> with <c>CLAIM</c>.</remarks>
public TimeSpan? IdleTime { get; }

/// <summary>
/// Indicates that the Redis Stream Entry is null.
/// </summary>
Expand Down
37 changes: 35 additions & 2 deletions src/StackExchange.Redis/Interfaces/IDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2971,7 +2971,22 @@ IEnumerable<SortedSetEntry> SortedSetScan(
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns a value of <see cref="StreamEntry"/> for each message returned.</returns>
/// <remarks><seealso href="https://redis.io/commands/xreadgroup"/></remarks>
StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, CommandFlags flags = CommandFlags.None);
StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position, int? count, bool noAck, CommandFlags flags);

/// <summary>
/// Read messages from a stream into an associated consumer group.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="consumerName">The consumer name.</param>
/// <param name="position">The position from which to read the stream. Defaults to <see cref="StreamPosition.NewMessages"/> when <see langword="null"/>.</param>
/// <param name="count">The maximum number of messages to return.</param>
/// <param name="noAck">When true, the message will not be added to the pending message list.</param>
/// <param name="claimMinIdleTime">Auto-claim messages that have been idle for at least this long.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns a value of <see cref="StreamEntry"/> for each message returned.</returns>
/// <remarks><seealso href="https://redis.io/commands/xreadgroup"/></remarks>
StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None);

/// <summary>
/// Read from multiple streams into the given consumer group.
Expand Down Expand Up @@ -3004,7 +3019,25 @@ IEnumerable<SortedSetEntry> SortedSetScan(
/// <para>Equivalent of calling <c>XREADGROUP GROUP groupName consumerName COUNT countPerStream STREAMS stream1 stream2 id1 id2</c>.</para>
/// <para><seealso href="https://redis.io/commands/xreadgroup"/></para>
/// </remarks>
RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None);
RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, bool noAck, CommandFlags flags);

/// <summary>
/// Read from multiple streams into the given consumer group.
/// The consumer group with the given <paramref name="groupName"/> will need to have been created for each stream prior to calling this method.
/// </summary>
/// <param name="streamPositions">Array of streams and the positions from which to begin reading for each stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="consumerName">The name of the consumer.</param>
/// <param name="countPerStream">The maximum number of messages to return from each stream.</param>
/// <param name="noAck">When true, the message will not be added to the pending message list.</param>
/// <param name="claimMinIdleTime">Auto-claim messages that have been idle for at least this long.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>A value of <see cref="RedisStream"/> for each stream.</returns>
/// <remarks>
/// <para>Equivalent of calling <c>XREADGROUP GROUP groupName consumerName COUNT countPerStream STREAMS stream1 stream2 id1 id2</c>.</para>
/// <para><seealso href="https://redis.io/commands/xreadgroup"/></para>
/// </remarks>
RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None);

/// <summary>
/// Trim the stream to a specified maximum length.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,7 @@ Task<bool> VectorSetSetAttributesJsonAsync(
RedisKey key,
VectorSetSimilaritySearchRequest query,
CommandFlags flags = CommandFlags.None);

/// <inheritdoc cref="IDatabase.StreamReadGroup(StreamPosition[], RedisValue, RedisValue, int?, bool, TimeSpan?, CommandFlags)"/>
Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None);
}
7 changes: 5 additions & 2 deletions src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -725,13 +725,16 @@ IAsyncEnumerable<SortedSetEntry> SortedSetScanAsync(
Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position, int? count, CommandFlags flags);

/// <inheritdoc cref="IDatabase.StreamReadGroup(RedisKey, RedisValue, RedisValue, RedisValue?, int?, bool, CommandFlags)"/>
Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, CommandFlags flags = CommandFlags.None);
Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position, int? count, bool noAck, CommandFlags flags);

/// <inheritdoc cref="IDatabase.StreamReadGroup(RedisKey, RedisValue, RedisValue, RedisValue?, int?, bool, TimeSpan?, CommandFlags)"/>
Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None);

/// <inheritdoc cref="IDatabase.StreamReadGroup(StreamPosition[], RedisValue, RedisValue, int?, CommandFlags)"/>
Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, CommandFlags flags);

/// <inheritdoc cref="IDatabase.StreamReadGroup(StreamPosition[], RedisValue, RedisValue, int?, bool, CommandFlags)"/>
Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None);
Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, bool noAck, CommandFlags flags);

/// <inheritdoc cref="IDatabase.StreamTrim(RedisKey, int, bool, CommandFlags)"/>
Task<long> StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength, CommandFlags flags);
Expand Down
6 changes: 6 additions & 0 deletions src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -690,12 +690,18 @@ public Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupNa
public Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, CommandFlags flags = CommandFlags.None) =>
Inner.StreamReadGroupAsync(ToInner(key), groupName, consumerName, position, count, noAck, flags);

public Task<StreamEntry[]> StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None) =>
Inner.StreamReadGroupAsync(ToInner(key), groupName, consumerName, position, count, noAck, claimMinIdleTime, flags);

public Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, CommandFlags flags) =>
Inner.StreamReadGroupAsync(streamPositions, groupName, consumerName, countPerStream, flags);

public Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None) =>
Inner.StreamReadGroupAsync(streamPositions, groupName, consumerName, countPerStream, noAck, flags);

public Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None) =>
Inner.StreamReadGroupAsync(streamPositions, groupName, consumerName, countPerStream, noAck, claimMinIdleTime, flags);

public Task<long> StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength, CommandFlags flags) =>
Inner.StreamTrimAsync(ToInner(key), maxLength, useApproximateMaxLength, flags);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -672,12 +672,18 @@ public StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisVa
public StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, CommandFlags flags = CommandFlags.None) =>
Inner.StreamReadGroup(ToInner(key), groupName, consumerName, position, count, noAck, flags);

public StreamEntry[] StreamReadGroup(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None) =>
Inner.StreamReadGroup(ToInner(key), groupName, consumerName, position, count, noAck, claimMinIdleTime, flags);

public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, CommandFlags flags) =>
Inner.StreamReadGroup(streamPositions, groupName, consumerName, countPerStream, flags);

public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None) =>
Inner.StreamReadGroup(streamPositions, groupName, consumerName, countPerStream, noAck, flags);

public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None) =>
Inner.StreamReadGroup(streamPositions, groupName, consumerName, countPerStream, noAck, claimMinIdleTime, flags);

public long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength, CommandFlags flags) =>
Inner.StreamTrim(ToInner(key), maxLength, useApproximateMaxLength, flags);

Expand Down
Loading
Loading