Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion lazyllm/common/globals.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,14 @@ def __reduce__(self):


class Globals(metaclass=SingletonABCMeta):
__global_attrs__ = ThreadSafeDict(user_id=None, chat_history={}, global_parameters={}, lazyllm_files={}, usage={})
__global_attrs__ = ThreadSafeDict(
user_id=None,
chat_history={},
global_parameters={},
lazyllm_files={},
usage={},
trace={},
)

def __new__(cls, *args, **kw):
if cls is not Globals: return super().__new__(cls)
Expand Down
13 changes: 10 additions & 3 deletions lazyllm/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,8 @@ def prefix(self):
return self._config._prefix


config = _NamespaceConfig().add('mode', Mode, Mode.Normal, dict(DISPLAY=Mode.Display, DEBUG=Mode.Debug),
description='The default mode for LazyLLM.'
config = (_NamespaceConfig().add('mode', Mode, Mode.Normal, dict(DISPLAY=Mode.Display, DEBUG=Mode.Debug),
description='The default mode for LazyLLM.'
).add('repr_ml', bool, False, 'REPR_USE_ML', description='Whether to use Markup Language for repr.'
).add('repr_show_child', bool, False, 'REPR_SHOW_CHILD',
description='Whether to show child modules in repr.'
Expand All @@ -263,7 +263,14 @@ def prefix(self):
description='Whether to skip check keywords for deployment.'
).add('allow_internal_network', bool, False, 'ALLOW_INTERNAL_NETWORK',
description='Whether to allow loading images from internal network addresses. '
'Set to False for security in production environments.')
'Set to False for security in production environments.'
).add('trace_enabled', bool, True, 'TRACE_ENABLED',
description='Whether LazyLLM tracing is enabled by default.'
).add('trace_backend', str, 'langfuse', 'TRACE_BACKEND',
description='The tracing backend used by LazyLLM.'
).add('trace_content_enabled', bool, True, 'TRACE_CONTENT_ENABLED',
description='Whether tracing records basic input and output payloads by default.'
))

def refresh_config(key):
if key in Config:
Expand Down
57 changes: 57 additions & 0 deletions lazyllm/docs/hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,25 @@
output: The return value of the monitored function.
''')

add_chinese_doc('LazyLLMHook.on_error', '''\
异常处理钩子,在被监控函数执行抛出异常时调用。

默认实现为空操作,子类可以按需覆盖,用于记录错误状态、补充诊断信息或执行清理逻辑。

Args:
exc: 被监控函数抛出的异常对象。
''')

add_english_doc('LazyLLMHook.on_error', '''\
Error-handling hook, called when the monitored function raises an exception.

The default implementation is a no-op. Subclasses can override it to record error status,
attach diagnostic information, or perform cleanup.

Args:
exc: The exception raised by the monitored function.
''')

add_chinese_doc('LazyLLMHook.report', '''\
生成钩子的执行报告。

Expand All @@ -96,3 +115,41 @@

This is an abstract method and must be implemented in subclasses.
''')

add_chinese_doc('LazyTracingHook', '''\
为 flow 或 module 创建 tracing hook。

该 hook 会在执行生命周期中创建、更新并结束对应的 tracing span。

Args:
obj: 要进行 tracing 的 flow 或 module 对象。
''')

add_english_doc('LazyTracingHook', '''\
Create a tracing hook for a flow or module object.

This hook is responsible for creating, updating, and finishing the corresponding tracing span
during the execution lifecycle.

Args:
obj: The flow or module object to be traced.
''')

add_chinese_doc('LazyTracingHook.on_error', '''\
在 tracing span 上记录异常状态。

当被包裹的 flow 或 module 执行失败时,该方法会把异常信息写入当前 span。

Args:
exc: 执行过程中抛出的异常对象。
''')

add_english_doc('LazyTracingHook.on_error', '''\
Record the error state on the active tracing span.

When the wrapped flow or module execution fails, this method writes the exception information
to the current span.

Args:
exc: The exception raised during execution.
''')
44 changes: 25 additions & 19 deletions lazyllm/flow/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import concurrent.futures
from collections import deque
import uuid
from ..hook import LazyLLMHook
from ..hook import LazyLLMHook, LazyTracingHook, run_pre_hooks
from itertools import repeat


Expand Down Expand Up @@ -151,6 +151,8 @@ def _defined_at_the_same_scope(self, other: 'FlowBase'):

def __setattr__(self, name: str, value):
if '_capture' in self.__dict__ and self._capture and not name.startswith('_'):
if hasattr(value, '_module_id') and hasattr(value, 'name') and not value.name:
value.name = name
if len(_get_flow_stack()) > 1 and not self._auto_registered:
super(__class__, self).__setattr__('_auto_registered', True)
locals = self._curr_frame.f_locals.copy()
Expand Down Expand Up @@ -216,35 +218,39 @@ def __init__(self, *args, post_action=None, auto_capture=False, **kw):
super(__class__, self).__init__(*args, item_names=list(kw.keys()), auto_capture=auto_capture)
self.post_action = post_action() if isinstance(post_action, type) else post_action
self._sync = False
self._hooks = set()
self._builtin_hooks = [LazyTracingHook]
self._hooks = []

def __call__(self, *args, **kw):
hook_objs = []
for hook_type in self._hooks:
if isinstance(hook_type, LazyLLMHook):
hook_objs.append(hook_type)
else:
hook_objs.append(hook_type(self))
hook_objs[-1].pre_hook(*args, **kw)
output = self._run(args[0] if len(args) == 1 else package(args), **kw)
if self.post_action is not None: self.invoke(self.post_action, output)
if self._sync: self.wait()
r = self._post_process(output)
for hook_obj in hook_objs[::-1]:
hook_obj.post_hook(r)
for hook_obj in hook_objs:
hook_obj.report()
return r
run_pre_hooks(self, self._builtin_hooks, hook_objs, *args, raise_on_error=True, **kw)
run_pre_hooks(self, self._hooks, hook_objs, *args, raise_on_error=False, **kw)

try:
output = self._run(args[0] if len(args) == 1 else package(args), **kw)
if self.post_action is not None: self.invoke(self.post_action, output)
if self._sync: self.wait()
r = self._post_process(output)
for hook_obj in hook_objs[::-1]:
hook_obj.post_hook(r)
return r
except Exception as e:
for hook_obj in hook_objs[::-1]:
hook_obj.on_error(e)
raise
finally:
for hook_obj in hook_objs:
hook_obj.report()

def register_hook(self, hook_type: LazyLLMHook):
self._hooks.add(hook_type)
self._hooks.append(hook_type)

def unregister_hook(self, hook_type: LazyLLMHook):
if hook_type in self._hooks:
self._hooks.remove(hook_type)

def clear_hooks(self):
self._hooks = set()
self._hooks = []

def _post_process(self, output):
return output
Expand Down
67 changes: 66 additions & 1 deletion lazyllm/hook.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from abc import ABC, abstractmethod
import inspect
import ast
import copy
from .common import LOG
from .tracing.runtime import start_span, set_span_output, set_span_error, finish_span
from .tracing.configs import resolve_default_module_trace

class LazyLLMHook(ABC):

Expand All @@ -16,7 +20,10 @@ def pre_hook(self, *args, **kwargs):
def post_hook(self, output):
pass

def report(): # This is not an abstract method, but it is required to be implemented in subclasses.
def on_error(self, exc):
return None

def report(self): # This is not an abstract method, but it is required to be implemented in subclasses.
raise NotImplementedError


Expand Down Expand Up @@ -58,3 +65,61 @@ def post_hook(self, output):
try:
self._generator.send(output) if self._left_count == 1 else next(self._generator)
except StopIteration: pass


class LazyTracingHook(LazyLLMHook):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个hook定义在tracing目录下

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已移动位置

def __init__(self, obj):
self._obj = obj
self._span_handle = None

@property
def _span_kind(self):
return 'flow' if hasattr(self._obj, '_flow_id') else 'module'

def _enabled(self) -> bool:
if self._span_kind != 'module':
return True
return resolve_default_module_trace(
module_name=getattr(self._obj, 'name', None) or getattr(self._obj, '_module_name', None),
module_class=self._obj.__class__,
)

def pre_hook(self, *args, **kwargs):
if not self._enabled():
self._span_handle = None
return
self._span_handle = start_span(span_kind=self._span_kind, target=self._obj, args=args, kwargs=kwargs)

def post_hook(self, output):
set_span_output(self._span_handle, output)

def on_error(self, exc):
set_span_error(self._span_handle, exc)

def report(self):
finish_span(self._span_handle)
self._span_handle = None


def _materialize_hook(hook_type, obj):
if isinstance(hook_type, LazyLLMHook):
return copy.deepcopy(hook_type)
assert isinstance(hook_type, type) and issubclass(hook_type, LazyLLMHook), (
f'{hook_type} is not a subclass of LazyLLMHook')
return hook_type(obj)


def run_pre_hooks(obj, hook_types, hook_objs, *args, raise_on_error: bool, **kwargs):
for hook_type in hook_types:
hook_obj = _materialize_hook(hook_type, obj)
try:
hook_obj.pre_hook(*args, **kwargs)
except Exception as e:
hook_obj.report()
if raise_on_error:
for active_hook in hook_objs:
active_hook.report()
raise
LOG.warning(f'Hook `{type(hook_obj).__name__}` pre_hook failed and will be skipped: {e}')
continue
hook_objs.append(hook_obj)
44 changes: 23 additions & 21 deletions lazyllm/module/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@
from ..flow import FlowBase, Pipeline, Parallel
from ..common.bind import _MetaBind
import uuid
from ..hook import LazyLLMHook, LazyLLMFuncHook
from ..hook import LazyLLMHook, LazyLLMFuncHook, LazyTracingHook, run_pre_hooks
from lazyllm import FileSystemQueue, LOG
from contextlib import contextmanager
from typing import Optional, Union, Dict, List, Callable
import copy
from collections import defaultdict
import sqlite3
import pickle
Expand Down Expand Up @@ -292,7 +291,8 @@ def __init__(self, *, return_trace=False):
self._options = []
self.eval_result = None
self._use_cache: Union[bool, str] = False
self._hooks = set()
self._builtin_hooks = [LazyTracingHook]
self._hooks = []

def __setattr__(self, name: str, value):
if isinstance(value, ModuleBase):
Expand Down Expand Up @@ -327,13 +327,9 @@ def _setattr(v, *, _return_value=self, **kw):

def __call__(self, *args, **kw):
hook_objs = []
for hook_type in self._hooks:
if isinstance(hook_type, LazyLLMHook):
hook_objs.append(copy.deepcopy(hook_type))
elif isinstance(hook_type, type):
assert issubclass(hook_type, LazyLLMHook), f'{hook_type} is not a subclass of LazyLLMHook'
hook_objs.append(hook_type(self))
hook_objs[-1].pre_hook(*args, **kw)
run_pre_hooks(self, self._builtin_hooks, hook_objs, *args, raise_on_error=True, **kw)
run_pre_hooks(self, self._hooks, hook_objs, *args, raise_on_error=False, **kw)

try:
kw.update(locals['global_parameters'].get(self._module_id, dict()))
if (files := locals['lazyllm_files'].get(self._module_id)) is not None: kw['lazyllm_files'] = files
Expand All @@ -343,18 +339,24 @@ def __call__(self, *args, **kw):
if args and isinstance(args[0], kwargs) else self._call_impl(*args, **kw))
if self._return_trace:
lazyllm.FileSystemQueue.get_instance('lazy_trace').enqueue(str(r))
except HandledException as e: raise e
for hook_obj in hook_objs[::-1]:
hook_obj.post_hook(r)
self._clear_usage()
return r
except HandledException as e:
for hook_obj in hook_objs[::-1]:
hook_obj.on_error(e)
raise
except Exception as e:
LOG.error(f'An error occured in {self.__class__}' + (f' with name {self.name}' if self.name else
'') + f'. Args: `{args}`, Kwargs: `{kw}`')
raise _change_exception_type(e, ModuleExecutionError) from None

for hook_obj in hook_objs[::-1]:
hook_obj.post_hook(r)
for hook_obj in hook_objs:
hook_obj.report()
self._clear_usage()
return r
err = _change_exception_type(e, ModuleExecutionError)
for hook_obj in hook_objs[::-1]:
hook_obj.on_error(err)
raise err from None
finally:
for hook_obj in hook_objs:
hook_obj.report()

def _call_impl(self, *args, **kw):
if self._use_cache and 'R' in lazyllm.config['cache_mode']:
Expand Down Expand Up @@ -395,14 +397,14 @@ def register_hook(self, hook_type: Union[LazyLLMHook, Callable]):
if not isinstance(hook_type, LazyLLMHook):
raise TypeError(f'Invalid hook type: {type(hook_type)}, '
'must be subclass or instance of LazyLLMHook, or callable function')
self._hooks.add(hook_type)
self._hooks.append(hook_type)

def unregister_hook(self, hook_type: LazyLLMHook):
if hook_type in self._hooks:
self._hooks.remove(hook_type)

def clear_hooks(self):
self._hooks = set()
self._hooks = []

def _get_train_tasks(self): return None
def _get_deploy_tasks(self): return None
Expand Down
31 changes: 31 additions & 0 deletions lazyllm/tracing/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from .runtime import (
TracingSetupError,
start_span,
set_span_output,
set_span_error,
finish_span,
get_trace_context,
set_trace_context,
tracing_available,
)
from .configs import (
DEFAULT_MODULE_TRACE_CONFIG,
get_default_module_trace_config,
set_default_module_trace_config,
resolve_default_module_trace,
)

__all__ = [
'TracingSetupError',
'start_span',
'set_span_output',
'set_span_error',
'finish_span',
'get_trace_context',
'set_trace_context',
'tracing_available',
'DEFAULT_MODULE_TRACE_CONFIG',
'get_default_module_trace_config',
'set_default_module_trace_config',
'resolve_default_module_trace',
]
Loading
Loading