diff --git a/README.md b/README.md index d8e237e..4dc3ebd 100644 --- a/README.md +++ b/README.md @@ -108,6 +108,105 @@ async def main(): For a complete working example with green tint processing, see `examples/async_processor_example.py`. +## Decorators: auto-wired handlers + +PyTrickle provides a set of decorators that make it easy to implement stream handlers as plain methods on a class. The decorators: + +- Mark methods for auto-discovery and wiring into the stream processor +- Bridge sync functions into async (run sync code in a thread pool) +- Normalize return values so your code can stay simple + +### Available decorators + +- `@video_handler` + - Signature: `(self, frame: VideoFrame) -> Optional[VideoFrame | torch.Tensor | numpy.ndarray | None]` + - Return normalization: + - `None` → pass-through original frame + - `VideoFrame` → used as-is + - `torch.Tensor` / `numpy.ndarray` → replaces the frame's tensor via `frame.replace_tensor(...)` + +- `@audio_handler` + - Signature: `(self, frame: AudioFrame) -> Optional[List[AudioFrame] | AudioFrame | torch.Tensor | numpy.ndarray | None]` + - Return normalization: + - `None` → `[original frame]` + - `AudioFrame` → `[that frame]` + - `List[AudioFrame]` → returned as-is + - `torch.Tensor` / `numpy.ndarray` → replaces samples via `frame.replace_samples(...)`, returning `[frame]` + +- `@model_loader` + - Signature: any (sync or async), called once during model/resource loading. + +- `@param_updater` (optionally `@param_updater(model=MyParamsModel)`) + - Signature: `(self, params)` where `params` is a `dict` or a validated Pydantic model instance if `model=...` is provided. + - If you pass a Pydantic `BaseModel`, incoming params are validated and parsed before your method runs. + +- `@on_stream_stop` + - Signature: `() -> None` (sync or async). Invoked when a stream stops for cleanup. + +All of the above decorators produce async wrappers internally, so they can be awaited by the framework even if your implementation is synchronous. + +### Using decorators with StreamProcessor + +```python +from pytrickle.decorators import video_handler, audio_handler, model_loader, param_updater, on_stream_stop +from pytrickle.stream_processor import StreamProcessor +from pytrickle.frames import VideoFrame, AudioFrame +from typing import List, Optional + +class MyHandler: + @model_loader + async def load(self): + # Load models/resources here + ... + + @video_handler + def handle_video(self, frame: VideoFrame): + # Return None to pass through, VideoFrame, or a tensor/ndarray replacement + return None + + @audio_handler + def handle_audio(self, frame: AudioFrame): + # Return None/[frame]/AudioFrame/List[AudioFrame] or tensor/ndarray samples + return None + + @param_updater + async def update(self, params: dict): + # Update runtime parameters + ... + + @on_stream_stop + def cleanup(self): + # Release resources + ... + +# Auto-discover decorated handlers and run +sp = StreamProcessor.from_handlers(MyHandler(), port=8000) +sp.run() # blocking +``` + +### Parameter validation with Pydantic (optional) + +```python +from pydantic import BaseModel +from pytrickle.decorators import param_updater + +class Params(BaseModel): + threshold: float = 0.5 + enabled: bool = True + +class Handler: + @param_updater(model=Params) + async def update(self, params: Params): + # params is a validated model instance + ... +``` + +### Error handling and sync bridging + +- Decorators create async wrappers that run sync code in a thread (`asyncio.to_thread`) so the event loop stays responsive. +- If your handler raises, the framework logs the error and falls back to pass-through behavior to keep the stream alive. +- When constructing `StreamProcessor` directly (without decorators), all handlers must be true async callables. Using decorators is recommended because they ensure async wrappers and return normalization. + ## HTTP API PyTrickle automatically provides a REST API for your video processor: diff --git a/README_CLI.md b/README_CLI.md new file mode 100644 index 0000000..adf16fe --- /dev/null +++ b/README_CLI.md @@ -0,0 +1,196 @@ +# PyTrickle CLI Reference + +The PyTrickle CLI provides commands to scaffold and run streaming applications using the PyTrickle framework. + +## Installation + +Use your preferred Python environment, then install this package: + +```bash +pip install -e . +``` + +After installation, the `pytrickle` command will be available globally. + +## Commands Overview + +```bash +pytrickle --help # Show main help +pytrickle init --help # Show init command help +pytrickle run --help # Show run command help +``` + +## `pytrickle init` - Scaffold a New App + +Creates a new PyTrickle application with a complete project structure. + +### Usage + +```bash +pytrickle init [PATH] [OPTIONS] +``` + +### Arguments + +- `PATH` (optional): Target directory for the new app (default: current directory `.`) + +### Options + +- `--package NAME`: Package name to use (default: derived from folder name) +- `--force`: Overwrite existing files without prompting + +### Examples + +```bash +# Create app in current directory +pytrickle init + +# Create app in specific directory +pytrickle init my_streaming_app + +# Create with custom package name +pytrickle init my_app --package custom_name + +# Force overwrite existing files +pytrickle init my_app --force +``` + +### Generated Structure + +``` +my_app/ +├── my_app/ +│ ├── __init__.py # Package initializer +│ ├── __main__.py # Entry point for 'python -m my_app' +│ └── handlers.py # Main handlers with decorators +└── README.md # Basic usage instructions +``` + +### Handler Template + +The generated `handlers.py` includes: + +- `@model_loader` - Load your model/resources on startup +- `@video_handler` - Process video frames (returns None for pass-through) +- `@param_updater` - Handle parameter updates during streaming +- `@on_stream_stop` - Cleanup when stream ends + +## `pytrickle run` - Run an Existing App + +Convenience command to run a Python module/package. + +### Usage + +```bash +pytrickle run --module MODULE_NAME +``` + +### Options + +- `--module MODULE_NAME` (required): Name of the Python module/package to run + +### Examples + +```bash +# Run a scaffolded app +pytrickle run --module my_app + +# Run any Python module with __main__.py +pytrickle run --module my_existing_package +``` + +This is equivalent to `python -m MODULE_NAME` but provides consistent CLI experience. + +## Quick Start Workflow + +1. **Create a new app:** + ```bash + pytrickle init video_processor + cd video_processor + ``` + +2. **Customize your handlers:** + Edit `video_processor/handlers.py` to implement your processing logic. + +3. **Run the app:** + ```bash + python -m video_processor + ``` + or + ```bash + pytrickle run --module video_processor + ``` + +4. **Test the streaming endpoint:** + The server starts on port 8000. Send a POST request to `/api/stream/start`: + + ```json + { + "subscribe_url": "http://localhost:3389/sample", + "publish_url": "http://localhost:3389/output", + "gateway_request_id": "demo-1", + "params": {"width": 512, "height": 512} + } + ``` + +## Server Endpoints + +Once running, your app exposes these endpoints: + +- `POST /api/stream/start` - Start streaming with parameters +- `POST /api/stream/stop` - Stop active stream +- `POST /api/stream/params` - Update stream parameters +- `GET /api/stream/status` - Get current stream status +- `GET /health` - Health check +- `GET /version` - App version info + +## Development Tips + +- **Debugging**: Add logging to your handlers to trace execution +- **Parameters**: Use the `@param_updater` to modify behavior during streaming +- **Frame Processing**: Return `None` from handlers for pass-through behavior +- **Error Handling**: Exceptions in handlers are caught and logged automatically +- **Model Loading**: Use `@model_loader` for one-time initialization on startup + +## Advanced Usage + +### Custom Port + +Modify the `main()` function in your generated `handlers.py`: + +```python +processor = StreamProcessor.from_handlers( + handlers, + name="my-app", + port=9000, # Custom port + frame_skip_config=FrameSkipConfig(), +) +``` + +### Multiple Handler Types + +Add more decorators to your handler class: + +```python +@audio_handler +async def process_audio(self, frame): + # Process audio frames + return None +``` + +### Custom Frame Skipping + +Configure frame skipping in your main function: + +```python +from pytrickle.frame_skipper import FrameSkipConfig + +config = FrameSkipConfig( + max_queue_size=10, + skip_threshold=5 +) +processor = StreamProcessor.from_handlers( + handlers, + frame_skip_config=config +) +``` diff --git a/pyproject.toml b/pyproject.toml index 72996d9..d003b1c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -125,4 +125,7 @@ ignore_missing_imports = true [tool.setuptools_scm] # Enable setuptools-scm for automatic version management from Git tags write_to = "pytrickle/_version.py" -fallback_version = "0.0.0" \ No newline at end of file +fallback_version = "0.0.0" + +[project.scripts] +pytrickle = "pytrickle.cli:main" \ No newline at end of file diff --git a/pytrickle/__init__.py b/pytrickle/__init__.py index e532797..3958f1f 100644 --- a/pytrickle/__init__.py +++ b/pytrickle/__init__.py @@ -30,9 +30,19 @@ from .fps_meter import FPSMeter from .frame_skipper import FrameSkipConfig + +from .decorators import ( + trickle_handler, + video_handler, + audio_handler, + model_loader, + param_updater, + on_stream_stop, +) + from . import api -from .version import __version__ +from .version import VERSION as __version__ __all__ = [ "TrickleClient", @@ -59,5 +69,11 @@ "FrameProcessor", "FPSMeter", "FrameSkipConfig", + "trickle_handler", + "video_handler", + "audio_handler", + "model_loader", + "param_updater", + "on_stream_stop", "__version__" ] \ No newline at end of file diff --git a/pytrickle/cli.py b/pytrickle/cli.py new file mode 100644 index 0000000..1ceddcc --- /dev/null +++ b/pytrickle/cli.py @@ -0,0 +1,166 @@ +from pathlib import Path +from typing import Optional + +import argparse + +TEMPLATE_HANDLERS = """ +import logging +from pytrickle.decorators import model_loader, video_handler, param_updater, on_stream_stop +from pytrickle.stream_processor import StreamProcessor +from pytrickle.frame_skipper import FrameSkipConfig + +# --- Basic Setup --- +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + + +class Handlers: + def __init__(self): + self.state = {"intensity": 0.8} + + @model_loader + async def load(self): + # TODO: Load your model/resources here + pass + + @video_handler + async def process_frame(self, frame): + # TODO: Implement your processing; return None for pass-through + return None + + @param_updater + async def update(self, params: dict): + # Update internal state based on params + if not params: + return + self.state.update(params) + + @on_stream_stop + async def on_stop(self): + # Cleanup if needed + pass + + +def main(): + handlers = Handlers() + processor = StreamProcessor.from_handlers( + handlers, + name="pytrickle-app", + port=8000, + frame_skip_config=FrameSkipConfig(), + ) + processor.run() + + +if __name__ == "__main__": + main() +""".lstrip() + +TEMPLATE_RUNNER = """ +from {package}.handlers import main + +if __name__ == "__main__": + main() +""".lstrip() + +README_SNIPPET = """ +# PyTrickle App + +This project was bootstrapped with `pytrickle init`. + +## Quick start + +1. Install dependencies +2. Run the app + +```bash +python -m {package} +``` + +Then POST to /api/stream/start on localhost:8000. +""".lstrip() + + +def _write_file(path: Path, content: str, overwrite: bool = False): + path.parent.mkdir(parents=True, exist_ok=True) + if path.exists() and not overwrite: + return False + path.write_text(content, encoding="utf-8") + return True + + +def cmd_init(args: argparse.Namespace) -> int: + target = Path(args.path).resolve() + package = args.package or target.name.replace("-", "_") + + # Layout: + # / + # {package}/ + # __init__.py + # handlers.py + # README.md + # pyproject.toml (optional) + + handlers_py = TEMPLATE_HANDLERS + runner_py = TEMPLATE_RUNNER.format(package=package) + readme_md = README_SNIPPET.format(package=package) + + created = [] + created.append(_write_file(target / package / "__init__.py", "", overwrite=args.force)) + created.append(_write_file(target / package / "handlers.py", handlers_py, overwrite=args.force)) + created.append(_write_file(target / package / "__main__.py", runner_py, overwrite=args.force)) + created.append(_write_file(target / "README.md", readme_md, overwrite=args.force)) + + print(f"Initialized PyTrickle app in {target}") + print(f"Package: {package}") + print("Files:") + for p in [target / package / "handlers.py", target / package / "__main__.py", target / "README.md"]: + print(f" - {p.relative_to(target)}") + + print("\nTry it:") + print(" python -m", package) + return 0 + + +def cmd_run(args: argparse.Namespace) -> int: + # Allow running a local package quickly: pytrickle run --module my_app + module = args.module + if not module: + print("--module is required (e.g., my_app)") + return 2 + try: + __import__(module) + # Delegate to module's __main__ + run_code = f"import runpy; runpy.run_module('{module}', run_name='__main__')" + exec(run_code, {}) + return 0 + except Exception as e: + print(f"Failed to run module '{module}': {e}") + return 1 + + +def build_parser() -> argparse.ArgumentParser: + p = argparse.ArgumentParser(prog="pytrickle", description="PyTrickle CLI") + sub = p.add_subparsers(dest="command", required=True) + + p_init = sub.add_parser("init", help="Scaffold a new PyTrickle app") + p_init.add_argument("path", nargs="?", default=".", help="Target directory") + p_init.add_argument("--package", help="Package name (default: folder name)") + p_init.add_argument("--force", action="store_true", help="Overwrite existing files") + p_init.set_defaults(func=cmd_init) + + p_run = sub.add_parser("run", help="Run a local app module (python -m)") + p_run.add_argument("--module", required=True, help="Module/package to run") + p_run.set_defaults(func=cmd_run) + + return p + + +def main(argv: Optional[list] = None) -> int: + parser = build_parser() + ns = parser.parse_args(argv) + return ns.func(ns) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/pytrickle/decorators.py b/pytrickle/decorators.py new file mode 100644 index 0000000..9a845e6 --- /dev/null +++ b/pytrickle/decorators.py @@ -0,0 +1,226 @@ +from __future__ import annotations + +import asyncio +import inspect +from typing import Any, Callable, Optional, cast + +try: + # Pydantic is optional but available in requirements + from pydantic import BaseModel # type: ignore + _PYDANTIC_AVAILABLE = True +except Exception: # pragma: no cover - if pydantic missing at runtime + BaseModel = object # sentinel + _PYDANTIC_AVAILABLE = False + + +def trickle_handler(handler_type: str): + """ + Base decorator to mark methods as trickle handlers. Lightweight marker only. + + Args: + handler_type: One of 'video' | 'audio' | 'model_loader' | 'param_updater' | 'stream_stop'. + + Returns: + The same function with marker attributes set for discovery. + + Notes: + - This does not wrap the callable or change semantics. + - Convenience typed decorators below build on this to add DX ergonomics. + """ + def decorator(func): + setattr(func, "_trickle_handler_type", handler_type) + setattr(func, "_trickle_handler", True) + return func + + return decorator + + +# ---------- DX helpers: async bridging and return normalization ---------- + +def _is_coro_fn(fn: Callable[..., Any]) -> bool: + return inspect.iscoroutinefunction(fn) + + +async def _maybe_await(fn: Callable[..., Any], *args, **kwargs): + """Call fn and await if it returns a coroutine or is a coroutine function. + + If fn is synchronous, execute it in a thread to avoid blocking the event loop. + """ + if _is_coro_fn(fn): + return await fn(*args, **kwargs) + # If call returns a coroutine due to dynamic wrappers, still await it + result = await asyncio.to_thread(fn, *args, **kwargs) + return result + + +def video_handler(func: Callable[..., Any]): + """ + Decorator for video handlers with DX niceties: + - Accepts sync or async functions (sync runs in thread pool) + - Accepts flexible return types from user logic: + * None -> pass-through (internal falls back to original frame) + * VideoFrame -> used as-is + * torch.Tensor / numpy.ndarray -> converted via frame.replace_tensor + + Signature expected by framework: (self, frame: VideoFrame) -> Optional[VideoFrame] + """ + + from .frames import VideoFrame + import torch + import numpy as np + + @trickle_handler("video") + async def wrapper(*args, **kwargs): + # Expect last positional arg or kw 'frame' + frame = kwargs.get("frame") if "frame" in kwargs else (args[-1] if args else None) + result = await _maybe_await(func, *args, **kwargs) + + if result is None: + return None # internal will pass-through + if isinstance(result, VideoFrame): + return result + if frame is None: + return None + frame_v = cast(VideoFrame, frame) + if isinstance(result, torch.Tensor): + return frame_v.replace_tensor(result) + if isinstance(result, np.ndarray): + tensor = torch.from_numpy(result) + return frame_v.replace_tensor(tensor) + # Unknown type -> pass-through + return None + + # Keep metadata for debugging/testing + wrapper.__name__ = getattr(func, "__name__", "video_handler") + return wrapper + + +def audio_handler(func: Callable[..., Any]): + """ + Decorator for audio handlers with DX niceties: + - Accepts sync or async functions (sync runs in thread pool) + - Flexible return types: + * None -> pass-through (becomes [original]) + * AudioFrame -> [that] + * List[AudioFrame] -> as-is + * numpy.ndarray / torch.Tensor -> replace samples keeping metadata + + Signature expected by framework: (self, frame: AudioFrame) -> Optional[List[AudioFrame]] + """ + + from .frames import AudioFrame + import torch # type: ignore + import numpy as np # type: ignore + + @trickle_handler("audio") + async def wrapper(*args, **kwargs): + frame = kwargs.get("frame") if "frame" in kwargs else (args[-1] if args else None) + result = await _maybe_await(func, *args, **kwargs) + + if result is None: + return None # internal will convert to [frame] + if isinstance(result, AudioFrame): + return [result] + if isinstance(result, list): + return result + # Tensor/ndarray samples + if frame is None: + return None + frame_a = cast(AudioFrame, frame) + if isinstance(result, torch.Tensor): + samples = result.detach().cpu().numpy() + return [frame_a.replace_samples(samples)] + if isinstance(result, np.ndarray): + return [frame_a.replace_samples(result)] + return None + + wrapper.__name__ = getattr(func, "__name__", "audio_handler") + return wrapper + + +def model_loader(func: Callable[..., Any]): + """Decorator for model loader; allows sync or async functions.""" + + @trickle_handler("model_loader") + async def wrapper(*args, **kwargs): + return await _maybe_await(func, *args, **kwargs) + + wrapper.__name__ = getattr(func, "__name__", "model_loader") + return wrapper + + +def param_updater(func: Optional[Callable[..., Any]] = None, *, model: Optional[Any] = None): + """ + Decorator for parameter updates with optional Pydantic model validation. + + Usage: + @param_updater + async def update_params(self, params: Dict[str, Any]): ... + + @param_updater(model=MyParamsModel) + async def update_params(self, params: MyParamsModel): ... + """ + + def _decorate(fn: Callable[..., Any]): + @trickle_handler("param_updater") + async def wrapper(*args, **kwargs): + # Pull params (positional last or kw) + has_kw = "params" in kwargs + incoming = kwargs["params"] if has_kw else (args[-1] if args else None) + + parsed = incoming + # Parse with Pydantic model if provided and available + if ( + model is not None + and _PYDANTIC_AVAILABLE + and inspect.isclass(model) + and issubclass(model, BaseModel) # type: ignore[arg-type] + ): + try: + if isinstance(incoming, model): # already validated instance + parsed = incoming + else: + mv = getattr(model, "model_validate", None) + if callable(mv): # Pydantic v2 + parsed = mv(incoming) + elif isinstance(incoming, dict): # Pydantic v1 expects kwargs + parsed = model(**incoming) + else: + parsed = incoming + except Exception: + # Fall back to original incoming on validation error to avoid breaking stream + parsed = incoming + + # Rebuild args/kwargs replacing only the params argument + new_args = list(args) + if has_kw: + kwargs["params"] = parsed + else: + if new_args: + new_args[-1] = parsed + else: + new_args.append(parsed) + + return await _maybe_await(fn, *tuple(new_args), **kwargs) + + # Attach for discovery/testing + setattr(wrapper, "_trickle_param_model", model) + wrapper.__name__ = getattr(fn, "__name__", "param_updater") + return wrapper + + # Support bare and called decorator forms + if func is None: + return _decorate + return _decorate(func) + + +def on_stream_stop(func: Callable[..., Any]): + """Decorator for stream stop callback; allows sync or async functions.""" + + @trickle_handler("stream_stop") + async def wrapper(*args, **kwargs): + return await _maybe_await(func, *args, **kwargs) + + wrapper.__name__ = getattr(func, "__name__", "on_stream_stop") + return wrapper + diff --git a/pytrickle/server.py b/pytrickle/server.py index 97d9a5c..c1ed56f 100644 --- a/pytrickle/server.py +++ b/pytrickle/server.py @@ -63,6 +63,9 @@ def __init__( app_kwargs: Optional[Dict[str, Any]] = None, # Frame skipping configuration frame_skip_config: Optional[FrameSkipConfig] = None, + # Model loading configuration + auto_load_model: bool = True, + model_load_kwargs: Optional[Dict[str, Any]] = None, ): """Initialize StreamServer. @@ -105,6 +108,8 @@ def __init__( self.capability_name = capability_name or os.getenv("CAPABILITY_NAME", os.getenv("MODEL_ID","")) self.publisher_timeout = publisher_timeout self.subscriber_timeout = subscriber_timeout + self.auto_load_model = auto_load_model + self.model_load_kwargs = model_load_kwargs or {} # Frame skipping configuration self.frame_skip_config = frame_skip_config @@ -122,6 +127,13 @@ def __init__( for key, value in app_context.items(): self.app[key] = value + # Attach server state to the frame processor for lifecycle awareness + try: + self.frame_processor.attach_state(self.state) + except Exception: + # Optional for processors that don't implement it + logger.debug("FrameProcessor.attach_state not implemented; proceeding without attachment") + # Setup middleware first (order matters) if middleware: self._setup_middleware(middleware) @@ -138,6 +150,26 @@ def __init__( for handler in on_shutdown: self.app.on_shutdown.append(handler) + # Internal: auto-load model on startup if enabled + if self.auto_load_model: + async def _auto_load_model(app: web.Application): + try: + # Indicate loading state explicitly + self.state.set_state(PipelineState.LOADING) + await self.frame_processor.load_model(**self.model_load_kwargs) + # Mark startup complete moves LOADING -> IDLE + self.state.set_startup_complete() + + # Only log model-related messages if a model_loader decorator is present + if hasattr(self.frame_processor, "model_loader"): + if getattr(self.frame_processor, "model_loader", None) is not None: + logger.info("Model loaded successfully during server startup") + except Exception as e: + logger.exception("Model load failed during startup") + self.state.set_error(f"Model load failed: {e}") + + self.app.on_startup.append(_auto_load_model) + # Setup routes if enable_default_routes: self._setup_routes() @@ -631,9 +663,11 @@ async def start_server(self): site = web.TCPSite(runner, self.host, self.port) await site.start() - # Set pipeline ready when server is up and ready to accept requests - self.state.set_state(PipelineState.IDLE) - self.state.set_startup_complete() + # If auto-load is enabled, startup_complete/state will be set by the startup hook + if not self.auto_load_model: + # Set pipeline ready when server is up and ready to accept requests + self.state.set_state(PipelineState.IDLE) + self.state.set_startup_complete() logger.info(f"Server started on {self.host}:{self.port}") return runner diff --git a/pytrickle/stream_processor.py b/pytrickle/stream_processor.py index 66f28a8..b7453c1 100644 --- a/pytrickle/stream_processor.py +++ b/pytrickle/stream_processor.py @@ -1,7 +1,7 @@ import asyncio import inspect import logging -from typing import Optional, Callable, Dict, Any, List, Union, Awaitable, Coroutine +from typing import Optional, Callable, Dict, Any, List, Awaitable from .frames import VideoFrame, AudioFrame from .frame_processor import FrameProcessor @@ -13,11 +13,74 @@ # Type aliases for processing functions VideoProcessor = Callable[[VideoFrame], Awaitable[Optional[VideoFrame]]] AudioProcessor = Callable[[AudioFrame], Awaitable[Optional[List[AudioFrame]]]] -ModelLoader = Callable[[Dict[str, Any]], Awaitable[None]] +ModelLoader = Callable[..., Awaitable[None]] ParamUpdater = Callable[[Dict[str, Any]], Awaitable[None]] OnStreamStop = Callable[[], Awaitable[None]] + class StreamProcessor: + @classmethod + def from_handlers( + cls, + handler_instance, + send_data_interval: Optional[float] = 0.333, + name: str = "stream-processor", + port: int = 8000, + frame_skip_config: Optional[FrameSkipConfig] = None, + **server_kwargs + ): + """ + Create StreamProcessor from decorated handler methods. + + Args: + handler_instance: Instance with @trickle_handler decorated methods + send_data_interval: Interval for sending data + name: Processor name + port: Server port + frame_skip_config: Optional frame skipping configuration + **server_kwargs: Additional arguments passed to StreamServer + + Returns: + StreamProcessor instance configured with the decorated handlers + """ + handlers = {} + + # Find all decorated handler methods + for attr_name in dir(handler_instance): + attr = getattr(handler_instance, attr_name) + # For bound methods, the marker lives on the underlying function + fn_obj = getattr(attr, "__func__", attr) + if hasattr(fn_obj, '_trickle_handler'): + handler_type = getattr(fn_obj, '_trickle_handler_type', None) + if handler_type: + if handler_type in handlers: + logger.warning( + f"Duplicate handler for '{handler_type}' found; " + f"overriding previous with '{attr_name}'" + ) + # Store the bound attribute itself so 'self' is preserved + handlers[handler_type] = attr + + # Extract handlers directly without conversion + video_processor = handlers.get('video') + audio_processor = handlers.get('audio') + model_loader = handlers.get('model_loader') + param_updater = handlers.get('param_updater') + on_stream_stop = handlers.get('stream_stop') + + return cls( + video_processor=video_processor, + audio_processor=audio_processor, + model_loader=model_loader, + param_updater=param_updater, + on_stream_stop=on_stream_stop, + send_data_interval=send_data_interval, + name=name, + port=port, + frame_skip_config=frame_skip_config, + **server_kwargs + ) + def __init__( self, video_processor: Optional[VideoProcessor] = None, @@ -45,12 +108,21 @@ def __init__( frame_skip_config: Optional frame skipping configuration (None = no frame skipping) **server_kwargs: Additional arguments passed to StreamServer """ + # Validate that processors are async functions - if video_processor is not None and not inspect.iscoroutinefunction(video_processor): - raise ValueError("video_processor must be an async function") - if audio_processor is not None and not inspect.iscoroutinefunction(audio_processor): - raise ValueError("audio_processor must be an async function") - + for attr_name, fn in { + "video_processor": video_processor, + "audio_processor": audio_processor, + "model_loader": model_loader, + "param_updater": param_updater, + "on_stream_stop": on_stream_stop, + }.items(): + if fn is not None and not inspect.iscoroutinefunction(fn): + # Allow decorated sync functions that became async wrappers + # If user passed a bound method that is sync, our decorators wrap it as async + # If it's truly sync and not decorated, this will raise to keep contract clear + raise ValueError(f"{attr_name} must be an async function") + self.video_processor = video_processor self.audio_processor = audio_processor self.model_loader = model_loader @@ -64,11 +136,11 @@ def __init__( # Create internal frame processor self._frame_processor = _InternalFrameProcessor( - video_processor=video_processor, - audio_processor=audio_processor, - model_loader=model_loader, - param_updater=param_updater, - on_stream_stop=on_stream_stop, + video_processor=self.video_processor, + audio_processor=self.audio_processor, + model_loader=self.model_loader, + param_updater=self.param_updater, + on_stream_stop=self.on_stream_stop, name=name ) @@ -125,7 +197,7 @@ def __init__( self.audio_processor = audio_processor self.model_loader = model_loader self.param_updater = param_updater - self.on_stream_stop = on_stream_stop + self.on_stream_stop_cb = on_stream_stop self.name = name # Frame skipping is handled at TrickleClient level @@ -140,8 +212,8 @@ async def load_model(self, **kwargs): try: await self.model_loader(**kwargs) logger.info(f"StreamProcessor '{self.name}' model loaded successfully") - except Exception as e: - logger.error(f"Error in model loader: {e}") + except Exception: + logger.exception("Error in model loader") raise async def process_video_async(self, frame: VideoFrame) -> Optional[VideoFrame]: @@ -152,9 +224,10 @@ async def process_video_async(self, frame: VideoFrame) -> Optional[VideoFrame]: try: result = await self.video_processor(frame) - return result if isinstance(result, VideoFrame) else frame - except Exception as e: - logger.error(f"Error in video processing: {e}") + # If handler returns None, passthrough original frame + return result if result is not None else frame + except Exception: + logger.exception("Error in video processing") return frame async def process_audio_async(self, frame: AudioFrame) -> Optional[List[AudioFrame]]: @@ -165,15 +238,17 @@ async def process_audio_async(self, frame: AudioFrame) -> Optional[List[AudioFra try: result = await self.audio_processor(frame) + # Normalize results + if result is None: + return [frame] if isinstance(result, AudioFrame): return [result] - elif isinstance(result, list): + if isinstance(result, list): return result - elif result is None: - return [frame] + # Unknown type -> pass-through return [frame] - except Exception as e: - logger.error(f"Error in audio processing: {e}") + except Exception: + logger.exception("Error in audio processing") return [frame] async def update_params(self, params: Dict[str, Any]): @@ -182,5 +257,13 @@ async def update_params(self, params: Dict[str, Any]): try: await self.param_updater(params) logger.info(f"Parameters updated: {params}") - except Exception as e: - logger.error(f"Error updating parameters: {e}") + except Exception: + logger.exception("Error updating parameters") + + async def on_stream_stop(self): + """Invoke optional user-provided stream stop callback.""" + if self.on_stream_stop_cb: + try: + await self.on_stream_stop_cb() + except Exception: + logger.exception("Error in on_stream_stop callback")