diff --git a/pytrickle/client.py b/pytrickle/client.py index 2b80ad1..522e7f1 100644 --- a/pytrickle/client.py +++ b/pytrickle/client.py @@ -8,7 +8,9 @@ import asyncio import queue import logging -from typing import Callable, Optional, Union +import json +from typing import Callable, Optional, Union, Deque, Any +from collections import deque from .protocol import TrickleProtocol from .frames import VideoFrame, AudioFrame, VideoOutput, AudioOutput @@ -26,6 +28,7 @@ def __init__( protocol: TrickleProtocol, frame_processor: 'FrameProcessor', control_handler: Optional[Callable] = None, + send_data_interval: Optional[float] = 0.333, error_callback: Optional[ErrorCallback] = None ): """Initialize TrickleClient. @@ -39,6 +42,8 @@ def __init__( self.protocol = protocol self.frame_processor = frame_processor self.control_handler = control_handler + self.send_data_interval = send_data_interval + # Use provided error_callback, or fall back to frame_processor's error_callback self.error_callback = error_callback or frame_processor.error_callback @@ -50,9 +55,10 @@ def __init__( self.stop_event = asyncio.Event() self.error_event = asyncio.Event() - # Output queue for processed frames + # Output queues self.output_queue = queue.Queue() - + self.data_queue: Deque[Any] = deque(maxlen=1000) + def process_frame(self, frame: Union[VideoFrame, AudioFrame]) -> Optional[Union[VideoOutput, AudioOutput]]: """Process a single frame and return the output.""" if not frame: @@ -85,13 +91,14 @@ async def start(self, request_id: str = "default"): self._ingress_loop(), self._egress_loop(), self._control_loop(), + self._send_data_loop(), return_exceptions=True ) # Check if any loop had an exception that is not a cancelled error for i, result in enumerate(results): if isinstance(result, Exception) and not isinstance(result, asyncio.CancelledError): - loop_names = ["ingress", "egress", "control"] + loop_names = ["ingress", "egress", "control", "send_data"] logger.error(f"{loop_names[i]} loop failed: {result}") except asyncio.CancelledError: @@ -114,13 +121,14 @@ async def stop(self): # Send sentinel value to stop egress loop try: self.output_queue.put_nowait(None) + await self.data_queue.append(None) except queue.Full: pass async def publish_data(self, data: str): """Publish data via the protocol's data publisher.""" - return await self.protocol.publish_data(data) - + self.data_queue.append(data) + async def _ingress_loop(self): """Process incoming frames with native async support.""" try: @@ -258,6 +266,47 @@ async def _control_loop(self): except Exception as cb_error: logger.error(f"Error in error callback: {cb_error}") + async def _send_data_loop(self): + """Send data to the server every 333ms, batching all available items.""" + try: + while not self.stop_event.is_set(): + # Wait for send_data_interval or until stop event is set + try: + await asyncio.wait_for(self.stop_event.wait(), timeout=self.send_data_interval) + break # Stop event was set, exit loop + except asyncio.TimeoutError: + pass # Timeout is expected, continue to process data + + # Pull all available items from the data_queue + data_items = [] + while True: + try: + data = self.data_queue.popleft() + if data is None: + # Sentinel value to stop loop + if data_items: + # Send any remaining items before stopping + break + else: + return # No items to send, just stop + data_items.append(data) + except IndexError: + break # No more items in queue + + # Send all collected data items + if len(data_items) > 0: + try: + data_str = json.dumps(data_items) + "\n" + except Exception as e: + logger.error(f"Error serializing data items: {e}") + continue + + await self.protocol.publish_data(data_str) + + except Exception as e: + logger.error(f"Error in data sending loop: {e}") + + async def _handle_control_message(self, control_data: dict): """Handle a control message.""" if self.control_handler: diff --git a/pytrickle/protocol.py b/pytrickle/protocol.py index feb70a1..c20cf1c 100644 --- a/pytrickle/protocol.py +++ b/pytrickle/protocol.py @@ -170,7 +170,7 @@ async def start(self): # Initialize data publisher if URL provided if self.data_url and self.data_url.strip(): - self.data_publisher = TricklePublisher(self.data_url, "application/octet-stream", error_callback=self._on_component_error) + self.data_publisher = TricklePublisher(self.data_url, "application/jsonl", error_callback=self._on_component_error) if self.publisher_timeout is not None: self.data_publisher.connect_timeout_seconds = self.publisher_timeout await self.data_publisher.start() diff --git a/pytrickle/stream_handler.py b/pytrickle/stream_handler.py index 60e23c2..e2aab73 100644 --- a/pytrickle/stream_handler.py +++ b/pytrickle/stream_handler.py @@ -83,7 +83,7 @@ def __init__( self._task: Optional[asyncio.Task] = None self._control_task: Optional[asyncio.Task] = None self._monitoring_task: Optional[asyncio.Task] = None - + # Error handling self._error_callback = error_callback self._critical_error_occurred = False @@ -332,7 +332,7 @@ async def start(self) -> bool: self._monitoring_task = asyncio.create_task(self._monitoring_loop()) except Exception as e: logger.warning(f"Failed to start monitoring task: {e}") - + logger.info("TrickleStreamHandler started successfully") return True @@ -358,7 +358,7 @@ async def stop(self, *, called_by_manager: bool = False) -> bool: await self._cancel_task_with_timeout(self._task, "Main task") await self._cancel_task_with_timeout(self._control_task, "Control task") await self._cancel_task_with_timeout(self._monitoring_task, "Monitoring task") - + # Close control subscriber if self.control_subscriber: try: diff --git a/pytrickle/stream_processor.py b/pytrickle/stream_processor.py index 55c3263..97afc0f 100644 --- a/pytrickle/stream_processor.py +++ b/pytrickle/stream_processor.py @@ -21,6 +21,7 @@ def __init__( audio_processor: Optional[AudioProcessor] = None, model_loader: Optional[Callable[[], None]] = None, param_updater: Optional[Callable[[Dict[str, Any]], None]] = None, + send_data_interval: Optional[float] = 0.333, name: str = "stream-processor", port: int = 8000, **server_kwargs @@ -41,6 +42,7 @@ def __init__( self.audio_processor = audio_processor self.model_loader = model_loader self.param_updater = param_updater + self.send_data_interval = send_data_interval self.name = name self.port = port self.server_kwargs = server_kwargs @@ -61,6 +63,14 @@ def __init__( **server_kwargs ) + async def send_data(self, data: str): + """Send data to the server.""" + if self.server.current_client is None: + logger.warning("No active client connection, cannot send data") + return False + await self.server.current_client.publish_data(data) + return True + async def run_forever(self): """Run the stream processor server forever.""" await self.server.run_forever()