diff --git a/frontend/xterm-log.js b/frontend/xterm-log.js index 208bc3fe..ccb6b7c8 100644 --- a/frontend/xterm-log.js +++ b/frontend/xterm-log.js @@ -221,6 +221,9 @@ class XTermLog { } else if (step.type === 'holding' || step.type === 'auto_holding') { const content = step.content || ''; this.writeln(`${ts} ${ANSI.yellow}hold${ANSI.reset} ${this._clip(content)}`); + } else if (step.type === 'heartbeat') { + const content = (step.content || '').replace(/\n/g, ' '); + this.writeln(`${ts} ${ANSI.gray}beat${ANSI.reset} ${ANSI.dim}${this._clip(content)}${ANSI.reset}`); } else if (step.type === 'error') { const content = (step.content || '').replace(/\n/g, ' '); this.writeln(`${ts} ${ANSI.red}error${ANSI.reset} ${this._clip(content)}`); diff --git a/package.json b/package.json index 21ed7373..304c5aa1 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@1mancompany/onemancompany", - "version": "0.7.77", + "version": "0.7.78", "description": "The AI Operating System for One-Person Companies", "bin": { "onemancompany": "bin/cli.js" diff --git a/pyproject.toml b/pyproject.toml index d3de0a5f..0b2023fc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "onemancompany" -version = "0.7.77" +version = "0.7.78" description = "A one-man company simulation with pixel art visualization and LangChain AI agents" requires-python = ">=3.12" dependencies = [ diff --git a/src/onemancompany/agents/base.py b/src/onemancompany/agents/base.py index f73be706..de982c2f 100644 --- a/src/onemancompany/agents/base.py +++ b/src/onemancompany/agents/base.py @@ -1,7 +1,10 @@ """Base agent utilities shared across all LangChain agents.""" from __future__ import annotations -from datetime import datetime +import asyncio +import contextlib +from datetime import datetime, timezone +import time from loguru import logger from typing import Any @@ -61,6 +64,7 @@ _NO_OUTPUT = "(no output)" _MISSING_API_KEY_SENTINEL = "missing-api-key" _COMPANY_HOSTING = HostingMode.COMPANY.value +_STREAM_HEARTBEAT_INTERVAL_SEC = 20.0 def _extract_text(content) -> str: @@ -658,87 +662,125 @@ async def run_streamed(self, task: str, on_log=None) -> str: model_used = "" last_tool_calls: list[str] = [] # track tool names for fallback last_tool_results: list[str] = [] + current_phase = "planning" + start_monotonic = time.monotonic() + last_activity_monotonic = start_monotonic + last_tool_name: str | None = None + last_tool_called_at: str | None = None # Debug trace: accumulate full message objects from streaming events debug_messages: list = [] - async for event in self._agent.astream_events( - messages_input, version="v2", config={"recursion_limit": 50}, - ): - kind = event.get("event", "") - data = event.get("data", {}) - if kind == "on_chat_model_start": - inp = data.get("input", "") - if isinstance(inp, list) and inp: - # Capture all input messages for SFT on first LLM call - if not debug_messages: - debug_messages.extend(inp) - last_msg = inp[-1] - if hasattr(last_msg, "content"): - content = last_msg.content or "" - if isinstance(content, str): - on_log("llm_input", f"[{type(last_msg).__name__}] {content}") - logger.debug("[LLM INPUT] employee={}: {}", self.employee_id, content[:3000]) - elif kind == "on_chat_model_end": - output = data.get("output", None) - if output: - # Capture AI message for Debug trace - debug_messages.append(output) - # Extract token usage — try response_metadata first, then usage_metadata - meta = getattr(output, "response_metadata", {}) or {} - usage = meta.get("usage", {}) or meta.get("token_usage", {}) or {} - if usage: - total_input_tokens += usage.get("prompt_tokens", 0) or usage.get("input_tokens", 0) - total_output_tokens += usage.get("completion_tokens", 0) or usage.get("output_tokens", 0) - # Provider-reported cost (e.g. OpenRouter includes "cost" in token_usage) - if "cost" in usage and usage["cost"]: # pragma: no cover - provider_cost = (provider_cost or 0.0) + float(usage["cost"]) # pragma: no cover - else: - # Streaming mode: usage lives in usage_metadata (requires stream_usage=True) - usage_meta = getattr(output, "usage_metadata", None) - if usage_meta and isinstance(usage_meta, dict): # pragma: no cover - total_input_tokens += usage_meta.get("input_tokens", 0) # pragma: no cover - total_output_tokens += usage_meta.get("output_tokens", 0) # pragma: no cover + async def _heartbeat_loop() -> None: + while True: + await asyncio.sleep(_STREAM_HEARTBEAT_INTERVAL_SEC) + idle_seconds = time.monotonic() - last_activity_monotonic + if idle_seconds < _STREAM_HEARTBEAT_INTERVAL_SEC: + continue + elapsed_seconds = int(time.monotonic() - start_monotonic) + heartbeat = { + "phase": current_phase, + "idle_seconds": int(idle_seconds), + "elapsed_seconds": elapsed_seconds, + "last_tool_name": last_tool_name, + "last_tool_call_at": last_tool_called_at, + "content": ( + f"⏱ heartbeat phase={current_phase} " + f"idle={int(idle_seconds)}s elapsed={elapsed_seconds}s" + ), + } + on_log("heartbeat", heartbeat) + + heartbeat_task = asyncio.create_task(_heartbeat_loop()) + try: + async for event in self._agent.astream_events( + messages_input, version="v2", config={"recursion_limit": 50}, + ): + kind = event.get("event", "") + data = event.get("data", {}) + last_activity_monotonic = time.monotonic() + if kind == "on_chat_model_start": + current_phase = "waiting_for_llm" + inp = data.get("input", "") + if isinstance(inp, list) and inp: + # Capture all input messages for SFT on first LLM call + if not debug_messages: + debug_messages.extend(inp) + last_msg = inp[-1] + if hasattr(last_msg, "content"): + content = last_msg.content or "" + if isinstance(content, str): + on_log("llm_input", f"[{type(last_msg).__name__}] {content}") + logger.debug("[LLM INPUT] employee={}: {}", self.employee_id, content[:3000]) + elif kind == "on_chat_model_end": + current_phase = "processing_response" + output = data.get("output", None) + if output: + # Capture AI message for Debug trace + debug_messages.append(output) + # Extract token usage — try response_metadata first, then usage_metadata + meta = getattr(output, "response_metadata", {}) or {} + usage = meta.get("usage", {}) or meta.get("token_usage", {}) or {} + if usage: + total_input_tokens += usage.get("prompt_tokens", 0) or usage.get("input_tokens", 0) + total_output_tokens += usage.get("completion_tokens", 0) or usage.get("output_tokens", 0) + # Provider-reported cost (e.g. OpenRouter includes "cost" in token_usage) + if "cost" in usage and usage["cost"]: # pragma: no cover + provider_cost = (provider_cost or 0.0) + float(usage["cost"]) # pragma: no cover else: - logger.debug("[COST] on_chat_model_end: no usage data for employee={}, meta_keys={}", self.employee_id, list(meta.keys())) - if not model_used: - model_used = meta.get("model_name", "") or meta.get("model", "") - - if hasattr(output, "content"): - text = _extract_text(output.content) - if text.strip(): - final_content = text # track last AI output - on_log("llm_output", text) - logger.debug("[LLM OUTPUT] employee={}: {}", self.employee_id, text[:3000]) - tool_calls = getattr(output, "tool_calls", None) - if tool_calls: - last_tool_calls = [] - last_tool_results = [] - for tc in tool_calls: - name = tc.get("name", "?") - args_dict = tc.get("args", {}) - args = str(args_dict) - last_tool_calls.append(name) - on_log("tool_call", { - "tool_name": name, - "tool_args": args_dict, - "content": f"{name}({args})", - }) - logger.debug("[TOOL CALL] employee={}: {}({})", self.employee_id, name, args[:1000]) - elif kind == "on_tool_end": - output = data.get("output", "") - name = event.get("name", "tool") - result_str = str(output) - last_tool_results.append(f"{name} → {result_str}") - logger.debug("[TOOL RESULT] employee={}: {} → {}", self.employee_id, name, result_str[:2000]) - on_log("tool_result", { - "tool_name": name, - "tool_result": result_str, - "content": f"{name} → {result_str}", - }) - # Capture ToolMessage for Debug trace - raw_output = data.get("output") - if raw_output and hasattr(raw_output, "content"): # pragma: no cover - debug_messages.append(raw_output) # pragma: no cover + # Streaming mode: usage lives in usage_metadata (requires stream_usage=True) + usage_meta = getattr(output, "usage_metadata", None) + if usage_meta and isinstance(usage_meta, dict): # pragma: no cover + total_input_tokens += usage_meta.get("input_tokens", 0) # pragma: no cover + total_output_tokens += usage_meta.get("output_tokens", 0) # pragma: no cover + else: + logger.debug("[COST] on_chat_model_end: no usage data for employee={}, meta_keys={}", self.employee_id, list(meta.keys())) + if not model_used: + model_used = meta.get("model_name", "") or meta.get("model", "") + + if hasattr(output, "content"): + text = _extract_text(output.content) + if text.strip(): + final_content = text # track last AI output + on_log("llm_output", text) + logger.debug("[LLM OUTPUT] employee={}: {}", self.employee_id, text[:3000]) + tool_calls = getattr(output, "tool_calls", None) + if tool_calls: + current_phase = "tool_calling" + last_tool_calls = [] + last_tool_results = [] + for tc in tool_calls: + name = tc.get("name", "?") + args_dict = tc.get("args", {}) + args = str(args_dict) + last_tool_calls.append(name) + last_tool_name = name + last_tool_called_at = datetime.now(timezone.utc).isoformat() + on_log("tool_call", { + "tool_name": name, + "tool_args": args_dict, + "content": f"{name}({args})", + }) + logger.debug("[TOOL CALL] employee={}: {}({})", self.employee_id, name, args[:1000]) + elif kind == "on_tool_end": + current_phase = "waiting_for_llm" + output = data.get("output", "") + name = event.get("name", "tool") + result_str = str(output) + last_tool_results.append(f"{name} → {result_str}") + logger.debug("[TOOL RESULT] employee={}: {} → {}", self.employee_id, name, result_str[:2000]) + on_log("tool_result", { + "tool_name": name, + "tool_result": result_str, + "content": f"{name} → {result_str}", + }) + # Capture ToolMessage for Debug trace + raw_output = data.get("output") + if raw_output and hasattr(raw_output, "content"): # pragma: no cover + debug_messages.append(raw_output) # pragma: no cover + finally: + heartbeat_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await heartbeat_task # If no text content from LLM, synthesize from last tool calls if not final_content.strip() and last_tool_calls: # pragma: no cover diff --git a/tests/unit/agents/test_base.py b/tests/unit/agents/test_base.py index 6de41b0f..f6853cb2 100644 --- a/tests/unit/agents/test_base.py +++ b/tests/unit/agents/test_base.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio from pathlib import Path from unittest.mock import AsyncMock, MagicMock, patch @@ -1280,6 +1281,102 @@ async def fake_astream(msg, version, config): await runner.run_streamed("task", on_log=lambda k, c: logs.append((k, c))) assert any(k == "tool_call" for k, _ in logs) + @pytest.mark.asyncio + async def test_run_streamed_emits_heartbeat_when_stream_is_silent(self, monkeypatch): + from onemancompany.agents.base import BaseAgentRunner + from onemancompany.agents import base as base_mod + from onemancompany.core import state as state_mod + from langchain_core.messages import AIMessage + + cs = _make_cs() + emp = _make_emp("00010") + _mock_store_for_employees(monkeypatch, {"00010": emp}) + monkeypatch.setattr(state_mod, "company_state", cs) + monkeypatch.setattr(base_mod, "company_state", cs) + monkeypatch.setattr(base_mod, "event_bus", MagicMock(publish=AsyncMock())) + heartbeat_interval = 0.01 + silent_period = heartbeat_interval * 3 + monkeypatch.setattr(base_mod, "_STREAM_HEARTBEAT_INTERVAL_SEC", heartbeat_interval) + monkeypatch.setattr( + "onemancompany.core.agent_loop._current_vessel", + MagicMock(get=lambda x=None: None), + ) + + events = [{"event": "on_chat_model_end", "data": {"output": AIMessage(content="done")}}] + + async def fake_astream(msg, version, config): + await asyncio.sleep(silent_period) + for e in events: + yield e + + runner = BaseAgentRunner() + runner.employee_id = "00010" + runner.role = "Agent" + runner._agent = MagicMock() + runner._agent.astream_events = fake_astream + runner._build_prompt = lambda: "" + + logs = [] + await runner.run_streamed("task", on_log=lambda k, c: logs.append((k, c))) + + heartbeats = [c for k, c in logs if k == "heartbeat"] + assert heartbeats + assert heartbeats[0]["phase"] + assert "elapsed_seconds" in heartbeats[0] + assert "idle_seconds" in heartbeats[0] + + @pytest.mark.asyncio + async def test_run_streamed_heartbeat_reports_last_tool_call(self, monkeypatch): + from onemancompany.agents.base import BaseAgentRunner + from onemancompany.agents import base as base_mod + from onemancompany.core import state as state_mod + from langchain_core.messages import AIMessage + + cs = _make_cs() + emp = _make_emp("00010") + _mock_store_for_employees(monkeypatch, {"00010": emp}) + monkeypatch.setattr(state_mod, "company_state", cs) + monkeypatch.setattr(base_mod, "company_state", cs) + monkeypatch.setattr(base_mod, "event_bus", MagicMock(publish=AsyncMock())) + heartbeat_interval = 0.01 + silent_period = heartbeat_interval * 3 + monkeypatch.setattr(base_mod, "_STREAM_HEARTBEAT_INTERVAL_SEC", heartbeat_interval) + monkeypatch.setattr( + "onemancompany.core.agent_loop._current_vessel", + MagicMock(get=lambda x=None: None), + ) + + events = [ + { + "event": "on_chat_model_end", + "data": {"output": AIMessage(content="", tool_calls=[{"name": "search_web", "args": {"q": "x"}, "id": "tc_1"}])}, + }, + {"event": "on_tool_end", "data": {"output": "ok"}, "name": "search_web"}, + {"event": "on_chat_model_end", "data": {"output": AIMessage(content="done")}}, + ] + + async def fake_astream(msg, version, config): + yield events[0] + await asyncio.sleep(silent_period) + yield events[1] + yield events[2] + + runner = BaseAgentRunner() + runner.employee_id = "00010" + runner.role = "Agent" + runner._agent = MagicMock() + runner._agent.astream_events = fake_astream + runner._build_prompt = lambda: "" + + logs = [] + await runner.run_streamed("task", on_log=lambda k, c: logs.append((k, c))) + + heartbeats = [c for k, c in logs if k == "heartbeat"] + assert heartbeats + assert heartbeats[0]["phase"] == "tool_calling" + assert heartbeats[0]["last_tool_name"] == "search_web" + assert heartbeats[0]["last_tool_call_at"] + # --------------------------------------------------------------------------- # get_employee_tools_prompt — file content and binary file handling