# NOTE(review): this patch was reflowed from a copy whose line breaks and
# indentation had been collapsed. Added lines follow Python structure; the
# indentation of unchanged context lines and the blank lines around
# "OpenAI = _OpenAIProxy()" are inferred -- confirm against the target tree.
# The post-image blob ids on the "index" lines predate the edits made here.
diff --git a/run_agent.py b/run_agent.py
index 1dd4219b22e..105a2701276 100644
--- a/run_agent.py
+++ b/run_agent.py
@@ -100,6 +100,83 @@ def __repr__(self):
 
 OpenAI = _OpenAIProxy()
 
+# Process-local guard against two gateway/background workers compacting the
+# same parent session at the same time. Context compression rotates the
+# session_id; without a per-parent lock and winner cache, a stale concurrent
+# worker can fork a second child from the same parent before the first child has
+# flushed its compacted messages to SQLite.
+_CONTEXT_COMPRESSION_LOCKS: dict[str, threading.RLock] = {}
+_CONTEXT_COMPRESSION_LOCKS_GUARD = threading.Lock()
+# parent session id -> (created_at, child session id, compacted messages,
+# system prompt). created_at is a time.monotonic() reading, so it is only
+# meaningful inside this process.
+_CONTEXT_COMPRESSION_RESULTS: dict[str, tuple[float, str, list[dict[str, Any]], str]] = {}
+_CONTEXT_COMPRESSION_RESULT_TTL_S = 600.0
+
+
+def _context_compression_lock(session_id: str) -> threading.RLock:
+    """Return the re-entrant lock for *session_id*, creating it on first use.
+
+    An empty *session_id* maps to the shared ``"__no_session__"`` key. This
+    patch never evicts entries from ``_CONTEXT_COMPRESSION_LOCKS``.
+    """
+    key = session_id or "__no_session__"
+    with _CONTEXT_COMPRESSION_LOCKS_GUARD:
+        lock = _CONTEXT_COMPRESSION_LOCKS.get(key)
+        if lock is None:
+            lock = threading.RLock()
+            _CONTEXT_COMPRESSION_LOCKS[key] = lock
+        return lock
+
+
+def _remember_context_compression(parent_session_id: str, child_session_id: str, messages: list[dict[str, Any]], system_prompt: str) -> None:
+    """Cache the compression result that rotated *parent_session_id*.
+
+    Does nothing when either id is empty or both ids are equal. The messages
+    are deep-copied so later caller-side mutation cannot reach the cache.
+    """
+    if not parent_session_id or not child_session_id or parent_session_id == child_session_id:
+        return
+    # The TTL is an elapsed interval, so use the monotonic clock: wall-clock
+    # adjustments must neither expire entries early nor keep them alive.
+    now = time.monotonic()
+    with _CONTEXT_COMPRESSION_LOCKS_GUARD:
+        # Opportunistic TTL cleanup keeps the process-local cache bounded.
+        expired = [
+            sid for sid, (created_at, _child, _msgs, _prompt) in _CONTEXT_COMPRESSION_RESULTS.items()
+            if now - created_at > _CONTEXT_COMPRESSION_RESULT_TTL_S
+        ]
+        for sid in expired:
+            _CONTEXT_COMPRESSION_RESULTS.pop(sid, None)
+        _CONTEXT_COMPRESSION_RESULTS[parent_session_id] = (
+            now,
+            child_session_id,
+            copy.deepcopy(messages),
+            system_prompt or "",
+        )
+
+
+def _get_remembered_context_compression(parent_session_id: str) -> tuple[str, list[dict[str, Any]], str] | None:
+    """Return the cached result for *parent_session_id* while it is fresh.
+
+    Returns ``(child_session_id, messages, system_prompt)`` with a deep copy
+    of the messages, or ``None`` when the id is empty, nothing is cached, or
+    the entry has outlived the TTL (the expired entry is dropped).
+    """
+    if not parent_session_id:
+        return None
+    now = time.monotonic()
+    with _CONTEXT_COMPRESSION_LOCKS_GUARD:
+        cached = _CONTEXT_COMPRESSION_RESULTS.get(parent_session_id)
+        if cached is None:
+            return None
+        created_at, child_session_id, messages, system_prompt = cached
+        if now - created_at > _CONTEXT_COMPRESSION_RESULT_TTL_S:
+            _CONTEXT_COMPRESSION_RESULTS.pop(parent_session_id, None)
+            return None
+        return child_session_id, copy.deepcopy(messages), system_prompt
+
+
 # Load .env from ~/.hermes/.env first, then project root as dev fallback.
 # User-managed env files should override stale shell exports on restart.
 from hermes_cli.env_loader import load_hermes_dotenv
@@ -10653,7 +10730,126 @@ def _should_sanitize_tool_calls(self) -> bool:
         """
         return self.api_mode != "codex_responses"
 
-    def _compress_context(self, messages: list, system_message: str, *, approx_tokens: int = None, task_id: str = "default", focus_topic: str = None) -> tuple:
+    def _compress_context(self, messages: list, system_message: str, *, approx_tokens: int | None = None, task_id: str = "default", focus_topic: str | None = None) -> tuple:
+        """Serialize compression per parent session and reuse the winning split.
+
+        Gateway background work can race a live user turn: both workers may load
+        the same oversized parent history and call compression. The first worker
+        must be the only one to rotate the parent session. Later stale callers
+        adopt the winning child session and compacted transcript instead of
+        creating sibling compression children.
+
+        Without a session DB or a session id there is no parent to fork from and
+        nothing is cached, so the call goes straight to
+        ``_compress_context_locked`` instead of queueing unrelated agents on
+        the shared ``"__no_session__"`` lock.
+
+        Returns:
+            ``(messages, system_prompt)``: either the fresh result of
+            ``_compress_context_locked`` or the cached result of the worker
+            that compacted this parent first.
+
+        NOTE(review): on reuse the caller's own ``messages`` are discarded in
+        favour of the cached transcript -- confirm stale callers never carry
+        turns the winner did not see.
+        """
+        parent_session_id = self.session_id or ""
+        if not self._session_db or not parent_session_id:
+            return self._compress_context_locked(
+                messages,
+                system_message,
+                approx_tokens=approx_tokens,
+                task_id=task_id,
+                focus_topic=focus_topic,
+            )
+
+        lock = _context_compression_lock(parent_session_id)
+        with lock:
+            remembered = _get_remembered_context_compression(parent_session_id)
+            if remembered is not None:
+                child_session_id, remembered_messages, remembered_prompt = remembered
+                logger.info(
+                    "context compression reused: parent=%s child=%s messages=%d",
+                    parent_session_id or "none",
+                    child_session_id or "none",
+                    len(remembered_messages),
+                )
+                self._adopt_compression_child_session(
+                    parent_session_id=parent_session_id,
+                    child_session_id=child_session_id,
+                    system_prompt=remembered_prompt,
+                )
+                return remembered_messages, remembered_prompt
+
+            compressed, new_system_prompt = self._compress_context_locked(
+                messages,
+                system_message,
+                approx_tokens=approx_tokens,
+                task_id=task_id,
+                focus_topic=focus_topic,
+            )
+            child_session_id = self.session_id or ""
+            _remember_context_compression(
+                parent_session_id,
+                child_session_id,
+                compressed,
+                new_system_prompt,
+            )
+            return compressed, new_system_prompt
+
+    def _adopt_compression_child_session(self, *, parent_session_id: str, child_session_id: str, system_prompt: str) -> None:
+        """Move this agent instance onto a compression child created elsewhere.
+
+        Updates ``session_id``, the ``HERMES_SESSION_ID`` environment variable,
+        the gateway ``_SESSION_ID`` value, the session log path and the cached
+        system prompt, then notifies the context engine and memory manager.
+        The gateway update, log-path update and both notifications are
+        best-effort: failures are logged at debug level and never propagate.
+        Does nothing when *child_session_id* is empty or already current.
+        """
+        if not child_session_id or child_session_id == self.session_id:
+            return
+        old_session_id = parent_session_id or self.session_id or ""
+        self.session_id = child_session_id
+        os.environ["HERMES_SESSION_ID"] = self.session_id
+        try:
+            from gateway.session_context import _SESSION_ID
+            _SESSION_ID.set(self.session_id)
+        except Exception as _ctx_err:
+            logger.debug("gateway session context update (compression reuse): %s", _ctx_err)
+        try:
+            self.session_log_file = self.logs_dir / f"session_{self.session_id}.json"
+        except Exception as _log_err:
+            logger.debug("session log path update (compression reuse): %s", _log_err)
+        self._cached_system_prompt = system_prompt
+        self._session_db_created = True
+        # NOTE(review): index 0 presumably makes the next flush write the whole
+        # adopted transcript to the child session -- confirm this cannot
+        # duplicate rows the winning worker already flushed.
+        self._last_flushed_db_idx = 0
+
+        try:
+            if old_session_id and hasattr(self.context_compressor, "on_session_start"):
+                self.context_compressor.on_session_start(
+                    self.session_id or "",
+                    boundary_reason="compression",
+                    old_session_id=old_session_id,
+                )
+        except Exception as _ce_err:
+            logger.debug("context engine on_session_start (compression reuse): %s", _ce_err)
+
+        try:
+            if old_session_id and self._memory_manager:
+                self._memory_manager.on_session_switch(
+                    self.session_id or "",
+                    parent_session_id=old_session_id,
+                    reset=False,
+                    reason="compression_reuse",
+                )
+        except Exception as _me_err:
+            logger.debug("memory manager on_session_switch (compression reuse): %s", _me_err)
+
+    def _compress_context_locked(self, messages: list, system_message: str, *, approx_tokens: int | None = None, task_id: str = "default", focus_topic: str | None = None) -> tuple:
         """Compress conversation context and split the session in SQLite.
 
         Args:
@@ -10682,12 +10878,20 @@ def _compress_context(self, messages: list, system_message: str, *, approx_token
             except Exception:
                 pass
 
+        _current_tokens = approx_tokens if approx_tokens is not None else 0
         try:
-            compressed = self.context_compressor.compress(messages, current_tokens=approx_tokens, focus_topic=focus_topic)
+            if focus_topic is not None:
+                compressed = self.context_compressor.compress(
+                    messages,
+                    current_tokens=_current_tokens,
+                    focus_topic=focus_topic,
+                )
+            else:
+                compressed = self.context_compressor.compress(messages, current_tokens=_current_tokens)
         except TypeError:
             # Plugin context engine with strict signature that doesn't accept
             # focus_topic — fall back to calling without it.
-            compressed = self.context_compressor.compress(messages, current_tokens=approx_tokens)
+            compressed = self.context_compressor.compress(messages, current_tokens=_current_tokens)
 
         summary_error = getattr(self.context_compressor, "_last_summary_error", None)
         if summary_error:
diff --git a/tests/run_agent/test_413_compression.py b/tests/run_agent/test_413_compression.py
index 3cbd47c0e1b..c67c1ee7062 100644
--- a/tests/run_agent/test_413_compression.py
+++ b/tests/run_agent/test_413_compression.py
@@ -519,6 +519,75 @@ def test_no_preflight_when_under_threshold(self, agent):
         mock_compress.assert_not_called()
         assert result["completed"] is True
 
+    def test_duplicate_compression_for_same_parent_reuses_first_child(self, agent, tmp_path):
+        """A stale concurrent turn must not create a second compression child.
+
+        Gateway background review and a live user turn can both start from the
+        same parent session. The first compaction should win; the second stale
+        caller should adopt the winning child instead of compacting the old
+        transcript again and forking a sibling continuation.
+        """
+        class FakeSessionDB:
+            def __init__(self):
+                self.created_children = []
+                self.ended = []
+
+            def get_session_title(self, session_id):
+                return None
+
+            def end_session(self, session_id, reason):
+                self.ended.append((session_id, reason))
+
+            def create_session(self, **kwargs):
+                self.created_children.append(kwargs)
+
+            def update_system_prompt(self, session_id, system_prompt):
+                pass
+
+        fake_db = FakeSessionDB()
+        agent._session_db = fake_db
+        agent.session_id = "parent-session"
+        agent.logs_dir = tmp_path
+        agent.platform = "telegram"
+        agent._session_init_model_config = {}
+        agent._memory_manager = None
+        agent._todo_store = SimpleNamespace(format_for_injection=lambda: "")
+        agent.context_compressor.compress = MagicMock(
+            return_value=[{"role": "user", "content": f"{SUMMARY_PREFIX}\ncompressed"}]
+        )
+
+        with (
+            patch.object(agent, "_build_system_prompt", return_value="new system prompt"),
+            patch("run_agent.estimate_request_tokens_rough", return_value=42),
+            # The winner cache is module-global: start empty and restore it
+            # afterwards so tests cannot leak results into each other.
+            patch.dict("run_agent._CONTEXT_COMPRESSION_RESULTS", clear=True),
+        ):
+            first_messages, first_prompt = agent._compress_context(
+                [{"role": "user", "content": "old transcript"}],
+                "system prompt",
+                approx_tokens=1234,
+            )
+            winning_child = agent.session_id
+
+            # Simulate a second worker that started before the first one split
+            # the session and still believes it owns the parent session.
+            agent.session_id = "parent-session"
+            agent.context_compressor.compress.reset_mock()
+            second_messages, second_prompt = agent._compress_context(
+                [{"role": "user", "content": "old transcript"}],
+                "system prompt",
+                approx_tokens=1234,
+            )
+
+        assert winning_child != "parent-session"
+        assert agent.session_id == winning_child
+        assert first_messages == second_messages
+        assert first_prompt == second_prompt
+        agent.context_compressor.compress.assert_not_called()
+        assert len(fake_db.created_children) == 1
+        assert fake_db.created_children[0]["parent_session_id"] == "parent-session"
+
     def test_no_preflight_when_compression_disabled(self, agent):
         """Preflight should not run when compression is disabled."""
         agent.compression_enabled = False