Skip to content

Commit 7cdc94d

Browse files
authored
[Event Hubs] Change trigger checkpointing behavior (Azure#42299)
* [Event Hubs] Change trigger checkpointing behavior. These changes update the Event Hubs trigger so that it does not checkpoint when the listener is shutting down. This is necessary to prevent potential data loss when a shutdown interrupts Function retries. Because the trigger cannot know whether the Function host would have retried a failure had it not been shutting down, it cannot assume that it is safe to checkpoint. This change ensures that the batch of events being processed when shutdown was requested will be retried by another instance or the next time the Function app is run. Though this may cause duplicate processing, it is consistent with the Event Hubs at-least-once guarantee.
1 parent ecad1af commit 7cdc94d

File tree

6 files changed

+163
-53
lines changed

6 files changed

+163
-53
lines changed

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
### Other Changes
1212

13+
- Adjusted checkpointing logic to no longer write a checkpoint when the listener is shutting down. This was necessary to prevent potential data loss when a shutdown interrupts Function retries. Because the trigger cannot know whether the Function host would have retried a failure had it not been shutting down, it cannot assume that it is safe to checkpoint. This change ensures that the batch of events being processed when shutdown was requested will be retried by another instance or the next time the Function app is run.
14+
1315
- Updated the `Azure.Messaging.EventHubs` dependency, which includes a new build of the AMQP transport library. The notable bug fix addresses an obscure race condition, triggered when a cancellation token is signaled while service operations are being invoked concurrently, which caused those operations to hang.
1416

1517
## 6.1.0 (2024-02-13)

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.PartitionProcessor.cs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ internal class PartitionProcessor : IEventProcessor, IDisposable
4242
private Task _cachedEventsBackgroundTask;
4343
private CancellationTokenSource _cachedEventsBackgroundTaskCts;
4444
private SemaphoreSlim _cachedEventsGuard;
45+
private readonly CancellationToken _listenerCancellationToken;
4546
private readonly CancellationToken _functionExecutionToken;
4647
private readonly CancellationTokenSource _ownershipLostTokenSource;
4748

@@ -50,7 +51,7 @@ internal class PartitionProcessor : IEventProcessor, IDisposable
5051
/// </summary>
5152
internal PartitionProcessorEventsManager CachedEventsManager { get; }
5253

53-
public PartitionProcessor(EventHubOptions options, ITriggeredFunctionExecutor executor, ILogger logger, bool singleDispatch, CancellationToken functionExecutionToken)
54+
public PartitionProcessor(EventHubOptions options, ITriggeredFunctionExecutor executor, ILogger logger, bool singleDispatch, CancellationToken listenerCancellationToken, CancellationToken functionExecutionToken)
5455
{
5556
_executor = executor;
5657
_singleDispatch = singleDispatch;
@@ -59,6 +60,7 @@ public PartitionProcessor(EventHubOptions options, ITriggeredFunctionExecutor ex
5960
_firstFunctionInvocation = true;
6061
_maxWaitTime = options.MaxWaitTime;
6162
_minimumBatchesEnabled = options.MinEventBatchSize > 1; // 1 is the default
63+
_listenerCancellationToken = listenerCancellationToken;
6264
_functionExecutionToken = functionExecutionToken;
6365
_ownershipLostTokenSource = new CancellationTokenSource();
6466

@@ -215,8 +217,10 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume
215217
// and wait to send until we receive enough events or total max wait time has passed.
216218
}
217219

218-
// Checkpoint if we processed any events and cancellation has not been signaled.
219-
// Don't checkpoint if no events. This can reset the sequence counter to 0.
220+
// Checkpoint if we processed any events, the listener is not stopping, and
221+
// cancellation has not been signaled. Don't checkpoint if no events. This
222+
// can reset the sequence counter to 0.
223+
//
220224
// Note: we intentionally checkpoint the batch regardless of function
221225
// success/failure. EventHub doesn't support any sort of "poison event" model,
222226
// so that is the responsibility of the user's function currently. E.g.
@@ -225,10 +229,12 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume
225229
// Don't checkpoint if cancellation has been requested as this can lead to data loss,
226230
// since the user may not actually process the event.
227231

228-
if (eventToCheckpoint != null &&
232+
if (eventToCheckpoint != null
229233
// IMPORTANT - explicitly check each token to avoid data loss as the linkedCts is not canceled atomically when each of the
230234
// sources are canceled.
231-
!_functionExecutionToken.IsCancellationRequested && !_ownershipLostTokenSource.IsCancellationRequested)
235+
&& !_listenerCancellationToken.IsCancellationRequested
236+
&& !_functionExecutionToken.IsCancellationRequested
237+
&& !_ownershipLostTokenSource.IsCancellationRequested)
232238
{
233239
await CheckpointAsync(eventToCheckpoint, context).ConfigureAwait(false);
234240
}

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ internal sealed partial class EventHubListener : IListener, IEventProcessorFacto
2828
private readonly ILoggerFactory _loggerFactory;
2929
private readonly ILogger _logger;
3030
private string _details;
31+
private CancellationTokenSource _listenerCancellationTokenSource;
3132
private CancellationTokenSource _functionExecutionCancellationTokenSource;
3233
private readonly IDrainModeManager _drainModeManager;
3334
private volatile bool _disposed;
@@ -50,6 +51,7 @@ public EventHubListener(
5051
_checkpointStore = checkpointStore;
5152
_options = options;
5253
_logger = _loggerFactory.CreateLogger<EventHubListener>();
54+
_listenerCancellationTokenSource = new CancellationTokenSource();
5355
_functionExecutionCancellationTokenSource = new CancellationTokenSource();
5456
_drainModeManager = drainModeManager;
5557

@@ -88,6 +90,7 @@ void IListener.Cancel()
8890

8991
void IDisposable.Dispose()
9092
{
93+
_listenerCancellationTokenSource.Cancel();
9194
_functionExecutionCancellationTokenSource.Cancel();
9295

9396
#pragma warning disable AZC0102
@@ -110,6 +113,8 @@ public async Task StartAsync(CancellationToken cancellationToken)
110113

111114
public async Task StopAsync(CancellationToken cancellationToken)
112115
{
116+
_listenerCancellationTokenSource.Cancel();
117+
113118
if (!_drainModeManager.IsDrainModeEnabled)
114119
{
115120
_functionExecutionCancellationTokenSource.Cancel();
@@ -122,7 +127,13 @@ public async Task StopAsync(CancellationToken cancellationToken)
122127

123128
IEventProcessor IEventProcessorFactory.CreatePartitionProcessor()
124129
{
125-
return new PartitionProcessor(_options, _executor, _loggerFactory.CreateLogger<PartitionProcessor>(), _singleDispatch, _functionExecutionCancellationTokenSource.Token);
130+
return new PartitionProcessor(
131+
_options,
132+
_executor,
133+
_loggerFactory.CreateLogger<PartitionProcessor>(),
134+
_singleDispatch,
135+
_listenerCancellationTokenSource.Token,
136+
_functionExecutionCancellationTokenSource.Token);
126137
}
127138

128139
public IScaleMonitor GetMonitor()

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs

Lines changed: 71 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using System.Threading;
99
using System.Threading.Tasks;
1010
using Azure.Core;
11+
using Azure.Core.Diagnostics;
1112
using Azure.Core.TestFramework;
1213
using Azure.Messaging.EventHubs;
1314
using Azure.Messaging.EventHubs.Consumer;
@@ -93,14 +94,18 @@ public async Task EventHub_StringBinding()
9394
[Test]
9495
public async Task EventHub_SingleDispatch()
9596
{
97+
var watch = ValueStopwatch.StartNew();
9698
var (jobHost, host) = BuildHost<EventHubTestSingleDispatchJobs>();
99+
97100
using (jobHost)
98101
{
99102
await jobHost.CallAsync(nameof(EventHubTestSingleDispatchJobs.SendEvent_TestHub), new { input = "data" });
100103

101-
bool result = _eventWait1.WaitOne(Timeout);
104+
bool result = _eventWait1.WaitOne(GetRemainingTimeoutMilliseconds(watch.GetElapsedTime()));
102105
Assert.True(result);
103106

107+
using var cancellationSource = new CancellationTokenSource(GetRemainingTimeoutMilliseconds(watch.GetElapsedTime()));
108+
await AwaitCheckpointing(host, cancellationSource.Token);
104109
await StopWithDrainAsync(host);
105110
}
106111

@@ -158,14 +163,18 @@ public async Task EventHub_SingleDispatch_ConsumerGroup()
158163
[Test]
159164
public async Task EventHub_SingleDispatch_BinaryData()
160165
{
166+
var watch = ValueStopwatch.StartNew();
161167
var (jobHost, host) = BuildHost<EventHubTestSingleDispatchJobsBinaryData>();
168+
162169
using (jobHost)
163170
{
164171
await jobHost.CallAsync(nameof(EventHubTestSingleDispatchJobsBinaryData.SendEvent_TestHub), new { input = "data" });
165172

166-
bool result = _eventWait1.WaitOne(Timeout);
173+
bool result = _eventWait1.WaitOne(GetRemainingTimeoutMilliseconds(watch.GetElapsedTime()));
167174
Assert.True(result);
168175

176+
using var cancellationSource = new CancellationTokenSource(GetRemainingTimeoutMilliseconds(watch.GetElapsedTime()));
177+
await AwaitCheckpointing(host, cancellationSource.Token);
169178
await StopWithDrainAsync(host);
170179
}
171180

@@ -211,30 +220,6 @@ public async Task EventHub_CollectorPartitionKey()
211220
}
212221
}
213222

214-
private static void AssertSingleDispatchLogs(IHost host)
215-
{
216-
IEnumerable<LogMessage> logMessages = host.GetTestLoggerProvider()
217-
.GetAllLogMessages();
218-
219-
Assert.AreEqual(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
220-
&& x.FormattedMessage.Contains("Trigger Details:")
221-
&& x.FormattedMessage.Contains("Offset:")).Count(), 1);
222-
223-
Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
224-
&& x.FormattedMessage.Contains("OpenAsync")).Any());
225-
226-
Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
227-
&& x.FormattedMessage.Contains("CheckpointAsync")
228-
&& x.FormattedMessage.Contains("lease")
229-
&& x.FormattedMessage.Contains("offset")
230-
&& x.FormattedMessage.Contains("sequenceNumber")).Any());
231-
232-
Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
233-
&& x.FormattedMessage.Contains("Sending events to EventHub")).Any());
234-
235-
AssertAzureSdkLogs(logMessages);
236-
}
237-
238223
[Test]
239224
public async Task CanSendAndReceive_ConnectionStringInConfiguration()
240225
{
@@ -332,15 +317,19 @@ public async Task AssertCanSendReceiveMessage(Action<IHostBuilder> hostConfigura
332317
[Test]
333318
public async Task EventHub_MultipleDispatch()
334319
{
320+
var watch = ValueStopwatch.StartNew();
335321
var (jobHost, host) = BuildHost<EventHubTestMultipleDispatchJobs>();
322+
336323
using (jobHost)
337324
{
338325
int numEvents = 5;
339326
await jobHost.CallAsync(nameof(EventHubTestMultipleDispatchJobs.SendEvents_TestHub), new { numEvents = numEvents, input = "data" });
340327

341-
bool result = _eventWait1.WaitOne(Timeout);
328+
bool result = _eventWait1.WaitOne(GetRemainingTimeoutMilliseconds(watch.GetElapsedTime()));
342329
Assert.True(result);
343330

331+
using var cancellationSource = new CancellationTokenSource(GetRemainingTimeoutMilliseconds(watch.GetElapsedTime()));
332+
await AwaitCheckpointing(host, cancellationSource.Token);
344333
await StopWithDrainAsync(host);
345334
}
346335

@@ -350,15 +339,19 @@ public async Task EventHub_MultipleDispatch()
350339
[Test]
351340
public async Task EventHub_MultipleDispatch_BinaryData()
352341
{
342+
var watch = ValueStopwatch.StartNew();
353343
var (jobHost, host) = BuildHost<EventHubTestMultipleDispatchJobsBinaryData>();
344+
354345
using (jobHost)
355346
{
356347
int numEvents = 5;
357348
await jobHost.CallAsync(nameof(EventHubTestMultipleDispatchJobsBinaryData.SendEvents_TestHub), new { numEvents = numEvents, input = "data" });
358349

359-
bool result = _eventWait1.WaitOne(Timeout);
350+
bool result = _eventWait1.WaitOne(GetRemainingTimeoutMilliseconds(watch.GetElapsedTime()));
360351
Assert.True(result);
361352

353+
using var cancellationSource = new CancellationTokenSource(GetRemainingTimeoutMilliseconds(watch.GetElapsedTime()));
354+
await AwaitCheckpointing(host, cancellationSource.Token);
362355
await StopWithDrainAsync(host);
363356
}
364357

@@ -370,6 +363,7 @@ public async Task EventHub_MultipleDispatch_MinBatchSize()
370363
{
371364
const int minEventBatchSize = 5;
372365

366+
var watch = ValueStopwatch.StartNew();
373367
var (jobHost, host) = BuildHost<EventHubTestMultipleDispatchMinBatchSizeJobs>(
374368
builder =>
375369
{
@@ -393,9 +387,11 @@ public async Task EventHub_MultipleDispatch_MinBatchSize()
393387
int numEvents = 5;
394388
await jobHost.CallAsync(nameof(EventHubTestMultipleDispatchMinBatchSizeJobs.SendEvents_TestHub), new { numEvents = numEvents, input = "data" });
395389

396-
bool result = _eventWait1.WaitOne(Timeout);
390+
bool result = _eventWait1.WaitOne(GetRemainingTimeoutMilliseconds(watch.GetElapsedTime()));
397391
Assert.True(result);
398392

393+
using var cancellationSource = new CancellationTokenSource(GetRemainingTimeoutMilliseconds(watch.GetElapsedTime()));
394+
await AwaitCheckpointing(host, cancellationSource.Token);
399395
await StopWithDrainAsync(host);
400396
}
401397

@@ -558,6 +554,30 @@ public async Task EventHub_InitialOffsetFromEnqueuedTime()
558554
}
559555
}
560556

557+
private static void AssertSingleDispatchLogs(IHost host)
558+
{
559+
IEnumerable<LogMessage> logMessages = host.GetTestLoggerProvider()
560+
.GetAllLogMessages();
561+
562+
Assert.AreEqual(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
563+
&& x.FormattedMessage.Contains("Trigger Details:")
564+
&& x.FormattedMessage.Contains("Offset:")).Count(), 1);
565+
566+
Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
567+
&& x.FormattedMessage.Contains("OpenAsync")).Any());
568+
569+
Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
570+
&& x.FormattedMessage.Contains("CheckpointAsync")
571+
&& x.FormattedMessage.Contains("lease")
572+
&& x.FormattedMessage.Contains("offset")
573+
&& x.FormattedMessage.Contains("sequenceNumber")).Any());
574+
575+
Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
576+
&& x.FormattedMessage.Contains("Sending events to EventHub")).Any());
577+
578+
AssertAzureSdkLogs(logMessages);
579+
}
580+
561581
private static void AssertMultipleDispatchLogsMinBatch(IHost host)
562582
{
563583
IEnumerable<LogMessage> logMessages = host.GetTestLoggerProvider()
@@ -606,17 +626,36 @@ private static void AssertMultipleDispatchLogs(IHost host)
606626
AssertAzureSdkLogs(logMessages);
607627
}
608628

629+
private static void AssertAzureSdkLogs(IEnumerable<LogMessage> logMessages)
630+
{
631+
Assert.True(logMessages.Any(x => x.Category.StartsWith("Azure.")));
632+
}
633+
609634
private static async Task StopWithDrainAsync(IHost host)
610635
{
611-
// Enable drain mode so checkpointing occurs when stopping
612636
var drainModeManager = host.Services.GetService<IDrainModeManager>();
613637
await drainModeManager.EnableDrainModeAsync(CancellationToken.None);
614638
await host.StopAsync();
615639
}
616640

617-
private static void AssertAzureSdkLogs(IEnumerable<LogMessage> logMessages)
641+
private static async Task AwaitCheckpointing(IHost host, CancellationToken cancellationToken)
618642
{
619-
Assert.True(logMessages.Any(x => x.Category.StartsWith("Azure.")));
643+
var logMessages = host.GetTestLoggerProvider()
644+
.GetAllLogMessages();
645+
646+
while (true)
647+
{
648+
if (logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
649+
&& x.FormattedMessage.Contains("CheckpointAsync")
650+
&& x.FormattedMessage.Contains("sequenceNumber")).Any())
651+
{
652+
return;
653+
}
654+
655+
// No need to explicitly check the cancellation token;
656+
// the delay will throw if the token is signaled.
657+
await Task.Delay(100, cancellationToken).ConfigureAwait(false);
658+
}
620659
}
621660

622661
public class EventHubTestSingleDispatchJobs

0 commit comments

Comments
 (0)