Skip to content

Commit 1bd2db0

Browse files
authored
Null check bug fixes and tweak accessibility of properties on TransportResponse (#144)
`DefaultResponseFactory` was throwing if the response stream was null. This can occur when an exception is thrown when sending the request (e.g., `HttpRequestException`), for example, when the `HttpClient` cannot connect to the endpoint. Rather than throwing a null exception here, we still want to return a response with the original exception attached. In `StreamResponse`, we must safety-check that any linked disposables are not null before attempting to dispose of them. The final change in `TransportResponse` is a tweak for the ingest work. The `BulkStreamingResponse` was initially derived from the `StreamResponse` to share behaviour. However, this causes the `Body` property (stream) to be present on the derived type. As we are handling stream reading internally, this is unnecessary and could produce weird behaviour if the consumer tries to access the stream directly. Instead, `BulkStreamingResponse` derives directly from `TransportResponse`, overriding `LeaveOpen` and handling `LinkedDisposables` in its own `Dispose` method. This means we could potentially seal `StreamResponse` again. However, it might still be helpful for consumers to derive responses from this for advanced scenarios, with the base class doing the right thing during disposal. I am open to thoughts on whether that's likely to happen. @flobernd, were you deriving from this in the client?
1 parent efe6b75 commit 1bd2db0

File tree

6 files changed

+98
-55
lines changed

6 files changed

+98
-55
lines changed

src/Elastic.Transport/Diagnostics/OpenTelemetry/OpenTelemetry.cs

+8
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,14 @@ internal static void SetCommonAttributes(Activity? activity, ITransportConfigura
5353
}
5454

5555
var productSchemaVersion = string.Empty;
56+
foreach (var attribute in activity.TagObjects)
57+
{
58+
if (attribute.Key.Equals(OpenTelemetryAttributes.DbElasticsearchSchemaUrl, StringComparison.Ordinal))
59+
{
60+
if (attribute.Value is string schemaVersion)
61+
productSchemaVersion = schemaVersion;
62+
}
63+
}
5664

5765
// We add the client schema version only when it differs from the product schema version
5866
if (!productSchemaVersion.Equals(OpenTelemetrySchemaVersion, StringComparison.Ordinal))

src/Elastic.Transport/DistributedTransport.cs

+4-3
Original file line numberDiff line numberDiff line change
@@ -124,9 +124,6 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(
124124

125125
if (activity is { IsAllDataRequested: true })
126126
{
127-
if (activity.IsAllDataRequested)
128-
OpenTelemetry.SetCommonAttributes(activity, Configuration);
129-
130127
if (Configuration.Authentication is BasicAuthentication basicAuthentication)
131128
activity.SetTag(SemanticConventions.DbUser, basicAuthentication.Username);
132129

@@ -261,9 +258,13 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(
261258
activity?.SetTag(SemanticConventions.HttpResponseStatusCode, response.ApiCallDetails.HttpStatusCode);
262259
activity?.SetTag(OpenTelemetryAttributes.ElasticTransportAttemptedNodes, attemptedNodes);
263260

261+
// We don't check IsAllDataRequested here as that's left to the consumer.
264262
if (configureActivity is not null && activity is not null)
265263
configureActivity.Invoke(activity);
266264

265+
if (activity is { IsAllDataRequested: true })
266+
OpenTelemetry.SetCommonAttributes(activity, Configuration);
267+
267268
return FinalizeResponse(endpoint, boundConfiguration, data, pipeline, startedOn, auditor, seenExceptions, response);
268269
}
269270
finally

src/Elastic.Transport/Responses/DefaultResponseFactory.cs

+1-3
Original file line numberDiff line numberDiff line change
@@ -79,13 +79,11 @@ private async ValueTask<TResponse> CreateCoreAsync<TResponse>(
7979
IReadOnlyDictionary<TcpState, int>? tcpStats,
8080
CancellationToken cancellationToken = default) where TResponse : TransportResponse, new()
8181
{
82-
responseStream.ThrowIfNull(nameof(responseStream));
83-
8482
var details = InitializeApiCallDetails(endpoint, boundConfiguration, postData, ex, statusCode, headers, contentType, threadPoolStats, tcpStats, contentLength);
8583

8684
TResponse? response = null;
8785

88-
if (MayHaveBody(statusCode, endpoint.Method, contentLength)
86+
if (responseStream is not null && MayHaveBody(statusCode, endpoint.Method, contentLength)
8987
&& TryResolveBuilder<TResponse>(boundConfiguration.ResponseBuilders, boundConfiguration.ProductResponseBuilders, out var builder))
9088
{
9189
var ownsStream = false;

src/Elastic.Transport/Responses/Special/StreamResponse.cs

+11-45
Original file line numberDiff line numberDiff line change
@@ -8,65 +8,31 @@
88
namespace Elastic.Transport;
99

1010
/// <summary>
11-
/// A response that exposes the response <see cref="TransportResponse{T}.Body"/> as <see cref="Stream"/>.
11+
/// A response that exposes the response as a <see cref="Stream"/>.
1212
/// <para>
1313
/// <strong>MUST</strong> be disposed after use to ensure the HTTP connection is freed for reuse.
1414
/// </para>
1515
/// </summary>
16-
public class StreamResponse : TransportResponse<Stream>, IDisposable
16+
public sealed class StreamResponse : StreamResponseBase, IDisposable
1717
{
18-
private bool _disposed;
19-
20-
/// <summary>
21-
/// The MIME type of the response, if present.
22-
/// </summary>
23-
public string ContentType { get; }
24-
2518
/// <inheritdoc cref="StreamResponse"/>
26-
public StreamResponse()
27-
{
28-
Body = Stream.Null;
19+
public StreamResponse() : base(Stream.Null) =>
2920
ContentType = string.Empty;
30-
}
3121

3222
/// <inheritdoc cref="StreamResponse"/>
33-
public StreamResponse(Stream body, string? contentType)
34-
{
35-
Body = body;
23+
public StreamResponse(Stream body, string? contentType) : base(body) =>
3624
ContentType = contentType ?? string.Empty;
37-
}
38-
39-
internal override bool LeaveOpen => true;
4025

4126
/// <summary>
42-
/// Disposes the underlying stream.
27+
/// The MIME type of the response, if present.
4328
/// </summary>
44-
/// <param name="disposing"></param>
45-
protected virtual void Dispose(bool disposing)
46-
{
47-
if (!_disposed)
48-
{
49-
if (disposing)
50-
{
51-
Body.Dispose();
52-
53-
if (LinkedDisposables is not null)
54-
{
55-
foreach (var disposable in LinkedDisposables)
56-
disposable.Dispose();
57-
}
58-
}
59-
60-
_disposed = true;
61-
}
62-
}
29+
public string ContentType { get; }
6330

6431
/// <summary>
65-
/// Disposes the underlying stream.
32+
/// The raw response stream.
6633
/// </summary>
67-
public void Dispose()
68-
{
69-
Dispose(disposing: true);
70-
GC.SuppressFinalize(this);
71-
}
34+
public Stream Body => Stream;
35+
36+
/// <inheritdoc/>
37+
protected internal override bool LeaveOpen => true;
7238
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Licensed to Elasticsearch B.V under one or more agreements.
2+
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information
4+
5+
using System;
6+
using System.IO;
7+
using Elastic.Transport.Extensions;
8+
9+
namespace Elastic.Transport;
10+
11+
/// <summary>
12+
/// A base class for implementing responses that access the raw response stream.
13+
/// </summary>
14+
public abstract class StreamResponseBase : TransportResponse, IDisposable
15+
{
16+
/// <inheritdoc/>
17+
protected internal override bool LeaveOpen => true;
18+
19+
/// <summary>
20+
/// The raw response stream from the HTTP layer.
21+
/// </summary>
22+
/// <remarks>
23+
/// <b>MUST</b> be disposed to release the underlying HTTP connection for reuse.
24+
/// </remarks>
25+
protected Stream Stream { get; }
26+
27+
/// <summary>
28+
/// Indicates that the response has been disposed and it is not longer safe to access the stream.
29+
/// </summary>
30+
protected bool Disposed { get; private set; }
31+
32+
/// <inheritdoc cref="StreamResponseBase"/>
33+
public StreamResponseBase(Stream responseStream)
34+
{
35+
responseStream.ThrowIfNull(nameof(responseStream));
36+
Stream = responseStream;
37+
}
38+
39+
/// <summary>
40+
/// Disposes the underlying stream.
41+
/// </summary>
42+
/// <param name="disposing"></param>
43+
protected virtual void Dispose(bool disposing)
44+
{
45+
if (!Disposed)
46+
{
47+
if (disposing)
48+
{
49+
Stream?.Dispose();
50+
51+
if (LinkedDisposables is not null)
52+
{
53+
foreach (var disposable in LinkedDisposables)
54+
disposable?.Dispose();
55+
}
56+
}
57+
58+
Disposed = true;
59+
}
60+
}
61+
62+
/// <summary>
63+
/// Disposes the underlying stream.
64+
/// </summary>
65+
public void Dispose()
66+
{
67+
Dispose(disposing: true);
68+
GC.SuppressFinalize(this);
69+
}
70+
}

src/Elastic.Transport/Responses/TransportResponse.cs

+4-4
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@ namespace Elastic.Transport;
1010

1111
/// <summary>
1212
/// A response from an Elastic product including details about the request/response life cycle. Base class for the built in low level response
13-
/// types, <see cref="StringResponse"/>, <see cref="BytesResponse"/>, <see cref="DynamicResponse"/>, <see cref="StreamResponse"/> and <see cref="VoidResponse"/>
13+
/// types, <see cref="StringResponse"/>, <see cref="BytesResponse"/>, <see cref="DynamicResponse"/>, and <see cref="VoidResponse"/>
1414
/// </summary>
1515
public abstract class TransportResponse<T> : TransportResponse
1616
{
1717
/// <summary>
18-
/// The deserialized body returned by the product.
18+
/// The (potentially deserialized) response returned by the product.
1919
/// </summary>
2020
public T Body { get; protected internal set; }
2121
}
@@ -46,7 +46,7 @@ public override string ToString() => ApiCallDetails?.DebugInformation
4646
/// StreamResponse and kept internal. If we later make this public, we might need to refine this.
4747
/// </remarks>
4848
[JsonIgnore]
49-
internal IEnumerable<IDisposable>? LinkedDisposables { get; set; }
49+
protected internal IEnumerable<IDisposable>? LinkedDisposables { get; internal set; }
5050

5151
/// <summary>
5252
/// Allows the response to identify that the response stream should NOT be automatically disposed.
@@ -55,6 +55,6 @@ public override string ToString() => ApiCallDetails?.DebugInformation
5555
/// Currently only used by StreamResponse and therefore internal.
5656
/// </remarks>
5757
[JsonIgnore]
58-
internal virtual bool LeaveOpen { get; } = false;
58+
protected internal virtual bool LeaveOpen { get; } = false;
5959
}
6060

0 commit comments

Comments
 (0)