Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions frontend/xterm-log.js
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ class XTermLog {
} else if (step.type === 'holding' || step.type === 'auto_holding') {
const content = step.content || '';
this.writeln(`${ts} ${ANSI.yellow}hold${ANSI.reset} ${this._clip(content)}`);
} else if (step.type === 'heartbeat') {
const content = (step.content || '').replace(/\n/g, ' ');
this.writeln(`${ts} ${ANSI.gray}beat${ANSI.reset} ${ANSI.dim}${this._clip(content)}${ANSI.reset}`);
} else if (step.type === 'error') {
const content = (step.content || '').replace(/\n/g, ' ');
this.writeln(`${ts} ${ANSI.red}error${ANSI.reset} ${this._clip(content)}`);
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@1mancompany/onemancompany",
"version": "0.7.77",
"version": "0.7.78",
"description": "The AI Operating System for One-Person Companies",
"bin": {
"onemancompany": "bin/cli.js"
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "onemancompany"
version = "0.7.77"
version = "0.7.78"
description = "A one-man company simulation with pixel art visualization and LangChain AI agents"
requires-python = ">=3.12"
dependencies = [
Expand Down
198 changes: 120 additions & 78 deletions src/onemancompany/agents/base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
"""Base agent utilities shared across all LangChain agents."""

from __future__ import annotations
from datetime import datetime
import asyncio
import contextlib
from datetime import datetime, timezone
import time
from loguru import logger
from typing import Any

Expand Down Expand Up @@ -61,6 +64,7 @@
_NO_OUTPUT = "(no output)"
_MISSING_API_KEY_SENTINEL = "missing-api-key"
_COMPANY_HOSTING = HostingMode.COMPANY.value
_STREAM_HEARTBEAT_INTERVAL_SEC = 20.0


def _extract_text(content) -> str:
Expand Down Expand Up @@ -658,87 +662,125 @@ async def run_streamed(self, task: str, on_log=None) -> str:
model_used = ""
last_tool_calls: list[str] = [] # track tool names for fallback
last_tool_results: list[str] = []
current_phase = "planning"
start_monotonic = time.monotonic()
last_activity_monotonic = start_monotonic
last_tool_name: str | None = None
last_tool_called_at: str | None = None
# Debug trace: accumulate full message objects from streaming events
debug_messages: list = []

async for event in self._agent.astream_events(
messages_input, version="v2", config={"recursion_limit": 50},
):
kind = event.get("event", "")
data = event.get("data", {})
if kind == "on_chat_model_start":
inp = data.get("input", "")
if isinstance(inp, list) and inp:
# Capture all input messages for SFT on first LLM call
if not debug_messages:
debug_messages.extend(inp)
last_msg = inp[-1]
if hasattr(last_msg, "content"):
content = last_msg.content or ""
if isinstance(content, str):
on_log("llm_input", f"[{type(last_msg).__name__}] {content}")
logger.debug("[LLM INPUT] employee={}: {}", self.employee_id, content[:3000])
elif kind == "on_chat_model_end":
output = data.get("output", None)
if output:
# Capture AI message for Debug trace
debug_messages.append(output)
# Extract token usage — try response_metadata first, then usage_metadata
meta = getattr(output, "response_metadata", {}) or {}
usage = meta.get("usage", {}) or meta.get("token_usage", {}) or {}
if usage:
total_input_tokens += usage.get("prompt_tokens", 0) or usage.get("input_tokens", 0)
total_output_tokens += usage.get("completion_tokens", 0) or usage.get("output_tokens", 0)
# Provider-reported cost (e.g. OpenRouter includes "cost" in token_usage)
if "cost" in usage and usage["cost"]: # pragma: no cover
provider_cost = (provider_cost or 0.0) + float(usage["cost"]) # pragma: no cover
else:
# Streaming mode: usage lives in usage_metadata (requires stream_usage=True)
usage_meta = getattr(output, "usage_metadata", None)
if usage_meta and isinstance(usage_meta, dict): # pragma: no cover
total_input_tokens += usage_meta.get("input_tokens", 0) # pragma: no cover
total_output_tokens += usage_meta.get("output_tokens", 0) # pragma: no cover
async def _heartbeat_loop() -> None:
    """Emit a ``heartbeat`` log entry while the event stream is silent.

    Wakes up every ``_STREAM_HEARTBEAT_INTERVAL_SEC`` seconds. If no stream
    event has been recorded for at least that long, it reports the current
    phase, the idle and elapsed time, and the most recent tool call through
    ``on_log``. The loop never returns on its own; the enclosing coroutine
    cancels the task once streaming is over.
    """
    while True:
        await asyncio.sleep(_STREAM_HEARTBEAT_INTERVAL_SEC)
        # Take one clock reading so idle and elapsed describe the same instant.
        now = time.monotonic()
        idle_seconds = now - last_activity_monotonic
        if idle_seconds < _STREAM_HEARTBEAT_INTERVAL_SEC:
            # An event arrived recently; the stream is not silent.
            continue
        idle_whole_seconds = int(idle_seconds)
        elapsed_seconds = int(now - start_monotonic)
        heartbeat = {
            "phase": current_phase,
            "idle_seconds": idle_whole_seconds,
            "elapsed_seconds": elapsed_seconds,
            "last_tool_name": last_tool_name,
            "last_tool_call_at": last_tool_called_at,
            "content": (
                f"⏱ heartbeat phase={current_phase} "
                f"idle={idle_whole_seconds}s elapsed={elapsed_seconds}s"
            ),
        }
        try:
            on_log("heartbeat", heartbeat)
        except Exception as exc:
            # Heartbeats are best-effort. If the callback error escaped, this
            # task would finish with an exception, and the caller's
            # ``await heartbeat_task`` (which suppresses only CancelledError)
            # would re-raise it and fail the whole streamed run.
            logger.warning("[HEARTBEAT] on_log callback failed: {}", exc)

heartbeat_task = asyncio.create_task(_heartbeat_loop())
try:
async for event in self._agent.astream_events(
messages_input, version="v2", config={"recursion_limit": 50},
):
kind = event.get("event", "")
data = event.get("data", {})
last_activity_monotonic = time.monotonic()
if kind == "on_chat_model_start":
current_phase = "waiting_for_llm"
inp = data.get("input", "")
if isinstance(inp, list) and inp:
# Capture all input messages for SFT on first LLM call
if not debug_messages:
debug_messages.extend(inp)
last_msg = inp[-1]
if hasattr(last_msg, "content"):
content = last_msg.content or ""
if isinstance(content, str):
on_log("llm_input", f"[{type(last_msg).__name__}] {content}")
logger.debug("[LLM INPUT] employee={}: {}", self.employee_id, content[:3000])
elif kind == "on_chat_model_end":
current_phase = "processing_response"
output = data.get("output", None)
if output:
# Capture AI message for Debug trace
debug_messages.append(output)
# Extract token usage — try response_metadata first, then usage_metadata
meta = getattr(output, "response_metadata", {}) or {}
usage = meta.get("usage", {}) or meta.get("token_usage", {}) or {}
if usage:
total_input_tokens += usage.get("prompt_tokens", 0) or usage.get("input_tokens", 0)
total_output_tokens += usage.get("completion_tokens", 0) or usage.get("output_tokens", 0)
# Provider-reported cost (e.g. OpenRouter includes "cost" in token_usage)
if "cost" in usage and usage["cost"]: # pragma: no cover
provider_cost = (provider_cost or 0.0) + float(usage["cost"]) # pragma: no cover
else:
logger.debug("[COST] on_chat_model_end: no usage data for employee={}, meta_keys={}", self.employee_id, list(meta.keys()))
if not model_used:
model_used = meta.get("model_name", "") or meta.get("model", "")

if hasattr(output, "content"):
text = _extract_text(output.content)
if text.strip():
final_content = text # track last AI output
on_log("llm_output", text)
logger.debug("[LLM OUTPUT] employee={}: {}", self.employee_id, text[:3000])
tool_calls = getattr(output, "tool_calls", None)
if tool_calls:
last_tool_calls = []
last_tool_results = []
for tc in tool_calls:
name = tc.get("name", "?")
args_dict = tc.get("args", {})
args = str(args_dict)
last_tool_calls.append(name)
on_log("tool_call", {
"tool_name": name,
"tool_args": args_dict,
"content": f"{name}({args})",
})
logger.debug("[TOOL CALL] employee={}: {}({})", self.employee_id, name, args[:1000])
elif kind == "on_tool_end":
output = data.get("output", "")
name = event.get("name", "tool")
result_str = str(output)
last_tool_results.append(f"{name} → {result_str}")
logger.debug("[TOOL RESULT] employee={}: {} → {}", self.employee_id, name, result_str[:2000])
on_log("tool_result", {
"tool_name": name,
"tool_result": result_str,
"content": f"{name} → {result_str}",
})
# Capture ToolMessage for Debug trace
raw_output = data.get("output")
if raw_output and hasattr(raw_output, "content"): # pragma: no cover
debug_messages.append(raw_output) # pragma: no cover
# Streaming mode: usage lives in usage_metadata (requires stream_usage=True)
usage_meta = getattr(output, "usage_metadata", None)
if usage_meta and isinstance(usage_meta, dict): # pragma: no cover
total_input_tokens += usage_meta.get("input_tokens", 0) # pragma: no cover
total_output_tokens += usage_meta.get("output_tokens", 0) # pragma: no cover
else:
logger.debug("[COST] on_chat_model_end: no usage data for employee={}, meta_keys={}", self.employee_id, list(meta.keys()))
if not model_used:
model_used = meta.get("model_name", "") or meta.get("model", "")

if hasattr(output, "content"):
text = _extract_text(output.content)
if text.strip():
final_content = text # track last AI output
on_log("llm_output", text)
logger.debug("[LLM OUTPUT] employee={}: {}", self.employee_id, text[:3000])
tool_calls = getattr(output, "tool_calls", None)
if tool_calls:
current_phase = "tool_calling"
last_tool_calls = []
last_tool_results = []
for tc in tool_calls:
name = tc.get("name", "?")
args_dict = tc.get("args", {})
args = str(args_dict)
last_tool_calls.append(name)
last_tool_name = name
last_tool_called_at = datetime.now(timezone.utc).isoformat()
on_log("tool_call", {
"tool_name": name,
"tool_args": args_dict,
"content": f"{name}({args})",
})
logger.debug("[TOOL CALL] employee={}: {}({})", self.employee_id, name, args[:1000])
elif kind == "on_tool_end":
current_phase = "waiting_for_llm"
output = data.get("output", "")
name = event.get("name", "tool")
result_str = str(output)
last_tool_results.append(f"{name} → {result_str}")
logger.debug("[TOOL RESULT] employee={}: {} → {}", self.employee_id, name, result_str[:2000])
on_log("tool_result", {
"tool_name": name,
"tool_result": result_str,
"content": f"{name} → {result_str}",
})
# Capture ToolMessage for Debug trace
raw_output = data.get("output")
if raw_output and hasattr(raw_output, "content"): # pragma: no cover
debug_messages.append(raw_output) # pragma: no cover
finally:
heartbeat_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await heartbeat_task

# If no text content from LLM, synthesize from last tool calls
if not final_content.strip() and last_tool_calls: # pragma: no cover
Expand Down
97 changes: 97 additions & 0 deletions tests/unit/agents/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import asyncio
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch

Expand Down Expand Up @@ -1280,6 +1281,102 @@ async def fake_astream(msg, version, config):
await runner.run_streamed("task", on_log=lambda k, c: logs.append((k, c)))
assert any(k == "tool_call" for k, _ in logs)

@pytest.mark.asyncio
async def test_run_streamed_emits_heartbeat_when_stream_is_silent(self, monkeypatch):
    """A stream that stays quiet for several intervals produces a heartbeat log."""
    from onemancompany.agents.base import BaseAgentRunner
    from onemancompany.agents import base as base_mod
    from onemancompany.core import state as state_mod
    from langchain_core.messages import AIMessage

    employee_id = "00010"
    company = _make_cs()
    employee = _make_emp(employee_id)
    _mock_store_for_employees(monkeypatch, {employee_id: employee})
    monkeypatch.setattr(state_mod, "company_state", company)
    monkeypatch.setattr(base_mod, "company_state", company)
    monkeypatch.setattr(base_mod, "event_bus", MagicMock(publish=AsyncMock()))

    # Shrink the heartbeat interval so the test finishes in a few milliseconds.
    heartbeat_interval = 0.01
    silent_period = heartbeat_interval * 3
    monkeypatch.setattr(base_mod, "_STREAM_HEARTBEAT_INTERVAL_SEC", heartbeat_interval)
    monkeypatch.setattr(
        "onemancompany.core.agent_loop._current_vessel",
        MagicMock(get=lambda x=None: None),
    )

    final_reply = {"event": "on_chat_model_end", "data": {"output": AIMessage(content="done")}}

    async def fake_astream(msg, version, config):
        # Stay silent for three intervals before producing the only event.
        await asyncio.sleep(silent_period)
        yield final_reply

    runner = BaseAgentRunner()
    runner.employee_id = employee_id
    runner.role = "Agent"
    runner._agent = MagicMock()
    runner._agent.astream_events = fake_astream
    runner._build_prompt = lambda: ""

    captured = []

    def record(kind, payload):
        captured.append((kind, payload))

    await runner.run_streamed("task", on_log=record)

    beats = [payload for kind, payload in captured if kind == "heartbeat"]
    assert beats
    first_beat = beats[0]
    assert first_beat["phase"]
    for required_key in ("elapsed_seconds", "idle_seconds"):
        assert required_key in first_beat

@pytest.mark.asyncio
async def test_run_streamed_heartbeat_reports_last_tool_call(self, monkeypatch):
    """A heartbeat emitted while a tool call is pending names that tool and phase."""
    from onemancompany.agents.base import BaseAgentRunner
    from onemancompany.agents import base as base_mod
    from onemancompany.core import state as state_mod
    from langchain_core.messages import AIMessage

    employee_id = "00010"
    company = _make_cs()
    employee = _make_emp(employee_id)
    _mock_store_for_employees(monkeypatch, {employee_id: employee})
    monkeypatch.setattr(state_mod, "company_state", company)
    monkeypatch.setattr(base_mod, "company_state", company)
    monkeypatch.setattr(base_mod, "event_bus", MagicMock(publish=AsyncMock()))

    # Shrink the heartbeat interval so the test finishes in a few milliseconds.
    heartbeat_interval = 0.01
    silent_period = heartbeat_interval * 3
    monkeypatch.setattr(base_mod, "_STREAM_HEARTBEAT_INTERVAL_SEC", heartbeat_interval)
    monkeypatch.setattr(
        "onemancompany.core.agent_loop._current_vessel",
        MagicMock(get=lambda x=None: None),
    )

    tool_request = {
        "event": "on_chat_model_end",
        "data": {"output": AIMessage(content="", tool_calls=[{"name": "search_web", "args": {"q": "x"}, "id": "tc_1"}])},
    }
    tool_finished = {"event": "on_tool_end", "data": {"output": "ok"}, "name": "search_web"}
    final_reply = {"event": "on_chat_model_end", "data": {"output": AIMessage(content="done")}}

    async def fake_astream(msg, version, config):
        yield tool_request
        # Go quiet after the tool call is requested, before its result arrives.
        await asyncio.sleep(silent_period)
        yield tool_finished
        yield final_reply

    runner = BaseAgentRunner()
    runner.employee_id = employee_id
    runner.role = "Agent"
    runner._agent = MagicMock()
    runner._agent.astream_events = fake_astream
    runner._build_prompt = lambda: ""

    captured = []

    def record(kind, payload):
        captured.append((kind, payload))

    await runner.run_streamed("task", on_log=record)

    beats = [payload for kind, payload in captured if kind == "heartbeat"]
    assert beats
    first_beat = beats[0]
    assert first_beat["phase"] == "tool_calling"
    assert first_beat["last_tool_name"] == "search_web"
    assert first_beat["last_tool_call_at"]


# ---------------------------------------------------------------------------
# get_employee_tools_prompt — file content and binary file handling
Expand Down