Skip to content

Commit 7379e1e

Browse files
RD-4775 - support threading in tracer (#212)
* support threading in tracer * add threading tests * avoid re-escaping * fix Nirhod's CR * fix Dori CR
1 parent 9d86d69 commit 7379e1e

15 files changed

+385
-203
lines changed

src/lumigo_tracer/lumigo_utils.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
SKIP_SCRUBBING_KEYS = [EXECUTION_TAGS_KEY]
6868
LUMIGO_SECRET_MASKING_REGEX_BACKWARD_COMP = "LUMIGO_BLACKLIST_REGEX"
6969
LUMIGO_SECRET_MASKING_REGEX = "LUMIGO_SECRET_MASKING_REGEX"
70+
LUMIGO_SYNC_TRACING = "LUMIGO_SYNC_TRACING"
7071
WARN_CLIENT_PREFIX = "Lumigo Warning"
7172
INTERNAL_ANALYTICS_PREFIX = "Lumigo Analytic Log"
7273
TRUNCATE_SUFFIX = "...[too long]"
@@ -134,6 +135,7 @@ class Configuration:
134135
edge_kinesis_aws_access_key_id: Optional[str] = None
135136
edge_kinesis_aws_secret_access_key: Optional[str] = None
136137
should_scrub_known_services: bool = False
138+
is_sync_tracer: bool = False
137139

138140
@staticmethod
139141
def get_max_entry_size(has_error: bool = False) -> int:
@@ -239,6 +241,7 @@ def config(
239241
Configuration.should_scrub_known_services = (
240242
os.environ.get("LUMIGO_SCRUB_KNOWN_SERVICES") == "true"
241243
)
244+
Configuration.is_sync_tracer = os.environ.get(LUMIGO_SYNC_TRACING, "FALSE").lower() == "true"
242245

243246

244247
def _is_span_has_error(span: dict) -> bool:
@@ -747,6 +750,8 @@ def lumigo_dumps(
747750
return "[" + ", ".join(organs) + "]"
748751

749752
try:
753+
if isinstance(d, str) and d.endswith(TRUNCATE_SUFFIX):
754+
return d
750755
retval = aws_dump(d, decimal_safe=decimal_safe)
751756
except TypeError:
752757
if enforce_jsonify:
@@ -757,6 +762,22 @@ def lumigo_dumps(
757762
)
758763

759764

765+
def concat_old_body_to_new(old_body: Optional[str], new_body: bytes) -> str:
766+
"""
767+
We have only a dumped body from the previous request,
768+
so to concatenate the new body we should undo the lumigo_dumps.
769+
Note that the old body is dumped bytes
770+
"""
771+
if not new_body:
772+
return old_body or ""
773+
if not old_body:
774+
return lumigo_dumps(new_body)
775+
if old_body.endswith(TRUNCATE_SUFFIX):
776+
return old_body
777+
undumped_body = (old_body or "").encode().strip(b'"')
778+
return lumigo_dumps(undumped_body + new_body)
779+
780+
760781
def is_kill_switch_on():
761782
return str(os.environ.get(KILL_SWITCH, "")).lower() == "true"
762783

src/lumigo_tracer/spans_container.py

Lines changed: 34 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ def __init__(
105105
self.base_msg,
106106
)
107107
self.span_ids_to_send: Set[str] = set()
108-
self.spans: List[Dict] = []
108+
self.spans: Dict[str, Dict] = {}
109109
if is_new_invocation:
110110
SpansContainer.is_cold = False
111111

@@ -127,7 +127,7 @@ def start(self, event=None, context=None):
127127

128128
def handle_timeout(self, *args):
129129
get_logger().info("The tracer reached the end of the timeout timer")
130-
to_send = [s for s in self.spans if s["id"] in self.span_ids_to_send]
130+
to_send = [self.spans[span_id] for span_id in self.span_ids_to_send]
131131
self.span_ids_to_send.clear()
132132
if Configuration.send_only_if_error:
133133
to_send.append(self._generate_start_span())
@@ -150,44 +150,48 @@ def add_span(self, span: dict) -> dict:
150150
This function parses an request event and add it to the span.
151151
"""
152152
new_span = recursive_json_join(span, self.base_msg)
153-
self.spans.append(new_span)
154-
self.span_ids_to_send.add(span["id"])
153+
span_id = new_span["id"]
154+
self.spans[span_id] = new_span
155+
self.span_ids_to_send.add(span_id)
155156
return new_span
156157

157-
def get_last_span(self) -> Optional[dict]:
158-
if not self.spans:
158+
def get_span_by_id(self, span_id: Optional[str]) -> Optional[dict]:
159+
if not span_id:
159160
return None
160-
return self.spans[-1]
161+
return self.spans.get(span_id)
161162

162-
def get_span_by_id(self, span_id: str) -> Optional[dict]:
163-
for span in self.spans:
164-
if span.get("id") == span_id:
165-
return span
166-
return None
167-
168-
def pop_last_span(self) -> Optional[dict]:
169-
return self.spans.pop() if self.spans else None
163+
def pop_span(self, span_id: Optional[str]) -> Optional[dict]:
164+
if not span_id:
165+
return None
166+
self.span_ids_to_send.discard(span_id)
167+
return self.spans.pop(span_id, None)
170168

171-
def update_event_end_time(self) -> None:
169+
def update_event_end_time(self, span_id: str) -> None:
172170
"""
173171
This function assumes synchronous execution - we update the last http event.
174172
"""
175-
if self.spans:
176-
span = self.spans[-1]
177-
span["ended"] = get_current_ms_time()
178-
self.span_ids_to_send.add(span["id"])
173+
if span_id in self.spans:
174+
self.spans[span_id]["ended"] = get_current_ms_time()
175+
self.span_ids_to_send.add(span_id)
176+
else:
177+
get_logger().warning(f"update_event_end_time: Got unknown span id: {span_id}")
179178

180179
def update_event_times(
181-
self, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None
180+
self,
181+
span_id: str,
182+
start_time: Optional[datetime] = None,
183+
end_time: Optional[datetime] = None,
182184
) -> None:
183185
"""
184186
This function assumes synchronous execution - we update the last http event.
185187
"""
186-
if self.spans:
188+
if span_id in self.spans:
187189
start_timestamp = start_time.timestamp() if start_time else time.time()
188190
end_timestamp = end_time.timestamp() if end_time else time.time()
189-
self.spans[-1]["started"] = int(start_timestamp * 1000)
190-
self.spans[-1]["ended"] = int(end_timestamp * 1000)
191+
self.spans[span_id]["started"] = int(start_timestamp * 1000)
192+
self.spans[span_id]["ended"] = int(end_timestamp * 1000)
193+
else:
194+
get_logger().warning(f"update_event_times: Got unknown span id: {span_id}")
191195

192196
@staticmethod
193197
def _create_exception_event(
@@ -223,8 +227,9 @@ def add_exception_event(
223227
def add_step_end_event(self, ret_val):
224228
message_id = str(uuid.uuid4())
225229
step_function_span = create_step_function_span(message_id)
226-
self.spans.append(recursive_json_join(step_function_span, self.base_msg))
227-
self.span_ids_to_send.add(step_function_span["id"])
230+
span_id = step_function_span["id"]
231+
self.spans[span_id] = recursive_json_join(step_function_span, self.base_msg)
232+
self.span_ids_to_send.add(span_id)
228233
if isinstance(ret_val, dict):
229234
ret_val[LUMIGO_EVENT_KEY] = {STEP_FUNCTION_UID_KEY: message_id}
230235
get_logger().debug(f"Added key {LUMIGO_EVENT_KEY} to the user's return value")
@@ -259,12 +264,12 @@ def end(self, ret_val=None, event: Optional[dict] = None, context=None) -> Optio
259264
if _is_span_has_error(self.function_span):
260265
self._set_error_extra_data(event)
261266
spans_contain_errors: bool = any(
262-
_is_span_has_error(s) for s in self.spans + [self.function_span]
263-
)
267+
_is_span_has_error(s) for s in self.spans.values()
268+
) or _is_span_has_error(self.function_span)
264269

265270
if (not Configuration.send_only_if_error) or spans_contain_errors:
266271
to_send = [self.function_span] + [
267-
s for s in self.spans if s["id"] in self.span_ids_to_send
272+
span for span_id, span in self.spans.items() if span_id in self.span_ids_to_send
268273
]
269274
reported_rtt = lumigo_utils.report_json(region=self.region, msgs=to_send)
270275
else:

src/lumigo_tracer/wrappers/http/http_data_classes.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from copy import deepcopy
2-
from typing import Optional, List
2+
from typing import Optional, List, Dict
33

44

55
class HttpRequest:
@@ -27,10 +27,13 @@ def clone(self, **kwargs):
2727

2828
class HttpState:
2929
previous_request: Optional[HttpRequest] = None
30-
previous_response_body: bytes = b""
30+
previous_span_id: Optional[str] = None
3131
omit_skip_path: Optional[List[str]] = None
32+
request_id_to_span_id: Dict[int, str] = {}
33+
response_id_to_span_id: Dict[int, str] = {}
3234

3335
@staticmethod
3436
def clear():
3537
HttpState.previous_request = None
36-
HttpState.previous_response_body = b""
38+
HttpState.request_id_to_span_id.clear()
39+
HttpState.response_id_to_span_id.clear()

0 commit comments

Comments
 (0)