From 4cb644ab13f1437ff1161379d1930da409b8fb13 Mon Sep 17 00:00:00 2001
From: svkeller <33978960+svkeller@users.noreply.github.com>
Date: Wed, 1 Feb 2023 10:00:24 +0100
Subject: [PATCH] limit number of concurrent messages for kafka consumer (#742)
---
Motor.NET.sln.DotSettings | 2 +
shared.csproj | 2 +-
.../KafkaMessageConsumer.cs | 6 +-
.../Options/KafkaConsumerOptions.cs | 1 +
.../KafkaContainer.cs | 113 ------------------
.../KafkaExtensionTests.cs | 40 ++++++-
.../KafkaFixture.cs | 19 +--
...sions.Hosting.Kafka_IntegrationTest.csproj | 2 +-
...nsions.Hosting.NATS_IntegrationTest.csproj | 2 +-
.../NATSContainer.cs | 66 ----------
.../NATSFixture.cs | 15 ++-
.../NATSIntegrationTests.cs | 6 +-
...ns.Hosting.RabbitMQ_IntegrationTest.csproj | 2 +-
.../RabbitMQFixture.cs | 31 ++---
...ensions.Hosting.SQS_IntegrationTest.csproj | 2 +-
.../SQSContainer.cs | 67 -----------
.../SQSFixture.cs | 14 ++-
.../SQSMessageConsumerTests.cs | 12 +-
18 files changed, 101 insertions(+), 301 deletions(-)
create mode 100644 Motor.NET.sln.DotSettings
delete mode 100644 test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaContainer.cs
delete mode 100644 test/Motor.Extensions.Hosting.NATS_IntegrationTest/NATSContainer.cs
delete mode 100644 test/Motor.Extensions.Hosting.SQS_IntegrationTest/SQSContainer.cs
diff --git a/Motor.NET.sln.DotSettings b/Motor.NET.sln.DotSettings
new file mode 100644
index 00000000..789b1638
--- /dev/null
+++ b/Motor.NET.sln.DotSettings
@@ -0,0 +1,2 @@
+
+ SQS
\ No newline at end of file
diff --git a/shared.csproj b/shared.csproj
index c000a683..8a5db2a7 100644
--- a/shared.csproj
+++ b/shared.csproj
@@ -1,7 +1,7 @@
- 0.10.5
+ 0.10.6
10
enable
CS8600;CS8602;CS8625;CS8618;CS8604;CS8601
diff --git a/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs b/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs
index 4af91d5f..a1c02c16 100644
--- a/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs
+++ b/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs
@@ -25,6 +25,7 @@ public sealed class KafkaMessageConsumer : IMessageConsumer, IDisp
private readonly IMetricFamily? _consumerLagSummary;
private readonly ILogger> _logger;
private IConsumer? _consumer;
+ private readonly SemaphoreSlim _messageSemaphore;
public KafkaMessageConsumer(ILogger> logger,
IOptions> config,
@@ -40,6 +41,7 @@ public KafkaMessageConsumer(ILogger> logger,
"Contains a summary of current consumer lag of each partition", new[] { "topic", "partition" });
_consumerLagGauge = metricsFactory?.CreateGauge("consumer_lag",
"Contains current number consumer lag of each partition", false, "topic", "partition");
+ _messageSemaphore = new SemaphoreSlim(config.Value.MaxConcurrentMessages);
}
public Func, CancellationToken, Task>? ConsumeCallbackAsync
@@ -66,10 +68,11 @@ public Task StartAsync(CancellationToken token = default)
public async Task ExecuteAsync(CancellationToken token = default)
{
- await Task.Run(() =>
+ await Task.Run(async () =>
{
while (!token.IsCancellationRequested)
{
+ await _messageSemaphore.WaitAsync(token);
try
{
var msg = _consumer?.Consume(token);
@@ -169,6 +172,7 @@ private void SingleMessageHandling(CancellationToken token, ConsumeResult
{
var processedMessageStatus = taskAwaiter?.GetResult();
+ _messageSemaphore.Release();
switch (processedMessageStatus)
{
case ProcessedMessageStatus.Success:
diff --git a/src/Motor.Extensions.Hosting.Kafka/Options/KafkaConsumerOptions.cs b/src/Motor.Extensions.Hosting.Kafka/Options/KafkaConsumerOptions.cs
index 4d762fd2..2fb701bd 100644
--- a/src/Motor.Extensions.Hosting.Kafka/Options/KafkaConsumerOptions.cs
+++ b/src/Motor.Extensions.Hosting.Kafka/Options/KafkaConsumerOptions.cs
@@ -11,4 +11,5 @@ public KafkaConsumerOptions()
public string? Topic { get; set; }
public int CommitPeriod { get; set; } = 1000;
+ public int MaxConcurrentMessages { get; set; } = 1000;
}
diff --git a/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaContainer.cs b/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaContainer.cs
deleted file mode 100644
index a4ce29de..00000000
--- a/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaContainer.cs
+++ /dev/null
@@ -1,113 +0,0 @@
-using System.Collections.Generic;
-using System.Threading.Tasks;
-using Docker.DotNet;
-using Docker.DotNet.Models;
-using Microsoft.Extensions.DependencyInjection;
-using Microsoft.Extensions.Logging;
-using TestContainers.Container.Abstractions;
-using TestContainers.Container.Abstractions.Images;
-
-namespace Motor.Extensions.Hosting.Kafka_IntegrationTest;
-
-public class KafkaContainer : GenericContainer
-{
- private static readonly string STARTER_SCRIPT = "/testcontainers_start.sh";
-
- ///
- /// Default image name
- ///
- private static readonly string KafkaDefaultImage = "confluentinc/cp-kafka";
-
- ///
- /// Default image tag
- ///
- private static readonly string KafkaDefaultTag = "5.4.3";
-
- private static IImage CreateDefaultImage(IDockerClient dockerClient, ILoggerFactory loggerFactory)
- {
- return new GenericImage(dockerClient, loggerFactory) { ImageName = $"{KafkaDefaultImage}:{KafkaDefaultTag}" };
- }
-
- public const int KAFKA_PORT = 9093;
- private const int ZOOKEEPER_PORT = 2181;
- private readonly IDockerClient _dockerClient;
- private ContainerInspectResponse _containerInfo;
-
- ///
- public KafkaContainer(IDockerClient dockerClient, ILoggerFactory loggerFactory)
- : base($"{DefaultImage}:{DefaultTag}", dockerClient, loggerFactory)
- {
- _dockerClient = dockerClient;
- }
-
- ///
- public KafkaContainer(string dockerImageName, IDockerClient dockerClient, ILoggerFactory loggerFactory)
- : base(dockerImageName, dockerClient, loggerFactory)
- {
- _dockerClient = dockerClient;
- }
-
- ///
- [ActivatorUtilitiesConstructor]
- public KafkaContainer(IImage dockerImage, IDockerClient dockerClient, ILoggerFactory loggerFactory)
- : base(NullImage.IsNullImage(dockerImage) ? CreateDefaultImage(dockerClient, loggerFactory) : dockerImage,
- dockerClient, loggerFactory)
- {
- _dockerClient = dockerClient;
- }
-
- protected override async Task ConfigureAsync()
- {
- await base.ConfigureAsync();
- ExposedPorts.Add(KAFKA_PORT);
- ExposedPorts.Add(ZOOKEEPER_PORT);
-
- // Use two listeners with different names, it will force Kafka to communicate with itself via internal
- // listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener
- Env["KAFKA_LISTENERS"] = "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9092";
- Env["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"] = "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT";
- Env["KAFKA_INTER_BROKER_LISTENER_NAME"] = "BROKER";
-
- Env["KAFKA_BROKER_ID"] = "1";
- Env["KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR"] = "1";
- Env["KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS"] = "1";
- Env["KAFKA_LOG_FLUSH_INTERVAL_MESSAGES"] = long.MaxValue + "";
- Env["KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS"] = "0";
- Command = new List
- {"sh", "-c", "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT};
- }
-
- protected override async Task ContainerStarted()
- {
- await base.ContainerStarting();
- _containerInfo = await _dockerClient.Containers.InspectContainerAsync(ContainerId);
-
- await ExecuteCommand("sh", "-c",
- $"echo \"{StartupScript()}\" > {STARTER_SCRIPT} && chmod +x {STARTER_SCRIPT}");
- }
-
- private string getBootstrapServers()
- {
- return
- $"PLAINTEXT://{GetDockerHostIpAddress()}:{GetMappedPort(KAFKA_PORT)},BROKER://{GetDockerHostIpAddress()}:9092";
- }
-
- private string StartupScript()
- {
- var command = "#!/bin/bash\n";
- var zookeeperConnect = $"localhost:{ZOOKEEPER_PORT}";
- command += $"echo 'clientPort={ZOOKEEPER_PORT}' > zookeeper.properties\n";
- command += "echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties\n";
- command += "echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties\n";
- command += "zookeeper-server-start zookeeper.properties &\n";
-
- command += $"export KAFKA_ZOOKEEPER_CONNECT='{zookeeperConnect}'\n";
-
- command += $"export KAFKA_ADVERTISED_LISTENERS='{getBootstrapServers()}'\n";
-
- command += ". /etc/confluent/docker/bash-config \n";
- command += "/etc/confluent/docker/configure \n";
- command += "/etc/confluent/docker/launch \n";
- return command;
- }
-}
diff --git a/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaExtensionTests.cs b/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaExtensionTests.cs
index 6b45247f..bdcc4f3d 100644
--- a/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaExtensionTests.cs
+++ b/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaExtensionTests.cs
@@ -106,6 +106,33 @@ public async Task Consume_PublishIntoExtensionDefinedTopic_ConsumedEqualsPublish
Assert.Equal(motorCloudEvent.Id, id);
}
+ [Fact(Timeout = 50000)]
+ public async Task Consume_LimitMaxConcurrentMessages_StartProcessingLimitedNumberOfMessagesSimultaneously()
+ {
+ const int maxConcurrentMessages = 5;
+ var topic = _randomizerString.Generate();
+ const string message = "testMessage";
+ for (var i = 0; i < maxConcurrentMessages * 2; i++)
+ {
+ await PublishMessage(topic, "someKey", message);
+ }
+ var consumer = GetConsumer(topic, maxConcurrentMessages);
+ var numberOfParallelMessages = 0;
+ consumer.ConsumeCallbackAsync = async (_, cancellationToken) =>
+ {
+ numberOfParallelMessages++;
+ await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken);
+ numberOfParallelMessages--;
+ return await Task.FromResult(ProcessedMessageStatus.Success);
+ };
+
+ await consumer.StartAsync();
+ consumer.ExecuteAsync();
+ await Task.Delay(TimeSpan.FromSeconds(2));
+
+ Assert.Equal(maxConcurrentMessages, numberOfParallelMessages);
+ }
+
private async Task PublishMessage(string topic, string key, string value)
{
using var producer = new ProducerBuilder(GetPublisherConfig(topic)).Build();
@@ -114,9 +141,9 @@ await producer.ProduceAsync(topic,
producer.Flush();
}
- private KafkaMessageConsumer GetConsumer(string topic)
+ private KafkaMessageConsumer GetConsumer(string topic, int maxConcurrentMessages = 1000)
{
- var options = Options.Create(GetConsumerConfig(topic));
+ var options = Options.Create(GetConsumerConfig(topic, maxConcurrentMessages));
var fakeLoggerMock = Mock.Of>>();
return new KafkaMessageConsumer(fakeLoggerMock, options, null, GetApplicationNameService(),
new JsonEventFormatter());
@@ -134,7 +161,7 @@ private KafkaPublisherOptions GetPublisherConfig(string topic)
return new()
{
Topic = topic,
- BootstrapServers = $"{_fixture.Hostname}:{_fixture.Port}",
+ BootstrapServers = _fixture.BootstrapServers,
};
}
@@ -145,18 +172,19 @@ private IApplicationNameService GetApplicationNameService(string source = "test:
return mock.Object;
}
- private KafkaConsumerOptions GetConsumerConfig(string topic, string groupId = "group_id")
+ private KafkaConsumerOptions GetConsumerConfig(string topic, int maxConcurrentMessages, string groupId = "group_id")
{
return new()
{
Topic = topic,
GroupId = groupId,
CommitPeriod = 1,
- BootstrapServers = $"{_fixture.Hostname}:{_fixture.Port}",
+ BootstrapServers = _fixture.BootstrapServers,
EnableAutoCommit = false,
StatisticsIntervalMs = 5000,
SessionTimeoutMs = 6000,
- AutoOffsetReset = AutoOffsetReset.Earliest
+ AutoOffsetReset = AutoOffsetReset.Earliest,
+ MaxConcurrentMessages = maxConcurrentMessages
};
}
}
diff --git a/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaFixture.cs b/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaFixture.cs
index 82c0911c..35353620 100644
--- a/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaFixture.cs
+++ b/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaFixture.cs
@@ -1,30 +1,33 @@
using System.Threading.Tasks;
-using TestContainers.Container.Abstractions;
-using TestContainers.Container.Abstractions.Hosting;
+using DotNet.Testcontainers.Builders;
+using DotNet.Testcontainers.Configurations;
+using DotNet.Testcontainers.Containers;
using Xunit;
namespace Motor.Extensions.Hosting.Kafka_IntegrationTest;
public class KafkaFixture : IAsyncLifetime
{
- private readonly GenericContainer _kafka;
+ private readonly KafkaTestcontainer _container;
+ private const int KafkaPort = 9093;
public KafkaFixture()
{
- _kafka = new ContainerBuilder()
+ _container = new TestcontainersBuilder()
+ .WithPortBinding(KafkaPort, true)
+ .WithKafka(new KafkaTestcontainerConfiguration("confluentinc/cp-kafka:6.1.9"))
.Build();
}
- public string Hostname => _kafka.GetDockerHostIpAddress();
- public int Port => _kafka.GetMappedPort(KafkaContainer.KAFKA_PORT);
+ public string BootstrapServers => _container.BootstrapServers;
public async Task InitializeAsync()
{
- await _kafka.StartAsync();
+ await _container.StartAsync();
}
public async Task DisposeAsync()
{
- await _kafka.StopAsync();
+ await _container.StopAsync();
}
}
diff --git a/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/Motor.Extensions.Hosting.Kafka_IntegrationTest.csproj b/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/Motor.Extensions.Hosting.Kafka_IntegrationTest.csproj
index 48c179e6..2d2de69a 100644
--- a/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/Motor.Extensions.Hosting.Kafka_IntegrationTest.csproj
+++ b/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/Motor.Extensions.Hosting.Kafka_IntegrationTest.csproj
@@ -8,8 +8,8 @@
-
+
all
diff --git a/test/Motor.Extensions.Hosting.NATS_IntegrationTest/Motor.Extensions.Hosting.NATS_IntegrationTest.csproj b/test/Motor.Extensions.Hosting.NATS_IntegrationTest/Motor.Extensions.Hosting.NATS_IntegrationTest.csproj
index 22c13e3a..337bc556 100644
--- a/test/Motor.Extensions.Hosting.NATS_IntegrationTest/Motor.Extensions.Hosting.NATS_IntegrationTest.csproj
+++ b/test/Motor.Extensions.Hosting.NATS_IntegrationTest/Motor.Extensions.Hosting.NATS_IntegrationTest.csproj
@@ -9,8 +9,8 @@
-
+
runtime; build; native; contentfiles; analyzers; buildtransitive
diff --git a/test/Motor.Extensions.Hosting.NATS_IntegrationTest/NATSContainer.cs b/test/Motor.Extensions.Hosting.NATS_IntegrationTest/NATSContainer.cs
deleted file mode 100644
index 070959ad..00000000
--- a/test/Motor.Extensions.Hosting.NATS_IntegrationTest/NATSContainer.cs
+++ /dev/null
@@ -1,66 +0,0 @@
-using System.Threading.Tasks;
-using Docker.DotNet;
-using Docker.DotNet.Models;
-using Microsoft.Extensions.DependencyInjection;
-using Microsoft.Extensions.Logging;
-using TestContainers.Container.Abstractions;
-using TestContainers.Container.Abstractions.Images;
-
-namespace Motor.Extensions.Hosting.NATS_IntegrationTest;
-
-public class NATSContainer : GenericContainer
-{
- ///
- /// Default image name
- ///
- private static readonly string NatsDefaultImage = "nats";
-
- ///
- /// Default image tag
- ///
- private static readonly string NatsDefaultTag = "2.5";
-
- private static IImage CreateDefaultImage(IDockerClient dockerClient, ILoggerFactory loggerFactory)
- {
- return new GenericImage(dockerClient, loggerFactory) { ImageName = $"{NatsDefaultImage}:{NatsDefaultTag}" };
- }
-
- public const int Port = 4222;
- private readonly IDockerClient _dockerClient;
- private ContainerInspectResponse _containerInfo;
-
- ///
- public NATSContainer(IDockerClient dockerClient, ILoggerFactory loggerFactory)
- : base($"{NatsDefaultImage}:{NatsDefaultTag}", dockerClient, loggerFactory)
- {
- _dockerClient = dockerClient;
- }
-
- ///
- public NATSContainer(string dockerImageName, IDockerClient dockerClient, ILoggerFactory loggerFactory)
- : base(dockerImageName, dockerClient, loggerFactory)
- {
- _dockerClient = dockerClient;
- }
-
- ///
- [ActivatorUtilitiesConstructor]
- public NATSContainer(IImage dockerImage, IDockerClient dockerClient, ILoggerFactory loggerFactory)
- : base(NullImage.IsNullImage(dockerImage) ? CreateDefaultImage(dockerClient, loggerFactory) : dockerImage,
- dockerClient, loggerFactory)
- {
- _dockerClient = dockerClient;
- }
-
- protected override async Task ConfigureAsync()
- {
- await base.ConfigureAsync();
- ExposedPorts.Add(Port);
- }
-
- protected override async Task ContainerStarted()
- {
- await base.ContainerStarting();
- _containerInfo = await _dockerClient.Containers.InspectContainerAsync(ContainerId);
- }
-}
diff --git a/test/Motor.Extensions.Hosting.NATS_IntegrationTest/NATSFixture.cs b/test/Motor.Extensions.Hosting.NATS_IntegrationTest/NATSFixture.cs
index f7058eef..5346adae 100644
--- a/test/Motor.Extensions.Hosting.NATS_IntegrationTest/NATSFixture.cs
+++ b/test/Motor.Extensions.Hosting.NATS_IntegrationTest/NATSFixture.cs
@@ -1,22 +1,25 @@
using System.Threading.Tasks;
-using TestContainers.Container.Abstractions;
-using TestContainers.Container.Abstractions.Hosting;
+using DotNet.Testcontainers.Builders;
+using DotNet.Testcontainers.Containers;
using Xunit;
namespace Motor.Extensions.Hosting.NATS_IntegrationTest;
public class NATSFixture : IAsyncLifetime
{
- private readonly GenericContainer _container;
+ private readonly TestcontainersContainer _container;
+ private const int NATSPort = 4222;
public NATSFixture()
{
- _container = new ContainerBuilder()
+ _container = new TestcontainersBuilder()
+ .WithImage("nats:2.9.11")
+ .WithPortBinding(NATSPort, true)
.Build();
}
- public string Hostname => _container.GetDockerHostIpAddress();
- public int Port => _container.GetMappedPort(NATSContainer.Port);
+ public string Hostname => _container.Hostname;
+ public int Port => _container.GetMappedPublicPort(NATSPort);
public async Task InitializeAsync()
{
diff --git a/test/Motor.Extensions.Hosting.NATS_IntegrationTest/NATSIntegrationTests.cs b/test/Motor.Extensions.Hosting.NATS_IntegrationTest/NATSIntegrationTests.cs
index 4749dbb5..8b91930a 100644
--- a/test/Motor.Extensions.Hosting.NATS_IntegrationTest/NATSIntegrationTests.cs
+++ b/test/Motor.Extensions.Hosting.NATS_IntegrationTest/NATSIntegrationTests.cs
@@ -30,7 +30,7 @@ public NATSIntegrationTests(NATSFixture fixture)
_natsUrl = $"{fixture.Hostname}:{fixture.Port}";
}
- [Fact(Timeout = 50000, Skip = "does not run on ci")]
+ [Fact(Timeout = 50000)]
public async void PublishMessageWithoutException()
{
const string expectedMessage = "testMessage";
@@ -50,7 +50,7 @@ public async void PublishMessageWithoutException()
Assert.Equal(expectedMessage, Encoding.UTF8.GetString(rawConsumedNatsMessage));
}
- [Fact(Timeout = 50000, Skip = "does not run on ci")]
+ [Fact(Timeout = 50000)]
public async void PublishMessageAsJsonFormat()
{
const string expectedMessage = "testMessage";
@@ -73,7 +73,7 @@ public async void PublishMessageAsJsonFormat()
}
- [Fact(Timeout = 50000, Skip = "does not run on ci")]
+ [Fact(Timeout = 50000)]
public async void Consume_RawPublishIntoNATSAndConsumeCreateCloudEvent_ConsumedEqualsPublished()
{
const string expectedMessage = "testMessage";
diff --git a/test/Motor.Extensions.Hosting.RabbitMQ_IntegrationTest/Motor.Extensions.Hosting.RabbitMQ_IntegrationTest.csproj b/test/Motor.Extensions.Hosting.RabbitMQ_IntegrationTest/Motor.Extensions.Hosting.RabbitMQ_IntegrationTest.csproj
index 182b90a6..2c9a1323 100644
--- a/test/Motor.Extensions.Hosting.RabbitMQ_IntegrationTest/Motor.Extensions.Hosting.RabbitMQ_IntegrationTest.csproj
+++ b/test/Motor.Extensions.Hosting.RabbitMQ_IntegrationTest/Motor.Extensions.Hosting.RabbitMQ_IntegrationTest.csproj
@@ -9,6 +9,7 @@
+
all
@@ -18,7 +19,6 @@
-
diff --git a/test/Motor.Extensions.Hosting.RabbitMQ_IntegrationTest/RabbitMQFixture.cs b/test/Motor.Extensions.Hosting.RabbitMQ_IntegrationTest/RabbitMQFixture.cs
index 08a7388b..b8754d5d 100644
--- a/test/Motor.Extensions.Hosting.RabbitMQ_IntegrationTest/RabbitMQFixture.cs
+++ b/test/Motor.Extensions.Hosting.RabbitMQ_IntegrationTest/RabbitMQFixture.cs
@@ -1,27 +1,30 @@
using System;
+using System.Collections.Generic;
using System.Threading.Tasks;
+using DotNet.Testcontainers.Builders;
+using DotNet.Testcontainers.Configurations;
+using DotNet.Testcontainers.Containers;
using Motor.Extensions.Hosting.RabbitMQ;
using Polly;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
-using TestContainers.Container.Abstractions;
-using TestContainers.Container.Abstractions.Hosting;
using Xunit;
namespace Motor.Extensions.Hosting.RabbitMQ_IntegrationTest;
public class RabbitMQFixture : IAsyncLifetime
{
+ private const int RabbitMqPort = 5672;
public RabbitMQFixture()
{
- Container = new ContainerBuilder()
- .ConfigureDockerImageName("rabbitmq:3.7.21")
- .ConfigureContainer((_, container) =>
- {
- container.Env["RABBITMQ_DEFAULT_USER"] = "guest";
- container.Env["RABBITMQ_DEFAULT_PASS"] = "guest";
- container.ExposedPorts.Add(5672);
- })
+ var conf = new RabbitMqTestcontainerConfiguration("rabbitmq:3.11.7")
+ {
+ Password = "guest",
+ Username = "guest"
+ };
+ Container = new TestcontainersBuilder()
+ .WithPortBinding(RabbitMqPort, true)
+ .WithMessageBroker(conf)
.Build();
}
@@ -30,10 +33,10 @@ public RabbitMQFixture()
public IRabbitMQConnectionFactory ConnectionFactory() =>
new RabbitMQConnectionFactory(CreateConnectionFactory());
- public int Port => Container.GetMappedPort(5672);
- public string Hostname => Container.GetDockerHostIpAddress();
+ public int Port => Container.GetMappedPublicPort(RabbitMqPort);
+ public string Hostname => Container.Hostname;
- private GenericContainer Container { get; }
+ private RabbitMqTestcontainer Container { get; }
public async Task InitializeAsync()
{
@@ -51,7 +54,7 @@ public Task DisposeAsync()
private IConnectionFactory CreateConnectionFactory() => new ConnectionFactory
{
- Uri = new Uri($"amqp://guest:guest@{Hostname}:{Port}")
+ Uri = new Uri(Container.ConnectionString)
};
private IConnection CreateConnection() => CreateConnectionFactory().CreateConnection();
diff --git a/test/Motor.Extensions.Hosting.SQS_IntegrationTest/Motor.Extensions.Hosting.SQS_IntegrationTest.csproj b/test/Motor.Extensions.Hosting.SQS_IntegrationTest/Motor.Extensions.Hosting.SQS_IntegrationTest.csproj
index ad73ef48..9c77a0d7 100644
--- a/test/Motor.Extensions.Hosting.SQS_IntegrationTest/Motor.Extensions.Hosting.SQS_IntegrationTest.csproj
+++ b/test/Motor.Extensions.Hosting.SQS_IntegrationTest/Motor.Extensions.Hosting.SQS_IntegrationTest.csproj
@@ -12,7 +12,7 @@
-
+
runtime; build; native; contentfiles; analyzers; buildtransitive
diff --git a/test/Motor.Extensions.Hosting.SQS_IntegrationTest/SQSContainer.cs b/test/Motor.Extensions.Hosting.SQS_IntegrationTest/SQSContainer.cs
deleted file mode 100644
index 47289629..00000000
--- a/test/Motor.Extensions.Hosting.SQS_IntegrationTest/SQSContainer.cs
+++ /dev/null
@@ -1,67 +0,0 @@
-using System.Collections.Generic;
-using System.Threading.Tasks;
-using Docker.DotNet;
-using Docker.DotNet.Models;
-using Microsoft.Extensions.DependencyInjection;
-using Microsoft.Extensions.Logging;
-using TestContainers.Container.Abstractions;
-using TestContainers.Container.Abstractions.Images;
-
-namespace Motor.Extensions.Hosting.SQS_IntegrationTest;
-
-public class SQSContainer : GenericContainer
-{
- ///
- /// Default image name
- ///
- private static readonly string SQSDefaultImage = "roribio16/alpine-sqs";
-
- ///
- /// Default image tag
- ///
- private static readonly string SQSDefaultTag = "1.2.0";
-
- private static IImage CreateDefaultImage(IDockerClient dockerClient, ILoggerFactory loggerFactory)
- {
- return new GenericImage(dockerClient, loggerFactory) { ImageName = $"{SQSDefaultImage}:{SQSDefaultTag}" };
- }
-
- public const int SQS_PORT = 9324;
- private readonly IDockerClient _dockerClient;
- private ContainerInspectResponse _containerInfo;
-
- ///
- public SQSContainer(IDockerClient dockerClient, ILoggerFactory loggerFactory)
- : base($"{DefaultImage}:{DefaultTag}", dockerClient, loggerFactory)
- {
- _dockerClient = dockerClient;
- }
-
- ///
- public SQSContainer(string dockerImageName, IDockerClient dockerClient, ILoggerFactory loggerFactory)
- : base(dockerImageName, dockerClient, loggerFactory)
- {
- _dockerClient = dockerClient;
- }
-
- ///
- [ActivatorUtilitiesConstructor]
- public SQSContainer(IImage dockerImage, IDockerClient dockerClient, ILoggerFactory loggerFactory)
- : base(NullImage.IsNullImage(dockerImage) ? CreateDefaultImage(dockerClient, loggerFactory) : dockerImage,
- dockerClient, loggerFactory)
- {
- _dockerClient = dockerClient;
- }
-
- protected override async Task ConfigureAsync()
- {
- await base.ConfigureAsync();
- ExposedPorts.Add(SQS_PORT);
- }
-
- protected override async Task ContainerStarted()
- {
- await base.ContainerStarting();
- _containerInfo = await _dockerClient.Containers.InspectContainerAsync(ContainerId);
- }
-}
diff --git a/test/Motor.Extensions.Hosting.SQS_IntegrationTest/SQSFixture.cs b/test/Motor.Extensions.Hosting.SQS_IntegrationTest/SQSFixture.cs
index c5ce8484..5b089ac9 100644
--- a/test/Motor.Extensions.Hosting.SQS_IntegrationTest/SQSFixture.cs
+++ b/test/Motor.Extensions.Hosting.SQS_IntegrationTest/SQSFixture.cs
@@ -1,22 +1,24 @@
using System.Threading.Tasks;
-using TestContainers.Container.Abstractions;
-using TestContainers.Container.Abstractions.Hosting;
+using DotNet.Testcontainers.Builders;
+using DotNet.Testcontainers.Containers;
using Xunit;
namespace Motor.Extensions.Hosting.SQS_IntegrationTest;
public class SQSFixture : IAsyncLifetime
{
- private readonly GenericContainer _sqsContainer;
+ private readonly TestcontainersContainer _sqsContainer;
+ private const int SQSPort = 9324;
public SQSFixture()
{
- _sqsContainer = new ContainerBuilder()
+ _sqsContainer = new TestcontainersBuilder()
+ .WithImage("roribio16/alpine-sqs:1.2.0")
+ .WithPortBinding(SQSPort, true)
.Build();
}
- public string Hostname => _sqsContainer.GetDockerHostIpAddress();
- public int Port => _sqsContainer.GetMappedPort(SQSContainer.SQS_PORT);
+ public string BaseSQSUrl => $"http://{_sqsContainer.Hostname}:{_sqsContainer.GetMappedPublicPort(SQSPort)}";
public async Task InitializeAsync()
{
diff --git a/test/Motor.Extensions.Hosting.SQS_IntegrationTest/SQSMessageConsumerTests.cs b/test/Motor.Extensions.Hosting.SQS_IntegrationTest/SQSMessageConsumerTests.cs
index 55506a97..1da83987 100644
--- a/test/Motor.Extensions.Hosting.SQS_IntegrationTest/SQSMessageConsumerTests.cs
+++ b/test/Motor.Extensions.Hosting.SQS_IntegrationTest/SQSMessageConsumerTests.cs
@@ -20,12 +20,12 @@ namespace Motor.Extensions.Hosting.SQS_IntegrationTest;
public class SQSMessageConsumerTests : IClassFixture
{
private readonly IRandomizerString _randomizerString;
- private readonly string _baseSQSUrl;
+ private readonly SQSFixture _fixture;
public SQSMessageConsumerTests(SQSFixture fixture)
{
+ _fixture = fixture;
_randomizerString = RandomizerFactory.GetRandomizer(new FieldOptionsTextRegex { Pattern = @"^[A-Z]{10}" });
- _baseSQSUrl = $"http://{fixture.Hostname}:{fixture.Port}";
}
[Fact(Timeout = 50000)]
@@ -40,7 +40,7 @@ public async Task Consume_RawPublishIntoSQSAndConsumeCreateCloudEvent_ConsumedEq
await sqs.CreateQueueAsync(queueName);
await PublishMessage(sqs, queueName, expectedMessage);
- var consumer = GetConsumer(Options.Create(clientOptions), $"{_baseSQSUrl}/queue/{queueName}");
+ var consumer = GetConsumer(Options.Create(clientOptions), $"{_fixture.BaseSQSUrl}/queue/{queueName}");
var rawConsumedSQSMessage = await RawConsumedSqsMessage(consumer);
Assert.Equal(expectedMessage, Encoding.UTF8.GetString(rawConsumedSQSMessage));
}
@@ -67,8 +67,8 @@ private SQSClientOptions GetSqsClientOptions(string queueName)
{
var clientOptions = new SQSClientOptions
{
- ServiceUrl = $"{_baseSQSUrl}",
- QueueUrl = $"{_baseSQSUrl}/queue/{queueName}"
+ ServiceUrl = $"{_fixture.BaseSQSUrl}",
+ QueueUrl = $"{_fixture.BaseSQSUrl}/queue/{queueName}"
};
return clientOptions;
}
@@ -77,7 +77,7 @@ private async Task PublishMessage(IAmazonSQS sqsClient, string queue, string mes
{
await sqsClient.SendMessageAsync(new SendMessageRequest
{
- QueueUrl = $"{_baseSQSUrl}/queue/{queue}",
+ QueueUrl = $"{_fixture.BaseSQSUrl}/queue/{queue}",
MessageBody = message
});
}