Skip to content

Commit 545f6ef

Browse files
Mpdreamzrusscam
authored andcommitted
coordinated request observables when aborted could bubble out an UnexpectedElasticsearchClientException or a OperationCancelledException, this normalizes what gets passed to OnError (OperationCancelledException) (#4027)
(cherry picked from commit 6b8c63f)
1 parent 82ef640 commit 545f6ef

File tree

5 files changed

+22
-5
lines changed

5 files changed

+22
-5
lines changed

src/Elasticsearch.Net/Transport/Pipeline/RequestPipeline.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,8 @@ public async Task FirstPoolUsageAsync(SemaphoreSlim semaphore, CancellationToken
290290
{
291291
if (!FirstPoolUsageNeedsSniffing) return;
292292

293+
// TODO cancellationToken could throw here and will bubble out as OperationCancelledException
294+
// everywhere else it would bubble out wrapped in a `UnexpectedElasticsearchClientException`
293295
var success = await semaphore.WaitAsync(_settings.RequestTimeout, cancellationToken).ConfigureAwait(false);
294296
if (!success)
295297
{

src/Elasticsearch.Net/Transport/Transport.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,9 @@ public async Task<TResponse> RequestAsync<TResponse>(HttpMethod method, string p
142142
}
143143
catch (Exception killerException)
144144
{
145+
if (killerException is OperationCanceledException && cancellationToken.IsCancellationRequested)
146+
pipeline.AuditCancellationRequested();
147+
145148
throw new UnexpectedElasticsearchClientException(killerException, seenExceptions)
146149
{
147150
Request = requestData,

src/Nest/CommonAbstractions/Reactive/CoordinatedRequestObserverBase.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using Elasticsearch.Net;
23

34
namespace Nest
45
{
@@ -27,7 +28,15 @@ protected CoordinatedRequestObserverBase(Action<T> onNext = null, Action<Excepti
2728

2829
public void OnCompleted() => _completed?.Invoke();
2930

30-
public void OnError(Exception error) => _onError?.Invoke(error);
31+
public void OnError(Exception error)
32+
{
33+
// This normalizes task cancellation exceptions for observables
34+
// If a task cancellation happens in the client it bubbles out as a UnexpectedElasticsearchClientException
35+
// where as inside our IObservable implementation we .ThrowIfCancellationRequested() directly.
36+
if (error is UnexpectedElasticsearchClientException es && es.InnerException != null && es.InnerException is OperationCanceledException c)
37+
_onError?.Invoke(c);
38+
else _onError?.Invoke(error);
39+
}
3140

3241
public void OnNext(T value) => _onNext?.Invoke(value);
3342
}

src/Tests/Tests/Document/Multiple/BulkAll/BulkAllCancellationTokenApiTests.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,13 @@ public void CancelBulkAll()
4646
//when we subscribe the observable becomes hot
4747
observableBulk.Subscribe(bulkObserver);
4848

49-
//we wait Nseconds to see some bulks
49+
//we wait N seconds to see some bulks
5050
handle.WaitOne(TimeSpan.FromSeconds(3));
5151
tokenSource.Cancel();
52-
//we wait Nseconds to give in flight request a chance to cancel
52+
//we wait N seconds to give in flight request a chance to cancel
5353
handle.WaitOne(TimeSpan.FromSeconds(3));
54-
if (ex != null && !(ex is TaskCanceledException) && !(ex is OperationCanceledException)) throw ex;
54+
55+
if (ex != null && !(ex is OperationCanceledException)) throw ex;
5556

5657
seenPages.Should().BeLessThan(pages).And.BeGreaterThan(0);
5758
var count = Client.Count<SmallObject>(f => f.Index(index));

src/Tests/Tests/Document/Multiple/BulkAll/BulkAllDisposeApiTests.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Threading;
33
using System.Threading.Tasks;
44
using Elastic.Xunit.XunitPlumbing;
5+
using Elasticsearch.Net;
56
using FluentAssertions;
67
using Nest;
78
using Tests.Core.ManagedElasticsearch.Clusters;
@@ -51,7 +52,8 @@ public void DisposingObservableCancelsBulkAll()
5152
observableBulk.Dispose();
5253
//we wait N seconds to give in flight request a chance to cancel
5354
handle.WaitOne(TimeSpan.FromSeconds(3));
54-
if (ex != null && !(ex is TaskCanceledException) && !(ex is OperationCanceledException)) throw ex;
55+
56+
if (ex != null && !(ex is OperationCanceledException)) throw ex;
5557

5658
seenPages.Should().BeLessThan(pages).And.BeGreaterThan(0);
5759
var count = Client.Count<SmallObject>(f => f.Index(index));

0 commit comments

Comments
 (0)