diff --git a/code_puppy/agents/base_agent.py b/code_puppy/agents/base_agent.py index 86ff147d..7ae0ba8b 100644 --- a/code_puppy/agents/base_agent.py +++ b/code_puppy/agents/base_agent.py @@ -157,6 +157,11 @@ def __init__(self): # This is populated after the first successful run when MCP tools are retrieved self._mcp_tool_definitions_cache: List[Dict[str, Any]] = [] + @property + def code_generation_agent(self): + """Public accessor for the underlying pydantic-ai Agent.""" + return self._code_generation_agent + def get_identity(self) -> str: """Get a unique identity for this agent instance. diff --git a/code_puppy/cli_runner.py b/code_puppy/cli_runner.py index 9b1d0a5e..05a6f76b 100644 --- a/code_puppy/cli_runner.py +++ b/code_puppy/cli_runner.py @@ -89,8 +89,69 @@ async def main(): parser.add_argument( "command", nargs="*", help="Run a single command (deprecated, use -p instead)" ) + parser.add_argument( + "--acp", + action="store_true", + help="Start ACP Gateway (Agent Communication Protocol) server", + ) + parser.add_argument( + "--acp-transport", + choices=["http", "stdio"], + default="http", + help="ACP transport: http (port 9001) or stdio (stdin/stdout). Default: http", + ) args = parser.parse_args() + # ACP stdio mode: early exit — skip banner, version check, renderers. + # stdio needs clean stdout (only JSON-RPC), all logs go to stderr. + if args.acp and args.acp_transport == 'stdio': + import logging + os.environ["ACP_ENABLED"] = "true" + os.environ["ACP_TRANSPORT"] = "stdio" + logging.basicConfig( + stream=sys.stderr, + level=logging.INFO, + format="%(asctime)s [%(name)s] %(levelname)s: %(message)s", + ) + + # Initialize DBOS — pydantic-ai's .run() goes through DBOS + # durable execution and will raise DBOSException if not launched. + if get_use_dbos(): + dbos_app_version = os.environ.get( + "DBOS_APP_VERSION", + f"{__version__}-{int(time.time() * 1000)}", + ) + dbos_config: DBOSConfig = { + "name": "dbos-code-puppy", + "system_database_url": DBOS_DATABASE_URL, + "run_admin_server": False, + "conductor_key": os.environ.get("DBOS_CONDUCTOR_KEY"), + "log_level": os.environ.get("DBOS_LOG_LEVEL", "ERROR"), + "application_version": dbos_app_version, + } + try: + DBOS(config=dbos_config) + DBOS.launch() + except Exception as e: + logging.getLogger(__name__).error("DBOS init failed: %s", e) + return + + # Plugins already loaded at module level — no need to reload here. + # Do NOT call callbacks.on_startup() because that would start a + # SECOND stdio server in a background thread. + try: + from code_puppy.plugins.acp_gateway.agent import run_code_puppy_agent + await run_code_puppy_agent() + except KeyboardInterrupt: + pass + finally: + if get_use_dbos(): + try: + DBOS.destroy() + except Exception: + pass + return + from code_puppy.messaging import ( RichConsoleRenderer, SynchronousInteractiveRenderer, @@ -320,6 +381,11 @@ def _uvx_protective_sigint_handler(_sig, _frame): initial_command = None prompt_only_mode = False + if args.acp: + os.environ["ACP_ENABLED"] = "true" + os.environ["ACP_TRANSPORT"] = args.acp_transport + # http mode: falls through to interactive mode with ACP enabled in background + if args.prompt: initial_command = args.prompt prompt_only_mode = True diff --git a/code_puppy/plugins/acp_gateway/__init__.py b/code_puppy/plugins/acp_gateway/__init__.py new file mode 100644 index 00000000..1a16442e --- /dev/null +++ b/code_puppy/plugins/acp_gateway/__init__.py @@ -0,0 +1,22 @@ +"""ACP Gateway Plugin. + +Exposes Code Puppy as an ACP (Agent Client Protocol) agent using the +official ``agent-client-protocol`` Python SDK. + +The SDK handles all transport concerns (stdio JSON-RPC, session +lifecycle, content blocks). This plugin provides the bridge between +ACP and Code Puppy's pydantic-ai agent system. + +The plugin gracefully degrades — if ``agent-client-protocol`` is not +installed, Code Puppy starts normally with a warning log. +""" + +__version__ = "0.2.0" +__description__ = "ACP Gateway plugin for Code Puppy" + +from code_puppy.plugins.acp_gateway.agent import CodePuppyAgent, run_code_puppy_agent + +__all__ = [ + "CodePuppyAgent", + "run_code_puppy_agent", +] \ No newline at end of file diff --git a/code_puppy/plugins/acp_gateway/__main__.py b/code_puppy/plugins/acp_gateway/__main__.py new file mode 100644 index 00000000..9a05adb3 --- /dev/null +++ b/code_puppy/plugins/acp_gateway/__main__.py @@ -0,0 +1,21 @@ +"""Allow running the ACP agent as: python -m code_puppy.plugins.acp_gateway + +All transport concerns (stdio JSON-RPC) are handled by the ACP SDK. +We just start the CodePuppyAgent and let ``run_agent()`` do the rest. +""" + +import asyncio +import logging +import sys + +from code_puppy.plugins.acp_gateway.agent import run_code_puppy_agent + +# Redirect logging to stderr so stdout stays clean for JSON-RPC +logging.basicConfig( + stream=sys.stderr, + level=logging.INFO, + format="%(asctime)s [%(name)s] %(levelname)s: %(message)s", +) + +if __name__ == "__main__": + asyncio.run(run_code_puppy_agent()) \ No newline at end of file diff --git a/code_puppy/plugins/acp_gateway/agent.py b/code_puppy/plugins/acp_gateway/agent.py new file mode 100644 index 00000000..748f5d5b --- /dev/null +++ b/code_puppy/plugins/acp_gateway/agent.py @@ -0,0 +1,569 @@ +"""ACP Agent implementation for Code Puppy. + +Bridges Code Puppy's pydantic-ai agent system to the Agent Client Protocol +using the official ``agent-client-protocol`` Python SDK. + +This single module replaces the previous 13+ file infrastructure +(stdio_server, acp_server, session_store, event_store, event_types, +tool_approvals, hitl_bridge, filesystem_ops, terminal_ops, +message_utils, run_engine, uvicorn_compat, commands) by leveraging +what the SDK already provides. + +Usage: + from code_puppy.plugins.acp_gateway.agent import CodePuppyAgent + await run_agent(CodePuppyAgent()) +""" + +from __future__ import annotations + +import asyncio +import logging +import os +import re +from typing import Any, Dict, Optional +from uuid import uuid4 + +from acp import ( + Agent, + InitializeResponse, + NewSessionResponse, + PromptResponse, + run_agent, + text_block, + update_agent_message, + update_agent_thought_text, + start_tool_call, + update_tool_call, + tool_content, +) +from acp.interfaces import Client +from acp.schema import ( + AudioContentBlock, + ClientCapabilities, + EmbeddedResourceContentBlock, + HttpMcpServer, + ImageContentBlock, + Implementation, + McpServerStdio, + ResourceContentBlock, + SseMcpServer, + TextContentBlock, +) + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + +DEFAULT_AGENT_NAME = os.getenv("ACP_AGENT_NAME", "code-puppy") + + +# --------------------------------------------------------------------------- +# Session state (lightweight — SDK handles transport/protocol) +# --------------------------------------------------------------------------- + +class _SessionState: + """Minimal per-session state for multi-turn conversations. + + Tracks pydantic-ai message history so successive prompts within + the same ACP session share conversational context. + """ + + __slots__ = ("session_id", "agent_name", "message_history") + + def __init__(self, session_id: str, agent_name: str = DEFAULT_AGENT_NAME) -> None: + self.session_id = session_id + self.agent_name = agent_name + self.message_history: list = [] + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _extract_text( + prompt: list[ + TextContentBlock + | ImageContentBlock + | AudioContentBlock + | ResourceContentBlock + | EmbeddedResourceContentBlock + ], +) -> str: + """Pull plain text out of ACP content blocks.""" + parts: list[str] = [] + for block in prompt: + text = ( + block.get("text", "") + if isinstance(block, dict) + else getattr(block, "text", "") + ) + if text: + parts.append(str(text)) + return "\n".join(parts).strip() + + +def _safe_serialize_args(args: Any) -> dict: + """Safely serialize tool args for logging / event payloads.""" + if args is None: + return {} + if isinstance(args, dict): + return { + k: (str(v)[:200] if isinstance(v, str) and len(str(v)) > 200 else v) + for k, v in args.items() + } + try: + return {"raw": str(args)[:500]} + except Exception: + return {} + + +def _extract_plan_steps(thinking_content: str) -> list[dict]: + """Try to extract structured plan steps from agent thinking.""" + steps: list[dict] = [] + patterns = [ + r"(?:^|\n)\s*(\d+)[.)]+\s+(.+)", + r"(?:^|\n)\s*[Ss]tep\s+(\d+)[:.]+\s*(.+)", + r"(?:^|\n)\s*[-*]\s+(.+)", + ] + for pattern in patterns: + matches = re.findall(pattern, thinking_content) + if len(matches) >= 2: + for i, match in enumerate(matches): + if isinstance(match, tuple): + step_num = match[0] if match[0].isdigit() else str(i + 1) + desc = match[-1].strip() + else: + step_num = str(i + 1) + desc = match.strip() + steps.append({"step": int(step_num), "description": desc[:200]}) + break + return steps + + +# --------------------------------------------------------------------------- +# CodePuppyAgent — the only class you need +# --------------------------------------------------------------------------- + +class CodePuppyAgent(Agent): + """ACP Agent that bridges to Code Puppy's pydantic-ai agent system. + + Implements the full Agent protocol. The SDK handles all transport + concerns (stdio JSON-RPC, session lifecycle, content blocks, etc.). + This class only contains the *business logic*: loading a Code Puppy + agent, running a prompt through pydantic-ai, and streaming results + back through the SDK's ``Client`` interface. + """ + + def __init__(self, default_agent: str = DEFAULT_AGENT_NAME) -> None: + self._conn: Optional[Client] = None + self._default_agent = default_agent + self._sessions: Dict[str, _SessionState] = {} + # Track running tasks for cancellation + self._running_tasks: Dict[str, asyncio.Task] = {} + + # ------------------------------------------------------------------ + # ACP lifecycle + # ------------------------------------------------------------------ + + def on_connect(self, conn: Client) -> None: + """Called by the SDK when the transport is established.""" + self._conn = conn + logger.info("ACP connection established") + + async def initialize( + self, + protocol_version: int, + client_capabilities: ClientCapabilities | None = None, + client_info: Implementation | None = None, + **kwargs: Any, + ) -> InitializeResponse: + """Handshake — return supported protocol version.""" + logger.info( + "ACP initialize: protocol_version=%d, client=%s", + protocol_version, + client_info, + ) + return InitializeResponse(protocol_version=protocol_version) + + async def new_session( + self, + cwd: str, + mcp_servers: list[HttpMcpServer | SseMcpServer | McpServerStdio] | None = None, + **kwargs: Any, + ) -> NewSessionResponse: + """Create a new conversation session.""" + session_id = uuid4().hex + self._sessions[session_id] = _SessionState( + session_id=session_id, + agent_name=self._default_agent, + ) + logger.info("New session: %s (cwd=%s)", session_id, cwd) + return NewSessionResponse(session_id=session_id) + + async def prompt( + self, + prompt: list[ + TextContentBlock + | ImageContentBlock + | AudioContentBlock + | ResourceContentBlock + | EmbeddedResourceContentBlock + ], + session_id: str, + **kwargs: Any, + ) -> PromptResponse: + """Run a Code Puppy agent on the user's prompt. + + Extracts text from the ACP content blocks, loads the + appropriate pydantic-ai agent, executes the prompt, and + streams results back via ``session_update``. + """ + text = _extract_text(prompt) + if not text: + await self._send_text(session_id, "No prompt text found in the request.") + return PromptResponse(stop_reason="end_turn") + + session = self._sessions.get(session_id) + if session is None: + # Auto-create session for convenience + session = _SessionState(session_id=session_id, agent_name=self._default_agent) + self._sessions[session_id] = session + + logger.info( + "[%s] prompt (agent=%s): %s", + session_id, + session.agent_name, + text[:120], + ) + + try: + result_text, tool_events = await self._run_agent(session, text) + + # Stream tool events as thoughts for visibility + await self._stream_tool_events(session_id, tool_events) + + # Send the final agent response + await self._send_text(session_id, result_text) + + return PromptResponse(stop_reason="end_turn") + + except asyncio.CancelledError: + logger.info("[%s] prompt cancelled", session_id) + return PromptResponse(stop_reason="cancelled") + + except Exception: + logger.exception("[%s] prompt failed", session_id) + await self._send_text( + session_id, + f"[error] Agent '{session.agent_name}' encountered an error. " + f"Check server logs for details.", + ) + return PromptResponse(stop_reason="end_turn") + + async def cancel(self, session_id: str, **kwargs: Any) -> None: + """Cancel a running prompt.""" + task = self._running_tasks.pop(session_id, None) + if task is not None and not task.done(): + task.cancel() + logger.info("[%s] cancellation requested", session_id) + + # ------------------------------------------------------------------ + # Extension methods (custom Code Puppy features) + # ------------------------------------------------------------------ + + async def ext_method(self, method: str, params: dict[str, Any]) -> dict[str, Any]: + """Handle custom extension methods. + + Supported extensions: + - ``x/list-agents``: List available Code Puppy agents. + - ``x/set-agent``: Switch the agent for a session. + - ``x/agent-info``: Get metadata for a specific agent. + """ + if method == "x/list-agents": + return await self._ext_list_agents() + elif method == "x/set-agent": + return self._ext_set_agent(params) + elif method == "x/agent-info": + return await self._ext_agent_info(params) + else: + return {"error": f"Unknown extension method: {method}"} + + async def ext_notification(self, method: str, params: dict[str, Any]) -> None: + """Handle custom extension notifications (fire-and-forget).""" + logger.debug("Extension notification: %s %s", method, params) + + # ------------------------------------------------------------------ + # Core execution bridge to pydantic-ai + # ------------------------------------------------------------------ + + async def _run_agent( + self, + session: _SessionState, + prompt_text: str, + ) -> tuple[str, list[dict]]: + """Load a Code Puppy agent and execute the prompt through pydantic-ai. + + Headless execution — no signal handlers, no spinners, no keyboard + listeners. Restores and persists message history for multi-turn + conversations within the ACP session. + + Returns: + ``(result_text, tool_events)`` + """ + from code_puppy.agents import load_agent + + agent = load_agent(session.agent_name) + + # Build (or reuse) the underlying pydantic-ai Agent + pydantic_agent = ( + agent.code_generation_agent or agent.reload_code_generation_agent() + ) + + # Restore session history (empty list on first turn) + agent.set_message_history(session.message_history) + history = agent.get_message_history() + + # Prepend system prompt on first turn only, mirroring + # BaseAgent.run_with_mcp behaviour. + if len(history) == 0: + from code_puppy.model_utils import prepare_prompt_for_model + + system_prompt = agent.get_full_system_prompt() + puppy_rules = agent.load_puppy_rules() + if puppy_rules: + system_prompt += f"\n{puppy_rules}" + + prepared = prepare_prompt_for_model( + model_name=agent.get_model_name() or "default", + system_prompt=system_prompt, + user_prompt=prompt_text, + prepend_system_to_user=True, + ) + prompt_text = prepared.user_prompt + + # Run the agent (headless) + result = await pydantic_agent.run( + prompt_text, + message_history=history, + ) + + # Persist updated conversation for future turns + if hasattr(result, "all_messages"): + session.message_history = list(result.all_messages()) + else: + session.message_history = agent.get_message_history() + + logger.debug( + "[%s] session %s now has %d messages", + session.agent_name, + session.session_id, + len(session.message_history), + ) + + text = self._extract_result_text(result) + tool_events = self._extract_tool_events(result) + return text, tool_events + + # ------------------------------------------------------------------ + # Result extraction + # ------------------------------------------------------------------ + + @staticmethod + def _extract_result_text(result: Any) -> str: + """Pull text from a pydantic-ai RunResult.""" + if result is None: + return "" + # pydantic-ai >= 0.1 uses .output, older uses .data + output = getattr(result, "output", None) + if output is not None: + return str(output) + data = getattr(result, "data", None) + if data is not None: + return str(data) + return str(result) + + @staticmethod + def _extract_tool_events(result: Any) -> list[dict]: + """Extract tool-call events from a pydantic-ai result for ACP visibility.""" + events: list[dict] = [] + try: + from pydantic_ai.messages import ( + ModelRequest, + ModelResponse, + ThinkingPart, + ToolCallPart, + ToolReturnPart, + ) + + all_messages_fn = getattr(result, "all_messages", None) + if all_messages_fn is None: + return events + + for msg in all_messages_fn(): + if isinstance(msg, ModelResponse): + for part in msg.parts: + if isinstance(part, ToolCallPart): + events.append({ + "type": "tool_call", + "tool_name": part.tool_name, + "tool_call_id": getattr(part, "tool_call_id", None), + "args": _safe_serialize_args(part.args), + }) + elif isinstance(part, ThinkingPart): + content = part.content or "" + events.append({ + "type": "thinking", + "content": content[:500], + }) + plan_steps = _extract_plan_steps(content) + if plan_steps: + events.append({ + "type": "plan", + "steps": plan_steps, + }) + elif isinstance(msg, ModelRequest): + for part in msg.parts: + if isinstance(part, ToolReturnPart): + events.append({ + "type": "tool_result", + "tool_call_id": getattr(part, "tool_call_id", None), + "tool_name": getattr(part, "tool_name", None), + "content": str(part.content)[:1000] if part.content else "", + }) + except ImportError: + logger.debug("pydantic_ai messages not available for tool extraction") + except Exception: + logger.exception("Error extracting tool events") + return events + + # ------------------------------------------------------------------ + # Streaming helpers (send updates back to the ACP client) + # ------------------------------------------------------------------ + + async def _send_text(self, session_id: str, text: str) -> None: + """Send a text message chunk to the client.""" + if self._conn is None: + logger.warning("No connection — cannot send text") + return + chunk = update_agent_message(text_block(text)) + await self._conn.session_update(session_id=session_id, update=chunk) + + async def _send_thought(self, session_id: str, text: str) -> None: + """Send a thought/reasoning chunk to the client.""" + if self._conn is None: + return + chunk = update_agent_thought_text(text) + await self._conn.session_update(session_id=session_id, update=chunk) + + async def _stream_tool_events(self, session_id: str, events: list[dict]) -> None: + """Stream tool events as thoughts so the client sees agent activity.""" + if self._conn is None or not events: + return + + for event in events: + event_type = event.get("type", "") + + if event_type == "thinking": + content = event.get("content", "") + if content: + await self._send_thought(session_id, content) + + elif event_type == "tool_call": + tool_name = event.get("tool_name", "unknown") + args = event.get("args", {}) + try: + chunk = start_tool_call(tool_name) + await self._conn.session_update( + session_id=session_id, update=chunk + ) + except Exception: + # Fallback: send as thought if start_tool_call fails + await self._send_thought( + session_id, + f"[tool] {tool_name}({args})", + ) + + elif event_type == "tool_result": + tool_name = event.get("tool_name", "unknown") + content = event.get("content", "") + try: + chunk = update_tool_call(tool_content(content[:500])) + await self._conn.session_update( + session_id=session_id, update=chunk + ) + except Exception: + # Fallback: send as thought + await self._send_thought( + session_id, + f"[result] {tool_name}: {content[:200]}", + ) + + elif event_type == "plan": + steps = event.get("steps", []) + if steps: + plan_text = "\n".join( + f" {s['step']}. {s['description']}" for s in steps + ) + await self._send_thought(session_id, f"Plan:\n{plan_text}") + + # ------------------------------------------------------------------ + # Extension method implementations + # ------------------------------------------------------------------ + + async def _ext_list_agents(self) -> dict[str, Any]: + """List all available Code Puppy agents.""" + from code_puppy.plugins.acp_gateway.agent_adapter import discover_agents + + agents = await discover_agents() + return { + "agents": [ + { + "name": a.name, + "display_name": a.display_name, + "description": a.description, + } + for a in agents + ] + } + + def _ext_set_agent(self, params: dict[str, Any]) -> dict[str, Any]: + """Switch the agent for a given session.""" + session_id = params.get("session_id", "") + agent_name = params.get("agent_name", "") + + if not session_id or not agent_name: + return {"error": "session_id and agent_name are required"} + + session = self._sessions.get(session_id) + if session is None: + return {"error": f"Unknown session: {session_id}"} + + session.agent_name = agent_name + logger.info("[%s] agent switched to '%s'", session_id, agent_name) + return {"status": "ok", "agent_name": agent_name} + + async def _ext_agent_info(self, params: dict[str, Any]) -> dict[str, Any]: + """Get metadata for a specific agent.""" + from code_puppy.plugins.acp_gateway.agent_adapter import build_agent_metadata + + agent_name = params.get("agent_name", self._default_agent) + metadata = build_agent_metadata(agent_name) + if metadata is None: + return {"error": f"Agent not found: {agent_name}"} + return metadata + + +# --------------------------------------------------------------------------- +# Convenience entry point +# --------------------------------------------------------------------------- + +async def run_code_puppy_agent(agent_name: str = DEFAULT_AGENT_NAME) -> None: + """Start the Code Puppy ACP agent over stdio. + + This is the primary entry point. The SDK handles the entire + stdio JSON-RPC transport — we just provide the Agent implementation. + """ + logger.info("Starting Code Puppy ACP agent (default_agent=%s)", agent_name) + await run_agent(CodePuppyAgent(default_agent=agent_name)) \ No newline at end of file diff --git a/code_puppy/plugins/acp_gateway/agent_adapter.py b/code_puppy/plugins/acp_gateway/agent_adapter.py new file mode 100644 index 00000000..e9483c98 --- /dev/null +++ b/code_puppy/plugins/acp_gateway/agent_adapter.py @@ -0,0 +1,105 @@ +"""Agent adapter — bridges Code Puppy's agent system to ACP. + +Provides discovery and metadata helpers so the ACP server can +dynamically register every available Code Puppy agent without +hardcoding names or descriptions. +""" + +import asyncio +import logging +from dataclasses import dataclass +from typing import Dict, List, Optional + +logger = logging.getLogger(__name__) + +# Maximum retry attempts when registry is being modified concurrently +_MAX_REGISTRY_RETRIES = 3 + + +@dataclass(frozen=True) +class AgentInfo: + """Immutable snapshot of a Code Puppy agent's public metadata.""" + + name: str + display_name: str + description: str + + +async def discover_agents() -> List[AgentInfo]: + """Discover all available Code Puppy agents. + + Merges data from ``get_available_agents()`` (name → display_name) + and ``get_agent_descriptions()`` (name → description) into a flat + list of :class:`AgentInfo` records. + + Agents that fail to load are logged and skipped — one broken agent + must never take down the whole ACP server. + """ + try: + from code_puppy.agents import get_available_agents, get_agent_descriptions + + # Retry if registry is being modified concurrently (e.g. plugin loading) + for attempt in range(_MAX_REGISTRY_RETRIES): + try: + agents_map: Dict[str, str] = get_available_agents() + descriptions_map: Dict[str, str] = get_agent_descriptions() + break + except RuntimeError: + if attempt == _MAX_REGISTRY_RETRIES - 1: + raise + await asyncio.sleep(0.1) + except Exception: + logger.exception("Failed to query Code Puppy agent registry") + return [] + + agents: List[AgentInfo] = [] + for name, display_name in agents_map.items(): + description = descriptions_map.get(name, "No description available.") + agents.append( + AgentInfo( + name=name, + display_name=display_name, + description=description, + ) + ) + + logger.info("Discovered %d Code Puppy agent(s) for ACP", len(agents)) + return agents + + +def build_agent_metadata(agent_name: str) -> Optional[dict]: + """Return ACP-compatible metadata for a single agent. + + This is useful for enriching ACP responses or building + agent-card–style payloads later. + + Args: + agent_name: The internal Code Puppy agent name. + + Returns: + A dict with ``name``, ``display_name``, ``description``, + and ``version`` keys. + """ + try: + from code_puppy.agents import get_available_agents, get_agent_descriptions + + available = get_available_agents() + if agent_name not in available: + return None + display_name = available.get(agent_name, agent_name) + description = get_agent_descriptions().get( + agent_name, "No description available." + ) + except Exception: + logger.exception( + "Failed to build metadata for agent '%s'", agent_name + ) + display_name = agent_name + description = "No description available." + + return { + "name": agent_name, + "display_name": display_name, + "description": description, + "version": "0.1.0", + } diff --git a/code_puppy/plugins/acp_gateway/config.py b/code_puppy/plugins/acp_gateway/config.py new file mode 100644 index 00000000..62ce8091 --- /dev/null +++ b/code_puppy/plugins/acp_gateway/config.py @@ -0,0 +1,34 @@ +"""ACP Gateway configuration. + +All settings are read from environment variables with sensible defaults. +""" + +import os +from dataclasses import dataclass + + +@dataclass(frozen=True) +class ACPConfig: + """Immutable configuration for the ACP Gateway.""" + + enabled: bool + transport: str # "http" or "stdio" + host: str + port: int + + @classmethod + def from_env(cls) -> "ACPConfig": + """Build config from environment variables. + + Environment variables: + ACP_ENABLED: "true" / "false" (default: "true") + ACP_TRANSPORT: "http" / "stdio" (default: "http") + ACP_HOST: bind host (default: "0.0.0.0") + ACP_PORT: bind port (default: 9001) + """ + return cls( + enabled=os.getenv("ACP_ENABLED", "true").lower() in ("true", "1", "yes"), + transport=os.getenv("ACP_TRANSPORT", "http").lower(), + host=os.getenv("ACP_HOST", "0.0.0.0"), + port=int(os.getenv("ACP_PORT", "9001")), + ) diff --git a/code_puppy/plugins/acp_gateway/register_callbacks.py b/code_puppy/plugins/acp_gateway/register_callbacks.py new file mode 100644 index 00000000..9179fe93 --- /dev/null +++ b/code_puppy/plugins/acp_gateway/register_callbacks.py @@ -0,0 +1,97 @@ +"""ACP Gateway plugin callbacks. + +Starts the ACP agent during Code Puppy startup using the official +``agent-client-protocol`` SDK. The SDK handles all transport concerns +(stdio JSON-RPC) — we just provide the CodePuppyAgent implementation. + +Supports two modes: + - stdio: ACP agent over stdin/stdout (default for subprocess orchestration) + - http: Reserved for future use (not yet implemented with new SDK) +""" + +import asyncio +import logging +import threading + +from code_puppy.callbacks import register_callback + +logger = logging.getLogger(__name__) + +# Background thread reference for clean shutdown +_acp_thread: threading.Thread | None = None +_acp_shutdown_event: threading.Event | None = None + + +async def _start_stdio() -> None: + """Start the ACP stdio agent in a background thread. + + The SDK's ``run_agent()`` blocks on stdin, so we run it in a + daemon thread to avoid blocking Code Puppy's main loop. + """ + global _acp_thread + + def _run() -> None: + from code_puppy.plugins.acp_gateway.agent import run_code_puppy_agent + + asyncio.run(run_code_puppy_agent()) + + _acp_thread = threading.Thread( + target=_run, + name="acp-gateway-stdio", + daemon=True, + ) + _acp_thread.start() + + logger.info("\U0001f436 ACP Gateway (stdio) started \u2014 reading from stdin") + + +async def _on_startup() -> None: + """Start the ACP agent based on configured transport.""" + from code_puppy.plugins.acp_gateway.config import ACPConfig + + config = ACPConfig.from_env() + + if not config.enabled: + logger.info("ACP Gateway is disabled via ACP_ENABLED=false") + return + + try: + import acp # noqa: F401 + except ImportError: + logger.warning( + "agent-client-protocol is not installed \u2014 ACP Gateway disabled. " + "Install it with: pip install agent-client-protocol" + ) + return + + try: + if config.transport == "stdio": + await _start_stdio() + else: + # HTTP transport not yet implemented with the new SDK. + # For now, fall back to stdio with a warning. + logger.warning( + "HTTP transport not yet implemented with agent-client-protocol SDK. " + "Falling back to stdio transport." + ) + await _start_stdio() + except Exception: + logger.exception("Failed to start ACP Gateway (%s)", config.transport) + + +async def _on_shutdown() -> None: + """Stop the ACP background thread.""" + global _acp_thread, _acp_shutdown_event + + if _acp_shutdown_event is not None: + logger.info("Shutting down ACP Gateway...") + _acp_shutdown_event.set() + + if _acp_thread is not None and _acp_thread.is_alive(): + _acp_thread.join(timeout=5.0) + logger.info("ACP Gateway stopped") + + +# ---------- Register with Code Puppy's callback system -------------------- +register_callback("startup", _on_startup) +register_callback("shutdown", _on_shutdown) \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 08f2ed32..21a51f2d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ dependencies = [ "websockets>=12.0", "termflow-md>=0.1.8", "Pillow>=10.0.0", + "agent-client-protocol>=0.8.1", "anthropic==0.79.0" ] dev-dependencies = [ diff --git a/tests/plugins/test_acp_agent.py b/tests/plugins/test_acp_agent.py new file mode 100644 index 00000000..7e9e277c --- /dev/null +++ b/tests/plugins/test_acp_agent.py @@ -0,0 +1,640 @@ +"""Tests for the CodePuppyAgent — ACP SDK-based agent implementation. + +Covers: +- Agent lifecycle (on_connect, initialize, new_session) +- Prompt dispatch and text extraction +- Session management and multi-turn history +- Cancellation +- Extension methods (x/list-agents, x/set-agent, x/agent-info) +- Error handling +- Tool event extraction and streaming + +All Code Puppy internals are mocked — no LLM calls are made. +""" + +import asyncio +import sys +from types import ModuleType +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from code_puppy.plugins.acp_gateway.agent import ( + CodePuppyAgent, + _SessionState, + _extract_text, + _extract_plan_steps, + _safe_serialize_args, +) + + +# --------------------------------------------------------------------------- +# Fixtures & helpers +# --------------------------------------------------------------------------- + + +def _make_mock_client() -> MagicMock: + """Create a mock ACP Client with async methods.""" + client = MagicMock() + client.session_update = AsyncMock() + client.request_permission = AsyncMock() + client.read_text_file = AsyncMock() + client.write_text_file = AsyncMock() + client.create_terminal = AsyncMock() + return client + + +def _make_text_block(text: str) -> dict: + """Create a minimal ACP TextContentBlock dict.""" + return {"type": "text", "text": text} + + +def _make_image_block(url: str = "data:image/png;base64,abc") -> dict: + """Create a minimal ACP ImageContentBlock dict.""" + return {"type": "image", "image": url} + + +def _make_mock_result(output: str = "Hello from Code Puppy!", messages=None): + """Create a mock pydantic-ai RunResult.""" + result = MagicMock() + result.output = output + result.data = output # fallback for older pydantic-ai + result.all_messages = MagicMock(return_value=messages or []) + return result + + +@pytest.fixture +def agent(): + """Fresh CodePuppyAgent instance.""" + return CodePuppyAgent(default_agent="code-puppy") + + +@pytest.fixture +def connected_agent(): + """CodePuppyAgent with a mock client connected.""" + a = CodePuppyAgent(default_agent="code-puppy") + client = _make_mock_client() + a.on_connect(client) + return a, client + + +# --------------------------------------------------------------------------- +# Text extraction +# --------------------------------------------------------------------------- + + +class TestExtractText: + """Test the _extract_text helper.""" + + def test_single_text_block(self): + blocks = [_make_text_block("hello world")] + assert _extract_text(blocks) == "hello world" + + def test_multiple_text_blocks(self): + blocks = [_make_text_block("hello"), _make_text_block("world")] + assert _extract_text(blocks) == "hello\nworld" + + def test_empty_blocks(self): + assert _extract_text([]) == "" + + def test_non_text_blocks_ignored(self): + blocks = [_make_image_block(), _make_text_block("actual text")] + assert _extract_text(blocks) == "actual text" + + def test_dict_blocks(self): + blocks = [{"text": "from dict"}] + assert _extract_text(blocks) == "from dict" + + def test_mixed_empty_and_text(self): + blocks = [{"text": ""}, _make_text_block("real")] + assert _extract_text(blocks) == "real" + + def test_strips_whitespace(self): + blocks = [_make_text_block(" hello ")] + assert _extract_text(blocks) == "hello" + + +# --------------------------------------------------------------------------- +# Helper utilities +# --------------------------------------------------------------------------- + + +class TestSafeSerializeArgs: + """Test _safe_serialize_args.""" + + def test_none_returns_empty(self): + assert _safe_serialize_args(None) == {} + + def test_dict_passthrough(self): + args = {"key": "value", "num": 42} + result = _safe_serialize_args(args) + assert result == {"key": "value", "num": 42} + + def test_long_string_truncated(self): + args = {"big": "x" * 500} + result = _safe_serialize_args(args) + assert len(result["big"]) == 200 + + def test_non_dict_wrapped(self): + result = _safe_serialize_args("raw string") + assert "raw" in result + + +class TestExtractPlanSteps: + """Test _extract_plan_steps.""" + + def test_numbered_list(self): + content = "1. First step\n2. Second step\n3. Third step" + steps = _extract_plan_steps(content) + assert len(steps) == 3 + assert steps[0]["step"] == 1 + assert steps[0]["description"] == "First step" + + def test_bullet_list(self): + content = "- Do this\n- Do that\n- Finish up" + steps = _extract_plan_steps(content) + assert len(steps) == 3 + + def test_single_item_not_a_plan(self): + content = "1. Only one step" + steps = _extract_plan_steps(content) + assert len(steps) == 0 + + def test_empty_content(self): + assert _extract_plan_steps("") == [] + + def test_no_pattern_match(self): + assert _extract_plan_steps("just some random text") == [] + + +# --------------------------------------------------------------------------- +# Agent lifecycle +# --------------------------------------------------------------------------- + + +class TestAgentLifecycle: + """Test on_connect and initialize.""" + + def test_on_connect_stores_client(self, agent): + client = _make_mock_client() + agent.on_connect(client) + assert agent._conn is client + + @pytest.mark.asyncio + async def test_initialize_returns_protocol_version(self, agent): + resp = await agent.initialize(protocol_version=1) + assert resp.protocol_version == 1 + + @pytest.mark.asyncio + async def test_initialize_with_client_info(self, agent): + resp = await agent.initialize( + protocol_version=2, + client_info=MagicMock(name="TestClient"), + ) + assert resp.protocol_version == 2 + + +# --------------------------------------------------------------------------- +# Session management +# --------------------------------------------------------------------------- + + +class TestSessionManagement: + """Test new_session and session state.""" + + @pytest.mark.asyncio + async def test_new_session_returns_id(self, agent): + resp = await agent.new_session(cwd="/tmp") + assert resp.session_id is not None + assert len(resp.session_id) > 0 + + @pytest.mark.asyncio + async def test_new_session_creates_state(self, agent): + resp = await agent.new_session(cwd="/tmp") + sid = resp.session_id + assert sid in agent._sessions + assert agent._sessions[sid].agent_name == "code-puppy" + + @pytest.mark.asyncio + async def test_multiple_sessions_unique(self, agent): + r1 = await agent.new_session(cwd="/a") + r2 = await agent.new_session(cwd="/b") + assert r1.session_id != r2.session_id + + def test_session_state_initial(self): + state = _SessionState("sid-123", agent_name="test-agent") + assert state.session_id == "sid-123" + assert state.agent_name == "test-agent" + assert state.message_history == [] + + +# --------------------------------------------------------------------------- +# Prompt handling +# --------------------------------------------------------------------------- + + +class TestPrompt: + """Test prompt dispatch.""" + + @pytest.mark.asyncio + async def test_empty_prompt_returns_end_turn(self): + agent, client = _make_agent_with_client() + resp = await agent.new_session(cwd="/tmp") + sid = resp.session_id + + result = await agent.prompt(prompt=[], session_id=sid) + assert result.stop_reason == "end_turn" + # Should have sent "No prompt text found" + client.session_update.assert_called() + + @pytest.mark.asyncio + async def test_prompt_auto_creates_session(self): + agent, client = _make_agent_with_client() + # Don't call new_session — prompt should auto-create + mock_result = _make_mock_result("Agent response") + + with _mock_code_puppy_agent(mock_result): + result = await agent.prompt( + prompt=[_make_text_block("hello")], + session_id="auto-session", + ) + + assert result.stop_reason == "end_turn" + assert "auto-session" in agent._sessions + + @pytest.mark.asyncio + async def test_prompt_runs_agent_and_sends_result(self): + agent, client = _make_agent_with_client() + resp = await agent.new_session(cwd="/tmp") + sid = resp.session_id + + mock_result = _make_mock_result("The answer is 42") + + with _mock_code_puppy_agent(mock_result): + result = await agent.prompt( + prompt=[_make_text_block("What is the answer?")], + session_id=sid, + ) + + assert result.stop_reason == "end_turn" + # Should have called session_update at least once + assert client.session_update.call_count >= 1 + + @pytest.mark.asyncio + async def test_prompt_error_returns_error_stop_reason(self): + agent, client = _make_agent_with_client() + resp = await agent.new_session(cwd="/tmp") + sid = resp.session_id + + with _mock_code_puppy_agent_error(RuntimeError("boom")): + result = await agent.prompt( + prompt=[_make_text_block("cause error")], + session_id=sid, + ) + + assert result.stop_reason == "end_turn" + # Should have sent error message + client.session_update.assert_called() + + @pytest.mark.asyncio + async def test_prompt_preserves_history(self): + agent, client = _make_agent_with_client() + resp = await agent.new_session(cwd="/tmp") + sid = resp.session_id + + mock_messages = [MagicMock(), MagicMock()] + mock_result = _make_mock_result("first response", messages=mock_messages) + + with _mock_code_puppy_agent(mock_result): + await agent.prompt( + prompt=[_make_text_block("first")], + session_id=sid, + ) + + # History should be updated + assert agent._sessions[sid].message_history == mock_messages + + +# --------------------------------------------------------------------------- +# Cancellation +# --------------------------------------------------------------------------- + + +class TestCancellation: + """Test cancel method.""" + + @pytest.mark.asyncio + async def test_cancel_nonexistent_session(self, agent): + # Should not raise + await agent.cancel(session_id="nonexistent") + + @pytest.mark.asyncio + async def test_cancel_with_running_task(self, agent): + # Simulate a running task + mock_task = MagicMock() + mock_task.done.return_value = False + agent._running_tasks["session-1"] = mock_task + + await agent.cancel(session_id="session-1") + mock_task.cancel.assert_called_once() + + @pytest.mark.asyncio + async def test_cancel_already_done_task(self, agent): + mock_task = MagicMock() + mock_task.done.return_value = True + agent._running_tasks["session-2"] = mock_task + + await agent.cancel(session_id="session-2") + mock_task.cancel.assert_not_called() + + +# --------------------------------------------------------------------------- +# Extension methods +# --------------------------------------------------------------------------- + + +class TestExtensionMethods: + """Test ext_method dispatch.""" + + @pytest.mark.asyncio + async def test_list_agents(self): + agent, _ = _make_agent_with_client() + + mock_agents = [ + MagicMock(name="code-puppy", display_name="Code Puppy", description="Main agent"), + ] + # Fix .name attribute (MagicMock overrides name) + mock_agents[0].name = "code-puppy" + mock_agents[0].display_name = "Code Puppy" + mock_agents[0].description = "Main agent" + + with patch( + "code_puppy.plugins.acp_gateway.agent_adapter.discover_agents", + new_callable=AsyncMock, + return_value=mock_agents, + ): + result = await agent.ext_method("x/list-agents", {}) + + assert "agents" in result + assert len(result["agents"]) == 1 + assert result["agents"][0]["name"] == "code-puppy" + + @pytest.mark.asyncio + async def test_set_agent(self): + agent, _ = _make_agent_with_client() + resp = await agent.new_session(cwd="/tmp") + sid = resp.session_id + + result = await agent.ext_method("x/set-agent", { + "session_id": sid, + "agent_name": "qa-kitten", + }) + + assert result["status"] == "ok" + assert agent._sessions[sid].agent_name == "qa-kitten" + + @pytest.mark.asyncio + async def test_set_agent_missing_params(self): + agent, _ = _make_agent_with_client() + result = await agent.ext_method("x/set-agent", {}) + assert "error" in result + + @pytest.mark.asyncio + async def test_set_agent_unknown_session(self): + agent, _ = _make_agent_with_client() + result = await agent.ext_method("x/set-agent", { + "session_id": "nonexistent", + "agent_name": "qa-kitten", + }) + assert "error" in result + + @pytest.mark.asyncio + async def test_agent_info(self): + agent, _ = _make_agent_with_client() + + with patch( + "code_puppy.plugins.acp_gateway.agent_adapter.build_agent_metadata", + return_value={ + "name": "code-puppy", + "display_name": "Code Puppy", + "description": "Main agent", + "version": "0.1.0", + }, + ): + result = await agent.ext_method("x/agent-info", { + "agent_name": "code-puppy", + }) + + assert result["name"] == "code-puppy" + + @pytest.mark.asyncio + async def test_agent_info_not_found(self): + agent, _ = _make_agent_with_client() + + with patch( + "code_puppy.plugins.acp_gateway.agent_adapter.build_agent_metadata", + return_value=None, + ): + result = await agent.ext_method("x/agent-info", { + "agent_name": "nonexistent", + }) + + assert "error" in result + + @pytest.mark.asyncio + async def test_unknown_extension(self): + agent, _ = _make_agent_with_client() + result = await agent.ext_method("x/unknown", {}) + assert "error" in result + + @pytest.mark.asyncio + async def test_ext_notification_does_not_raise(self): + agent, _ = _make_agent_with_client() + # Should not raise + await agent.ext_notification("x/some-event", {"key": "value"}) + + +# --------------------------------------------------------------------------- +# Streaming helpers +# --------------------------------------------------------------------------- + + +class TestStreamingHelpers: + """Test _send_text, _send_thought, and _stream_tool_events.""" + + @pytest.mark.asyncio + async def test_send_text_calls_session_update(self): + agent, client = _make_agent_with_client() + await agent._send_text("sid-1", "Hello") + client.session_update.assert_called_once() + + @pytest.mark.asyncio + async def test_send_text_without_connection(self): + agent = CodePuppyAgent() + # Should not raise + await agent._send_text("sid", "text") + + @pytest.mark.asyncio + async def test_send_thought_calls_session_update(self): + agent, client = _make_agent_with_client() + await agent._send_thought("sid-1", "Thinking...") + client.session_update.assert_called_once() + + @pytest.mark.asyncio + async def test_stream_empty_events(self): + agent, client = _make_agent_with_client() + await agent._stream_tool_events("sid", []) + client.session_update.assert_not_called() + + @pytest.mark.asyncio + async def test_stream_thinking_event(self): + agent, client = _make_agent_with_client() + events = [{"type": "thinking", "content": "I am thinking..."}] + await agent._stream_tool_events("sid", events) + assert client.session_update.call_count >= 1 + + @pytest.mark.asyncio + async def test_stream_tool_call_event(self): + agent, client = _make_agent_with_client() + events = [{"type": "tool_call", "tool_name": "read_file", "args": {"path": "foo.py"}}] + await agent._stream_tool_events("sid", events) + assert client.session_update.call_count >= 1 + + @pytest.mark.asyncio + async def test_stream_plan_event(self): + agent, client = _make_agent_with_client() + events = [{"type": "plan", "steps": [ + {"step": 1, "description": "First"}, + {"step": 2, "description": "Second"}, + ]}] + await agent._stream_tool_events("sid", events) + assert client.session_update.call_count >= 1 + + @pytest.mark.asyncio + async def test_stream_without_connection(self): + agent = CodePuppyAgent() + # Should not raise even without connection + await agent._stream_tool_events("sid", [ + {"type": "thinking", "content": "test"}, + ]) + + +# --------------------------------------------------------------------------- +# Result extraction +# --------------------------------------------------------------------------- + + +class TestResultExtraction: + """Test _extract_result_text and _extract_tool_events.""" + + def test_extract_text_from_output(self): + result = MagicMock() + result.output = "response text" + assert CodePuppyAgent._extract_result_text(result) == "response text" + + def test_extract_text_from_data_fallback(self): + result = MagicMock(spec=[]) + result.data = "fallback text" + assert CodePuppyAgent._extract_result_text(result) == "fallback text" + + def test_extract_text_none(self): + assert CodePuppyAgent._extract_result_text(None) == "" + + def test_extract_text_str_fallback(self): + result = MagicMock(spec=[]) + # No .output, no .data — should str() + text = CodePuppyAgent._extract_result_text(result) + assert isinstance(text, str) + + def test_extract_tool_events_no_messages(self): + result = MagicMock() + result.all_messages.return_value = [] + events = CodePuppyAgent._extract_tool_events(result) + assert events == [] + + def test_extract_tool_events_no_method(self): + result = MagicMock(spec=[]) # no all_messages + events = CodePuppyAgent._extract_tool_events(result) + assert events == [] + + +# --------------------------------------------------------------------------- +# Private helpers for test setup +# --------------------------------------------------------------------------- + + +def _make_agent_with_client(): + """Create agent + connected mock client pair.""" + agent = CodePuppyAgent(default_agent="code-puppy") + client = _make_mock_client() + agent.on_connect(client) + return agent, client + + +class _MockPydanticAgent: + """Fake pydantic-ai Agent that returns a canned result.""" + + def __init__(self, result): + self._result = result + + async def run(self, prompt, message_history=None): + return self._result + + +class _MockBaseAgent: + """Fake Code Puppy BaseAgent for testing.""" + + def __init__(self, pydantic_agent): + self._code_generation_agent = pydantic_agent + self._message_history = [] + + @property + def code_generation_agent(self): + return self._code_generation_agent + + def reload_code_generation_agent(self): + return self._code_generation_agent + + def set_message_history(self, history): + self._message_history = list(history) + + def get_message_history(self): + return self._message_history + + def get_full_system_prompt(self): + return "You are Code Puppy." + + def load_puppy_rules(self): + return None + + def get_model_name(self): + return "test-model" + + +def _mock_code_puppy_agent(mock_result): + """Context manager that patches ``_run_agent`` on ``CodePuppyAgent``. + + Since the real ``_run_agent`` lazy-imports ``code_puppy.agents`` and + ``code_puppy.model_utils`` (which may not be resolvable in every test + environment), we sidestep the problem entirely by replacing the method + with a coroutine that returns the canned result and updates session + history, matching the real method's contract. + """ + tool_events = CodePuppyAgent._extract_tool_events(mock_result) + result_text = CodePuppyAgent._extract_result_text(mock_result) + messages = list(mock_result.all_messages()) if hasattr(mock_result, "all_messages") else [] + + async def _fake_run_agent(self, session, prompt_text): + session.message_history = messages + return result_text, tool_events + + return patch.object(CodePuppyAgent, "_run_agent", _fake_run_agent) + + +def _mock_code_puppy_agent_error(error): + """Context manager that patches ``_run_agent`` to raise *error*.""" + + async def _failing_run_agent(self, session, prompt_text): + raise error + + return patch.object(CodePuppyAgent, "_run_agent", _failing_run_agent) \ No newline at end of file diff --git a/tests/plugins/test_acp_agent_adapter.py b/tests/plugins/test_acp_agent_adapter.py new file mode 100644 index 00000000..a09ba297 --- /dev/null +++ b/tests/plugins/test_acp_agent_adapter.py @@ -0,0 +1,157 @@ +"""Tests for agent adapter — discovery and metadata helpers.""" + +import sys +from types import ModuleType +from unittest.mock import MagicMock, patch + +import pytest + +from code_puppy.plugins.acp_gateway.agent_adapter import ( + AgentInfo, + build_agent_metadata, + discover_agents, +) + + +# --------------------------------------------------------------------------- +# Helper: inject a fake ``code_puppy.agents`` module so that inline +# imports inside ``discover_agents`` / ``build_agent_metadata`` resolve +# without dragging in the real (heavy) agent machinery. +# --------------------------------------------------------------------------- + +def _make_agents_module( + agents_map: dict | None = None, + descriptions_map: dict | None = None, +) -> ModuleType: + mod = ModuleType("code_puppy.agents") + mod.get_available_agents = MagicMock(return_value=agents_map or {}) # type: ignore[attr-defined] + mod.get_agent_descriptions = MagicMock(return_value=descriptions_map or {}) # type: ignore[attr-defined] + return mod + + +@pytest.fixture() +def _fake_agents(): + """Ensure ``code_puppy.agents`` exists in sys.modules for patching.""" + mod = _make_agents_module() + with patch.dict(sys.modules, {"code_puppy.agents": mod}): + yield mod + + +# --------------------------------------------------------------------------- +# discover_agents (async) +# --------------------------------------------------------------------------- + +class TestDiscoverAgents: + """Test dynamic agent discovery.""" + + @pytest.mark.asyncio + async def test_returns_agent_info_list(self, _fake_agents): + _fake_agents.get_available_agents.return_value = { + "code-puppy": "Code Puppy", + "wibey": "Wibey", + } + _fake_agents.get_agent_descriptions.return_value = { + "code-puppy": "The main agent", + "wibey": "A wise agent", + } + + agents = await discover_agents() + + assert len(agents) == 2 + names = {a.name for a in agents} + assert names == {"code-puppy", "wibey"} + + cp = next(a for a in agents if a.name == "code-puppy") + assert cp.display_name == "Code Puppy" + assert cp.description == "The main agent" + + @pytest.mark.asyncio + async def test_missing_description_uses_fallback(self, _fake_agents): + _fake_agents.get_available_agents.return_value = { + "code-puppy": "Code Puppy", + } + _fake_agents.get_agent_descriptions.return_value = {} # empty + + agents = await discover_agents() + + assert len(agents) == 1 + assert agents[0].description == "No description available." + + @pytest.mark.asyncio + async def test_empty_agent_registry(self, _fake_agents): + agents = await discover_agents() + assert agents == [] + + @pytest.mark.asyncio + async def test_import_error_returns_empty(self): + """When code_puppy.agents can't be imported at all.""" + # Remove module from sys.modules so the import fails. + with patch.dict( + sys.modules, + {"code_puppy.agents": None}, # type: ignore[dict-item] + ): + agents = await discover_agents() + assert agents == [] + + @pytest.mark.asyncio + async def test_runtime_error_returns_empty(self, _fake_agents): + _fake_agents.get_available_agents.side_effect = RuntimeError("boom") + agents = await discover_agents() + assert agents == [] + + +# --------------------------------------------------------------------------- +# build_agent_metadata +# --------------------------------------------------------------------------- + +class TestBuildAgentMetadata: + """Test single-agent metadata builder.""" + + def test_returns_correct_dict(self, _fake_agents): + _fake_agents.get_available_agents.return_value = { + "code-puppy": "Code Puppy", + } + _fake_agents.get_agent_descriptions.return_value = { + "code-puppy": "The main agent", + } + + meta = build_agent_metadata("code-puppy") + + assert meta == { + "name": "code-puppy", + "display_name": "Code Puppy", + "description": "The main agent", + "version": "0.1.0", + } + + def test_unknown_agent_returns_none(self, _fake_agents): + """Unknown agents are not in the registry — build_agent_metadata returns None.""" + meta = build_agent_metadata("unknown-agent") + assert meta is None + + def test_error_returns_graceful_fallback(self, _fake_agents): + _fake_agents.get_available_agents.side_effect = RuntimeError("boom") + + meta = build_agent_metadata("code-puppy") + + assert meta["name"] == "code-puppy" + assert meta["display_name"] == "code-puppy" + assert meta["version"] == "0.1.0" + + +# --------------------------------------------------------------------------- +# AgentInfo +# --------------------------------------------------------------------------- + +class TestAgentInfoImmutability: + """Test AgentInfo frozen dataclass.""" + + def test_frozen(self): + info = AgentInfo(name="a", display_name="A", description="desc") + with pytest.raises(AttributeError): + info.name = "b" # type: ignore[misc] + + def test_equality(self): + a = AgentInfo(name="x", display_name="X", description="d") + b = AgentInfo(name="x", display_name="X", description="d") + assert a == b \ No newline at end of file diff --git a/tests/plugins/test_acp_config.py b/tests/plugins/test_acp_config.py new file mode 100644 index 00000000..e032972b --- /dev/null +++ b/tests/plugins/test_acp_config.py @@ -0,0 +1,105 @@ +"""Tests for ACP Gateway configuration.""" + +import os +from unittest.mock import patch + +from code_puppy.plugins.acp_gateway.config import ACPConfig + + +class TestACPConfigDefaults: + """Test default configuration values.""" + + @patch.dict(os.environ, {}, clear=True) + def test_default_enabled(self): + cfg = ACPConfig.from_env() + assert cfg.enabled is True + + @patch.dict(os.environ, {}, clear=True) + def test_default_host(self): + cfg = ACPConfig.from_env() + assert cfg.host == "0.0.0.0" + + @patch.dict(os.environ, {}, clear=True) + def test_default_port(self): + cfg = ACPConfig.from_env() + assert cfg.port == 9001 + + +class TestACPConfigEnvironmentOverrides: + """Test environment variable overrides.""" + + @patch.dict(os.environ, {"ACP_ENABLED": "false"}) + def test_enabled_false(self): + cfg = ACPConfig.from_env() + assert cfg.enabled is False + + @patch.dict(os.environ, {"ACP_ENABLED": "0"}) + def test_enabled_zero(self): + cfg = ACPConfig.from_env() + assert cfg.enabled is False + + @patch.dict(os.environ, {"ACP_ENABLED": "no"}) + def test_enabled_no(self): + cfg = ACPConfig.from_env() + assert cfg.enabled is False + + @patch.dict(os.environ, {"ACP_ENABLED": "true"}) + def test_enabled_true_explicit(self): + cfg = ACPConfig.from_env() + assert cfg.enabled is True + + @patch.dict(os.environ, {"ACP_ENABLED": "1"}) + def test_enabled_one(self): + cfg = ACPConfig.from_env() + assert cfg.enabled is True + + @patch.dict(os.environ, {"ACP_ENABLED": "yes"}) + def test_enabled_yes(self): + cfg = ACPConfig.from_env() + assert cfg.enabled is True + + @patch.dict(os.environ, {"ACP_ENABLED": "TRUE"}) + def test_enabled_case_insensitive(self): + cfg = ACPConfig.from_env() + assert cfg.enabled is True + + @patch.dict(os.environ, {"ACP_HOST": "127.0.0.1"}) + def test_custom_host(self): + cfg = ACPConfig.from_env() + assert cfg.host == "127.0.0.1" + + @patch.dict(os.environ, {"ACP_PORT": "8080"}) + def test_custom_port(self): + cfg = ACPConfig.from_env() + assert cfg.port == 8080 + + @patch.dict( + os.environ, + {"ACP_ENABLED": "true", "ACP_HOST": "localhost", "ACP_PORT": "3000"}, + ) + def test_all_overrides(self): + cfg = ACPConfig.from_env() + assert cfg.enabled is True + assert cfg.host == "localhost" + assert cfg.port == 3000 + + +class TestACPConfigImmutability: + """Test that config is frozen.""" + + def test_frozen(self): + cfg = ACPConfig(enabled=True, transport="http", host="0.0.0.0", port=9001) + import pytest + + with pytest.raises(AttributeError): + cfg.enabled = False # type: ignore[misc] + + def test_equality(self): + a = ACPConfig(enabled=True, transport="http", host="0.0.0.0", port=9001) + b = ACPConfig(enabled=True, transport="http", host="0.0.0.0", port=9001) + assert a == b + + def test_inequality(self): + a = ACPConfig(enabled=True, transport="http", host="0.0.0.0", port=9001) + b = ACPConfig(enabled=False, transport="http", host="0.0.0.0", port=9001) + assert a != b