diff --git a/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs b/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs index a6c132e0..7fe63748 100644 --- a/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs +++ b/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs @@ -3,6 +3,7 @@ using System.Linq; using System.Text.Json; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using CloudNative.CloudEvents; using Confluent.Kafka; @@ -18,6 +19,9 @@ namespace Motor.Extensions.Hosting.Kafka; +public record ConsumeResultAndProcessedMessageStatus(ConsumeResult ConsumeResult, + ProcessedMessageStatus ProcessedMessageStatus); + public sealed class KafkaMessageConsumer : IMessageConsumer, IDisposable where TData : notnull { private readonly IApplicationNameService _applicationNameService; @@ -28,7 +32,6 @@ public sealed class KafkaMessageConsumer : IMessageConsumer, IDisp private readonly ILogger> _logger; private readonly IHostApplicationLifetime _applicationLifetime; private IConsumer? _consumer; - private readonly SemaphoreSlim _messageSemaphore; public KafkaMessageConsumer(ILogger> logger, IOptions> config, @@ -39,14 +42,16 @@ public KafkaMessageConsumer(ILogger> logger, { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _applicationLifetime = applicationLifetime; - _applicationNameService = applicationNameService ?? throw new ArgumentNullException(nameof(config)); + _applicationNameService = applicationNameService ?? throw new ArgumentNullException(nameof(applicationNameService)); _cloudEventFormatter = cloudEventFormatter; _options = config.Value ?? throw new ArgumentNullException(nameof(config)); _consumerLagSummary = metricsFactory?.CreateSummary("consumer_lag_distribution", "Contains a summary of current consumer lag of each partition", new[] { "topic", "partition" }); _consumerLagGauge = metricsFactory?.CreateGauge("consumer_lag", "Contains current number consumer lag of each partition", false, "topic", "partition"); - _messageSemaphore = new SemaphoreSlim(config.Value.MaxConcurrentMessages); + + _processedMessages = Channel.CreateBounded>(_options.MaxConcurrentMessages); + _timer = new Timer(HandleCommitTimer); } public Func, CancellationToken, Task>? ConsumeCallbackAsync @@ -75,31 +80,44 @@ public async Task ExecuteAsync(CancellationToken token = default) { await Task.Run(async () => { - while (!token.IsCancellationRequested) + var committer = ExecuteCommitLoopAsync(token); + + try { - await _messageSemaphore.WaitAsync(token); - try + while (!token.IsCancellationRequested) { - var msg = _consumer?.Consume(token); - if (msg is { IsPartitionEOF: false }) + try { - SingleMessageHandlingAsync(msg, token); + if (!await _processedMessages.Writer.WaitToWriteAsync(token)) + { + break; + } + + var msg = _consumer?.Consume(token); + if (msg is { IsPartitionEOF: false }) + { + await _processedMessages.Writer.WriteAsync(SingleMessageHandlingAsync(msg, token), token); + } + else + { + _logger.LogDebug(LogEvents.NoMessageReceived, "No messages received"); + } } - else + catch (Exception e) when (e is not OperationCanceledException or ChannelClosedException) { - _logger.LogDebug(LogEvents.NoMessageReceived, "No messages received"); + _logger.LogError(LogEvents.MessageReceivedFailure, e, "Failed to receive message."); } } - catch (OperationCanceledException) - { - _logger.LogInformation(LogEvents.TerminatingKafkaListener, "Terminating Kafka listener..."); - break; - } - catch (Exception e) - { - _logger.LogError(LogEvents.MessageReceivedFailure, e, "Failed to receive message."); - } + + await committer; } + catch (Exception e) when (e is OperationCanceledException or ChannelClosedException) + { + // Execution was cancelled + } + + Commit(); + _logger.LogInformation(LogEvents.TerminatingKafkaListener, "Terminating Kafka listener..."); }, token).ConfigureAwait(false); } @@ -167,7 +185,7 @@ private void WriteStatistics(string json) } } - private async Task SingleMessageHandlingAsync(ConsumeResult msg, CancellationToken token) + private async Task SingleMessageHandlingAsync(ConsumeResult msg, CancellationToken token) { try { @@ -180,8 +198,9 @@ private async Task SingleMessageHandlingAsync(ConsumeResult msg .HandleResult(status => status == ProcessedMessageStatus.TemporaryFailure) .WaitAndRetryAsync(_options.RetriesOnTemporaryFailure, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt))); - var status = await retryPolicy.ExecuteAsync(() => ConsumeCallbackAsync!.Invoke(cloudEvent, token)); - HandleMessageStatus(msg, status); + var status = await retryPolicy.ExecuteAsync( + (cancellationToken) => ConsumeCallbackAsync!.Invoke(cloudEvent, cancellationToken), token); + return new ConsumeResultAndProcessedMessageStatus(msg, status); } catch (Exception e) { @@ -189,52 +208,157 @@ private async Task SingleMessageHandlingAsync(ConsumeResult msg "Unexpected exception in message handling"); _applicationLifetime.StopApplication(); } + + return new ConsumeResultAndProcessedMessageStatus(msg, ProcessedMessageStatus.CriticalFailure); + } + + #region Commit + + private readonly Channel> _processedMessages; + private readonly Timer _timer; + private readonly object _commitLock = new(); + private ConsumeResultAndProcessedMessageStatus? _lastConsumeResultAndProcessedMessageStatus; + + private async Task ExecuteCommitLoopAsync(CancellationToken cancellationToken) + { + RestartCommitTimer(); + + while (!cancellationToken.IsCancellationRequested) + { + try + { + var result = await PeekAndAwaitProcessedMessages(cancellationToken); + + if (IsIrrecoverableFailure(result.ProcessedMessageStatus)) + { + _applicationLifetime.StopApplication(); + break; + } + + // Remove message from channel, when Task is successfully completed + await _processedMessages.Reader.ReadAsync(cancellationToken); + + lock (_commitLock) + { + _lastConsumeResultAndProcessedMessageStatus = result; + } + + if ((result.ConsumeResult.Offset.Value + 1) % _options.CommitPeriod == 0) + { + Commit(); + RestartCommitTimer(); + } + } + catch (Exception e) when (e is OperationCanceledException or ChannelClosedException) + { + break; + } + } + + StopCommitTimer(); + } + + private async Task PeekAndAwaitProcessedMessages(CancellationToken cancellationToken) + { + await _processedMessages.Reader.WaitToReadAsync(cancellationToken); + + if (!_processedMessages.Reader.TryPeek(out var consumeAndProcessTask)) + { + throw new InvalidOperationException("Awaited channel data has been removed by another consumer"); + } + + return await consumeAndProcessTask; + } + + private void Commit() + { + lock (_commitLock) + { + if (_lastConsumeResultAndProcessedMessageStatus == null) + { + return; + } + + try + { + _consumer?.Commit(_lastConsumeResultAndProcessedMessageStatus.ConsumeResult); + _lastConsumeResultAndProcessedMessageStatus = null; + } + catch (KafkaException e) + { + _logger.LogError(LogEvents.CommitError, e, "Commit error: {Reason}", e.Error.Reason); + } + } + } + + private void RestartCommitTimer() + { + var autoCommitIntervalMs = _options.AutoCommitIntervalMs; + if (autoCommitIntervalMs != null) + { + _timer.Change(autoCommitIntervalMs.Value, Timeout.Infinite); + } + } + + private void StopCommitTimer() + { + _timer.Change(Timeout.Infinite, Timeout.Infinite); + } + + + private void HandleCommitTimer(object? state) + { + Commit(); + RestartCommitTimer(); } - private void HandleMessageStatus(ConsumeResult msg, ProcessedMessageStatus? status) + private bool IsIrrecoverableFailure(ProcessedMessageStatus status) { switch (status) { case ProcessedMessageStatus.Success: case ProcessedMessageStatus.InvalidInput: case ProcessedMessageStatus.Failure: - if (msg.Offset.Value % _options.CommitPeriod == 0) - { - try - { - _consumer?.Commit(msg); - } - catch (KafkaException e) - { - _logger.LogError(LogEvents.CommitError, e, "Commit error: {Reason}", e.Error.Reason); - } - } - _messageSemaphore.Release(); - break; + return false; case ProcessedMessageStatus.TemporaryFailure: - _logger.LogWarning(LogEvents.FailureDespiteRetrying, + _logger.LogCritical(LogEvents.FailureDespiteRetrying, "Message consume fails despite retrying"); - _applicationLifetime.StopApplication(); - break; + return true; case ProcessedMessageStatus.CriticalFailure: - _logger.LogWarning(LogEvents.CriticalFailureOnConsume, + _logger.LogCritical(LogEvents.CriticalFailureOnConsume, "Message consume fails with critical failure"); - _applicationLifetime.StopApplication(); - break; + return true; default: - throw new ArgumentOutOfRangeException(nameof(status), status, "Unhandled ProcessedMessageStatus"); + _logger.LogCritical(LogEvents.UnknownProcessedMessageStatus, "Unknown processed message status {status}", status); + return true; } } + #endregion + public MotorCloudEvent KafkaMessageToCloudEvent(Message msg) { return msg.ToMotorCloudEvent(_applicationNameService, _cloudEventFormatter); } + /// + /// For testing. + /// + public IEnumerable Committed() + { + if (_consumer == null) + { + throw new InvalidOperationException("Consumer is not initialized"); + } + + return _consumer.Committed(TimeSpan.FromSeconds(10)); + } + private void Dispose(bool disposing) { if (disposing) { + _timer.Dispose(); _consumer?.Dispose(); } } diff --git a/src/Motor.Extensions.Hosting.Kafka/LogEvents.cs b/src/Motor.Extensions.Hosting.Kafka/LogEvents.cs index 55f3b37f..bb1ab278 100644 --- a/src/Motor.Extensions.Hosting.Kafka/LogEvents.cs +++ b/src/Motor.Extensions.Hosting.Kafka/LogEvents.cs @@ -11,6 +11,7 @@ public static class LogEvents public static readonly EventId TerminatingKafkaListener = new(4, nameof(TerminatingKafkaListener)); public static readonly EventId MessageReceivedFailure = new(5, nameof(MessageReceivedFailure)); public static readonly EventId ReceivedMessage = new(6, nameof(ReceivedMessage)); + public static readonly EventId UnknownProcessedMessageStatus = new(7, nameof(UnknownProcessedMessageStatus)); public static readonly EventId MessageHandlingUnexpectedException = new(7, nameof(MessageHandlingUnexpectedException)); diff --git a/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaExtensionTests.cs b/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaExtensionTests.cs index a27bfe5b..f048f21e 100644 --- a/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaExtensionTests.cs +++ b/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaExtensionTests.cs @@ -3,11 +3,11 @@ using System.Linq; using System.Text; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using CloudNative.CloudEvents.SystemTextJson; using Confluent.Kafka; using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Moq; using Motor.Extensions.Hosting.Abstractions; @@ -18,28 +18,35 @@ using RandomDataGenerator.FieldOptions; using RandomDataGenerator.Randomizers; using Xunit; +using Xunit.Abstractions; namespace Motor.Extensions.Hosting.Kafka_IntegrationTest; [Collection("KafkaMessage")] public class KafkaExtensionTests : IClassFixture { + private readonly ITestOutputHelper _output; private readonly KafkaFixture _fixture; - private readonly IRandomizerString _randomizerString; + private readonly Mock _fakeLifetimeMock = new Mock(); + private readonly string _topic; + private const string Message = "message"; + private readonly byte[] _expectedMessage = Encoding.UTF8.GetBytes(Message); + private readonly Channel _consumedChannel = Channel.CreateUnbounded(); - public KafkaExtensionTests(KafkaFixture fixture) + public KafkaExtensionTests(ITestOutputHelper output, KafkaFixture fixture) { + _output = output; _fixture = fixture; - _randomizerString = RandomizerFactory.GetRandomizer(new FieldOptionsTextRegex { Pattern = @"^[A-Z]{10}" }); + var randomizer = RandomizerFactory.GetRandomizer(new FieldOptionsTextRegex { Pattern = @"^[A-Z]{10}" }); + _topic = randomizer.Generate(); } [Fact(Timeout = 50000)] public async Task Consume_RawPublishIntoKafkaAndConsumeCreateCloudEvent_ConsumedEqualsPublished() { - var topic = _randomizerString.Generate(); const string message = "testMessage"; - await PublishMessage(topic, "someKey", message); - var consumer = GetConsumer(topic); + await PublishMessage(_topic, "someKey", message); + var consumer = GetConsumer(_topic); var rawConsumedKafkaMessage = (byte[])null; var taskCompletionSource = new TaskCompletionSource(); consumer.ConsumeCallbackAsync = async (dataEvent, _) => @@ -59,13 +66,11 @@ public async Task Consume_RawPublishIntoKafkaAndConsumeCreateCloudEvent_Consumed [Fact(Timeout = 50000)] public async Task Consume_PublishIntoKafkaAndConsumeWithCloudEvent_ConsumedEqualsPublished() { - var topic = _randomizerString.Generate(); - const string message = "testMessage"; - var publisher = GetPublisher(topic); + var publisher = GetPublisher(_topic); var motorCloudEvent = - MotorCloudEvent.CreateTestCloudEvent(message).CreateNew(Encoding.UTF8.GetBytes(message)); + MotorCloudEvent.CreateTestCloudEvent(Message).CreateNew(Encoding.UTF8.GetBytes(Message)); await publisher.PublishMessageAsync(motorCloudEvent, CancellationToken.None); - var consumer = GetConsumer(topic); + var consumer = GetConsumer(_topic); string id = null; var taskCompletionSource = new TaskCompletionSource(); consumer.ConsumeCallbackAsync = async (dataEvent, _) => @@ -85,14 +90,12 @@ public async Task Consume_PublishIntoKafkaAndConsumeWithCloudEvent_ConsumedEqual [Fact(Timeout = 50000)] public async Task Consume_PublishIntoExtensionDefinedTopic_ConsumedEqualsPublished() { - var topic = _randomizerString.Generate(); - const string message = "testMessage"; var publisher = GetPublisher("wrong_topic"); var motorCloudEvent = - MotorCloudEvent.CreateTestCloudEvent(message).CreateNew(Encoding.UTF8.GetBytes(message)); - motorCloudEvent.SetKafkaTopic(topic); + MotorCloudEvent.CreateTestCloudEvent(Message).CreateNew(Encoding.UTF8.GetBytes(Message)); + motorCloudEvent.SetKafkaTopic(_topic); await publisher.PublishMessageAsync(motorCloudEvent, CancellationToken.None); - var consumer = GetConsumer(topic); + var consumer = GetConsumer(_topic); string id = null; var taskCompletionSource = new TaskCompletionSource(); consumer.ConsumeCallbackAsync = async (dataEvent, _) => @@ -114,20 +117,19 @@ public async Task Consume_LimitMaxConcurrentMessages_StartProcessingLimitedNumbe { const int maxConcurrentMessages = 5; var taskCompletionSource = new TaskCompletionSource(); - var topic = _randomizerString.Generate(); - const string message = "testMessage"; for (var i = 0; i < maxConcurrentMessages * 2; i++) { - await PublishMessage(topic, "someKey", message); + await PublishMessage(_topic, "someKey", Message); } - var config = GetConsumerConfig(topic, maxConcurrentMessages); - var consumer = GetConsumer(topic, config); + + var config = GetConsumerConfig(_topic, maxConcurrentMessages); + var consumer = GetConsumer(_topic, config); var numberOfStartedMessages = 0; consumer.ConsumeCallbackAsync = async (_, cancellationToken) => { numberOfStartedMessages++; taskCompletionSource.TrySetResult(); - await Task.Delay(-1, cancellationToken); // Wait indefinitely + await Task.Delay(-1, cancellationToken); // Wait indefinitely return await Task.FromResult(ProcessedMessageStatus.Success); }; @@ -144,14 +146,14 @@ public async Task Consume_LimitMaxConcurrentMessages_StartProcessingLimitedNumbe [Theory(Timeout = 50000)] [InlineData(ProcessedMessageStatus.TemporaryFailure)] [InlineData(ProcessedMessageStatus.CriticalFailure)] - public async Task Consume_SynchronousMessageHandlingWhereProcessingFailed_DoesNotProcessSecondMessage(ProcessedMessageStatus returnStatus) + public async Task Consume_SynchronousMessageHandlingWhereProcessingFailed_DoesNotProcessSecondMessage( + ProcessedMessageStatus returnStatus) { var taskCompletionSource = new TaskCompletionSource(); - var topic = _randomizerString.Generate(); - await PublishMessage(topic, "someKey", "1"); - await PublishMessage(topic, "someKey", "2"); - var config = GetConsumerConfig(topic, maxConcurrentMessages: 1); - var consumer = GetConsumer(topic, config); + await PublishMessage(_topic, "someKey", "1"); + await PublishMessage(_topic, "someKey", "2"); + var config = GetConsumerConfig(_topic, maxConcurrentMessages: 1); + var consumer = GetConsumer(_topic, config); var distinctHandledMessages = new HashSet(); consumer.ConsumeCallbackAsync = async (data, _) => { @@ -174,18 +176,19 @@ public async Task Consume_SynchronousMessageHandlingWhereProcessingFailed_DoesNo [InlineData(ProcessedMessageStatus.Success)] [InlineData(ProcessedMessageStatus.Failure)] [InlineData(ProcessedMessageStatus.InvalidInput)] - public async Task Consume_SynchronousMessageHandlingWithMultipleMessages_AllMessagesProcessed(ProcessedMessageStatus processedMessageStatus) + public async Task Consume_SynchronousMessageHandlingWithMultipleMessages_AllMessagesProcessed( + ProcessedMessageStatus processedMessageStatus) { const int numMessages = 10; var taskCompletionSource = new TaskCompletionSource(); - var topic = _randomizerString.Generate(); var messages = Enumerable.Range(1, numMessages).Select(i => $"{i}").ToHashSet(); foreach (var message in messages) { - await PublishMessage(topic, "someKey", message); + await PublishMessage(_topic, "someKey", message); } - var config = GetConsumerConfig(topic, maxConcurrentMessages: 1); - var consumer = GetConsumer(topic, config); + + var config = GetConsumerConfig(_topic, maxConcurrentMessages: 1); + var consumer = GetConsumer(_topic, config); var distinctHandledMessages = new HashSet(); consumer.ConsumeCallbackAsync = async (data, _) => { @@ -209,10 +212,9 @@ public async Task Consume_TemporaryFailure_ExecuteTheConfiguredNumberOfRetries() { const int expectedNumberOfRetries = 2; var taskCompletionSource = new TaskCompletionSource(); - var topic = _randomizerString.Generate(); - await PublishMessage(topic, "someKey", "message"); - var config = GetConsumerConfig(topic, retriesOnTemporaryFailure: expectedNumberOfRetries); - var consumer = GetConsumer(topic, config); + await PublishMessage(_topic, "someKey", Message); + var config = GetConsumerConfig(_topic, retriesOnTemporaryFailure: expectedNumberOfRetries); + var consumer = GetConsumer(_topic, config); var actualNumberOfTries = 0; consumer.ConsumeCallbackAsync = async (data, _) => { @@ -235,12 +237,10 @@ public async Task Consume_TemporaryFailure_ExecuteTheConfiguredNumberOfRetries() public async Task Consume_TemporaryFailureEvenAfterRetries_ApplicationIsStopped() { const int numberOfRetries = 2; - var fakeLifetimeMock = new Mock(); var taskCompletionSource = new TaskCompletionSource(); - var topic = _randomizerString.Generate(); - await PublishMessage(topic, "someKey", "message"); - var config = GetConsumerConfig(topic, retriesOnTemporaryFailure: numberOfRetries); - var consumer = GetConsumer(topic, config, fakeLifetimeMock.Object); + await PublishMessage(_topic, "someKey", Message); + var config = GetConsumerConfig(_topic, retriesOnTemporaryFailure: numberOfRetries); + var consumer = GetConsumer(_topic, config, _fakeLifetimeMock.Object); consumer.ConsumeCallbackAsync = async (data, _) => { taskCompletionSource.TrySetResult(); @@ -254,18 +254,16 @@ public async Task Consume_TemporaryFailureEvenAfterRetries_ApplicationIsStopped( await taskCompletionSource.Task; // Give consumer enough time to handle returned ProcessedMessageStatus await Task.Delay(TimeSpan.FromSeconds(2 * Math.Pow(2, numberOfRetries))); - fakeLifetimeMock.Verify(mock => mock.StopApplication(), Times.Once); + _fakeLifetimeMock.Verify(mock => mock.StopApplication(), Times.Once); } [Fact(Timeout = 50000)] public async Task Consume_CriticalFailure_ApplicationIsStopped() { - var fakeLifetimeMock = new Mock(); var taskCompletionSource = new TaskCompletionSource(); - var topic = _randomizerString.Generate(); - await PublishMessage(topic, "someKey", "message"); - var config = GetConsumerConfig(topic); - var consumer = GetConsumer(topic, config, fakeLifetimeMock.Object); + await PublishMessage(_topic, "someKey", Message); + var config = GetConsumerConfig(_topic); + var consumer = GetConsumer(_topic, config, _fakeLifetimeMock.Object); consumer.ConsumeCallbackAsync = async (data, _) => { taskCompletionSource.TrySetResult(); @@ -279,9 +277,147 @@ public async Task Consume_CriticalFailure_ApplicationIsStopped() await taskCompletionSource.Task; // Give consumer enough time to handle returned ProcessedMessageStatus await Task.Delay(TimeSpan.FromSeconds(1)); - fakeLifetimeMock.Verify(mock => mock.StopApplication(), Times.Once); + _fakeLifetimeMock.Verify(mock => mock.StopApplication(), Times.Once); + } + + [Fact(Timeout = 50000)] + public async Task Consume_AfterProcessingAMessage_CommitsEveryCommitPeriod() + { + var config = GetConsumerConfig(_topic, maxConcurrentMessages: 1, retriesOnTemporaryFailure: 1); + config.CommitPeriod = 2; + config.AutoCommitIntervalMs = null; + using var consumer = GetConsumer(_topic, config, _fakeLifetimeMock.Object); + consumer.ConsumeCallbackAsync = CreateConsumeCallback(ProcessedMessageStatus.Success, _consumedChannel); + var cts = new CancellationTokenSource(); + await consumer.StartAsync(cts.Token); + var execution = consumer.ExecuteAsync(cts.Token); + + var numberOfProcessedMessages = 2; + await PublishAndAwaitMessages(_consumedChannel, numberOfProcessedMessages); + + await WaitForCommittedOffset(consumer, numberOfProcessedMessages); + cts.Cancel(); + await execution; + } + + private async Task PublishAndAwaitMessages(Channel channel, int count) + { + for (var i = 0; i < count; i++) + { + await PublishMessage(_topic, "someKey", Message); + } + + for (var i = 0; i < count; i++) + { + Assert.Equal(_expectedMessage, await channel.Reader.ReadAsync(CancellationToken.None)); + } } + /// Consumer to get offset from + /// Offset of last committed message + 1, starts at 0 + private async Task WaitForCommittedOffset(KafkaMessageConsumer consumer, long expectedOffset) + { + while (true) + { + var offset = GetCommittedOffset(consumer); + + _output.WriteLine($"Waiting for offset {expectedOffset} got {offset}"); + if (offset == expectedOffset) + { + return; + } + + await Task.Delay(10, CancellationToken.None); + } + } + + private Offset? GetCommittedOffset(KafkaMessageConsumer consumer) + { + var offsets = consumer.Committed(); + return offsets.FirstOrDefault()?.Offset; + } + + [Fact(Timeout = 50000)] + public async Task Consume_AfterProcessingAMessage_CommitsOnlyEveryCommitPeriod() + { + var config = GetConsumerConfig(_topic, maxConcurrentMessages: 1, retriesOnTemporaryFailure: 1); + config.CommitPeriod = 10; + config.AutoCommitIntervalMs = null; + using var consumer = GetConsumer(_topic, config, _fakeLifetimeMock.Object); + consumer.ConsumeCallbackAsync = CreateConsumeCallback(ProcessedMessageStatus.Success, _consumedChannel); + var cts = new CancellationTokenSource(); + await consumer.StartAsync(cts.Token); + var execution = consumer.ExecuteAsync(cts.Token); + + var numberOfProcessedMessages = 9; + await PublishAndAwaitMessages(_consumedChannel, numberOfProcessedMessages); + + await Task.Delay(5000, CancellationToken.None); + Assert.Equal(Offset.Unset, GetCommittedOffset(consumer)); + cts.Cancel(); + await execution; + } + + [Fact(Timeout = 50000)] + public async Task Consume_WhenHavingUncommittedMessages_CommitsEveryAutoCommitIntervalMs() + { + var config = GetConsumerConfig(_topic, maxConcurrentMessages: 1, retriesOnTemporaryFailure: 1); + config.CommitPeriod = 1000; // default + config.AutoCommitIntervalMs = 1; + using var consumer = GetConsumer(_topic, config, _fakeLifetimeMock.Object); + consumer.ConsumeCallbackAsync = CreateConsumeCallback(ProcessedMessageStatus.Success, _consumedChannel); + var cts = new CancellationTokenSource(); + await consumer.StartAsync(cts.Token); + var execution = consumer.ExecuteAsync(cts.Token); + + var numberOfProcessedMessages = 2; + await PublishAndAwaitMessages(_consumedChannel, numberOfProcessedMessages); + + await WaitForCommittedOffset(consumer, numberOfProcessedMessages); + cts.Cancel(); + await execution; + } + + [Fact(Timeout = 50000)] + public async Task Consume_OnShutdown_Commits() + { + var config = GetConsumerConfig(_topic, maxConcurrentMessages: 1, retriesOnTemporaryFailure: 1); + config.AutoCommitIntervalMs = null; + using var consumer = GetConsumer(_topic, config, _fakeLifetimeMock.Object); + consumer.ConsumeCallbackAsync = CreateConsumeCallback(ProcessedMessageStatus.Success, _consumedChannel); + var cts = new CancellationTokenSource(); + await consumer.StartAsync(cts.Token); + var execution = consumer.ExecuteAsync(cts.Token); + + var numberOfProcessedMessages = 2; + await PublishAndAwaitMessages(_consumedChannel, numberOfProcessedMessages); + // Publish another message, that blocks in processing. + // This makes sure that the first 2 messages have been processed and have been written to the commit channel. + consumer.ConsumeCallbackAsync = CreateBlockingCallback(_consumedChannel); + await PublishAndAwaitMessages(_consumedChannel, 1); + cts.Cancel(); + await execution; + + await WaitForCommittedOffset(consumer, numberOfProcessedMessages); + } + + private Func, CancellationToken, Task> CreateConsumeCallback( + ProcessedMessageStatus statusToReturn, Channel channel) => async (data, _) => + { + _output.WriteLine($"Processed message with status {statusToReturn.ToString()}"); + await channel.Writer.WriteAsync(data.TypedData, CancellationToken.None); + return await Task.FromResult(statusToReturn); + }; + + private Func, CancellationToken, Task> CreateBlockingCallback( + Channel channel) => async (data, cancellationToken) => + { + _output.WriteLine($"Blocking message"); + await channel.Writer.WriteAsync(data.TypedData, CancellationToken.None); + await Task.Delay(TimeSpan.MaxValue, cancellationToken); + return ProcessedMessageStatus.Success; + }; + private async Task PublishMessage(string topic, string key, string value) { using var producer = new ProducerBuilder(GetPublisherConfig(topic)).Build(); @@ -290,12 +426,13 @@ await producer.ProduceAsync(topic, producer.Flush(); } - private KafkaMessageConsumer GetConsumer(string topic, KafkaConsumerOptions config = null, IHostApplicationLifetime fakeLifetimeMock = null) + private KafkaMessageConsumer GetConsumer(string topic, KafkaConsumerOptions config = null, + IHostApplicationLifetime fakeLifetimeMock = null) { var options = Options.Create(config ?? GetConsumerConfig(topic)); - var fakeLoggerMock = Mock.Of>>(); + var logger = _output.BuildLoggerFor>(); fakeLifetimeMock ??= Mock.Of(); - return new KafkaMessageConsumer(fakeLoggerMock, options, fakeLifetimeMock, null, GetApplicationNameService(), + return new KafkaMessageConsumer(logger, options, fakeLifetimeMock, null, GetApplicationNameService(), new JsonEventFormatter()); } diff --git a/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/Motor.Extensions.Hosting.Kafka_IntegrationTest.csproj b/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/Motor.Extensions.Hosting.Kafka_IntegrationTest.csproj index 206066a0..be0f6117 100644 --- a/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/Motor.Extensions.Hosting.Kafka_IntegrationTest.csproj +++ b/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/Motor.Extensions.Hosting.Kafka_IntegrationTest.csproj @@ -6,6 +6,7 @@ +