Skip to content

Commit 1e3bcca

Browse files
mgravellatakavci
andauthored
Fix for FT.CURSOR in cluster (#440)
* Fix for FT.CURSOR in cluster; requires single server - update SE.Redis ref to allow new GetServer(RedisKey) usage - add utility API to capture an IServer and database if using cluster - create internal AggregationResult subclass that includes the IServer - capture server in Aggregate[Async] - create new overloads for CusorDel[Async] and CursorRead[Async] that take AggregationResult, and push consumers towards that overload - use captured server/database when appropriate - use the new API from tests - add new I[Async]Enumerable API for simplicity: AggregateEnumerable[Async] - add tests for new API - use cluster env from cursor tests * update interfaces * dotnet format * use correct routing in AddDocument * .gitignore - docker containers * dotnet format * - enable all-environments over almost all FT tests - workaround DBSIZE usage - use IP in endpoints.json to prevent double-counting of servers - compensate for NumDocs oddity on cluster, and don't test detailed numbers (which vary by shard) * don't hit disconnected servers when crawling endpoints * dotnet format * more search test tweaks * rev SE.Redis for RedisValue fix * more test fixes * dotnet format... again * fix routing of dictionary methods * actually: not a key * try to add more replication stability * TestApplyAndFilterAggregations - loop attempt * only continue on last attempt! * allow even more time in TestApplyAndFilterAggregations * fix CI mstest on .net9 * grandfather many cluster tests pre 8 * update local docker file * skip a bunch more tests on cluster pre 8 * skip TestCreate on cluster < 8 * Update tests/dockers/docker-compose.yml Co-authored-by: atakavci <[email protected]> * clarify that the enumerable APIs may involve multiple operations * clarify why/when the old cursor API will fail --------- Co-authored-by: atakavci <[email protected]>
1 parent 79bf8ce commit 1e3bcca

16 files changed

+1172
-508
lines changed

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,3 +410,8 @@ tests/NRedisStack.Tests/redis_credentials/redis_user.crt
410410
# global.json
411411
global.json
412412
tests/NRedisStack.Tests/lcov.net8.0.info
413+
414+
# docker containers
415+
tests/dockers/cluster/
416+
tests/dockers/standalone/
417+
tests/dockers/all/

Directory.Packages.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
<!-- primary library -->
1313
<PackageVersion Include="NetTopologySuite" Version="2.6.0" />
1414
<PackageVersion Include="System.Text.Json" Version="9.0.7" />
15-
<PackageVersion Include="StackExchange.Redis" Version="2.8.58" />
15+
<PackageVersion Include="StackExchange.Redis" Version="2.9.17" />
1616
<!-- tests, etc -->
1717
<PackageVersion Include="BouncyCastle.Cryptography" Version="2.6.1" />
1818
<PackageVersion Include="coverlet.collector" Version="6.0.4" />

src/NRedisStack/Auxiliary.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,22 @@ public static RedisResult Execute(this IDatabase db, SerializedCommand command)
6767
return db.Execute(command.Command, command.Args);
6868
}
6969

70+
internal static RedisResult Execute(this IServer server, int? db, SerializedCommand command)
71+
{
72+
return server.Execute(db, command.Command, command.Args);
73+
}
74+
7075
public static async Task<RedisResult> ExecuteAsync(this IDatabaseAsync db, SerializedCommand command)
7176
{
7277
((IDatabase)db).SetInfoInPipeline();
7378
return await db.ExecuteAsync(command.Command, command.Args);
7479
}
7580

81+
internal static async Task<RedisResult> ExecuteAsync(this IServer server, int? db, SerializedCommand command)
82+
{
83+
return await server.ExecuteAsync(db, command.Command, command.Args);
84+
}
85+
7686
public static List<RedisResult> ExecuteBroadcast(this IDatabase db, string command)
7787
=> db.ExecuteBroadcast(new SerializedCommand(command));
7888

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,13 @@
11
#nullable enable
2+
NRedisStack.ISearchCommands.AggregateEnumerable(string! index, NRedisStack.Search.AggregationRequest! query) -> System.Collections.Generic.IEnumerable<NRedisStack.Search.Aggregation.Row>!
3+
NRedisStack.ISearchCommands.CursorDel(NRedisStack.Search.AggregationResult! result) -> bool
4+
NRedisStack.ISearchCommands.CursorRead(NRedisStack.Search.AggregationResult! result, int? count = null) -> NRedisStack.Search.AggregationResult!
5+
NRedisStack.ISearchCommandsAsync.AggregateAsyncEnumerable(string! index, NRedisStack.Search.AggregationRequest! query) -> System.Collections.Generic.IAsyncEnumerable<NRedisStack.Search.Aggregation.Row>!
6+
NRedisStack.ISearchCommandsAsync.CursorDelAsync(NRedisStack.Search.AggregationResult! result) -> System.Threading.Tasks.Task<bool>!
7+
NRedisStack.ISearchCommandsAsync.CursorReadAsync(NRedisStack.Search.AggregationResult! result, int? count = null) -> System.Threading.Tasks.Task<NRedisStack.Search.AggregationResult!>!
8+
NRedisStack.SearchCommands.AggregateEnumerable(string! index, NRedisStack.Search.AggregationRequest! query) -> System.Collections.Generic.IEnumerable<NRedisStack.Search.Aggregation.Row>!
9+
NRedisStack.SearchCommands.CursorDel(NRedisStack.Search.AggregationResult! result) -> bool
10+
NRedisStack.SearchCommands.CursorRead(NRedisStack.Search.AggregationResult! result, int? count = null) -> NRedisStack.Search.AggregationResult!
11+
NRedisStack.SearchCommandsAsync.AggregateAsyncEnumerable(string! index, NRedisStack.Search.AggregationRequest! query) -> System.Collections.Generic.IAsyncEnumerable<NRedisStack.Search.Aggregation.Row>!
12+
NRedisStack.SearchCommandsAsync.CursorDelAsync(NRedisStack.Search.AggregationResult! result) -> System.Threading.Tasks.Task<bool>!
13+
NRedisStack.SearchCommandsAsync.CursorReadAsync(NRedisStack.Search.AggregationResult! result, int? count = null) -> System.Threading.Tasks.Task<NRedisStack.Search.AggregationResult!>!

src/NRedisStack/ResponseParser.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -737,6 +737,20 @@ public static AggregationResult ToAggregationResult(this RedisResult result, Agg
737737
}
738738
}
739739

740+
internal static AggregationResult ToAggregationResult(this RedisResult result, string indexName, AggregationRequest query, IServer? server, int? database)
741+
{
742+
if (query.IsWithCursor())
743+
{
744+
var results = (RedisResult[])result!;
745+
746+
return new AggregationResult.WithCursorAggregationResult(indexName, results[0], (long)results[1], server, database);
747+
}
748+
else
749+
{
750+
return new(result);
751+
}
752+
}
753+
740754
public static Dictionary<string, RedisResult>[] ToDictionarys(this RedisResult result)
741755
{
742756
var resArr = (RedisResult[])result!;

src/NRedisStack/Search/AggregationRequest.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ public AggregationRequest Cursor(int? count = null, long? maxIdle = null)
128128

129129
if (count != null)
130130
{
131+
Count = count;
131132
args.Add(SearchArgs.COUNT);
132133
args.Add(count);
133134
}
@@ -139,6 +140,7 @@ public AggregationRequest Cursor(int? count = null, long? maxIdle = null)
139140
}
140141
return this;
141142
}
143+
internal int? Count { get; set; }
142144

143145
public AggregationRequest Params(Dictionary<string, object> nameValue)
144146
{

src/NRedisStack/Search/AggregationResult.cs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,29 @@
33

44
namespace NRedisStack.Search;
55

6-
public sealed class AggregationResult
6+
public class AggregationResult
77
{
8+
// internal subclass for WITHCURSOR calls, which need to be issued to the same connection
9+
internal sealed class WithCursorAggregationResult : AggregationResult
10+
{
11+
internal WithCursorAggregationResult(string indexName, RedisResult result, long cursorId, IServer? server,
12+
int? database) : base(result, cursorId)
13+
{
14+
IndexName = indexName;
15+
Server = server;
16+
Database = database;
17+
}
18+
public string IndexName { get; }
19+
public IServer? Server { get; }
20+
public int? Database { get; }
21+
}
22+
823
public long TotalResults { get; }
924
private readonly Dictionary<string, object>[] _results;
1025
private Dictionary<string, RedisValue>[]? _resultsAsRedisValues;
1126

1227
public long CursorId { get; }
1328

14-
1529
internal AggregationResult(RedisResult result, long cursorId = -1)
1630
{
1731
var arr = (RedisResult[])result!;
@@ -45,7 +59,6 @@ internal AggregationResult(RedisResult result, long cursorId = -1)
4559
CursorId = cursorId;
4660
}
4761

48-
4962
/// <summary>
5063
/// takes a Redis multi-bulk array represented by a RedisResult[] and recursively processes its elements.
5164
/// For each element in the array, it checks if it's another multi-bulk array, and if so, it recursively calls itself.

src/NRedisStack/Search/ISearchCommands.cs

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
using System.ComponentModel;
12
using NRedisStack.Search;
3+
using NRedisStack.Search.Aggregation;
24
using NRedisStack.Search.DataTypes;
35
using StackExchange.Redis;
46

@@ -18,11 +20,20 @@ public interface ISearchCommands
1820
/// Run a search query on an index, and perform aggregate transformations on the results.
1921
/// </summary>
2022
/// <param name="index">The index name.</param>
21-
/// <param name="query">The query</param>
23+
/// <param name="query">The query.</param>
2224
/// <returns>An <see langword="AggregationResult"/> object</returns>
2325
/// <remarks><seealso href="https://redis.io/commands/ft.aggregate"/></remarks>
2426
AggregationResult Aggregate(string index, AggregationRequest query);
2527

28+
/// <summary>
29+
/// Run a search query on an index, and perform aggregate transformations on the results.
30+
/// </summary>
31+
/// <param name="index">The index name.</param>
32+
/// <param name="query">The query.</param>
33+
/// <returns>A sequence of <see langword="Row"/> values.</returns>
34+
/// <remarks><seealso href="https://redis.io/commands/ft.aggregate"/></remarks>
35+
IEnumerable<Row> AggregateEnumerable(string index, AggregationRequest query);
36+
2637
/// <summary>
2738
/// Add an alias to an index.
2839
/// </summary>
@@ -92,22 +103,43 @@ public interface ISearchCommands
92103
/// <summary>
93104
/// Delete a cursor from the index.
94105
/// </summary>
95-
/// <param name="indexName">The index name</param>
106+
/// <param name="indexName">The index name.</param>
96107
/// <param name="cursorId">The cursor's ID.</param>
97108
/// <returns><see langword="true"/> if it has been deleted, <see langword="false"/> if it did not exist.</returns>
98109
/// <remarks><seealso href="https://redis.io/commands/ft.cursor-del/"/></remarks>
110+
[Obsolete("When possible, use CursorDel(AggregationResult) instead. This legacy API will not work correctly on CLUSTER environments, but will continue to work for single-node deployments.")]
111+
[Browsable(false), EditorBrowsable(EditorBrowsableState.Never)]
99112
bool CursorDel(string indexName, long cursorId);
100113

114+
/// <summary>
115+
/// Delete a cursor from the index.
116+
/// </summary>
117+
/// <param name="result">The result of a previous call to Aggregate or CursorRead.</param>
118+
/// <returns><see langword="true"/> if it has been deleted, <see langword="false"/> if it did not exist.</returns>
119+
/// <remarks><seealso href="https://redis.io/commands/ft.cursor-del/"/></remarks>
120+
bool CursorDel(AggregationResult result);
121+
101122
/// <summary>
102123
/// Read next results from an existing cursor.
103124
/// </summary>
104-
/// <param name="indexName">The index name</param>
125+
/// <param name="indexName">The index name.</param>
105126
/// <param name="cursorId">The cursor's ID.</param>
106127
/// <param name="count">Limit the amount of returned results.</param>
107128
/// <returns>A AggregationResult object with the results</returns>
108129
/// <remarks><seealso href="https://redis.io/commands/ft.cursor-read/"/></remarks>
130+
[Obsolete("When possible, use AggregateEnumerable or CursorRead(AggregationResult, int?) instead. This legacy API will not work correctly on CLUSTER environments, but will continue to work for single-node deployments.")]
131+
[Browsable(false), EditorBrowsable(EditorBrowsableState.Never)]
109132
AggregationResult CursorRead(string indexName, long cursorId, int? count = null);
110133

134+
/// <summary>
135+
/// Read next results from an existing cursor.
136+
/// </summary>
137+
/// <param name="result">The result of a previous call to Aggregate or CursorRead.</param>
138+
/// <param name="count">Limit the amount of returned results.</param>
139+
/// <returns>A AggregationResult object with the results</returns>
140+
/// <remarks><seealso href="https://redis.io/commands/ft.cursor-read/"/></remarks>
141+
public AggregationResult CursorRead(AggregationResult result, int? count = null);
142+
111143
/// <summary>
112144
/// Add terms to a dictionary.
113145
/// </summary>

src/NRedisStack/Search/ISearchCommandsAsync.cs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
using System.ComponentModel;
12
using NRedisStack.Search;
3+
using NRedisStack.Search.Aggregation;
24
using NRedisStack.Search.DataTypes;
35
using StackExchange.Redis;
46

@@ -14,14 +16,25 @@ public interface ISearchCommandsAsync
1416
Task<RedisResult[]> _ListAsync();
1517

1618
/// <summary>
17-
/// Run a search query on an index, and perform aggregate transformations on the results.
19+
/// Run a search query on an index, and perform aggregate transformations on the results. This operates
20+
/// as a cursor and may involve multiple commands to the server.
1821
/// </summary>
1922
/// <param name="index">The index name.</param>
2023
/// <param name="query">The query</param>
2124
/// <returns>An <see langword="AggregationResult"/> object</returns>
2225
/// <remarks><seealso href="https://redis.io/commands/ft.aggregate"/></remarks>
2326
Task<AggregationResult> AggregateAsync(string index, AggregationRequest query);
2427

28+
/// <summary>
29+
/// Run a search query on an index, and perform aggregate transformations on the results. This operates
30+
/// as a cursor and may involve multiple commands to the server.
31+
/// </summary>
32+
/// <param name="index">The index name.</param>
33+
/// <param name="query">The query.</param>
34+
/// <returns>A sequence of <see langword="Row"/> values.</returns>
35+
/// <remarks><seealso href="https://redis.io/commands/ft.aggregate"/></remarks>
36+
IAsyncEnumerable<Row> AggregateAsyncEnumerable(string index, AggregationRequest query);
37+
2538
/// <summary>
2639
/// Add an alias to an index.
2740
/// </summary>
@@ -95,8 +108,18 @@ public interface ISearchCommandsAsync
95108
/// <param name="cursorId">The cursor's ID.</param>
96109
/// <returns><see langword="true"/> if it has been deleted, <see langword="false"/> if it did not exist.</returns>
97110
/// <remarks><seealso href="https://redis.io/commands/ft.cursor-del/"/></remarks>
111+
[Obsolete("When possible, use CursorDelAsync(AggregationResult, int?) instead. This legacy API will not work correctly on CLUSTER environments, but will continue to work for single-node deployments.")]
112+
[Browsable(false), EditorBrowsable(EditorBrowsableState.Never)]
98113
Task<bool> CursorDelAsync(string indexName, long cursorId);
99114

115+
/// <summary>
116+
/// Delete a cursor from the index.
117+
/// </summary>
118+
/// <param name="result">The result of a previous call to AggregateAsync or CursorReadAsync.</param>
119+
/// <returns><see langword="true"/> if it has been deleted, <see langword="false"/> if it did not exist.</returns>
120+
/// <remarks><seealso href="https://redis.io/commands/ft.cursor-del/"/></remarks>
121+
Task<bool> CursorDelAsync(AggregationResult result);
122+
100123
/// <summary>
101124
/// Read next results from an existing cursor.
102125
/// </summary>
@@ -105,8 +128,19 @@ public interface ISearchCommandsAsync
105128
/// <param name="count">Limit the amount of returned results.</param>
106129
/// <returns>A AggregationResult object with the results</returns>
107130
/// <remarks><seealso href="https://redis.io/commands/ft.cursor-read/"/></remarks>
131+
[Obsolete("When possible, use AggregateAsyncEnumerable or CursorReadAsync(AggregationResult, int?) instead. This legacy API will not work correctly on CLUSTER environments, but will continue to work for single-node deployments.")]
132+
[Browsable(false), EditorBrowsable(EditorBrowsableState.Never)]
108133
Task<AggregationResult> CursorReadAsync(string indexName, long cursorId, int? count = null);
109134

135+
/// <summary>
136+
/// Read next results from an existing cursor.
137+
/// </summary>
138+
/// <param name="result">The result of a previous AggregateAsync or CursorReadAsync call.</param>
139+
/// <param name="count">Limit the amount of returned results.</param>
140+
/// <returns>A AggregationResult object with the results</returns>
141+
/// <remarks><seealso href="https://redis.io/commands/ft.cursor-read/"/></remarks>
142+
Task<AggregationResult> CursorReadAsync(AggregationResult result, int? count = null);
143+
110144
/// <summary>
111145
/// Add terms to a dictionary.
112146
/// </summary>

src/NRedisStack/Search/SearchCommands.cs

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
using System.ComponentModel;
12
using NRedisStack.Search;
3+
using NRedisStack.Search.Aggregation;
24
using NRedisStack.Search.DataTypes;
35
using StackExchange.Redis;
46
namespace NRedisStack;
@@ -16,8 +18,54 @@ public RedisResult[] _List()
1618
public AggregationResult Aggregate(string index, AggregationRequest query)
1719
{
1820
SetDefaultDialectIfUnset(query);
19-
var result = db.Execute(SearchCommandBuilder.Aggregate(index, query));
20-
return result.ToAggregationResult(query);
21+
IServer? server = null;
22+
int? database = null;
23+
24+
var command = SearchCommandBuilder.Aggregate(index, query);
25+
if (query.IsWithCursor())
26+
{
27+
// we can issue this anywhere, but follow-up calls need to be on the same server
28+
server = GetRandomServerForCluster(db, out database);
29+
}
30+
31+
RedisResult result;
32+
if (server is not null)
33+
{
34+
result = server.Execute(database, command);
35+
}
36+
else
37+
{
38+
result = db.Execute(command);
39+
}
40+
41+
return result.ToAggregationResult(index, query, server, database);
42+
}
43+
44+
public IEnumerable<Row> AggregateEnumerable(string index, AggregationRequest query)
45+
{
46+
if (!query.IsWithCursor()) query.Cursor();
47+
48+
var result = Aggregate(index, query);
49+
try
50+
{
51+
while (true)
52+
{
53+
var count = checked((int)result.TotalResults);
54+
for (int i = 0; i < count; i++)
55+
{
56+
yield return result.GetRow(i);
57+
}
58+
if (result.CursorId == 0) break;
59+
result = CursorRead(result, query.Count);
60+
}
61+
}
62+
finally
63+
{
64+
if (result.CursorId != 0)
65+
{
66+
CursorDel(result);
67+
}
68+
}
2169
}
2270

2371
/// <inheritdoc/>
@@ -72,18 +120,52 @@ public bool Create(string indexName, Schema schema)
72120
}
73121

74122
/// <inheritdoc/>
123+
[Obsolete("When possible, use CursorDel(AggregationResult) instead. This legacy API will not work correctly on CLUSTER environments, but will continue to work for single-node deployments.")]
124+
[Browsable(false), EditorBrowsable(EditorBrowsableState.Never)]
75125
public bool CursorDel(string indexName, long cursorId)
76126
{
77127
return db.Execute(SearchCommandBuilder.CursorDel(indexName, cursorId)).OKtoBoolean();
78128
}
79129

130+
public bool CursorDel(AggregationResult result)
131+
{
132+
if (result is not AggregationResult.WithCursorAggregationResult withCursor)
133+
{
134+
throw new ArgumentException(
135+
message: $"{nameof(CursorDelAsync)} must be called with a value returned from a previous call to {nameof(AggregateAsync)} with a cursor.",
136+
paramName: nameof(result));
137+
}
138+
139+
var command = SearchCommandBuilder.CursorDel(withCursor.IndexName, withCursor.CursorId);
140+
var resp = withCursor.Server is { } server
141+
? server.Execute(withCursor.Database, command)
142+
: db.Execute(command);
143+
return resp.OKtoBoolean();
144+
}
145+
80146
/// <inheritdoc/>
147+
[Obsolete("When possible, use CusorReadEnumerable or CursorRead(AggregationResult, int?) instead. This legacy API will not work correctly on CLUSTER environments, but will continue to work for single-node deployments.")]
148+
[Browsable(false), EditorBrowsable(EditorBrowsableState.Never)]
81149
public AggregationResult CursorRead(string indexName, long cursorId, int? count = null)
82150
{
83151
var resp = db.Execute(SearchCommandBuilder.CursorRead(indexName, cursorId, count)).ToArray();
84152
return new(resp[0], (long)resp[1]);
85153
}
86154

155+
public AggregationResult CursorRead(AggregationResult result, int? count = null)
156+
{
157+
if (result is not AggregationResult.WithCursorAggregationResult withCursor)
158+
{
159+
throw new ArgumentException(message: $"{nameof(CursorReadAsync)} must be called with a value returned from a previous call to {nameof(AggregateAsync)} with a cursor.", paramName: nameof(result));
160+
}
161+
var command = SearchCommandBuilder.CursorRead(withCursor.IndexName, withCursor.CursorId, count);
162+
var rawResult = withCursor.Server is { } server
163+
? server.Execute(withCursor.Database, command)
164+
: db.Execute(command);
165+
var resp = rawResult.ToArray();
166+
return new AggregationResult.WithCursorAggregationResult(withCursor.IndexName, resp[0], (long)resp[1], withCursor.Server, withCursor.Database);
167+
}
168+
87169
/// <inheritdoc/>
88170
public long DictAdd(string dict, params string[] terms)
89171
{

0 commit comments

Comments
 (0)