Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,26 @@
},
"justMyCode": true,
"python": "${command:python.interpreterPath}"
},
{
"name": "Loading Example",
"type": "debugpy",
"request": "launch",
"program": "${workspaceFolder}/examples/loading_overlay.py",
"console": "integratedTerminal",
"cwd": "${workspaceFolder}/examples",
"env": {
"ORCH_URL": "https://localhost:9995",
"ORCH_SECRET": "orch-secret",
"CAPABILITY_NAME": "comfystream",
"CAPABILITY_DESCRIPTION": "Flip video upside down using pytrickle",
"CAPABILITY_URL": "http://localhost:8000",
"CAPABILITY_PRICE_PER_UNIT": "0",
"CAPABILITY_PRICE_SCALING": "1",
"CAPABILITY_CAPACITY": "1"
},
"justMyCode": true,
"python": "${command:python.interpreterPath}"
}
]
}
209 changes: 209 additions & 0 deletions examples/loading_overlay.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
#!/usr/bin/env python3
"""
Loading Overlay Processor using StreamProcessor

This example demonstrates how to create loading overlay frames
that can be toggled via parameter updates.

When the 'show_loading' parameter is True, video frames will be replaced with
animated loading overlay frames instead of showing the original video.
"""

import asyncio
import logging
import time
import torch
from pytrickle import StreamProcessor
from pytrickle.frames import VideoFrame
from pytrickle.frame_skipper import FrameSkipConfig
import numpy as np
from pytrickle.video_utils import create_loading_frame

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global state
show_loading = False
loading_message = "Loading..."
ready = False
processor = None
background_tasks = []
background_task_started = False

# Loading animation state
frame_counter = 0

async def load_model(**kwargs):
"""Initialize processor state - called during model loading phase."""
global show_loading, ready, processor

logger.info(f"load_model called with kwargs: {kwargs}")

# Set processor variables from kwargs or use defaults
show_loading = kwargs.get('show_loading', False)

# Simulate model loading time
if kwargs.get('simulate_loading', False):
logger.info("Simulating model loading...")
await asyncio.sleep(2) # Simulate loading time

# Note: Cannot start background tasks here as event loop isn't running yet
# Background task will be started when first frame is processed
ready = True
logger.info(f"✅ Loading Overlay processor ready (show_loading: {show_loading}, ready: {ready})")

def start_background_task():
"""Start the background task if not already started."""
global background_task_started, background_tasks

if not background_task_started and processor:
task = asyncio.create_task(send_periodic_status())
background_tasks.append(task)
background_task_started = True
logger.info("Started background status task")

async def send_periodic_status():
"""Background task that sends periodic status updates."""
global processor
counter = 0
try:
while True:
await asyncio.sleep(5.0) # Send status every 5 seconds
counter += 1
if processor:
status_data = {
"type": "status_update",
"counter": counter,
"show_loading": show_loading,
"loading_message": loading_message,
"ready": ready,
"timestamp": time.time()
}
success = await processor.send_data(str(status_data))
if success:
logger.info(f"Sent status update #{counter}")
else:
logger.warning(f"Failed to send status update #{counter}, stopping background task")
break # Exit the loop if sending fails
except asyncio.CancelledError:
logger.info("Background status task cancelled")
raise
except Exception as e:
logger.error(f"Error in background status task: {e}")

async def on_stream_stop():
"""Called when stream stops - cleanup background tasks."""
global background_tasks, background_task_started
logger.info("Stream stopped, cleaning up background tasks")

for task in background_tasks:
if not task.done():
task.cancel()
logger.info("Cancelled background task")

background_tasks.clear()
background_task_started = False # Reset flag for next stream
logger.info("All background tasks cleaned up")

async def process_video(frame: VideoFrame) -> VideoFrame:
"""Process video frame - show loading overlay if enabled, otherwise passthrough."""
global show_loading, ready, frame_counter, loading_message

# Start background task on first frame (when event loop is running)
start_background_task()

# Increment frame counter for animations
frame_counter += 1

if not show_loading:
# Passthrough mode - return original frame
return frame

# Loading overlay mode - replace frame with loading animation
frame_tensor = frame.tensor

# Track if we need to add batch dimension back
had_batch_dim = False

# Handle both 3D and 4D tensors (with batch dimension)
if len(frame_tensor.shape) == 4:
if frame_tensor.shape[0] == 1:
frame_tensor = frame_tensor.squeeze(0)
had_batch_dim = True
else:
logger.error(f"Unexpected batch size: {frame_tensor.shape[0]}")
return frame

# Get frame dimensions
if len(frame_tensor.shape) == 3:
if frame_tensor.shape[0] == 3: # CHW format
height, width = frame_tensor.shape[1], frame_tensor.shape[2]
was_chw = True
else: # HWC format
height, width = frame_tensor.shape[0], frame_tensor.shape[1]
was_chw = False
else:
logger.error(f"Unexpected tensor shape: {frame_tensor.shape}")
return frame

# Create loading overlay frame using utility
loading_frame = create_loading_frame(width, height, loading_message, frame_counter)

# Convert to tensor format matching input
loading_tensor = torch.from_numpy(loading_frame.astype(np.float32))

# Check if input was normalized (0-1) or (0-255)
if frame_tensor.max() <= 1.0:
loading_tensor = loading_tensor / 255.0

# Convert to original tensor format
if was_chw:
loading_tensor = loading_tensor.permute(2, 0, 1) # HWC to CHW

# Add batch dimension back if needed
if had_batch_dim:
loading_tensor = loading_tensor.unsqueeze(0)

# Move to same device as original tensor
loading_tensor = loading_tensor.to(frame.tensor.device)

return frame.replace_tensor(loading_tensor)

async def update_params(params: dict):
"""Update loading overlay parameters."""
global show_loading, loading_message

if "show_loading" in params:
old = show_loading
show_loading = bool(params["show_loading"])
if old != show_loading:
status = "enabled" if show_loading else "disabled"
logger.info(f"Loading overlay: {status}")

if "loading_message" in params:
old = loading_message
loading_message = str(params["loading_message"])
if old != loading_message:
logger.info(f"Loading message: {old} → {loading_message}")

# Create and run StreamProcessor
if __name__ == "__main__":
logger.info("Starting Loading Overlay Processor")
logger.info("Parameters you can update:")
logger.info(" - show_loading: true/false - Enable/disable loading overlay")
logger.info(" - loading_message: string - Custom loading message")
logger.info("")
logger.info("Example parameter updates:")
logger.info(' {"show_loading": true, "loading_message": "Processing video..."}')
logger.info(' {"show_loading": false}')

processor = StreamProcessor(
video_processor=process_video,
model_loader=load_model,
param_updater=update_params,
on_stream_stop=on_stream_stop,
name="loading-overlay",
port=8001, # Different port from process_video_example
frame_skip_config=FrameSkipConfig(),
)
processor.run()
8 changes: 7 additions & 1 deletion pytrickle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
Provides functionality to subscribe to and publish video streams with real-time processing.
"""

from typing import Union, Callable, Optional, Coroutine, Any
from typing import Callable, Optional, Coroutine, Any

# Type alias for error callback functions (async only)
ErrorCallback = Callable[[str, Optional[Exception]], Coroutine[Any, Any, None]]
Expand All @@ -30,6 +30,11 @@
from .fps_meter import FPSMeter
from .frame_skipper import FrameSkipConfig

# Video utilities
from .video_utils import (
create_loading_frame,
)

from . import api

from .version import __version__
Expand Down Expand Up @@ -59,5 +64,6 @@
"FrameProcessor",
"FPSMeter",
"FrameSkipConfig",
"create_loading_frame",
"__version__"
]
12 changes: 12 additions & 0 deletions pytrickle/video_utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""
Video utilities for PyTrickle.

This namespace contains utilities and tools for video processing,
including loading overlays and visual effects.
"""

from .loading import create_loading_frame

__all__ = [
"create_loading_frame",
]
92 changes: 92 additions & 0 deletions pytrickle/video_utils/loading.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""
Loading utilities for PyTrickle - Video loading overlay generation.

This module provides utilities for creating loading overlay frames for video processing.
"""

import math
import cv2
import numpy as np
from typing import Optional


def create_loading_frame(
width: int,
height: int,
message: str = "Loading...",
frame_counter: int = 0,
progress: Optional[float] = None
) -> np.ndarray:
"""
Create a loading overlay frame with animated progress bar.

Args:
width: Frame width in pixels
height: Frame height in pixels
message: Loading message to display
frame_counter: Current frame number for animations
progress: Progress value 0.0-1.0, or None for animated progress

Returns:
np.ndarray: Loading overlay frame as BGR image
"""
# Create dark background
frame = np.zeros((height, width, 3), dtype=np.uint8)
frame[:] = (30, 30, 30) # Dark gray background

# Calculate center position
center_x = width // 2
center_y = height // 2

# Add overlay panel
overlay_width = min(400, width - 40)
overlay_height = min(200, height - 40)
overlay_x = center_x - overlay_width // 2
overlay_y = center_y - overlay_height // 2

# Create overlay background
frame[overlay_y:overlay_y + overlay_height, overlay_x:overlay_x + overlay_width] = (20, 20, 20)

# Add border
cv2.rectangle(frame, (overlay_x, overlay_y),
(overlay_x + overlay_width, overlay_y + overlay_height),
(60, 60, 60), 2)

# Add loading message in center
if message:
message_size = 1.2
message_thickness = 2
message_y = center_y

(msg_width, _), _ = cv2.getTextSize(message, cv2.FONT_HERSHEY_SIMPLEX,
message_size, message_thickness)
message_x = center_x - msg_width // 2

cv2.putText(frame, message, (message_x, message_y),
cv2.FONT_HERSHEY_SIMPLEX, message_size, (200, 200, 200), message_thickness)

# Add progress bar
bar_width = overlay_width - 60
bar_height = 6
bar_x = overlay_x + 30
bar_y = center_y + 40

# Progress bar background
cv2.rectangle(frame, (bar_x, bar_y), (bar_x + bar_width, bar_y + bar_height),
(40, 40, 40), -1)

# Use controllable progress or animated progress
if progress is not None and progress > 0:
current_progress = min(1.0, max(0.0, progress))
else:
# Animated progress (oscillating)
current_progress = (math.sin(frame_counter * 0.1) + 1) * 0.5

progress_width = int(bar_width * current_progress)

if progress_width > 0:
cv2.rectangle(frame, (bar_x, bar_y),
(bar_x + progress_width, bar_y + bar_height),
(100, 150, 255), -1) # Blue progress

return frame