Skip to content

Commit e489d38

Browse files
RD 3761 - non-http libraries preparations (#150)
* split event and parsers * split sync_hook to tracer and http hooks * refactor http hooks to be a wrapper * refactor wrap once * increase coverage
1 parent f74fbd7 commit e489d38

37 files changed

+1730
-1676
lines changed

src/lumigo_tracer/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
from .sync_http.sync_hook import lumigo_tracer, LumigoChalice # noqa
1+
from .tracer import lumigo_tracer, LumigoChalice # noqa
22
from .user_utils import report_error, add_execution_tag # noqa
3-
from .sync_http.handler import _handler # noqa
3+
from .auto_instrument_handler import _handler # noqa
File renamed without changes.

src/lumigo_tracer/auto_tag/auto_tag_event.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
from typing import Dict, List, Optional
44

55
from lumigo_tracer.user_utils import add_execution_tag
6-
from lumigo_tracer.parsers.utils import str_to_list
7-
from lumigo_tracer.utils import get_logger, is_api_gw_event
6+
from lumigo_tracer.parsing_utils import str_to_list
7+
from lumigo_tracer.lumigo_utils import get_logger, is_api_gw_event
88

99
AUTO_TAG_API_GW_HEADERS: Optional[List[str]] = (
1010
str_to_list(os.environ.get("LUMIGO_AUTO_TAG_API_GW_HEADERS", "")) or []
File renamed without changes.

src/lumigo_tracer/parsers/event_parser.py renamed to src/lumigo_tracer/event/event_dumper.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
from collections import OrderedDict
55
from typing import Dict, List
66

7-
from lumigo_tracer.parsers.utils import str_to_list, safe_get
8-
from lumigo_tracer.utils import get_logger, is_api_gw_event
7+
from lumigo_tracer.parsing_utils import str_to_list, safe_get
8+
from lumigo_tracer.lumigo_utils import get_logger, is_api_gw_event, lumigo_dumps
99

1010
API_GW_KEYS_ORDER = str_to_list(os.environ.get("LUMIGO_API_GW_KEYS_ORDER", "")) or [
1111
"version",
@@ -77,7 +77,7 @@ def is_supported(event) -> bool:
7777

7878
@staticmethod
7979
@abstractmethod
80-
def parse(event) -> Dict:
80+
def parse(event) -> OrderedDict:
8181
raise NotImplementedError()
8282

8383

@@ -87,7 +87,7 @@ def is_supported(event) -> bool:
8787
return safe_get(event, ["Records", 0, "eventSource"]) == "aws:s3"
8888

8989
@staticmethod
90-
def parse(event) -> Dict:
90+
def parse(event) -> OrderedDict:
9191
new_event: OrderedDict = OrderedDict()
9292
new_event["Records"] = []
9393

@@ -116,7 +116,7 @@ def is_supported(event) -> bool:
116116
return bool(safe_get(event, ["Records", 0, "cf", "config", "distributionId"], {}))
117117

118118
@staticmethod
119-
def parse(event) -> Dict:
119+
def parse(event) -> OrderedDict:
120120
new_event: OrderedDict = OrderedDict()
121121
new_event["Records"] = []
122122

@@ -144,7 +144,7 @@ def is_supported(event) -> bool:
144144
return is_api_gw_event(event=event)
145145

146146
@staticmethod
147-
def parse(event) -> Dict:
147+
def parse(event) -> OrderedDict:
148148
new_event: OrderedDict = OrderedDict()
149149
# Add order keys
150150
for order_key in API_GW_KEYS_ORDER:
@@ -173,7 +173,7 @@ def is_supported(event) -> bool:
173173
return safe_get(event, ["Records", 0, "EventSource"]) == "aws:sns"
174174

175175
@staticmethod
176-
def parse(event) -> Dict:
176+
def parse(event) -> OrderedDict:
177177
new_sns_event: OrderedDict = OrderedDict()
178178
new_sns_event["Records"] = []
179179
# Add order keys
@@ -192,7 +192,7 @@ def is_supported(event) -> bool:
192192
return safe_get(event, ["Records", 0, "eventSource"]) == "aws:sqs"
193193

194194
@staticmethod
195-
def parse(event) -> Dict:
195+
def parse(event) -> OrderedDict:
196196
new_sqs_event: OrderedDict = OrderedDict()
197197
new_sqs_event["Records"] = []
198198
# Add order keys
@@ -205,9 +205,9 @@ def parse(event) -> Dict:
205205
return new_sqs_event
206206

207207

208-
class EventParser:
208+
class EventDumper:
209209
@staticmethod
210-
def parse_event(event: Dict, handlers: List[EventParseHandler] = None):
210+
def dump_event(event: Dict, handlers: List[EventParseHandler] = None) -> str:
211211
handlers = handlers or [
212212
ApiGWHandler(),
213213
SNSHandler(),
@@ -218,10 +218,10 @@ def parse_event(event: Dict, handlers: List[EventParseHandler] = None):
218218
for handler in handlers:
219219
try:
220220
if handler.is_supported(event):
221-
return handler.parse(event)
221+
return lumigo_dumps(handler.parse(event))
222222
except Exception as e:
223223
get_logger().debug(
224224
f"Error while trying to parse with handler {handler.__class__.__name__} event {event}",
225225
exc_info=e,
226226
)
227-
return event
227+
return lumigo_dumps(event)
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
from typing import Union, List, Dict
2+
3+
from lumigo_tracer.parsing_utils import recursive_get_key, safe_get
4+
from lumigo_tracer.lumigo_utils import (
5+
lumigo_safe_execute,
6+
Configuration,
7+
STEP_FUNCTION_UID_KEY,
8+
LUMIGO_EVENT_KEY,
9+
md5hash,
10+
)
11+
12+
TRIGGER_CREATION_TIME_KEY = "approxEventCreationTime"
13+
MESSAGE_ID_KEY = "messageId"
14+
MESSAGE_IDS_KEY = "messageIds"
15+
16+
17+
def parse_triggered_by(event: dict):
18+
"""
19+
This function parses the event and build the dictionary that describes the given event.
20+
21+
The current possible values are:
22+
* {triggeredBy: unknown}
23+
* {triggeredBy: apigw, api: <host>, resource: <>, httpMethod: <>, stage: <>, identity: <>, referer: <>}
24+
"""
25+
with lumigo_safe_execute("triggered by"):
26+
if not isinstance(event, dict):
27+
if _is_step_function(event):
28+
return _parse_step_function(event)
29+
return None
30+
if _is_supported_http_method(event):
31+
return _parse_http_method(event)
32+
elif _is_supported_sns(event):
33+
return _parse_sns(event)
34+
elif _is_supported_streams(event):
35+
return _parse_streams(event)
36+
elif _is_supported_cw(event):
37+
return _parse_cw(event)
38+
elif _is_step_function(event):
39+
return _parse_step_function(event)
40+
41+
return _parse_unknown(event)
42+
43+
44+
def _parse_unknown(event: dict):
45+
result = {"triggeredBy": "unknown"}
46+
return result
47+
48+
49+
def _is_step_function(event: Union[List, Dict]):
50+
return (
51+
Configuration.is_step_function
52+
and isinstance(event, (list, dict)) # noqa
53+
and STEP_FUNCTION_UID_KEY in recursive_get_key(event, LUMIGO_EVENT_KEY, default={}) # noqa
54+
)
55+
56+
57+
def _parse_step_function(event: dict):
58+
result = {
59+
"triggeredBy": "stepFunction",
60+
"messageId": recursive_get_key(event, LUMIGO_EVENT_KEY)[STEP_FUNCTION_UID_KEY],
61+
}
62+
return result
63+
64+
65+
def _is_supported_http_method(event: dict):
66+
return (
67+
"httpMethod" in event # noqa
68+
and "headers" in event # noqa
69+
and "requestContext" in event # noqa
70+
and event.get("requestContext", {}).get("elb") is None # noqa
71+
) or ( # noqa
72+
event.get("version", "") == "2.0" and "headers" in event # noqa
73+
) # noqa # noqa
74+
75+
76+
def _parse_http_method(event: dict):
77+
version = event.get("version")
78+
if version and version.startswith("2.0"):
79+
return _parse_http_method_v2(event)
80+
return _parse_http_method_v1(event)
81+
82+
83+
def _parse_http_method_v1(event: dict):
84+
result = {
85+
"triggeredBy": "apigw",
86+
"httpMethod": event.get("httpMethod", ""),
87+
"resource": event.get("resource", ""),
88+
"messageId": event.get("requestContext", {}).get("requestId", ""),
89+
}
90+
if isinstance(event.get("headers"), dict):
91+
result["api"] = event["headers"].get("Host", "unknown.unknown.unknown")
92+
if isinstance(event.get("requestContext"), dict):
93+
result["stage"] = event["requestContext"].get("stage", "unknown")
94+
return result
95+
96+
97+
def _parse_http_method_v2(event: dict):
98+
result = {
99+
"triggeredBy": "apigw",
100+
"httpMethod": event.get("requestContext", {}).get("http", {}).get("method"),
101+
"resource": event.get("requestContext", {}).get("http", {}).get("path"),
102+
"messageId": event.get("requestContext", {}).get("requestId", ""),
103+
"api": event.get("requestContext", {}).get("domainName", ""),
104+
"stage": event.get("requestContext", {}).get("stage", "unknown"),
105+
}
106+
return result
107+
108+
109+
def _is_supported_sns(event: dict):
110+
return event.get("Records", [{}])[0].get("EventSource") == "aws:sns"
111+
112+
113+
def _parse_sns(event: dict):
114+
return {
115+
"triggeredBy": "sns",
116+
"arn": event["Records"][0]["Sns"]["TopicArn"],
117+
"messageId": event["Records"][0]["Sns"].get("MessageId"),
118+
}
119+
120+
121+
def _is_supported_cw(event: dict):
122+
return event.get("detail-type") == "Scheduled Event" and "source" in event and "time" in event
123+
124+
125+
def _parse_cw(event: dict):
126+
resource = event.get("resources", ["/unknown"])[0].split("/")[1]
127+
return {
128+
"triggeredBy": "cloudwatch",
129+
"resource": resource,
130+
"region": event.get("region"),
131+
"detailType": event.get("detail-type"),
132+
}
133+
134+
135+
def _is_supported_streams(event: dict):
136+
return event.get("Records", [{}])[0].get("eventSource") in [
137+
"aws:kinesis",
138+
"aws:dynamodb",
139+
"aws:sqs",
140+
"aws:s3",
141+
]
142+
143+
144+
def _parse_streams(event: dict) -> Dict[str, str]:
145+
"""
146+
:return: {"triggeredBy": str, "arn": str}
147+
If has messageId, return also: {"messageId": str}
148+
"""
149+
triggered_by = event["Records"][0]["eventSource"].split(":")[1]
150+
result = {"triggeredBy": triggered_by}
151+
if triggered_by == "s3":
152+
result["arn"] = event["Records"][0]["s3"]["bucket"]["arn"]
153+
result["messageId"] = (
154+
event["Records"][0].get("responseElements", {}).get("x-amz-request-id")
155+
)
156+
else:
157+
result["arn"] = event["Records"][0]["eventSourceARN"]
158+
if triggered_by == "sqs":
159+
result.update(_parse_sqs_event(event))
160+
elif triggered_by == "kinesis":
161+
result["messageId"] = safe_get(event, ["Records", 0, "kinesis", "sequenceNumber"])
162+
elif triggered_by == "dynamodb":
163+
result.update(_parse_dynamomdb_event(event))
164+
return result
165+
166+
167+
def _get_ddb_approx_creation_time_ms(event) -> int:
168+
return event["Records"][0].get("dynamodb", {}).get("ApproximateCreationDateTime", 0) * 1000
169+
170+
171+
def _parse_dynamomdb_event(event) -> Dict[str, Union[int, List[str]]]:
172+
creation_time = _get_ddb_approx_creation_time_ms(event)
173+
mids = []
174+
for record in event["Records"]:
175+
event_name = record.get("eventName")
176+
if event_name in ("MODIFY", "REMOVE") and record.get("dynamodb", {}).get("Keys"):
177+
mids.append(md5hash(record["dynamodb"]["Keys"]))
178+
elif event_name == "INSERT" and record.get("dynamodb", {}).get("NewImage"):
179+
mids.append(md5hash(record["dynamodb"]["NewImage"]))
180+
return {MESSAGE_IDS_KEY: mids, TRIGGER_CREATION_TIME_KEY: creation_time}
181+
182+
183+
def _parse_sqs_event(event) -> Dict[str, Union[int, List[str]]]:
184+
mids = [record["messageId"] for record in event["Records"] if record.get("messageId")]
185+
return {MESSAGE_IDS_KEY: mids} if len(mids) > 1 else {MESSAGE_ID_KEY: mids[0]}

src/lumigo_tracer/extension/extension.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55
from datetime import datetime
66

77
from lumigo_tracer.extension.extension_utils import get_current_bandwidth, get_extension_logger
8-
from lumigo_tracer import utils
8+
from lumigo_tracer import lumigo_utils
99
from lumigo_tracer.extension.lambda_service import LambdaService
1010
from lumigo_tracer.extension.sampler import Sampler
11-
from lumigo_tracer.utils import lumigo_safe_execute
11+
from lumigo_tracer.lumigo_utils import lumigo_safe_execute
1212

1313
SPAN_TYPE = "extensionExecutionEnd"
1414

@@ -49,7 +49,7 @@ def shutdown(self):
4949

5050
def _finish_previous_invocation(self, current_bandwidth: Optional[int]):
5151
self.sampler.stop_sampling()
52-
token = os.environ.get(utils.LUMIGO_TOKEN_KEY)
52+
token = os.environ.get(lumigo_utils.LUMIGO_TOKEN_KEY)
5353
if not token:
5454
get_extension_logger().warning(
5555
f"Skip sending data: No token was found. Request id: {self.request_id}"
@@ -71,4 +71,4 @@ def _finish_previous_invocation(self, current_bandwidth: Optional[int]):
7171
networkBytesUsed=current_bandwidth - self.bandwidth,
7272
cpuUsageTime=[s.dump() for s in self.sampler.get_samples()],
7373
)
74-
utils.report_json(os.environ.get("AWS_REGION", "us-east-1"), msgs=[asdict(span)])
74+
lumigo_utils.report_json(os.environ.get("AWS_REGION", "us-east-1"), msgs=[asdict(span)])

src/lumigo_tracer/extension/extension_utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import urllib.request
44
from typing import Optional, Dict
55

6-
from lumigo_tracer.utils import get_logger, lumigo_safe_execute
6+
from lumigo_tracer.lumigo_utils import get_logger, lumigo_safe_execute
77

88

99
def get_current_cpu_time() -> Optional[int]:

src/lumigo_tracer/extension/main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from lumigo_tracer.extension.extension_utils import get_extension_logger
66
from lumigo_tracer.extension.extension import LumigoExtension
77
from lumigo_tracer.extension.lambda_service import LambdaService
8-
from lumigo_tracer.utils import is_kill_switch_on, lumigo_safe_execute, config
8+
from lumigo_tracer.lumigo_utils import is_kill_switch_on, lumigo_safe_execute, config
99

1010
STOP_EXTENSION_KEY = "LUMIGO_EXTENSION_STOP"
1111
LUMIGO_EXTENSION_NAME = "lumigo"

src/lumigo_tracer/utils.py renamed to src/lumigo_tracer/lumigo_utils.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import logging
55
import os
66
import re
7+
import uuid
78
from functools import reduce, lru_cache
89
import time
910
import http.client
@@ -371,6 +372,19 @@ def is_api_gw_event(event: dict) -> bool:
371372
)
372373

373374

375+
def create_step_function_span(message_id: str):
376+
return {
377+
"id": str(uuid.uuid4()),
378+
"type": "http",
379+
"info": {
380+
"resourceName": "StepFunction",
381+
"messageId": message_id,
382+
"httpInfo": {"host": "StepFunction", "request": {"method": "", "body": ""}},
383+
},
384+
"started": int(time.time() * 1000),
385+
}
386+
387+
374388
def get_timeout_buffer(remaining_time: float):
375389
buffer = Configuration.timeout_timer_buffer
376390
if not buffer:
@@ -471,7 +485,7 @@ def lumigo_dumps(
471485
regexes: Optional[Pattern[str]] = None,
472486
enforce_jsonify: bool = False,
473487
decimal_safe=False,
474-
):
488+
) -> str:
475489
regexes = regexes or get_omitting_regex()
476490
max_size = max_size if max_size is not None else Configuration.max_entry_size
477491
is_truncated = False

0 commit comments

Comments
 (0)