Skip to content

Commit eaf4f91

Browse files
Lumigo extension (#151)
1 parent ea79a42 commit eaf4f91

21 files changed

+631
-20
lines changed

scripts/prepare_extension_layer.sh

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
echo "Extension layer"
2+
python -W ignore setup.py --quiet bdist_wheel # create .egg-info
3+
pushd src > /dev/null || exit
4+
echo "-create directories to zip"
5+
echo "--extension"
6+
mkdir extensions
7+
cp lumigo_tracer/extension/bootstrap/lumigo extensions/
8+
echo "--tracer"
9+
mkdir extension-python-modules
10+
cp -R lumigo_tracer.egg-info extension-python-modules/
11+
cp -R lumigo_tracer extension-python-modules/
12+
echo "--python runtime"
13+
aws s3 cp --quiet s3://lumigo-runtimes/python/lean-python-runtime-37.zip runtime.zip
14+
unzip -q runtime.zip
15+
echo "--special temp file"
16+
touch preview-extensions-ggqizro707
17+
echo "-zipping"
18+
zip -qr "extensions.zip" "extensions" "extension-python-modules" "python-runtime" "preview-extensions-ggqizro707" # take all the directory
19+
echo "-publish"
20+
../utils/common_bash/create_layer.sh --layer-name "extensions-layer" --region ALL --package-folder "extensions.zip" --version $(git describe --abbrev=0 --tags) --runtimes ""
21+
rm -rf extensions extension-python-modules extensions.zip runtime.zip python-runtime __MACOSX
22+
popd > /dev/null || exit
23+
24+
echo "\nDone.\n"
25+

src/lumigo_tracer/extension/__init__.py

Whitespace-only changes.
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
#!/bin/sh
2+
3+
set -eo pipefail
4+
5+
# python runtime initialization
6+
if [[ $LD_LIBRARY_PATH != *python* ]]; then
7+
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/python-runtime/var/lang/lib
8+
export PYTHONPATH=$PYTHONPATH:/opt/extension-python-modules:/opt/python-runtime/var/runtime
9+
fi
10+
11+
/opt/python-runtime/var/lang/bin/python3.7 /opt/extension-python-modules/lumigo_tracer/extension/main.py &
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import os
2+
from dataclasses import dataclass, asdict
3+
4+
from typing import Dict, Optional, List, Union
5+
from datetime import datetime
6+
7+
from lumigo_tracer.extension.extension_utils import get_current_bandwidth, get_extension_logger
8+
from lumigo_tracer import utils
9+
from lumigo_tracer.extension.lambda_service import LambdaService
10+
from lumigo_tracer.extension.sampler import Sampler
11+
from lumigo_tracer.utils import lumigo_safe_execute
12+
13+
SPAN_TYPE = "extensionExecutionEnd"
14+
15+
16+
@dataclass
17+
class ExtensionEvent:
18+
token: str
19+
started: int
20+
requestId: str
21+
networkBytesUsed: int
22+
cpuUsageTime: List[Dict[str, Union[float, int]]]
23+
type: str = SPAN_TYPE
24+
25+
26+
class LumigoExtension:
27+
def __init__(self, lambda_service: LambdaService):
28+
self.lambda_service: LambdaService = lambda_service
29+
self.sampler: Sampler = Sampler()
30+
self.start_time: Optional[datetime] = None
31+
self.request_id: Optional[str] = None
32+
self.bandwidth: Optional[int] = None
33+
34+
def start_new_invocation(self, event: Dict[str, str]):
35+
with lumigo_safe_execute("Extension: start_new_invocation"):
36+
current_bandwidth = get_current_bandwidth()
37+
if self.request_id:
38+
self._finish_previous_invocation(current_bandwidth)
39+
self.sampler.start_sampling()
40+
self.lambda_service.ready_for_next_event()
41+
self.request_id = event.get("requestId")
42+
self.start_time = datetime.now()
43+
self.bandwidth = current_bandwidth
44+
45+
def shutdown(self):
46+
with lumigo_safe_execute("Extension: shutdown"):
47+
current_bandwidth = get_current_bandwidth()
48+
self._finish_previous_invocation(current_bandwidth)
49+
50+
def _finish_previous_invocation(self, current_bandwidth: Optional[int]):
51+
self.sampler.stop_sampling()
52+
token = os.environ.get(utils.LUMIGO_TOKEN_KEY)
53+
if not token:
54+
get_extension_logger().warning(
55+
f"Skip sending data: No token was found. Request id: {self.request_id}"
56+
)
57+
return
58+
if not (
59+
self.request_id
60+
and self.start_time # noqa: W503
61+
and self.sampler.get_samples() # noqa: W503
62+
and self.bandwidth # noqa: W503
63+
and current_bandwidth # noqa: W503
64+
):
65+
get_extension_logger().warning("Skip sending data: unable retrieving all data")
66+
return
67+
span = ExtensionEvent(
68+
token=token,
69+
started=int(self.start_time.timestamp() * 1000),
70+
requestId=self.request_id,
71+
networkBytesUsed=current_bandwidth - self.bandwidth,
72+
cpuUsageTime=[s.dump() for s in self.sampler.get_samples()],
73+
)
74+
utils.report_json(os.environ.get("AWS_REGION", "us-east-1"), msgs=[asdict(span)])
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import json
2+
import os
3+
import urllib.request
4+
from typing import Optional, Dict
5+
6+
from lumigo_tracer.utils import get_logger, lumigo_safe_execute
7+
8+
9+
def get_current_cpu_time() -> Optional[int]:
10+
"""
11+
:return: the total number of milliseconds that being used by the CPU.
12+
"""
13+
with lumigo_safe_execute("Extension: get cpu time"):
14+
total = 0
15+
with open("/proc/stat", "r") as stats:
16+
for line in stats.readlines():
17+
if line.startswith("cpu "):
18+
parts = line.split()
19+
total += (int(parts[1]) + int(parts[3])) * 10
20+
return total
21+
22+
23+
def get_current_bandwidth() -> Optional[int]:
24+
with lumigo_safe_execute("Extension: get bandwidth"):
25+
with open("/proc/net/netstat", "r") as stats:
26+
last = stats.read().splitlines()[-1]
27+
parts = last.split()
28+
return int(parts[7]) + int(parts[8])
29+
30+
31+
def request_event(extension_id: str) -> Dict[str, str]:
32+
url = f"http://{os.environ['AWS_LAMBDA_RUNTIME_API']}/2020-01-01/extension/event/next"
33+
headers = {"Lambda-Extension-Identifier": extension_id}
34+
return json.loads(urllib.request.urlopen(urllib.request.Request(url, headers=headers)).read())
35+
36+
37+
def get_extension_logger():
38+
return get_logger("lumigo-extension")
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from concurrent.futures import ThreadPoolExecutor
2+
from typing import Dict, Generator, Optional
3+
4+
from lumigo_tracer.extension.extension_utils import get_extension_logger, request_event
5+
6+
7+
_lambda_service: Optional["LambdaService"] = None
8+
9+
10+
class LambdaService:
11+
def __init__(self, extension_id):
12+
self.extension_id: str = extension_id
13+
self.thread: ThreadPoolExecutor = ThreadPoolExecutor()
14+
self.next_event_future = None
15+
16+
def ready_for_next_event(self):
17+
if not self.next_event_future:
18+
self.next_event_future = self.thread.submit(request_event, self.extension_id)
19+
20+
def block_until_next_event(self) -> Dict[str, str]:
21+
self.ready_for_next_event()
22+
result = self.next_event_future.result()
23+
self.next_event_future = None
24+
return result
25+
26+
def events_generator(self) -> Generator[Dict[str, str], None, None]:
27+
try:
28+
while True:
29+
yield self.block_until_next_event()
30+
except Exception:
31+
get_extension_logger().exception("Extension:: failed retrieving events")
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import json
2+
import os
3+
import http.client
4+
5+
from lumigo_tracer.extension.extension_utils import get_extension_logger
6+
from lumigo_tracer.extension.extension import LumigoExtension
7+
from lumigo_tracer.extension.lambda_service import LambdaService
8+
from lumigo_tracer.utils import is_kill_switch_on, lumigo_safe_execute, config
9+
10+
STOP_EXTENSION_KEY = "LUMIGO_EXTENSION_STOP"
11+
LUMIGO_EXTENSION_NAME = "lumigo"
12+
13+
REQUESTED_ENVS = [
14+
"LUMIGO_DEBUG",
15+
"LUMIGO_TRACER_TOKEN",
16+
"LUMIGO_TRACER_HOST",
17+
"LUMIGO_EXTENSION_ON",
18+
"LUMIGO_SWITCH_OFF",
19+
]
20+
EVENTS = ["INVOKE", "SHUTDOWN"]
21+
22+
23+
def register():
24+
body = json.dumps({"events": EVENTS})
25+
headers = {"Lambda-Extension-Name": LUMIGO_EXTENSION_NAME}
26+
conn = http.client.HTTPConnection(os.environ["AWS_LAMBDA_RUNTIME_API"])
27+
conn.request("POST", "/2020-01-01/extension/register", body, headers=headers)
28+
response = conn.getresponse()
29+
response.read()
30+
31+
return response.headers["Lambda-Extension-Identifier"]
32+
33+
34+
def start_extension_loop(lambda_service: LambdaService):
35+
with lumigo_safe_execute("Extension main initialization"):
36+
get_extension_logger().debug(
37+
f"Extension started running with extension id: {lambda_service.extension_id}"
38+
)
39+
config()
40+
extension = LumigoExtension(lambda_service)
41+
for event in lambda_service.events_generator():
42+
get_extension_logger().debug(f"Extension got event: {event}")
43+
if event.get("eventType") == "INVOKE":
44+
with lumigo_safe_execute("Extension: start new invocation"):
45+
extension.start_new_invocation(event)
46+
elif event.get("eventType") == "SHUTDOWN":
47+
with lumigo_safe_execute("Extension: shutdown"):
48+
extension.shutdown()
49+
break
50+
else:
51+
get_extension_logger().error(f"Extension got unknown event: {event}")
52+
get_extension_logger().debug("Extension finished running")
53+
54+
55+
def start_empty_extension_loop(lambda_service: LambdaService):
56+
get_extension_logger().debug("Lumigo extension is disables")
57+
for event in lambda_service.events_generator():
58+
if event.get("eventType") == "SHUTDOWN":
59+
break
60+
61+
62+
def main():
63+
extension_id = register()
64+
65+
lambda_service = LambdaService(extension_id)
66+
if is_kill_switch_on() or (os.environ.get(STOP_EXTENSION_KEY, "").lower() == "true"):
67+
start_empty_extension_loop(lambda_service)
68+
else:
69+
start_extension_loop(lambda_service)
70+
71+
72+
if __name__ == "__main__":
73+
main()
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
from datetime import datetime
2+
3+
from typing import Optional, List, Union, Dict
4+
5+
from dataclasses import dataclass
6+
7+
import signal
8+
9+
from lumigo_tracer.extension.extension_utils import get_current_cpu_time
10+
11+
DEFAULT_SAMPLING_INTERVAL = 500
12+
13+
14+
@dataclass
15+
class CpuSample:
16+
start_time: datetime
17+
end_time: datetime
18+
cpu_time: float
19+
20+
def dump(self) -> Dict[str, Union[float, int]]:
21+
return {
22+
"start_time": int(self.start_time.timestamp() * 1000),
23+
"end_time": int(self.end_time.timestamp() * 1000),
24+
"cpu_time": self.cpu_time,
25+
}
26+
27+
28+
class Sampler:
29+
def __init__(self):
30+
self.cpu_last_sample_value: Optional[float] = None
31+
self.cpu_last_sample_time: Optional[datetime] = None
32+
self.cpu_samples: List[CpuSample] = []
33+
34+
def start_sampling(self, interval_ms: int = DEFAULT_SAMPLING_INTERVAL):
35+
self.cpu_samples = []
36+
self.sample()
37+
signal.signal(signal.SIGALRM, self.sample)
38+
signal.setitimer(signal.ITIMER_REAL, interval_ms / 1000, interval_ms / 1000)
39+
40+
def stop_sampling(self):
41+
signal.alarm(0)
42+
signal.signal(signal.SIGALRM, signal.SIG_DFL)
43+
self.sample()
44+
45+
def get_samples(self) -> List[CpuSample]:
46+
return self.cpu_samples
47+
48+
def sample(self, *args):
49+
now = datetime.now()
50+
current_cpu = get_current_cpu_time()
51+
if self.cpu_last_sample_time and self.cpu_last_sample_value and current_cpu:
52+
self.cpu_samples.append(
53+
CpuSample(
54+
start_time=self.cpu_last_sample_time,
55+
end_time=now,
56+
cpu_time=current_cpu - self.cpu_last_sample_value,
57+
)
58+
)
59+
self.cpu_last_sample_time = now
60+
self.cpu_last_sample_value = current_cpu

src/lumigo_tracer/parsers/event_parser.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,7 @@
3838
"requestParameters",
3939
]
4040

41-
S3_BUCKET_KEYS_ORDER = str_to_list(os.environ.get("LUMIGO_S3_BUCKET_KEYS_ORDER", "")) or [
42-
"arn",
43-
]
41+
S3_BUCKET_KEYS_ORDER = str_to_list(os.environ.get("LUMIGO_S3_BUCKET_KEYS_ORDER", "")) or ["arn"]
4442

4543
S3_OBJECT_KEYS_ORDER = str_to_list(os.environ.get("LUMIGO_S3_OBJECT_KEYS_ORDER", "")) or [
4644
"key",

src/lumigo_tracer/sync_http/sync_hook.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,9 @@
33
import logging
44
import http.client
55
from io import BytesIO
6-
import os
76
import builtins
87
from functools import wraps
98
import importlib.util
10-
import botocore.awsrequest # noqa: F401
119

1210
from lumigo_tracer.auto_tag.auto_tag_event import AutoTagEvent
1311
from lumigo_tracer.libs.wrapt import wrap_function_wrapper
@@ -19,14 +17,14 @@
1917
lumigo_safe_execute,
2018
is_aws_environment,
2119
ensure_str,
20+
is_kill_switch_on,
2221
)
2322
from lumigo_tracer.spans_container import SpansContainer, TimeoutMechanism
2423
from lumigo_tracer.parsers.http_data_classes import HttpRequest
2524
from collections import namedtuple
2625

2726
_BODY_HEADER_SPLITTER = b"\r\n\r\n"
2827
_FLAGS_HEADER_SPLITTER = b"\r\n"
29-
_KILL_SWITCH = "LUMIGO_SWITCH_OFF"
3028
CONTEXT_WRAPPED_BY_LUMIGO_KEY = "_wrapped_by_lumigo"
3129
MAX_READ_SIZE = 1024
3230
already_wrapped = False
@@ -198,7 +196,7 @@ def _add_wrap_flag_to_context(*args):
198196
def _lumigo_tracer(func):
199197
@wraps(func)
200198
def lambda_wrapper(*args, **kwargs):
201-
if str(os.environ.get(_KILL_SWITCH, "")).lower() == "true":
199+
if is_kill_switch_on():
202200
return func(*args, **kwargs)
203201

204202
if _is_context_already_wrapped(*args):
@@ -330,7 +328,10 @@ def wrap_http_calls():
330328
wrap_function_wrapper(
331329
"http.client", "HTTPConnection.request", _headers_reminder_wrapper
332330
)
333-
wrap_function_wrapper("botocore.awsrequest", "AWSRequest.__init__", _putheader_wrapper)
331+
if importlib.util.find_spec("botocore"):
332+
wrap_function_wrapper(
333+
"botocore.awsrequest", "AWSRequest.__init__", _putheader_wrapper
334+
)
334335
wrap_function_wrapper("http.client", "HTTPConnection.getresponse", _response_wrapper)
335336
wrap_function_wrapper("http.client", "HTTPResponse.read", _read_wrapper)
336337
if importlib.util.find_spec("urllib3"):

0 commit comments

Comments
 (0)