1
1
import aiohttp
2
2
import asyncio
3
- import io
4
- import pathlib
5
- import os
6
- import ssl
7
- import sys
3
+
8
4
import aiohttp .http_exceptions
9
- from aiohttp .client import URL
10
5
from botocore .endpoint import EndpointCreator , Endpoint , DEFAULT_TIMEOUT , \
11
6
MAX_POOL_CONNECTIONS , logger , history_recorder , create_request_object
12
7
from botocore .exceptions import ConnectionClosedError
13
8
from botocore .hooks import first_non_none_response
14
9
from botocore .utils import is_valid_endpoint_url
15
- from multidict import MultiDict
16
- from urllib .parse import urlparse
17
10
from urllib3 .response import HTTPHeaderDict
11
+
12
+ from aiobotocore .httpsession import AIOHTTPSession
18
13
from aiobotocore .response import StreamingBody
19
- from aiobotocore ._endpoint_helpers import _text , _IOBaseWrapper , \
20
- ClientResponseProxy
14
+ from aiobotocore ._endpoint_helpers import ClientResponseProxy # noqa: F401, E501 lgtm [py/unused-import]
21
15
22
16
23
17
async def convert_to_response_dict (http_response , operation_model ):
@@ -62,10 +56,6 @@ async def convert_to_response_dict(http_response, operation_model):
62
56
63
57
64
58
class AioEndpoint (Endpoint ):
65
- def __init__ (self , * args , proxies = None , ** kwargs ):
66
- super ().__init__ (* args , ** kwargs )
67
- self .proxies = proxies or {}
68
-
69
59
async def create_request (self , params , operation_model = None ):
70
60
request = create_request_object (params )
71
61
if operation_model :
@@ -237,53 +227,15 @@ async def _needs_retry(self, attempts, operation_model, request_dict,
237
227
return True
238
228
239
229
async def _send (self , request ):
240
- # Note: When using aiobotocore with dynamodb, requests fail on crc32
241
- # checksum computation as soon as the response data reaches ~5KB.
242
- # When AWS response is gzip compressed:
243
- # 1. aiohttp is automatically decompressing the data
244
- # (http://aiohttp.readthedocs.io/en/stable/client.html#binary-response-content)
245
- # 2. botocore computes crc32 on the uncompressed data bytes and fails
246
- # cause crc32 has been computed on the compressed data
247
- # The following line forces aws not to use gzip compression,
248
- # if there is a way to configure aiohttp not to perform decompression,
249
- # we can remove the following line and take advantage of
250
- # aws gzip compression.
251
- # https://github.com/boto/botocore/issues/1255
252
- url = request .url
253
- headers = request .headers
254
- data = request .body
255
-
256
- headers ['Accept-Encoding' ] = 'identity'
257
- headers_ = MultiDict (
258
- (z [0 ], _text (z [1 ], encoding = 'utf-8' )) for z in headers .items ())
259
-
260
- # botocore does this during the request so we do this here as well
261
- # TODO: this should be part of the ClientSession, perhaps make wrapper
262
- proxy = self .proxies .get (urlparse (url .lower ()).scheme )
263
-
264
- if isinstance (data , io .IOBase ):
265
- data = _IOBaseWrapper (data )
266
-
267
- url = URL (url , encoded = True )
268
- resp = await self .http_session .request (
269
- request .method , url = url , headers = headers_ , data = data , proxy = proxy )
270
-
271
- # If we're not streaming, read the content so we can retry any timeout
272
- # errors, see:
273
- # https://github.com/boto/botocore/blob/develop/botocore/vendored/requests/sessions.py#L604
274
- if not request .stream_output :
275
- await resp .read ()
276
-
277
- return resp
230
+ return await self .http_session .send (request )
278
231
279
232
280
233
class AioEndpointCreator (EndpointCreator ):
281
- # TODO: handle socket_options
282
234
def create_endpoint (self , service_model , region_name , endpoint_url ,
283
235
verify = None , response_parser_factory = None ,
284
236
timeout = DEFAULT_TIMEOUT ,
285
237
max_pool_connections = MAX_POOL_CONNECTIONS ,
286
- http_session_cls = aiohttp . ClientSession ,
238
+ http_session_cls = AIOHTTPSession ,
287
239
proxies = None ,
288
240
socket_options = None ,
289
241
client_cert = None ,
@@ -297,68 +249,20 @@ def create_endpoint(self, service_model, region_name, endpoint_url,
297
249
endpoint_prefix = service_model .endpoint_prefix
298
250
299
251
logger .debug ('Setting %s timeout as %s' , endpoint_prefix , timeout )
300
-
301
- if isinstance (timeout , (list , tuple )):
302
- conn_timeout , read_timeout = timeout
303
- else :
304
- conn_timeout = read_timeout = timeout
305
-
306
- if connector_args is None :
307
- # AWS has a 20 second idle timeout:
308
- # https://forums.aws.amazon.com/message.jspa?messageID=215367
309
- # aiohttp default timeout is 30s so set something reasonable here
310
- connector_args = dict (keepalive_timeout = 12 )
311
-
312
- timeout = aiohttp .ClientTimeout (
313
- sock_connect = conn_timeout ,
314
- sock_read = read_timeout
315
- )
316
-
317
- verify = self ._get_verify_value (verify )
318
- ssl_context = None
319
- if client_cert :
320
- if isinstance (client_cert , str ):
321
- key_file = None
322
- cert_file = client_cert
323
- elif isinstance (client_cert , tuple ):
324
- cert_file , key_file = client_cert
325
- else :
326
- raise TypeError ("client_cert must be str or tuple, not %s" %
327
- client_cert .__class__ .__name__ )
328
-
329
- ssl_context = ssl .create_default_context (ssl .Purpose .CLIENT_AUTH )
330
- ssl_context .load_cert_chain (cert_file , key_file )
331
- elif isinstance (verify , (str , pathlib .Path )):
332
- ssl_context = ssl .create_default_context (ssl .Purpose .CLIENT_AUTH ,
333
- cafile = str (verify ))
334
-
335
- if ssl_context :
336
- # Enable logging of TLS session keys via defacto standard environment variable # noqa: E501
337
- # 'SSLKEYLOGFILE', if the feature is available (Python 3.8+). Skip empty values. # noqa: E501
338
- if hasattr (ssl_context , 'keylog_filename' ):
339
- keylogfile = os .environ .get ('SSLKEYLOGFILE' )
340
- if keylogfile and not sys .flags .ignore_environment :
341
- ssl_context .keylog_filename = keylogfile
342
-
343
- # TODO: add support for proxies_config
344
-
345
- connector = aiohttp .TCPConnector (
346
- limit = max_pool_connections ,
347
- verify_ssl = bool (verify ),
348
- ssl = ssl_context ,
349
- ** connector_args )
350
-
351
- aio_session = http_session_cls (
352
- connector = connector ,
252
+ http_session = http_session_cls (
353
253
timeout = timeout ,
354
- skip_auto_headers = {'CONTENT-TYPE' },
355
- response_class = ClientResponseProxy ,
356
- auto_decompress = False )
254
+ proxies = proxies ,
255
+ verify = self ._get_verify_value (verify ),
256
+ max_pool_connections = max_pool_connections ,
257
+ socket_options = socket_options ,
258
+ client_cert = client_cert ,
259
+ proxies_config = proxies_config ,
260
+ connector_args = connector_args
261
+ )
357
262
358
263
return AioEndpoint (
359
264
endpoint_url ,
360
265
endpoint_prefix = endpoint_prefix ,
361
266
event_emitter = self ._event_emitter ,
362
267
response_parser_factory = response_parser_factory ,
363
- http_session = aio_session ,
364
- proxies = proxies )
268
+ http_session = http_session )
0 commit comments