feat(service): add stale session cleanup to inference data proxy #1268
nuzant wants to merge 8 commits
Consolidate the experimental agent and rollout controller configuration into areal.api.cli_args so the trainer, examples, and tests share one configuration surface. This also wires RolloutControllerV2 into the v2 rollout path and updates examples and integrations to use the new controller APIs.

Key changes:
- move agent and rollout controller configs into areal.api.cli_args and remove duplicated controller config modules
- rename and rewire experimental controllers around AgentController and RolloutControllerV2
- update examples and experimental tests for rollout v2 config, initialization, and versioning

Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent)
Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
Expose stale session timeout, cleanup interval, and dump path through the inference-service config and data-proxy CLI so deployments can enable recovery behavior without manual patching. Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
Wire the new inference-service stale-session settings into the data-proxy launch command so controller-managed deployments can activate the cleanup behavior consistently. Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
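The two commits above expose three settings (stale session timeout, cleanup interval, dump path) through a config object and the data-proxy CLI, then pass them into the launch command. A minimal sketch of that round trip follows. The class name `StaleSessionConfig`, the flag spellings, and the default values are assumptions made for illustration; only `timeout_seconds` and `stale_session_dump_path` appear as names in this PR.

```python
import argparse
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class StaleSessionConfig:
    """Hypothetical config holder; field names and defaults are illustrative."""

    timeout_seconds: float = 3600.0
    cleanup_interval_seconds: float = 60.0
    stale_session_dump_path: Optional[str] = None  # None means "do not dump"


def build_parser() -> argparse.ArgumentParser:
    # Flag names are assumptions, not the real data-proxy CLI.
    parser = argparse.ArgumentParser(description="data proxy (sketch)")
    parser.add_argument("--stale-session-timeout", type=float, default=3600.0)
    parser.add_argument("--stale-session-cleanup-interval", type=float, default=60.0)
    parser.add_argument("--stale-session-dump-path", type=str, default=None)
    return parser


def config_from_args(argv: List[str]) -> StaleSessionConfig:
    args = build_parser().parse_args(argv)
    return StaleSessionConfig(
        timeout_seconds=args.stale_session_timeout,
        cleanup_interval_seconds=args.stale_session_cleanup_interval,
        stale_session_dump_path=args.stale_session_dump_path,
    )


def to_launch_args(cfg: StaleSessionConfig) -> List[str]:
    """Render the config back into CLI flags, as a controller might when launching the proxy."""
    args = [
        "--stale-session-timeout", str(cfg.timeout_seconds),
        "--stale-session-cleanup-interval", str(cfg.cleanup_interval_seconds),
    ]
    if cfg.stale_session_dump_path is not None:
        args += ["--stale-session-dump-path", cfg.stale_session_dump_path]
    return args
```

Keeping the config-to-flags rendering in one function means the controller and the CLI cannot drift apart silently: parsing the rendered flags should give back an equal config.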
Keep stale-session dumping and RTensor cleanup close to session ownership so the cleanup policy is encapsulated in the store rather than split across app helpers. Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
Reduce app-level cleanup responsibilities to a thin loop wrapper now that stale-session dumping and shard cleanup live behind SessionStore.cleanup_stale. Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
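A thin loop wrapper of the kind this commit describes could look roughly like the sketch below. `FakeSessionStore` is a stand-in written for this example, and the one-argument `cleanup_stale` signature is an assumption; the real method also takes a dump path and serving address according to the diff in this PR.

```python
import asyncio


class FakeSessionStore:
    """Stand-in for SessionStore; it only counts cleanup calls."""

    def __init__(self) -> None:
        self.calls = 0

    async def cleanup_stale(self, timeout_seconds: float) -> int:
        self.calls += 1
        return 0  # number of sessions removed (always 0 in this fake)


async def stale_cleanup_loop(
    store: FakeSessionStore,
    timeout_seconds: float,
    interval_seconds: float,
    stop: asyncio.Event,
) -> None:
    """Delegate all policy to the store; the loop only schedules and survives errors."""
    while not stop.is_set():
        try:
            await store.cleanup_stale(timeout_seconds)
        except Exception as exc:  # keep the loop alive if one pass fails
            print(f"stale cleanup failed: {exc!r}")
        try:
            # Sleep for one interval, but wake up early when asked to stop.
            await asyncio.wait_for(stop.wait(), timeout=interval_seconds)
        except asyncio.TimeoutError:
            pass


async def demo() -> int:
    store = FakeSessionStore()
    stop = asyncio.Event()
    task = asyncio.create_task(stale_cleanup_loop(store, 60.0, 0.01, stop))
    await asyncio.sleep(0.05)
    stop.set()
    await task
    return store.calls
```

Waiting on the stop event instead of a bare `asyncio.sleep` lets shutdown complete without waiting out a full cleanup interval.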
Code Review
This pull request consolidates configuration classes into areal/api/cli_args.py, renames AgentServiceController to AgentController, and introduces RolloutControllerV2 as a successor to GatewayInferenceController. It also implements a stale session cleanup mechanism in the inference service data proxy, enabling the dumping of inactive sessions to disk. Feedback identifies critical race conditions in the session cleanup and snapshotting logic where locks are released before processing is complete, and recommends offloading synchronous JSON serialization to background threads to prevent blocking the asynchronous event loop.
Code under review:

     with self._lock:
    -    stale_sessions: list[str] = []
    +    stale_sessions: list[tuple[str, SessionData]] = []
         for sid, session in self._sessions.items():
             if not session.is_stale(timeout_seconds):
                 continue
             if session.has_ready_trajectories:
                 continue
    -        stale_sessions.append(sid)
    +        stale_sessions.append((sid, session))

    -    for sid in stale_sessions:
    -        self._sessions.pop(sid, None)
    -        self._remove_api_keys_for_session(sid)
    +for sid, session in stale_sessions:
    +    exports = session.snapshot_cleanup_exports(
    +        discount=1.0,
    +        style="individual",
    +    )
    +    await _dump_stale_session_exports(
    +        sid,
    +        exports,
    +        stale_session_dump_path=stale_session_dump_path,
    +        serving_addr=serving_addr,
    +    )
    +    self.remove_session(sid)
There is a race condition in cleanup_stale. Stale sessions are identified under a lock but then processed (dumped) outside the lock while still remaining in the SessionStore. Since _dump_stale_session_exports is an async operation, it yields control, allowing concurrent requests to access or modify these sessions. Furthermore, _prepare_interactions_for_stale_dump modifies interaction objects in-place (localizing tensors), which could lead to data corruption if an active request is using them.
Sessions should be removed from the store immediately after being identified as stale to prevent any further concurrent access.

    with self._lock:
        stale_sessions: list[tuple[str, SessionData]] = []
        for sid in list(self._sessions.keys()):
            session = self._sessions[sid]
            if session.is_stale(timeout_seconds):
                stale_sessions.append((sid, session))
                # Remove from store immediately to prevent concurrent access during async dump
                self._sessions.pop(sid, None)
                self._remove_api_keys_for_session(sid)

    for sid, session in stale_sessions:
        exports = session.snapshot_cleanup_exports(
            discount=1.0,
            style="individual",
        )
        await _dump_stale_session_exports(
            sid,
            exports,
            stale_session_dump_path=stale_session_dump_path,
            serving_addr=serving_addr,
        )

Code under review:

    with self._lock:
        ready_trajectories = list(self._ready_trajectories.values())
        active_completions = self._active_completions

    exports: list[tuple[int | None, dict[str, InteractionWithTokenLogpReward]]] = []

    def _export_cache(
        cache: InteractionCache,
    ) -> dict[str, InteractionWithTokenLogpReward]:
        reward_discount = None if cache._apply_reward_discount_called else discount
        return cache.export_interactions(
            style=style,
            reward_discount=reward_discount,
        )

    for ready in ready_trajectories:
        interactions = _export_cache(ready.completions)
        if interactions:
            exports.append((ready.trajectory_id, interactions))

    if len(active_completions) != 0:
        interactions = _export_cache(active_completions)
        if interactions:
            exports.append((None, interactions))

    return exports

The lock is released before performing the snapshot exports. This can lead to inconsistent data if the session is modified concurrently by another thread or task while _export_cache is running. It is safer to hold the lock for the duration of the snapshot creation to ensure a consistent state.

    with self._lock:
        ready_trajectories = list(self._ready_trajectories.values())
        active_completions = self._active_completions
        exports: list[tuple[int | None, dict[str, InteractionWithTokenLogpReward]]] = []

        def _export_cache(
            cache: InteractionCache,
        ) -> dict[str, InteractionWithTokenLogpReward]:
            reward_discount = None if cache._apply_reward_discount_called else discount
            return cache.export_interactions(
                style=style,
                reward_discount=reward_discount,
            )

        for ready in ready_trajectories:
            interactions = _export_cache(ready.completions)
            if interactions:
                exports.append((ready.trajectory_id, interactions))

        if active_completions:
            interactions = _export_cache(active_completions)
            if interactions:
                exports.append((None, interactions))

        return exports

Code under review:

    async with aiofiles.open(dump_path, "w") as dump_file:
        await dump_file.write(json.dumps(serialized))

json.dumps is a synchronous and potentially slow operation for large trajectory data. Calling it directly in the async loop will block the event loop, reducing the overall responsiveness of the data proxy. Consider offloading the serialization to a background thread using run_in_executor.

Suggested change:

    -async with aiofiles.open(dump_path, "w") as dump_file:
    -    await dump_file.write(json.dumps(serialized))
    +import asyncio
    +serialized_json = await asyncio.get_event_loop().run_in_executor(
    +    None, json.dumps, serialized
    +)
    +async with aiofiles.open(dump_path, "w") as dump_file:
    +    await dump_file.write(serialized_json)

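As a variation on the suggestion above, `asyncio.to_thread` (Python 3.9+) offloads a blocking call without fetching the loop by hand; inside a coroutine, `asyncio.get_running_loop()` is generally preferred over `asyncio.get_event_loop()` when the loop object is needed. The sketch below is not the PR's code: `dump_json` and `_write_text` are hypothetical helpers, and a plain blocking write in a worker thread stands in for `aiofiles`. Because JSON encoding is CPU-bound and still contends for the GIL, the benefit for very large payloads may be limited and is worth measuring.

```python
import asyncio
import json


def _write_text(path: str, text: str) -> None:
    # Ordinary blocking write; it is only ever called from a worker thread below.
    with open(path, "w", encoding="utf-8") as f:
        f.write(text)


async def dump_json(path: str, payload: dict) -> None:
    """Serialize and write a payload without running either step on the event loop thread."""
    # json.dumps runs in the default thread pool executor.
    text = await asyncio.to_thread(json.dumps, payload)
    # The file write is offloaded the same way (stand-in for aiofiles).
    await asyncio.to_thread(_write_text, path, text)
```

A caller would simply `await dump_json(dump_path, serialized)` from the cleanup coroutine.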
Prevent stale-session cleanup from leaving sessions reachable during async dump work so new requests cannot mutate the same interaction objects mid-cleanup. Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
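The idea in this commit, detaching stale sessions from the store while the lock is held and only then doing the async dump, can be illustrated with a toy store. Everything here (`MiniStore`, the dict-based sessions, the `dump` callback, passing `now` explicitly) is a simplified assumption for demonstration, not the PR's `SessionStore`.

```python
import asyncio
import threading
from typing import Awaitable, Callable, Dict, List, Optional, Tuple


class MiniStore:
    """Toy session store; sessions are plain dicts with a last_access timestamp."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._sessions: Dict[str, dict] = {}

    def add(self, sid: str, last_access: float) -> None:
        with self._lock:
            self._sessions[sid] = {"last_access": last_access}

    def get(self, sid: str) -> Optional[dict]:
        with self._lock:
            return self._sessions.get(sid)

    def _detach_stale(self, now: float, timeout_seconds: float) -> List[Tuple[str, dict]]:
        # Identify AND remove under the same lock, so no request can reach
        # a session once it has been selected for cleanup.
        with self._lock:
            stale = [
                (sid, s)
                for sid, s in self._sessions.items()
                if now - s["last_access"] > timeout_seconds
            ]
            for sid, _ in stale:
                del self._sessions[sid]
        return stale

    async def cleanup_stale(
        self,
        now: float,
        timeout_seconds: float,
        dump: Callable[[str, dict], Awaitable[None]],
    ) -> List[str]:
        detached = self._detach_stale(now, timeout_seconds)
        # The lock is released here; the detached sessions are owned by this task only.
        for sid, session in detached:
            await dump(sid, session)
        return [sid for sid, _ in detached]


async def demo() -> Tuple[List[str], List[bool], bool]:
    store = MiniStore()
    store.add("old", last_access=0.0)
    store.add("fresh", last_access=95.0)
    visible_during_dump: List[bool] = []

    async def dump(sid: str, session: dict) -> None:
        await asyncio.sleep(0)  # yield to the loop, as real file I/O would
        visible_during_dump.append(store.get(sid) is not None)

    removed = await store.cleanup_stale(now=100.0, timeout_seconds=10.0, dump=dump)
    return removed, visible_during_dump, store.get("fresh") is not None
```

The trade-off is that a session removed this way is gone even if the dump later fails, so a real implementation would need to decide how to report or retry failed dumps.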
Keep stale snapshot export creation under the session lock and move JSON encoding off the event loop so cleanup stays consistent without blocking proxy responsiveness. Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
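Building the snapshot while the lock is held can be sketched as follows. `MiniSession` and its dict-of-rewards layout are assumptions made for this example; the real session exports `InteractionWithTokenLogpReward` objects through `InteractionCache.export_interactions`.

```python
import threading
from typing import Dict, List, Optional, Tuple


class MiniSession:
    """Toy session holding finished trajectories and one active set of completions."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._ready: Dict[int, Dict[str, float]] = {}
        self._active: Dict[str, float] = {}

    def record(self, key: str, reward: float) -> None:
        with self._lock:
            self._active[key] = reward

    def finish(self, trajectory_id: int) -> None:
        with self._lock:
            self._ready[trajectory_id] = self._active
            self._active = {}

    def snapshot(self) -> List[Tuple[Optional[int], Dict[str, float]]]:
        # Copy everything while the lock is held, so the result reflects
        # one consistent point in time and shares no dicts with the session.
        with self._lock:
            exports: List[Tuple[Optional[int], Dict[str, float]]] = [
                (tid, dict(items)) for tid, items in self._ready.items() if items
            ]
            if self._active:
                exports.append((None, dict(self._active)))
            return exports
```

Copying inside the lock keeps the critical section short while still guaranteeing that later writes to the session cannot change an already returned snapshot.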
This pull request has been automatically marked as stale because it has not had recent activity within the last 14 days. Please add a comment or push new commits to keep it active. Thank you for your contribution!
Summary
SessionStore.cleanup_stale and keep the app loop as a thin delegating wrapper
Related to issue #1229
Testing
Notes
main. main means the diff currently includes the stacked commits from "refactor(service): rename service controllers and unify service controller configs" (#1265) as well.