diff --git a/pytrickle/__init__.py b/pytrickle/__init__.py index e532797..8eea39d 100644 --- a/pytrickle/__init__.py +++ b/pytrickle/__init__.py @@ -30,6 +30,8 @@ from .fps_meter import FPSMeter from .frame_skipper import FrameSkipConfig +from .decorators import trickle_handler + from . import api from .version import __version__ @@ -59,5 +61,6 @@ "FrameProcessor", "FPSMeter", "FrameSkipConfig", + "trickle_handler", "__version__" ] \ No newline at end of file diff --git a/pytrickle/decorators.py b/pytrickle/decorators.py new file mode 100644 index 0000000..8b92eb6 --- /dev/null +++ b/pytrickle/decorators.py @@ -0,0 +1,38 @@ +def trickle_handler(handler_type: str): + """ + Decorator to mark methods as trickle handlers. + + Args: + handler_type: Type of handler ('video', 'audio', 'model_loader', 'param_updater', 'stream_stop') + + Usage: + @trickle_handler("video") + async def process_video(self, frame: VideoFrame) -> Optional[VideoFrame]: + # Process video frame using existing VideoFrame interface + return processed_frame + + @trickle_handler("audio") + async def process_audio(self, frame: AudioFrame) -> Optional[List[AudioFrame]]: + # Process audio frame using existing AudioFrame interface + return [processed_frame] + + @trickle_handler("model_loader") + async def load_model(self, **kwargs): + # Load model with keyword arguments + pass + + @trickle_handler("param_updater") + async def update_params(self, params: Dict[str, Any]): + # Update parameters with dict + pass + + @trickle_handler("stream_stop") + async def on_stream_stop(self): + # Handle stream stop + pass + """ + def decorator(func): + func._trickle_handler_type = handler_type + func._trickle_handler = True + return func + return decorator diff --git a/pytrickle/stream_processor.py b/pytrickle/stream_processor.py index 66f28a8..21b41e3 100644 --- a/pytrickle/stream_processor.py +++ b/pytrickle/stream_processor.py @@ -18,6 +18,59 @@ OnStreamStop = Callable[[], Awaitable[None]] class StreamProcessor: + @classmethod + def from_handlers( + cls, + handler_instance, + send_data_interval: Optional[float] = 0.333, + name: str = "stream-processor", + port: int = 8000, + frame_skip_config: Optional[FrameSkipConfig] = None, + **server_kwargs + ): + """ + Create StreamProcessor from decorated handler methods. + + Args: + handler_instance: Instance with @trickle_handler decorated methods + send_data_interval: Interval for sending data + name: Processor name + port: Server port + frame_skip_config: Optional frame skipping configuration + **server_kwargs: Additional arguments passed to StreamServer + + Returns: + StreamProcessor instance configured with the decorated handlers + """ + handlers = {} + + # Find all decorated handler methods + for attr_name in dir(handler_instance): + attr = getattr(handler_instance, attr_name) + if hasattr(attr, '_trickle_handler'): + handler_type = attr._trickle_handler_type + handlers[handler_type] = attr + + # Extract handlers directly without conversion + video_processor = handlers.get('video') + audio_processor = handlers.get('audio') + model_loader = handlers.get('model_loader') + param_updater = handlers.get('param_updater') + on_stream_stop = handlers.get('stream_stop') + + return cls( + video_processor=video_processor, + audio_processor=audio_processor, + model_loader=model_loader, + param_updater=param_updater, + on_stream_stop=on_stream_stop, + send_data_interval=send_data_interval, + name=name, + port=port, + frame_skip_config=frame_skip_config, + **server_kwargs + ) + def __init__( self, video_processor: Optional[VideoProcessor] = None,