Skip to content

Commit 7b6f0ee

Browse files
committed
* Added support for consumer strategies that do not act as "stop the world" scenarios (e.g., cooperative sticky). Fixes issue Farfetch#557 and Fixes issue Farfetch#456
* Enabled automatic committing with `confluent auto commit: true` instead of relying solely on manual commits, but only when the consumer strategy is cooperative sticky. (Refer to the open librdkafka issue at confluentinc/librdkafka#4059).
1 parent e121a5b commit 7b6f0ee

File tree

4 files changed

+112
-20
lines changed

4 files changed

+112
-20
lines changed

src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs

+3-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Linq;
55
using System.Threading.Tasks;
66
using KafkaFlow.Consumers.DistributionStrategies;
7+
using KafkaFlow.Extensions;
78

89
namespace KafkaFlow.Configuration;
910

@@ -251,7 +252,8 @@ public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration)
251252
consumerConfigCopy.StatisticsIntervalMs = _consumerConfig.StatisticsIntervalMs ?? _statisticsInterval;
252253

253254
consumerConfigCopy.EnableAutoOffsetStore = false;
254-
consumerConfigCopy.EnableAutoCommit = false;
255+
consumerConfigCopy.EnableAutoCommit = _consumerConfig.PartitionAssignmentStrategy.IsStopTheWorldStrategy() is false;
256+
consumerConfigCopy.AutoCommitIntervalMs = _consumerConfig.AutoCommitIntervalMs ?? 5000;
255257

256258
consumerConfigCopy.ReadSecurityInformationFrom(clusterConfiguration);
257259

src/KafkaFlow/Consumers/Consumer.cs

+56-14
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@
77
using Confluent.Kafka;
88
using KafkaFlow.Authentication;
99
using KafkaFlow.Configuration;
10+
using KafkaFlow.Extensions;
1011

1112
namespace KafkaFlow.Consumers;
1213

1314
internal class Consumer : IConsumer
1415
{
1516
private readonly IDependencyResolver _dependencyResolver;
1617
private readonly ILogHandler _logHandler;
18+
private readonly bool _stopTheWorldStrategy;
1719

1820
private readonly List<Action<IDependencyResolver, IConsumer<byte[], byte[]>, List<TopicPartition>>>
1921
_partitionsAssignedHandlers = new();
@@ -40,6 +42,7 @@ public Consumer(
4042
this.Configuration = configuration;
4143
_flowManager = new ConsumerFlowManager(this, _logHandler);
4244
_maxPollIntervalExceeded = new(_logHandler);
45+
_stopTheWorldStrategy = Configuration.GetKafkaConfig().PartitionAssignmentStrategy.IsStopTheWorldStrategy();
4346

4447
foreach (var handler in this.Configuration.StatisticsHandlers)
4548
{
@@ -148,7 +151,17 @@ public void Commit(IReadOnlyCollection<Confluent.Kafka.TopicPartitionOffset> off
148151
return;
149152
}
150153

151-
_consumer.Commit(validOffsets);
154+
if (_stopTheWorldStrategy)
155+
{
156+
_consumer.Commit(validOffsets);
157+
}
158+
else
159+
{
160+
foreach (var topicPartitionOffset in validOffsets)
161+
{
162+
_consumer.StoreOffset(topicPartitionOffset);
163+
}
164+
}
152165

153166
foreach (var offset in validOffsets)
154167
{
@@ -237,17 +250,8 @@ private void EnsureConsumer()
237250
var kafkaConfig = this.Configuration.GetKafkaConfig();
238251

239252
var consumerBuilder = new ConsumerBuilder<byte[], byte[]>(kafkaConfig)
240-
.SetPartitionsAssignedHandler(
241-
(consumer, partitions) => this.FirePartitionsAssignedHandlers(consumer, partitions))
242-
.SetPartitionsRevokedHandler(
243-
(consumer, partitions) =>
244-
{
245-
_partitionsRevokedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions));
246-
this.Assignment = new List<TopicPartition>();
247-
this.Subscription = new List<string>();
248-
_currentPartitionsOffsets.Clear();
249-
_flowManager.Stop();
250-
})
253+
.SetPartitionsAssignedHandler(FirePartitionsAssignedHandlers)
254+
.SetPartitionsRevokedHandler(FirePartitionRevokedHandlers)
251255
.SetErrorHandler((consumer, error) => _errorsHandlers.ForEach(x => x(consumer, error)))
252256
.SetStatisticsHandler((consumer, statistics) => _statisticsHandlers.ForEach(x => x(consumer, statistics)));
253257

@@ -293,13 +297,51 @@ private void FirePartitionsAssignedHandlers(
293297
IConsumer<byte[], byte[]> consumer,
294298
List<TopicPartition> partitions)
295299
{
296-
this.Assignment = partitions;
300+
if (_stopTheWorldStrategy)
301+
{
302+
this.Assignment = partitions;
303+
this.Subscription = consumer.Subscription;
304+
_flowManager.Start(consumer);
305+
_partitionsAssignedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions));
306+
return;
307+
}
308+
309+
if (partitions.Count == 0)
310+
{
311+
return;
312+
}
313+
314+
this.Assignment = this.Assignment.Union(partitions).ToArray();
297315
this.Subscription = consumer.Subscription;
316+
_flowManager.Stop();
298317
_flowManager.Start(consumer);
299-
300318
_partitionsAssignedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions));
301319
}
302320

321+
private void FirePartitionRevokedHandlers(IConsumer<byte[], byte[]> consumer, List<Confluent.Kafka.TopicPartitionOffset> partitions)
322+
{
323+
if (_stopTheWorldStrategy)
324+
{
325+
_partitionsRevokedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions));
326+
this.Assignment = new List<TopicPartition>();
327+
this.Subscription = new List<string>();
328+
_currentPartitionsOffsets.Clear();
329+
_flowManager.Stop();
330+
return;
331+
}
332+
333+
this.Assignment = this.Assignment.Except(partitions.Select(x => x.TopicPartition)).ToArray();
334+
this.Subscription = consumer.Subscription;
335+
foreach (var partition in partitions)
336+
{
337+
_currentPartitionsOffsets.TryRemove(partition.TopicPartition, out _);
338+
}
339+
340+
_flowManager.Stop();
341+
_flowManager.Start(consumer);
342+
_partitionsRevokedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions));
343+
}
344+
303345
private void InvalidateConsumer()
304346
{
305347
_consumer?.Close();

src/KafkaFlow/Consumers/ConsumerManager.cs

+36-5
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,17 @@
33
using System.Linq;
44
using System.Threading;
55
using System.Threading.Tasks;
6+
using Confluent.Kafka;
67
using KafkaFlow.Configuration;
8+
using KafkaFlow.Extensions;
79

810
namespace KafkaFlow.Consumers;
911

1012
internal class ConsumerManager : IConsumerManager
1113
{
1214
private readonly IDependencyResolver _dependencyResolver;
1315
private readonly ILogHandler _logHandler;
16+
private readonly bool _stopTheWorldStrategy;
1417

1518
private Timer _evaluateWorkersCountTimer;
1619

@@ -26,7 +29,7 @@ public ConsumerManager(
2629
this.Consumer = consumer;
2730
this.WorkerPool = consumerWorkerPool;
2831
this.Feeder = feeder;
29-
32+
_stopTheWorldStrategy = consumer.Configuration.GetKafkaConfig().PartitionAssignmentStrategy.IsStopTheWorldStrategy();
3033
this.Consumer.OnPartitionsAssigned((_, _, partitions) => this.OnPartitionAssigned(partitions));
3134
this.Consumer.OnPartitionsRevoked((_, _, partitions) => this.OnPartitionRevoked(partitions));
3235
}
@@ -104,9 +107,23 @@ private void OnPartitionRevoked(IEnumerable<Confluent.Kafka.TopicPartitionOffset
104107
{
105108
_logHandler.Warning(
106109
"Partitions revoked",
107-
this.GetConsumerLogInfo(topicPartitions.Select(x => x.TopicPartition)));
110+
this.GetConsumerLogInfo(topicPartitions?.Select(x => x.TopicPartition).ToArray()
111+
?? Array.Empty<TopicPartition>()));
108112

109113
this.WorkerPool.StopAsync().GetAwaiter().GetResult();
114+
115+
if (_stopTheWorldStrategy)
116+
{
117+
return;
118+
}
119+
120+
var assignedPartitions = Consumer.Assignment;
121+
var workersCount = this.CalculateWorkersCount(assignedPartitions).GetAwaiter().GetResult();
122+
123+
this.WorkerPool
124+
.StartAsync(assignedPartitions, workersCount)
125+
.GetAwaiter()
126+
.GetResult();
110127
}
111128

112129
private void OnPartitionAssigned(IReadOnlyCollection<Confluent.Kafka.TopicPartition> partitions)
@@ -115,10 +132,16 @@ private void OnPartitionAssigned(IReadOnlyCollection<Confluent.Kafka.TopicPartit
115132
"Partitions assigned",
116133
this.GetConsumerLogInfo(partitions));
117134

118-
var workersCount = this.CalculateWorkersCount(partitions).GetAwaiter().GetResult();
135+
if (_stopTheWorldStrategy is false)
136+
{
137+
this.WorkerPool.StopAsync().GetAwaiter().GetResult();
138+
}
139+
140+
var assignedPartitions = Consumer.Assignment;
141+
var workersCount = this.CalculateWorkersCount(assignedPartitions).GetAwaiter().GetResult();
119142

120143
this.WorkerPool
121-
.StartAsync(partitions, workersCount)
144+
.StartAsync(assignedPartitions, workersCount)
122145
.GetAwaiter()
123146
.GetResult();
124147
}
@@ -127,7 +150,7 @@ private void OnPartitionAssigned(IReadOnlyCollection<Confluent.Kafka.TopicPartit
127150
{
128151
this.Consumer.Configuration.GroupId,
129152
this.Consumer.Configuration.ConsumerName,
130-
Topics = partitions
153+
UpdatedTopics = partitions
131154
.GroupBy(x => x.Topic)
132155
.Select(
133156
x => new
@@ -136,6 +159,14 @@ private void OnPartitionAssigned(IReadOnlyCollection<Confluent.Kafka.TopicPartit
136159
PartitionsCount = x.Count(),
137160
Partitions = x.Select(y => y.Partition.Value),
138161
}),
162+
CurrentTopics = Consumer.Assignment.GroupBy(x => x.Topic)
163+
.Select(
164+
x => new
165+
{
166+
x.First().Topic,
167+
PartitionsCount = x.Count(),
168+
Partitions = x.Select(y => y.Partition.Value),
169+
}),
139170
};
140171

141172
private async Task<int> CalculateWorkersCount(IEnumerable<Confluent.Kafka.TopicPartition> partitions)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
using Confluent.Kafka;
2+
3+
namespace KafkaFlow.Extensions;
4+
5+
/// <summary>
6+
/// Strategy extension methods.
7+
/// </summary>
8+
public static class StrategyExtensions
9+
{
10+
/// <summary>
11+
/// Determine if the strategy is a "stop the world" behavior.
12+
/// </summary>
13+
/// <param name="strategy">Strategy</param>
14+
/// <returns></returns>
15+
public static bool IsStopTheWorldStrategy(this PartitionAssignmentStrategy? strategy) =>
16+
strategy is null or PartitionAssignmentStrategy.Range or PartitionAssignmentStrategy.RoundRobin;
17+
}

0 commit comments

Comments
 (0)