Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,26 @@
},
"justMyCode": true,
"python": "${command:python.interpreterPath}"
},
{
"name": "PyTrickle Demo with Text Publishing",
"type": "debugpy",
"request": "launch",
"program": "${workspaceFolder}/examples/text_output_example.py",
"console": "integratedTerminal",
"cwd": "${workspaceFolder}/examples",
"env": {
"ORCH_URL": "https://localhost:9995",
"ORCH_SECRET": "orch-secret",
"CAPABILITY_NAME": "trickle-stream-example",
"CAPABILITY_DESCRIPTION": "Flip video upside down using pytrickle",
"CAPABILITY_URL": "http://localhost:8000",
"CAPABILITY_PRICE_PER_UNIT": "0",
"CAPABILITY_PRICE_SCALING": "1",
"CAPABILITY_CAPACITY": "1"
},
"justMyCode": true,
"python": "${command:python.interpreterPath}"
}
]
}
127 changes: 127 additions & 0 deletions examples/text_output_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
#!/usr/bin/env python3
"""
Video/Audio Passthrough with Text Publishing using StreamProcessor

This example demonstrates:
- Video passthrough (no processing)
- Audio passthrough (no processing)
- Text publishing every 400 audio frames using simple text queue
"""

import logging
import json
import time
from pytrickle import StreamProcessor
from pytrickle.frames import VideoFrame, AudioFrame
from typing import List

# Configure logging for this example script and create the module logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global state shared by the processing callbacks below.
# Number of audio frames seen so far; incremented in process_audio and
# zeroed by update_params when "reset_counter" is truthy.
audio_frame_count = 0
# Publish text every this many audio frames; overwritten by load_model and
# update_params (both clamp it to at least 1).
text_publish_interval = 400
# Set to True by load_model; the audio callback only counts/publishes once set.
ready = False
# time.time() value captured by load_model (and again on counter reset);
# used as the origin for elapsed-time statistics.
start_time = None
_stream_processor = None  # Reference to StreamProcessor for text publishing

def load_model(**kwargs):
    """Initialize processor state - called during model loading phase.

    Args:
        **kwargs: Optional settings. ``text_publish_interval`` (any value
            accepted by ``int()``) is the number of audio frames between
            text publications; it defaults to 400 and is clamped to >= 1.

    Raises:
        TypeError, ValueError: If ``text_publish_interval`` cannot be
            converted with ``int()``. The module globals are left untouched
            in that case.
    """
    global text_publish_interval, ready, start_time

    logger.info(f"load_model called with kwargs: {kwargs}")

    # Validate into a local first so a failed conversion cannot leave an
    # unvalidated raw value in the module-level interval.
    interval = max(1, int(kwargs.get('text_publish_interval', 400)))

    text_publish_interval = interval
    start_time = time.time()
    # Flip the ready flag last, once every value the callbacks read is set.
    ready = True
    logger.info(f"✅ Video/Audio passthrough with text publishing ready (interval: {text_publish_interval} frames)")

async def process_video(frame: VideoFrame) -> VideoFrame:
    """Pass through video frames unchanged.

    The frame is returned as-is whether or not the model has finished
    loading, so no readiness check is needed here.

    Args:
        frame: The incoming video frame.

    Returns:
        The very same frame object that was passed in.
    """
    return frame

async def process_audio(frame: AudioFrame) -> List[AudioFrame]:
    """Pass through audio frames and publish text data periodically.

    Every ``text_publish_interval`` frames a JSON line with running
    statistics is built and handed to ``_stream_processor.publish_data_output``
    when a processor reference has been set.

    Args:
        frame: The incoming audio frame.

    Returns:
        A single-element list containing the unmodified input frame.
    """
    # Only the counter is rebound here; the other module globals are read-only
    # in this function and need no ``global`` declaration.
    global audio_frame_count

    if not ready:
        return [frame]

    audio_frame_count += 1

    # Nothing to publish on this frame: plain passthrough.
    if audio_frame_count % text_publish_interval != 0:
        return [frame]

    # Read the clock once so "timestamp" and the elapsed time are consistent.
    now = time.time()
    elapsed_time = now - start_time

    # Create JSONL data with audio processing statistics
    text_data = {
        "type": "audio_stats",
        "timestamp": now,
        "elapsed_time_seconds": round(elapsed_time, 2),
        "total_audio_frames": audio_frame_count,
        "frames_per_second": round(audio_frame_count / elapsed_time, 2) if elapsed_time > 0 else 0,
        "frame_shape": list(frame.samples.shape) if hasattr(frame, 'samples') else None,
        "sample_rate": getattr(frame, 'sample_rate', None),
        "channels": getattr(frame, 'channels', None),
        "message": f"Processed {audio_frame_count} audio frames in {elapsed_time:.2f} seconds"
    }

    jsonl_line = json.dumps(text_data)
    if _stream_processor:
        await _stream_processor.publish_data_output(jsonl_line)
        # Report success only when something was actually handed off.
        logger.info(f"Published stats: {audio_frame_count} frames, {elapsed_time:.2f}s elapsed")
    else:
        logger.warning(
            f"No StreamProcessor reference set; skipped publishing stats at {audio_frame_count} frames"
        )

    # Pass through the audio frame unchanged
    return [frame]

def update_params(params: dict):
    """Apply runtime parameter updates.

    Recognized keys:
        text_publish_interval: new number of frames between text
            publications; converted with ``int()`` and clamped to >= 1.
        reset_counter: when truthy, zero the audio frame counter and restart
            the elapsed-time clock.
    """
    global text_publish_interval, audio_frame_count, start_time

    if "text_publish_interval" in params:
        previous = text_publish_interval
        text_publish_interval = max(1, int(params["text_publish_interval"]))
        # Only log when the effective value actually moved.
        if text_publish_interval != previous:
            logger.info(f"Text publish interval: {previous} → {text_publish_interval} frames")

    if params.get("reset_counter"):
        audio_frame_count = 0
        start_time = time.time()
        logger.info("Reset audio frame counter and timer")


# Script entry point: wire the callbacks into a StreamProcessor and run it.
if __name__ == "__main__":
    processor = StreamProcessor(
        name="passthrough-with-text",
        port=8000,
        model_loader=load_model,
        param_updater=update_params,
        video_processor=process_video,
        audio_processor=process_audio,
    )

    # Expose the processor to process_audio so it can publish text data.
    _stream_processor = processor

    startup_messages = (
        "Starting passthrough processor with text publishing...",
        f"Will publish JSONL stats every {text_publish_interval} audio frames",
        "Update parameters via /api/update_params:",
        " - text_publish_interval: number of frames between text publications",
        " - reset_counter: true to reset frame counter and timer",
    )
    for message in startup_messages:
        logger.info(message)

    processor.run()
3 changes: 2 additions & 1 deletion pytrickle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from .server import StreamServer
from .protocol import TrickleProtocol
from .frames import (
VideoFrame, AudioFrame, VideoOutput, AudioOutput,
VideoFrame, AudioFrame, VideoOutput, AudioOutput, TextOutput,
FrameBuffer,
)
from .state import StreamState
Expand Down Expand Up @@ -45,6 +45,7 @@
"AudioFrame",
"VideoOutput",
"AudioOutput",
"TextOutput",
"TricklePublisher",
"TrickleSubscriber",
"BaseStreamManager",
Expand Down
98 changes: 80 additions & 18 deletions pytrickle/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
import asyncio
import queue
import logging
from typing import Callable, Optional, Union
from typing import Callable, Optional, Union, List
from collections import deque

from .protocol import TrickleProtocol
from .frames import VideoFrame, AudioFrame, VideoOutput, AudioOutput
from .frames import VideoFrame, AudioFrame, VideoOutput, AudioOutput, TextOutput
from . import ErrorCallback
from .frame_processor import FrameProcessor

Expand All @@ -26,7 +27,8 @@ def __init__(
protocol: TrickleProtocol,
frame_processor: 'FrameProcessor',
control_handler: Optional[Callable] = None,
error_callback: Optional[ErrorCallback] = None
error_callback: Optional[ErrorCallback] = None,
text_queue: Optional[deque] = None
):
"""Initialize TrickleClient.

Expand All @@ -35,12 +37,14 @@ def __init__(
frame_processor: FrameProcessor for native async processing
control_handler: Optional control message handler
error_callback: Optional error callback (if None, uses frame_processor.error_callback)
text_queue: Optional deque of strings to publish as text data
"""
self.protocol = protocol
self.frame_processor = frame_processor
self.control_handler = control_handler
# Use provided error_callback, or fall back to frame_processor's error_callback
self.error_callback = error_callback or frame_processor.error_callback
self.text_queue = text_queue # Optional text queue for apps to use

# Client state
self.running = False
Expand All @@ -53,6 +57,7 @@ def __init__(
# Output queue for processed frames
self.output_queue = queue.Queue()


def process_frame(self, frame: Union[VideoFrame, AudioFrame]) -> Optional[Union[VideoOutput, AudioOutput]]:
"""Process a single frame and return the output."""
if not frame:
Expand Down Expand Up @@ -133,28 +138,45 @@ async def _ingress_loop(self):
# Process frames directly in ingress loop
try:
if isinstance(frame, VideoFrame):
logger.debug(f"Processing video frame with frame processor: {frame.tensor.shape}")

logger.debug(
f"Processing video frame with frame processor: {frame.tensor.shape}"
)

# Direct async processing
processed_frame = await self.frame_processor.process_video_async(frame)
processed_frame = (
await self.frame_processor.process_video_async(frame)
)
if processed_frame:
output = VideoOutput(processed_frame, self.request_id)
await self._send_output(output)
logger.debug(f"Sent async processed video frame to egress")
else:
logger.warning(f"Frame processor returned None for video frame")

# Check for text outputs from app text queue
await self._process_app_text_queue()

elif isinstance(frame, AudioFrame):
logger.debug(f"Processing audio frame with frame processor: {frame.samples.shape}")

logger.debug(
f"Processing audio frame with frame processor: {frame.samples.shape}"
)

# Direct async processing for audio
processed_frames = await self.frame_processor.process_audio_async(frame)
processed_frames = (
await self.frame_processor.process_audio_async(frame)
)
if processed_frames:
output = AudioOutput(processed_frames, self.request_id)
await self._send_output(output)
logger.debug(f"Sent async processed audio frame to egress")
else:
logger.warning(f"Frame processor returned None for audio frame")
logger.warning(
f"Frame processor returned None for audio frame"
)

# Check for text outputs from app text queue
await self._process_app_text_queue()

else:
logger.debug(f"Received unknown frame type: {type(frame)}")

Expand All @@ -178,7 +200,7 @@ async def _ingress_loop(self):
elif isinstance(frame, AudioFrame):
fallback_output = AudioOutput([frame], self.request_id)
await self._send_output(fallback_output)

# Send sentinel to signal egress loop to complete
logger.info("Ingress loop completed, sending sentinel to egress loop")
await self._send_output(None)
Expand All @@ -201,16 +223,20 @@ async def _egress_loop(self):
"""Handle outgoing frames."""
try:
async def output_generator():
"""Generate output frames from the queue."""
"""Generate output frames from the queue, filtering out TextOutput frames."""
while not self.stop_event.is_set() and not self.error_event.is_set():
try:
# Try to get a frame from the queue with timeout
frame = await asyncio.to_thread(self.output_queue.get, timeout=0.1)
if frame is not None:
yield frame
else:
if frame is None:
# None frame indicates shutdown
break
elif isinstance(frame, TextOutput):
# Handle TextOutput frames separately
await self._handle_text_output(frame)
else:
# Yield video/audio frames to protocol egress
yield frame
except queue.Empty:
continue # No frame available, continue loop
except Exception as e:
Expand All @@ -233,7 +259,7 @@ async def output_generator():
self.error_callback("egress_loop_error", e)
except Exception as cb_error:
logger.error(f"Error in error callback: {cb_error}")

async def _control_loop(self):
"""Handle control messages."""
try:
Expand Down Expand Up @@ -277,5 +303,41 @@ async def _send_output(self, output):
logger.warning("Output queue is full, dropping frame")
except Exception as e:
logger.error(f"Error sending output: {e}")



async def _process_app_text_queue(self):
    """Drain the app's text queue, forwarding each non-blank string to egress.

    Each queued string that is non-empty after stripping whitespace is wrapped
    in a ``TextOutput`` tagged with ``self.request_id`` and passed to
    ``self._send_output``. Draining stops early once the stop or error event
    is set. Any exception is logged rather than propagated.
    """
    if not self.text_queue:
        return

    try:
        sent_count = 0
        while self.text_queue:
            if self.stop_event.is_set() or self.error_event.is_set():
                break
            try:
                entry = self.text_queue.popleft()
                if entry and entry.strip():
                    wrapped = TextOutput(text=entry, request_id=self.request_id)
                    await self._send_output(wrapped)
                    sent_count += 1
                    logger.debug(f"Publishing: {entry[:50]}...")
            except IndexError:
                # The queue was emptied between the truthiness check and the pop.
                break

        if sent_count > 0:
            logger.debug(f"Processed {sent_count} text outputs from app queue")

    except Exception as e:
        logger.error(f"Error processing app text queue: {e}")

async def _handle_text_output(self, text_output: TextOutput):
    """Publish the text carried by a ``TextOutput`` when a data publisher exists.

    If ``self.protocol.data_publisher`` is falsy, a warning is logged and
    nothing is sent; otherwise the text is passed to ``self.publish_data``.
    Any exception is logged rather than propagated.
    """
    try:
        payload = text_output.text
        if not self.protocol.data_publisher:
            logger.warning(f"❌ No data URL configured - cannot publish: {payload[:50]}...")
            return

        await self.publish_data(payload)
        logger.info(f"✅ Published to data URL: {payload[:50]}...")
    except Exception as e:
        logger.error(f"Error handling text output: {e}")
1 change: 1 addition & 0 deletions pytrickle/frame_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def __init__(
self.error_callback = error_callback
self.state: Optional[StreamState] = None
self.model_loaded: bool = False

try:
self.load_model(**init_kwargs)
self.model_loaded = True
Expand Down
Loading