Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 161 additions & 3 deletions run_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,62 @@ def __repr__(self):

OpenAI = _OpenAIProxy()

# Process-local guard against two gateway/background workers compacting the
# same parent session at the same time. Context compression rotates the
# session_id; without a per-parent lock and winner cache, a stale concurrent
# worker can fork a second child from the same parent before the first child has
# flushed its compacted messages to SQLite.
_CONTEXT_COMPRESSION_LOCKS: dict[str, threading.RLock] = {}
_CONTEXT_COMPRESSION_LOCKS_GUARD = threading.Lock()
_CONTEXT_COMPRESSION_RESULTS: dict[str, tuple[float, str, list[dict[str, Any]], str]] = {}
_CONTEXT_COMPRESSION_RESULT_TTL_S = 600.0


def _context_compression_lock(session_id: str) -> threading.RLock:
key = session_id or "__no_session__"
with _CONTEXT_COMPRESSION_LOCKS_GUARD:
lock = _CONTEXT_COMPRESSION_LOCKS.get(key)
if lock is None:
lock = threading.RLock()
_CONTEXT_COMPRESSION_LOCKS[key] = lock
return lock


def _remember_context_compression(parent_session_id: str, child_session_id: str, messages: list[dict[str, Any]], system_prompt: str) -> None:
if not parent_session_id or not child_session_id or parent_session_id == child_session_id:
return
now = time.time()
with _CONTEXT_COMPRESSION_LOCKS_GUARD:
# Opportunistic TTL cleanup keeps the process-local cache bounded.
expired = [
sid for sid, (created_at, _child, _msgs, _prompt) in _CONTEXT_COMPRESSION_RESULTS.items()
if now - created_at > _CONTEXT_COMPRESSION_RESULT_TTL_S
]
for sid in expired:
_CONTEXT_COMPRESSION_RESULTS.pop(sid, None)
_CONTEXT_COMPRESSION_RESULTS[parent_session_id] = (
now,
child_session_id,
copy.deepcopy(messages),
system_prompt or "",
)


def _get_remembered_context_compression(parent_session_id: str) -> tuple[str, list[dict[str, Any]], str] | None:
if not parent_session_id:
return None
now = time.time()
with _CONTEXT_COMPRESSION_LOCKS_GUARD:
cached = _CONTEXT_COMPRESSION_RESULTS.get(parent_session_id)
if cached is None:
return None
created_at, child_session_id, messages, system_prompt = cached
if now - created_at > _CONTEXT_COMPRESSION_RESULT_TTL_S:
_CONTEXT_COMPRESSION_RESULTS.pop(parent_session_id, None)
return None
return child_session_id, copy.deepcopy(messages), system_prompt


# Load .env from ~/.hermes/.env first, then project root as dev fallback.
# User-managed env files should override stale shell exports on restart.
from hermes_cli.env_loader import load_hermes_dotenv
Expand Down Expand Up @@ -10653,7 +10709,101 @@ def _should_sanitize_tool_calls(self) -> bool:
"""
return self.api_mode != "codex_responses"

def _compress_context(self, messages: list, system_message: str, *, approx_tokens: int | None = None, task_id: str = "default", focus_topic: str | None = None) -> tuple:
    """Serialize compression per parent session and reuse the winning split.

    Gateway background work can race a live user turn: both workers may load
    the same oversized parent history and call compression. The first worker
    must be the only one to rotate the parent session. Later stale callers
    adopt the winning child session and compacted transcript instead of
    creating sibling compression children.

    Args:
        messages: Conversation history to compress.
        system_message: Current system prompt, forwarded to
            ``_compress_context_locked``.
        approx_tokens: Optional token estimate, forwarded unchanged.
        task_id: Forwarded unchanged.
        focus_topic: Forwarded unchanged.

    Returns:
        ``(messages, system_prompt)``: either the fresh result of
        ``_compress_context_locked`` or the result remembered for this
        parent session by an earlier caller.
    """
    parent_session_id = self.session_id or ""
    if not self._session_db:
        # No session DB: skip the per-parent lock and winner cache entirely.
        return self._compress_context_locked(
            messages,
            system_message,
            approx_tokens=approx_tokens,
            task_id=task_id,
            focus_topic=focus_topic,
        )

    with _context_compression_lock(parent_session_id):
        remembered = _get_remembered_context_compression(parent_session_id)
        if remembered is not None:
            # Another caller already compressed this parent: adopt its child
            # session instead of compressing the stale transcript again.
            child_session_id, remembered_messages, remembered_prompt = remembered
            logger.info(
                "context compression reused: parent=%s child=%s messages=%d",
                parent_session_id or "none",
                child_session_id or "none",
                len(remembered_messages),
            )
            self._adopt_compression_child_session(
                parent_session_id=parent_session_id,
                child_session_id=child_session_id,
                system_prompt=remembered_prompt,
            )
            return remembered_messages, remembered_prompt

        compressed, new_system_prompt = self._compress_context_locked(
            messages,
            system_message,
            approx_tokens=approx_tokens,
            task_id=task_id,
            focus_topic=focus_topic,
        )
        # self.session_id is re-read after compression; the result is only
        # remembered when it differs from the parent id (the helper ignores
        # equal or empty ids).
        child_session_id = self.session_id or ""
        _remember_context_compression(
            parent_session_id,
            child_session_id,
            compressed,
            new_system_prompt,
        )
        return compressed, new_system_prompt

def _adopt_compression_child_session(self, *, parent_session_id: str, child_session_id: str, system_prompt: str) -> None:
    """Move this agent instance onto a compression child created elsewhere.

    Does nothing when ``child_session_id`` is empty or already the current
    session. Otherwise it switches ``self.session_id`` (and the
    ``HERMES_SESSION_ID`` environment variable), repoints the session log
    file, caches ``system_prompt``, resets the DB flush cursor, and notifies
    the context compressor and memory manager of the switch.

    Every follow-up step after the id switch is best-effort: failures are
    logged at debug level and never raised.

    Args:
        parent_session_id: Session this agent believed it owned; reported to
            the hooks as the old session. Falls back to the current
            ``self.session_id`` when empty.
        child_session_id: Session to adopt.
        system_prompt: Prompt stored as ``self._cached_system_prompt``.
    """
    if not child_session_id or child_session_id == self.session_id:
        return
    old_session_id = parent_session_id or self.session_id or ""
    self.session_id = child_session_id
    os.environ["HERMES_SESSION_ID"] = self.session_id
    try:
        from gateway.session_context import _SESSION_ID
        _SESSION_ID.set(self.session_id)
    except Exception as _ctx_err:
        # Best-effort: the import or the set() may fail; keep going.
        logger.debug("gateway session context update (compression reuse): %s", _ctx_err)
    try:
        self.session_log_file = self.logs_dir / f"session_{self.session_id}.json"
    except Exception as _log_err:
        logger.debug("session log file switch (compression reuse): %s", _log_err)
    self._cached_system_prompt = system_prompt
    self._session_db_created = True
    self._last_flushed_db_idx = 0

    try:
        if old_session_id and hasattr(self.context_compressor, "on_session_start"):
            self.context_compressor.on_session_start(
                self.session_id or "",
                boundary_reason="compression",
                old_session_id=old_session_id,
            )
    except Exception as _ce_err:
        logger.debug("context engine on_session_start (compression reuse): %s", _ce_err)

    try:
        if old_session_id and self._memory_manager:
            self._memory_manager.on_session_switch(
                self.session_id or "",
                parent_session_id=old_session_id,
                reset=False,
                reason="compression_reuse",
            )
    except Exception as _me_err:
        logger.debug("memory manager on_session_switch (compression reuse): %s", _me_err)

def _compress_context_locked(self, messages: list, system_message: str, *, approx_tokens: int | None = None, task_id: str = "default", focus_topic: str | None = None) -> tuple:
"""Compress conversation context and split the session in SQLite.

Args:
Expand Down Expand Up @@ -10682,12 +10832,20 @@ def _compress_context(self, messages: list, system_message: str, *, approx_token
except Exception:
pass

_current_tokens = approx_tokens if approx_tokens is not None else 0
try:
compressed = self.context_compressor.compress(messages, current_tokens=approx_tokens, focus_topic=focus_topic)
if focus_topic is not None:
compressed = self.context_compressor.compress(
messages,
current_tokens=_current_tokens,
focus_topic=focus_topic,
)
else:
compressed = self.context_compressor.compress(messages, current_tokens=_current_tokens)
except TypeError:
# Plugin context engine with strict signature that doesn't accept
# focus_topic — fall back to calling without it.
compressed = self.context_compressor.compress(messages, current_tokens=approx_tokens)
compressed = self.context_compressor.compress(messages, current_tokens=_current_tokens)

summary_error = getattr(self.context_compressor, "_last_summary_error", None)
if summary_error:
Expand Down
66 changes: 66 additions & 0 deletions tests/run_agent/test_413_compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,72 @@ def test_no_preflight_when_under_threshold(self, agent):
mock_compress.assert_not_called()
assert result["completed"] is True

def test_duplicate_compression_for_same_parent_reuses_first_child(self, agent, tmp_path):
    """A stale concurrent turn must not create a second compression child.

    Gateway background review and a live user turn can both start from the
    same parent session. The first compaction should win; the second stale
    caller should adopt the winning child instead of compacting the old
    transcript again and forking a sibling continuation.
    """
    import uuid

    class FakeSessionDB:
        """Minimal session DB double that records created and ended sessions."""

        def __init__(self):
            self.created_children = []
            self.ended = []

        def get_session_title(self, session_id):
            return None

        def end_session(self, session_id, reason):
            self.ended.append((session_id, reason))

        def create_session(self, **kwargs):
            self.created_children.append(kwargs)

        def update_system_prompt(self, session_id, system_prompt):
            pass

    # The winner cache is process-global and entries live for minutes, so a
    # fixed parent id could hit (or leave behind) an entry from another test.
    # A unique id per run keeps this test isolated.
    parent_session_id = f"parent-session-{uuid.uuid4().hex}"

    fake_db = FakeSessionDB()
    agent._session_db = fake_db
    agent.session_id = parent_session_id
    agent.logs_dir = tmp_path
    agent.platform = "telegram"
    agent._session_init_model_config = {}
    agent._memory_manager = None
    agent._todo_store = SimpleNamespace(format_for_injection=lambda: "")
    agent.context_compressor.compress = MagicMock(
        return_value=[{"role": "user", "content": f"{SUMMARY_PREFIX}\ncompressed"}]
    )

    with (
        patch.object(agent, "_build_system_prompt", return_value="new system prompt"),
        patch("run_agent.estimate_request_tokens_rough", return_value=42),
    ):
        first_messages, first_prompt = agent._compress_context(
            [{"role": "user", "content": "old transcript"}],
            "system prompt",
            approx_tokens=1234,
        )
        winning_child = agent.session_id

        # Simulate a second worker that started before the first one split
        # the session and still believes it owns the parent session.
        agent.session_id = parent_session_id
        agent.context_compressor.compress.reset_mock()
        second_messages, second_prompt = agent._compress_context(
            [{"role": "user", "content": "old transcript"}],
            "system prompt",
            approx_tokens=1234,
        )

    assert winning_child != parent_session_id
    assert agent.session_id == winning_child
    assert first_messages == second_messages
    assert first_prompt == second_prompt
    agent.context_compressor.compress.assert_not_called()
    assert len(fake_db.created_children) == 1
    assert fake_db.created_children[0]["parent_session_id"] == parent_session_id

def test_no_preflight_when_compression_disabled(self, agent):
"""Preflight should not run when compression is disabled."""
agent.compression_enabled = False
Expand Down
Loading