Commits (changes shown from 15 of 21):
43dfda6 feat(stream_processor): call load_model automatically during init, at… (eliteprox, Sep 15, 2025)
2b17af1 tests(model_loader): fix failing test, add settings.json (eliteprox, Nov 6, 2025)
dd0690c feat: Implement loading overlay example and utilities (eliteprox, Sep 28, 2025)
97af03d add code attribution (eliteprox, Oct 28, 2025)
b1e7615 add color_format and validation to create_loading_frame (eliteprox, Oct 28, 2025)
ee44775 examples(loading_overlay): remove unused background tasks and add byo… (eliteprox, Oct 28, 2025)
6a338fe launch.json: update description for loading example (eliteprox, Oct 28, 2025)
d34409b Merge branch 'feat/loading-overlay' into fix/load-model-sync (eliteprox, Nov 6, 2025)
8cbfd92 consolidate examples (eliteprox, Nov 6, 2025)
5055d3b fix: Correct status code name in RegisterCapability class (eliteprox, Nov 6, 2025)
28922a4 refactor: Remove redundant model loading checks and enhance parameter… (eliteprox, Nov 6, 2025)
9f668d8 feat: Introduce build_loading_overlay_frame utility and refactor load… (eliteprox, Nov 6, 2025)
1d3349e feat: Add input queue clearing functionality to prevent stale frame p… (eliteprox, Nov 7, 2025)
240a48b feat: Implement warmup handler functionality in FrameProcessor and re… (eliteprox, Nov 7, 2025)
4d0637d fix failing tests (eliteprox, Nov 7, 2025)
95524a2 feat: Add clear_queues_on_update parameter to StreamServer and Stream… (eliteprox, Nov 7, 2025)
5a1cb95 refactor(ErrorCallback): Revert change to ErrorCallback type alias (eliteprox, Nov 7, 2025)
bbb6b3d revert unintended changes to init.py and client (eliteprox, Nov 8, 2025)
f3aebb1 feat: moved loading_overlay to utils and renamed example for clarity (eliteprox, Nov 8, 2025)
71ecd12 refactor(FrameProcessor): Simplify warmup handling and improve parame… (eliteprox, Nov 8, 2025)
3f0fda2 feat(StreamServer): Add pipeline readiness check before starting stream (eliteprox, Nov 8, 2025)
20 changes: 20 additions & 0 deletions .vscode/launch.json
@@ -23,6 +23,26 @@
},
"justMyCode": true,
"python": "${command:python.interpreterPath}"
},
{
"name": "Loading Example",
"type": "debugpy",
"request": "launch",
"program": "${workspaceFolder}/examples/loading_overlay.py",
"console": "integratedTerminal",
"cwd": "${workspaceFolder}/examples",
"env": {
"ORCH_URL": "https://localhost:9995",
"ORCH_SECRET": "orch-secret",
"CAPABILITY_NAME": "comfystream",
"CAPABILITY_DESCRIPTION": "Loading overlay processor with animated progress bars",
"CAPABILITY_URL": "http://localhost:8000",
"CAPABILITY_PRICE_PER_UNIT": "0",
"CAPABILITY_PRICE_SCALING": "1",
"CAPABILITY_CAPACITY": "1"
},
"justMyCode": true,
"python": "${command:python.interpreterPath}"
}
]
}
7 changes: 7 additions & 0 deletions .vscode/settings.json
@@ -0,0 +1,7 @@
{
"python.testing.pytestArgs": [
"tests"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
}
2 changes: 1 addition & 1 deletion README.md
@@ -108,7 +108,7 @@ async def main():
await app.run_forever()
```

-For a complete working example with green tint processing, see `examples/async_processor_example.py`.
+For a complete working example with green tint processing, see [`examples/process_video_example.py`](examples/process_video_example.py) and [`examples/model_loading_example.py`](examples/model_loading_example.py).

## HTTP API

…
192 changes: 192 additions & 0 deletions examples/loading_overlay.py
@@ -0,0 +1,192 @@
#!/usr/bin/env python3
"""
Model Loading and Loading Overlay Example

This comprehensive example demonstrates:
1. Non-blocking model loading with configurable delay
2. Server health state transitions (LOADING -> IDLE)
3. Optional animated loading overlay during processing
4. Real-time parameter updates to control overlay

The server starts immediately and is available for /health checks while
the model loads in the background. The loading overlay can be toggled
independently to show visual feedback during processing.

To test:
1. Run: python examples/loading_overlay.py
2. Check health: curl http://localhost:8000/health
3. Update parameters:
curl -X POST http://localhost:8000/update_params \
-H "Content-Type: application/json" \
-d '{"show_loading": true, "loading_message": "Processing..."}'
"""

import asyncio
import logging
import time
from pytrickle import StreamProcessor
from pytrickle.frames import VideoFrame, AudioFrame, build_loading_overlay_frame
from pytrickle.frame_skipper import FrameSkipConfig
from pytrickle.utils.register import RegisterCapability

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Model loading configuration
MODEL_LOAD_DELAY_SECONDS = 3.0 # Configurable model load delay
model_loaded = False
model_load_start_time = None

# Loading overlay state
show_loading = False
loading_message = "Loading..."
frame_counter = 0

async def load_model(**kwargs):
"""
Simulate a model loading process with configurable delay.

This demonstrates non-blocking model loading. The server is available
for health checks immediately, while model loading happens in the background.
Health endpoint transitions from LOADING to IDLE once complete.

In a real application, you would load your model here, e.g.:
- model = torch.load('my_model.pth')
- tokenizer = AutoTokenizer.from_pretrained('model_name')
"""
global model_loaded, model_load_start_time, show_loading

model_load_start_time = time.time()
logger.info("🔄 Model loading started...")

# Get configurable delay from kwargs or use default
load_delay = kwargs.get('load_delay', MODEL_LOAD_DELAY_SECONDS)
show_loading = kwargs.get('show_loading', False)

if load_delay > 0:
logger.info(f"Simulating model load for {load_delay:.1f}s...")
await asyncio.sleep(load_delay)

# In a real application, load your model here
# e.g., self.model = torch.load('my_model.pth')

model_loaded = True
load_duration = time.time() - model_load_start_time
logger.info(f"✅ Model loading complete in {load_duration:.2f}s (show_loading: {show_loading})")

async def on_stream_start():
"""Called when a stream starts."""
logger.info("🎬 Stream started")
if not model_loaded:
logger.warning("⚠️ Model not loaded yet - frames will pass through until ready")

async def on_stream_stop():
"""Called when stream stops - cleanup resources."""
logger.info("🛑 Stream stopped, cleaning up resources")
# Reset frame counter and loading state for next stream
global frame_counter, show_loading
frame_counter = 0
show_loading = False
logger.info("✅ Resources cleaned up (show_loading reset to False)")

async def process_video(frame: VideoFrame) -> VideoFrame:
"""
Process video frame - show loading overlay if enabled, otherwise passthrough.

This demonstrates how to conditionally replace frames with a loading overlay.
When show_loading is False, frames pass through unchanged.
When show_loading is True, frames are replaced with animated loading graphics.
"""
global show_loading, frame_counter, loading_message

# Increment frame counter for animations
frame_counter += 1

if not show_loading:
# Passthrough mode - return original frame
return frame

# Loading overlay mode - replace frame with loading animation
return build_loading_overlay_frame(
original_frame=frame,
message=loading_message,
frame_counter=frame_counter
)

async def process_audio(frame: AudioFrame) -> list[AudioFrame]:
"""Pass-through audio processing."""
return [frame]

async def update_params(params: dict):
"""
Handle real-time parameter updates from the client.

Note: The 'load_model' sentinel parameter is handled internally by
StreamProcessor and won't appear here. User-defined parameters like
'show_loading' and 'loading_message' are passed through.
"""
global show_loading, loading_message

logger.info(f"Parameters updated: {params}")

if "show_loading" in params:
old = show_loading
show_loading = bool(params["show_loading"])
if old != show_loading:
status = "enabled" if show_loading else "disabled"
logger.info(f"🎨 Loading overlay: {status}")

if "loading_message" in params:
old = loading_message
loading_message = str(params["loading_message"])
if old != loading_message:
logger.info(f"💬 Loading message: '{old}' → '{loading_message}'")

async def register_with_orchestrator(app):
"""Register this capability with the orchestrator on startup."""
registrar = RegisterCapability(logger)
result = await registrar.register_capability()
if result:
logger.info(f"✅ Successfully registered capability with orchestrator: {result}")
else:
logger.info("ℹ️ Orchestrator registration skipped (no ORCH_URL/ORCH_SECRET provided)")

# Create and run StreamProcessor
if __name__ == "__main__":
logger.info("=" * 60)
logger.info("Model Loading & Loading Overlay Example")
logger.info("=" * 60)
logger.info("")
logger.info("This example demonstrates:")
logger.info(" 1. Non-blocking model loading with health state transitions")
logger.info(" 2. Animated loading overlay that can be toggled via parameters")
logger.info("")
logger.info("Server will start immediately on http://localhost:8000")
logger.info(f"Model will load in the background (~{MODEL_LOAD_DELAY_SECONDS}s delay)")
logger.info("")
logger.info("Test endpoints:")
logger.info(" curl http://localhost:8000/health")
logger.info(" curl -X POST http://localhost:8000/update_params \\")
logger.info(' -H "Content-Type: application/json" \\')
logger.info(' -d \'{"show_loading": true, "loading_message": "Processing..."}\'')
logger.info("")
logger.info("Parameters you can update:")
logger.info(" - show_loading: true/false - Enable/disable loading overlay")
logger.info(" - loading_message: string - Custom loading message")
logger.info("")

processor = StreamProcessor(
video_processor=process_video,
audio_processor=process_audio,
model_loader=load_model,
param_updater=update_params,
on_stream_start=on_stream_start,
on_stream_stop=on_stream_stop,
name="model-loading-demo",
port=8000,
frame_skip_config=FrameSkipConfig(),
# Add orchestrator registration on startup
on_startup=[register_with_orchestrator],
route_prefix="/"
)
processor.run()
12 changes: 6 additions & 6 deletions pytrickle/__init__.py
@@ -4,20 +4,16 @@
Provides functionality to subscribe to and publish video streams with real-time processing.
"""

from typing import Union, Callable, Optional, Coroutine, Any

# Type alias for error callback functions (async only)
ErrorCallback = Callable[[str, Optional[Exception]], Coroutine[Any, Any, None]]

from .base import TrickleComponent, ComponentState, ErrorCallback
from .client import TrickleClient
from .server import StreamServer
from .protocol import TrickleProtocol
from .frames import (
VideoFrame, AudioFrame, VideoOutput, AudioOutput,
FrameBuffer,
build_loading_overlay_frame,
)
from .state import StreamState
from .base import TrickleComponent, ComponentState
from .publisher import TricklePublisher
from .subscriber import TrickleSubscriber
from .manager import BaseStreamManager, TrickleStreamManager, StreamHandler
@@ -29,6 +25,7 @@
from .stream_processor import StreamProcessor
from .fps_meter import FPSMeter
from .frame_skipper import FrameSkipConfig
from .warmup_config import WarmupConfig, WarmupMode

from . import api

@@ -59,5 +56,8 @@
"FrameProcessor",
"FPSMeter",
"FrameSkipConfig",
"WarmupConfig",
"WarmupMode",
"build_loading_overlay_frame",
"__version__"
]
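With these exports in place, downstream code can pull the new names straight from the package root. A minimal sketch (constructor arguments for WarmupConfig are not shown in this diff, so none are used here):

```python
# Sketch: the names added in this diff are importable from the
# package root rather than from their defining submodules.
from pytrickle import (
    WarmupConfig,
    WarmupMode,
    build_loading_overlay_frame,
)
```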
7 changes: 4 additions & 3 deletions pytrickle/base.py
@@ -10,12 +10,13 @@
import logging
import time
from enum import Enum
from typing import Optional, List

from . import ErrorCallback
from typing import Optional, List, Callable, Coroutine, Any

logger = logging.getLogger(__name__)

# Type alias for error callback functions (async only)
ErrorCallback = Callable[[str, Optional[Exception]], Coroutine[Any, Any, None]]

class ComponentState(Enum):
"""States for trickle protocol components.

…
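With the alias now defined in pytrickle.base, a conforming error callback is any coroutine function taking a message and an optional exception. A minimal sketch (the handler name and logging behavior are illustrative, not part of this diff):

```python
import logging
from typing import Optional

logger = logging.getLogger(__name__)

# Matches ErrorCallback: (str, Optional[Exception]) -> Coroutine[Any, Any, None]
async def on_error(message: str, error: Optional[Exception]) -> None:
    # Log the failure; exc_info accepts an exception instance or None.
    logger.error("stream error: %s", message, exc_info=error)
```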
35 changes: 33 additions & 2 deletions pytrickle/client.py
@@ -9,12 +9,12 @@
import logging
import time
import json
from typing import Callable, Optional, Union, Deque, Any
from typing import Callable, Optional, Deque, Any
from collections import deque

from .base import ErrorCallback
from .protocol import TrickleProtocol
from .frames import VideoFrame, AudioFrame, VideoOutput, AudioOutput
from . import ErrorCallback
from .frame_processor import FrameProcessor
from .frame_skipper import AdaptiveFrameSkipper, FrameSkipConfig, FrameProcessingResult

@@ -64,6 +64,9 @@ def __init__(
self.output_queue = asyncio.Queue(maxsize=200)
self.data_queue: Deque[Any] = deque(maxlen=1000)

# Track queue size for clearing
self.max_queue_size = max_queue_size

# Optional frame skipper
if frame_skip_config is not None:
self.frame_skipper = AdaptiveFrameSkipper(
@@ -137,6 +140,34 @@ async def start(self, request_id: str = "default"):
if self.frame_skipper:
self.frame_skipper.reset()

async def clear_input_queues(self):
"""
Clear pending frames from input queues.

Useful when parameters change and old queued frames become stale.
This prevents processing old frames with new parameters.
"""
# Clear video queue
cleared_video = 0
while not self.video_input_queue.empty():
try:
self.video_input_queue.get_nowait()
cleared_video += 1
except asyncio.QueueEmpty:
break

# Clear audio queue
cleared_audio = 0
while not self.audio_input_queue.empty():
try:
self.audio_input_queue.get_nowait()
cleared_audio += 1
except asyncio.QueueEmpty:
break

if cleared_video or cleared_audio:
logger.info(f"Cleared stale frames from input queues: {cleared_video} video, {cleared_audio} audio")

async def stop(self):
"""Stop the trickle client."""
if not self.running:
…
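A minimal sketch of how a caller might use the new method; the surrounding update handler and the client variable are hypothetical, and only clear_input_queues itself comes from this diff:

```python
# Hypothetical parameter-update path: `client` is an existing
# TrickleClient instance. Clearing first ensures frames queued under
# the old parameters are not processed under the new ones.
async def apply_params(client, params: dict) -> None:
    await client.clear_input_queues()  # drop stale queued frames
    # ...apply `params` to the processing pipeline here...
```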
25 changes: 25 additions & 0 deletions pytrickle/decorators.py
@@ -29,6 +29,7 @@
ModelLoaderProtocol,
ParamUpdaterProtocol,
VideoHandlerProtocol,
WarmupProtocol,
)

logger = logging.getLogger(__name__)
@@ -419,3 +420,27 @@ async def wrapper(*args: Any, **kwargs: Any) -> None:
setattr(wrapper, "_trickle_handler_type", "stream_stop")
setattr(wrapper, "_trickle_handler_info", getattr(func, "_trickle_handler_info", None))
return wrapper


def warmup(
func: HandlerFn,
) -> WarmupProtocol:
"""Decorator for warmup handlers.

The description is automatically generated from the function name.
"""
base_wrapper = _wrap_handler(
"warmup",
description=f"Warmup handler: {func.__name__}",
validate_signature=True,
)(func)

@wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> None:
await base_wrapper(*args, **kwargs)
return None

setattr(wrapper, "_trickle_handler", True)
setattr(wrapper, "_trickle_handler_type", "warmup")
setattr(wrapper, "_trickle_handler_info", getattr(func, "_trickle_handler_info", None))
return wrapper
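A minimal usage sketch; the handler name and body are illustrative, and the **kwargs signature is an assumption since the signature validation performed by _wrap_handler is not shown in this diff:

```python
from pytrickle.decorators import warmup

# Hypothetical warmup handler: runs once before real traffic so the
# first frame does not pay one-time initialization costs.
@warmup
async def warm_pipeline(**kwargs):
    # e.g., push a dummy frame through the model here
    ...
```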