diff --git a/.vscode/launch.json b/.vscode/launch.json index b3af0ae..35f8e04 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -23,6 +23,26 @@ }, "justMyCode": true, "python": "${command:python.interpreterPath}" + }, + { + "name": "Loading Example", + "type": "debugpy", + "request": "launch", + "program": "${workspaceFolder}/examples/loading_overlay.py", + "console": "integratedTerminal", + "cwd": "${workspaceFolder}/examples", + "env": { + "ORCH_URL": "https://localhost:9995", + "ORCH_SECRET": "orch-secret", + "CAPABILITY_NAME": "comfystream", + "CAPABILITY_DESCRIPTION": "Flip video upside down using pytrickle", + "CAPABILITY_URL": "http://localhost:8000", + "CAPABILITY_PRICE_PER_UNIT": "0", + "CAPABILITY_PRICE_SCALING": "1", + "CAPABILITY_CAPACITY": "1" + }, + "justMyCode": true, + "python": "${command:python.interpreterPath}" } ] } \ No newline at end of file diff --git a/examples/loading_overlay.py b/examples/loading_overlay.py new file mode 100644 index 0000000..9a64d87 --- /dev/null +++ b/examples/loading_overlay.py @@ -0,0 +1,209 @@ +#!/usr/bin/env python3 +""" +Loading Overlay Processor using StreamProcessor + +This example demonstrates how to create loading overlay frames +that can be toggled via parameter updates. + +When the 'show_loading' parameter is True, video frames will be replaced with +animated loading overlay frames instead of showing the original video. +""" + +import asyncio +import logging +import time +import torch +from pytrickle import StreamProcessor +from pytrickle.frames import VideoFrame +from pytrickle.frame_skipper import FrameSkipConfig +import numpy as np +from pytrickle.video_utils import create_loading_frame + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Global state +show_loading = False +loading_message = "Loading..." +ready = False +processor = None +background_tasks = [] +background_task_started = False + +# Loading animation state +frame_counter = 0 + +async def load_model(**kwargs): + """Initialize processor state - called during model loading phase.""" + global show_loading, ready, processor + + logger.info(f"load_model called with kwargs: {kwargs}") + + # Set processor variables from kwargs or use defaults + show_loading = kwargs.get('show_loading', False) + + # Simulate model loading time + if kwargs.get('simulate_loading', False): + logger.info("Simulating model loading...") + await asyncio.sleep(2) # Simulate loading time + + # Note: Cannot start background tasks here as event loop isn't running yet + # Background task will be started when first frame is processed + ready = True + logger.info(f"✅ Loading Overlay processor ready (show_loading: {show_loading}, ready: {ready})") + +def start_background_task(): + """Start the background task if not already started.""" + global background_task_started, background_tasks + + if not background_task_started and processor: + task = asyncio.create_task(send_periodic_status()) + background_tasks.append(task) + background_task_started = True + logger.info("Started background status task") + +async def send_periodic_status(): + """Background task that sends periodic status updates.""" + global processor + counter = 0 + try: + while True: + await asyncio.sleep(5.0) # Send status every 5 seconds + counter += 1 + if processor: + status_data = { + "type": "status_update", + "counter": counter, + "show_loading": show_loading, + "loading_message": loading_message, + "ready": ready, + "timestamp": time.time() + } + success = await processor.send_data(str(status_data)) + if success: + logger.info(f"Sent status update #{counter}") + else: + logger.warning(f"Failed to send status update #{counter}, stopping background task") + break # Exit the loop if sending fails + except asyncio.CancelledError: + logger.info("Background status task cancelled") + raise + except Exception as e: + logger.error(f"Error in background status task: {e}") + +async def on_stream_stop(): + """Called when stream stops - cleanup background tasks.""" + global background_tasks, background_task_started + logger.info("Stream stopped, cleaning up background tasks") + + for task in background_tasks: + if not task.done(): + task.cancel() + logger.info("Cancelled background task") + + background_tasks.clear() + background_task_started = False # Reset flag for next stream + logger.info("All background tasks cleaned up") + +async def process_video(frame: VideoFrame) -> VideoFrame: + """Process video frame - show loading overlay if enabled, otherwise passthrough.""" + global show_loading, ready, frame_counter, loading_message + + # Start background task on first frame (when event loop is running) + start_background_task() + + # Increment frame counter for animations + frame_counter += 1 + + if not show_loading: + # Passthrough mode - return original frame + return frame + + # Loading overlay mode - replace frame with loading animation + frame_tensor = frame.tensor + + # Track if we need to add batch dimension back + had_batch_dim = False + + # Handle both 3D and 4D tensors (with batch dimension) + if len(frame_tensor.shape) == 4: + if frame_tensor.shape[0] == 1: + frame_tensor = frame_tensor.squeeze(0) + had_batch_dim = True + else: + logger.error(f"Unexpected batch size: {frame_tensor.shape[0]}") + return frame + + # Get frame dimensions + if len(frame_tensor.shape) == 3: + if frame_tensor.shape[0] == 3: # CHW format + height, width = frame_tensor.shape[1], frame_tensor.shape[2] + was_chw = True + else: # HWC format + height, width = frame_tensor.shape[0], frame_tensor.shape[1] + was_chw = False + else: + logger.error(f"Unexpected tensor shape: {frame_tensor.shape}") + return frame + + # Create loading overlay frame using utility + loading_frame = create_loading_frame(width, height, loading_message, frame_counter) + + # Convert to tensor format matching input + loading_tensor = torch.from_numpy(loading_frame.astype(np.float32)) + + # Check if input was normalized (0-1) or (0-255) + if frame_tensor.max() <= 1.0: + loading_tensor = loading_tensor / 255.0 + + # Convert to original tensor format + if was_chw: + loading_tensor = loading_tensor.permute(2, 0, 1) # HWC to CHW + + # Add batch dimension back if needed + if had_batch_dim: + loading_tensor = loading_tensor.unsqueeze(0) + + # Move to same device as original tensor + loading_tensor = loading_tensor.to(frame.tensor.device) + + return frame.replace_tensor(loading_tensor) + +async def update_params(params: dict): + """Update loading overlay parameters.""" + global show_loading, loading_message + + if "show_loading" in params: + old = show_loading + show_loading = bool(params["show_loading"]) + if old != show_loading: + status = "enabled" if show_loading else "disabled" + logger.info(f"Loading overlay: {status}") + + if "loading_message" in params: + old = loading_message + loading_message = str(params["loading_message"]) + if old != loading_message: + logger.info(f"Loading message: {old} → {loading_message}") + +# Create and run StreamProcessor +if __name__ == "__main__": + logger.info("Starting Loading Overlay Processor") + logger.info("Parameters you can update:") + logger.info(" - show_loading: true/false - Enable/disable loading overlay") + logger.info(" - loading_message: string - Custom loading message") + logger.info("") + logger.info("Example parameter updates:") + logger.info(' {"show_loading": true, "loading_message": "Processing video..."}') + logger.info(' {"show_loading": false}') + + processor = StreamProcessor( + video_processor=process_video, + model_loader=load_model, + param_updater=update_params, + on_stream_stop=on_stream_stop, + name="loading-overlay", + port=8001, # Different port from process_video_example + frame_skip_config=FrameSkipConfig(), + ) + processor.run() \ No newline at end of file diff --git a/pytrickle/__init__.py b/pytrickle/__init__.py index e532797..eb059e0 100644 --- a/pytrickle/__init__.py +++ b/pytrickle/__init__.py @@ -4,7 +4,7 @@ Provides functionality to subscribe to and publish video streams with real-time processing. """ -from typing import Union, Callable, Optional, Coroutine, Any +from typing import Callable, Optional, Coroutine, Any # Type alias for error callback functions (async only) ErrorCallback = Callable[[str, Optional[Exception]], Coroutine[Any, Any, None]] @@ -30,6 +30,11 @@ from .fps_meter import FPSMeter from .frame_skipper import FrameSkipConfig +# Video utilities +from .video_utils import ( + create_loading_frame, +) + from . import api from .version import __version__ @@ -59,5 +64,6 @@ "FrameProcessor", "FPSMeter", "FrameSkipConfig", + "create_loading_frame", "__version__" ] \ No newline at end of file diff --git a/pytrickle/video_utils/__init__.py b/pytrickle/video_utils/__init__.py new file mode 100644 index 0000000..a86fbde --- /dev/null +++ b/pytrickle/video_utils/__init__.py @@ -0,0 +1,12 @@ +""" +Video utilities for PyTrickle. + +This namespace contains utilities and tools for video processing, +including loading overlays and visual effects. +""" + +from .loading import create_loading_frame + +__all__ = [ + "create_loading_frame", +] \ No newline at end of file diff --git a/pytrickle/video_utils/loading.py b/pytrickle/video_utils/loading.py new file mode 100644 index 0000000..016aa2f --- /dev/null +++ b/pytrickle/video_utils/loading.py @@ -0,0 +1,92 @@ +""" +Loading utilities for PyTrickle - Video loading overlay generation. + +This module provides utilities for creating loading overlay frames for video processing. +""" + +import math +import cv2 +import numpy as np +from typing import Optional + + +def create_loading_frame( + width: int, + height: int, + message: str = "Loading...", + frame_counter: int = 0, + progress: Optional[float] = None +) -> np.ndarray: + """ + Create a loading overlay frame with animated progress bar. + + Args: + width: Frame width in pixels + height: Frame height in pixels + message: Loading message to display + frame_counter: Current frame number for animations + progress: Progress value 0.0-1.0, or None for animated progress + + Returns: + np.ndarray: Loading overlay frame as BGR image + """ + # Create dark background + frame = np.zeros((height, width, 3), dtype=np.uint8) + frame[:] = (30, 30, 30) # Dark gray background + + # Calculate center position + center_x = width // 2 + center_y = height // 2 + + # Add overlay panel + overlay_width = min(400, width - 40) + overlay_height = min(200, height - 40) + overlay_x = center_x - overlay_width // 2 + overlay_y = center_y - overlay_height // 2 + + # Create overlay background + frame[overlay_y:overlay_y + overlay_height, overlay_x:overlay_x + overlay_width] = (20, 20, 20) + + # Add border + cv2.rectangle(frame, (overlay_x, overlay_y), + (overlay_x + overlay_width, overlay_y + overlay_height), + (60, 60, 60), 2) + + # Add loading message in center + if message: + message_size = 1.2 + message_thickness = 2 + message_y = center_y + + (msg_width, _), _ = cv2.getTextSize(message, cv2.FONT_HERSHEY_SIMPLEX, + message_size, message_thickness) + message_x = center_x - msg_width // 2 + + cv2.putText(frame, message, (message_x, message_y), + cv2.FONT_HERSHEY_SIMPLEX, message_size, (200, 200, 200), message_thickness) + + # Add progress bar + bar_width = overlay_width - 60 + bar_height = 6 + bar_x = overlay_x + 30 + bar_y = center_y + 40 + + # Progress bar background + cv2.rectangle(frame, (bar_x, bar_y), (bar_x + bar_width, bar_y + bar_height), + (40, 40, 40), -1) + + # Use controllable progress or animated progress + if progress is not None and progress > 0: + current_progress = min(1.0, max(0.0, progress)) + else: + # Animated progress (oscillating) + current_progress = (math.sin(frame_counter * 0.1) + 1) * 0.5 + + progress_width = int(bar_width * current_progress) + + if progress_width > 0: + cv2.rectangle(frame, (bar_x, bar_y), + (bar_x + progress_width, bar_y + bar_height), + (100, 150, 255), -1) # Blue progress + + return frame \ No newline at end of file