diff --git a/.vscode/launch.json b/.vscode/launch.json index b3af0ae..db59274 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -23,6 +23,26 @@ }, "justMyCode": true, "python": "${command:python.interpreterPath}" + }, + { + "name": "PyTrickle Demo with Text Publishing", + "type": "debugpy", + "request": "launch", + "program": "${workspaceFolder}/examples/text_output_example.py", + "console": "integratedTerminal", + "cwd": "${workspaceFolder}/examples", + "env": { + "ORCH_URL": "https://localhost:9995", + "ORCH_SECRET": "orch-secret", + "CAPABILITY_NAME": "trickle-stream-example", + "CAPABILITY_DESCRIPTION": "Video/audio passthrough with text publishing using pytrickle", + "CAPABILITY_URL": "http://localhost:8000", + "CAPABILITY_PRICE_PER_UNIT": "0", + "CAPABILITY_PRICE_SCALING": "1", + "CAPABILITY_CAPACITY": "1" + }, + "justMyCode": true, + "python": "${command:python.interpreterPath}" } ] } \ No newline at end of file diff --git a/examples/text_output_example.py b/examples/text_output_example.py new file mode 100644 index 0000000..f7b7ad6 --- /dev/null +++ b/examples/text_output_example.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python3 +""" +Video/Audio Passthrough with Text Publishing using StreamProcessor + +This example demonstrates: +- Video passthrough (no processing) +- Audio passthrough (no processing) +- Text publishing every 400 audio frames using simple text queue +""" + +import logging +import json +import time +from pytrickle import StreamProcessor +from pytrickle.frames import VideoFrame, AudioFrame +from typing import List + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Global state +audio_frame_count = 0 +text_publish_interval = 400 +ready = False +start_time = None +_stream_processor = None  # Reference to StreamProcessor for text publishing + +def load_model(**kwargs): +    """Initialize processor state - called during model loading phase.""" +    global text_publish_interval, ready, start_time + +    logger.info(f"load_model called 
with kwargs: {kwargs}") + + # Set processor variables from kwargs or use defaults + text_publish_interval = kwargs.get('text_publish_interval', 400) + text_publish_interval = max(1, int(text_publish_interval)) + + ready = True + start_time = time.time() + logger.info(f"✅ Video/Audio passthrough with text publishing ready (interval: {text_publish_interval} frames)") + +async def process_video(frame: VideoFrame) -> VideoFrame: + """Pass through video frames unchanged.""" + global ready + + if not ready: + return frame + + # Simply pass through the video frame without any processing + return frame + +async def process_audio(frame: AudioFrame) -> List[AudioFrame]: + """Pass through audio frames and publish text data periodically.""" + global audio_frame_count, text_publish_interval, ready, start_time, _stream_processor + + if not ready: + return [frame] + + # Increment frame counter + audio_frame_count += 1 + + # Check if we should publish text data + if audio_frame_count % text_publish_interval == 0: + # Calculate elapsed time + elapsed_time = time.time() - start_time + + # Create JSONL data with audio processing statistics + text_data = { + "type": "audio_stats", + "timestamp": time.time(), + "elapsed_time_seconds": round(elapsed_time, 2), + "total_audio_frames": audio_frame_count, + "frames_per_second": round(audio_frame_count / elapsed_time, 2) if elapsed_time > 0 else 0, + "frame_shape": list(frame.samples.shape) if hasattr(frame, 'samples') else None, + "sample_rate": getattr(frame, 'sample_rate', None), + "channels": getattr(frame, 'channels', None), + "message": f"Processed {audio_frame_count} audio frames in {elapsed_time:.2f} seconds" + } + + # Publish as JSONL - just add text to the queue! 
+ jsonl_line = json.dumps(text_data) + if _stream_processor: + await _stream_processor.publish_data_output(jsonl_line) + + logger.info(f"Published stats: {audio_frame_count} frames, {elapsed_time:.2f}s elapsed") + + # Pass through the audio frame unchanged + return [frame] + +def update_params(params: dict): + """Update text publishing interval.""" + global text_publish_interval + + if "text_publish_interval" in params: + old = text_publish_interval + text_publish_interval = max(1, int(params["text_publish_interval"])) + if old != text_publish_interval: + logger.info(f"Text publish interval: {old} → {text_publish_interval} frames") + + if "reset_counter" in params and params["reset_counter"]: + global audio_frame_count, start_time + audio_frame_count = 0 + start_time = time.time() + logger.info("Reset audio frame counter and timer") + + +# Create and run StreamProcessor +if __name__ == "__main__": + processor = StreamProcessor( + video_processor=process_video, + audio_processor=process_audio, + model_loader=load_model, + param_updater=update_params, + name="passthrough-with-text", + port=8000 + ) + + # Set reference for text publishing + _stream_processor = processor + + logger.info("Starting passthrough processor with text publishing...") + logger.info(f"Will publish JSONL stats every {text_publish_interval} audio frames") + logger.info("Update parameters via /api/update_params:") + logger.info(" - text_publish_interval: number of frames between text publications") + logger.info(" - reset_counter: true to reset frame counter and timer") + + processor.run() \ No newline at end of file diff --git a/pytrickle/__init__.py b/pytrickle/__init__.py index a5f4d46..20bcceb 100644 --- a/pytrickle/__init__.py +++ b/pytrickle/__init__.py @@ -16,7 +16,7 @@ from .server import StreamServer from .protocol import TrickleProtocol from .frames import ( - VideoFrame, AudioFrame, VideoOutput, AudioOutput, + VideoFrame, AudioFrame, VideoOutput, AudioOutput, TextOutput, FrameBuffer, ) 
from .state import StreamState @@ -45,6 +45,7 @@ "AudioFrame", "VideoOutput", "AudioOutput", + "TextOutput", "TricklePublisher", "TrickleSubscriber", "BaseStreamManager", diff --git a/pytrickle/client.py b/pytrickle/client.py index 2b80ad1..24bb988 100644 --- a/pytrickle/client.py +++ b/pytrickle/client.py @@ -8,10 +8,11 @@ import asyncio import queue import logging -from typing import Callable, Optional, Union +from typing import Callable, Optional, Union, List +from collections import deque from .protocol import TrickleProtocol -from .frames import VideoFrame, AudioFrame, VideoOutput, AudioOutput +from .frames import VideoFrame, AudioFrame, VideoOutput, AudioOutput, TextOutput from . import ErrorCallback from .frame_processor import FrameProcessor @@ -26,7 +27,8 @@ def __init__( protocol: TrickleProtocol, frame_processor: 'FrameProcessor', control_handler: Optional[Callable] = None, - error_callback: Optional[ErrorCallback] = None + error_callback: Optional[ErrorCallback] = None, + text_queue: Optional[deque] = None ): """Initialize TrickleClient. 
@@ -35,12 +37,14 @@ def __init__( frame_processor: FrameProcessor for native async processing control_handler: Optional control message handler error_callback: Optional error callback (if None, uses frame_processor.error_callback) + text_queue: Optional deque of strings to publish as text data """ self.protocol = protocol self.frame_processor = frame_processor self.control_handler = control_handler # Use provided error_callback, or fall back to frame_processor's error_callback self.error_callback = error_callback or frame_processor.error_callback + self.text_queue = text_queue # Optional text queue for apps to use # Client state self.running = False @@ -53,6 +57,7 @@ def __init__( # Output queue for processed frames self.output_queue = queue.Queue() + def process_frame(self, frame: Union[VideoFrame, AudioFrame]) -> Optional[Union[VideoOutput, AudioOutput]]: """Process a single frame and return the output.""" if not frame: @@ -133,10 +138,14 @@ async def _ingress_loop(self): # Process frames directly in ingress loop try: if isinstance(frame, VideoFrame): - logger.debug(f"Processing video frame with frame processor: {frame.tensor.shape}") - + logger.debug( + f"Processing video frame with frame processor: {frame.tensor.shape}" + ) + # Direct async processing - processed_frame = await self.frame_processor.process_video_async(frame) + processed_frame = ( + await self.frame_processor.process_video_async(frame) + ) if processed_frame: output = VideoOutput(processed_frame, self.request_id) await self._send_output(output) @@ -144,17 +153,30 @@ async def _ingress_loop(self): else: logger.warning(f"Frame processor returned None for video frame") + # Check for text outputs from app text queue + await self._process_app_text_queue() + elif isinstance(frame, AudioFrame): - logger.debug(f"Processing audio frame with frame processor: {frame.samples.shape}") - + logger.debug( + f"Processing audio frame with frame processor: {frame.samples.shape}" + ) + # Direct async processing for 
audio - processed_frames = await self.frame_processor.process_audio_async(frame) + processed_frames = ( + await self.frame_processor.process_audio_async(frame) + ) if processed_frames: output = AudioOutput(processed_frames, self.request_id) await self._send_output(output) logger.debug(f"Sent async processed audio frame to egress") else: - logger.warning(f"Frame processor returned None for audio frame") + logger.warning( + f"Frame processor returned None for audio frame" + ) + + # Check for text outputs from app text queue + await self._process_app_text_queue() + else: logger.debug(f"Received unknown frame type: {type(frame)}") @@ -178,7 +200,7 @@ async def _ingress_loop(self): elif isinstance(frame, AudioFrame): fallback_output = AudioOutput([frame], self.request_id) await self._send_output(fallback_output) - + # Send sentinel to signal egress loop to complete logger.info("Ingress loop completed, sending sentinel to egress loop") await self._send_output(None) @@ -201,16 +223,20 @@ async def _egress_loop(self): """Handle outgoing frames.""" try: async def output_generator(): - """Generate output frames from the queue.""" + """Generate output frames from the queue, filtering out TextOutput frames.""" while not self.stop_event.is_set() and not self.error_event.is_set(): try: # Try to get a frame from the queue with timeout frame = await asyncio.to_thread(self.output_queue.get, timeout=0.1) - if frame is not None: - yield frame - else: + if frame is None: # None frame indicates shutdown break + elif isinstance(frame, TextOutput): + # Handle TextOutput frames separately + await self._handle_text_output(frame) + else: + # Yield video/audio frames to protocol egress + yield frame except queue.Empty: continue # No frame available, continue loop except Exception as e: @@ -233,7 +259,7 @@ async def output_generator(): self.error_callback("egress_loop_error", e) except Exception as cb_error: logger.error(f"Error in error callback: {cb_error}") - + async def 
_control_loop(self): """Handle control messages.""" try: @@ -277,5 +303,41 @@ async def _send_output(self, output): logger.warning("Output queue is full, dropping frame") except Exception as e: logger.error(f"Error sending output: {e}") - - \ No newline at end of file + + async def _process_app_text_queue(self): + """Process any queued text strings from the app's text queue and send to egress.""" + if not self.text_queue: + return + + try: + # Process all available text strings from the app's queue + texts_processed = 0 + while self.text_queue and not self.stop_event.is_set() and not self.error_event.is_set(): + try: + text_string = self.text_queue.popleft() + if text_string and text_string.strip(): + # Create TextOutput and send to egress + text_output = TextOutput(text=text_string, request_id=self.request_id) + await self._send_output(text_output) + texts_processed += 1 + logger.debug(f"Publishing: {text_string[:50]}...") + except IndexError: + # Queue is empty + break + + if texts_processed > 0: + logger.debug(f"Processed {texts_processed} text outputs from app queue") + + except Exception as e: + logger.error(f"Error processing app text queue: {e}") + + async def _handle_text_output(self, text_output: TextOutput): + """Handle TextOutput frame by publishing via protocol.""" + try: + if self.protocol.data_publisher: + await self.publish_data(text_output.text) + logger.info(f"✅ Published to data URL: {text_output.text[:50]}...") + else: + logger.warning(f"❌ No data URL configured - cannot publish: {text_output.text[:50]}...") + except Exception as e: + logger.error(f"Error handling text output: {e}") \ No newline at end of file diff --git a/pytrickle/frame_processor.py b/pytrickle/frame_processor.py index 8b19959..078a6c3 100644 --- a/pytrickle/frame_processor.py +++ b/pytrickle/frame_processor.py @@ -56,6 +56,7 @@ def __init__( self.error_callback = error_callback self.state: Optional[StreamState] = None self.model_loaded: bool = False + try: 
self.load_model(**init_kwargs) self.model_loaded = True diff --git a/pytrickle/frames.py b/pytrickle/frames.py index b63a6b2..ca4d567 100644 --- a/pytrickle/frames.py +++ b/pytrickle/frames.py @@ -10,6 +10,7 @@ import torch import numpy as np import av +import time from typing import Optional, Dict, Union, List, Deque from fractions import Fraction from collections import deque @@ -164,7 +165,6 @@ def _from_existing_with_timestamp(cls, existing_frame: 'AudioFrame', new_timesta def from_tensor(cls, tensor: torch.Tensor, format: str = 's16', layout: str = 'mono', sample_rate: int = 48000, timestamp: int = 0, time_base = None) -> 'AudioFrame': """Create AudioFrame from torch tensor.""" - from fractions import Fraction if time_base is None: time_base = Fraction(1, sample_rate) @@ -315,6 +315,24 @@ def with_monotonic_timestamps(cls, frames: List[AudioFrame], request_id: str, st # Streaming Utilities # ==================== + +class TextOutput(OutputFrame): + """Represents text data output (usually JSONL format).""" + + text: str + request_id: str + timestamp_ms: int + + def __init__(self, text: str, request_id: str = '', timestamp_ms: Optional[int] = None): + self.text = text + self.request_id = request_id + self.timestamp_ms = timestamp_ms if timestamp_ms is not None else int(time.time() * 1000) + + @property + def timestamp(self): + """Get the timestamp of this text output.""" + return self.timestamp_ms + class FrameBuffer: """Rolling frame buffer that keeps a fixed number of frames.""" diff --git a/pytrickle/server.py b/pytrickle/server.py index 1fd9359..4eb96b4 100644 --- a/pytrickle/server.py +++ b/pytrickle/server.py @@ -325,11 +325,19 @@ async def _handle_start_stream(self, request: web.Request) -> web.Response: subscriber_timeout=self.subscriber_timeout, ) - # Create client + # Get text queue from frame processor if it has one + text_queue = getattr(self.frame_processor, 'text_queue', None) + if params.data_url: + logger.info(f"Text publishing enabled - data URL: {params.data_url}") + elif text_queue is not None: + logger.warning("Text queue 
available but no data_url provided - text won't be published") + + # Create client with optional text queue self.current_client = TrickleClient( protocol=protocol, frame_processor=self.frame_processor, control_handler=self._handle_control_message, + text_queue=text_queue ) # Update state @@ -678,4 +686,4 @@ async def run_forever(self): logger.info("Shutting down server...") finally: await self.stop() - await runner.cleanup() + await runner.cleanup() \ No newline at end of file diff --git a/pytrickle/stream_processor.py b/pytrickle/stream_processor.py index 55c3263..853b62c 100644 --- a/pytrickle/stream_processor.py +++ b/pytrickle/stream_processor.py @@ -1,20 +1,22 @@ import asyncio import logging -from typing import Optional, Callable, Dict, Any, List, Union, Awaitable +from typing import Optional, Callable, Dict, Any, List, Awaitable +from threading import Lock +from collections import deque -from pytrickle.state import PipelineState - -from .frames import VideoFrame, AudioFrame +from .frames import VideoFrame, AudioFrame, TextOutput from .frame_processor import FrameProcessor from .server import StreamServer logger = logging.getLogger(__name__) -# Type aliases for processing functions +# Type aliases for processing functions (simple signatures) VideoProcessor = Callable[[VideoFrame], Awaitable[Optional[VideoFrame]]] AudioProcessor = Callable[[AudioFrame], Awaitable[Optional[List[AudioFrame]]]] class StreamProcessor: + """StreamProcessor that wraps user-provided functions and provides a text queue for apps.""" + def __init__( self, video_processor: Optional[VideoProcessor] = None, @@ -30,37 +32,58 @@ def __init__( Args: video_processor: Function that processes VideoFrame objects - audio_processor: Function that processes AudioFrame objects + audio_processor: Function that processes AudioFrame objects model_loader: Optional function called during load_model phase param_updater: Optional function called when parameters update name: Processor name port: Server port 
**server_kwargs: Additional arguments passed to StreamServer + + Note on text publishing: + Apps can call processor.publish_data_output(text) and the client will + automatically publish them to the data URL. """ - self.video_processor = video_processor - self.audio_processor = audio_processor + # Validate that at least one processor is provided + if video_processor is None and audio_processor is None: + raise ValueError("At least one of video or audio processor must be provided") + self.model_loader = model_loader self.param_updater = param_updater self.name = name self.port = port self.server_kwargs = server_kwargs - # Create internal frame processor + # Text queue that apps can use to publish text data + self.text_queue = deque() + + # Create internal frame processor that wraps the user functions self._frame_processor = _InternalFrameProcessor( video_processor=video_processor, audio_processor=audio_processor, model_loader=model_loader, param_updater=param_updater, + text_queue=self.text_queue, # Pass text queue to frame processor name=name ) - # Create and start server + # Create and start server with the internal frame processor self.server = StreamServer( frame_processor=self._frame_processor, port=port, **server_kwargs ) + @property + def frame_processor(self): + """Access to the internal frame processor for advanced usage.""" + return self._frame_processor + + async def publish_data_output(self, text: str) -> None: + """Add text to the queue for publishing (simple interface for apps).""" + if text and text.strip(): + self.text_queue.append(text) + logger.debug(f"Queued text for publishing: {text[:50]}...") + async def run_forever(self): """Run the stream processor server forever.""" await self.server.run_forever() @@ -70,7 +93,7 @@ def run(self): asyncio.run(self.run_forever()) class _InternalFrameProcessor(FrameProcessor): - """Internal frame processor that wraps user-provided functions.""" + """Internal frame processor that wraps user-provided functions and 
implements FrameProcessor interface.""" def __init__( self, @@ -78,6 +101,7 @@ def __init__( audio_processor: Optional[AudioProcessor] = None, model_loader: Optional[Callable[[], None]] = None, param_updater: Optional[Callable[[Dict[str, Any]], None]] = None, + text_queue: Optional[deque] = None, name: str = "internal-processor" ): # Set attributes first before calling parent @@ -85,6 +109,7 @@ def __init__( self.audio_processor = audio_processor self.model_loader = model_loader self.param_updater = param_updater + self.text_queue = text_queue # Reference to StreamProcessor's text queue self._ready = False self.name = name diff --git a/pytrickle/test_utils.py b/pytrickle/test_utils.py index a2e08d1..47629ea 100644 --- a/pytrickle/test_utils.py +++ b/pytrickle/test_utils.py @@ -7,6 +7,7 @@ import asyncio from typing import Optional, Dict, Any from unittest.mock import MagicMock, AsyncMock +from collections import deque from .frame_processor import FrameProcessor @@ -16,6 +17,7 @@ class MockFrameProcessor(FrameProcessor): def __init__(self, **kwargs): self.test_params = {} + self.text_queue = deque() # Add text_queue attribute for testing super().__init__(**kwargs) def load_model(self, **kwargs):