From b3dda91a3979405c95912e146f5fa60133332c4f Mon Sep 17 00:00:00 2001
From: Piotr Halama
Date: Mon, 10 Jun 2024 09:22:38 +0200
Subject: [PATCH] Update example (#1009)

---
 .../Dockerfile                                |  24 +++-
 .../custom-serverless-runtime-image/README.md |   2 +-
 .../{kubeless => }/kubeless.py                |  51 +++++----
 .../kubeless/requirements.txt                 |  11 --
 .../kubeless/tracing.py                       |  92 ----------------
 .../{kubeless => lib}/ce.py                   | 103 +++++++++++++-----
 .../lib/requirements.txt                      |  12 ++
 .../lib/tracing.py                            |  67 ++++++++++++
 8 files changed, 199 insertions(+), 163 deletions(-)
 rename examples/custom-serverless-runtime-image/{kubeless => }/kubeless.py (84%)
 delete mode 100644 examples/custom-serverless-runtime-image/kubeless/requirements.txt
 delete mode 100644 examples/custom-serverless-runtime-image/kubeless/tracing.py
 rename examples/custom-serverless-runtime-image/{kubeless => lib}/ce.py (50%)
 create mode 100644 examples/custom-serverless-runtime-image/lib/requirements.txt
 create mode 100644 examples/custom-serverless-runtime-image/lib/tracing.py

diff --git a/examples/custom-serverless-runtime-image/Dockerfile b/examples/custom-serverless-runtime-image/Dockerfile
index 8f9ea43a6..8d9857286 100644
--- a/examples/custom-serverless-runtime-image/Dockerfile
+++ b/examples/custom-serverless-runtime-image/Dockerfile
@@ -1,13 +1,27 @@
-FROM python:3.10-bullseye
+FROM python:3.12.3-slim-bookworm
 
-COPY kubeless/requirements.txt /kubeless/requirements.txt
-RUN pip install -r /kubeless/requirements.txt
-RUN pip install protobuf==3.20.* --force-reinstall
+# Serverless
+LABEL source = git@github.com:kyma-project/serverless.git
 
-COPY kubeless/ /
+# build-essential and linux-headers-generic are needed to install all requirements
+RUN apt update && apt install -y build-essential linux-headers-generic && \
+    rm -rf /var/lib/apt/lists/*
+
+COPY ./lib/requirements.txt /kubeless/requirements.txt
+RUN chmod 644 /kubeless/requirements.txt
+
+RUN pip install --no-cache-dir -r /kubeless/requirements.txt
+
+COPY ./lib /
+RUN chmod -R 755 /lib
+COPY ./kubeless.py /
+RUN chmod 644 /kubeless.py
 
 WORKDIR /
 
 USER 1000
 
+# Tracing propagators are configured based on OTEL_PROPAGATORS env variable https://opentelemetry.io/docs/instrumentation/python/manual/#using-environment-variables
+ENV OTEL_PROPAGATORS=tracecontext,baggage,b3multi
+ENV OTEL_PYTHON_REQUESTS_EXCLUDED_URLS="healthz,favicon.ico,metrics"
 CMD ["python", "/kubeless.py"]
diff --git a/examples/custom-serverless-runtime-image/README.md b/examples/custom-serverless-runtime-image/README.md
index 7c984ce49..3f4e6cf5b 100644
--- a/examples/custom-serverless-runtime-image/README.md
+++ b/examples/custom-serverless-runtime-image/README.md
@@ -2,7 +2,7 @@
 
 ## Overview
 
-This example shows how to create own custom runtime for a Serverless Function based on the Python runtime and the `debian:bullseye-slim` base image to provide support for glibc.
+This example shows how to create your own custom runtime for a Serverless Function based on the Python 3.12 runtime and the `python:3.12.3-slim-bookworm` base image to provide support for glibc.
 
 ## Prerequisites
 
diff --git a/examples/custom-serverless-runtime-image/kubeless/kubeless.py b/examples/custom-serverless-runtime-image/kubeless.py
similarity index 84%
rename from examples/custom-serverless-runtime-image/kubeless/kubeless.py
rename to examples/custom-serverless-runtime-image/kubeless.py
index fd3cbab58..e9c4bf1fc 100644
--- a/examples/custom-serverless-runtime-image/kubeless/kubeless.py
+++ b/examples/custom-serverless-runtime-image/kubeless.py
@@ -3,23 +3,17 @@
 import importlib
 import os
 import queue
+import sys
 import threading
 
 import bottle
 import prometheus_client as prom
-import sys
 
 import tracing
 from ce import Event
 from tracing import set_req_context
 
 
-def create_service_name(pod_name: str, service_namespace: str) -> str:
-    # remove generated pods suffix ( two last sections )
-    deployment_name = '-'.join(pod_name.split('-')[0:pod_name.count('-') - 1])
-    return '.'.join([deployment_name, service_namespace])
-
-
 # The reason this file has an underscore prefix in its name is to avoid a
 # name collision with the user-defined module.
 module_name = os.getenv('MOD_NAME')
@@ -31,7 +25,8 @@ def create_service_name(pod_name: str, service_namespace: str) -> str:
     print('Module cannot be named {} as current module'.format(current_mod), flush=True)
     exit(2)
 
-sys.path.append('/kubeless')
+function_location = os.getenv('FUNCTION_PATH', default='/kubeless')
+sys.path.append(function_location)
 mod = importlib.import_module(module_name)
 
 func_name = os.getenv('FUNC_HANDLER')
@@ -49,28 +44,30 @@ def create_service_name(pod_name: str, service_namespace: str) -> str:
 app = application = bottle.app()
 
 function_context = {
-    'function-name': func.__name__,
+    'function-name': os.getenv('FUNC_NAME'),
+    'namespace': os.getenv('SERVICE_NAMESPACE'),
     'timeout': timeout,
     'runtime': os.getenv('FUNC_RUNTIME'),
     'memory-limit': os.getenv('FUNC_MEMORY_LIMIT'),
 }
 
-tracecollector_endpoint = os.getenv('TRACE_COLLECTOR_ENDPOINT')
-pod_name = os.getenv('HOSTNAME')
-service_namespace = os.getenv('SERVICE_NAMESPACE')
-service_name = create_service_name(pod_name, service_namespace)
-
-tracer_provider = None
-# To not create several tracer providers, when the server start forking.
 if __name__ == "__main__":
-    tracer_provider = tracing.ServerlessTracerProvider(tracecollector_endpoint, service_name)
+    tracer = tracing._setup_tracer()
 
 
 def func_with_context(e, function_context):
     ex = e.ceHeaders["extensions"]
     with set_req_context(ex["request"]):
-        return func(e, function_context)
+        with tracer.start_as_current_span("userFunction"):
+            try:
+                return func(e, function_context)
+            except Exception as e:
+                return e
+
+@app.get('/favicon.ico')
+def favicon():
+    return bottle.HTTPResponse(status=204)
 
 
 @app.get('/healthz')
 def healthz():
@@ -84,14 +81,13 @@ def metrics():
 
 
 @app.error(500)
-def exception_handler():
+def exception_handler(err):
     return 'Internal server error'
 
 
 @app.route('/<:re:.*>', method=['GET', 'POST', 'PATCH', 'DELETE'])
 def handler():
     req = bottle.request
-    tracer = tracer_provider.get_tracer(req)
     event = Event(req, tracer)
 
     method = req.method
@@ -103,12 +99,14 @@ def handler():
     t.start()
     try:
         res = que.get(block=True, timeout=timeout)
-        if hasattr(res, 'headers') and res.headers["content-type"]:
+        if hasattr(res, 'headers') and 'content-type' in res.headers:
             bottle.response.content_type = res.headers["content-type"]
     except queue.Empty:
         return bottle.HTTPError(408, "Timeout while processing the function")
     else:
         t.join()
+        if isinstance(res, Exception):
+            raise res
         return res
 
 
@@ -120,13 +118,18 @@ def preload():
 if __name__ == '__main__':
     import logging
     import multiprocessing as mp
+    from multiprocessing import util
 
     import requestlogger
 
+    # TODO: this is workaround for: CVE-2022-42919
+    # More details: https://github.com/python/cpython/issues/97514
+    util.abstract_sockets_supported = False
+
     mp_context = os.getenv('MP_CONTEXT', 'forkserver')
     if mp_context == "fork":
         raise ValueError(
-            '"fork" multiprocessing context is not supported because cherrypy is a '
+            '"fork" multiprocessing context is not supported because cheroot is a '
             'multithreaded server and safely forking a multithreaded process is '
             'problematic'
         )
@@ -182,7 +185,7 @@ def preload():
     bottle.run(
         loggedapp,
-        server='cherrypy',
+        server='cheroot',
         host='0.0.0.0',
         port=func_port,
         # Set this flag to True to auto-reload the server after any source files change
         # Number of requests that can be handled in parallel (default = 50).
         numthreads=int(os.getenv('CHERRYPY_NUMTHREADS', 50)),
         quiet='KYMA_BOTTLE_QUIET_OPTION_DISABLED' not in os.environ,
-    )
\ No newline at end of file
+    )
diff --git a/examples/custom-serverless-runtime-image/kubeless/requirements.txt b/examples/custom-serverless-runtime-image/kubeless/requirements.txt
deleted file mode 100644
index dbb5d6b4e..000000000
--- a/examples/custom-serverless-runtime-image/kubeless/requirements.txt
+++ /dev/null
@@ -1,11 +0,0 @@
-setuptools==69.1.1
-requests
-bottle==0.12.21
-cherrypy==8.9.1
-wsgi-request-logger==0.4.6
-prometheus_client==0.8.0
-opentelemetry-api==1.9.1
-opentelemetry-sdk==1.9.1
-opentelemetry-exporter-otlp-proto-http==1.9.1
-opentelemetry-propagator-b3==1.9.1
-opentelemetry-instrumentation-requests==0.28b1
diff --git a/examples/custom-serverless-runtime-image/kubeless/tracing.py b/examples/custom-serverless-runtime-image/kubeless/tracing.py
deleted file mode 100644
index 5e713cdb3..000000000
--- a/examples/custom-serverless-runtime-image/kubeless/tracing.py
+++ /dev/null
@@ -1,92 +0,0 @@
-import logging
-from contextlib import contextmanager
-from typing import Iterator
-
-import requests
-from opentelemetry import trace
-from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
-from opentelemetry.instrumentation.requests import RequestsInstrumentor
-from opentelemetry.propagate import extract
-from opentelemetry.propagate import set_global_textmap
-from opentelemetry.propagators.b3 import B3MultiFormat
-from opentelemetry.sdk.resources import SERVICE_NAME, Resource
-from opentelemetry.sdk.trace import TracerProvider, _Span
-from opentelemetry.sdk.trace.export import BatchSpanProcessor
-from opentelemetry.trace import context_api
-from opentelemetry.trace.propagation import _SPAN_KEY
-
-_TRACING_SAMPLE_HEADER = "x-b3-sampled"
-
-
-class ServerlessTracerProvider:
-    def __init__(self, tracecollector_endpoint: str, service_name: str):
-        self.noop_tracer = trace.NoOpTracer()
-        if _is_tracecollector_available(tracecollector_endpoint):
-            self.tracer = _get_tracer(tracecollector_endpoint, service_name)
-        else:
-            logging.info("tracecollector is not available")
-            self.tracer = trace.NoOpTracer()
-
-    def get_tracer(self, req):
-        val = req.get_header(_TRACING_SAMPLE_HEADER)
-        if val is not None and val == "1":
-            return self.tracer
-
-        return self.noop_tracer
-
-
-def _get_tracer(tracecollector_endpoint: str, service_name: str) -> trace.Tracer:
-    set_global_textmap(B3MultiFormat())
-    RequestsInstrumentor().instrument()
-
-    trace.set_tracer_provider(
-        TracerProvider(
-            resource=Resource.create({SERVICE_NAME: service_name})
-        )
-    )
-
-    otlp_exporter = OTLPSpanExporter(
-        endpoint=tracecollector_endpoint,
-    )
-
-    span_processor = BatchSpanProcessor(otlp_exporter)
-
-    trace.get_tracer_provider().add_span_processor(span_processor)
-
-    return trace.get_tracer(__name__)
-
-
-def _is_tracecollector_available(tracecollectorEndpoint) -> bool:
-    try:
-        res = requests.get(tracecollectorEndpoint, timeout=2)
-        # 405 is the right status code for the GET method if jaeger service exists
-        # because the only allowed method is POST and usage of other methods are not allowe
-        # https://github.com/jaegertracing/jaeger/blob/7872d1b07439c3f2d316065b1fd53e885b26a66f/cmd/collector/app/handler/http_handler.go#L60
-        if res.status_code == 405:
-            return True
-    except:
-        pass
-
-    return False
-
-
-@contextmanager # type: ignore
-def set_req_context(req) -> Iterator[trace.Span]:
-    '''Propagates incoming span from the request to the current context
-
-    This method allows to set up a context in any thread based on the incoming request.
-    By design, span context can't be moved between threads and because we run every function
-    in the separated thread we have to propagate the context manually.
-    '''
-    span = _Span(
-        "request-span",
-        trace.get_current_span(
-            extract(req.headers)
-        ).get_span_context()
-    )
-
-    token = context_api.attach(context_api.set_value(_SPAN_KEY, span))
-    try:
-        yield span
-    finally:
-        context_api.detach(token)
diff --git a/examples/custom-serverless-runtime-image/kubeless/ce.py b/examples/custom-serverless-runtime-image/lib/ce.py
similarity index 50%
rename from examples/custom-serverless-runtime-image/kubeless/ce.py
rename to examples/custom-serverless-runtime-image/lib/ce.py
index 7ebefebd4..139344f7c 100644
--- a/examples/custom-serverless-runtime-image/kubeless/ce.py
+++ b/examples/custom-serverless-runtime-image/lib/ce.py
@@ -1,11 +1,16 @@
-import requests
 import bottle
 import io
-import os
 import json
+import logging
+import os
+
+import requests
+from cloudevents.http import from_http, CloudEvent
+from cloudevents.conversion import to_structured
 
 publisher_proxy_address = os.getenv('PUBLISHER_PROXY_ADDRESS')
 
+
 class PicklableBottleRequest(bottle.BaseRequest):
     '''Bottle request that can be pickled (serialized).
 
@@ -40,28 +45,58 @@ def __setstate__(self, env):
         setattr(self, 'environ', env)
 
 
-class Event:
-    ceHeaders = dict()
-    tracer = None
+def resolve_data_type(event_data):
+    if type(event_data) is dict:
+        return 'application/json'
+    elif type(event_data) is str:
+        return 'text/plain'
+
+
+def build_cloud_event_attributes(req, data):
+    event = from_http(req.headers, data)
+    ceHeaders = {
+        'data': event.data,
+        'ce-type': event['type'],
+        'ce-source': event['source'],
+        'ce-id': event['id'],
+        'ce-time': event['time'],
+    }
+    if event.get('eventtypeversion') is not None:
+        ceHeaders['ce-eventtypeversion'] = event.get('eventtypeversion')
+
+    if event.get('specversion') is not None:
+        ceHeaders['ce-specversion'] = event.get('specversion')
+
+    return ceHeaders
+
+
+def has_ce_headers(headers):
+    has = 'ce-type' in headers and 'ce-source' in headers
+    return has
+
+
+def is_cloud_event(req):
+    return 'application/cloudevents+json' in req.content_type.split(';') or has_ce_headers(req.headers)
+
 
+class Event:
     def __init__(self, req, tracer):
+        self.ceHeaders = dict()
+        self.tracer = tracer
+        self.req = req
         data = req.body.read()
         picklable_req = PicklableBottleRequest(data, req.environ.copy())
-        if req.get_header('content-type') == 'application/json':
-            data = req.json
-
-        self.req = req
-        self.tracer = tracer
-        self.ceHeaders = {
-            'data': data,
-            'ce-type': req.get_header('ce-type'),
-            'ce-source': req.get_header('ce-source'),
-            'ce-eventtypeversion': req.get_header('ce-eventtypeversion'),
-            'ce-specversion': req.get_header('ce-specversion'),
-            'ce-id': req.get_header('ce-id'),
-            'ce-time': req.get_header('ce-time'),
+        self.ceHeaders.update({
             'extensions': {'request': picklable_req}
-        }
+        })
+
+        if is_cloud_event(req):
+            ce_headers = build_cloud_event_attributes(req, data)
+            self.ceHeaders.update(ce_headers)
+        else:
+            if req.get_header('content-type') == 'application/json':
+                data = req.json
+            self.ceHeaders.update({'data': data})
 
     def __getitem__(self, item):
         return self.ceHeaders[item]
@@ -69,20 +104,29 @@ def __getitem__(self, item):
     def __setitem__(self, name, value):
         self.ceHeaders[name] = value
 
+    def emitCloudEvent(self, type, source, data, optionalCloudEventAttributes=None):
+        attributes = {
+            "type": type,
+            "source": source,
+        }
+        if optionalCloudEventAttributes is not None:
+            attributes.update(optionalCloudEventAttributes)
+
+        event = CloudEvent(attributes, data)
+        headers, body = to_structured(event)
+
+        requests.post(publisher_proxy_address, data=body, headers=headers)
+
     def publishCloudEvent(self, data):
+        logging.warn('"publishCloudEvent" is deprecated. Use "emitCloudEvent"')
         return requests.post(
             publisher_proxy_address,
-            data = json.dumps(data),
-            headers = {"Content-Type": "application/cloudevents+json"}
-        )
-
-    def resolveDataType(self, event_data):
-        if type(event_data) is dict:
-            return 'application/json'
-        elif type(event_data) is str:
-            return 'text/plain'
+            data=json.dumps(data, default=str),
+            headers={"Content-Type": "application/cloudevents+json"}
+        )
 
     def buildResponseCloudEvent(self, event_id, event_type, event_data):
+        logging.warn('"buildResponseCloudEvent" is deprecated. Use "emitCloudEvent"')
         return {
             'type': event_type,
             'source': self.ceHeaders['ce-source'],
@@ -90,6 +134,5 @@ def buildResponseCloudEvent(self, event_id, event_type, event_data):
             'specversion': self.ceHeaders['ce-specversion'],
             'id': event_id,
             'data': event_data,
-            'datacontenttype': self.resolveDataType(event_data)
+            'datacontenttype': resolve_data_type(event_data)
         }
-
diff --git a/examples/custom-serverless-runtime-image/lib/requirements.txt b/examples/custom-serverless-runtime-image/lib/requirements.txt
new file mode 100644
index 000000000..792d6d902
--- /dev/null
+++ b/examples/custom-serverless-runtime-image/lib/requirements.txt
@@ -0,0 +1,12 @@
+setuptools==70.0.0
+requests>=2.31.0
+bottle==0.12.25
+cheroot==10.0.1
+wsgi-request-logger==0.4.6
+prometheus_client==0.20.0
+opentelemetry-api==1.25.0
+opentelemetry-sdk==1.25.0
+opentelemetry-exporter-otlp-proto-http==1.25.0
+opentelemetry-propagator-b3==1.25.0
+opentelemetry-instrumentation-requests==0.46b0
+cloudevents
diff --git a/examples/custom-serverless-runtime-image/lib/tracing.py b/examples/custom-serverless-runtime-image/lib/tracing.py
new file mode 100644
index 000000000..0ec0e04b6
--- /dev/null
+++ b/examples/custom-serverless-runtime-image/lib/tracing.py
@@ -0,0 +1,67 @@
+from contextlib import contextmanager
+from typing import Iterator
+
+from opentelemetry import trace
+from opentelemetry.sdk.trace import TracerProvider, _Span
+from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
+from opentelemetry.propagate import extract
+from opentelemetry.sdk.resources import Resource
+
+from opentelemetry.sdk.trace.export import (
+    SimpleSpanProcessor,
+)
+from opentelemetry.sdk.trace.sampling import (
+    DEFAULT_ON,
+)
+
+from opentelemetry.trace import context_api
+from opentelemetry.trace.propagation import _SPAN_KEY
+from opentelemetry.instrumentation.requests import RequestsInstrumentor
+
+import os
+
+# Tracing propagators are configured based on OTEL_PROPAGATORS env variable set in dockerfile
+# https://opentelemetry.io/docs/instrumentation/python/manual/#using-environment-variables
+def _setup_tracer() -> trace.Tracer:
+
+    provider = TracerProvider(
+        resource=Resource.create(),
+        sampler=DEFAULT_ON,
+    )
+
+    tracecollector_endpoint = os.getenv('TRACE_COLLECTOR_ENDPOINT')
+
+    if tracecollector_endpoint:
+        span_processor = SimpleSpanProcessor(OTLPSpanExporter(endpoint=tracecollector_endpoint))
+        provider.add_span_processor(span_processor)
+
+    # Sets the global default tracer provider
+    trace.set_tracer_provider(provider)
+
+    # Auto instrument all requests via `requests` library
+    RequestsInstrumentor().instrument()
+
+    # Creates a tracer from the global tracer provider
+    return trace.get_tracer(__name__)
+
+
+@contextmanager # type: ignore
+def set_req_context(req) -> Iterator[trace.Span]:
+    '''Propagates incoming span from the request to the current context
+
+    This method allows to set up a context in any thread based on the incoming request.
+    By design, span context can't be moved between threads and because we run every function
+    in the separated thread we have to propagate the context manually.
+    '''
+    span = _Span(
+        "request-span",
+        trace.get_current_span(
+            extract(req.headers)
+        ).get_span_context()
+    )
+
+    token = context_api.attach(context_api.set_value(_SPAN_KEY, span))
+    try:
+        yield span
+    finally:
+        context_api.detach(token)
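
Note: for orientation, a handler module that this runtime can load might look like the minimal sketch below. The module name (handler.py), the function name (main), and the event type/source strings are illustrative assumptions only; the runtime resolves the real module and handler from the MOD_NAME and FUNC_HANDLER environment variables, and the Event API (ceHeaders, emitCloudEvent) is the one defined in lib/ce.py above.

    # handler.py (illustrative name) - imported via MOD_NAME, invoked via FUNC_HANDLER
    def main(event, context):
        # payload and CloudEvents attributes parsed by lib/ce.py
        data = event.ceHeaders.get('data')

        # emitCloudEvent (introduced in this change) posts a structured CloudEvent
        # to the URL configured in PUBLISHER_PROXY_ADDRESS
        event.emitCloudEvent(
            type='com.example.response.v1',    # illustrative event type
            source='custom-runtime-example',   # illustrative source
            data={'echo': data},
        )

        # the return value is put on the response queue and served by bottle
        return {'processed': True}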