-
-
Notifications
You must be signed in to change notification settings - Fork 164
/
Copy pathTestingExtensions.cs
398 lines (346 loc) · 13.7 KB
/
TestingExtensions.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
using System.Text;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Wolverine.Logging;
using Wolverine.Runtime;
using Wolverine.Tracking;
namespace Wolverine;
/// <summary>
/// Exception raised by the assertion helpers in <see cref="TestingExtensions"/> when a
/// collection of messages does not meet an expectation. It carries a snapshot of the
/// messages that were handed to it.
/// </summary>
public class WolverineMessageExpectationException : Exception
{
    /// <param name="message">Description of the expectation that failed</param>
    /// <param name="messages">The messages involved; they are copied into a new array</param>
    public WolverineMessageExpectationException(string message, IReadOnlyList<object> messages)
        : base(message) => Messages = messages.ToArray();

    /// <summary>
    /// Copy of the messages supplied to the constructor
    /// </summary>
    public IReadOnlyList<object> Messages { get; }
}
public static class TestingExtensions
{
// Unwraps an Envelope to the message it carries; any other object is returned unchanged
private static object toMessage(this object message)
{
    return message is Envelope envelope
        ? envelope.Message!
        : message;
}
// Unwraps every item through toMessage() and drops the ones that come back null
private static object[] resolveMessages(this IEnumerable<object> messages)
{
    var resolved = new List<object>();
    foreach (var candidate in messages)
    {
        var payload = candidate.toMessage();

        // ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
        if (payload != null)
        {
            resolved.Add(payload);
        }
    }

    return resolved.ToArray();
}
// Renders the resolved messages as a comma separated list of their ToString() values,
// or the placeholder "[no messages]" when nothing is left after resolving
private static string toListOfMessages(this IEnumerable<object> messages)
{
    var resolved = messages.resolveMessages();

    return resolved.Length > 0
        ? resolved.Select(x => x.ToString()!).Join(", ")
        : "[no messages]";
}
/// <summary>
/// Return the first envelope whose message is of type T. Throws when none
/// of the envelopes carries such a message
/// </summary>
/// <param name="envelopes">The envelopes to search; enumerated once</param>
/// <typeparam name="T">The message type to look for</typeparam>
/// <returns>The first envelope whose Message is a T</returns>
/// <exception cref="WolverineMessageExpectationException">No envelope carries a message of type T</exception>
public static Envelope FindForMessageType<T>(this IEnumerable<Envelope> envelopes)
{
    var candidates = envelopes.ToArray();

    foreach (var envelope in candidates)
    {
        if (envelope.Message is T)
        {
            return envelope;
        }
    }

    throw new WolverineMessageExpectationException(
        $"Could not find an envelope with message type {typeof(T).FullNameInCode()}. The actual messages were {candidates.toListOfMessages()}",
        candidates.resolveMessages());
}
/// <summary>
/// Assert that there are no messages of any type within this published collection.
/// A null collection is treated the same as an empty one
/// </summary>
/// <param name="messages">The published messages; may be null. Enumerated exactly once</param>
/// <exception cref="WolverineMessageExpectationException">The collection contains at least one item</exception>
public static void ShouldHaveNoMessages(this IEnumerable<object>? messages)
{
    if (messages == null)
    {
        return;
    }

    // Materialize once so the emptiness check, the failure text and the exception
    // payload all describe the same snapshot, even for lazy or one-shot sequences
    var actual = messages.ToArray();
    if (actual.Length > 0)
    {
        throw new WolverineMessageExpectationException(
            $"Should be no messages, but was {actual.toListOfMessages()}", actual);
    }
}
/// <summary>
/// Assert that this collection holds no message of type T, neither directly
/// nor wrapped in a <see cref="DeliveryMessage{T}"/>
/// </summary>
/// <param name="messages">The published messages; envelopes are unwrapped before checking</param>
/// <typeparam name="T">The message type that must not be present</typeparam>
/// <exception cref="WolverineMessageExpectationException">A message of type T was found</exception>
public static void ShouldHaveNoMessageOfType<T>(this IEnumerable<object> messages)
{
    var resolved = messages.resolveMessages();

    var found = Array.Exists(resolved, candidate => candidate is T || candidate is DeliveryMessage<T>);
    if (!found)
    {
        return;
    }

    throw new WolverineMessageExpectationException(
        $"Should be no messages of type {typeof(T).FullNameInCode()}, but the actual messages were {resolved.toListOfMessages()}",
        resolved);
}
/// <summary>
/// Assert and return the first message of type T within this collection
/// of published messages (unwraps <see cref="DeliveryMessage{T}"/> if necessary).
/// No assertions are made against the delivery options
/// </summary>
/// <param name="messages">The published messages to search</param>
/// <typeparam name="T">The expected message type</typeparam>
/// <returns>The first matching message</returns>
/// <exception cref="WolverineMessageExpectationException">No message of type T was found</exception>
public static T ShouldHaveMessageOfType<T>(this IEnumerable<object> messages)
    => messages.ShouldHaveMessageOfType<T>(deliveryAssertions: null);
/// <summary>
/// Assert and return the first message of type T within this collection
/// of published messages (unwraps <see cref="DeliveryMessage{T}"/> if necessary)
/// </summary>
/// <param name="messages">The published messages to search; envelopes are unwrapped first</param>
/// <param name="deliveryAssertions">
/// Optional assertions against the DeliveryOptions the message was published
/// with. It is invoked once, for the first match only. If that message was not
/// published with DeliveryOptions, null is supplied to this action.
/// </param>
/// <typeparam name="T">The expected message type</typeparam>
/// <returns>The first matching message</returns>
/// <exception cref="WolverineMessageExpectationException">
/// The collection is empty, or it holds no message of type T
/// </exception>
public static T ShouldHaveMessageOfType<T>(this IEnumerable<object> messages,
    Action<DeliveryOptions?>? deliveryAssertions)
{
    var candidates = messages.resolveMessages();

    if (candidates.Length == 0)
    {
        throw new WolverineMessageExpectationException(
            $"Should be a message of type {typeof(T).FullNameInCode()}, but there were no messages", candidates);
    }

    for (var i = 0; i < candidates.Length; i++)
    {
        var candidate = candidates[i];

        if (candidate is T plain)
        {
            // Published without a DeliveryMessage wrapper, so there are no options to inspect
            if (deliveryAssertions != null)
            {
                deliveryAssertions(null);
            }

            return plain;
        }

        if (candidate is DeliveryMessage<T> wrapped)
        {
            if (deliveryAssertions != null)
            {
                deliveryAssertions(wrapped.Options);
            }

            return wrapped.Message;
        }
    }

    throw new WolverineMessageExpectationException(
        $"Should be a message of type {typeof(T).FullNameInCode()}, but actual messages were {candidates.toListOfMessages()}",
        candidates);
}
/// <summary>
/// Find the first envelope that contains a message of type T within this
/// collection of published messages, throwing if there is none. This is only
/// necessary for testing customized message sending (explicit destination,
/// headers, scheduled delivery)
/// </summary>
/// <param name="messages">The published messages; only Envelope items are considered. Enumerated exactly once</param>
/// <typeparam name="T">The message type the envelope must carry</typeparam>
/// <returns>The first envelope whose Message is a T</returns>
/// <exception cref="WolverineMessageExpectationException">No envelope carries a message of type T</exception>
public static Envelope ShouldHaveEnvelopeForMessageType<T>(this IEnumerable<object> messages)
{
    // Materialize once so the search, the failure text and the exception payload
    // all describe the same snapshot, even for lazy or one-shot sequences
    var actual = messages.ToArray();

    return actual.OfType<Envelope>()
        .FirstOrDefault(x => x.Message is T) ?? throw new WolverineMessageExpectationException(
        $"Unable to find an envelope for type {typeof(T).FullNameInCode()}, actual messages were {actual.toListOfMessages()}",
        actual);
}
/// <summary>
/// Test helper for agent assignment within Wolverine. Definitely an advanced usage!
/// Builds an <see cref="AssignmentWaiter"/> for the given host, lets the caller
/// configure the expectations, then starts waiting
/// </summary>
/// <param name="expectedLeader">The host handed to the AssignmentWaiter as its leader</param>
/// <param name="configure">Callback that registers the expected agent counts on the waiter</param>
/// <param name="timeout">How long the waiter may wait for the expectations to be met</param>
/// <returns>The task returned by <see cref="AssignmentWaiter.Start"/></returns>
public static Task<bool> WaitUntilAssignmentsChangeTo(this IHost expectedLeader,
    Action<AssignmentWaiter> configure, TimeSpan timeout)
{
    var assignmentWaiter = new AssignmentWaiter(expectedLeader);

    configure.Invoke(assignmentWaiter);

    return assignmentWaiter.Start(timeout);
}
/// <summary>
/// Test helper to quickly retrieve the identity list of all running agents
/// </summary>
/// <param name="host">The host whose Wolverine runtime is queried</param>
/// <returns>The Uri of every agent the host's agent runtime reports as running</returns>
public static IReadOnlyList<Uri> RunningAgents(this IHost host)
    => host.GetRuntime().Agents.AllRunningAgentUris();
// public class AssignmentWaiter : IObserver<IWolverineEvent>
// {
// private readonly TaskCompletionSource<bool> _completion = new();
//
// private IDisposable _unsubscribe;
// private readonly WolverineTracker _tracker;
//
// public Dictionary<Guid, int> AgentCountByHost { get; } = new();
// public string AgentScheme { get; set; }
//
// public AssignmentWaiter(IHost leader)
// {
// _tracker = leader.GetRuntime().Tracker;
// }
//
// public void ExpectRunningAgents(IHost host, int runningCount)
// {
// var id = host.GetRuntime().Options.UniqueNodeId;
// AgentCountByHost[id] = runningCount;
// }
//
// public Task<bool> Start(TimeSpan timeout)
// {
// if (HasReached()) return Task.FromResult(true);
//
// _unsubscribe = _tracker.Subscribe(this);
//
// var timeout1 = new CancellationTokenSource(timeout);
// timeout1.Token.Register(() =>
// {
// _completion.TrySetException(new TimeoutException(
// "Did not reach the expected state or message in time"));
// });
//
//
// return _completion.Task;
// }
//
// public bool HasReached()
// {
// foreach (var pair in AgentCountByHost)
// {
// Func<Uri, bool> filter = AgentScheme.IsEmpty()
// ? x => !x.Scheme.StartsWith("wolverine")
// : x => x.Scheme.EqualsIgnoreCase(AgentScheme);
//
// var runningCount = _tracker.Agents.ToArray().Where(x => filter(x.Key)).Count(x => x.Value == pair.Key);
// if (pair.Value != runningCount) return false;
// }
//
// return true;
// }
//
// public void OnCompleted()
// {
// }
//
// public void OnError(Exception error)
// {
// _completion.SetException(error);
// }
//
// public void OnNext(IWolverineEvent value)
// {
// if (HasReached())
// {
// _completion.TrySetResult(true);
// _unsubscribe.Dispose();
// }
// }
// }
// }
// Created, configured and started by WaitUntilAssignmentsChangeTo() above
/// <summary>
/// Polls the agent runtimes of a set of hosts until each one is running the expected
/// number of agents, or until a timeout elapses. Expectations are registered through
/// <see cref="ExpectRunningAgents"/> before calling <see cref="Start"/>.
/// </summary>
public class AssignmentWaiter
{
    /// <summary>
    /// Expected number of running agents, keyed by each host's unique node id
    /// </summary>
    public Dictionary<Guid, int> AgentCountByHost { get; } = new();

    // Agent runtimes of every host known to this waiter, keyed by unique node id
    private readonly Dictionary<Guid, IAgentRuntime> _runtimes = new();

    // Runtime of the leader host; its node storage is read for the timeout diagnostics
    private readonly WolverineRuntime _leaderRuntime;

    /// <summary>
    /// When set, only agents whose Uri scheme equals this value (ignoring case) are
    /// counted. When empty, every agent whose scheme does not start with "wolverine"
    /// is counted. Never assigned by this class, so it is empty unless the caller sets it
    /// </summary>
    public string AgentScheme { get; set; }

    /// <summary>
    /// For multiple host agent assignments can vary, so we may need to rely on total count.
    /// When true, only the sum of the expected counts has to equal the sum of the
    /// running counts; when false, every host has to match its own expected count
    /// </summary>
    public bool CountTotal { get; set; }

    /// <summary>
    /// Creates the waiter and registers the leader's agent runtime
    /// </summary>
    /// <param name="leader">Host whose runtime is used to load persisted node state</param>
    public AssignmentWaiter(IHost leader)
    {
        var runtime = leader.GetRuntime();
        _leaderRuntime = runtime;
        _runtimes[runtime.Options.UniqueNodeId] = runtime.Agents;
    }

    /// <summary>
    /// Register how many agents the given host is expected to be running
    /// </summary>
    /// <param name="host">The host to observe</param>
    /// <param name="runningCount">Expected number of running agents on that host</param>
    public void ExpectRunningAgents(IHost host, int runningCount)
    {
        var runtime = host.GetRuntime();
        var id = runtime.Options.UniqueNodeId;
        AgentCountByHost[id] = runningCount;
        _runtimes[id] = runtime.Agents;
    }

    /// <summary>
    /// Wait until <see cref="HasReached"/> is true, checking roughly once per second
    /// </summary>
    /// <param name="timeout">Maximum time to wait</param>
    /// <returns>A task that completes with true once the expected state is reached</returns>
    /// <exception cref="TimeoutException">
    /// Surfaced through the returned task when the state is not reached in time. The
    /// message lists the agents per node according to storage and to the registered runtimes
    /// </exception>
    public Task<bool> Start(TimeSpan timeout)
    {
        if (HasReached()) return Task.FromResult(true);

        // The constructor already schedules the cancellation, no CancelAfter() needed
        var cancellation = new CancellationTokenSource(timeout);

        // The token is deliberately not handed to Task.Run: an already expired timeout
        // must still end in the TimeoutException below rather than a cancelled task
        return Task.Run(async () =>
        {
            try
            {
                while (!cancellation.IsCancellationRequested)
                {
                    if (HasReached()) return true;
                    await Task.Delay(1.Seconds(), cancellation.Token);
                }
            }
            catch (OperationCanceledException)
            {
                // The timeout fired during the delay; fall through to the final check
            }
            finally
            {
                cancellation.Dispose();
            }

            // One last look, the state may have been reached right at the deadline
            if (HasReached()) return true;

            var report = await writePersistedActualsAsync();
            throw new TimeoutException(report.ToString());
        });
    }

    // Builds the timeout diagnostics: the active agents per node as loaded from the
    // leader's node storage, followed by the running agents per node as reported by
    // the runtimes registered with this waiter
    private async Task<StringBuilder> writePersistedActualsAsync()
    {
        var nodes = await _leaderRuntime.Storage.Nodes.LoadAllNodesAsync(CancellationToken.None);
        var orderedNodes = nodes.OrderBy(x => x.AssignedNodeId).ToList();

        var builder = new StringBuilder();
        using var writer = new StringWriter(builder);

        writer.WriteLine("According to the database...");
        foreach (var node in orderedNodes)
        {
            writer.WriteLine($"Node {node.AssignedNodeId} is running:");
            foreach (var uri in node.ActiveAgents.OrderBy(x => x.ToString()))
            {
                writer.WriteLine(uri);
            }
        }

        writer.WriteLine();
        writer.WriteLine("According to the runtimes");
        foreach (var node in orderedNodes)
        {
            writer.WriteLine($"Node {node.AssignedNodeId} is running:");

            // Storage can contain nodes that were never registered with this waiter.
            // Indexing the dictionary would throw KeyNotFoundException and hide the timeout
            if (!_runtimes.TryGetValue(node.Id, out var runtime))
            {
                writer.WriteLine("(no runtime was registered with this waiter for this node)");
                continue;
            }

            foreach (var uri in runtime.AllRunningAgentUris().OrderBy(x => x.ToString()))
            {
                writer.WriteLine(uri);
            }
        }

        return builder;
    }

    /// <summary>
    /// Compare the currently running agents against the registered expectations
    /// </summary>
    /// <returns>
    /// True when every registered host runs exactly its expected number of matching
    /// agents or, with <see cref="CountTotal"/> enabled, when the expected and running
    /// totals across all registered hosts are equal
    /// </returns>
    public bool HasReached()
    {
        // Uri schemes are plain ASCII, so an ordinal prefix check is the right comparison
        Func<Uri, bool> filter = AgentScheme.IsEmpty()
            ? x => !x.Scheme.StartsWith("wolverine", StringComparison.Ordinal)
            : x => x.Scheme.EqualsIgnoreCase(AgentScheme);

        var difference = 0;
        foreach (var pair in AgentCountByHost)
        {
            var runtime = _runtimes[pair.Key];
            var runningCount = runtime.AllRunningAgentUris().Count(filter);

            if (!CountTotal && pair.Value != runningCount) return false;

            difference += pair.Value - runningCount;
        }

        return !CountTotal || difference == 0;
    }
}
}