Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ jobs:
OSS_ACCESS_KEY_SECRET: ${{ secrets.OSS_ACCESS_KEY_SECRET }}
TRACE_TOKEN: ${{ secrets.TRACE_TOKEN }}
DISABLE_REDIS_CACHE_IN_TESTS: "true"
ENCRYPTION_KEY: ${{ secrets.ENCRYPTION_KEY }}

- name: Get Cover
uses: orgoro/[email protected]
Expand Down
2 changes: 2 additions & 0 deletions backend/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def create_app():
from app.log_middleware import CustomLoggingMiddleware
from fastapi.middleware.cors import CORSMiddleware
from api.request_validate_exception import validation_exception_handler
from extensions.trace.base import setup_propagator

app = FastAPI(lifespan=lifespan)
add_config_router(app)
Expand All @@ -98,6 +99,7 @@ def create_app():
allow_credentials=False,
)
app.add_middleware(CustomLoggingMiddleware)
setup_propagator(app)
app.add_exception_handler(ApiException, api_exception_handler)
app.add_exception_handler(RequestValidationError, validation_exception_handler)
return app
Expand Down
1 change: 1 addition & 0 deletions backend/common/tool/search_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@


class SearchResult(BaseModel):
id: str | None = None
title: str | None = None
content: str | None = None
url: str | None = None
Expand Down
4 changes: 2 additions & 2 deletions backend/db/models/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ class TraceModel(SQLModel):
class TraceModelEntity(TraceModel, table=True):
__tablename__ = "pai_trace_config"

id: str = Field(default=lambda x: str(uuid.uuid4().hex), primary_key=True)
id: str = Field(default_factory=lambda x: str(uuid.uuid4().hex), primary_key=True)
def is_enabled(self) -> bool:
return self.enabled and self.service_name and self.token and self.endpoint
return self.enabled and self.service_name and self.endpoint
133 changes: 133 additions & 0 deletions backend/extensions/trace/baggage_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional, Set

from opentelemetry.baggage import get_all as get_all_baggage
from opentelemetry.context import Context
from opentelemetry.sdk.trace import SpanProcessor
from opentelemetry.trace import Span


DEFAULT_ALLOWED_PREFIXES = set(["traffic.llm_sdk."])
DEFAULT_STRIP_PREFIXES = set(["traffic.llm_sdk."])


class LoongSuiteBaggageSpanProcessor(SpanProcessor):
"""
LoongSuite Baggage Span Processor

Reads Baggage entries from the parent context and adds matching baggage
key-value pairs to span attributes based on configured prefix matching rules.

Supported features:
1. Prefix matching: Only process baggage keys that match specified prefixes
2. Prefix stripping: Remove specified prefixes before writing to attributes

Example:
# Configure matching prefixes: "traffic.", "app."
# Configure stripping prefix: "traffic."
# baggage: traffic.hello_key = "value"
# Result: attributes will have hello_key = "value" (prefix stripped)

# baggage: app.user_id = "123"
# Result: attributes will have app.user_id = "123" (app. prefix not stripped)

⚠ Warning ⚠️

Do not put sensitive information in Baggage.

To repeat: a consequence of adding data to Baggage is that the keys and
values will appear in all outgoing HTTP headers from the application.
"""

def __init__(
self,
allowed_prefixes: Optional[Set[str]] = DEFAULT_ALLOWED_PREFIXES,
strip_prefixes: Optional[Set[str]] = DEFAULT_STRIP_PREFIXES,
) -> None:
"""
Initialize LoongSuite Baggage Span Processor

Args:
allowed_prefixes: Set of allowed baggage key prefixes. If None or empty,
all baggage keys are allowed. If specified, only keys
matching these prefixes will be processed.
strip_prefixes: Set of prefixes to strip. If a baggage key matches these
prefixes, they will be removed before writing to attributes.
"""
self._allowed_prefixes = allowed_prefixes or []
self._strip_prefixes = strip_prefixes or set()

# If allowed_prefixes is empty, allow all prefixes
self._allow_all = len(self._allowed_prefixes) == 0

def _should_process_key(self, key: str) -> bool:
"""
Determine whether this baggage key should be processed

Args:
key: baggage key

Returns:
True if the key should be processed, False otherwise
"""
if self._allow_all:
return True

# Check if key matches any of the allowed prefixes
for prefix in self._allowed_prefixes:
if key.startswith(prefix):
return True

return False

def _strip_prefix(self, key: str) -> str:
"""
Strip matching prefix from key

Args:
key: original baggage key

Returns:
key with prefix stripped
"""
for prefix in self._strip_prefixes:
if key.startswith(prefix):
return key[len(prefix) :]
return key

def on_start(
self, span: "Span", parent_context: Optional[Context] = None
) -> None:
"""
Called when a span starts, adds matching baggage entries to span attributes

Args:
span: span to add attributes to
parent_context: parent context used to retrieve baggage
"""
baggage = get_all_baggage(parent_context)

for key, value in baggage.items():
# Check if this key should be processed
if not self._should_process_key(key):
continue

# Strip prefix if needed
attribute_key = self._strip_prefix(key)

# Add to span attributes
# Baggage values are strings, which are valid AttributeValue
span.set_attribute(attribute_key, value) # type: ignore[arg-type]
56 changes: 42 additions & 14 deletions backend/extensions/trace/base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
import socket
from functools import wraps
from typing import Callable, AsyncGenerator
from typing import Callable, AsyncGenerator, Union
from loguru import logger

from opentelemetry import trace
Expand All @@ -14,21 +14,41 @@
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
ConsoleSpanExporter,
SimpleSpanProcessor,
)
from opentelemetry.trace import Span
from opentelemetry.context import attach, detach
from openinference.instrumentation.openai import OpenAIInstrumentor
from openinference.semconv.trace import SpanAttributes, MessageAttributes, MessageContentAttributes

from extensions.trace.reloadable_exporter import ReloadableOTLPSpanExporter
from extensions.trace.grpc_exporter import ReloadableGrpcOTLPSpanExporter
from extensions.trace.http_exporter import ReloadableHttpOTLPSpanExporter
from extensions.trace import context as trace_context
from extensions.trace.trace_config import TraceConfig
from opentelemetry.propagate import set_global_textmap
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from opentelemetry.baggage.propagation import W3CBaggagePropagator
from opentelemetry.propagators.composite import CompositePropagator
from extensions.trace.baggage_processor import LoongSuiteBaggageSpanProcessor
from extensions.trace.trace_context_middleware import TraceContextMiddleware


ENABLE_TRACE_DEBUG = os.getenv("ENABLE_TRACE_DEBUG", "false").lower() in ["true", "1", "yes", "y"]


def setup_propagator(app):
set_global_textmap(CompositePropagator([
TraceContextTextMapPropagator(), # 处理 traceparent
W3CBaggagePropagator() # 处理 baggage
]))
app.add_middleware(TraceContextMiddleware)


# trace_provider为singleton, 不支持覆盖,故修改trace配置时,默认覆盖exporter和resource
# 这样如果用户填错密码,还可以成功刷新
trace_config: TraceConfig = None
exporter: ReloadableOTLPSpanExporter = None
exporter: Union[ReloadableGrpcOTLPSpanExporter, ReloadableHttpOTLPSpanExporter] = None
resource: Resource = None
trace_provider: TracerProvider = None

Expand All @@ -49,17 +69,13 @@ def init_instrument(config: TraceConfig):
if config.user_args:
trace_context.init_custom_context(config.user_args.values())

grpc_endpoint = config.endpoint
trace_endpoint = config.endpoint
token = config.token
service_name = config.service_name
service_app_name = config.service_name

attributes = {SERVICE_NAME: service_name, HOST_NAME: socket.gethostname()}

if not token:
logger.error("token not provided in trace config.")
raise ValueError("token must be provided!")

attributes["service.app.name"] = service_app_name

# ToDo: change to adaptive versioning
Expand All @@ -73,19 +89,31 @@ def init_instrument(config: TraceConfig):

global exporter
if exporter is None:
exporter = ReloadableOTLPSpanExporter(
endpoint=grpc_endpoint, headers=(f"Authentication={token}")
)
if config.exporter_type == "grpc":
logger.info(f"Use grpc exporter: {trace_endpoint}")
exporter = ReloadableGrpcOTLPSpanExporter(
endpoint=trace_endpoint, headers=(f"Authentication={token}")
)
elif config.exporter_type == "http":
logger.info(f"Use http exporter: {trace_endpoint}")
exporter = ReloadableHttpOTLPSpanExporter(
endpoint=trace_endpoint, headers=(f"Authentication={token}")
)
else:
raise ValueError(f"Invalid exporter type: {config.exporter_type}")
else:
exporter.reload(endpoint=grpc_endpoint, headers=(f"Authentication={token}"))
exporter.reload(endpoint=trace_endpoint, headers=(f"Authentication={token}"))

global trace_provider
if trace_provider is None:
span_processor = BatchSpanProcessor(exporter)
trace_provider = TracerProvider(
resource=resource, active_span_processor=span_processor
resource=resource
)

trace_provider.add_span_processor(span_processor)
trace_provider.add_span_processor(LoongSuiteBaggageSpanProcessor())
if ENABLE_TRACE_DEBUG:
trace_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(trace_provider)

OpenAIInstrumentor().instrument()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@


# pylint: disable=no-member
class ReloadableOTLPSpanExporter(OTLPSpanExporter):
class ReloadableGrpcOTLPSpanExporter(OTLPSpanExporter):
# pylint: disable=unsubscriptable-object
"""OTLP span exporter

Expand Down
83 changes: 83 additions & 0 deletions backend/extensions/trace/http_exporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Copyright The OpenTelemetry Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""OTLP Span Exporter"""

from os import environ
from typing import Dict, Optional
from urllib.parse import urlparse
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
OTEL_EXPORTER_OTLP_TRACES_HEADERS,
)
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_HEADERS,
)
from opentelemetry.util.re import parse_env_headers


# pylint: disable=no-member
class ReloadableHttpOTLPSpanExporter(OTLPSpanExporter):
# pylint: disable=unsubscriptable-object
"""OTLP span exporter

Args:
endpoint: OpenTelemetry Collector receiver endpoint
insecure: Connection type
credentials: Credentials object for server authentication
headers: Headers to send when exporting
timeout: Backend request timeout in seconds
compression: gRPC compression method to use
"""
def __init__(
self,
endpoint: Optional[str] = None,
headers: Dict[str, str] = None,
timeout: Optional[int] = None,
):
endpoint = endpoint or environ.get(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT)

assert endpoint, "endpoint is required"
endpoint = endpoint.rstrip('/')

if not endpoint.endswith("/v1/traces"):
endpoint = f"{endpoint}/v1/traces"

headers = headers or environ.get(OTEL_EXPORTER_OTLP_TRACES_HEADERS)
if isinstance(headers, str):
headers = parse_env_headers(headers, liberal=True)

super().__init__(
endpoint=endpoint,
headers=headers,
timeout=timeout,
)

def reload(
self,
endpoint: Optional[str] = None,
headers: Dict[str, str] | str = None,
):
self._endpoint = endpoint

parsed_url = urlparse(self._endpoint)

if parsed_url.netloc:
self._endpoint = parsed_url.netloc

self._headers = headers or environ.get(OTEL_EXPORTER_OTLP_HEADERS)
if isinstance(self._headers, str):
self._headers = parse_env_headers(self._headers, liberal=True)

self._session.headers.update(self._headers)
1 change: 0 additions & 1 deletion backend/extensions/trace/pai_agent_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from opentelemetry.trace import set_span_in_context
from opentelemetry.trace.status import Status, StatusCode
from openinference.semconv.trace import SpanAttributes, OpenInferenceSpanKindValues

from extensions.trace import context as trace_context
from extensions.trace.utils import pydantic_to_dict

Expand Down
Loading
Loading