Skip to content

Commit fa455ae

Browse files
committed
Merge branch 'enhancement/2.x-disabledirectstreaming-request' into 2.x
2 parents a372028 + 2cb9eba commit fa455ae

File tree

8 files changed

+290
-251
lines changed

8 files changed

+290
-251
lines changed

src/Elasticsearch.Net/Configuration/RequestConfiguration.cs

Lines changed: 204 additions & 190 deletions
Large diffs are not rendered by default.

src/Elasticsearch.Net/Configuration/Security/BasicAuthenticationCredentials.cs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,6 @@ public class BasicAuthenticationCredentials
1010
public string UserName { get { return Username; } set { Username = value; } }
1111
public string Password { get; set; }
1212

13-
public override string ToString()
14-
{
15-
return this.Username + ":" + this.Password;
16-
}
13+
public override string ToString() => $"{this.Username}:{this.Password}";
1714
}
1815
}

src/Elasticsearch.Net/Domain/RequestParameters/IRequestParameters.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,5 @@ public interface IRequestParameters
2727
TOut GetQueryStringValue<TOut>(string name);
2828

2929
void AddQueryStringValue(string name, object value);
30-
31-
3230
}
3331
}

src/Elasticsearch.Net/Transport/Pipeline/RequestData.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public RequestData(HttpMethod method, string path, PostData<object> data, IConne
5353

5454
public RequestData(HttpMethod method, string path, PostData<object> data, IConnectionConfigurationValues global, IRequestParameters local, IMemoryStreamFactory memoryStreamFactory)
5555
#pragma warning disable CS0618 // Type or member is obsolete
56-
: this(method, path, data, global, (IRequestConfiguration)local?.RequestConfiguration, memoryStreamFactory)
56+
: this(method, path, data, global, local?.RequestConfiguration, memoryStreamFactory)
5757
#pragma warning restore CS0618 // Type or member is obsolete
5858
{
5959
this.CustomConverter = local?.DeserializationOverride;
@@ -67,12 +67,16 @@ public RequestData(HttpMethod method, string path, PostData<object> data, IConne
6767
this.MemoryStreamFactory = memoryStreamFactory;
6868
this.Method = method;
6969
this.PostData = data;
70+
71+
if (data != null)
72+
data.DisableDirectStreaming = local?.DisableDirectStreaming ?? global.DisableDirectStreaming;
73+
7074
this.Path = this.CreatePathWithQueryStrings(path, this.ConnectionSettings, null);
7175

72-
this.Pipelined = global.HttpPipeliningEnabled || (local?.EnableHttpPipelining).GetValueOrDefault(false);
76+
this.Pipelined = local?.EnableHttpPipelining ?? global.HttpPipeliningEnabled;
7377
this.HttpCompression = global.EnableHttpCompression;
7478
this.ContentType = local?.ContentType ?? MimeType;
75-
this.Headers = new NameValueCollection(global.Headers ?? new NameValueCollection());
79+
this.Headers = global.Headers != null ? new NameValueCollection(global.Headers) : new NameValueCollection();
7680
this.RunAs = local?.RunAs;
7781

7882
this.RequestTimeout = local?.RequestTimeout ?? global.RequestTimeout;

src/Elasticsearch.Net/Transport/Pipeline/ResponseBuilder.cs

Lines changed: 36 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
using System;
2-
using System.Collections.Generic;
32
using System.IO;
4-
using System.Linq;
5-
using System.Text;
63
using System.Threading.Tasks;
74

85
namespace Elasticsearch.Net
@@ -11,16 +8,21 @@ public class ResponseBuilder<TReturn>
118
where TReturn : class
129
{
1310
private const int BufferSize = 81920;
11+
private static readonly VoidResponse Void = new VoidResponse();
12+
private static readonly IDisposable EmptyDisposable = new MemoryStream();
13+
14+
private readonly RequestData _requestData;
15+
private readonly bool _disableDirectStreaming;
1416

1517
public Exception Exception { get; set; }
1618
public int? StatusCode { get; set; }
1719
public Stream Stream { get; set; }
1820

19-
private readonly RequestData _requestData;
20-
2121
public ResponseBuilder(RequestData requestData)
2222
{
2323
_requestData = requestData;
24+
_disableDirectStreaming =
25+
this._requestData.PostData?.DisableDirectStreaming ?? this._requestData.ConnectionSettings.DisableDirectStreaming;
2426
}
2527

2628
public ElasticsearchResponse<TReturn> ToResponse()
@@ -53,21 +55,19 @@ private ElasticsearchResponse<TReturn> Initialize(int? statusCode, Exception exc
5355
return response;
5456
}
5557

56-
private static IDisposable EmptyDisposable = new MemoryStream();
57-
5858
private void SetBody(ElasticsearchResponse<TReturn> response, Stream stream)
59-
{
60-
byte[] bytes = null;
61-
if (NeedsToEagerReadStream())
62-
{
63-
var inMemoryStream = this._requestData.MemoryStreamFactory.Create();
64-
stream.CopyTo(inMemoryStream, BufferSize);
65-
bytes = this.SwapStreams(ref stream, ref inMemoryStream);
59+
{
60+
byte[] bytes = null;
61+
if (NeedsToEagerReadStream())
62+
{
63+
var inMemoryStream = this._requestData.MemoryStreamFactory.Create();
64+
stream.CopyTo(inMemoryStream, BufferSize);
65+
bytes = this.SwapStreams(ref stream, ref inMemoryStream);
6666
}
6767

6868
var needsDispose = typeof(TReturn) != typeof(Stream);
69-
using (needsDispose ? stream : EmptyDisposable)
70-
{
69+
using (needsDispose ? stream : EmptyDisposable)
70+
{
7171
if (response.Success)
7272
{
7373
if (!SetSpecialTypes(stream, response, bytes))
@@ -81,25 +81,25 @@ private void SetBody(ElasticsearchResponse<TReturn> response, Stream stream)
8181
ServerError serverError;
8282
if (ServerError.TryCreate(stream, out serverError))
8383
response.ServerError = serverError;
84-
if (this._requestData.ConnectionSettings.DisableDirectStreaming)
84+
if (_disableDirectStreaming)
8585
response.ResponseBodyInBytes = bytes;
8686
}
8787
}
8888
}
89-
89+
9090
private async Task SetBodyAsync(ElasticsearchResponse<TReturn> response, Stream stream)
91-
{
92-
byte[] bytes = null;
93-
if (NeedsToEagerReadStream())
94-
{
95-
var inMemoryStream = this._requestData.MemoryStreamFactory.Create();
96-
await stream.CopyToAsync(inMemoryStream, BufferSize, this._requestData.CancellationToken).ConfigureAwait(false);
97-
bytes = this.SwapStreams(ref stream, ref inMemoryStream);
91+
{
92+
byte[] bytes = null;
93+
if (NeedsToEagerReadStream())
94+
{
95+
var inMemoryStream = this._requestData.MemoryStreamFactory.Create();
96+
await stream.CopyToAsync(inMemoryStream, BufferSize, this._requestData.CancellationToken).ConfigureAwait(false);
97+
bytes = this.SwapStreams(ref stream, ref inMemoryStream);
9898
}
99-
100-
var needsDispose = typeof(TReturn) != typeof(Stream);
101-
using (needsDispose ? stream : EmptyDisposable)
102-
{
99+
100+
var needsDispose = typeof(TReturn) != typeof(Stream);
101+
using (needsDispose ? stream : EmptyDisposable)
102+
{
103103
if (response.Success)
104104
{
105105
if (!SetSpecialTypes(stream, response, bytes))
@@ -108,10 +108,10 @@ private async Task SetBodyAsync(ElasticsearchResponse<TReturn> response, Stream
108108
else response.Body = await this._requestData.ConnectionSettings.Serializer.DeserializeAsync<TReturn>(stream, this._requestData.CancellationToken).ConfigureAwait(false);
109109
}
110110
}
111-
else if (response.HttpStatusCode != null)
111+
else if (response.HttpStatusCode != null)
112112
{
113113
response.ServerError = await ServerError.TryCreateAsync(stream, this._requestData.CancellationToken).ConfigureAwait(false);
114-
if (this._requestData.ConnectionSettings.DisableDirectStreaming)
114+
if (_disableDirectStreaming)
115115
response.ResponseBodyInBytes = bytes;
116116
}
117117
}
@@ -127,7 +127,7 @@ private void Finalize(ElasticsearchResponse<TReturn> response)
127127
}
128128

129129
private bool NeedsToEagerReadStream() =>
130-
this._requestData.ConnectionSettings.DisableDirectStreaming || typeof(TReturn) == typeof(string) || typeof(TReturn) == typeof(byte[]);
130+
_disableDirectStreaming || typeof(TReturn) == typeof(string) || typeof(TReturn) == typeof(byte[]);
131131

132132
private byte[] SwapStreams(ref Stream responseStream, ref MemoryStream ms)
133133
{
@@ -142,16 +142,16 @@ private byte[] SwapStreams(ref Stream responseStream, ref MemoryStream ms)
142142
private bool SetSpecialTypes(Stream responseStream, ElasticsearchResponse<TReturn> cs, byte[] bytes)
143143
{
144144
var setSpecial = true;
145-
if (this._requestData.ConnectionSettings.DisableDirectStreaming)
145+
if (_disableDirectStreaming)
146146
cs.ResponseBodyInBytes = bytes;
147-
var returnType = typeof (TReturn);
147+
var returnType = typeof(TReturn);
148148
if (returnType == typeof(string))
149149
this.SetStringResult(cs as ElasticsearchResponse<string>, bytes);
150150
else if (returnType == typeof(byte[]))
151151
this.SetByteResult(cs as ElasticsearchResponse<byte[]>, bytes);
152152
else if (returnType == typeof(VoidResponse))
153153
this.SetVoidResult(cs as ElasticsearchResponse<VoidResponse>, responseStream);
154-
else if (returnType == typeof(Stream))
154+
else if (returnType == typeof(Stream))
155155
this.SetStreamResult(cs as ElasticsearchResponse<Stream>, responseStream);
156156
else
157157
setSpecial = false;
@@ -164,12 +164,10 @@ private bool SetSpecialTypes(Stream responseStream, ElasticsearchResponse<TRetur
164164

165165
private void SetStreamResult(ElasticsearchResponse<Stream> result, Stream response) => result.Body = response;
166166

167-
private static VoidResponse _void = new VoidResponse();
168-
169167
private void SetVoidResult(ElasticsearchResponse<VoidResponse> result, Stream response)
170168
{
171169
response.Dispose();
172-
result.Body = _void;
170+
result.Body = Void;
173171
}
174172
}
175173
}

src/Elasticsearch.Net/Transport/PostData.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
using System.Collections.Generic;
2-
using Purify;
32
using System.IO;
4-
using System.Reflection;
53
using System.Threading;
64
using System.Threading.Tasks;
75

@@ -33,6 +31,7 @@ public class PostData<T> : IPostData<T>
3331
private readonly IEnumerable<object> _enumerableOfObject;
3432
private readonly T _serializable;
3533

34+
public bool? DisableDirectStreaming { get; set; }
3635
public byte[] WrittenBytes { get; private set; }
3736
public PostType Type { get; }
3837

@@ -68,7 +67,8 @@ public PostData(T item)
6867
public void Write(Stream writableStream, IConnectionConfigurationValues settings)
6968
{
7069
var indent = settings.PrettyJson ? SerializationFormatting.Indented : SerializationFormatting.None;
71-
MemoryStream ms = null; Stream stream = null;
70+
MemoryStream ms = null;
71+
Stream stream = null;
7272
switch (Type)
7373
{
7474
case PostType.ByteArray:
@@ -83,7 +83,7 @@ public void Write(Stream writableStream, IConnectionConfigurationValues settings
8383
case PostType.EnumerableOfObject:
8484
if (!_enumerableOfObject.HasAny()) return;
8585

86-
if (settings.DisableDirectStreaming)
86+
if (this.DisableDirectStreaming ?? settings.DisableDirectStreaming)
8787
{
8888
ms = new MemoryStream();
8989
stream = ms;
@@ -97,7 +97,7 @@ public void Write(Stream writableStream, IConnectionConfigurationValues settings
9797
break;
9898
case PostType.Serializable:
9999
stream = writableStream;
100-
if (settings.DisableDirectStreaming)
100+
if (this.DisableDirectStreaming ?? settings.DisableDirectStreaming)
101101
{
102102
ms = new MemoryStream();
103103
stream = ms;
@@ -131,7 +131,7 @@ public void Write(Stream writableStream, IConnectionConfigurationValues settings
131131
break;
132132
case PostType.EnumerableOfObject:
133133
if (!_enumerableOfObject.HasAny()) return;
134-
if (settings.DisableDirectStreaming)
134+
if (this.DisableDirectStreaming ?? settings.DisableDirectStreaming)
135135
{
136136
ms = new MemoryStream();
137137
stream = ms;
@@ -145,7 +145,7 @@ public void Write(Stream writableStream, IConnectionConfigurationValues settings
145145
break;
146146
case PostType.Serializable:
147147
stream = writableStream;
148-
if (settings.DisableDirectStreaming)
148+
if (this.DisableDirectStreaming ?? settings.DisableDirectStreaming)
149149
{
150150
ms = new MemoryStream();
151151
stream = ms;

src/Nest/CommonAbstractions/Request/RequestBase.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ public IRequestConfiguration RequestConfiguration
6969
}
7070
}
7171

72-
7372
public abstract class RequestDescriptorBase<TDescriptor, TParameters, TInterface> : RequestBase<TParameters>, IDescriptor
7473
where TDescriptor : RequestDescriptorBase<TDescriptor, TParameters, TInterface>, TInterface
7574
where TParameters : FluentRequestParameters<TParameters>, new()

src/Tests/ClientConcepts/LowLevel/DirectStreaming.cs

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,6 @@
1-
using Elasticsearch.Net;
2-
using FluentAssertions;
1+
using FluentAssertions;
32
using Nest;
43
using System;
5-
using System.Collections.Generic;
6-
using System.Linq;
7-
using System.Text;
8-
using System.Threading.Tasks;
94
using Tests.Framework;
105

116
namespace Tests.ClientConcepts.LowLevel
@@ -79,5 +74,39 @@ public void EnableDirectStreamingOnSuccess()
7974
response = client.SearchAsync<object>(s => s).Result;
8075
assert(response);
8176
}
77+
78+
[U]
79+
public void DisableDirectStreamingOnRequest()
80+
{
81+
Action<IResponse> assert = r =>
82+
{
83+
r.ApiCall.Should().NotBeNull();
84+
r.ApiCall.RequestBodyInBytes.Should().NotBeNull();
85+
r.ApiCall.ResponseBodyInBytes.Should().NotBeNull();
86+
};
87+
88+
var client = TestClient.GetFixedReturnClient(new { });
89+
var response = client.Search<object>(s => s.RequestConfiguration(r => r.DisableDirectStreaming()));
90+
assert(response);
91+
response = client.SearchAsync<object>(s => s.RequestConfiguration(r => r.DisableDirectStreaming())).Result;
92+
assert(response);
93+
}
94+
95+
[U]
96+
public void EnableDirectStreamingOnRequest()
97+
{
98+
Action<IResponse> assert = r =>
99+
{
100+
r.ApiCall.Should().NotBeNull();
101+
r.ApiCall.RequestBodyInBytes.Should().BeNull();
102+
r.ApiCall.ResponseBodyInBytes.Should().BeNull();
103+
};
104+
105+
var client = TestClient.GetFixedReturnClient(new { }, 200, c => c.DisableDirectStreaming());
106+
var response = client.Search<object>(s => s.RequestConfiguration(r => r.DisableDirectStreaming(false)));
107+
assert(response);
108+
response = client.SearchAsync<object>(s => s.RequestConfiguration(r => r.DisableDirectStreaming(false))).Result;
109+
assert(response);
110+
}
82111
}
83112
}

0 commit comments

Comments
 (0)