|
1 | | -from typing import ClassVar, Dict, Optional |
| 1 | +import os |
| 2 | +import platform |
| 3 | +import ssl |
| 4 | +from typing import Callable, ClassVar, Dict, Optional |
2 | 5 |
|
3 | | -from requests import Response, exceptions, request |
| 6 | +import requests |
| 7 | +from requests import Response, exceptions |
| 8 | +from requests.adapters import HTTPAdapter |
4 | 9 |
|
5 | 10 | from cycode.cli.exceptions.custom_exceptions import HttpUnauthorizedError, NetworkError |
6 | 11 | from cycode.cyclient import config, logger |
7 | 12 | from cycode.cyclient.headers import get_cli_user_agent, get_correlation_id |
8 | 13 |
|
9 | 14 |
|
| 15 | +class SystemStorageSslContext(HTTPAdapter): |
| 16 | + def init_poolmanager(self, *args, **kwargs) -> None: |
| 17 | + default_context = ssl.create_default_context() |
| 18 | + default_context.load_default_certs() |
| 19 | + kwargs['ssl_context'] = default_context |
| 20 | + return super().init_poolmanager(*args, **kwargs) |
| 21 | + |
| 22 | + def cert_verify(self, *args, **kwargs) -> None: |
| 23 | + super().cert_verify(*args, **kwargs) |
| 24 | + conn = kwargs['conn'] if 'conn' in kwargs else args[0] |
| 25 | + conn.ca_certs = None |
| 26 | + |
| 27 | + |
| 28 | +def _get_request_function() -> Callable: |
| 29 | + if platform.system() == 'Darwin': |
| 30 | + return requests.request |
| 31 | + |
| 32 | + if os.environ.get('REQUESTS_CA_BUNDLE') or os.environ.get('CURL_CA_BUNDLE'): |
| 33 | + return requests.request |
| 34 | + |
| 35 | + session = requests.Session() |
| 36 | + session.mount('https://', SystemStorageSslContext()) |
| 37 | + return session.request |
| 38 | + |
| 39 | + |
10 | 40 | class CycodeClientBase: |
11 | 41 | MANDATORY_HEADERS: ClassVar[Dict[str, str]] = { |
12 | 42 | 'User-Agent': get_cli_user_agent(), |
@@ -56,6 +86,7 @@ def _execute( |
56 | 86 |
|
57 | 87 | try: |
58 | 88 | headers = self.get_request_headers(headers, without_auth=without_auth) |
| 89 | + request = _get_request_function() |
59 | 90 | response = request(method=method, url=url, timeout=timeout, headers=headers, **kwargs) |
60 | 91 |
|
61 | 92 | content = 'HIDDEN' if hide_response_content_log else response.text |
|
0 commit comments