Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Adds Amazon SNS Support #1314

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,54 +20,94 @@ public void sanitizing_identifiers(string identifier, string expected)
public void return_all_endpoints_gets_dead_letter_queue_too()
{
var transport = new AmazonSqsTransport();
var one = transport.Queues["one"];
var two = transport.Queues["two"];
var three = transport.Queues["three"];
three.IsListener = true;

one.DeadLetterQueueName = null;
two.DeadLetterQueueName = "two-dead-letter-queue";
two.IsListener = true;

var endpoints = transport.Endpoints().OfType<AmazonSqsQueue>().ToArray();

endpoints.ShouldContain(x => x.QueueName == AmazonSqsTransport.DeadLetterQueueName);
endpoints.ShouldContain(x => x.QueueName == "two-dead-letter-queue");
endpoints.ShouldContain(x => x.QueueName == "one");
endpoints.ShouldContain(x => x.QueueName == "two");
endpoints.ShouldContain(x => x.QueueName == "three");
var queueOne = transport.Queues["one"];
var queueTwo = transport.Queues["two"];
var queueThree = transport.Queues["three"];
queueThree.IsListener = true;

var topicOne = transport.Topics["one"];
var topicTwo = transport.Topics["two"];

queueOne.DeadLetterQueueName = null;
queueTwo.DeadLetterQueueName = "two-dead-letter-queue";
queueTwo.IsListener = true;

var queues = transport.Endpoints().OfType<AmazonSqsQueue>().ToArray();

queues.ShouldContain(x => x.QueueName == AmazonSqsTransport.DeadLetterQueueName);
queues.ShouldContain(x => x.QueueName == "two-dead-letter-queue");
queues.ShouldContain(x => x.QueueName == "one");
queues.ShouldContain(x => x.QueueName == "two");
queues.ShouldContain(x => x.QueueName == "three");

var topics = transport.Endpoints().OfType<AmazonSnsTopic>().ToArray();
topics.ShouldContain(x => x.TopicName == "one");
topics.ShouldContain(x => x.TopicName == "two");
}

[Fact]
public void findEndpointByUri_should_correctly_find_by_queuename()
public void findEndpointByUri_should_correctly_find_by_queueName()
{
string queueNameInPascalCase = "TestQueue";
string queueNameLowerCase = "testqueue";
var transport = new AmazonSqsTransport();
var testQueue = transport.Queues[queueNameInPascalCase];
var testQueue2 = transport.Queues[queueNameLowerCase];

var result = transport.GetOrCreateEndpoint(new Uri($"sqs://{queueNameInPascalCase}"));
var result = transport.GetOrCreateEndpoint(new Uri($"{AmazonSqsTransport.SqsProtocol}://{AmazonSqsTransport.SqsSegment}/{queueNameInPascalCase}"));
transport.Queues.Count.ShouldBe(2);

result.EndpointName.ShouldBe(queueNameInPascalCase);
}

[Fact]
public void findEndpointByUri_should_correctly_create_endpoint_if_it_doesnt_exist()
public void findEndpointByUri_should_correctly_find_by_topicName()
{
string topicNameInPascalCase = "TestTopic";
string topicNameLowerCase = "testtopic";
var transport = new AmazonSqsTransport();
var testTopic = transport.Topics[topicNameInPascalCase];
var testTopic2 = transport.Topics[topicNameLowerCase];

var result = transport.GetOrCreateEndpoint(new Uri($"{AmazonSqsTransport.SqsProtocol}://{AmazonSqsTransport.SqsSegment}/{topicNameInPascalCase}"));
transport.Topics.Count.ShouldBe(2);

result.EndpointName.ShouldBe(topicNameInPascalCase);
}

[Fact]
public void findEndpointByUri_should_correctly_create_queue_if_it_doesnt_exist()
{
string queueName = "TestQueue";
string queueName2 = "testqueue";
var transport = new AmazonSqsTransport();
transport.Queues.Count.ShouldBe(0);

var result = transport.GetOrCreateEndpoint(new Uri($"sqs://{queueName}"));
var result = transport.GetOrCreateEndpoint(new Uri($"{AmazonSqsTransport.SqsProtocol}://{AmazonSqsTransport.SqsSegment}/{queueName}"));
transport.Queues.Count.ShouldBe(1);

result.EndpointName.ShouldBe(queueName);

var result2 = transport.GetOrCreateEndpoint(new Uri($"sqs://{queueName2}"));
var result2 = transport.GetOrCreateEndpoint(new Uri($"{AmazonSqsTransport.SqsProtocol}://{AmazonSqsTransport.SqsSegment}/{queueName2}"));
transport.Queues.Count.ShouldBe(2);
result2.EndpointName.ShouldBe(queueName2);
}
}

[Fact]
public void findEndpointByUri_should_correctly_create_topic_if_it_doesnt_exist()
{
string topicName = "TestTopic";
string topicName2 = "testtopic";
var transport = new AmazonSqsTransport();
transport.Topics.Count.ShouldBe(0);

var result = transport.GetOrCreateEndpoint(new Uri($"{AmazonSqsTransport.SqsProtocol}://{AmazonSqsTransport.SnsSegment}/{topicName}"));
transport.Topics.Count.ShouldBe(1);

result.EndpointName.ShouldBe(topicName);

var result2 = transport.GetOrCreateEndpoint(new Uri($"{AmazonSqsTransport.SqsProtocol}://{AmazonSqsTransport.SnsSegment}/{topicName2}"));
transport.Topics.Count.ShouldBe(2);
result2.EndpointName.ShouldBe(topicName2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ namespace Wolverine.AmazonSqs.Tests;

public class BufferedComplianceFixture : TransportComplianceFixture, IAsyncLifetime
{
public BufferedComplianceFixture() : base(new Uri("sqs://receiver"), 120)
public BufferedComplianceFixture() : base(new Uri($"{AmazonSqsTransport.SqsProtocol}://{AmazonSqsTransport.SqsSegment}/receiver"), 120)
{
}

public async Task InitializeAsync()
{
var number = Guid.NewGuid().ToString().Replace(".", "-");

OutboundAddress = new Uri("sqs://receiver-" + number);
OutboundAddress = new Uri($"{AmazonSqsTransport.SqsProtocol}://{AmazonSqsTransport.SqsSegment}/receiver-" + number);

await SenderIs(opts =>
{
Expand Down Expand Up @@ -58,7 +58,7 @@ public virtual async Task dlq_mechanics()
var transport = runtime.Options.Transports.GetOrCreate<AmazonSqsTransport>();
var queue = transport.Queues[AmazonSqsTransport.DeadLetterQueueName];
await queue.InitializeAsync(NullLogger.Instance);
var messages = await transport.Client.ReceiveMessageAsync(queue.QueueUrl);
var messages = await transport.SqsClient.ReceiveMessageAsync(queue.QueueUrl);
messages.Messages.Count.ShouldBeGreaterThan(0);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void exclude_types()

AssertNoRoutes<PublishedMessage>();

var uri = "sqs://published.message".ToUri();
var uri = $"{AmazonSqsTransport.SqsProtocol}://{AmazonSqsTransport.SqsSegment}/published.message".ToUri();
var endpoint = theRuntime.Endpoints.EndpointFor(uri);
endpoint.ShouldBeNull();

Expand All @@ -51,7 +51,7 @@ public void include_types()

PublishingRoutesFor<PublishedMessage>().Any().ShouldBeTrue();

var uri = "sqs://Message1".ToUri();
var uri = $"{AmazonSqsTransport.SqsProtocol}://{AmazonSqsTransport.SqsSegment}/Message1".ToUri();
var endpoint = theRuntime.Endpoints.EndpointFor(uri);
endpoint.ShouldBeNull();

Expand Down Expand Up @@ -83,7 +83,7 @@ public void disable_listener_by_lambda()
return t.ToMessageTypeName().Replace('.', '-');
}));

var uri = "sqs://routed".ToUri();
var uri = $"{AmazonSqsTransport.SqsProtocol}://{AmazonSqsTransport.SqsSegment}/routed".ToUri();
var endpoint = theRuntime.Endpoints.EndpointFor(uri);
endpoint.ShouldBeNull();

Expand All @@ -96,7 +96,7 @@ public void configure_listener()
{
ConfigureConventions(c => c.ConfigureListeners((x, _) => { x.UseDurableInbox(); }));

var endpoint = theRuntime.Endpoints.EndpointFor("sqs://routed".ToUri())
var endpoint = theRuntime.Endpoints.EndpointFor($"{AmazonSqsTransport.SqsProtocol}://{AmazonSqsTransport.SqsSegment}/routed".ToUri())
.ShouldBeOfType<AmazonSqsQueue>();

endpoint.Mode.ShouldBe(EndpointMode.Durable);
Expand All @@ -109,4 +109,4 @@ public void Modify(Envelope envelope)
throw new NotImplementedException();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine.AmazonSqs.Internal;
using Wolverine.Runtime;
using Xunit.Abstractions;

Expand Down Expand Up @@ -33,8 +34,8 @@ public void discover_listener_with_prefix()
var runtime = _host.Services.GetRequiredService<IWolverineRuntime>();

var uris = runtime.Endpoints.ActiveListeners().Select(x => x.Uri).ToArray();
uris.ShouldContain(new Uri("sqs://zztop-orderextension-createorder/"));
uris.ShouldContain(new Uri("sqs://zztop-orderextension-shiporder/"));
uris.ShouldContain(new Uri("sqs://zztop-routed/"));
uris.ShouldContain(new Uri($"{AmazonSqsTransport.SqsProtocol}://{AmazonSqsTransport.SqsSegment}/zztop-orderextension-createorder/"));
uris.ShouldContain(new Uri($"{AmazonSqsTransport.SqsProtocol}://{AmazonSqsTransport.SqsSegment}/zztop-orderextension-shiporder/"));
uris.ShouldContain(new Uri($"{AmazonSqsTransport.SqsProtocol}://{AmazonSqsTransport.SqsSegment}/zztop-routed/"));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using JasperFx.Core;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine.AmazonSqs.Internal;
using Wolverine.ComplianceTests;
using Wolverine.Tracking;

Expand Down Expand Up @@ -55,6 +56,6 @@ public async Task send_from_one_node_to_another_all_with_conventional_routing()
.ServiceName.ShouldBe("Receiver");

received.Envelope.Destination
.ShouldBe(new Uri("sqs://shazaam-routed/"));
.ShouldBe(new Uri($"{AmazonSqsTransport.SqsProtocol}://{AmazonSqsTransport.SqsSegment}/shazaam-routed/"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace Wolverine.AmazonSqs.Tests.ConventionalRouting;

public class when_discovering_a_listening_endpoint_with_all_defaults : ConventionalRoutingContext
{
private readonly Uri theExpectedUri = "sqs://routed".ToUri();
private readonly Uri theExpectedUri = $"{AmazonSqsTransport.SqsProtocol}://{AmazonSqsTransport.SqsSegment}/routed".ToUri();
private readonly AmazonSqsQueue theQueue;

public when_discovering_a_listening_endpoint_with_all_defaults()
Expand Down Expand Up @@ -39,4 +39,4 @@ public void should_be_an_active_listener()
theRuntime.Endpoints.ActiveListeners().Any(x => x.Uri == theExpectedUri)
.ShouldBeTrue();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace Wolverine.AmazonSqs.Tests.ConventionalRouting;

public class when_discovering_a_listening_endpoint_with_overridden_queue_naming : ConventionalRoutingContext
{
private readonly Uri theExpectedUri = "sqs://routedmessage2".ToUri();
private readonly Uri theExpectedUri = $"{AmazonSqsTransport.SqsProtocol}://{AmazonSqsTransport.SqsSegment}/routedmessage2".ToUri();
private readonly AmazonSqsQueue theQueue;

public when_discovering_a_listening_endpoint_with_overridden_queue_naming()
Expand Down Expand Up @@ -35,4 +35,4 @@ public void should_be_an_active_listener()
theRuntime.Endpoints.ActiveListeners().Any(x => x.Uri == theExpectedUri)
.ShouldBeTrue();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ public class DurableComplianceFixture : TransportComplianceFixture, IAsyncLifeti
{
public static int Number;

public DurableComplianceFixture() : base(new Uri("sqs://receiver"), 120)
public DurableComplianceFixture() : base(new Uri($"{AmazonSqsTransport.SqsProtocol}://{AmazonSqsTransport.SqsSegment}/receiver"), 120)
{
}

public async Task InitializeAsync()
{
var number = ++Number;

OutboundAddress = new Uri("sqs://receiver-" + number);
OutboundAddress = new Uri($"{AmazonSqsTransport.SqsProtocol}://{AmazonSqsTransport.SqsSegment}/receiver-" + number);

await SenderIs(opts =>
{
Expand Down Expand Up @@ -88,8 +88,8 @@ public virtual async Task dlq_mechanics()
var transport = runtime.Options.Transports.GetOrCreate<AmazonSqsTransport>();
var queue = transport.Queues[AmazonSqsTransport.DeadLetterQueueName];
await queue.InitializeAsync(NullLogger.Instance);
var messages = await transport.Client.ReceiveMessageAsync(queue.QueueUrl);
var messages = await transport.SqsClient.ReceiveMessageAsync(queue.QueueUrl);
messages.Messages.Count.ShouldBeGreaterThan(0);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ public class InlineComplianceFixture : TransportComplianceFixture, IAsyncLifetim
{
public static int Number;

public InlineComplianceFixture() : base(new Uri("sqs://buffered-receiver"), 120)
public InlineComplianceFixture() : base(new Uri($"{AmazonSqsTransport.SqsProtocol}://{AmazonSqsTransport.SqsSegment}/buffered-receiver"), 120)
{
}

public async Task InitializeAsync()
{
var number = ++Number;

OutboundAddress = new Uri("sqs://receiver-" + number);
OutboundAddress = new Uri($"{AmazonSqsTransport.SqsProtocol}://{AmazonSqsTransport.SqsSegment}/receiver-" + number);

await SenderIs(opts =>
{
Expand Down Expand Up @@ -64,7 +64,7 @@ public virtual async Task dlq_mechanics()
var transport = runtime.Options.Transports.GetOrCreate<AmazonSqsTransport>();
var queue = transport.Queues[AmazonSqsTransport.DeadLetterQueueName];
await queue.InitializeAsync(NullLogger.Instance);
var messages = await transport.Client.ReceiveMessageAsync(queue.QueueUrl);
var messages = await transport.SqsClient.ReceiveMessageAsync(queue.QueueUrl);
messages.Messages.Count.ShouldBeGreaterThan(0);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void default_endpoint_name_is_queue_name()
public void uri()
{
new AmazonSqsQueue("foo", new AmazonSqsTransport())
.Uri.ShouldBe(new Uri("sqs://foo"));
.Uri.ShouldBe(new Uri($"{AmazonSqsTransport.SqsProtocol}://{AmazonSqsTransport.SqsSegment}/foo"));
}

[Fact]
Expand Down Expand Up @@ -191,4 +191,4 @@ public void set_visibility_timeout()
theQueue.VisibilityTimeout(55);
theQueue.Configuration.Attributes[QueueAttributeName.VisibilityTimeout].ShouldBe("55");
}
}
}
Loading
Loading