Skip to content

Commit

Permalink
limit number of concurrent messages for kafka consumer (#742)
Browse files Browse the repository at this point in the history
  • Loading branch information
svkeller authored Feb 1, 2023
1 parent 1244086 commit 4cb644a
Show file tree
Hide file tree
Showing 18 changed files with 101 additions and 301 deletions.
2 changes: 2 additions & 0 deletions Motor.NET.sln.DotSettings
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=SQS/@EntryIndexedValue">SQS</s:String></wpf:ResourceDictionary>
2 changes: 1 addition & 1 deletion shared.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project>

<PropertyGroup>
<Version>0.10.5</Version>
<Version>0.10.6</Version>
<LangVersion>10</LangVersion>
<Nullable>enable</Nullable>
<WarningsAsErrors>CS8600;CS8602;CS8625;CS8618;CS8604;CS8601</WarningsAsErrors>
Expand Down
6 changes: 5 additions & 1 deletion src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public sealed class KafkaMessageConsumer<TData> : IMessageConsumer<TData>, IDisp
private readonly IMetricFamily<ISummary>? _consumerLagSummary;
private readonly ILogger<KafkaMessageConsumer<TData>> _logger;
private IConsumer<string?, byte[]>? _consumer;
private readonly SemaphoreSlim _messageSemaphore;

public KafkaMessageConsumer(ILogger<KafkaMessageConsumer<TData>> logger,
IOptions<KafkaConsumerOptions<TData>> config,
Expand All @@ -40,6 +41,7 @@ public KafkaMessageConsumer(ILogger<KafkaMessageConsumer<TData>> logger,
"Contains a summary of current consumer lag of each partition", new[] { "topic", "partition" });
_consumerLagGauge = metricsFactory?.CreateGauge("consumer_lag",
"Contains current number consumer lag of each partition", false, "topic", "partition");
_messageSemaphore = new SemaphoreSlim(config.Value.MaxConcurrentMessages);
}

public Func<MotorCloudEvent<byte[]>, CancellationToken, Task<ProcessedMessageStatus>>? ConsumeCallbackAsync
Expand All @@ -66,10 +68,11 @@ public Task StartAsync(CancellationToken token = default)

public async Task ExecuteAsync(CancellationToken token = default)
{
await Task.Run(() =>
await Task.Run(async () =>
{
while (!token.IsCancellationRequested)
{
await _messageSemaphore.WaitAsync(token);
try
{
var msg = _consumer?.Consume(token);
Expand Down Expand Up @@ -169,6 +172,7 @@ private void SingleMessageHandling(CancellationToken token, ConsumeResult<string
taskAwaiter?.OnCompleted(() =>
{
var processedMessageStatus = taskAwaiter?.GetResult();
_messageSemaphore.Release();
switch (processedMessageStatus)
{
case ProcessedMessageStatus.Success:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ public KafkaConsumerOptions()

public string? Topic { get; set; }
public int CommitPeriod { get; set; } = 1000;
public int MaxConcurrentMessages { get; set; } = 1000;
}
113 changes: 0 additions & 113 deletions test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaContainer.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,33 @@ public async Task Consume_PublishIntoExtensionDefinedTopic_ConsumedEqualsPublish
Assert.Equal(motorCloudEvent.Id, id);
}

[Fact(Timeout = 50000)]
public async Task Consume_LimitMaxConcurrentMessages_StartProcessingLimitedNumberOfMessagesSimultaneously()
{
const int maxConcurrentMessages = 5;
var topic = _randomizerString.Generate();
const string message = "testMessage";
for (var i = 0; i < maxConcurrentMessages * 2; i++)
{
await PublishMessage(topic, "someKey", message);
}
var consumer = GetConsumer<string>(topic, maxConcurrentMessages);
var numberOfParallelMessages = 0;
consumer.ConsumeCallbackAsync = async (_, cancellationToken) =>
{
numberOfParallelMessages++;
await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken);
numberOfParallelMessages--;
return await Task.FromResult(ProcessedMessageStatus.Success);
};

await consumer.StartAsync();
consumer.ExecuteAsync();
await Task.Delay(TimeSpan.FromSeconds(2));

Assert.Equal(maxConcurrentMessages, numberOfParallelMessages);
}

private async Task PublishMessage(string topic, string key, string value)
{
using var producer = new ProducerBuilder<string, byte[]>(GetPublisherConfig<string>(topic)).Build();
Expand All @@ -114,9 +141,9 @@ await producer.ProduceAsync(topic,
producer.Flush();
}

private KafkaMessageConsumer<T> GetConsumer<T>(string topic)
private KafkaMessageConsumer<T> GetConsumer<T>(string topic, int maxConcurrentMessages = 1000)
{
var options = Options.Create(GetConsumerConfig<T>(topic));
var options = Options.Create(GetConsumerConfig<T>(topic, maxConcurrentMessages));
var fakeLoggerMock = Mock.Of<ILogger<KafkaMessageConsumer<T>>>();
return new KafkaMessageConsumer<T>(fakeLoggerMock, options, null, GetApplicationNameService(),
new JsonEventFormatter());
Expand All @@ -134,7 +161,7 @@ private KafkaPublisherOptions<T> GetPublisherConfig<T>(string topic)
return new()
{
Topic = topic,
BootstrapServers = $"{_fixture.Hostname}:{_fixture.Port}",
BootstrapServers = _fixture.BootstrapServers,
};
}

Expand All @@ -145,18 +172,19 @@ private IApplicationNameService GetApplicationNameService(string source = "test:
return mock.Object;
}

private KafkaConsumerOptions<T> GetConsumerConfig<T>(string topic, string groupId = "group_id")
private KafkaConsumerOptions<T> GetConsumerConfig<T>(string topic, int maxConcurrentMessages, string groupId = "group_id")
{
return new()
{
Topic = topic,
GroupId = groupId,
CommitPeriod = 1,
BootstrapServers = $"{_fixture.Hostname}:{_fixture.Port}",
BootstrapServers = _fixture.BootstrapServers,
EnableAutoCommit = false,
StatisticsIntervalMs = 5000,
SessionTimeoutMs = 6000,
AutoOffsetReset = AutoOffsetReset.Earliest
AutoOffsetReset = AutoOffsetReset.Earliest,
MaxConcurrentMessages = maxConcurrentMessages
};
}
}
Original file line number Diff line number Diff line change
@@ -1,30 +1,33 @@
using System.Threading.Tasks;
using TestContainers.Container.Abstractions;
using TestContainers.Container.Abstractions.Hosting;
using DotNet.Testcontainers.Builders;
using DotNet.Testcontainers.Configurations;
using DotNet.Testcontainers.Containers;
using Xunit;

namespace Motor.Extensions.Hosting.Kafka_IntegrationTest;

public class KafkaFixture : IAsyncLifetime
{
private readonly GenericContainer _kafka;
private readonly KafkaTestcontainer _container;
private const int KafkaPort = 9093;

public KafkaFixture()
{
_kafka = new ContainerBuilder<KafkaContainer>()
_container = new TestcontainersBuilder<KafkaTestcontainer>()
.WithPortBinding(KafkaPort, true)
.WithKafka(new KafkaTestcontainerConfiguration("confluentinc/cp-kafka:6.1.9"))
.Build();
}

public string Hostname => _kafka.GetDockerHostIpAddress();
public int Port => _kafka.GetMappedPort(KafkaContainer.KAFKA_PORT);
public string BootstrapServers => _container.BootstrapServers;

public async Task InitializeAsync()
{
await _kafka.StartAsync();
await _container.StartAsync();
}

public async Task DisposeAsync()
{
await _kafka.StopAsync();
await _container.StopAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.2.0" />
<PackageReference Include="Moq" Version="4.18.1" />
<PackageReference Include="TestContainers.Container.Abstractions" Version="1.5.4" />
<PackageReference Include="RandomDataGenerator.Net" Version="1.0.15" />
<PackageReference Include="TestContainers" Version="2.3.0" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.5">
<PrivateAssets>all</PrivateAssets>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
<ItemGroup>
<PackageReference Include="Moq" Version="4.18.1" />
<PackageReference Include="RandomDataGenerator.Net" Version="1.0.15" />
<PackageReference Include="TestContainers.Container.Abstractions" Version="1.5.4" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.2.0" />
<PackageReference Include="TestContainers" Version="2.3.0" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.5">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down

This file was deleted.

Loading

0 comments on commit 4cb644a

Please sign in to comment.