@@ -2,7 +2,7 @@ use crate::codec::compression::{CompressionEncoding, EnabledCompressionEncodings
2
2
use crate :: {
3
3
body:: BoxBody ,
4
4
client:: GrpcService ,
5
- codec:: { encode_client, Codec , Streaming } ,
5
+ codec:: { encode_client, Codec , Decoder , Streaming } ,
6
6
request:: SanitizeHeaders ,
7
7
Code , Request , Response , Status ,
8
8
} ;
@@ -30,6 +30,10 @@ use std::fmt;
30
30
/// [gRPC protocol definition]: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
31
31
pub struct Grpc < T > {
32
32
inner : T ,
33
+ config : GrpcConfig ,
34
+ }
35
+
36
+ struct GrpcConfig {
33
37
origin : Uri ,
34
38
/// Which compression encodings does the client accept?
35
39
accept_compression_encodings : EnabledCompressionEncodings ,
@@ -40,12 +44,7 @@ pub struct Grpc<T> {
40
44
impl < T > Grpc < T > {
41
45
/// Creates a new gRPC client with the provided [`GrpcService`].
42
46
pub fn new ( inner : T ) -> Self {
43
- Self {
44
- inner,
45
- origin : Uri :: default ( ) ,
46
- send_compression_encodings : None ,
47
- accept_compression_encodings : EnabledCompressionEncodings :: default ( ) ,
48
- }
47
+ Self :: with_origin ( inner, Uri :: default ( ) )
49
48
}
50
49
51
50
/// Creates a new gRPC client with the provided [`GrpcService`] and `Uri`.
@@ -55,9 +54,11 @@ impl<T> Grpc<T> {
55
54
pub fn with_origin ( inner : T , origin : Uri ) -> Self {
56
55
Self {
57
56
inner,
58
- origin,
59
- send_compression_encodings : None ,
60
- accept_compression_encodings : EnabledCompressionEncodings :: default ( ) ,
57
+ config : GrpcConfig {
58
+ origin,
59
+ send_compression_encodings : None ,
60
+ accept_compression_encodings : EnabledCompressionEncodings :: default ( ) ,
61
+ } ,
61
62
}
62
63
}
63
64
@@ -88,7 +89,7 @@ impl<T> Grpc<T> {
88
89
/// # };
89
90
/// ```
90
91
pub fn send_compressed ( mut self , encoding : CompressionEncoding ) -> Self {
91
- self . send_compression_encodings = Some ( encoding) ;
92
+ self . config . send_compression_encodings = Some ( encoding) ;
92
93
self
93
94
}
94
95
@@ -119,7 +120,7 @@ impl<T> Grpc<T> {
119
120
/// # };
120
121
/// ```
121
122
pub fn accept_compressed ( mut self , encoding : CompressionEncoding ) -> Self {
122
- self . accept_compression_encodings . enable ( encoding) ;
123
+ self . config . accept_compression_encodings . enable ( encoding) ;
123
124
self
124
125
}
125
126
@@ -226,6 +227,73 @@ impl<T> Grpc<T> {
226
227
M1 : Send + Sync + ' static ,
227
228
M2 : Send + Sync + ' static ,
228
229
{
230
+ let request = request
231
+ . map ( |s| encode_client ( codec. encoder ( ) , s, self . config . send_compression_encodings ) )
232
+ . map ( BoxBody :: new) ;
233
+
234
+ let request = self . config . prepare_request ( request, path) ;
235
+
236
+ let response = self
237
+ . inner
238
+ . call ( request)
239
+ . await
240
+ . map_err ( Status :: from_error_generic) ?;
241
+
242
+ let decoder = codec. decoder ( ) ;
243
+
244
+ self . create_response ( decoder, response)
245
+ }
246
+
247
+ // Keeping this code in a separate function from Self::streaming lets functions that return the
248
+ // same output share the generated binary code
249
+ fn create_response < M2 > (
250
+ & self ,
251
+ decoder : impl Decoder < Item = M2 , Error = Status > + Send + ' static ,
252
+ response : http:: Response < T :: ResponseBody > ,
253
+ ) -> Result < Response < Streaming < M2 > > , Status >
254
+ where
255
+ T : GrpcService < BoxBody > ,
256
+ T :: ResponseBody : Body + Send + ' static ,
257
+ <T :: ResponseBody as Body >:: Error : Into < crate :: Error > ,
258
+ {
259
+ let encoding = CompressionEncoding :: from_encoding_header (
260
+ response. headers ( ) ,
261
+ self . config . accept_compression_encodings ,
262
+ ) ?;
263
+
264
+ let status_code = response. status ( ) ;
265
+ let trailers_only_status = Status :: from_header_map ( response. headers ( ) ) ;
266
+
267
+ // We do not need to check for trailers if the `grpc-status` header is present
268
+ // with a valid code.
269
+ let expect_additional_trailers = if let Some ( status) = trailers_only_status {
270
+ if status. code ( ) != Code :: Ok {
271
+ return Err ( status) ;
272
+ }
273
+
274
+ false
275
+ } else {
276
+ true
277
+ } ;
278
+
279
+ let response = response. map ( |body| {
280
+ if expect_additional_trailers {
281
+ Streaming :: new_response ( decoder, body, status_code, encoding)
282
+ } else {
283
+ Streaming :: new_empty ( decoder, body)
284
+ }
285
+ } ) ;
286
+
287
+ Ok ( Response :: from_http ( response) )
288
+ }
289
+ }
290
+
291
+ impl GrpcConfig {
292
+ fn prepare_request (
293
+ & self ,
294
+ request : Request < http_body:: combinators:: UnsyncBoxBody < bytes:: Bytes , Status > > ,
295
+ path : PathAndQuery ,
296
+ ) -> http:: Request < http_body:: combinators:: UnsyncBoxBody < bytes:: Bytes , Status > > {
229
297
let scheme = self . origin . scheme ( ) . cloned ( ) ;
230
298
let authority = self . origin . authority ( ) . cloned ( ) ;
231
299
@@ -236,10 +304,6 @@ impl<T> Grpc<T> {
236
304
237
305
let uri = Uri :: from_parts ( parts) . expect ( "path_and_query only is valid Uri" ) ;
238
306
239
- let request = request
240
- . map ( |s| encode_client ( codec. encoder ( ) , s, self . send_compression_encodings ) )
241
- . map ( BoxBody :: new) ;
242
-
243
307
let mut request = request. into_http (
244
308
uri,
245
309
http:: Method :: POST ,
@@ -274,51 +338,19 @@ impl<T> Grpc<T> {
274
338
) ;
275
339
}
276
340
277
- let response = self
278
- . inner
279
- . call ( request)
280
- . await
281
- . map_err ( |err| Status :: from_error ( err. into ( ) ) ) ?;
282
-
283
- let encoding = CompressionEncoding :: from_encoding_header (
284
- response. headers ( ) ,
285
- self . accept_compression_encodings ,
286
- ) ?;
287
-
288
- let status_code = response. status ( ) ;
289
- let trailers_only_status = Status :: from_header_map ( response. headers ( ) ) ;
290
-
291
- // We do not need to check for trailers if the `grpc-status` header is present
292
- // with a valid code.
293
- let expect_additional_trailers = if let Some ( status) = trailers_only_status {
294
- if status. code ( ) != Code :: Ok {
295
- return Err ( status) ;
296
- }
297
-
298
- false
299
- } else {
300
- true
301
- } ;
302
-
303
- let response = response. map ( |body| {
304
- if expect_additional_trailers {
305
- Streaming :: new_response ( codec. decoder ( ) , body, status_code, encoding)
306
- } else {
307
- Streaming :: new_empty ( codec. decoder ( ) , body)
308
- }
309
- } ) ;
310
-
311
- Ok ( Response :: from_http ( response) )
341
+ request
312
342
}
313
343
}
314
344
315
345
impl < T : Clone > Clone for Grpc < T > {
316
346
fn clone ( & self ) -> Self {
317
347
Self {
318
348
inner : self . inner . clone ( ) ,
319
- origin : self . origin . clone ( ) ,
320
- send_compression_encodings : self . send_compression_encodings ,
321
- accept_compression_encodings : self . accept_compression_encodings ,
349
+ config : GrpcConfig {
350
+ origin : self . config . origin . clone ( ) ,
351
+ send_compression_encodings : self . config . send_compression_encodings ,
352
+ accept_compression_encodings : self . config . accept_compression_encodings ,
353
+ } ,
322
354
}
323
355
}
324
356
}
@@ -329,13 +361,16 @@ impl<T: fmt::Debug> fmt::Debug for Grpc<T> {
329
361
330
362
f. field ( "inner" , & self . inner ) ;
331
363
332
- f. field ( "origin" , & self . origin ) ;
364
+ f. field ( "origin" , & self . config . origin ) ;
333
365
334
- f. field ( "compression_encoding" , & self . send_compression_encodings ) ;
366
+ f. field (
367
+ "compression_encoding" ,
368
+ & self . config . send_compression_encodings ,
369
+ ) ;
335
370
336
371
f. field (
337
372
"accept_compression_encodings" ,
338
- & self . accept_compression_encodings ,
373
+ & self . config . accept_compression_encodings ,
339
374
) ;
340
375
341
376
f. finish ( )
0 commit comments