Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pytrickle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
from .fps_meter import FPSMeter
from .frame_skipper import FrameSkipConfig

from .decorators import trickle_handler

from . import api

from .version import __version__
Expand Down Expand Up @@ -59,5 +61,6 @@
"FrameProcessor",
"FPSMeter",
"FrameSkipConfig",
"trickle_handler",
"__version__"
]
38 changes: 38 additions & 0 deletions pytrickle/decorators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
def trickle_handler(handler_type: str):
"""
Decorator to mark methods as trickle handlers.

Args:
handler_type: Type of handler ('video', 'audio', 'model_loader', 'param_updater', 'stream_stop')

Usage:
@trickle_handler("video")
async def process_video(self, frame: VideoFrame) -> Optional[VideoFrame]:
# Process video frame using existing VideoFrame interface
return processed_frame

@trickle_handler("audio")
async def process_audio(self, frame: AudioFrame) -> Optional[List[AudioFrame]]:
# Process audio frame using existing AudioFrame interface
return [processed_frame]

@trickle_handler("model_loader")
async def load_model(self, **kwargs):
# Load model with keyword arguments
pass

@trickle_handler("param_updater")
async def update_params(self, params: Dict[str, Any]):
# Update parameters with dict
pass

@trickle_handler("stream_stop")
async def on_stream_stop(self):
# Handle stream stop
pass
"""
def decorator(func):
func._trickle_handler_type = handler_type
func._trickle_handler = True
return func
return decorator
53 changes: 53 additions & 0 deletions pytrickle/stream_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,59 @@
OnStreamStop = Callable[[], Awaitable[None]]

class StreamProcessor:
@classmethod
def from_handlers(
cls,
handler_instance,
send_data_interval: Optional[float] = 0.333,
name: str = "stream-processor",
port: int = 8000,
frame_skip_config: Optional[FrameSkipConfig] = None,
**server_kwargs
):
"""
Create StreamProcessor from decorated handler methods.

Args:
handler_instance: Instance with @trickle_handler decorated methods
send_data_interval: Interval for sending data
name: Processor name
port: Server port
frame_skip_config: Optional frame skipping configuration
**server_kwargs: Additional arguments passed to StreamServer

Returns:
StreamProcessor instance configured with the decorated handlers
"""
handlers = {}

# Find all decorated handler methods
for attr_name in dir(handler_instance):
attr = getattr(handler_instance, attr_name)
if hasattr(attr, '_trickle_handler'):
handler_type = attr._trickle_handler_type
handlers[handler_type] = attr

# Extract handlers directly without conversion
video_processor = handlers.get('video')
audio_processor = handlers.get('audio')
model_loader = handlers.get('model_loader')
param_updater = handlers.get('param_updater')
on_stream_stop = handlers.get('stream_stop')

return cls(
video_processor=video_processor,
audio_processor=audio_processor,
model_loader=model_loader,
param_updater=param_updater,
on_stream_stop=on_stream_stop,
send_data_interval=send_data_interval,
name=name,
port=port,
frame_skip_config=frame_skip_config,
**server_kwargs
)

def __init__(
self,
video_processor: Optional[VideoProcessor] = None,
Expand Down