From f3220d08d2e9791e87036a3504059c40d85b21a2 Mon Sep 17 00:00:00 2001 From: Xuan Wang Date: Fri, 31 May 2024 10:15:26 -0700 Subject: [PATCH] [Python O11y] Implement CSM observability for Python (#36557) Implement Python CSM observability. Design: [go/grpc-python-opentelemetry](http://goto.google.com/grpc-python-opentelemetry) Closes #36557 PiperOrigin-RevId: 639073741 --- requirements.bazel.txt | 7 +- .../_cython/_cygrpc/observability.pyx.pxi | 12 +- src/python/grpcio/grpc/_observability.py | 70 +- src/python/grpcio/grpc/_server.py | 13 +- .../grpcio_csm_observability/MANIFEST.in | 4 + .../grpcio_csm_observability/README.rst | 5 + .../grpc_csm_observability/BUILD.bazel | 36 + .../grpc_csm_observability/__init__.py | 18 + .../_csm_observability_plugin.py | 343 +++++++++ .../grpcio_csm_observability/grpc_version.py | 17 + src/python/grpcio_csm_observability/setup.py | 63 ++ .../grpc_observability/BUILD.bazel | 2 + .../grpc_observability/_cyobservability.pxd | 7 +- .../grpc_observability/_cyobservability.pyx | 88 ++- .../grpc_observability/_gcp_observability.py | 167 ----- .../grpc_observability/_observability.py | 23 +- .../_open_telemetry_measures.py | 3 +- .../_open_telemetry_observability.py | 239 ++++++- .../_open_telemetry_plugin.py | 114 +-- .../grpc_observability/client_call_tracer.cc | 85 ++- .../grpc_observability/client_call_tracer.h | 29 +- .../grpc_observability/constants.h | 4 + .../grpc_observability/metadata_exchange.cc | 115 +++ .../grpc_observability/metadata_exchange.h | 63 ++ .../grpc_observability/observability_util.cc | 26 +- .../grpc_observability/observability_util.h | 25 +- .../python_observability_context.h | 1 + .../grpc_observability/server_call_tracer.cc | 252 +++---- .../grpc_observability/server_call_tracer.h | 84 +++ .../make_grpcio_observability.py | 4 +- .../observability_lib_deps.py | 1 + src/python/grpcio_observability/setup.py | 5 +- .../tests/observability/BUILD.bazel | 39 ++ .../_csm_observability_plugin_test.py | 652 
++++++++++++++++++ .../_from_csm_observability_import_star.py | 25 + .../observability/_observability_api_test.py | 11 +- .../_observability_plugin_test.py | 363 ++++++++++ .../_open_telemetry_observability_test.py | 77 ++- .../tests/observability/_test_server.py | 1 + src/python/grpcio_tests/tests/tests.json | 3 + .../grpc_version.py.template | 19 + tools/distrib/install_all_python_modules.sh | 2 +- tools/distrib/pylint_code.sh | 1 + .../artifacts/build_artifact_python.sh | 8 + .../run_tests/helper_scripts/build_python.sh | 1 + 45 files changed, 2634 insertions(+), 493 deletions(-) create mode 100644 src/python/grpcio_csm_observability/MANIFEST.in create mode 100644 src/python/grpcio_csm_observability/README.rst create mode 100644 src/python/grpcio_csm_observability/grpc_csm_observability/BUILD.bazel create mode 100644 src/python/grpcio_csm_observability/grpc_csm_observability/__init__.py create mode 100644 src/python/grpcio_csm_observability/grpc_csm_observability/_csm_observability_plugin.py create mode 100644 src/python/grpcio_csm_observability/grpc_version.py create mode 100644 src/python/grpcio_csm_observability/setup.py delete mode 100644 src/python/grpcio_observability/grpc_observability/_gcp_observability.py create mode 100644 src/python/grpcio_observability/grpc_observability/metadata_exchange.cc create mode 100644 src/python/grpcio_observability/grpc_observability/metadata_exchange.h create mode 100644 src/python/grpcio_tests/tests/observability/_csm_observability_plugin_test.py create mode 100644 src/python/grpcio_tests/tests/observability/_from_csm_observability_import_star.py create mode 100644 src/python/grpcio_tests/tests/observability/_observability_plugin_test.py create mode 100644 templates/src/python/grpcio_csm_observability/grpc_version.py.template diff --git a/requirements.bazel.txt b/requirements.bazel.txt index 905c092ce4c33..c6036839b1f56 100644 --- a/requirements.bazel.txt +++ b/requirements.bazel.txt @@ -26,9 +26,12 @@ pyasn1==0.5.0 
rsa==4.9 greenlet==1.1.3.post0 zope.interface==6.1 -opentelemetry-sdk==1.21.0 -opentelemetry-api==1.21.0 +opentelemetry-sdk==1.24.0 +opentelemetry-api==1.24.0 importlib-metadata==6.11.0 +opentelemetry-resourcedetector-gcp==1.6.0a0 +opentelemetry-exporter-prometheus==0.45b0 +prometheus_client==0.20.0 Deprecated==1.2.14 opentelemetry-semantic-conventions==0.42b0 typing-extensions==4.9.0 diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/observability.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/observability.pyx.pxi index aa7dce5e8ac5c..a29ccdd9376f7 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/observability.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/observability.pyx.pxi @@ -23,11 +23,13 @@ cdef const char* CLIENT_CALL_TRACER = "client_call_tracer" cdef const char* SERVER_CALL_TRACER_FACTORY = "server_call_tracer_factory" -def set_server_call_tracer_factory(object observability_plugin) -> None: - capsule = observability_plugin.create_server_call_tracer_factory() - capsule_ptr = cpython.PyCapsule_GetPointer(capsule, SERVER_CALL_TRACER_FACTORY) - _register_server_call_tracer_factory(capsule_ptr) - +def get_server_call_tracer_factory_address(object observability_plugin, bint xds) -> Optional[int]: + capsule = observability_plugin.create_server_call_tracer_factory(xds=xds) + if capsule: + capsule_ptr = cpython.PyCapsule_GetPointer(capsule, SERVER_CALL_TRACER_FACTORY) + return int(capsule_ptr) + else: + return None def clear_server_call_tracer_factory() -> None: _register_server_call_tracer_factory(NULL) diff --git a/src/python/grpcio/grpc/_observability.py b/src/python/grpcio/grpc/_observability.py index e18dbc1dd0fa6..3caf6b5265c5e 100644 --- a/src/python/grpcio/grpc/_observability.py +++ b/src/python/grpcio/grpc/_observability.py @@ -21,6 +21,7 @@ from typing import Any, Generator, Generic, List, Optional, TypeVar from grpc._cython import cygrpc as _cygrpc +from grpc._typing import ChannelArgumentType _LOGGER = logging.getLogger(__name__) @@ 
-36,6 +37,20 @@ ] +class ServerCallTracerFactory: + """An encapsulation of a ServerCallTracerFactory. + + Instances of this class can be passed to a Channel as values for the + grpc.experimental.server_call_tracer_factory option + """ + + def __init__(self, address): + self._address = address + + def __int__(self): + return self._address + + class ObservabilityPlugin( Generic[ClientCallTracerCapsule, ServerCallTracerFactoryCapsule], metaclass=abc.ABCMeta, @@ -126,19 +141,23 @@ def save_trace_context( @abc.abstractmethod def create_server_call_tracer_factory( self, - ) -> ServerCallTracerFactoryCapsule: + *, + xds: bool = False, + ) -> Optional[ServerCallTracerFactoryCapsule]: """Creates a ServerCallTracerFactoryCapsule. - After register the plugin, if tracing or stats is enabled, this method - will be called by calling observability_init, the ServerCallTracerFactory - created by this method will be registered to gRPC core. + This method will be called at server initialization time to create a + ServerCallTracerFactory, which will be registered to gRPC core. The ServerCallTracerFactory is an object which implements `grpc_core::ServerCallTracerFactory` interface and wrapped in a PyCapsule using `server_call_tracer_factory` as name. + Args: + xds: Whether the server is xds server. Returns: - A PyCapsule which stores a ServerCallTracerFactory object. + A PyCapsule which stores a ServerCallTracerFactory object. Or None if + plugin decides not to create ServerCallTracerFactory. """ raise NotImplementedError() @@ -221,7 +240,7 @@ def set_plugin(observability_plugin: Optional[ObservabilityPlugin]) -> None: Raises: ValueError: If an ObservabilityPlugin was already registered at the - time of calling this method. + time of calling this method. 
""" global _OBSERVABILITY_PLUGIN # pylint: disable=global-statement with _plugin_lock: @@ -241,13 +260,9 @@ def observability_init(observability_plugin: ObservabilityPlugin) -> None: Raises: ValueError: If an ObservabilityPlugin was already registered at the - time of calling this method. + time of calling this method. """ set_plugin(observability_plugin) - try: - _cygrpc.set_server_call_tracer_factory(observability_plugin) - except Exception: # pylint:disable=broad-except - _LOGGER.exception("Failed to set server call tracer factory!") def observability_deinit() -> None: @@ -284,17 +299,34 @@ def maybe_record_rpc_latency(state: "_channel._RPCState") -> None: Args: state: a grpc._channel._RPCState object which contains the stats related to the - RPC. + RPC. """ # TODO(xuanwn): use channel args to exclude those metrics. for exclude_prefix in _SERVICES_TO_EXCLUDE: if exclude_prefix in state.method.encode("utf8"): return with get_plugin() as plugin: - if not (plugin and plugin.stats_enabled): - return - rpc_latency_s = state.rpc_end_time - state.rpc_start_time - rpc_latency_ms = rpc_latency_s * 1000 - plugin.record_rpc_latency( - state.method, state.target, rpc_latency_ms, state.code - ) + if plugin and plugin.stats_enabled: + rpc_latency_s = state.rpc_end_time - state.rpc_start_time + rpc_latency_ms = rpc_latency_s * 1000 + plugin.record_rpc_latency( + state.method, state.target, rpc_latency_ms, state.code + ) + + +def create_server_call_tracer_factory_option(xds: bool) -> ChannelArgumentType: + with get_plugin() as plugin: + if plugin and plugin.stats_enabled: + server_call_tracer_factory_address = ( + _cygrpc.get_server_call_tracer_factory_address(plugin, xds) + ) + if server_call_tracer_factory_address: + return ( + ( + "grpc.experimental.server_call_tracer_factory", + ServerCallTracerFactory( + server_call_tracer_factory_address + ), + ), + ) + return () diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index 
ade1371809117..c8af57c0806fe 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -43,6 +43,7 @@ from grpc import _common # pytype: disable=pyi-error from grpc import _compression # pytype: disable=pyi-error from grpc import _interceptor # pytype: disable=pyi-error +from grpc import _observability # pytype: disable=pyi-error from grpc._cython import cygrpc from grpc._typing import ArityAgnosticMethodHandler from grpc._typing import ChannelArgumentType @@ -1403,9 +1404,17 @@ def _validate_generic_rpc_handlers( def _augment_options( base_options: Sequence[ChannelArgumentType], compression: Optional[grpc.Compression], + xds: bool, ) -> Sequence[ChannelArgumentType]: compression_option = _compression.create_channel_option(compression) - return tuple(base_options) + compression_option + maybe_server_call_tracer_factory_option = ( + _observability.create_server_call_tracer_factory_option(xds) + ) + return ( + tuple(base_options) + + compression_option + + maybe_server_call_tracer_factory_option + ) class _Server(grpc.Server): @@ -1423,7 +1432,7 @@ def __init__( xds: bool, ): completion_queue = cygrpc.CompletionQueue() - server = cygrpc.Server(_augment_options(options, compression), xds) + server = cygrpc.Server(_augment_options(options, compression, xds), xds) server.register_completion_queue(completion_queue) self._state = _ServerState( completion_queue, diff --git a/src/python/grpcio_csm_observability/MANIFEST.in b/src/python/grpcio_csm_observability/MANIFEST.in new file mode 100644 index 0000000000000..754eb1d7e130f --- /dev/null +++ b/src/python/grpcio_csm_observability/MANIFEST.in @@ -0,0 +1,4 @@ +graft src/python/grpcio_csm_observability/grpc_csm_observability.egg-info +graft grpc_csm_observability +include grpc_version.py +include README.rst diff --git a/src/python/grpcio_csm_observability/README.rst b/src/python/grpcio_csm_observability/README.rst new file mode 100644 index 0000000000000..8f2dc6ccd82c1 --- /dev/null +++ 
b/src/python/grpcio_csm_observability/README.rst @@ -0,0 +1,5 @@ +gRPC Python CSM Observability +============================= + +Package for gRPC Python CSM Observability. +TODO(xuanwn): Add more content. diff --git a/src/python/grpcio_csm_observability/grpc_csm_observability/BUILD.bazel b/src/python/grpcio_csm_observability/grpc_csm_observability/BUILD.bazel new file mode 100644 index 0000000000000..8d65857630fc6 --- /dev/null +++ b/src/python/grpcio_csm_observability/grpc_csm_observability/BUILD.bazel @@ -0,0 +1,36 @@ +# Copyright 2024 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +load("@grpc_python_dependencies//:requirements.bzl", "requirement") + +package(default_visibility = ["//:__subpackages__"]) + +# Since packages in requirement() are non-hermetic, +# csm_observability is for internal use only. 
+py_library( + name = "csm_observability", + srcs = glob(["*.py"]), + imports = [ + ".", + "../", + ], + srcs_version = "PY3ONLY", + deps = [ + requirement("opentelemetry-resourcedetector-gcp"), + requirement("opentelemetry-sdk"), + "//src/python/grpcio/grpc:grpcio", + "//src/python/grpcio_observability/grpc_observability:pyobservability", + "@com_google_protobuf//:protobuf_python", + ], +) diff --git a/src/python/grpcio_csm_observability/grpc_csm_observability/__init__.py b/src/python/grpcio_csm_observability/grpc_csm_observability/__init__.py new file mode 100644 index 0000000000000..bfed07d386942 --- /dev/null +++ b/src/python/grpcio_csm_observability/grpc_csm_observability/__init__.py @@ -0,0 +1,18 @@ +# Copyright 2024 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from grpc_csm_observability._csm_observability_plugin import ( + CsmOpenTelemetryPlugin, +) + +__all__ = ("CsmOpenTelemetryPlugin",) diff --git a/src/python/grpcio_csm_observability/grpc_csm_observability/_csm_observability_plugin.py b/src/python/grpcio_csm_observability/grpc_csm_observability/_csm_observability_plugin.py new file mode 100644 index 0000000000000..49828565c628f --- /dev/null +++ b/src/python/grpcio_csm_observability/grpc_csm_observability/_csm_observability_plugin.py @@ -0,0 +1,343 @@ +# Copyright 2024 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import os +import re +from typing import AnyStr, Callable, Dict, Iterable, List, Optional, Union + +from google.protobuf import struct_pb2 +from grpc_observability._observability import OptionalLabelType +from grpc_observability._open_telemetry_plugin import OpenTelemetryLabelInjector +from grpc_observability._open_telemetry_plugin import OpenTelemetryPlugin +from grpc_observability._open_telemetry_plugin import OpenTelemetryPluginOption + +# pytype: disable=pyi-error +from opentelemetry.metrics import MeterProvider +from opentelemetry.resourcedetector.gcp_resource_detector import ( + GoogleCloudResourceDetector, +) +from opentelemetry.sdk.resources import Resource +from opentelemetry.semconv.resource import ResourceAttributes + +TRAFFIC_DIRECTOR_AUTHORITY = "traffic-director-global.xds.googleapis.com" +UNKNOWN_VALUE = "unknown" +TYPE_GCE = "gcp_compute_engine" +TYPE_GKE = "gcp_kubernetes_engine" +MESH_ID_PREFIX = "mesh:" + +METADATA_EXCHANGE_KEY_FIXED_MAP = { + "type": "csm.remote_workload_type", + "canonical_service": "csm.remote_workload_canonical_service", +} + +METADATA_EXCHANGE_KEY_GKE_MAP = { + "workload_name": "csm.remote_workload_name", + "namespace_name": "csm.remote_workload_namespace_name", + "cluster_name": "csm.remote_workload_cluster_name", + "location": "csm.remote_workload_location", + "project_id": "csm.remote_workload_project_id", +} + +METADATA_EXCHANGE_KEY_GCE_MAP = { + "workload_name": "csm.remote_workload_name", + "location": "csm.remote_workload_location", + "project_id": "csm.remote_workload_project_id", +} + + 
+class CSMOpenTelemetryLabelInjector(OpenTelemetryLabelInjector): + """ + An implementation of OpenTelemetryLabelInjector for CSM. + + This injector will fetch labels from GCP resource detector and + environment, it's also responsible for serialize and deserialize + metadata exchange labels. + """ + + _exchange_labels: Dict[str, AnyStr] + _additional_exchange_labels: Dict[str, str] + + def __init__(self): + fields = {} + self._exchange_labels = {} + self._additional_exchange_labels = {} + + # Labels from environment + canonical_service_value = os.getenv( + "CSM_CANONICAL_SERVICE_NAME", UNKNOWN_VALUE + ) + workload_name_value = os.getenv("CSM_WORKLOAD_NAME", UNKNOWN_VALUE) + + gcp_resource = GoogleCloudResourceDetector().detect() + resource_type_value = get_resource_type(gcp_resource) + namespace_value = get_str_value_from_resource( + ResourceAttributes.K8S_NAMESPACE_NAME, gcp_resource + ) + cluster_name_value = get_str_value_from_resource( + ResourceAttributes.K8S_CLUSTER_NAME, gcp_resource + ) + # ResourceAttributes.CLOUD_AVAILABILITY_ZONE are called + # "zones" on Google Cloud. 
+ location_value = get_str_value_from_resource("cloud.zone", gcp_resource) + if UNKNOWN_VALUE == location_value: + location_value = get_str_value_from_resource( + ResourceAttributes.CLOUD_REGION, gcp_resource + ) + project_id_value = get_str_value_from_resource( + ResourceAttributes.CLOUD_ACCOUNT_ID, gcp_resource + ) + + fields["type"] = struct_pb2.Value(string_value=resource_type_value) + fields["canonical_service"] = struct_pb2.Value( + string_value=canonical_service_value + ) + if resource_type_value == TYPE_GKE: + fields["workload_name"] = struct_pb2.Value( + string_value=workload_name_value + ) + fields["namespace_name"] = struct_pb2.Value( + string_value=namespace_value + ) + fields["cluster_name"] = struct_pb2.Value( + string_value=cluster_name_value + ) + fields["location"] = struct_pb2.Value(string_value=location_value) + fields["project_id"] = struct_pb2.Value( + string_value=project_id_value + ) + elif resource_type_value == TYPE_GCE: + fields["workload_name"] = struct_pb2.Value( + string_value=workload_name_value + ) + fields["location"] = struct_pb2.Value(string_value=location_value) + fields["project_id"] = struct_pb2.Value( + string_value=project_id_value + ) + + serialized_struct = struct_pb2.Struct(fields=fields) + serialized_str = serialized_struct.SerializeToString() + + self._exchange_labels = {"XEnvoyPeerMetadata": serialized_str} + self._additional_exchange_labels[ + "csm.workload_canonical_service" + ] = canonical_service_value + self._additional_exchange_labels["csm.mesh_id"] = get_mesh_id() + + def get_labels_for_exchange(self) -> Dict[str, AnyStr]: + return self._exchange_labels + + def get_additional_labels( + self, include_exchange_labels: bool + ) -> Dict[str, str]: + if include_exchange_labels: + return self._additional_exchange_labels + else: + return {} + + @staticmethod + def deserialize_labels(labels: Dict[str, AnyStr]) -> Dict[str, AnyStr]: + deserialized_labels = {} + for key, value in labels.items(): + if "XEnvoyPeerMetadata" == 
key: + pb_struct = struct_pb2.Struct() + pb_struct.ParseFromString(value) + + remote_type = get_value_from_struct("type", pb_struct) + + for ( + local_key, + remote_key, + ) in METADATA_EXCHANGE_KEY_FIXED_MAP.items(): + deserialized_labels[remote_key] = get_value_from_struct( + local_key, pb_struct + ) + if remote_type == TYPE_GKE: + for ( + local_key, + remote_key, + ) in METADATA_EXCHANGE_KEY_GKE_MAP.items(): + deserialized_labels[remote_key] = get_value_from_struct( + local_key, pb_struct + ) + elif remote_type == TYPE_GCE: + for ( + local_key, + remote_key, + ) in METADATA_EXCHANGE_KEY_GCE_MAP.items(): + deserialized_labels[remote_key] = get_value_from_struct( + local_key, pb_struct + ) + # If CSM label injector is enabled on server side but client didn't send + # XEnvoyPeerMetadata, we'll record remote label as unknown. + else: + for _, remote_key in METADATA_EXCHANGE_KEY_FIXED_MAP.items(): + deserialized_labels[remote_key] = UNKNOWN_VALUE + deserialized_labels[key] = value + + return deserialized_labels + + +class CsmOpenTelemetryPluginOption(OpenTelemetryPluginOption): + """ + An implementation of OpenTelemetryPlugin for CSM. + """ + + _label_injector: CSMOpenTelemetryLabelInjector + + def __init__(self): + self._label_injector = CSMOpenTelemetryLabelInjector() + + @staticmethod + def is_active_on_client_channel(target: str) -> bool: + """Determines whether this plugin option is active on a channel based on target. + + Args: + target: Required. The target for the RPC. + + Returns: + True if this this plugin option is active on the channel, false otherwise. 
+ """ + # CSM channels should have an "xds" scheme + if not target.startswith("xds:"): + return False + # If scheme is correct, the authority should be TD if exist + authority_pattern = r"^xds:\/\/([^/]+)" + match = re.search(authority_pattern, target) + if match: + return TRAFFIC_DIRECTOR_AUTHORITY in match.group(1) + else: + # Return True if the authority doesn't exist + return True + + @staticmethod + def is_active_on_server( + xds: bool, # pylint: disable=unused-argument + ) -> bool: + """Determines whether this plugin option is active on a given server. + + Since servers don't need to be xds enabled to work as part of a service + mesh, we're returning True and enable this PluginOption for all servers. + + Note: This always returns true because server can be part of the mesh even + if it's not xds-enabled. And we want CSM labels for those servers too. + + Args: + xds: Required. if this server is build for xds. + + Returns: + True if this this plugin option is active on the server, false otherwise. + """ + return True + + def get_label_injector(self) -> OpenTelemetryLabelInjector: + return self._label_injector + + +# pylint: disable=no-self-use +class CsmOpenTelemetryPlugin(OpenTelemetryPlugin): + """Describes a Plugin for CSM OpenTelemetry observability. + + This is class is part of an EXPERIMENTAL API. 
+ """ + + plugin_options: Iterable[OpenTelemetryPluginOption] + meter_provider: Optional[MeterProvider] + generic_method_attribute_filter: Callable[[str], bool] + + def __init__( + self, + *, + plugin_options: Iterable[OpenTelemetryPluginOption] = [], + meter_provider: Optional[MeterProvider] = None, + generic_method_attribute_filter: Optional[Callable[[str], bool]] = None, + ): + new_options = list(plugin_options) + [CsmOpenTelemetryPluginOption()] + super().__init__( + plugin_options=new_options, + meter_provider=meter_provider, + generic_method_attribute_filter=generic_method_attribute_filter, + ) + + def _get_enabled_optional_labels(self) -> List[OptionalLabelType]: + return [OptionalLabelType.XDS_SERVICE_LABELS] + + +def get_value_from_struct(key: str, struct: struct_pb2.Struct) -> str: + value = struct.fields.get(key) + if not value: + return UNKNOWN_VALUE + return value.string_value + + +def get_str_value_from_resource( + attribute: Union[ResourceAttributes, str], resource: Resource +) -> str: + value = resource.attributes.get(attribute, UNKNOWN_VALUE) + return str(value) + + +# pylint: disable=line-too-long +def get_resource_type(gcp_resource: Resource) -> str: + # Convert resource type from GoogleCloudResourceDetector to the value we used for + # metadata exchange. + # Reference: https://github.com/GoogleCloudPlatform/opentelemetry-operations-python/blob/cc61f23a5ff2f16f4aa2c38d07e55153828849cc/opentelemetry-resourcedetector-gcp/src/opentelemetry/resourcedetector/gcp_resource_detector/__init__.py#L96 + gcp_resource_type = get_str_value_from_resource( + "gcp.resource_type", gcp_resource + ) + if gcp_resource_type == "gke_container": + return TYPE_GKE + elif gcp_resource_type == "gce_instance": + return TYPE_GCE + else: + return gcp_resource_type + + +# Returns the mesh ID by reading and parsing the bootstrap file. Returns "unknown" +# if for some reason, mesh ID could not be figured out. 
+def get_mesh_id() -> str: + config_contents = get_bootstrap_config_contents() + + try: + config_json = json.loads(config_contents) + # The expected format of the Node ID is - + # projects/[GCP Project number]/networks/mesh:[Mesh ID]/nodes/[UUID] + node_id_parts = config_json.get("node", {}).get("id", "").split("/") + if len(node_id_parts) == 6 and node_id_parts[3].startswith( + MESH_ID_PREFIX + ): + return node_id_parts[3][len(MESH_ID_PREFIX) :] + except json.decoder.JSONDecodeError: + return UNKNOWN_VALUE + + return UNKNOWN_VALUE + + +def get_bootstrap_config_contents() -> str: + """Get the contents of the bootstrap config from environment variable or file. + + Returns: + The content from environment variable. Or empty str if no config was found. + """ + contents_str = "" + for source in ("GRPC_XDS_BOOTSTRAP", "GRPC_XDS_BOOTSTRAP_CONFIG"): + config = os.getenv(source) + if config: + if os.path.isfile(config): # Prioritize file over raw config + with open(config, "r") as f: + contents_str = f.read() + else: + contents_str = config + + return contents_str diff --git a/src/python/grpcio_csm_observability/grpc_version.py b/src/python/grpcio_csm_observability/grpc_version.py new file mode 100644 index 0000000000000..930d11a6dcb75 --- /dev/null +++ b/src/python/grpcio_csm_observability/grpc_version.py @@ -0,0 +1,17 @@ +# Copyright 2024 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_csm_observability/grpc_version.py.template`!!! + +VERSION = '1.65.0.dev0' diff --git a/src/python/grpcio_csm_observability/setup.py b/src/python/grpcio_csm_observability/setup.py new file mode 100644 index 0000000000000..582c6d9906c68 --- /dev/null +++ b/src/python/grpcio_csm_observability/setup.py @@ -0,0 +1,63 @@ +# Copyright 2024 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import setuptools + +_PACKAGE_PATH = os.path.realpath(os.path.dirname(__file__)) +_README_PATH = os.path.join(_PACKAGE_PATH, "README.rst") + +# Ensure we're in the proper directory whether or not we're being used by pip. 
+os.chdir(os.path.dirname(os.path.abspath(__file__))) + +import grpc_version + +CLASSIFIERS = [ + "Development Status :: 4 - Beta", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "License :: OSI Approved :: Apache Software License", +] + +PACKAGE_DIRECTORIES = { + "": ".", +} + +INSTALL_REQUIRES = ( + "opentelemetry-sdk>=1.24.0", + "opentelemetry-resourcedetector-gcp>=1.6.0a0", + "grpcio=={version}".format(version=grpc_version.VERSION), + "protobuf>=5.26.1,<6.0dev", +) + +setuptools.setup( + name="grpcio-csm-observability", + version=grpc_version.VERSION, + description="gRPC Python CSM observability package", + long_description=open(_README_PATH, "r").read(), + author="The gRPC Authors", + author_email="grpc-io@googlegroups.com", + url="https://grpc.io", + project_urls={ + "Source Code": "https://github.com/grpc/grpc/tree/master/src/python/grpcio_csm_observability", + "Bug Tracker": "https://github.com/grpc/grpc/issues", + }, + license="Apache License 2.0", + classifiers=CLASSIFIERS, + package_dir=PACKAGE_DIRECTORIES, + packages=setuptools.find_packages("."), + python_requires=">=3.8", + install_requires=INSTALL_REQUIRES, +) diff --git a/src/python/grpcio_observability/grpc_observability/BUILD.bazel b/src/python/grpcio_observability/grpc_observability/BUILD.bazel index 3447bdaf1f23a..f6144418273e2 100644 --- a/src/python/grpcio_observability/grpc_observability/BUILD.bazel +++ b/src/python/grpcio_observability/grpc_observability/BUILD.bazel @@ -21,6 +21,7 @@ cc_library( name = "observability", srcs = [ "client_call_tracer.cc", + "metadata_exchange.cc", "observability_util.cc", "python_observability_context.cc", "rpc_encoding.cc", @@ -30,6 +31,7 @@ cc_library( hdrs = [ "client_call_tracer.h", "constants.h", + "metadata_exchange.h", "observability_util.h", "python_observability_context.h", "rpc_encoding.h", diff --git a/src/python/grpcio_observability/grpc_observability/_cyobservability.pxd 
b/src/python/grpcio_observability/grpc_observability/_cyobservability.pxd index 240bf66e03ad5..124c08b48bbaa 100644 --- a/src/python/grpcio_observability/grpc_observability/_cyobservability.pxd +++ b/src/python/grpcio_observability/grpc_observability/_cyobservability.pxd @@ -59,6 +59,7 @@ cdef extern from "python_observability_context.h" namespace "grpc_observability" MeasurementType type MeasurementValue value bint registered_method + bint include_exchange_labels ctypedef struct SpanCensusData: string name @@ -80,8 +81,11 @@ cdef extern from "observability_util.h" namespace "grpc_observability": const char* target, const char* trace_id, const char* parent_span_id, + const char* identifier, + const vector[Label] exchange_labels, + bint add_csm_optional_labels, bint registered_method) except + - cdef void* CreateServerCallTracerFactory() except + + cdef void* CreateServerCallTracerFactory(const vector[Label] exchange_labels, const char* identifier) except + cdef queue[NativeCensusData]* g_census_data_buffer cdef void AwaitNextBatchLocked(unique_lock[mutex]&, int) nogil cdef bint PythonCensusStatsEnabled() nogil @@ -91,6 +95,7 @@ cdef extern from "observability_util.h" namespace "grpc_observability": cppclass NativeCensusData "::grpc_observability::CensusData": DataType type + string identifier Measurement measurement_data SpanCensusData span_data vector[Label] labels diff --git a/src/python/grpcio_observability/grpc_observability/_cyobservability.pyx b/src/python/grpcio_observability/grpc_observability/_cyobservability.pyx index 143d24731a7af..91d00fe631dc9 100644 --- a/src/python/grpcio_observability/grpc_observability/_cyobservability.pyx +++ b/src/python/grpcio_observability/grpc_observability/_cyobservability.pyx @@ -20,7 +20,7 @@ import functools import logging import os from threading import Thread -from typing import List, Mapping, Tuple, Union +from typing import AnyStr, Dict, List, Mapping, Tuple, Union from grpc_observability import _observability @@ -33,6 
+33,8 @@ cdef const char* SERVER_CALL_TRACER_FACTORY = "server_call_tracer_factory" cdef bint GLOBAL_SHUTDOWN_EXPORT_THREAD = False cdef object GLOBAL_EXPORT_THREAD +PLUGIN_IDENTIFIER_SEP = "," + _LOGGER = logging.getLogger(__name__) @@ -114,10 +116,9 @@ def activate_config(object py_config) -> None: def activate_stats() -> None: EnablePythonCensusStats(True); - -def create_client_call_tracer(bytes method_name, bytes target, - bytes trace_id, bint registered_method, - bytes parent_span_id=b'') -> cpython.PyObject: +def create_client_call_tracer(bytes method_name, bytes target, bytes trace_id, str identifier, + dict exchange_labels, object enabled_optional_labels, + bint registered_method, bytes parent_span_id=b'') -> cpython.PyObject: """Create a ClientCallTracer and save to PyCapsule. Returns: A grpc_observability._observability.ClientCallTracerCapsule object. @@ -126,18 +127,30 @@ def create_client_call_tracer(bytes method_name, bytes target, cdef char* c_target = cpython.PyBytes_AsString(target) cdef char* c_trace_id = cpython.PyBytes_AsString(trace_id) cdef char* c_parent_span_id = cpython.PyBytes_AsString(parent_span_id) - - cdef void* call_tracer = CreateClientCallTracer(c_method, c_target, c_trace_id, c_parent_span_id, registered_method) + identifier_bytes = _encode(identifier) + cdef char* c_identifier = cpython.PyBytes_AsString(identifier_bytes) + cdef vector[Label] c_labels = _labels_to_c_labels(exchange_labels) + cdef bint add_csm_optional_labels = False + + for label_type in enabled_optional_labels: + if label_type == _observability.OptionalLabelType.XDS_SERVICE_LABELS: + add_csm_optional_labels = True + + cdef void* call_tracer = CreateClientCallTracer(c_method, c_target, c_trace_id, c_parent_span_id, + c_identifier, c_labels, add_csm_optional_labels, + registered_method) capsule = cpython.PyCapsule_New(call_tracer, CLIENT_CALL_TRACER, NULL) return capsule -def create_server_call_tracer_factory_capsule() -> cpython.PyObject: +def 
create_server_call_tracer_factory_capsule(dict exchange_labels, str identifier) -> cpython.PyObject: """Create a ServerCallTracerFactory and save to PyCapsule. Returns: A grpc_observability._observability.ServerCallTracerFactoryCapsule object. """ - cdef void* call_tracer_factory = CreateServerCallTracerFactory() + cdef vector[Label] c_labels = _labels_to_c_labels(exchange_labels) + cdef char* c_identifier = cpython.PyBytes_AsString(_encode(identifier)) + cdef void* call_tracer_factory = CreateServerCallTracerFactory(c_labels, c_identifier) capsule = cpython.PyCapsule_New(call_tracer_factory, SERVER_CALL_TRACER_FACTORY, NULL) return capsule @@ -151,13 +164,25 @@ def delete_client_call_tracer(object client_call_tracer) -> None: del call_tracer_ptr -def _c_label_to_labels(vector[Label] c_labels) -> Mapping[str, str]: +def _c_label_to_labels(vector[Label] c_labels) -> Dict[str, AnyStr]: py_labels = {} for label in c_labels: - py_labels[_decode(label.key)] = _decode(label.value) + py_labels[_decode(label.key)] = label.value return py_labels +def _labels_to_c_labels(dict py_labels) -> vector[Label]: + cdef vector[Label] c_labels + cdef Label label + + for key, value in py_labels.items(): + label.key = _encode(key) + label.value = _encode(value) + c_labels.push_back(label) + + return c_labels + + def _c_measurement_to_measurement(object measurement ) -> Mapping[str, Union[enum, Mapping[str, Union[float, int], bool]]]: """Convert Cython Measurement to Python measurement. 
@@ -171,6 +196,7 @@ def _c_measurement_to_measurement(object measurement name -> cMetricsName type -> MeasurementType registered_method -> bool + include_exchange_labels -> bool value -> {value_double: float | value_int: int} """ measurement: Measurement @@ -179,6 +205,7 @@ def _c_measurement_to_measurement(object measurement py_measurement['name'] = measurement['name'] py_measurement['type'] = measurement['type'] py_measurement['registered_method'] = measurement['registered_method'] + py_measurement['include_exchange_labels'] = measurement['include_exchange_labels'] if measurement['type'] == kMeasurementDouble: py_measurement['value'] = {'value_double': measurement['value']['value_double']} else: @@ -208,7 +235,7 @@ def _cy_metric_name_to_py_metric_name(cMetricsName metric_name) -> MetricsName: raise ValueError('Invalid metric name %s' % metric_name) -def _get_stats_data(object measurement, object labels) -> _observability.StatsData: +def _get_stats_data(object measurement, object labels, object identifier) -> _observability.StatsData: """Convert a Python measurement to StatsData. Args: @@ -216,23 +243,31 @@ def _get_stats_data(object measurement, object labels) -> _observability.StatsDa with keys and values as following: name -> cMetricsName type -> MeasurementType + registered_method -> bool + include_exchange_labels -> bool value -> {value_double: float | value_int: int} - labels: Labels assciociated with stats data with type of dict[str, str]. + labels: Labels assciociated with stats data with type of Mapping[str, AnyStr]. + identifier: Specifies the plugins associated with this stats data. 
""" measurement: Measurement - labels: Mapping[str, str] + labels: Mapping[str, AnyStr] metric_name = _cy_metric_name_to_py_metric_name(measurement['name']) + identifiers = set(identifier.split(PLUGIN_IDENTIFIER_SEP)) if measurement['type'] == kMeasurementDouble: py_stat = _observability.StatsData(name=metric_name, measure_double=True, value_float=measurement['value']['value_double'], + labels=labels, + identifiers=identifiers, registered_method=measurement['registered_method'], - labels=labels) + include_exchange_labels=measurement['include_exchange_labels'],) else: py_stat = _observability.StatsData(name=metric_name, measure_double=False, value_int=measurement['value']['value_int'], + labels=labels, + identifiers=identifiers, registered_method=measurement['registered_method'], - labels=labels) + include_exchange_labels=measurement['include_exchange_labels'],) return py_stat @@ -253,8 +288,8 @@ def _get_tracing_data(SpanCensusData span_data, vector[Label] span_labels, span_annotations = py_span_annotations) -def _record_rpc_latency(object exporter, str method, str target, - float rpc_latency, str status_code, bint registered_method) -> None: +def _record_rpc_latency(object exporter, str method, str target, float rpc_latency, + str status_code, str identifier, bint registered_method) -> None: exporter: _observability.Exporter measurement = {} @@ -262,12 +297,13 @@ def _record_rpc_latency(object exporter, str method, str target, measurement['type'] = kMeasurementDouble measurement['value'] = {'value_double': rpc_latency} measurement['registered_method'] = registered_method + measurement['include_exchange_labels'] = False labels = {} labels[_decode(kClientMethod)] = method.strip("/") labels[_decode(kClientTarget)] = target labels[_decode(kClientStatus)] = status_code - metric = _get_stats_data(measurement, labels) + metric = _get_stats_data(measurement, labels, identifier) exporter.export_stats_data([metric]) @@ -313,8 +349,9 @@ cdef void _flush_census_data(object 
exporter): c_census_data = g_census_data_buffer.front() if c_census_data.type == kMetricData: py_labels = _c_label_to_labels(c_census_data.labels) + py_identifier = _decode(c_census_data.identifier) py_measurement = _c_measurement_to_measurement(c_census_data.measurement_data) - py_metric = _get_stats_data(py_measurement, py_labels) + py_metric = _get_stats_data(py_measurement, py_labels, py_identifier) py_metrics_batch.append(py_metric) else: py_span = _get_tracing_data(c_census_data.span_data, c_census_data.span_data.span_labels, @@ -344,3 +381,14 @@ cdef str _decode(bytes bytestring): except UnicodeDecodeError: _LOGGER.exception('Invalid encoding on %s', bytestring) return bytestring.decode('latin1') + + +cdef bytes _encode(object string_or_none): + if string_or_none is None: + return b'' + elif isinstance(string_or_none, (bytes,)): + return string_or_none + elif isinstance(string_or_none, (unicode,)): + return string_or_none.encode('utf8') + else: + raise TypeError('Expected str, not {}'.format(type(string_or_none))) diff --git a/src/python/grpcio_observability/grpc_observability/_gcp_observability.py b/src/python/grpcio_observability/grpc_observability/_gcp_observability.py deleted file mode 100644 index 5e4b6610e2112..0000000000000 --- a/src/python/grpcio_observability/grpc_observability/_gcp_observability.py +++ /dev/null @@ -1,167 +0,0 @@ -# Copyright 2023 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-from __future__ import annotations - -import logging -import time -from typing import Any, Set - -import grpc - -# pytype: disable=pyi-error -from grpc_observability import _cyobservability -from grpc_observability import _observability_config - -_LOGGER = logging.getLogger(__name__) - -ClientCallTracerCapsule = Any # it appears only once in the function signature -ServerCallTracerFactoryCapsule = ( - Any # it appears only once in the function signature -) -grpc_observability = Any # grpc_observability.py imports this module. - -GRPC_STATUS_CODE_TO_STRING = { - grpc.StatusCode.OK: "OK", - grpc.StatusCode.CANCELLED: "CANCELLED", - grpc.StatusCode.UNKNOWN: "UNKNOWN", - grpc.StatusCode.INVALID_ARGUMENT: "INVALID_ARGUMENT", - grpc.StatusCode.DEADLINE_EXCEEDED: "DEADLINE_EXCEEDED", - grpc.StatusCode.NOT_FOUND: "NOT_FOUND", - grpc.StatusCode.ALREADY_EXISTS: "ALREADY_EXISTS", - grpc.StatusCode.PERMISSION_DENIED: "PERMISSION_DENIED", - grpc.StatusCode.UNAUTHENTICATED: "UNAUTHENTICATED", - grpc.StatusCode.RESOURCE_EXHAUSTED: "RESOURCE_EXHAUSTED", - grpc.StatusCode.FAILED_PRECONDITION: "FAILED_PRECONDITION", - grpc.StatusCode.ABORTED: "ABORTED", - grpc.StatusCode.OUT_OF_RANGE: "OUT_OF_RANGE", - grpc.StatusCode.UNIMPLEMENTED: "UNIMPLEMENTED", - grpc.StatusCode.INTERNAL: "INTERNAL", - grpc.StatusCode.UNAVAILABLE: "UNAVAILABLE", - grpc.StatusCode.DATA_LOSS: "DATA_LOSS", -} - - -# pylint: disable=no-self-use -class GCPOpenCensusObservability(grpc._observability.ObservabilityPlugin): - """GCP OpenCensus based plugin implementation. - - If no exporter is passed, the default will be OpenCensus StackDriver - based exporter. - - For more details, please refer to User Guide: - * https://cloud.google.com/stackdriver/docs/solutions/grpc - - Attributes: - config: Configuration for GCP OpenCensus Observability. - exporter: Exporter used to export data. 
- """ - - config: _observability_config.GcpObservabilityConfig - exporter: "grpc_observability.Exporter" - _registered_method: Set[bytes] - - def __init__(self, exporter: "grpc_observability.Exporter" = None): - self.exporter = None - self.config = None - try: - self.config = _observability_config.read_config() - _cyobservability.activate_config(self.config) - except Exception as e: # pylint: disable=broad-except - raise ValueError(f"Reading configuration failed with: {e}") - - if exporter: - self.exporter = exporter - else: - raise ValueError(f"Please provide an exporter!") - - if self.config.tracing_enabled: - self.set_tracing(True) - if self.config.stats_enabled: - self.set_stats(True) - - def __enter__(self): - try: - _cyobservability.cyobservability_init(self.exporter) - # TODO(xuanwn): Use specific exceptons - except Exception as e: # pylint: disable=broad-except - _LOGGER.exception("GCPOpenCensusObservability failed with: %s", e) - - grpc._observability.observability_init(self) - return self - - def __exit__(self, exc_type, exc_val, exc_tb) -> None: - self.exit() - - def exit(self) -> None: - # Sleep so we don't loss any data. If we shutdown export thread - # immediately after exit, it's possible that core didn't call RecordEnd - # in callTracer, and all data recorded by calling RecordEnd will be - # lost. - # CENSUS_EXPORT_BATCH_INTERVAL_SECS: The time equals to the time in - # AwaitNextBatchLocked. 
- # TODO(xuanwn): explicit synchronization - # https://github.com/grpc/grpc/issues/33262 - time.sleep(_cyobservability.CENSUS_EXPORT_BATCH_INTERVAL_SECS) - self.set_tracing(False) - self.set_stats(False) - _cyobservability.observability_deinit() - grpc._observability.observability_deinit() - - def create_client_call_tracer( - self, method_name: bytes, target: bytes - ) -> ClientCallTracerCapsule: - trace_id = b"TRACE_ID" - capsule = _cyobservability.create_client_call_tracer( - method_name, - target, - trace_id, - method_name in self._registered_methods, - ) - return capsule - - def create_server_call_tracer_factory( - self, - ) -> ServerCallTracerFactoryCapsule: - capsule = _cyobservability.create_server_call_tracer_factory_capsule() - return capsule - - def delete_client_call_tracer( - self, client_call_tracer: ClientCallTracerCapsule - ) -> None: - _cyobservability.delete_client_call_tracer(client_call_tracer) - - def save_trace_context( - self, trace_id: str, span_id: str, is_sampled: bool - ) -> None: - pass - - def record_rpc_latency( - self, - method: str, - target: str, - rpc_latency: float, - status_code: grpc.StatusCode, - ) -> None: - status_code = GRPC_STATUS_CODE_TO_STRING.get(status_code, "UNKNOWN") - _cyobservability._record_rpc_latency( - self.exporter, - method, - target, - rpc_latency, - status_code, - method in self._registered_methods, - ) - - def save_registered_method(self, method_name: bytes) -> None: - self._registered_methods.add(method_name) diff --git a/src/python/grpcio_observability/grpc_observability/_observability.py b/src/python/grpcio_observability/grpc_observability/_observability.py index 352a25c69d7ef..4d4d7dbb81c7f 100644 --- a/src/python/grpcio_observability/grpc_observability/_observability.py +++ b/src/python/grpcio_observability/grpc_observability/_observability.py @@ -16,7 +16,8 @@ import abc from dataclasses import dataclass from dataclasses import field -from typing import List, Mapping, Tuple +import enum +from typing 
import AnyStr, Dict, List, Mapping, Set, Tuple class Exporter(metaclass=abc.ABCMeta): @@ -52,18 +53,23 @@ class StatsData: value. value_int: The actual metric value if measure_double is False. value_float: The actual metric value if measure_double is True. - registered_method: Whether the method in this data is a registered method - in stubs. + include_exchange_labels: Whether this data should include exchanged labels. labels: A dictionary that maps label tags associated with this metric to corresponding label value. + identifiers: A set of strings identifying which stats plugins this StatsData + belongs to. + registered_method: Whether the method in this data is a registered method + in stubs. """ name: "grpc_observability._cyobservability.MetricsName" measure_double: bool value_int: int = 0 value_float: float = 0.0 + include_exchange_labels: bool = False + labels: Dict[str, AnyStr] = field(default_factory=dict) + identifiers: Set[str] = field(default_factory=set) registered_method: bool = False - labels: Mapping[str, str] = field(default_factory=dict) @dataclass(frozen=True) @@ -102,5 +108,12 @@ class TracingData: status: str should_sample: bool child_span_count: int - span_labels: Mapping[str, str] = field(default_factory=dict) + span_labels: Mapping[str, AnyStr] = field(default_factory=dict) span_annotations: List[Tuple[str, str]] = field(default_factory=list) + + +@enum.unique +class OptionalLabelType(enum.Enum): + """What kinds of optional labels to add to metrics.""" + + XDS_SERVICE_LABELS = "kXdsServiceLabels" diff --git a/src/python/grpcio_observability/grpc_observability/_open_telemetry_measures.py b/src/python/grpcio_observability/grpc_observability/_open_telemetry_measures.py index 209ecd22c916b..99a2fd94ce87b 100644 --- a/src/python/grpcio_observability/grpc_observability/_open_telemetry_measures.py +++ b/src/python/grpcio_observability/grpc_observability/_open_telemetry_measures.py @@ -87,7 +87,8 @@ def base_metrics() -> List[Metric]: return [ 
CLIENT_ATTEMPT_STARTED, CLIENT_ATTEMPT_DURATION, - CLIENT_RPC_DURATION, + # CLIENT_RPC_DURATION is not required yet + # CLIENT_RPC_DURATION, CLIENT_ATTEMPT_SEND_BYTES, CLIENT_ATTEMPT_RECEIVED_BYTES, SERVER_STARTED_RPCS, diff --git a/src/python/grpcio_observability/grpc_observability/_open_telemetry_observability.py b/src/python/grpcio_observability/grpc_observability/_open_telemetry_observability.py index 24eee8e062ff8..b382bbea1fb37 100644 --- a/src/python/grpcio_observability/grpc_observability/_open_telemetry_observability.py +++ b/src/python/grpcio_observability/grpc_observability/_open_telemetry_observability.py @@ -15,7 +15,7 @@ import logging import threading import time -from typing import Any, Dict, Iterable, List, Optional, Set, Union +from typing import Any, AnyStr, Dict, Iterable, List, Optional, Set, Union import grpc @@ -24,6 +24,8 @@ from grpc_observability import _observability from grpc_observability import _open_telemetry_measures from grpc_observability._cyobservability import MetricsName +from grpc_observability._cyobservability import PLUGIN_IDENTIFIER_SEP +from grpc_observability._observability import OptionalLabelType from grpc_observability._observability import StatsData from opentelemetry.metrics import Counter from opentelemetry.metrics import Histogram @@ -37,9 +39,13 @@ ) grpc_observability = Any # grpc_observability.py imports this module. OpenTelemetryPlugin = Any # _open_telemetry_plugin.py imports this module. +OpenTelemetryPluginOption = ( + Any # _open_telemetry_plugin.py imports this module. 
+) GRPC_METHOD_LABEL = "grpc.method" GRPC_TARGET_LABEL = "grpc.target" +GRPC_CLIENT_METRIC_PREFIX = "grpc.client" GRPC_OTHER_LABEL_VALUE = "other" _observability_lock: threading.RLock = threading.RLock() _OPEN_TELEMETRY_OBSERVABILITY: Optional["OpenTelemetryObservability"] = None @@ -68,10 +74,16 @@ class _OpenTelemetryPlugin: _plugin: OpenTelemetryPlugin _metric_to_recorder: Dict[MetricsName, Union[Counter, Histogram]] + _enabled_client_plugin_options: Optional[List[OpenTelemetryPluginOption]] + _enabled_server_plugin_options: Optional[List[OpenTelemetryPluginOption]] + identifier: str def __init__(self, plugin: OpenTelemetryPlugin): self._plugin = plugin self._metric_to_recorder = dict() + self.identifier = str(id(self)) + self._enabled_client_plugin_options = None + self._enabled_server_plugin_options = None meter_provider = self._plugin.meter_provider if meter_provider: @@ -87,16 +99,38 @@ def _should_record(self, stats_data: StatsData) -> bool: def _record_stats_data(self, stats_data: StatsData) -> None: recorder = self._metric_to_recorder[stats_data.name] + enabled_plugin_options = [] + if GRPC_CLIENT_METRIC_PREFIX in recorder.name: + enabled_plugin_options = self._enabled_client_plugin_options + else: + enabled_plugin_options = self._enabled_server_plugin_options + # Only deserialize labels if we need add exchanged labels. + if stats_data.include_exchange_labels: + deserialized_labels = self._deserialize_labels( + stats_data.labels, enabled_plugin_options + ) + else: + deserialized_labels = stats_data.labels + labels = self._maybe_add_labels( + stats_data.include_exchange_labels, + deserialized_labels, + enabled_plugin_options, + ) + decoded_labels = self.decode_labels(labels) - target = stats_data.labels.get(GRPC_TARGET_LABEL, "") + target = decoded_labels.get(GRPC_TARGET_LABEL, "") if not self._plugin.target_attribute_filter(target): # Filter target name. 
- stats_data.labels[GRPC_TARGET_LABEL] = GRPC_OTHER_LABEL_VALUE + decoded_labels[GRPC_TARGET_LABEL] = GRPC_OTHER_LABEL_VALUE - method = stats_data.labels.get(GRPC_METHOD_LABEL, "") - if not self._plugin.generic_method_attribute_filter(method): - # Filter method name. - stats_data.labels[GRPC_METHOD_LABEL] = GRPC_OTHER_LABEL_VALUE + method = decoded_labels.get(GRPC_METHOD_LABEL, "") + if not ( + stats_data.registered_method + or self._plugin.generic_method_attribute_filter(method) + ): + # Filter method name if it's not registered method and + # generic_method_attribute_filter returns false. + decoded_labels[GRPC_METHOD_LABEL] = GRPC_OTHER_LABEL_VALUE value = 0 if stats_data.measure_double: @@ -104,18 +138,109 @@ def _record_stats_data(self, stats_data: StatsData) -> None: else: value = stats_data.value_int if isinstance(recorder, Counter): - recorder.add(value, attributes=stats_data.labels) + recorder.add(value, attributes=decoded_labels) elif isinstance(recorder, Histogram): - recorder.record(value, attributes=stats_data.labels) + recorder.record(value, attributes=decoded_labels) - # pylint: disable=no-self-use def maybe_record_stats_data(self, stats_data: List[StatsData]) -> None: # Records stats data to MeterProvider. 
if self._should_record(stats_data): self._record_stats_data(stats_data) + def get_client_exchange_labels(self) -> Dict[str, AnyStr]: + """Get labels used for client side Metadata Exchange.""" + + labels_for_exchange = {} + for plugin_option in self._enabled_client_plugin_options: + if hasattr(plugin_option, "get_label_injector") and hasattr( + plugin_option.get_label_injector(), "get_labels_for_exchange" + ): + labels_for_exchange.update( + plugin_option.get_label_injector().get_labels_for_exchange() + ) + return labels_for_exchange + + def get_server_exchange_labels(self) -> Dict[str, str]: + """Get labels used for server side Metadata Exchange.""" + labels_for_exchange = {} + for plugin_option in self._enabled_server_plugin_options: + if hasattr(plugin_option, "get_label_injector") and hasattr( + plugin_option.get_label_injector(), "get_labels_for_exchange" + ): + labels_for_exchange.update( + plugin_option.get_label_injector().get_labels_for_exchange() + ) + return labels_for_exchange + + def activate_client_plugin_options(self, target: bytes) -> None: + """Activate client plugin options based on option settings.""" + target_str = target.decode("utf-8", "replace") + if not self._enabled_client_plugin_options: + self._enabled_client_plugin_options = [] + for plugin_option in self._plugin.plugin_options: + if hasattr( + plugin_option, "is_active_on_client_channel" + ) and plugin_option.is_active_on_client_channel(target_str): + self._enabled_client_plugin_options.append(plugin_option) + + def activate_server_plugin_options(self, xds: bool) -> None: + """Activate server plugin options based on option settings.""" + if not self._enabled_server_plugin_options: + self._enabled_server_plugin_options = [] + for plugin_option in self._plugin.plugin_options: + if hasattr( + plugin_option, "is_active_on_server" + ) and plugin_option.is_active_on_server(xds): + self._enabled_server_plugin_options.append(plugin_option) + + @staticmethod + def _deserialize_labels( + labels: 
Dict[str, AnyStr], + enabled_plugin_options: List[OpenTelemetryPluginOption], + ) -> Dict[str, AnyStr]: + for plugin_option in enabled_plugin_options: + if all( + [ + hasattr(plugin_option, "get_label_injector"), + hasattr( + plugin_option.get_label_injector(), "deserialize_labels" + ), + ] + ): + labels = plugin_option.get_label_injector().deserialize_labels( + labels + ) + return labels + + @staticmethod + def _maybe_add_labels( + include_exchange_labels: bool, + labels: Dict[str, str], + enabled_plugin_options: List[OpenTelemetryPluginOption], + ) -> Dict[str, AnyStr]: + for plugin_option in enabled_plugin_options: + if all( + [ + hasattr(plugin_option, "get_label_injector"), + hasattr( + plugin_option.get_label_injector(), + "get_additional_labels", + ), + ] + ): + labels.update( + plugin_option.get_label_injector().get_additional_labels( + include_exchange_labels + ) + ) + return labels + + def get_enabled_optional_labels(self) -> List[OptionalLabelType]: + return self._plugin._get_enabled_optional_labels() + + @staticmethod def _register_metrics( - self, meter: Meter, metrics: List[_open_telemetry_measures.Metric] + meter: Meter, metrics: List[_open_telemetry_measures.Metric] ) -> Dict[MetricsName, Union[Counter, Histogram]]: metric_to_recorder_map = {} recorder = None @@ -179,6 +304,15 @@ def _register_metrics( metric_to_recorder_map[metric.cyname] = recorder return metric_to_recorder_map + @staticmethod + def decode_labels(labels: Dict[str, AnyStr]) -> Dict[str, str]: + decoded_labels = {} + for key, value in labels.items(): + if isinstance(value, bytes): + value = value.decode() + decoded_labels[key] = value + return decoded_labels + def start_open_telemetry_observability( *, @@ -220,19 +354,25 @@ class OpenTelemetryObservability(grpc._observability.ObservabilityPlugin): This is class is part of an EXPERIMENTAL API. Args: - plugin: _OpenTelemetryPlugin to enable. + plugins: _OpenTelemetryPlugins to enable. 
""" - exporter: "grpc_observability.Exporter" + _exporter: "grpc_observability.Exporter" + _plugins: List[_OpenTelemetryPlugin] _registered_method: Set[bytes] + _client_option_activated: bool + _server_option_activated: bool def __init__( self, *, plugins: Optional[Iterable[_OpenTelemetryPlugin]], ): - self.exporter = _OpenTelemetryExporterDelegator(plugins) + self._exporter = _OpenTelemetryExporterDelegator(plugins) self._registered_methods = set() + self._plugins = plugins + self._client_option_activated = False + self._server_option_activated = False def observability_init(self): try: @@ -242,7 +382,7 @@ def observability_init(self): raise ValueError(f"Activate observability metrics failed with: {e}") try: - _cyobservability.cyobservability_init(self.exporter) + _cyobservability.cyobservability_init(self._exporter) # TODO(xuanwn): Use specific exceptons except Exception as e: # pylint: disable=broad-except _LOGGER.exception("Initiate observability failed with: %s", e) @@ -268,18 +408,34 @@ def create_client_call_tracer( self, method_name: bytes, target: bytes ) -> ClientCallTracerCapsule: trace_id = b"TRACE_ID" - registered_method = False - if method_name in self._registered_methods: - registered_method = True + self._maybe_activate_client_plugin_options(target) + exchange_labels = self._get_client_exchange_labels() + enabled_optional_labels = set() + for plugin in self._plugins: + enabled_optional_labels.update(plugin.get_enabled_optional_labels()) + capsule = _cyobservability.create_client_call_tracer( - method_name, target, trace_id, registered_method + method_name, + target, + trace_id, + self._get_identifier(), + exchange_labels, + enabled_optional_labels, + method_name in self._registered_methods, ) return capsule def create_server_call_tracer_factory( self, - ) -> ServerCallTracerFactoryCapsule: - capsule = _cyobservability.create_server_call_tracer_factory_capsule() + *, + xds: bool = False, + ) -> Optional[ServerCallTracerFactoryCapsule]: + capsule = 
None + self._maybe_activate_server_plugin_options(xds) + exchange_labels = self._get_server_exchange_labels() + capsule = _cyobservability.create_server_call_tracer_factory_capsule( + exchange_labels, self._get_identifier() + ) return capsule def delete_client_call_tracer( @@ -300,22 +456,53 @@ def record_rpc_latency( status_code: grpc.StatusCode, ) -> None: status_code = GRPC_STATUS_CODE_TO_STRING.get(status_code, "UNKNOWN") - registered_method = False encoded_method = method.encode("utf8") - if encoded_method in self._registered_methods: - registered_method = True _cyobservability._record_rpc_latency( - self.exporter, + self._exporter, method, target, rpc_latency, status_code, - registered_method, + self._get_identifier(), + encoded_method in self._registered_methods, ) def save_registered_method(self, method_name: bytes) -> None: self._registered_methods.add(method_name) + def _get_client_exchange_labels(self) -> Dict[str, AnyStr]: + client_exchange_labels = {} + for _plugin in self._plugins: + client_exchange_labels.update(_plugin.get_client_exchange_labels()) + return client_exchange_labels + + def _get_server_exchange_labels(self) -> Dict[str, AnyStr]: + server_exchange_labels = {} + for _plugin in self._plugins: + server_exchange_labels.update(_plugin.get_server_exchange_labels()) + return server_exchange_labels + + def _maybe_activate_client_plugin_options(self, target: bytes) -> None: + if not self._client_option_activated: + for _plugin in self._plugins: + _plugin.activate_client_plugin_options(target) + self._client_option_activated = True + + def _maybe_activate_server_plugin_options(self, xds: bool) -> None: + if not self._server_option_activated: + for _plugin in self._plugins: + _plugin.activate_server_plugin_options(xds) + self._server_option_activated = True + + def _get_identifier(self) -> str: + plugin_identifiers = [] + for _plugin in self._plugins: + plugin_identifiers.append(_plugin.identifier) + return 
PLUGIN_IDENTIFIER_SEP.join(plugin_identifiers) + + def get_enabled_optional_labels(self) -> List[OptionalLabelType]: + return [] + def _start_open_telemetry_observability( otel_o11y: OpenTelemetryObservability, diff --git a/src/python/grpcio_observability/grpc_observability/_open_telemetry_plugin.py b/src/python/grpcio_observability/grpc_observability/_open_telemetry_plugin.py index 16c89d9594e0b..b0c7e9841e393 100644 --- a/src/python/grpcio_observability/grpc_observability/_open_telemetry_plugin.py +++ b/src/python/grpcio_observability/grpc_observability/_open_telemetry_plugin.py @@ -12,70 +12,79 @@ # See the License for the specific language governing permissions and # limitations under the License. -import abc -from typing import Callable, Dict, Iterable, List, Optional +from typing import AnyStr, Callable, Dict, Iterable, List, Optional # pytype: disable=pyi-error from grpc_observability import _open_telemetry_observability +from grpc_observability._observability import OptionalLabelType from opentelemetry.metrics import MeterProvider +GRPC_METHOD_LABEL = "grpc.method" +GRPC_TARGET_LABEL = "grpc.target" +GRPC_CLIENT_METRIC_PREFIX = "grpc.client" +GRPC_OTHER_LABEL_VALUE = "other" -class OpenTelemetryLabelInjector(abc.ABC): + +class OpenTelemetryLabelInjector: """ An interface that allows you to add additional labels on the calls traced. - - Please note that this class is still work in progress and NOT READY to be used. """ - _labels: List[Dict[str, str]] - - def __init__(self): - # Calls Python OTel API to detect resource and get labels, save - # those lables to OpenTelemetryLabelInjector.labels. - pass + def get_labels_for_exchange(self) -> Dict[str, AnyStr]: + """ + Get labels used for metadata exchange. - @abc.abstractmethod - def get_labels(self): - # Get additional labels for this OpenTelemetryLabelInjector. + Returns: + A dict of labels, with a string as key representing label name, string or bytes + as value representing label value. 
+ """ raise NotImplementedError() + def get_additional_labels( + self, include_exchange_labels: bool + ) -> Dict[str, str]: + """ + Get additional labels added by this injector. -class OpenTelemetryPluginOption(abc.ABC): - """ - An interface that allows you to add additional function to OpenTelemetryPlugin. - - Please note that this class is still work in progress and NOT READY to be used. - """ - - @abc.abstractmethod - def is_active_on_method(self, method: str) -> bool: - """Determines whether this plugin option is active on a given method. + The return value from this method will be added directly to metric data. Args: - method: Required. The RPC method, for example: `/helloworld.Greeter/SayHello`. + include_exchange_labels: Whether to add additional metadata exchange related labels. Returns: - True if this this plugin option is active on the giving method, false otherwise. + A dict of labels. """ raise NotImplementedError() - @abc.abstractmethod - def is_active_on_server(self, channel_args: List[str]) -> bool: - """Determines whether this plugin option is active on a given server. + # pylint: disable=no-self-use + def deserialize_labels( + self, labels: Dict[str, AnyStr] + ) -> Dict[str, AnyStr]: + """ + Deserialize the labels if required. - Args: - channel_args: Required. The channel args used for server. - TODO(xuanwn): detail on what channel_args will contain. + If this injector added labels for metadata exchange, this method will be called to + deserialize the exchanged labels. + + For example, if this injector added xds_peer_metadata_label for exchange: + + labels: {"labelA": b"valueA", "xds_peer_metadata_label": b"exchanged_bytes"} + + This method should deserialize xds_peer_metadata_label and return labels as: + + labels: {"labelA": b"valueA", "xds_label_A": "xds_label_A", + "xds_label_B": "xds_label_B"} Returns: - True if this this plugin option is active on the server, false otherwise. + A dict of deserialized labels. 
""" - raise NotImplementedError() + return labels - @abc.abstractmethod - def get_label_injector(self) -> Optional[OpenTelemetryLabelInjector]: - # Returns the LabelsInjector used by this plugin option, or None. - raise NotImplementedError() + +class OpenTelemetryPluginOption: + """ + An interface that allows you to add additional function to OpenTelemetryPlugin. + """ # pylint: disable=no-self-use @@ -86,7 +95,7 @@ class OpenTelemetryPlugin: meter_provider: Optional[MeterProvider] target_attribute_filter: Callable[[str], bool] generic_method_attribute_filter: Callable[[str], bool] - _plugin: _open_telemetry_observability._OpenTelemetryPlugin + _plugins: List[_open_telemetry_observability._OpenTelemetryPlugin] def __init__( self, @@ -120,17 +129,15 @@ def __init__( """ self.plugin_options = plugin_options self.meter_provider = meter_provider - if target_attribute_filter: - self.target_attribute_filter = target_attribute_filter - else: - self.target_attribute_filter = lambda target: True - if generic_method_attribute_filter: - self.generic_method_attribute_filter = ( - generic_method_attribute_filter - ) - else: - self.generic_method_attribute_filter = lambda method: False - self._plugin = _open_telemetry_observability._OpenTelemetryPlugin(self) + self.target_attribute_filter = target_attribute_filter or ( + lambda target: True + ) + self.generic_method_attribute_filter = ( + generic_method_attribute_filter or (lambda target: False) + ) + self._plugins = [ + _open_telemetry_observability._OpenTelemetryPlugin(self) + ] def register_global(self) -> None: """ @@ -140,7 +147,7 @@ def register_global(self) -> None: RuntimeError: If a global plugin was already registered. 
""" _open_telemetry_observability.start_open_telemetry_observability( - plugins=[self._plugin] + plugins=self._plugins ) def deregister_global(self) -> None: @@ -154,8 +161,11 @@ def deregister_global(self) -> None: def __enter__(self) -> None: _open_telemetry_observability.start_open_telemetry_observability( - plugins=[self._plugin] + plugins=self._plugins ) def __exit__(self, exc_type, exc_val, exc_tb) -> None: _open_telemetry_observability.end_open_telemetry_observability() + + def _get_enabled_optional_labels(self) -> List[OptionalLabelType]: + return [] diff --git a/src/python/grpcio_observability/grpc_observability/client_call_tracer.cc b/src/python/grpcio_observability/grpc_observability/client_call_tracer.cc index 5d31619b949aa..9d9ca694be143 100644 --- a/src/python/grpcio_observability/grpc_observability/client_call_tracer.cc +++ b/src/python/grpcio_observability/grpc_observability/client_call_tracer.cc @@ -22,6 +22,7 @@ #include "absl/strings/str_cat.h" #include "absl/time/clock.h" #include "constants.h" +#include "metadata_exchange.h" #include "observability_util.h" #include "python_observability_context.h" @@ -42,10 +43,15 @@ constexpr uint32_t PythonOpenCensusCallTracer::PythonOpenCensusCallTracer( const char* method, const char* target, const char* trace_id, - const char* parent_span_id, bool tracing_enabled, bool registered_method) + const char* parent_span_id, const char* identifier, + const std::vector