Skip to content

Commit fe11d16

Browse files
RD-3150 - faster http hooks (#120)
* faster http hooks
1 parent a73472c commit fe11d16

File tree

12 files changed

+121
-75
lines changed

12 files changed

+121
-75
lines changed

src/lumigo_tracer/parsers/http_data_classes.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,18 @@
1-
from http import client
21
from copy import deepcopy
3-
from typing import Optional
42

53

64
class HttpRequest:
75
host: str
86
method: str
97
uri: str
10-
headers: Optional[client.HTTPMessage]
8+
headers: dict
119
body: bytes
1210

1311
def __init__(self, **kwargs):
1412
self.host = kwargs["host"]
1513
self.method = kwargs["method"]
1614
self.uri = kwargs["uri"]
17-
self.headers = kwargs.get("headers")
15+
self.headers = {k.lower(): v for k, v in (kwargs.get("headers") or {}).items()}
1816
self.body = kwargs.get("body")
1917

2018
def clone(self, **kwargs):

src/lumigo_tracer/parsers/parser.py

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import uuid
22
from typing import Type, Optional
33
import time
4-
import http.client
54

65
from lumigo_tracer.parsers.utils import (
76
safe_split_get,
@@ -35,9 +34,7 @@ class Parser:
3534
def parse_request(self, parse_params: HttpRequest) -> dict:
3635
if Configuration.verbose and parse_params and not should_scrub_domain(parse_params.host):
3736
additional_info = {
38-
"headers": prepare_large_data(
39-
dict(parse_params.headers.items() if parse_params.headers else {})
40-
),
37+
"headers": prepare_large_data(parse_params.headers),
4138
"body": prepare_large_data(parse_params.body),
4239
"method": parse_params.method,
4340
"uri": parse_params.uri,
@@ -60,12 +57,10 @@ def parse_request(self, parse_params: HttpRequest) -> dict:
6057
"started": int(time.time() * 1000),
6158
}
6259

63-
def parse_response(
64-
self, url: str, status_code: int, headers: Optional[http.client.HTTPMessage], body: bytes
65-
) -> dict:
60+
def parse_response(self, url: str, status_code: int, headers: dict, body: bytes) -> dict:
6661
if Configuration.verbose and not should_scrub_domain(url):
6762
additional_info = {
68-
"headers": prepare_large_data(dict(headers.items() if headers else {})),
63+
"headers": prepare_large_data(headers),
6964
"body": prepare_large_data(body),
7065
"statusCode": status_code,
7166
}
@@ -85,7 +80,7 @@ class ServerlessAWSParser(Parser):
8580

8681
def parse_response(self, url: str, status_code: int, headers, body: bytes) -> dict:
8782
additional_info = {}
88-
message_id = headers.get("x-amzn-RequestId")
83+
message_id = headers.get("x-amzn-requestid")
8984
if message_id and self.should_add_message_id:
9085
additional_info["info"] = {"messageId": message_id}
9186
span_id = headers.get("x-amzn-requestid") or headers.get("x-amz-requestid")
@@ -100,7 +95,7 @@ class DynamoParser(ServerlessAWSParser):
10095
should_add_message_id = False
10196

10297
def parse_request(self, parse_params: HttpRequest) -> dict:
103-
target: str = str(parse_params.headers.get("x-amz-target", "")) # type: ignore
98+
target: str = parse_params.headers.get("x-amz-target", "")
10499
return recursive_json_join(
105100
{
106101
"info": {
@@ -139,10 +134,8 @@ class LambdaParser(ServerlessAWSParser):
139134
def parse_request(self, parse_params: HttpRequest) -> dict:
140135
return recursive_json_join(
141136
{
142-
"name": safe_split_get(
143-
str(parse_params.headers.get("path", "")), "/", 3 # type: ignore
144-
),
145-
"invocationType": parse_params.headers.get("x-amz-invocation-type"), # type: ignore
137+
"name": safe_split_get(str(parse_params.headers.get("path", "")), "/", 3),
138+
"invocationType": parse_params.headers.get("x-amz-invocation-type"),
146139
},
147140
super().parse_request(parse_params),
148141
)
@@ -223,16 +216,16 @@ class ApiGatewayV2Parser(ServerlessAWSParser):
223216
# API-GW V1 covered by ServerlessAWSParser
224217

225218
def parse_response(self, url: str, status_code: int, headers, body: bytes) -> dict:
226-
aws_request_id = headers.get("x-amzn-RequestId")
227-
apigw_request_id = headers.get("Apigw-Requestid")
219+
aws_request_id = headers.get("x-amzn-requestid")
220+
apigw_request_id = headers.get("apigw-requestid")
228221
message_id = aws_request_id or apigw_request_id
229222
return recursive_json_join(
230223
{"info": {"messageId": message_id}},
231224
super().parse_response(url, status_code, headers, body),
232225
)
233226

234227

235-
def get_parser(url: str, headers: Optional[http.client.HTTPMessage] = None) -> Type[Parser]:
228+
def get_parser(url: str, headers: Optional[dict] = None) -> Type[Parser]:
236229
service = safe_split_get(url, ".", 0)
237230
if service == "dynamodb":
238231
return DynamoParser
@@ -249,6 +242,6 @@ def get_parser(url: str, headers: Optional[http.client.HTTPMessage] = None) -> T
249242
return SqsParser
250243
elif "execute-api" in url:
251244
return ApiGatewayV2Parser
252-
elif url.endswith("amazonaws.com") or (headers and headers.get("x-amzn-RequestId")):
245+
elif url.endswith("amazonaws.com") or (headers and headers.get("x-amzn-requestid")):
253246
return ServerlessAWSParser
254247
return Parser

src/lumigo_tracer/parsers/utils.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,10 @@ def recursive_json_join(d1: dict, d2: dict):
136136
* if key in d2 and is not dictionary, then the value is d2[key]
137137
* otherwise, join d1[key] and d2[key]
138138
"""
139+
if d1 is None or d2 is None:
140+
return d1 or d2
139141
d = {}
140-
for key in itertools.chain(d1.keys(), d2.keys()):
142+
for key in set(itertools.chain(d1.keys(), d2.keys())):
141143
value = d1.get(key, d2.get(key))
142144
if isinstance(value, dict):
143145
d[key] = recursive_json_join(d1.get(key, {}), d2.get(key, {}))
@@ -293,7 +295,7 @@ def _parse_streams(event: dict) -> Dict[str, str]:
293295
def should_scrub_domain(url: str) -> bool:
294296
if url and Configuration.domains_scrubber:
295297
for regex in Configuration.domains_scrubber:
296-
if re.match(regex, url, re.IGNORECASE):
298+
if regex.match(url):
297299
return True
298300
return False
299301

src/lumigo_tracer/spans_container.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import uuid
55
import signal
66
import traceback
7-
import http.client
87
from typing import List, Dict, Tuple, Optional, Callable, Set
98

109
from lumigo_tracer.parsers.event_parser import EventParser
@@ -17,6 +16,7 @@
1716
omit_keys,
1817
EXECUTION_TAGS_KEY,
1918
MAX_ENTRY_SIZE,
19+
get_timeout_buffer,
2020
)
2121
from lumigo_tracer import utils
2222
from lumigo_tracer.parsers.parser import get_parser, HTTP_TYPE, StepFunctionParser
@@ -72,8 +72,6 @@ def __init__(
7272
"region": region,
7373
"parentId": request_id,
7474
"info": {"tracer": {"version": version}, "traceId": {"Root": trace_root}},
75-
"event": event,
76-
"envs": envs,
7775
"token": Configuration.token,
7876
}
7977
self.function_span = recursive_json_join(
@@ -82,6 +80,8 @@ def __init__(
8280
"type": "function",
8381
"name": name,
8482
"runtime": runtime,
83+
"event": event,
84+
"envs": envs,
8585
"memoryAllocated": memory_allocated,
8686
"readiness": "cold" if SpansContainer.is_cold else "warm",
8787
"info": {
@@ -93,7 +93,7 @@ def __init__(
9393
},
9494
self.base_msg,
9595
)
96-
self.previous_request: Tuple[Optional[http.client.HTTPMessage], bytes] = (None, b"")
96+
self.previous_request: Tuple[Optional[dict], bytes] = (None, b"")
9797
self.previous_response_body: bytes = b""
9898
self.http_span_ids_to_send: Set[str] = set()
9999
self.http_spans: List[Dict] = []
@@ -124,12 +124,11 @@ def start_timeout_timer(self, context=None) -> None:
124124
get_logger().info("Skip setting timeout timer - Could not get the remaining time.")
125125
return
126126
remaining_time = context.get_remaining_time_in_millis() / 1000
127-
if Configuration.timeout_timer_buffer >= remaining_time:
127+
buffer = get_timeout_buffer(remaining_time)
128+
if buffer >= remaining_time or remaining_time < 2:
128129
get_logger().debug("Skip setting timeout timer - Too short timeout.")
129130
return
130-
TimeoutMechanism.start(
131-
remaining_time - Configuration.timeout_timer_buffer, self.handle_timeout
132-
)
131+
TimeoutMechanism.start(remaining_time - buffer, self.handle_timeout)
133132

134133
def add_request_event(self, parse_params: HttpRequest):
135134
"""
@@ -169,7 +168,7 @@ def update_event_end_time(self) -> None:
169168
self.http_spans[-1]["ended"] = int(time.time() * 1000)
170169

171170
def update_event_response(
172-
self, host: Optional[str], status_code: int, headers: http.client.HTTPMessage, body: bytes
171+
self, host: Optional[str], status_code: int, headers: dict, body: bytes
173172
) -> None:
174173
"""
175174
:param host: If None, use the host from the last span, otherwise this is the first chuck and we can empty
@@ -183,6 +182,7 @@ def update_event_response(
183182
else:
184183
self.previous_response_body = b""
185184

185+
headers = {k.lower(): v for k, v in headers.items()} if headers else {}
186186
parser = get_parser(host, headers)() # type: ignore
187187
if len(self.previous_response_body) < MAX_ENTRY_SIZE:
188188
self.previous_response_body += body

src/lumigo_tracer/sync_http/sync_hook.py

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
get_logger,
1818
lumigo_safe_execute,
1919
is_aws_environment,
20+
ensure_str,
2021
)
2122
from lumigo_tracer.spans_container import SpansContainer, TimeoutMechanism
2223
from lumigo_tracer.parsers.http_data_classes import HttpRequest
@@ -27,6 +28,7 @@
2728
CONTEXT_WRAPPED_BY_LUMIGO_KEY = "_wrapped_by_lumigo"
2829
MAX_READ_SIZE = 1024
2930
already_wrapped = False
31+
LUMIGO_HEADERS_HOOK_KEY = "_lumigo_headers_hook"
3032

3133

3234
def _request_wrapper(func, instance, args, kwargs):
@@ -51,7 +53,12 @@ def _request_wrapper(func, instance, args, kwargs):
5153
with lumigo_safe_execute("parse request"):
5254
if isinstance(data, bytes) and _BODY_HEADER_SPLITTER in data:
5355
headers, body = data.split(_BODY_HEADER_SPLITTER, 1)
54-
if _FLAGS_HEADER_SPLITTER in headers:
56+
hooked_headers = getattr(instance, LUMIGO_HEADERS_HOOK_KEY, None)
57+
if hooked_headers:
58+
# we will get here only if _headers_reminder_wrapper ran first. remove its traces.
59+
headers = {ensure_str(k): ensure_str(v) for k, v in hooked_headers.items()}
60+
setattr(instance, LUMIGO_HEADERS_HOOK_KEY, None)
61+
elif _FLAGS_HEADER_SPLITTER in headers:
5562
request_info, headers = headers.split(_FLAGS_HEADER_SPLITTER, 1)
5663
headers = http.client.parse_headers(BytesIO(headers))
5764
path_and_query_params = (
@@ -63,6 +70,8 @@ def _request_wrapper(func, instance, args, kwargs):
6370
)
6471
uri = f"{host}{path_and_query_params}"
6572
host = host or headers.get("Host")
73+
else:
74+
headers = None
6675

6776
with lumigo_safe_execute("add request event"):
6877
if headers:
@@ -80,14 +89,23 @@ def _request_wrapper(func, instance, args, kwargs):
8089
return ret_val
8190

8291

92+
def _headers_reminder_wrapper(func, instance, args, kwargs):
93+
"""
94+
This is the wrapper of the function `http.client.HTTPConnection.request` that gets the headers.
95+
Remember the headers helps us to improve performances on requests that use this flow.
96+
"""
97+
setattr(instance, LUMIGO_HEADERS_HOOK_KEY, kwargs.get("headers"))
98+
return func(*args, **kwargs)
99+
100+
83101
def _response_wrapper(func, instance, args, kwargs):
84102
"""
85103
This is the wrapper of the function that can be called only after that the http request was sent.
86104
Note that we don't examine the response data because it may change the original behaviour (ret_val.peek()).
87105
"""
88106
ret_val = func(*args, **kwargs)
89107
with lumigo_safe_execute("parse response"):
90-
headers = ret_val.headers
108+
headers = dict(ret_val.headers.items())
91109
status_code = ret_val.code
92110
SpansContainer.get_span().update_event_response(instance.host, status_code, headers, b"")
93111
return ret_val
@@ -101,7 +119,7 @@ def _read_wrapper(func, instance, args, kwargs):
101119
if ret_val:
102120
with lumigo_safe_execute("parse response.read"):
103121
SpansContainer.get_span().update_event_response(
104-
None, instance.code, instance.headers, ret_val
122+
None, instance.code, dict(instance.headers.items()), ret_val
105123
)
106124
return ret_val
107125

@@ -115,7 +133,7 @@ def _read_stream_wrapper_generator(stream_generator, instance):
115133
for partial_response in stream_generator:
116134
with lumigo_safe_execute("parse response.read_chunked"):
117135
SpansContainer.get_span().update_event_response(
118-
None, instance.status, instance.headers, partial_response
136+
None, instance.status, dict(instance.headers.items()), partial_response
119137
)
120138
yield partial_response
121139

@@ -281,6 +299,9 @@ def wrap_http_calls():
281299
with lumigo_safe_execute("wrap http calls"):
282300
get_logger().debug("wrapping the http request")
283301
wrap_function_wrapper("http.client", "HTTPConnection.send", _request_wrapper)
302+
wrap_function_wrapper(
303+
"http.client", "HTTPConnection.request", _headers_reminder_wrapper
304+
)
284305
wrap_function_wrapper("botocore.awsrequest", "AWSRequest.__init__", _putheader_wrapper)
285306
wrap_function_wrapper("http.client", "HTTPConnection.getresponse", _response_wrapper)
286307
wrap_function_wrapper("http.client", "HTTPResponse.read", _read_wrapper)

0 commit comments

Comments
 (0)