Skip to content

Commit

Permalink
[Python O11y] Implement CSM observability for Python (grpc#36557)
Browse files Browse the repository at this point in the history
Implement Python CSM observability.

Design: [go/grpc-python-opentelemetry](http://goto.google.com/grpc-python-opentelemetry)
<!--

If you know who should review your pull request, please assign it to that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the appropriate
lang label.

-->

Closes grpc#36557

PiperOrigin-RevId: 639073741
  • Loading branch information
XuanWang-Amos authored and copybara-github committed May 31, 2024
1 parent 7ccb51e commit f3220d0
Show file tree
Hide file tree
Showing 45 changed files with 2,634 additions and 493 deletions.
7 changes: 5 additions & 2 deletions requirements.bazel.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ pyasn1==0.5.0
rsa==4.9
greenlet==1.1.3.post0
zope.interface==6.1
opentelemetry-sdk==1.21.0
opentelemetry-api==1.21.0
opentelemetry-sdk==1.24.0
opentelemetry-api==1.24.0
importlib-metadata==6.11.0
opentelemetry-resourcedetector-gcp==1.6.0a0
opentelemetry-exporter-prometheus==0.45b0
prometheus_client==0.20.0
Deprecated==1.2.14
opentelemetry-semantic-conventions==0.42b0
typing-extensions==4.9.0
Expand Down
12 changes: 7 additions & 5 deletions src/python/grpcio/grpc/_cython/_cygrpc/observability.pyx.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ cdef const char* CLIENT_CALL_TRACER = "client_call_tracer"
cdef const char* SERVER_CALL_TRACER_FACTORY = "server_call_tracer_factory"


def set_server_call_tracer_factory(object observability_plugin) -> None:
    """Registers the server call tracer factory created by the plugin.

    Asks the plugin for a capsule, extracts the pointer stored in it under
    the SERVER_CALL_TRACER_FACTORY name and hands that pointer to
    _register_server_call_tracer_factory.

    Args:
      observability_plugin: An object exposing
        create_server_call_tracer_factory().
    """
    capsule = observability_plugin.create_server_call_tracer_factory()
    # The lookup uses the SERVER_CALL_TRACER_FACTORY capsule name.
    capsule_ptr = cpython.PyCapsule_GetPointer(capsule, SERVER_CALL_TRACER_FACTORY)
    _register_server_call_tracer_factory(capsule_ptr)

def get_server_call_tracer_factory_address(object observability_plugin, bint xds) -> Optional[int]:
    """Returns the address of the plugin's server call tracer factory.

    Args:
      observability_plugin: An object exposing
        create_server_call_tracer_factory(xds=...).
      xds: Forwarded to the plugin as the `xds` keyword argument.

    Returns:
      The pointer stored in the capsule (looked up under the
      SERVER_CALL_TRACER_FACTORY name) converted to a Python int, or None
      when the plugin returned a falsy value instead of a capsule.
    """
    factory_capsule = observability_plugin.create_server_call_tracer_factory(xds=xds)
    # The plugin may decline to create a factory; nothing to unwrap then.
    if not factory_capsule:
        return None
    factory_ptr = cpython.PyCapsule_GetPointer(factory_capsule, SERVER_CALL_TRACER_FACTORY)
    return int(<uintptr_t>factory_ptr)

def clear_server_call_tracer_factory() -> None:
    """Passes NULL to _register_server_call_tracer_factory.

    NOTE(review): presumably this unregisters any previously registered
    factory -- confirm against _register_server_call_tracer_factory.
    """
    _register_server_call_tracer_factory(NULL)
Expand Down
70 changes: 51 additions & 19 deletions src/python/grpcio/grpc/_observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from typing import Any, Generator, Generic, List, Optional, TypeVar

from grpc._cython import cygrpc as _cygrpc
from grpc._typing import ChannelArgumentType

_LOGGER = logging.getLogger(__name__)

Expand All @@ -36,6 +37,20 @@
]


class ServerCallTracerFactory:
    """An encapsulation of a ServerCallTracerFactory address.

    Instances of this class are used as the value of the
    grpc.experimental.server_call_tracer_factory option. Converting an
    instance with int() yields the wrapped address.
    """

    def __init__(self, address: int) -> None:
        """Stores the address of the underlying factory.

        Args:
          address: Integer address of the ServerCallTracerFactory object.
        """
        self._address = address

    def __int__(self) -> int:
        """Returns the wrapped address unchanged."""
        return self._address


class ObservabilityPlugin(
Generic[ClientCallTracerCapsule, ServerCallTracerFactoryCapsule],
metaclass=abc.ABCMeta,
Expand Down Expand Up @@ -126,19 +141,23 @@ def save_trace_context(
@abc.abstractmethod
def create_server_call_tracer_factory(
    self,
    *,
    xds: bool = False,
) -> Optional[ServerCallTracerFactoryCapsule]:
    """Creates a ServerCallTracerFactoryCapsule.

    This method will be called at server initialization time to create a
    ServerCallTracerFactory, which will be registered to gRPC core.

    The ServerCallTracerFactory is an object which implements
    `grpc_core::ServerCallTracerFactory` interface and wrapped in a PyCapsule
    using `server_call_tracer_factory` as name.

    Args:
      xds: Whether the server is xds server.

    Returns:
      A PyCapsule which stores a ServerCallTracerFactory object. Or None if
      plugin decides not to create ServerCallTracerFactory.

    Raises:
      NotImplementedError: Always, in this abstract base implementation;
        concrete plugins must override this method.
    """
    raise NotImplementedError()

Expand Down Expand Up @@ -221,7 +240,7 @@ def set_plugin(observability_plugin: Optional[ObservabilityPlugin]) -> None:
Raises:
ValueError: If an ObservabilityPlugin was already registered at the
time of calling this method.
time of calling this method.
"""
global _OBSERVABILITY_PLUGIN # pylint: disable=global-statement
with _plugin_lock:
Expand All @@ -241,13 +260,9 @@ def observability_init(observability_plugin: ObservabilityPlugin) -> None:
Raises:
ValueError: If an ObservabilityPlugin was already registered at the
time of calling this method.
time of calling this method.
"""
set_plugin(observability_plugin)
try:
_cygrpc.set_server_call_tracer_factory(observability_plugin)
except Exception: # pylint:disable=broad-except
_LOGGER.exception("Failed to set server call tracer factory!")


def observability_deinit() -> None:
Expand Down Expand Up @@ -284,17 +299,34 @@ def maybe_record_rpc_latency(state: "_channel._RPCState") -> None:
Args:
state: a grpc._channel._RPCState object which contains the stats related to the
RPC.
RPC.
"""
# TODO(xuanwn): use channel args to exclude those metrics.
for exclude_prefix in _SERVICES_TO_EXCLUDE:
if exclude_prefix in state.method.encode("utf8"):
return
with get_plugin() as plugin:
if not (plugin and plugin.stats_enabled):
return
rpc_latency_s = state.rpc_end_time - state.rpc_start_time
rpc_latency_ms = rpc_latency_s * 1000
plugin.record_rpc_latency(
state.method, state.target, rpc_latency_ms, state.code
)
if plugin and plugin.stats_enabled:
rpc_latency_s = state.rpc_end_time - state.rpc_start_time
rpc_latency_ms = rpc_latency_s * 1000
plugin.record_rpc_latency(
state.method, state.target, rpc_latency_ms, state.code
)


def create_server_call_tracer_factory_option(xds: bool) -> ChannelArgumentType:
    """Builds the option that carries the plugin's server call tracer factory.

    Args:
      xds: Forwarded to _cygrpc.get_server_call_tracer_factory_address
        together with the registered plugin.

    Returns:
      A one-element tuple holding the
      ("grpc.experimental.server_call_tracer_factory", ServerCallTracerFactory)
      pair, or an empty tuple when there is no plugin with stats enabled or
      no factory address was obtained.
    """
    with get_plugin() as plugin:
        # Without a stats-enabled plugin there is nothing to attach.
        if not (plugin and plugin.stats_enabled):
            return ()
        address = _cygrpc.get_server_call_tracer_factory_address(plugin, xds)
        # A falsy address (None or 0) means no factory was created.
        if not address:
            return ()
        tracer_factory_option = (
            "grpc.experimental.server_call_tracer_factory",
            ServerCallTracerFactory(address),
        )
        return (tracer_factory_option,)
13 changes: 11 additions & 2 deletions src/python/grpcio/grpc/_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from grpc import _common # pytype: disable=pyi-error
from grpc import _compression # pytype: disable=pyi-error
from grpc import _interceptor # pytype: disable=pyi-error
from grpc import _observability # pytype: disable=pyi-error
from grpc._cython import cygrpc
from grpc._typing import ArityAgnosticMethodHandler
from grpc._typing import ChannelArgumentType
Expand Down Expand Up @@ -1403,9 +1404,17 @@ def _validate_generic_rpc_handlers(
def _augment_options(
    base_options: Sequence[ChannelArgumentType],
    compression: Optional[grpc.Compression],
    xds: bool,
) -> Sequence[ChannelArgumentType]:
    """Appends the implicit server options to the caller-supplied ones.

    Args:
      base_options: Options supplied by the caller.
      compression: Compression setting turned into an option by
        _compression.create_channel_option.
      xds: Forwarded to
        _observability.create_server_call_tracer_factory_option.

    Returns:
      A tuple of base_options followed by the compression option and the
      server call tracer factory option; the latter contributes nothing when
      _observability returns an empty tuple.
    """
    compression_option = _compression.create_channel_option(compression)
    # No early return here: the tracer factory option must also be appended.
    maybe_server_call_tracer_factory_option = (
        _observability.create_server_call_tracer_factory_option(xds)
    )
    return (
        tuple(base_options)
        + compression_option
        + maybe_server_call_tracer_factory_option
    )


class _Server(grpc.Server):
Expand All @@ -1423,7 +1432,7 @@ def __init__(
xds: bool,
):
completion_queue = cygrpc.CompletionQueue()
server = cygrpc.Server(_augment_options(options, compression), xds)
server = cygrpc.Server(_augment_options(options, compression, xds), xds)
server.register_completion_queue(completion_queue)
self._state = _ServerState(
completion_queue,
Expand Down
4 changes: 4 additions & 0 deletions src/python/grpcio_csm_observability/MANIFEST.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
graft src/python/grpcio_csm_observability/grpc_csm_observability.egg-info
graft grpc_csm_observability
include grpc_version.py
include README.rst
5 changes: 5 additions & 0 deletions src/python/grpcio_csm_observability/README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
gRPC Python CSM Observability
=============================

Package for gRPC Python CSM Observability.
TODO(xuanwn): Add more content.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright 2024 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

load("@grpc_python_dependencies//:requirements.bzl", "requirement")

package(default_visibility = ["//:__subpackages__"])

# Since packages in requirement() are non-hermetic,
# csm_observability is for internal use only.
py_library(
name = "csm_observability",
srcs = glob(["*.py"]),
imports = [
".",
"../",
],
srcs_version = "PY3ONLY",
deps = [
requirement("opentelemetry-resourcedetector-gcp"),
requirement("opentelemetry-sdk"),
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_observability/grpc_observability:pyobservability",
"@com_google_protobuf//:protobuf_python",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Copyright 2024 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from grpc_csm_observability._csm_observability_plugin import (
CsmOpenTelemetryPlugin,
)

__all__ = ("CsmOpenTelemetryPlugin",)
Loading

0 comments on commit f3220d0

Please sign in to comment.