Implement non-blocking model loading with accurate health state management #36
Conversation
pytrickle/stream_processor.py
Outdated
# Schedule non-blocking background preload so server can accept /health immediately
async def _background_preload():
    try:
        if getattr(self._frame_processor, "state", None) is not None:
This seems to be repeated a lot; we could extract it into a method, call it once, and maybe assign the result to a variable where appropriate to avoid the repeated lookups.
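As a hypothetical sketch of that extraction (the class shell and method name here are illustrative, not the actual pytrickle API):

```python
# Illustrative helper consolidating the repeated getattr lookup.
class StreamProcessor:
    def __init__(self, frame_processor):
        self._frame_processor = frame_processor

    def _processor_state(self):
        """Return the frame processor's state, or None if it has no state attribute."""
        return getattr(self._frame_processor, "state", None)
```

Callers could then test `self._processor_state()` (or bind it once to a local) instead of repeating the `getattr` expression.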
`_on_startup` is a "callback" method that's being registered to the server's existing post-startup event to initiate `load_model`:
pytrickle/pytrickle/stream_processor.py
Lines 123 to 126 in f888c13
try:
    self.server.app.on_startup.append(_on_startup)
except Exception as e:
    logger.error(f"Failed to register startup hook: {e}")
This can also be done manually outside of pytrickle by accessing the StreamProcessor's server property as was done in ComfyStream to load the pipeline:
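For illustration, a minimal sketch of that manual pattern using a stand-in for the aiohttp-style `on_startup` list (in pytrickle the real object would be the StreamProcessor's `server.app`; all names here are illustrative):

```python
import asyncio

# Stand-in for a server app exposing an aiohttp-style on_startup list
# (illustrative; the real object would be StreamProcessor.server.app).
class FakeApp:
    def __init__(self):
        self.on_startup = []  # callbacks awaited once at server startup

    async def run_startup(self):
        for callback in self.on_startup:
            await callback(self)

# User-defined hook that loads the pipeline before traffic is served.
async def load_pipeline(app):
    app.pipeline_ready = True  # stand-in for the real model/pipeline load

app = FakeApp()
app.on_startup.append(load_pipeline)  # manual registration, as in ComfyStream
asyncio.run(app.run_startup())
```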
Ah...you're referring to getting the current state. Totally agree!
The issue is that the state of the frame processor and the server are still separate. I think we could simplify this by getting and updating the state of the Server instead of keeping a state on the frame processor and using "attach_state". wdyt?
I changed the state property on the base frame_processor class so these None checks could be removed: e810f27
pytrickle/stream_processor.py
Outdated
        logger.error(f"Error preloading model on startup: {e}")

try:
    asyncio.get_running_loop().create_task(_background_preload())
Do we really want this async? It seems like we could get the stated intent of the PR, loading the model synchronously (blocking), with `await _background_preload()`.
- with asyncio, the server might report "ready" before the model is loaded
Well, due to this being called via the server startup event, it blocks the server from being available unless a task is created to run it in the background (non-blocking). The health state begins with `LOADING` and transitions to `IDLE`. For managed containers, it is important for the server to be available immediately to rule out other potential docker container issues and complete a health check (in this case `LOADING` is returned until `set_startup_complete()` is called at the end). In a sense, model loading is now synchronous due to the model loading lock and the health state.
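A minimal sketch of the non-blocking pattern being described, with the `LOADING`/`IDLE` state names taken from the PR and everything else illustrative:

```python
import asyncio

class Processor:
    """Illustrative processor whose health flips LOADING -> IDLE after load."""

    def __init__(self):
        self.health = "LOADING"  # what the health endpoint would report
        self._load_task = None

    async def _load_model(self):
        await asyncio.sleep(0.01)  # stand-in for slow model loading
        self.health = "IDLE"       # set_startup_complete() equivalent

    async def on_startup(self):
        # Creating a task instead of awaiting lets the server answer
        # /health immediately while the model loads in the background.
        self._load_task = asyncio.create_task(self._load_model())

async def main():
    p = Processor()
    await p.on_startup()
    before = p.health      # still LOADING right after startup returns
    await p._load_task
    return before, p.health

print(asyncio.run(main()))  # → ('LOADING', 'IDLE')
```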
Here is where the user's `load_model` callback is used with the lock:
pytrickle/pytrickle/frame_processor.py
Lines 68 to 76 in e810f27
async def ensure_model_loaded(self, **kwargs):
    """Thread-safe wrapper that ensures model is loaded exactly once."""
    async with self._model_load_lock:
        if not self._model_loaded:
            await self.load_model(**kwargs)
            self._model_loaded = True
            logger.debug(f"Model loaded for {self.__class__.__name__}")
        else:
            logger.debug(f"Model already loaded for {self.__class__.__name__}")
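To illustrate the exactly-once guarantee that lock provides, here is a self-contained reimplementation of the pattern (not the pytrickle class itself) exercised by ten concurrent callers:

```python
import asyncio

class FrameProcessor:
    """Minimal stand-in reproducing the ensure_model_loaded lock pattern."""

    def __init__(self):
        self._model_load_lock = asyncio.Lock()
        self._model_loaded = False
        self.load_count = 0  # instrumentation for the demo

    async def load_model(self, **kwargs):
        await asyncio.sleep(0.01)  # stand-in for slow model loading
        self.load_count += 1

    async def ensure_model_loaded(self, **kwargs):
        async with self._model_load_lock:
            if not self._model_loaded:
                await self.load_model(**kwargs)
                self._model_loaded = True

async def main():
    fp = FrameProcessor()
    # Ten concurrent callers; the lock serializes them and only the
    # first one actually loads the model.
    await asyncio.gather(*(fp.ensure_model_loaded() for _ in range(10)))
    return fp.load_count

print(asyncio.run(main()))  # → 1
```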
This can be tested by simply running process_video_example.py from the launch config and sending a curl request within the first 3 seconds:
curl -X GET http://localhost:8000/health -H "Accept: application/json"
It should read `LOADING` and flip to `IDLE` after 3 seconds. The delay can be adjusted here:
MODEL_LOAD_DELAY_SECONDS = 3.0
Made another change to track the background model loading from the stream processor and cancel it if needed. I think this is more of what you were looking for? 3a8c523
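A sketch of that track-and-cancel idea (illustrative names, not the actual commit):

```python
import asyncio

class StreamProcessor:
    """Illustrative sketch of tracking the background load task."""

    def __init__(self):
        self._load_task = None

    def start_background_load(self, coro):
        # Keep a reference so the task isn't garbage-collected mid-flight
        # and can be cancelled later.
        self._load_task = asyncio.create_task(coro)

    async def cleanup(self):
        # Called on shutdown: cancel an unfinished load and swallow the
        # CancelledError so teardown continues cleanly.
        if self._load_task and not self._load_task.done():
            self._load_task.cancel()
            try:
                await self._load_task
            except asyncio.CancelledError:
                pass

async def main():
    sp = StreamProcessor()
    sp.start_background_load(asyncio.sleep(60))  # long "model load"
    await sp.cleanup()
    return sp._load_task.cancelled()

print(asyncio.run(main()))  # → True
```

Holding the task reference also addresses the memory-leak concern: asyncio only keeps weak references to tasks, so fire-and-forget tasks can otherwise be collected before completing.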
This pull request introduces several improvements to PyTrickle's model loading and server startup flow, focusing on ensuring that the model is loaded exactly once, synchronizing server health status with model readiness, and improving background task management to prevent memory leaks. The changes enhance reliability and make the server's health reporting more accurate during startup.
Model loading and state management:
- Added an `ensure_model_loaded` method to `FrameProcessor` that uses an `asyncio.Lock` to guarantee the model is loaded only once, and sets the pipeline state to ready after loading. (`pytrickle/frame_processor.py` L57-R76)
- `ensure_model_loaded` is called before starting frame processing, ensuring the model is ready on the correct event loop. (`pytrickle/client.py` [1], `pytrickle/stream_processor.py` [2])
- Server health status stays `LOADING` until the model is fully loaded, improving health endpoint accuracy. (`pytrickle/server.py` L634-R634)

Server and processor integration:
- (`pytrickle/stream_processor.py` R87-R120)

Background task management:
- Background tasks are tracked in `StreamProcessor` to prevent memory leaks, including a `cleanup` method called on shutdown. (`pytrickle/stream_processor.py` [1] [2])

Testing and examples:
- (`tests/test_state_integration.py` L186-R190)
- The `/health` endpoint reports `LOADING` until the model is ready. (`examples/process_video_example.py` R27-L40)