From b1d4f17723019fbae7b8304619e52bed97bf1ebf Mon Sep 17 00:00:00 2001 From: Brad P Date: Mon, 18 Aug 2025 15:44:28 -0500 Subject: [PATCH 01/11] add sending data to stream_processor and stream_handler --- pytrickle/client.py | 53 ++++++++++++++++++++++++++++++++--- pytrickle/protocol.py | 2 +- pytrickle/stream_handler.py | 13 +++++++-- pytrickle/stream_processor.py | 4 +++ 4 files changed, 64 insertions(+), 8 deletions(-) diff --git a/pytrickle/client.py b/pytrickle/client.py index 2b80ad1..200f2c0 100644 --- a/pytrickle/client.py +++ b/pytrickle/client.py @@ -50,9 +50,10 @@ def __init__( self.stop_event = asyncio.Event() self.error_event = asyncio.Event() - # Output queue for processed frames + # Output queues self.output_queue = queue.Queue() - + self.data_queue = asyncio.Queue(maxsize=250) + def process_frame(self, frame: Union[VideoFrame, AudioFrame]) -> Optional[Union[VideoOutput, AudioOutput]]: """Process a single frame and return the output.""" if not frame: @@ -85,6 +86,7 @@ async def start(self, request_id: str = "default"): self._ingress_loop(), self._egress_loop(), self._control_loop(), + self._send_data_loop(), return_exceptions=True ) @@ -119,8 +121,11 @@ async def stop(self): async def publish_data(self, data: str): """Publish data via the protocol's data publisher.""" - return await self.protocol.publish_data(data) - + try: + await self.data_queue.put_nowait(data) + except asyncio.QueueFull: + logger.warning("Could not send data, queue is full. Reduce velocity of data publishing.") + async def _ingress_loop(self): """Process incoming frames with native async support.""" try: @@ -258,6 +263,46 @@ async def _control_loop(self): except Exception as cb_error: logger.error(f"Error in error callback: {cb_error}") + async def _send_data_loop(self): + """Send data to the server every 250ms, batching all available items.""" + try: + while not self.stop_event.is_set(): + # Wait for 250ms or until stop event is set + try: + await asyncio.wait_for(self.stop_event.wait(), timeout=0.25) + break # Stop event was set, exit loop + except asyncio.TimeoutError: + pass # Timeout is expected, continue to process data + + # Pull all available items from the data_queue + data_items = [] + while True: + try: + data = self.data_queue.get_nowait() + if data is None: + # Sentinel value to stop loop + if data_items: + # Send any remaining items before stopping + break + else: + return # No items to send, just stop + data_items.append(data) + except asyncio.QueueEmpty: + break # No more items in queue + + # Send all collected data items + for data in data_items: + # Ensure data ends with newline character + if not data.endswith('\n'): + data += '\n' + + # Send data using protocol + await self.protocol.publish_data(data) + + except Exception as e: + logger.error(f"Error in data sending loop: {e}") + + async def _handle_control_message(self, control_data: dict): """Handle a control message.""" if self.control_handler: diff --git a/pytrickle/protocol.py b/pytrickle/protocol.py index dfac3ca..0b7c3be 100644 --- a/pytrickle/protocol.py +++ b/pytrickle/protocol.py @@ -154,7 +154,7 @@ async def start(self): # Initialize data publisher if URL provided if self.data_url and self.data_url.strip(): - self.data_publisher = TricklePublisher(self.data_url, "application/octet-stream", error_callback=self._on_component_error) + self.data_publisher = TricklePublisher(self.data_url, "application/jsonl", error_callback=self._on_component_error) await self.data_publisher.start() # Start monitoring subscription end for immediate cleanup diff --git a/pytrickle/stream_handler.py b/pytrickle/stream_handler.py index e715850..cdec3ad 100644 --- a/pytrickle/stream_handler.py +++ b/pytrickle/stream_handler.py @@ -83,7 +83,8 @@ def __init__( self._task: Optional[asyncio.Task] = None self._control_task: Optional[asyncio.Task] = None self._monitoring_task: Optional[asyncio.Task] = None - + self._send_data_task: Optional[asyncio.Task] = None + # Error handling self._error_callback = error_callback self._critical_error_occurred = False @@ -332,7 +333,12 @@ async def start(self) -> bool: self._monitoring_task = asyncio.create_task(self._monitoring_loop()) except Exception as e: logger.warning(f"Failed to start monitoring task: {e}") - + + try: + self._send_data_task = asyncio.create_task(self._send_data_loop()) + except Exception as e: + logger.warning(f"Failed to start send data task: {e}") + logger.info("TrickleStreamHandler started successfully") return True @@ -358,7 +364,8 @@ async def stop(self, *, called_by_manager: bool = False) -> bool: await self._cancel_task_with_timeout(self._task, "Main task") await self._cancel_task_with_timeout(self._control_task, "Control task") await self._cancel_task_with_timeout(self._monitoring_task, "Monitoring task") - + await self._cancel_task_with_timeout(self._send_data_task, "Send data task") + # Close control subscriber if self.control_subscriber: try: diff --git a/pytrickle/stream_processor.py b/pytrickle/stream_processor.py index 4eef493..934cbc0 100644 --- a/pytrickle/stream_processor.py +++ b/pytrickle/stream_processor.py @@ -59,6 +59,10 @@ def __init__( **server_kwargs ) + async def send_data(self, data: str): + """Send data to the server.""" + await self.server.current_client.publish_data(data) + async def run_forever(self): """Run the stream processor server forever.""" await self.server.run_forever() From d99f414816ce07e67096b17be52e83986de303d8 Mon Sep 17 00:00:00 2001 From: Brad P Date: Tue, 19 Aug 2025 14:32:31 -0500 Subject: [PATCH 02/11] updates --- pytrickle/client.py | 2 +- pytrickle/stream_processor.py | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/pytrickle/client.py b/pytrickle/client.py index 200f2c0..8e6281e 100644 --- a/pytrickle/client.py +++ b/pytrickle/client.py @@ -93,7 +93,7 @@ async def start(self, request_id: str = "default"): # Check if any loop had an exception that is not a cancelled error for i, result in enumerate(results): if isinstance(result, Exception) and not isinstance(result, asyncio.CancelledError): - loop_names = ["ingress", "egress", "control"] + loop_names = ["ingress", "egress", "control", "send_data"] logger.error(f"{loop_names[i]} loop failed: {result}") except asyncio.CancelledError: diff --git a/pytrickle/stream_processor.py b/pytrickle/stream_processor.py index 934cbc0..9e3b461 100644 --- a/pytrickle/stream_processor.py +++ b/pytrickle/stream_processor.py @@ -61,6 +61,9 @@ def __init__( async def send_data(self, data: str): """Send data to the server.""" + if self.server.current_client is None: + logger.warning("No active client connection, cannot send data") + return await self.server.current_client.publish_data(data) async def run_forever(self): From 6fbaeb2cba338dacb1c93e3d587072338ab110e7 Mon Sep 17 00:00:00 2001 From: Brad P Date: Tue, 19 Aug 2025 15:59:23 -0500 Subject: [PATCH 03/11] fix --- pytrickle/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pytrickle/client.py b/pytrickle/client.py index 8e6281e..5e0ce52 100644 --- a/pytrickle/client.py +++ b/pytrickle/client.py @@ -122,7 +122,7 @@ async def stop(self): async def publish_data(self, data: str): """Publish data via the protocol's data publisher.""" try: - await self.data_queue.put_nowait(data) + self.data_queue.put_nowait(data) except asyncio.QueueFull: logger.warning("Could not send data, queue is full. Reduce velocity of data publishing.") From b01635eb74d9899158c0da802b53c9a14479fbea Mon Sep 17 00:00:00 2001 From: Brad P Date: Tue, 19 Aug 2025 16:30:25 -0500 Subject: [PATCH 04/11] send data items in json str --- pytrickle/client.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/pytrickle/client.py b/pytrickle/client.py index 5e0ce52..1f32974 100644 --- a/pytrickle/client.py +++ b/pytrickle/client.py @@ -8,6 +8,7 @@ import asyncio import queue import logging +import json from typing import Callable, Optional, Union from .protocol import TrickleProtocol @@ -291,13 +292,8 @@ async def _send_data_loop(self): break # No more items in queue # Send all collected data items - for data in data_items: - # Ensure data ends with newline character - if not data.endswith('\n'): - data += '\n' - - # Send data using protocol - await self.protocol.publish_data(data) + data_str = json.dumps(data_items) + "\n" + await self.protocol.publish_data(data_str) except Exception as e: logger.error(f"Error in data sending loop: {e}") From 5db0f779ec58bcd3e66df465cf92366a8b8c1715 Mon Sep 17 00:00:00 2001 From: Brad P Date: Tue, 19 Aug 2025 21:12:34 -0500 Subject: [PATCH 05/11] send data only if needed --- pytrickle/client.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pytrickle/client.py b/pytrickle/client.py index 1f32974..7bff678 100644 --- a/pytrickle/client.py +++ b/pytrickle/client.py @@ -268,9 +268,9 @@ async def _send_data_loop(self): """Send data to the server every 250ms, batching all available items.""" try: while not self.stop_event.is_set(): - # Wait for 250ms or until stop event is set + # Wait for 333ms or until stop event is set try: - await asyncio.wait_for(self.stop_event.wait(), timeout=0.25) + await asyncio.wait_for(self.stop_event.wait(), timeout=0.333) break # Stop event was set, exit loop except asyncio.TimeoutError: pass # Timeout is expected, continue to process data @@ -292,8 +292,9 @@ async def _send_data_loop(self): break # No more items in queue # Send all collected data items - data_str = json.dumps(data_items) + "\n" - await self.protocol.publish_data(data_str) + if len(data_items) > 0: + data_str = json.dumps(data_items) + "\n" + await self.protocol.publish_data(data_str) except Exception as e: logger.error(f"Error in data sending loop: {e}") From 65bdf0e412430607b3c3f6ea1dacedb0e73a8c49 Mon Sep 17 00:00:00 2001 From: Brad P Date: Tue, 19 Aug 2025 23:17:55 -0500 Subject: [PATCH 06/11] fix comment --- pytrickle/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pytrickle/client.py b/pytrickle/client.py index 7bff678..11a90b3 100644 --- a/pytrickle/client.py +++ b/pytrickle/client.py @@ -265,7 +265,7 @@ async def _control_loop(self): logger.error(f"Error in error callback: {cb_error}") async def _send_data_loop(self): - """Send data to the server every 250ms, batching all available items.""" + """Send data to the server every 333ms, batching all available items.""" try: while not self.stop_event.is_set(): # Wait for 333ms or until stop event is set From 39a848b431df38c20eb890d211d8cd07997b2c92 Mon Sep 17 00:00:00 2001 From: Brad P Date: Wed, 20 Aug 2025 18:19:06 -0500 Subject: [PATCH 07/11] revert stream handler changes --- pytrickle/stream_handler.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pytrickle/stream_handler.py b/pytrickle/stream_handler.py index cdec3ad..d2a3f16 100644 --- a/pytrickle/stream_handler.py +++ b/pytrickle/stream_handler.py @@ -83,7 +83,6 @@ def __init__( self._task: Optional[asyncio.Task] = None self._control_task: Optional[asyncio.Task] = None self._monitoring_task: Optional[asyncio.Task] = None - self._send_data_task: Optional[asyncio.Task] = None # Error handling self._error_callback = error_callback @@ -334,11 +333,6 @@ async def start(self) -> bool: except Exception as e: logger.warning(f"Failed to start monitoring task: {e}") - try: - self._send_data_task = asyncio.create_task(self._send_data_loop()) - except Exception as e: - logger.warning(f"Failed to start send data task: {e}") - logger.info("TrickleStreamHandler started successfully") return True @@ -364,7 +358,6 @@ async def stop(self, *, called_by_manager: bool = False) -> bool: await self._cancel_task_with_timeout(self._task, "Main task") await self._cancel_task_with_timeout(self._control_task, "Control task") await self._cancel_task_with_timeout(self._monitoring_task, "Monitoring task") - await self._cancel_task_with_timeout(self._send_data_task, "Send data task") # Close control subscriber if self.control_subscriber: From 38b50a1738f26260ee3207970013369ace4ab732 Mon Sep 17 00:00:00 2001 From: Brad P Date: Wed, 20 Aug 2025 18:23:27 -0500 Subject: [PATCH 08/11] ensure None is added to data_queue to stop the data loop --- pytrickle/client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pytrickle/client.py b/pytrickle/client.py index 11a90b3..fcc41c0 100644 --- a/pytrickle/client.py +++ b/pytrickle/client.py @@ -117,6 +117,7 @@ async def stop(self): # Send sentinel value to stop egress loop try: self.output_queue.put_nowait(None) + await self.data_queue.put(None) except queue.Full: pass From 7f6e90bcb1519cd620371a8fa1734c815652ae86 Mon Sep 17 00:00:00 2001 From: Brad P Date: Wed, 20 Aug 2025 18:39:48 -0500 Subject: [PATCH 09/11] add return True/False on send_data in StreamProcessor to indicate success/failure --- pytrickle/stream_processor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pytrickle/stream_processor.py b/pytrickle/stream_processor.py index 9e3b461..83dc5fb 100644 --- a/pytrickle/stream_processor.py +++ b/pytrickle/stream_processor.py @@ -63,8 +63,9 @@ async def send_data(self, data: str): """Send data to the server.""" if self.server.current_client is None: logger.warning("No active client connection, cannot send data") - return + return False await self.server.current_client.publish_data(data) + return True async def run_forever(self): """Run the stream processor server forever.""" From a26d25c8e0becac1805088137bcdff350f35902e Mon Sep 17 00:00:00 2001 From: Brad P Date: Wed, 20 Aug 2025 18:42:33 -0500 Subject: [PATCH 10/11] add send_data_interval to StreamProcessor and TrickleClient --- pytrickle/client.py | 14 +++++++++++--- pytrickle/stream_processor.py | 2 ++ 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/pytrickle/client.py b/pytrickle/client.py index fcc41c0..c8354e9 100644 --- a/pytrickle/client.py +++ b/pytrickle/client.py @@ -27,6 +27,7 @@ def __init__( protocol: TrickleProtocol, frame_processor: 'FrameProcessor', control_handler: Optional[Callable] = None, + send_data_interval: Optional[float] = 0.333, error_callback: Optional[ErrorCallback] = None ): """Initialize TrickleClient. @@ -40,6 +41,8 @@ def __init__( self.protocol = protocol self.frame_processor = frame_processor self.control_handler = control_handler + self.send_data_interval = send_data_interval + # Use provided error_callback, or fall back to frame_processor's error_callback self.error_callback = error_callback or frame_processor.error_callback @@ -269,9 +272,9 @@ async def _send_data_loop(self): """Send data to the server every 333ms, batching all available items.""" try: while not self.stop_event.is_set(): - # Wait for 333ms or until stop event is set + # Wait for send_data_interval or until stop event is set try: - await asyncio.wait_for(self.stop_event.wait(), timeout=0.333) + await asyncio.wait_for(self.stop_event.wait(), timeout=self.send_data_interval) break # Stop event was set, exit loop except asyncio.TimeoutError: pass # Timeout is expected, continue to process data @@ -294,7 +297,12 @@ async def _send_data_loop(self): # Send all collected data items if len(data_items) > 0: - data_str = json.dumps(data_items) + "\n" + try: + data_str = json.dumps(data_items) + "\n" + except Exception as e: + logger.error(f"Error serializing data items: {e}") + continue + await self.protocol.publish_data(data_str) except Exception as e: diff --git a/pytrickle/stream_processor.py b/pytrickle/stream_processor.py index 83dc5fb..3c98be1 100644 --- a/pytrickle/stream_processor.py +++ b/pytrickle/stream_processor.py @@ -19,6 +19,7 @@ def __init__( audio_processor: Optional[AudioProcessor] = None, model_loader: Optional[Callable[[], None]] = None, param_updater: Optional[Callable[[Dict[str, Any]], None]] = None, + send_data_interval: Optional[float] = 0.333, name: str = "stream-processor", port: int = 8000, **server_kwargs @@ -39,6 +40,7 @@ def __init__( self.audio_processor = audio_processor self.model_loader = model_loader self.param_updater = param_updater + self.send_data_interval = send_data_interval self.name = name self.port = port self.server_kwargs = server_kwargs From 5e960a0045ea495e3cef8b3826698453f9d3958a Mon Sep 17 00:00:00 2001 From: Brad P Date: Sat, 23 Aug 2025 07:42:51 -0500 Subject: [PATCH 11/11] update data queue to use deque with a longer max size --- pytrickle/client.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/pytrickle/client.py b/pytrickle/client.py index c8354e9..522e7f1 100644 --- a/pytrickle/client.py +++ b/pytrickle/client.py @@ -9,7 +9,8 @@ import queue import logging import json -from typing import Callable, Optional, Union +from typing import Callable, Optional, Union, Deque, Any +from collections import deque from .protocol import TrickleProtocol from .frames import VideoFrame, AudioFrame, VideoOutput, AudioOutput @@ -56,7 +57,7 @@ def __init__( # Output queues self.output_queue = queue.Queue() - self.data_queue = asyncio.Queue(maxsize=250) + self.data_queue: Deque[Any] = deque(maxlen=1000) def process_frame(self, frame: Union[VideoFrame, AudioFrame]) -> Optional[Union[VideoOutput, AudioOutput]]: """Process a single frame and return the output.""" @@ -120,16 +121,13 @@ async def stop(self): # Send sentinel value to stop egress loop try: self.output_queue.put_nowait(None) - await self.data_queue.put(None) + await self.data_queue.append(None) except queue.Full: pass async def publish_data(self, data: str): """Publish data via the protocol's data publisher.""" - try: - self.data_queue.put_nowait(data) - except asyncio.QueueFull: - logger.warning("Could not send data, queue is full. Reduce velocity of data publishing.") + self.data_queue.append(data) async def _ingress_loop(self): """Process incoming frames with native async support.""" @@ -283,7 +281,7 @@ async def _send_data_loop(self): data_items = [] while True: try: - data = self.data_queue.get_nowait() + data = self.data_queue.popleft() if data is None: # Sentinel value to stop loop if data_items: @@ -292,7 +290,7 @@ async def _send_data_loop(self): else: return # No items to send, just stop data_items.append(data) - except asyncio.QueueEmpty: + except IndexError: break # No more items in queue # Send all collected data items