diff --git a/lazyllm/__init__.py b/lazyllm/__init__.py index 021b21994..3dcd2bcb3 100644 --- a/lazyllm/__init__.py +++ b/lazyllm/__init__.py @@ -17,6 +17,7 @@ ServerModule, TrialModule, register as module_register, OnlineModule, OnlineChatModule, OnlineEmbeddingModule, AutoModel, OnlineMultiModalModule) from .hook import LazyLLMHook, LazyLLMFuncHook +from .tracing import TracingSetupError, get_trace_context, set_trace_context from .prompt_templates import ActorPrompt, DataPrompt from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -89,6 +90,11 @@ def __getattr__(name: str): 'LazyLLMHook', 'LazyLLMFuncHook', + # tracing + 'TracingSetupError', + 'get_trace_context', + 'set_trace_context', + # tools 'Document', 'Retriever', diff --git a/lazyllm/common/globals.py b/lazyllm/common/globals.py index 1b2ce380c..50f75b8c7 100644 --- a/lazyllm/common/globals.py +++ b/lazyllm/common/globals.py @@ -127,8 +127,16 @@ def __reduce__(self): class Globals(metaclass=SingletonABCMeta): - __global_attrs__ = ThreadSafeDict(user_id=None, chat_history={}, global_parameters={}, - lazyllm_files={}, usage={}, config={}, call_stack=[]) + __global_attrs__ = ThreadSafeDict( + user_id=None, + chat_history={}, + global_parameters={}, + lazyllm_files={}, + usage={}, + trace={}, + config={}, + call_stack=[] + ) def __new__(cls, *args, **kw): if cls is not Globals: return super().__new__(cls) diff --git a/lazyllm/configs.py b/lazyllm/configs.py index 0f261629d..a4448f4b2 100644 --- a/lazyllm/configs.py +++ b/lazyllm/configs.py @@ -244,8 +244,8 @@ def prefix(self): return self._config._prefix -config = _NamespaceConfig().add('mode', Mode, Mode.Normal, dict(DISPLAY=Mode.Display, DEBUG=Mode.Debug), - description='The default mode for LazyLLM.' +config = (_NamespaceConfig().add('mode', Mode, Mode.Normal, dict(DISPLAY=Mode.Display, DEBUG=Mode.Debug), + description='The default mode for LazyLLM.' ).add('repr_ml', bool, False, 'REPR_USE_ML', description='Whether to use Markup Language for repr.' ).add('repr_show_child', bool, False, 'REPR_SHOW_CHILD', description='Whether to show child modules in repr.' @@ -263,7 +263,8 @@ def prefix(self): description='Whether to skip check keywords for deployment.' ).add('allow_internal_network', bool, False, 'ALLOW_INTERNAL_NETWORK', description='Whether to allow loading images from internal network addresses. ' - 'Set to False for security in production environments.') + 'Set to False for security in production environments.' + )) def refresh_config(key): if key in Config: diff --git a/lazyllm/docs/__init__.py b/lazyllm/docs/__init__.py index 48ac2bc96..f21ecb08e 100644 --- a/lazyllm/docs/__init__.py +++ b/lazyllm/docs/__init__.py @@ -3,8 +3,10 @@ config.add('init_doc', bool, False, 'INIT_DOC', description='whether to init docs') if config['init_doc'] and (add_doc.__doc__ is None or 'Add document' not in add_doc.__doc__): - from . import common, components, configs, flow, hook, launcher, module, patch, prompt_template, tools, utils # noqa F401 - del common, components, configs, flow, hook, launcher, module, patch, prompt_template, tools, utils + from . import common, components, configs, flow, hook, launcher # noqa: F401 + from . import module, patch, prompt_template, tools, tracing, utils # noqa: F401 + del (common, components, configs, flow, hook, launcher, module, + patch, prompt_template, tools, tracing, utils) __all__ = [ 'add_doc' diff --git a/lazyllm/docs/hook.py b/lazyllm/docs/hook.py index 5a3eb7e6a..26054c8b0 100644 --- a/lazyllm/docs/hook.py +++ b/lazyllm/docs/hook.py @@ -85,14 +85,196 @@ output: The return value of the monitored function. ''') -add_chinese_doc('LazyLLMHook.report', '''\ -生成钩子的执行报告。 +add_chinese_doc('LazyLLMHook.on_error', '''\ +异常处理钩子,在被监控函数执行抛出异常时调用。 -这是一个抽象方法,需要在子类中实现。 +默认实现为空操作,子类可以按需覆盖,用于记录错误状态、补充诊断信息或执行清理逻辑。 + +Args: + exc: 被监控函数抛出的异常对象。 ''') -add_english_doc('LazyLLMHook.report', '''\ -Generate a report of the hook execution. +add_english_doc('LazyLLMHook.on_error', '''\ +Error-handling hook, called when the monitored function raises an exception. -This is an abstract method and must be implemented in subclasses. +The default implementation is a no-op. Subclasses can override it to record error status, +attach diagnostic information, or perform cleanup. + +Args: + exc: The exception raised by the monitored function. ''') + +add_chinese_doc('LazyLLMHook.finalize', '''\ +执行 hook 生命周期的最终收尾逻辑。 + +该方法会在 hook 生命周期结束时调用,适合执行资源释放、收尾处理或最终上报。 + +这是推荐使用的最终阶段接口。为兼容旧实现,``report`` 仍然可用,并会被视为 +``finalize`` 的历史别名。 +''') + +add_english_doc('LazyLLMHook.finalize', '''\ +Run the final cleanup logic of the hook lifecycle. + +This method is called at the end of the hook lifecycle and is intended for cleanup, +resource release, or final reporting. + +This is the preferred final-phase interface. For backward compatibility, ``report`` is +still supported and treated as a legacy alias of ``finalize``. +''') + +add_chinese_doc('HookPhaseError', '''\ +Hook 阶段错误,当一个 hook 阶段中有一个或多个 strict 模式的 hook 执行失败时抛出。 + +Args: + phase (str): 发生错误的 hook 阶段名称(如 ``'post_hook'``、``'on_error'``、``'finalize'``)。 + errors: 包含 ``(hook_obj, exception)`` 元组的序列,记录所有失败的 hook 及其异常。 +''') + +add_english_doc('HookPhaseError', '''\ +Raised when one or more strict-mode hooks fail during a hook phase. + +Args: + phase (str): The name of the hook phase where the error(s) occurred (e.g. ``'post_hook'``, ``'on_error'``, ``'finalize'``). + errors: A sequence of ``(hook_obj, exception)`` tuples recording each failed hook and its exception. +''') + +add_chinese_doc('register_builtin_hook_provider', '''\ +注册一个内建 hook provider。 + +provider 接收一个待初始化的对象(如 flow 或 module),并返回应自动注册到该对象上的 +hook 类型或 hook 实例列表。该机制用于让 tracing 等子系统在不侵入通用 hook 框架的前提下, +动态参与默认 hook 注册。 + +Args: + provider: 一个可调用对象,签名形如 ``provider(obj) -> list``。 +''') + +add_english_doc('register_builtin_hook_provider', '''\ +Register a built-in hook provider. + +A provider receives the object being initialized (for example, a flow or module) and +returns a list of hook types or hook instances that should be automatically registered +on that object. This allows subsystems such as tracing to participate in default hook +registration without coupling the generic hook framework to a specific feature. + +Args: + provider: A callable with a signature like ``provider(obj) -> list``. +''') + +add_chinese_doc('resolve_builtin_hooks', '''\ +解析并收集当前对象应自动注册的内建 hooks。 + +该函数会依次调用所有已注册的内建 hook provider,并合并它们返回的 hook 列表。 + +Args: + obj: 当前待初始化的对象(如 flow 或 module)。 + +Returns: + list: 需要注册到该对象上的 hook 类型或 hook 实例列表。 +''') + +add_english_doc('resolve_builtin_hooks', '''\ +Resolve and collect built-in hooks that should be automatically registered on the current object. + +This function calls each registered built-in hook provider in order and merges the hook +lists they return. + +Args: + obj: The object currently being initialized (for example, a flow or module). + +Returns: + list: A list of hook types or hook instances to register on that object. +''') + +utils.add_chinese_doc('LazyTracingHook', '''\ +为 flow 或 module 创建 tracing hook。 + +该 hook 会在执行生命周期中创建、更新并结束对应的 tracing span。 + +Args: + obj: 要进行 tracing 的 flow 或 module 对象。 +''', module=lazyllm.tracing) + +utils.add_english_doc('LazyTracingHook', '''\ +Create a tracing hook for a flow or module object. + +This hook is responsible for creating, updating, and finishing the corresponding tracing span +during the execution lifecycle. + +Args: + obj: The flow or module object to be traced. +''', module=lazyllm.tracing) + +utils.add_chinese_doc('LazyTracingHook.pre_hook', '''\ +创建并激活当前 flow 或 module 对应的 tracing span。 + +该方法会在被包裹对象执行前调用,并根据当前调用参数初始化 span 上下文。 + +Args: + *args: 传递给目标对象的位置参数。 + **kwargs: 传递给目标对象的关键字参数。 +''', module=lazyllm.tracing) + +utils.add_english_doc('LazyTracingHook.pre_hook', '''\ +Create and activate the tracing span for the current flow or module. + +This method is called before the wrapped object executes and initializes the span context +from the current call arguments. + +Args: + *args: Positional arguments passed to the target object. + **kwargs: Keyword arguments passed to the target object. +''', module=lazyllm.tracing) + +utils.add_chinese_doc('LazyTracingHook.post_hook', '''\ +在 tracing span 上记录执行输出。 + +该方法会在被包裹对象成功执行后调用,把返回结果写入当前 span。 + +Args: + output: 被包裹对象的返回值。 +''', module=lazyllm.tracing) + +utils.add_english_doc('LazyTracingHook.post_hook', '''\ +Record the execution output on the active tracing span. + +This method is called after the wrapped object completes successfully and writes the +returned result to the current span. + +Args: + output: The return value of the wrapped object. +''', module=lazyllm.tracing) + +utils.add_chinese_doc('LazyTracingHook.on_error', '''\ +在 tracing span 上记录异常状态。 + +当被包裹的 flow 或 module 执行失败时,该方法会把异常信息写入当前 span。 + +Args: + exc: 执行过程中抛出的异常对象。 +''', module=lazyllm.tracing) + +utils.add_english_doc('LazyTracingHook.on_error', '''\ +Record the error state on the active tracing span. + +When the wrapped flow or module execution fails, this method writes the exception information +to the current span. + +Args: + exc: The exception raised during execution. +''', module=lazyllm.tracing) + +utils.add_chinese_doc('LazyTracingHook.finalize', '''\ +结束并上报当前 tracing span。 + +该方法会在 hook 生命周期结束时调用,用于关闭当前 span 并完成本次 tracing 记录。 +''', module=lazyllm.tracing) + +utils.add_english_doc('LazyTracingHook.finalize', '''\ +Finalize the current tracing span. + +This method is called at the end of the hook lifecycle to close the current span and +complete the tracing record for this execution. +''', module=lazyllm.tracing) + diff --git a/lazyllm/docs/tracing.py b/lazyllm/docs/tracing.py new file mode 100644 index 000000000..fb40dd2b8 --- /dev/null +++ b/lazyllm/docs/tracing.py @@ -0,0 +1,212 @@ +# flake8: noqa E501 +from . import utils +import functools +import lazyllm + +# ============= Tracing helpers + +add_chinese_doc_trace = functools.partial(utils.add_chinese_doc, module=lazyllm.tracing) +add_english_doc_trace = functools.partial(utils.add_english_doc, module=lazyllm.tracing) + +add_chinese_doc_trace('resolve_tracing_hooks', '''\ +解析并返回当前对象应自动注册的 tracing hooks。 + +该函数会根据全局 trace 配置、采样标志以及 module 默认 trace 策略,决定是否为当前 +flow 或 module 注册 ``LazyTracingHook``。 + +Args: + obj: 当前待初始化的 flow 或 module 对象。 + +Returns: + list: 应自动注册到该对象上的 tracing hook 列表。 +''') + +add_english_doc_trace('resolve_tracing_hooks', '''\ +Resolve and return tracing hooks that should be automatically registered on the current object. + +This function decides whether to register ``LazyTracingHook`` for the current flow or +module according to the global trace configuration, sampling flag, and default module +trace policy. + +Args: + obj: The flow or module object currently being initialized. + +Returns: + list: A list of tracing hooks that should be automatically registered on that object. +''') + +# ============= TracingBackend + +add_chinese_doc = functools.partial(utils.add_chinese_doc, module=lazyllm.tracing.backends.base) +add_english_doc = functools.partial(utils.add_english_doc, module=lazyllm.tracing.backends.base) + +add_chinese_doc('TracingBackend', '''\ +追踪后端的抽象基类,定义了将 LazyLLM 追踪数据导出至外部可观测平台所需的接口。 + +子类需要实现所有抽象方法,以适配具体的可观测后端(如 Langfuse、Jaeger 等)。 + +**注意**: 此类是抽象基类,不能直接实例化。 +''') + +add_english_doc('TracingBackend', '''\ +Abstract base class for tracing backends, defining the interface required to export +LazyLLM tracing data to external observability platforms. + +Subclasses must implement all abstract methods to adapt to a specific observability +backend (e.g. Langfuse, Jaeger, etc.). + +**Note**: This class is an abstract base class and cannot be instantiated directly. +''') + +add_chinese_doc('TracingBackend.build_exporter', '''\ +构建并返回一个 OpenTelemetry SpanExporter 实例,用于将 Span 数据发送至目标后端。 + +Returns: + opentelemetry.sdk.trace.export.SpanExporter: 配置好的 Span 导出器。 + +Raises: + RuntimeError: 当必要的后端配置缺失时抛出。 +''') + +add_english_doc('TracingBackend.build_exporter', '''\ +Build and return an OpenTelemetry SpanExporter instance for sending span data to +the target backend. + +Returns: + opentelemetry.sdk.trace.export.SpanExporter: A configured span exporter. + +Raises: + RuntimeError: If required backend configuration is missing. +''') + +add_chinese_doc('TracingBackend.context_attributes', '''\ +将追踪上下文转换为后端特定的 Span 属性。 + +Args: + trace_ctx (Dict[str, Any]): 当前请求的追踪上下文,包含 ``session_id``、``user_id``、``request_tags`` 等字段。 + is_root_span (bool): 当前 Span 是否为调用链的根 Span。 + +Returns: + Dict[str, Any]: 后端特定的 Span 属性字典。 +''') + +add_english_doc('TracingBackend.context_attributes', '''\ +Convert the trace context into backend-specific span attributes. + +Args: + trace_ctx (Dict[str, Any]): The current request trace context containing fields such as ``session_id``, ``user_id``, and ``request_tags``. + is_root_span (bool): Whether the current span is the root span of the trace. + +Returns: + Dict[str, Any]: A dictionary of backend-specific span attributes. +''') + +add_chinese_doc('TracingBackend.input_attributes', '''\ +将调用输入转换为后端特定的 Span 属性。 + +Args: + args (tuple): 传递给目标对象的位置参数。 + kwargs (Dict[str, Any]): 传递给目标对象的关键字参数。 + capture_payload (bool): 是否记录输入 payload 内容。 + is_root_span (bool): 当前 Span 是否为调用链的根 Span。 + +Returns: + Dict[str, Any]: 后端特定的输入属性字典。若 ``capture_payload`` 为 False,返回空字典。 +''') + +add_english_doc('TracingBackend.input_attributes', '''\ +Convert call inputs into backend-specific span attributes. + +Args: + args (tuple): Positional arguments passed to the target object. + kwargs (Dict[str, Any]): Keyword arguments passed to the target object. + capture_payload (bool): Whether to record the input payload content. + is_root_span (bool): Whether the current span is the root span of the trace. + +Returns: + Dict[str, Any]: A dictionary of backend-specific input attributes. Returns an empty dict when ``capture_payload`` is False. +''') + +add_chinese_doc('TracingBackend.set_root_span_name', '''\ +为根 Span 设置后端特定的显示名称。 + +Args: + span: OpenTelemetry Span 对象。 + span_name (str): 要设置的 Span 名称。 +''') + +add_english_doc('TracingBackend.set_root_span_name', '''\ +Set a backend-specific display name on the root span. + +Args: + span: The OpenTelemetry span object. + span_name (str): The name to assign to the span. +''') + +add_chinese_doc('TracingBackend.output_attributes', '''\ +将调用输出转换为后端特定的 Span 属性。 + +Args: + text (str): 序列化后的输出文本。 + is_root_span (bool): 当前 Span 是否为调用链的根 Span。 + +Returns: + Dict[str, Any]: 后端特定的输出属性字典。 +''') + +add_english_doc('TracingBackend.output_attributes', '''\ +Convert call output into backend-specific span attributes. + +Args: + text (str): The serialized output text. + is_root_span (bool): Whether the current span is the root span of the trace. + +Returns: + Dict[str, Any]: A dictionary of backend-specific output attributes. +''') + +add_chinese_doc('TracingBackend.error_attributes', '''\ +将异常信息转换为后端特定的 Span 属性。 + +Args: + exc (Exception): 执行过程中抛出的异常对象。 + +Returns: + Dict[str, Any]: 后端特定的错误属性字典。 +''') + +add_english_doc('TracingBackend.error_attributes', '''\ +Convert exception information into backend-specific span attributes. + +Args: + exc (Exception): The exception raised during execution. + +Returns: + Dict[str, Any]: A dictionary of backend-specific error attributes. +''') + +# ============= LangfuseBackend + +add_chinese_doc_lf = functools.partial(utils.add_chinese_doc, module=lazyllm.tracing.backends.langfuse) +add_english_doc_lf = functools.partial(utils.add_english_doc, module=lazyllm.tracing.backends.langfuse) + +add_chinese_doc_lf('LangfuseBackend', '''\ +Langfuse 追踪后端实现,通过 OTLP/HTTP 协议将追踪数据导出至 Langfuse 平台。 + +使用 HTTP Basic Auth 认证,需要配置以下环境变量: + +- ``LANGFUSE_HOST`` 或 ``LANGFUSE_BASE_URL``: Langfuse 服务地址 +- ``LANGFUSE_PUBLIC_KEY``: Langfuse 公钥 +- ``LANGFUSE_SECRET_KEY``: Langfuse 密钥 +''') + +add_english_doc_lf('LangfuseBackend', '''\ +Langfuse tracing backend implementation that exports trace data to the Langfuse platform +via OTLP/HTTP protocol. + +Uses HTTP Basic Auth authentication. The following environment variables must be configured: + +- ``LANGFUSE_HOST`` or ``LANGFUSE_BASE_URL``: Langfuse service URL +- ``LANGFUSE_PUBLIC_KEY``: Langfuse public key +- ``LANGFUSE_SECRET_KEY``: Langfuse secret key +''') diff --git a/lazyllm/flow/flow.py b/lazyllm/flow/flow.py index cc020287e..69d5b126c 100644 --- a/lazyllm/flow/flow.py +++ b/lazyllm/flow/flow.py @@ -21,7 +21,7 @@ import concurrent.futures from collections import deque import uuid -from ..hook import LazyLLMHook +from ..hook import LazyLLMHook, prepare_hooks, register_hooks, resolve_builtin_hooks, run_hooks from itertools import repeat @@ -157,10 +157,12 @@ def _defined_at_the_same_scope(self, other: 'FlowBase'): def __setattr__(self, name: str, value): if '_capture' in self.__dict__ and self._capture and not name.startswith('_'): + if hasattr(value, '_module_id') and hasattr(value, 'name') and value.name is None: + value.name = name if len(_get_flow_stack()) > 1 and not self._auto_registered: super(__class__, self).__setattr__('_auto_registered', True) - locals = self._curr_frame.f_locals.copy() - for key, item in locals.items(): + frame_locals = self._curr_frame.f_locals.copy() + for key, item in frame_locals.items(): if item == self and (parent := _get_flow_stack()[-2]) != item: if key not in parent._item_names and parent._defined_at_the_same_scope(self): parent._add(key, self) @@ -224,36 +226,48 @@ def __init__(self, *args, post_action=None, auto_capture=False, id: Optional[str id=id, name=name, group_id=group_id) self.post_action = post_action() if isinstance(post_action, type) else post_action self._sync = False - self._hooks = set() + self._hooks = [] + register_hooks(self, resolve_builtin_hooks(self)) def __call__(self, *args, **kw): - hook_objs = [] - for hook_type in self._hooks: - if isinstance(hook_type, LazyLLMHook): - hook_objs.append(hook_type) - else: - hook_objs.append(hook_type(self)) - hook_objs[-1].pre_hook(*args, **kw) - with globals.stack_enter(self.identities): - output = self._run(args[0] if len(args) == 1 else package(args), **kw) - if self.post_action is not None: self.invoke(self.post_action, output) - if self._sync: self.wait() - r = self._post_process(output) - for hook_obj in hook_objs[::-1]: - hook_obj.post_hook(r) - for hook_obj in hook_objs: - hook_obj.report() - return r + hook_objs = prepare_hooks(self, self._hooks, *args, **kw) + + try: + with globals.stack_enter(self.identities): + output = self._run(args[0] if len(args) == 1 else package(args), **kw) + if self.post_action is not None: self.invoke(self.post_action, output) + if self._sync: self.wait() + r = self._post_process(output) + except Exception as e: + LOG.error(f'Flow `{self.__class__.__name__}` raised {type(e).__name__}: {e}') + try: + run_hooks(hook_objs, 'on_error', e) + except Exception: + LOG.warning('Flow on_error hook failed', exc_info=True) + raise + else: + run_hooks(hook_objs, 'post_hook', r) + return r + finally: + try: + run_hooks(hook_objs, 'finalize') + except Exception: + LOG.warning('Flow finalize hook failed', exc_info=True) def register_hook(self, hook_type: LazyLLMHook): - self._hooks.add(hook_type) + if not (isinstance(hook_type, LazyLLMHook) + or (isinstance(hook_type, type) and issubclass(hook_type, LazyLLMHook))): + raise TypeError(f'Invalid hook type: {type(hook_type)}, ' + 'must be subclass or instance of LazyLLMHook') + if hook_type not in self._hooks: + self._hooks.append(hook_type) def unregister_hook(self, hook_type: LazyLLMHook): if hook_type in self._hooks: self._hooks.remove(hook_type) def clear_hooks(self): - self._hooks = set() + self._hooks = [] def _post_process(self, output): return output diff --git a/lazyllm/hook.py b/lazyllm/hook.py index cd7812497..d8087ccb3 100644 --- a/lazyllm/hook.py +++ b/lazyllm/hook.py @@ -1,8 +1,14 @@ from abc import ABC, abstractmethod import inspect import ast +import copy +from typing import Any, Callable, Sequence +from .common import LOG + class LazyLLMHook(ABC): + __hook_priority__ = 100 + __hook_error_mode__ = 'warn' @abstractmethod def __init__(self, obj): @@ -16,10 +22,24 @@ def pre_hook(self, *args, **kwargs): def post_hook(self, output): pass - def report(): # This is not an abstract method, but it is required to be implemented in subclasses. + def on_error(self, exc): + return None + + def finalize(self): raise NotImplementedError +class HookPhaseError(RuntimeError): + def __init__(self, phase: str, errors: Sequence[tuple[Any, Exception]]): + self.phase = phase + self.errors = tuple(errors) + super().__init__(phase, self.errors) + + def __str__(self): + names = ', '.join(type(error).__name__ for _, error in self.errors) + return f'Hook phase `{self.phase}` failed with {len(self.errors)} error(s): {names}' + + def _check_and_get_pre_assign_number(func): func_node = ast.parse(inspect.getsource(func)).body[0] @@ -58,3 +78,109 @@ def post_hook(self, output): try: self._generator.send(output) if self._left_count == 1 else next(self._generator) except StopIteration: pass + + +def _materialize_hook(hook_type, obj): + if isinstance(hook_type, LazyLLMHook): + return copy.deepcopy(hook_type) + assert isinstance(hook_type, type) and issubclass(hook_type, LazyLLMHook), ( + f'{hook_type} is not a subclass of LazyLLMHook') + return hook_type(obj) + + +def _hook_priority(hook_obj): + return getattr(hook_obj, '__hook_priority__', 100) + + +def _hook_error_mode(hook_obj): + mode = getattr(hook_obj, '__hook_error_mode__', 'warn') + if mode not in ('warn', 'raise'): + raise ValueError(f'Invalid hook error mode: {mode}') + return mode + + +def _raise_hook_phase_errors(phase: str, errors): + if not errors: + return + raise HookPhaseError(phase, errors) + + +_builtin_hook_providers: list[Callable[[Any], list]] = [] + + +def register_builtin_hook_provider(provider: Callable[[Any], list]): + if provider not in _builtin_hook_providers: + _builtin_hook_providers.append(provider) + return provider + + +def resolve_builtin_hooks(obj): + hooks = [] + for provider in _builtin_hook_providers: + provided_hooks = provider(obj) + if provided_hooks: + hooks.extend(provided_hooks) + return hooks + + +def register_hooks(obj, hooks): + if not hooks: + return obj + if not hasattr(obj, '_hooks'): + raise AttributeError(f'{type(obj).__name__} has no attribute `_hooks`') + for hook_type in hooks: + if isinstance(hook_type, LazyLLMHook): + exists = hook_type in obj._hooks + else: + exists = any(h is hook_type for h in obj._hooks if isinstance(h, type)) + if not exists: + obj._hooks.append(hook_type) + return obj + + +def prepare_hooks(obj, hook_types, *args, **kwargs): + hook_objs = [] + materialized_hooks = [] + for hook_type in hook_types: + hook_obj = _materialize_hook(hook_type, obj) + materialized_hooks.append(hook_obj) + + materialized_hooks.sort(key=_hook_priority) + + for hook_obj in materialized_hooks: + try: + hook_obj.pre_hook(*args, **kwargs) + except Exception as e: + try: + hook_obj.finalize() + except Exception as finalize_exc: + if _hook_error_mode(hook_obj) == 'raise': + raise finalize_exc + LOG.warning(f'Hook `{type(hook_obj).__name__}` finalize failed and will be skipped: {finalize_exc}') + if _hook_error_mode(hook_obj) == 'raise': + for active_hook in hook_objs: + try: + active_hook.finalize() + except Exception: + pass + raise + LOG.warning(f'Hook `{type(hook_obj).__name__}` pre_hook failed and will be skipped: {e}') + continue + hook_objs.append(hook_obj) + return hook_objs + + +def run_hooks(hook_objs, phase: str, *phase_args): + if phase not in ('post_hook', 'on_error', 'finalize'): + raise ValueError(f'Invalid hook phase: {phase}') + strict_errors = [] + ordered_hooks = hook_objs[::-1] + for hook_obj in ordered_hooks: + try: + getattr(hook_obj, phase)(*phase_args) + except Exception as e: + if _hook_error_mode(hook_obj) == 'raise': + strict_errors.append((hook_obj, e)) + else: + LOG.warning(f'Hook `{type(hook_obj).__name__}` {phase} failed and will be skipped: {e}') + _raise_hook_phase_errors(phase, strict_errors) diff --git a/lazyllm/module/module.py b/lazyllm/module/module.py index c6ccfa7ce..2c8053f62 100644 --- a/lazyllm/module/module.py +++ b/lazyllm/module/module.py @@ -10,11 +10,10 @@ from ..flow import FlowBase, Pipeline, Parallel from ..common.bind import _MetaBind import uuid -from ..hook import LazyLLMHook, LazyLLMFuncHook +from ..hook import LazyLLMHook, LazyLLMFuncHook, prepare_hooks, register_hooks, resolve_builtin_hooks, run_hooks from lazyllm import FileSystemQueue, LOG from contextlib import contextmanager from typing import Optional, Union, Dict, List, Callable -import copy from collections import defaultdict import sqlite3 import pickle @@ -292,7 +291,8 @@ def __init__(self, *, return_trace=False, id: Optional[str] = None, name: Option self._options = [] self.eval_result = None self._use_cache: Union[bool, str] = False - self._hooks = set() + self._hooks = [] + register_hooks(self, resolve_builtin_hooks(self)) def __setattr__(self, name: str, value): if isinstance(value, ModuleBase): @@ -326,14 +326,8 @@ def _setattr(v, *, _return_value=self, **kw): raise AttributeError(f'{self.__class__} object has no attribute {key}') def __call__(self, *args, **kw): - hook_objs = [] - for hook_type in self._hooks: - if isinstance(hook_type, LazyLLMHook): - hook_objs.append(copy.deepcopy(hook_type)) - elif isinstance(hook_type, type): - assert issubclass(hook_type, LazyLLMHook), f'{hook_type} is not a subclass of LazyLLMHook' - hook_objs.append(hook_type(self)) - hook_objs[-1].pre_hook(*args, **kw) + hook_objs = prepare_hooks(self, self._hooks, *args, **kw) + try: kw.update(locals['global_parameters'].get(self._module_id, dict())) if (files := locals['lazyllm_files'].get(self._module_id)) is not None: kw['lazyllm_files'] = files @@ -343,18 +337,31 @@ def __call__(self, *args, **kw): if args and isinstance(args[0], kwargs) else self._call_impl(*args, **kw)) if self._return_trace: lazyllm.FileSystemQueue.get_instance('lazy_trace').enqueue(str(r)) - except HandledException as e: raise e + except HandledException as e: + LOG.error(f'Module `{self.__class__.__name__}` raised {type(e).__name__}: {e}') + try: + run_hooks(hook_objs, 'on_error', e) + except Exception: + LOG.warning('Module on_error hook failed', exc_info=True) + raise except Exception as e: - LOG.error(f'An error occured in {self.__class__}' + (f' with name {self.name}' if self.name else + LOG.error(f'An error occurred in {self.__class__}' + (f' with name {self.name}' if self.name else '') + f'. Args: `{args}`, Kwargs: `{kw}`') - raise _change_exception_type(e, ModuleExecutionError) from None - - for hook_obj in hook_objs[::-1]: - hook_obj.post_hook(r) - for hook_obj in hook_objs: - hook_obj.report() - self._clear_usage() - return r + err = _change_exception_type(e, ModuleExecutionError) + try: + run_hooks(hook_objs, 'on_error', err) + except Exception: + LOG.warning('Module on_error hook failed', exc_info=True) + raise err from None + else: + run_hooks(hook_objs, 'post_hook', r) + self._clear_usage() + return r + finally: + try: + run_hooks(hook_objs, 'finalize') + except Exception: + LOG.warning('Module finalize hook failed', exc_info=True) def _call_impl(self, *args, **kw): if self._use_cache and 'R' in lazyllm.config['cache_mode']: @@ -396,14 +403,15 @@ def register_hook(self, hook_type: Union[LazyLLMHook, Callable]): if not isinstance(hook_type, LazyLLMHook): raise TypeError(f'Invalid hook type: {type(hook_type)}, ' 'must be subclass or instance of LazyLLMHook, or callable function') - self._hooks.add(hook_type) + if hook_type not in self._hooks: + self._hooks.append(hook_type) def unregister_hook(self, hook_type: LazyLLMHook): if hook_type in self._hooks: self._hooks.remove(hook_type) def clear_hooks(self): - self._hooks = set() + self._hooks = [] def _get_train_tasks(self): return None def _get_deploy_tasks(self): return None diff --git a/lazyllm/tracing/__init__.py b/lazyllm/tracing/__init__.py new file mode 100644 index 000000000..0c1e1b5bb --- /dev/null +++ b/lazyllm/tracing/__init__.py @@ -0,0 +1,47 @@ +from ..configs import config +from .runtime import ( + TracingSetupError, + start_span, + set_span_output, + set_span_error, + finish_span, + get_trace_context, + set_trace_context, + tracing_available, +) +from .configs import ( + DEFAULT_MODULE_TRACE_CONFIG, + get_default_module_trace_config, + set_default_module_trace_config, + resolve_default_module_trace, +) +from .hook import LazyTracingHook, resolve_tracing_hooks +from .backends import get_tracing_backend +from .backends.base import TracingBackend + + +config.add('trace_enabled', bool, True, 'TRACE_ENABLED', + description='Whether LazyLLM tracing is enabled by default.') +config.add('trace_backend', str, 'langfuse', 'TRACE_BACKEND', + description='The tracing backend used by LazyLLM.') +config.add('trace_content_enabled', bool, True, 'TRACE_CONTENT_ENABLED', + description='Whether tracing records basic input and output payloads by default.') + +__all__ = [ + 'TracingBackend', + 'LazyTracingHook', + 'resolve_tracing_hooks', + 'get_tracing_backend', + 'TracingSetupError', + 'start_span', + 'set_span_output', + 'set_span_error', + 'finish_span', + 'get_trace_context', + 'set_trace_context', + 'tracing_available', + 'DEFAULT_MODULE_TRACE_CONFIG', + 'get_default_module_trace_config', + 'set_default_module_trace_config', + 'resolve_default_module_trace', +] diff --git a/lazyllm/tracing/backends/__init__.py b/lazyllm/tracing/backends/__init__.py new file mode 100644 index 000000000..0be71ee80 --- /dev/null +++ b/lazyllm/tracing/backends/__init__.py @@ -0,0 +1,18 @@ +try: + from .langfuse import LangfuseBackend + _BACKEND_CLASSES = {LangfuseBackend.name: LangfuseBackend} +except ImportError: + _BACKEND_CLASSES = {} + +_BACKEND_INSTANCES = {} + +def get_tracing_backend(name: str): + if name not in _BACKEND_CLASSES: + raise ValueError(f'Unsupported trace backend: {name}') + if name not in _BACKEND_INSTANCES: + _BACKEND_INSTANCES[name] = _BACKEND_CLASSES[name]() + return _BACKEND_INSTANCES[name] + +__all__ = [ + 'get_tracing_backend', +] diff --git a/lazyllm/tracing/backends/base.py b/lazyllm/tracing/backends/base.py new file mode 100644 index 000000000..2c03e3828 --- /dev/null +++ b/lazyllm/tracing/backends/base.py @@ -0,0 +1,31 @@ +from abc import ABC, abstractmethod +from typing import Any, Dict + + +class TracingBackend(ABC): + name = '' + + @abstractmethod + def build_exporter(self): + pass + + @abstractmethod + def context_attributes(self, trace_ctx: Dict[str, Any], *, is_root_span: bool) -> Dict[str, Any]: + pass + + @abstractmethod + def input_attributes(self, args: tuple[Any, ...], kwargs: Dict[str, Any], *, + capture_payload: bool, is_root_span: bool) -> Dict[str, Any]: + pass + + @abstractmethod + def set_root_span_name(self, span: Any, span_name: str): + pass + + @abstractmethod + def output_attributes(self, text: str, *, is_root_span: bool) -> Dict[str, Any]: + pass + + @abstractmethod + def error_attributes(self, exc: Exception) -> Dict[str, Any]: + pass diff --git a/lazyllm/tracing/backends/langfuse.py b/lazyllm/tracing/backends/langfuse.py new file mode 100644 index 000000000..8fa74834c --- /dev/null +++ b/lazyllm/tracing/backends/langfuse.py @@ -0,0 +1,82 @@ +import base64 +import json +import os +from typing import Any, Dict, Optional + +from .base import TracingBackend + + +_LANGFUSE_TRACE_NAME = 'langfuse.trace.name' +_LANGFUSE_TRACE_USER_ID = 'user.id' +_LANGFUSE_TRACE_SESSION_ID = 'session.id' +_LANGFUSE_TRACE_TAGS = 'langfuse.trace.tags' +_LANGFUSE_TRACE_INPUT = 'langfuse.trace.input' +_LANGFUSE_TRACE_OUTPUT = 'langfuse.trace.output' +_LANGFUSE_OBSERVATION_INPUT = 'langfuse.observation.input' +_LANGFUSE_OBSERVATION_OUTPUT = 'langfuse.observation.output' +_LANGFUSE_OBSERVATION_STATUS_MESSAGE = 'langfuse.observation.status_message' + + +class LangfuseBackend(TracingBackend): + name = 'langfuse' + + def _config(self) -> Dict[str, Optional[str]]: + return { + 'host': os.getenv('LANGFUSE_HOST') or os.getenv('LANGFUSE_BASE_URL'), + 'public_key': os.getenv('LANGFUSE_PUBLIC_KEY'), + 'secret_key': os.getenv('LANGFUSE_SECRET_KEY'), + } + + def build_exporter(self): + cfg = self._config() + missing = [name for name, value in cfg.items() if not value] + if missing: + raise RuntimeError('Missing Langfuse tracing config: ' + ', '.join(missing)) + + auth = base64.b64encode(f"{cfg['public_key']}:{cfg['secret_key']}".encode('utf-8')).decode('ascii') + endpoint = cfg['host'].rstrip('/') + '/api/public/otel/v1/traces' + + from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter + + return OTLPSpanExporter( + endpoint=endpoint, + headers={'Authorization': f'Basic {auth}'}, + ) + + def context_attributes(self, trace_ctx: Dict[str, Any], *, is_root_span: bool) -> Dict[str, Any]: + attrs = {} + if is_root_span and trace_ctx.get('session_id'): + attrs[_LANGFUSE_TRACE_SESSION_ID] = trace_ctx['session_id'] + if is_root_span and trace_ctx.get('user_id'): + attrs[_LANGFUSE_TRACE_USER_ID] = trace_ctx['user_id'] + if is_root_span and trace_ctx.get('request_tags'): + attrs[_LANGFUSE_TRACE_TAGS] = json.dumps(trace_ctx['request_tags'], ensure_ascii=False) + return attrs + + def input_attributes(self, args: tuple[Any, ...], kwargs: Dict[str, Any], *, + capture_payload: bool, is_root_span: bool) -> Dict[str, Any]: + if not capture_payload: + return {} + payload = {'args': args, 'kwargs': kwargs} + attrs = { + _LANGFUSE_OBSERVATION_INPUT: payload, + } + if is_root_span: + attrs[_LANGFUSE_TRACE_INPUT] = payload + return attrs + + def set_root_span_name(self, span: Any, span_name: str): + span.set_attribute(_LANGFUSE_TRACE_NAME, span_name) + + def output_attributes(self, text: str, *, is_root_span: bool) -> Dict[str, Any]: + attrs = { + _LANGFUSE_OBSERVATION_OUTPUT: text, + } + if is_root_span: + attrs[_LANGFUSE_TRACE_OUTPUT] = text + return attrs + + def error_attributes(self, exc: Exception) -> Dict[str, Any]: + return { + _LANGFUSE_OBSERVATION_STATUS_MESSAGE: str(exc), + } diff --git a/lazyllm/tracing/configs.py b/lazyllm/tracing/configs.py new file mode 100644 index 000000000..e0e0f572f --- /dev/null +++ b/lazyllm/tracing/configs.py @@ -0,0 +1,55 @@ +import threading + + +DEFAULT_MODULE_TRACE_CONFIG = { + 'default': True, + 'by_name': { + 'retriever': True, + 'reranker': True, + 'llm': True, + }, + 'by_class': { + 'OnlineModule': True, + }, +} + +_module_trace_config_lock = threading.RLock() + +_module_trace_config = { + 'default': DEFAULT_MODULE_TRACE_CONFIG['default'], + 'by_name': DEFAULT_MODULE_TRACE_CONFIG['by_name'].copy(), + 'by_class': DEFAULT_MODULE_TRACE_CONFIG['by_class'].copy(), +} + + +def set_default_module_trace_config(config: dict) -> dict: + if not isinstance(config, dict): + raise TypeError(f'Module trace config must be dict, got {type(config).__name__}') + + with _module_trace_config_lock: + _module_trace_config['default'] = config.get('default', DEFAULT_MODULE_TRACE_CONFIG['default']) + _module_trace_config['by_name'] = dict(config.get('by_name', {})) + _module_trace_config['by_class'] = dict(config.get('by_class', {})) + return get_default_module_trace_config() + + +def get_default_module_trace_config() -> dict: + with _module_trace_config_lock: + return { + 'default': _module_trace_config['default'], + 'by_name': _module_trace_config['by_name'].copy(), + 'by_class': _module_trace_config['by_class'].copy(), + } + + +def resolve_default_module_trace(*, module_name=None, module_class=None) -> bool: + with _module_trace_config_lock: + if module_name and module_name in _module_trace_config['by_name']: + return _module_trace_config['by_name'][module_name] + if module_class: + class_names = ([cls.__name__ for cls in module_class.mro()] if isinstance(module_class, type) + else [str(module_class)]) + for class_name in class_names: + if class_name in _module_trace_config['by_class']: + return _module_trace_config['by_class'][class_name] + return _module_trace_config['default'] diff --git a/lazyllm/tracing/hook.py b/lazyllm/tracing/hook.py new file mode 100644 index 000000000..217fe16b9 --- /dev/null +++ b/lazyllm/tracing/hook.py @@ -0,0 +1,62 @@ +from ..common import globals +from ..configs import config +from ..hook import LazyLLMHook, register_builtin_hook_provider +from .configs import resolve_default_module_trace +from .runtime import finish_span, set_span_error, set_span_output, start_span + + +class LazyTracingHook(LazyLLMHook): + __hook_priority__ = 0 + __hook_error_mode__ = 'raise' + + def __init__(self, obj): + self._obj = obj + self._span_handle = None + + @property + def _span_kind(self): + return 'flow' if hasattr(self._obj, '_flow_id') else 'module' + + def pre_hook(self, *args, **kwargs): + self._span_handle = start_span(span_kind=self._span_kind, target=self._obj, args=args, kwargs=kwargs) + + def post_hook(self, output): + if self._span_handle is None: + return + set_span_output(self._span_handle, output) + + def on_error(self, exc): + if self._span_handle is None: + return + set_span_error(self._span_handle, exc) + + def finalize(self): + if self._span_handle is None: + return + finish_span(self._span_handle) + self._span_handle = None + + +def resolve_tracing_hooks(obj): + trace_cfg = globals.get('trace', {}) + trace_enabled = trace_cfg.get('enabled') + if trace_enabled is None: + trace_enabled = config['trace_enabled'] + if not trace_enabled or trace_cfg.get('sampled') is False: + return [] + if hasattr(obj, '_module_id'): + if not resolve_default_module_trace( + module_name=getattr(obj, 'name', None) or getattr(obj, '_module_name', None), + module_class=obj.__class__, + ): + return [] + return [LazyTracingHook] + + +register_builtin_hook_provider(resolve_tracing_hooks) + + +__all__ = [ + 'LazyTracingHook', + 'resolve_tracing_hooks', +] diff --git a/lazyllm/tracing/runtime.py b/lazyllm/tracing/runtime.py new file mode 100644 index 000000000..f722e6640 --- /dev/null +++ b/lazyllm/tracing/runtime.py @@ -0,0 +1,299 @@ +import atexit +import json +import threading +from dataclasses import dataclass +from typing import Any, Dict, Iterable, Optional + +from lazyllm.common import LOG, globals +from lazyllm.configs import config +from .backends import get_tracing_backend + + +_TRACE_SERVICE_NAME = 'lazyllm' +_TRACE_CONTEXT_DEFAULTS = { + 'enabled': None, + 'trace_id': None, + 'session_id': None, + 'user_id': None, + 'request_tags': None, + 'sampled': None, + 'parent_span_id': None, + 'debug_capture_payload': None, +} + + +class TracingSetupError(RuntimeError): + pass + + +def _normalize_tags(tags: Any) -> list[str]: + if tags is None: + return [] + if isinstance(tags, str): + return [tags] + if isinstance(tags, Iterable): + return [str(tag) for tag in tags if tag is not None] + return [str(tags)] + + +def _normalize_trace_context(trace: Optional[Dict[str, Any]]) -> Dict[str, Any]: + data = dict(trace) if isinstance(trace, dict) else {} + for key, default in _TRACE_CONTEXT_DEFAULTS.items(): + data.setdefault(key, default) + data['request_tags'] = _normalize_tags(data.get('request_tags')) + return data + + +def get_trace_context() -> Dict[str, Any]: + return _normalize_trace_context(globals.get('trace', {})) + + +def set_trace_context(trace: Optional[Dict[str, Any]]) -> Dict[str, Any]: + normalized = _normalize_trace_context(trace) + globals['trace'] = normalized + return normalized + + +def _capture_payload_enabled(trace_ctx: Dict[str, Any]) -> bool: + debug_capture_payload = trace_ctx.get('debug_capture_payload') + if debug_capture_payload is not None: + return bool(debug_capture_payload) + return bool(config['trace_content_enabled']) + + +def _stringify_payload(value: Any, *, limit: int = 8192) -> str: + try: + if isinstance(value, str): + text = value + else: + text = json.dumps(value, ensure_ascii=False, default=str) + except Exception: + text = repr(value) + if len(text) > limit: + return text[:limit] + '...' + return text + + +@dataclass +class TraceSpanHandle: + span: Any + span_cm: Any + is_root_span: bool + + +class _TracingRuntime: + def __init__(self): + self._lock = threading.RLock() + self._initialized = False + self._warned = False + self._provider = None + self._tracer = None + self._trace_api = None + self._status = None + self._backend = None + + def available(self) -> bool: + return self._ensure_runtime() and self._tracer is not None + + def _warn_once(self, message: str): + if not self._warned: + LOG.warning(message) + self._warned = True + + def _raise_setup_error(self, message: str): + self._warn_once(message) + raise TracingSetupError(message) + + def _get_backend(self): + backend_name = config['trace_backend'] + try: + return get_tracing_backend(backend_name) + except ValueError as exc: + self._raise_setup_error(str(exc)) + + def _ensure_runtime(self) -> bool: + if self._initialized: + return self._tracer is not None + with self._lock: + if self._initialized: + return self._tracer is not None + self._initialized = True + try: + from opentelemetry import trace as trace_api + from opentelemetry.sdk.resources import Resource + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import BatchSpanProcessor + from opentelemetry.trace.status import Status, StatusCode + except Exception as exc: + self._raise_setup_error( + 'LazyLLM tracing is disabled because OpenTelemetry dependencies are unavailable. ' + f'Install opentelemetry-api, opentelemetry-sdk and opentelemetry-exporter-otlp-proto-http. ' + f'Details: {exc}' + ) + backend = self._get_backend() + + try: + exporter = backend.build_exporter() + except Exception as exc: + self._raise_setup_error(f'LazyLLM {backend.name} tracing initialization failed: {exc}') + + resource = Resource.create({'service.name': _TRACE_SERVICE_NAME}) + provider = TracerProvider(resource=resource) + processor = BatchSpanProcessor(exporter) + provider.add_span_processor(processor) + trace_api.set_tracer_provider(provider) + + self._provider = provider + self._tracer = trace_api.get_tracer('lazyllm.tracing') + self._trace_api = trace_api + self._status = (Status, StatusCode) + self._backend = backend + atexit.register(self.shutdown) + return True + + def _trace_enabled(self, trace_ctx: Dict[str, Any]) -> bool: + if trace_ctx.get('enabled') is not None: + return bool(trace_ctx['enabled']) and trace_ctx.get('sampled') is not False + if trace_ctx.get('sampled') is False: + return False + return bool(config['trace_enabled']) + + @staticmethod + def _target_name(target: Any, span_kind: str) -> str: + if span_kind == 'module': + return getattr(target, 'name', None) or getattr(target, '_module_name', None) or target.__class__.__name__ + return target.__class__.__name__ + + @staticmethod + def _target_id(target: Any, span_kind: str) -> Optional[str]: + if span_kind == 'module': + return getattr(target, '_module_id', None) + return getattr(target, '_flow_id', None) + + def _base_attributes( + self, + *, + span_kind: str, + span_name: str, + is_root_span: bool, + trace_ctx: Dict[str, Any], + target: Any, + capture_payload: bool, + args: tuple[Any, ...], + kwargs: Dict[str, Any], + ) -> Dict[str, Any]: + attrs = { + 'lazyllm.span.kind': span_kind, + 'lazyllm.entity.name': span_name, + 'lazyllm.entity.class': target.__class__.__name__, + 'lazyllm.entity.id': self._target_id(target, span_kind) or '', + 'lazyllm.status': 'ok', + } + if trace_ctx.get('trace_id'): + attrs['lazyllm.request.trace_id'] = str(trace_ctx['trace_id']) + if trace_ctx.get('parent_span_id'): + attrs['lazyllm.request.parent_span_id'] = str(trace_ctx['parent_span_id']) + attrs.update(self._backend.context_attributes(trace_ctx, is_root_span=is_root_span)) + for key, value in self._backend.input_attributes( + args, kwargs, capture_payload=capture_payload, is_root_span=is_root_span + ).items(): + attrs[key] = _stringify_payload(value) if isinstance(value, dict) else value + return attrs + + def start_span( + self, + *, + span_kind: str, + target: Any, + args: tuple[Any, ...], + kwargs: Dict[str, Any], + ) -> Optional[TraceSpanHandle]: + trace_ctx = get_trace_context() + if not self._trace_enabled(trace_ctx): + return None + self._ensure_runtime() + + capture_payload = _capture_payload_enabled(trace_ctx) + span_name = self._target_name(target, span_kind) + current = self._trace_api.get_current_span() + is_root_span = not current.get_span_context().is_valid + attributes = self._base_attributes( + span_kind=span_kind, + span_name=span_name, + is_root_span=is_root_span, + trace_ctx=trace_ctx, + target=target, + capture_payload=capture_payload, + args=args, + kwargs=kwargs, + ) + + span_cm = self._tracer.start_as_current_span(span_name, attributes=attributes) + span = span_cm.__enter__() + span_context = span.get_span_context() + trace_ctx['trace_id'] = f'{span_context.trace_id:032x}' + set_trace_context(trace_ctx) + if is_root_span: + self._backend.set_root_span_name(span, span_name) + return TraceSpanHandle(span=span, span_cm=span_cm, is_root_span=is_root_span) + + def set_output(self, handle: Optional[TraceSpanHandle], output: Any): + if handle is None: + return + span = handle.span + trace_ctx = get_trace_context() + capture_payload = _capture_payload_enabled(trace_ctx) + span.set_attribute('lazyllm.status', 'ok') + if not capture_payload: + return + text = _stringify_payload(output) + for key, value in self._backend.output_attributes(text, is_root_span=handle.is_root_span).items(): + span.set_attribute(key, value) + + def set_error(self, handle: Optional[TraceSpanHandle], exc: Exception): + if handle is None: + return + span = handle.span + status_cls, status_code = self._status + span.set_status(status_cls(status_code.ERROR, str(exc))) + span.set_attribute('lazyllm.status', 'error') + for key, value in self._backend.error_attributes(exc).items(): + span.set_attribute(key, value) + span.record_exception(exc) + + def finish_span(self, handle: Optional[TraceSpanHandle]): + if handle is None: + return + handle.span_cm.__exit__(None, None, None) + + def shutdown(self): + if self._provider is None: + return + try: + self._provider.force_flush() + self._provider.shutdown() + except Exception: + pass + + +_runtime = _TracingRuntime() + + +def tracing_available() -> bool: + return _runtime.available() + + +def start_span(*, span_kind: str, target: Any, args: tuple[Any, ...], kwargs: Dict[str, Any]): + return _runtime.start_span(span_kind=span_kind, target=target, args=args, kwargs=kwargs) + + +def set_span_output(handle, output: Any): + _runtime.set_output(handle, output) + + +def set_span_error(handle, exc: Exception): + _runtime.set_error(handle, exc) + + +def finish_span(handle): + _runtime.finish_span(handle) diff --git a/pyproject.toml b/pyproject.toml index f9ce4e40e..0fb54546e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -61,6 +61,7 @@ rag-advanced = "Install advanced RAG features, including vector database support agent-advanced = "Install advanced agent-related features with MCP support" dev = "Install developer dependencies for code style checks and testing scripts" online-advanced = "Install dependencies for online multimodal" +tracing = "Install dependencies for tracing with OpenTelemetry and Langfuse" [tool.pytest.ini_options] markers = [ @@ -76,6 +77,10 @@ order_group_scope = "class" appdirs = { version = "*", optional = true } loralib = { version = "*", optional = true } json_repair = { version = "*", optional = true } +langfuse = { version = ">=3.0.0,<4.0.0", optional = true } +opentelemetry-api = { version = ">=1.27.0,<2.0.0", optional = true } +opentelemetry-sdk = { version = ">=1.27.0,<2.0.0", optional = true } +opentelemetry-exporter-otlp-proto-http = { version = ">=1.27.0,<2.0.0", optional = true } flake8 = { version = ">=7.0.0", optional = true } chromadb = {version = ">=1.0.6", optional = true} sentence-transformers = { version = "^3.0.1", optional = true } @@ -171,6 +176,12 @@ protobuf = { version = ">=3.20.1", optional = true } fsspec = { version = "*", optional = true } [tool.poetry.extras] +tracing = [ + "langfuse", + "opentelemetry-api", + "opentelemetry-sdk", + "opentelemetry-exporter-otlp-proto-http" +] standard = [ "appdirs", "chromadb", @@ -314,6 +325,10 @@ full = [ "httpx", "async-timeout", "protobuf", + "langfuse", + "opentelemetry-api", + "opentelemetry-sdk", + "opentelemetry-exporter-otlp-proto-http", "json_repair" ] alpaca-lora = [