1
+ import datetime
1
2
import decimal
2
3
import base64
3
4
import hashlib
9
10
from functools import reduce , lru_cache
10
11
import time
11
12
import http .client
13
+ import socket
12
14
from collections import OrderedDict
13
15
import random
14
16
from typing import Union , List , Optional , Dict , Any , Tuple , Pattern , TypeVar
33
35
HTTPS_PREFIX = "https://"
34
36
LOG_FORMAT = "#LUMIGO# - %(asctime)s - %(levelname)s - %(message)s"
35
37
SECONDS_TO_TIMEOUT = 0.5
38
+ COOLDOWN_AFTER_TIMEOUT_DURATION = datetime .timedelta (seconds = 10 )
36
39
LUMIGO_EVENT_KEY = "_lumigo"
37
40
STEP_FUNCTION_UID_KEY = "step_function_uid"
38
41
# number of spans that are too big to enter the reported message before break
81
84
82
85
edge_kinesis_boto_client = None
83
86
edge_connection = None
84
- internal_error_already_logged = False
85
87
86
88
87
89
def should_use_tracer_extension () -> bool :
@@ -92,18 +94,25 @@ def get_region() -> str:
92
94
return os .environ .get ("AWS_REGION" ) or "UNKNOWN"
93
95
94
96
95
- try :
96
- # Try to establish the connection in initialization
97
- if (
98
- os .environ .get ("LUMIGO_INITIALIZATION_CONNECTION" , "" ).lower () != "false"
99
- and get_region () != CHINA_REGION # noqa
100
- ):
101
- edge_connection = http .client .HTTPSConnection ( # type: ignore
102
- EDGE_HOST .format (region = os .environ .get ("AWS_REGION" )), timeout = EDGE_TIMEOUT
103
- )
104
- edge_connection .connect ()
105
- except Exception :
106
- pass
97
class InternalState:
    """
    Tracer-internal state kept as class attributes.

    The class is used as a plain namespace: every method is static and reads
    or writes the attributes directly on `InternalState`.
    """

    # When the last timeout was recorded by `mark_timeout_to_edge`,
    # or None if no timeout has been recorded since the last reset.
    timeout_on_connection: Optional[datetime.datetime] = None
    # Flag read and set by `internal_analytics_message`.
    internal_error_already_logged = False

    @staticmethod
    def reset():
        """Put both attributes back to their initial values."""
        InternalState.internal_error_already_logged = False
        InternalState.timeout_on_connection = None

    @staticmethod
    def mark_timeout_to_edge():
        """Remember the current local time as the moment of the latest timeout."""
        InternalState.timeout_on_connection = datetime.datetime.now()

    @staticmethod
    def should_report_to_edge() -> bool:
        """
        Tell whether reporting is currently allowed.

        :return: True when no timeout is recorded, or when more than
            COOLDOWN_AFTER_TIMEOUT_DURATION has elapsed since the recorded one.
        """
        last_timeout = InternalState.timeout_on_connection
        if last_timeout:
            elapsed = datetime.datetime.now() - last_timeout
            return elapsed > COOLDOWN_AFTER_TIMEOUT_DURATION
        return True
107
116
108
117
109
118
class Configuration :
@@ -275,8 +284,10 @@ def _create_request_body(
275
284
return aws_dump (spans_to_send )[:max_size ]
276
285
277
286
278
- def establish_connection (host ):
287
+ def establish_connection (host = None ):
279
288
try :
289
+ if not host :
290
+ host = get_edge_host (os .environ .get ("AWS_REGION" ))
280
291
return http .client .HTTPSConnection (host , timeout = EDGE_TIMEOUT )
281
292
except Exception as e :
282
293
get_logger ().exception (f"Could not establish connection to { host } " , exc_info = e )
@@ -303,6 +314,9 @@ def report_json(region: Optional[str], msgs: List[dict], should_retry: bool = Tr
303
314
:return: The duration of reporting (in milliseconds),
304
315
or 0 if we didn't send (due to configuration or fail).
305
316
"""
317
+ if not InternalState .should_report_to_edge ():
318
+ get_logger ().info ("Skip sending messages due to previous timeout" )
319
+ return 0
306
320
if not Configuration .should_report :
307
321
return 0
308
322
get_logger ().info (f"reporting the messages: { msgs [:10 ]} " )
@@ -338,6 +352,10 @@ def report_json(region: Optional[str], msgs: List[dict], should_retry: bool = Tr
338
352
response .read () # We most read the response to keep the connection available
339
353
duration = int ((time .time () - start_time ) * 1000 )
340
354
get_logger ().info (f"successful reporting, code: { getattr (response , 'code' , 'unknown' )} " )
355
+ except socket .timeout :
356
+ get_logger ().exception (f"Timeout while connecting to { host } " )
357
+ InternalState .mark_timeout_to_edge ()
358
+ internal_analytics_message ("report: socket.timeout" )
341
359
except Exception as e :
342
360
if should_retry :
343
361
get_logger ().exception (f"Could not report to { host } . Retrying." , exc_info = e )
@@ -529,12 +547,11 @@ def warn_client(msg: str) -> None:
529
547
530
548
531
549
def internal_analytics_message(msg: str, force: bool = False) -> None:
    """
    Print `msg` base64-encoded, prefixed with INTERNAL_ANALYTICS_PREFIX.

    Nothing is printed when the LUMIGO_ANALYTICS environment variable is "off".
    Otherwise the message is printed only if
    `InternalState.internal_error_already_logged` is still false, unless
    `force` is true. Printing sets that flag to True.

    :param msg: The text to encode and print.
    :param force: Print even if a message was already printed.
    """
    if os.environ.get("LUMIGO_ANALYTICS") == "off":
        return
    if InternalState.internal_error_already_logged and not force:
        return
    b64_message = base64.b64encode(msg.encode()).decode()
    print(f"{ INTERNAL_ANALYTICS_PREFIX } : { b64_message } ")
    InternalState.internal_error_already_logged = True
538
555
539
556
540
557
def is_api_gw_event (event : dict ) -> bool :
@@ -753,3 +770,17 @@ def is_provision_concurrency_initialization() -> bool:
753
770
def get_stacktrace(exception: Exception) -> str:
    """
    Format the traceback attached to `exception` as a single string.

    Every formatted traceback entry that contains STACKTRACE_LINE_TO_DROP
    is left out; the remaining entries are concatenated in order.

    :param exception: The exception whose `__traceback__` is formatted.
    :return: The concatenated entries that were kept.
    """
    kept_entries = [
        entry
        for entry in traceback.format_tb(exception.__traceback__)
        if STACKTRACE_LINE_TO_DROP not in entry
    ]
    return "".join(kept_entries)
773
+
774
+
775
# Best-effort attempt to open the edge connection at import time.
# Skipped when LUMIGO_INITIALIZATION_CONNECTION is "false" (case-insensitive)
# or when the region equals CHINA_REGION.
try:
    if os.environ.get("LUMIGO_INITIALIZATION_CONNECTION", "").lower() != "false":
        if get_region() != CHINA_REGION:
            edge_connection = establish_connection()
            edge_connection.connect()
except socket.timeout:
    # Remember the timeout so should_report_to_edge() can apply its cooldown.
    InternalState.mark_timeout_to_edge()
except Exception:
    # Any other failure here is deliberately ignored.
    pass
0 commit comments