-
-
Notifications
You must be signed in to change notification settings - Fork 164
/
Copy pathDurableSendingAndReceivingCompliance.cs
95 lines (76 loc) · 3.27 KB
/
DurableSendingAndReceivingCompliance.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
using IntegrationTests;
using Marten;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Oakton.Resources;
using Shouldly;
using Wolverine.AmazonSqs.Internal;
using Wolverine.ComplianceTests.Compliance;
using Wolverine.Marten;
using Wolverine.Runtime;
namespace Wolverine.AmazonSqs.Tests;
public class DurableComplianceFixture : TransportComplianceFixture, IAsyncLifetime
{
public static int Number;
public DurableComplianceFixture() : base(new Uri($"{AmazonSqsTransport.SqsProtocol}://{AmazonSqsTransport.SqsSegment}/receiver"), 120)
{
}
public async Task InitializeAsync()
{
var number = ++Number;
OutboundAddress = new Uri($"{AmazonSqsTransport.SqsProtocol}://{AmazonSqsTransport.SqsSegment}/receiver-" + number);
await SenderIs(opts =>
{
opts.UseAmazonSqsTransportLocally()
.AutoProvision()
.AutoPurgeOnStartup()
.ConfigureListeners(x => x.UseDurableInbox())
.ConfigureListeners(x => x.UseDurableInbox());
opts.Services.AddMarten(store =>
{
store.Connection(Servers.PostgresConnectionString);
store.DatabaseSchemaName = "sender";
}).IntegrateWithWolverine(x => x.MessageStorageSchemaName = "sender");
opts.Services.AddResourceSetupOnStartup();
opts.ListenToSqsQueue("sender-" + number);
});
await ReceiverIs(opts =>
{
opts.UseAmazonSqsTransportLocally()
.AutoProvision()
.AutoPurgeOnStartup()
.ConfigureListeners(x => x.UseDurableInbox())
.ConfigureListeners(x => x.UseDurableInbox());
opts.Services.AddMarten(store =>
{
store.Connection(Servers.PostgresConnectionString);
store.DatabaseSchemaName = "receiver";
}).IntegrateWithWolverine(x => x.MessageStorageSchemaName = "receiver");
opts.Services.AddResourceSetupOnStartup();
opts.ListenToSqsQueue("receiver-" + number);
});
await Sender.Services.GetRequiredService<IWolverineRuntime>().Storage.Admin.RebuildAsync();
await Receiver.Services.GetRequiredService<IWolverineRuntime>().Storage.Admin.RebuildAsync();
}
public async Task DisposeAsync()
{
await DisposeAsync();
}
public class DurableSendingAndReceivingCompliance : TransportCompliance<DurableComplianceFixture>
{
[Fact]
public virtual async Task dlq_mechanics()
{
throwOnAttempt<DivideByZeroException>(1);
throwOnAttempt<DivideByZeroException>(2);
throwOnAttempt<DivideByZeroException>(3);
await shouldMoveToErrorQueueOnAttempt(1);
var runtime = theReceiver.Services.GetRequiredService<IWolverineRuntime>();
var transport = runtime.Options.Transports.GetOrCreate<AmazonSqsTransport>();
var queue = transport.Queues[AmazonSqsTransport.DeadLetterQueueName];
await queue.InitializeAsync(NullLogger.Instance);
var messages = await transport.SqsClient.ReceiveMessageAsync(queue.QueueUrl);
messages.Messages.Count.ShouldBeGreaterThan(0);
}
}
}