
Question: Review of using a Kafka consumer for multiple topics in a single service #2395

Closed
n1l opened this issue Jan 9, 2025 · 4 comments

@n1l

n1l commented Jan 9, 2025

Description

Hi there, first of all thanks for your support and for what you are doing.
I would like to ask for a code review to double-check that I have not misunderstood how to use the Kafka client for .NET and the Kafka consuming API.

Here is the code. The main idea is to have a single consumer that consumes everything from Kafka for every topic the service is in charge of processing.
Consuming is placed in a long-running task, since the Consume method is synchronous, so that we don't occupy thread pool threads for it.
When a message is consumed, we run a task on the thread pool to process it and use some internal orchestration to create a handler for the message. Then we store the offset.

We also use the default strategy for consuming messages - at least once: auto commit is enabled and auto offset store is disabled.

If anything there seems odd to you, please point it out. Thanks in advance.

using System.Threading.Channels;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

namespace ConsoleApp1;

public sealed class KafkaConsumerBackgroundService : BackgroundService
{
    private readonly ConsumerConfig config;
    private readonly IEnumerable<string> topics;
    private readonly IMessageOrchestrator messageOrchestrator;
    private readonly Channel<ConsumeResult<Ignore, byte[]>> kafkaConsumingChannel
        = Channel.CreateUnbounded<ConsumeResult<Ignore, byte[]>>();
    private readonly SemaphoreSlim asyncLock = new(100);

    private IConsumer<Ignore, byte[]> consumer = default!;
    private CancellationTokenSource? stoppingSource;
    private Task kafkaConsumingTask = Task.CompletedTask;

    public KafkaConsumerBackgroundService(
        ConsumerConfig config,
        IEnumerable<string> topics,
        IMessageOrchestrator messageOrchestrator)
    {
        this.config = config;
        this.config.EnableAutoCommit = true;
        this.config.EnableAutoOffsetStore = false;

        this.topics = topics;
        this.messageOrchestrator = messageOrchestrator;
    }

    public override Task StartAsync(CancellationToken cancellationToken)
    {
        this.stoppingSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

        var builder = new ConsumerBuilder<Ignore, byte[]>(this.config);

        this.consumer = builder.Build();

        this.consumer.Subscribe(this.topics);

        this.kafkaConsumingTask = this.StartKafkaConsumingAsync(cancellationToken);

        return base.StartAsync(cancellationToken);
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var consumeResult = await this.kafkaConsumingChannel.Reader.ReadAsync(stoppingToken)
                .ConfigureAwait(false);

            // Process the message on the thread pool; the semaphore caps the number
            // of concurrently running handlers at 100.
            _ = Task.Run(
                async () =>
                {
                    try
                    {
                        await this.asyncLock.WaitAsync(stoppingToken);

                        try
                        {
                            var handler = this.messageOrchestrator.GetHandler(consumeResult.Topic);

                            await handler.HandleAsync(consumeResult)
                                .ConfigureAwait(false);
                        }
                        finally
                        {
                            // Release the slot even if handling throws.
                            this.asyncLock.Release();
                        }
                    }
                    catch
                    {
                        // error processing
                    }
                },
                stoppingToken);

            this.consumer.StoreOffset(consumeResult);
        }
    }

    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        try
        {
            this.stoppingSource?.Cancel();
        }
        finally
        {
            var joinedTasks = Task.WhenAll(
                this.kafkaConsumingTask,
                base.StopAsync(cancellationToken));

            await Task.WhenAny(joinedTasks, Task.Delay(Timeout.Infinite, cancellationToken))
                .ConfigureAwait(false);
        }
    }

    public override void Dispose()
    {
        base.Dispose();

        this.consumer.Close();
        this.consumer.Dispose();
    }

    private Task StartKafkaConsumingAsync(CancellationToken cancellationToken)
    {
        var stopProcessingToken = this.stoppingSource!.Token;

        return Task.Factory.StartNew(() =>
        {
            while (!stopProcessingToken.IsCancellationRequested)
            {
                try
                {
                    var consumeResult = this.consumer.Consume(stopProcessingToken);

                    this.kafkaConsumingChannel.Writer.TryWrite(consumeResult);
                }
                catch
                {
                    // swallow consume errors (including cancellation) and keep polling
                }
            }

        },
        cancellationToken,
        TaskCreationOptions.LongRunning,
        TaskScheduler.Default);
    }
}

The idea is based on this sample and discussion:
https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/examples/Consumer/Program.cs
#197

@brnls

brnls commented Jan 11, 2025

Watch out for the following potential issues with this implementation:

  • This implementation does not guarantee message processing order within a topic partition. This may or may not matter for your use case. If you want to preserve that ordering, you should create a channel per topic/partition.
  • It stores the offset before the message is processed (which happens asynchronously). This could mean the handler fails or is interrupted, but because you stored/committed the offset, you will never process that message. This works for at-most-once processing, but if you want at least once, you should store offsets within that async message handler after handling has definitely completed (see the first sketch after this list).
  • If partitions are revoked and you have messages buffered in your channel, your application will continue to attempt to process them. This will cause an error when committing offsets because this consumer no longer owns those partitions. To handle this you can register a callback for topic partitions being revoked and clear those messages so you don't attempt to process them (see the second sketch after this list).
  • The channel you use to gather messages is unbounded. If you have a slow consumer and the topic has many messages, your application will receive those messages in an unbounded way which could lead to memory exhaustion. I don't know a good solution for this. You can pause/resume specific topic partitions to apply back pressure, but with that approach it also clears the librdkafka internal prefetch queue as I understand it. So pausing/resuming often as a means of applying back pressure can lead to the library background processor having to re-fetch a bunch of messages frequently, increasing network traffic unnecessarily.
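
For point 2, here is a rough sketch of what I mean, reusing the fields and variables from your service (and assuming you drop the StoreOffset call from ExecuteAsync): the offset is stored inside the processing task, only after HandleAsync has completed.

_ = Task.Run(
    async () =>
    {
        try
        {
            await this.asyncLock.WaitAsync(stoppingToken);

            try
            {
                var handler = this.messageOrchestrator.GetHandler(consumeResult.Topic);

                await handler.HandleAsync(consumeResult)
                    .ConfigureAwait(false);

                // Store the offset only after handling succeeded (at least once);
                // on failure the offset is not stored, so the message can be
                // re-read after a restart or rebalance.
                this.consumer.StoreOffset(consumeResult);
            }
            finally
            {
                this.asyncLock.Release();
            }
        }
        catch
        {
            // error processing
        }
    },
    stoppingToken);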
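
For point 3, the callback can be registered on the builder in StartAsync. The fragment below only records which partitions were revoked; the revokedPartitions set (a ConcurrentDictionary used as a set, not part of your code) is purely illustrative, and the processing side would still need to check it before handling a buffered message or storing its offset.

// using System.Collections.Concurrent;
private readonly ConcurrentDictionary<TopicPartition, byte> revokedPartitions = new();

var builder = new ConsumerBuilder<Ignore, byte[]>(this.config)
    .SetPartitionsRevokedHandler((consumer, revoked) =>
    {
        // Runs on the thread that calls Consume, during a rebalance.
        foreach (var tpo in revoked)
        {
            this.revokedPartitions.TryAdd(tpo.TopicPartition, 0);
        }
    })
    .SetPartitionsAssignedHandler((consumer, assigned) =>
    {
        // Partitions (re)assigned to this consumer are safe to process again.
        foreach (var tp in assigned)
        {
            this.revokedPartitions.TryRemove(tp, out _);
        }
    });

this.consumer = builder.Build();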

@n1l

n1l commented Jan 14, 2025

@brnls thanks for your reply.

  1. Yes, ordering is not needed, so it's OK.
  2. Actually, this is a simplified implementation; in the original one I store messages in batches in the database and only then store the offset, so I think it's OK as well.
  3. That is an interesting one. Revoking partitions is a dangerous operation if I understand correctly; for example, Kafka UI shows a warning when you delete a partition from a topic. If that is the only case, it's OK as well and we can live with it. If not, could you share a sample case where it could happen?
  4. Yep, but let me share my thoughts about it. First of all, if I understand correctly, the Consume method does not really consume; it is rather a receiver of a message that has already been fetched by librdkafka internally, since the client reads about 50 MiB from the cluster rather than a single message. So, if I understand correctly, this can be solved via consumer settings that reduce the amount of data the client reads (see the settings sketch below). From my point of view, adding a new layer of backpressure, e.g. changing the unbounded channel to a bounded one, won't help me at all. What do you think?
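
For reference, these are the kinds of consumer settings I mean; the property names map to the librdkafka options and the values here are only illustrative:

// using Confluent.Kafka;
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",   // placeholder
    GroupId = "my-service",                // placeholder

    // Bound the client-side prefetch queue and fetch sizes:
    QueuedMaxMessagesKbytes = 10240,       // queued.max.messages.kbytes
    QueuedMinMessages = 1000,              // queued.min.messages
    FetchMaxBytes = 5242880,               // fetch.max.bytes (per fetch request)
    MaxPartitionFetchBytes = 1048576,      // max.partition.fetch.bytes
};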

@brnls

brnls commented Jan 18, 2025

For 3), revoking partitions is part of normal Kafka operation and is not dangerous. This is a good description of what it is and what can happen: https://docs.confluent.io/kafka-clients/dotnet/current/overview.html#committing-during-a-rebalance
The partition-revoked callback happens on the same thread that calls Consume, so you can manage it there; but since your message processing happens on other threads, you will have to synchronize the two somehow to avoid storing offsets for topic partitions the consumer no longer owns.

For 4), you are right that there is back pressure which can be controlled with various settings (like queued.max.messages.kbytes), but that only applies to messages in the internal queue. Once you call Consume, I think the message is removed from the internal queue, making room for more messages, so your Channel will fill up in an unbounded way if you keep calling Consume.
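
One sketch of what a second layer of back pressure could look like, reusing your service's fields: make the hand-off channel bounded so the consume loop blocks instead of calling Consume again, which in turn lets queued.max.messages.kbytes bound what librdkafka keeps buffered. The capacity below is only illustrative.

// Bounded channel instead of Channel.CreateUnbounded:
private readonly Channel<ConsumeResult<Ignore, byte[]>> kafkaConsumingChannel
    = Channel.CreateBounded<ConsumeResult<Ignore, byte[]>>(
        new BoundedChannelOptions(1000)
        {
            SingleReader = true,
            SingleWriter = true,
            FullMode = BoundedChannelFullMode.Wait,
        });

// In the consume loop, block on a full channel instead of TryWrite; the loop already
// runs on a dedicated long-running thread, so blocking there is acceptable.
var consumeResult = this.consumer.Consume(stopProcessingToken);
this.kafkaConsumingChannel.Writer
    .WriteAsync(consumeResult, stopProcessingToken)
    .AsTask()
    .GetAwaiter()
    .GetResult();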

@n1l

n1l commented Jan 20, 2025

@brnls Thanks a lot for your review!
So I need to add a handler for revoked partitions, because this is expected behavior, and I need to add a second backpressure layer.
I think we can close the issue; thanks a lot again.

@n1l n1l closed this as completed Jan 20, 2025