From fafddf0dda4c46ace0c0d9ed9facc042aed14c5f Mon Sep 17 00:00:00 2001 From: ianleely Date: Fri, 6 Feb 2026 02:47:58 +0000 Subject: [PATCH 1/4] feat: migrate 5 features from reference project - Configurable allowed/auto-allowed tools via ALLOWED_TOOLS and AUTO_ALLOW_TOOLS env vars - MCP server support: load from ~/.claude/mcp.json, manage via API (CRUD endpoints) - Session auto-reconnect via get_or_ensure_session() when model/MCP config changes - S3 backup trigger after send_message_stream completes - Environment variables management API (CRUD for ~/.claude/settings.json env section) Co-Authored-By: Claude Opus 4.6 --- backend/api/__init__.py | 4 + backend/api/env_vars.py | 159 +++++++++++++++++++++++++++ backend/api/invocations.py | 108 ++++++++++++++++++- backend/api/mcp_servers.py | 186 ++++++++++++++++++++++++++++++++ backend/api/messages.py | 11 +- backend/api/sessions.py | 1 + backend/core/session.py | 144 +++++++++++++++++++++---- backend/core/session_manager.py | 50 +++++++++ backend/models/schemas.py | 102 ++++++++++++++++++ backend/server.py | 35 ++++++ 10 files changed, 776 insertions(+), 24 deletions(-) create mode 100644 backend/api/env_vars.py create mode 100644 backend/api/mcp_servers.py diff --git a/backend/api/__init__.py b/backend/api/__init__.py index 8383313..db55c68 100644 --- a/backend/api/__init__.py +++ b/backend/api/__init__.py @@ -1,6 +1,8 @@ """API endpoint routers.""" +from .env_vars import router as env_vars_router from .invocations import router as invocations_router +from .mcp_servers import router as mcp_servers_router from .messages import router as messages_router from .permissions import router as permissions_router from .sessions import router as sessions_router @@ -10,4 +12,6 @@ "messages_router", "permissions_router", "invocations_router", + "env_vars_router", + "mcp_servers_router", ] diff --git a/backend/api/env_vars.py b/backend/api/env_vars.py new file mode 100644 index 0000000..006b7ae --- /dev/null +++ b/backend/api/env_vars.py @@ -0,0 +1,159 @@ +""" +Environment variables management endpoints. + +Provides API endpoints for reading and managing environment variables +stored in ~/.claude/settings.json under the "env" key. +""" + +import json +import logging +from pathlib import Path + +from fastapi import APIRouter, HTTPException + +from ..models.schemas import ( + DeleteEnvVarResponse, + GetEnvVarsResponse, + SetAllEnvVarsRequest, + SetAllEnvVarsResponse, + SetEnvVarRequest, + SetEnvVarResponse, +) + +logger = logging.getLogger(__name__) + +router = APIRouter() + +CLAUDE_SETTINGS_PATH = str(Path.home() / ".claude" / "settings.json") + + +def _get_settings_path() -> Path: + """Get the path to the Claude settings file.""" + return Path(CLAUDE_SETTINGS_PATH) + + +def _read_settings() -> dict: + """Read the Claude settings file.""" + settings_path = _get_settings_path() + + if not settings_path.exists(): + return {} + + try: + with open(settings_path, "r") as f: + return json.load(f) + except json.JSONDecodeError as e: + logger.error(f"Invalid JSON in settings file: {e}") + raise HTTPException( + status_code=500, + detail=f"Invalid JSON in settings file: {e}", + ) + except Exception as e: + logger.error(f"Error reading settings file: {e}") + raise HTTPException( + status_code=500, + detail=f"Failed to read settings file: {e}", + ) + + +def _write_settings(settings: dict) -> None: + """Write the Claude settings file.""" + settings_path = _get_settings_path() + + try: + settings_path.parent.mkdir(parents=True, exist_ok=True) + + with open(settings_path, "w") as f: + json.dump(settings, f, indent=2) + except Exception as e: + logger.error(f"Error writing settings file: {e}") + raise HTTPException( + status_code=500, + detail=f"Failed to write settings file: {e}", + ) + + +@router.get("/env-vars", response_model=GetEnvVarsResponse) +async def get_env_vars(): + """Get all environment variables from ~/.claude/settings.json.""" + settings_path = _get_settings_path() + + if not settings_path.exists(): + return GetEnvVarsResponse( + env_vars={}, + settings_path=CLAUDE_SETTINGS_PATH, + exists=False, + ) + + settings = _read_settings() + env_vars = settings.get("env", {}) + + return GetEnvVarsResponse( + env_vars=env_vars, + settings_path=CLAUDE_SETTINGS_PATH, + exists=True, + ) + + +@router.post("/env-vars", response_model=SetEnvVarResponse) +async def set_env_var(request: SetEnvVarRequest): + """Set a single environment variable in ~/.claude/settings.json.""" + if not request.key or not request.key.strip(): + raise HTTPException( + status_code=400, + detail="Environment variable key cannot be empty", + ) + + settings = _read_settings() + + if "env" not in settings: + settings["env"] = {} + + settings["env"][request.key] = request.value + _write_settings(settings) + + return SetEnvVarResponse( + status="success", + message=f"Environment variable '{request.key}' set successfully", + key=request.key, + ) + + +@router.delete("/env-vars/{key}", response_model=DeleteEnvVarResponse) +async def delete_env_var(key: str): + """Delete an environment variable from ~/.claude/settings.json.""" + settings_path = _get_settings_path() + + if not settings_path.exists(): + raise HTTPException(status_code=404, detail="Settings file not found") + + settings = _read_settings() + + if "env" not in settings or key not in settings["env"]: + raise HTTPException( + status_code=404, + detail=f"Environment variable '{key}' not found", + ) + + del settings["env"][key] + _write_settings(settings) + + return DeleteEnvVarResponse( + status="success", + message=f"Environment variable '{key}' deleted successfully", + key=key, + ) + + +@router.put("/env-vars", response_model=SetAllEnvVarsResponse) +async def set_all_env_vars(request: SetAllEnvVarsRequest): + """Replace all environment variables in ~/.claude/settings.json.""" + settings = _read_settings() + settings["env"] = request.env_vars + _write_settings(settings) + + return SetAllEnvVarsResponse( + status="success", + message=f"Successfully set {len(request.env_vars)} environment variables", + count=len(request.env_vars), + ) diff --git a/backend/api/invocations.py b/backend/api/invocations.py index 29e996e..a9635ea 100644 --- a/backend/api/invocations.py +++ b/backend/api/invocations.py @@ -16,24 +16,56 @@ """ import json +import logging import re from typing import Any, Dict, Optional +import jwt from fastapi import APIRouter, HTTPException, Request from fastapi.responses import StreamingResponse from pydantic import BaseModel from ..core import SessionManager +from ..core.claude_sync_manager import get_claude_sync_manager from ..models import ( CreateSessionRequest, SendMessageRequest, SetPermissionModeRequest, ) +logger = logging.getLogger(__name__) + router = APIRouter() +def extract_user_id_from_request(http_request: Request) -> Optional[str]: + """ + Extract user_id from the JWT Authorization header. + + Decodes the JWT without signature verification since AgentCore + already validates the token via the JWT authorizer. + + Args: + http_request: The incoming FastAPI Request + + Returns: + User ID (sub claim) if found, None otherwise + """ + auth_header = http_request.headers.get("authorization", "") + if not auth_header.startswith("Bearer "): + return None + + token = auth_header[len("Bearer "):] + try: + # Decode without verification — AgentCore already validated the JWT + payload = jwt.decode(token, options={"verify_signature": False}) + return payload.get("sub") + except Exception as e: + logger.debug(f"Failed to decode JWT: {e}") + return None + + class InvocationRequest(BaseModel): """Request format for the unified /invocations endpoint.""" path: str @@ -91,7 +123,7 @@ def extract_session_id(path: str, path_params: Optional[Dict[str, str]] = None) @router.post("/invocations") -async def handle_invocation(request: InvocationRequest): +async def handle_invocation(request: InvocationRequest, http_request: Request): """ Unified invocations endpoint for AgentCore. @@ -109,9 +141,17 @@ async def handle_invocation(request: InvocationRequest): - POST /sessions/{session_id}/permission_mode -> Set permission mode - POST /sessions/{session_id}/permissions/respond -> Respond to permission - DELETE /sessions/{session_id} -> Close session + - GET /env-vars -> List environment variables + - POST /env-vars -> Set environment variable + - PUT /env-vars -> Replace all environment variables + - DELETE /env-vars/{key} -> Delete environment variable + - GET /mcp-servers -> List MCP servers + - POST /mcp-servers -> Add MCP server + - DELETE /mcp-servers/{server_name} -> Delete MCP server Args: request: Invocation request with path, method, and payload + http_request: Raw FastAPI request for header access Returns: Response from the appropriate handler @@ -124,6 +164,16 @@ async def handle_invocation(request: InvocationRequest): manager = get_session_manager() + # Trigger initial .claude directory sync from S3 if enabled + sync_manager = get_claude_sync_manager() + if sync_manager: + user_id = extract_user_id_from_request(http_request) + if user_id: + try: + await sync_manager.ensure_initial_sync(user_id) + except Exception as e: + logger.warning(f"Initial .claude sync failed for user {user_id}: {e}") + print(f"[Invocations] Routing: {method} {path}") print(f"[Invocations] Payload: {json.dumps(payload)[:200]}...") @@ -142,6 +192,7 @@ async def handle_invocation(request: InvocationRequest): resume_session_id=session_request.resume_session_id, model=session_request.model, cwd=session_request.cwd, + mcp_server_ids=session_request.mcp_server_ids, ) return { "session_id": session_id, @@ -199,9 +250,18 @@ async def handle_invocation(request: InvocationRequest): raise HTTPException(status_code=400, detail="Session ID required") print(f"[Invocations] Streaming message to session {session_id}") - session = await manager.get_session(session_id) message_request = SendMessageRequest(**payload) + # Use get_or_ensure_session to handle model/MCP config changes + if message_request.model or message_request.mcp_server_ids is not None: + session = await manager.get_or_ensure_session( + session_id, + model=message_request.model, + mcp_server_ids=message_request.mcp_server_ids, + ) + else: + session = await manager.get_session(session_id) + async def event_generator(): """Generate SSE events from the agent response.""" event_count = 0 @@ -282,6 +342,50 @@ async def event_generator(): print(f"Failed to close session {session_info.session_id}: {e}") return {"status": "success", "closed_count": closed_count} + # ======================================== + # Environment Variables Routes + # ======================================== + + if path == "/env-vars" and method == "GET": + from .env_vars import get_env_vars + return await get_env_vars() + + if path == "/env-vars" and method == "POST": + from .env_vars import set_env_var + from ..models.schemas import SetEnvVarRequest + env_request = SetEnvVarRequest(**payload) + return await set_env_var(env_request) + + if re.match(r"^/env-vars/[^/]+$", path) and method == "DELETE": + from .env_vars import delete_env_var + key = path.split("/env-vars/", 1)[1] + return await delete_env_var(key) + + if path == "/env-vars" and method == "PUT": + from .env_vars import set_all_env_vars + from ..models.schemas import SetAllEnvVarsRequest + env_request = SetAllEnvVarsRequest(**payload) + return await set_all_env_vars(env_request) + + # ======================================== + # MCP Servers Routes + # ======================================== + + if path == "/mcp-servers" and method == "GET": + from .mcp_servers import list_mcp_servers + return await list_mcp_servers() + + if path == "/mcp-servers" and method == "POST": + from .mcp_servers import add_mcp_server + from ..models.schemas import AddMCPServerRequest + mcp_request = AddMCPServerRequest(**payload) + return await add_mcp_server(mcp_request) + + if re.match(r"^/mcp-servers/[^/]+$", path) and method == "DELETE": + from .mcp_servers import delete_mcp_server + server_name = path.split("/mcp-servers/", 1)[1] + return await delete_mcp_server(server_name) + # ======================================== # Route Not Found # ======================================== diff --git a/backend/api/mcp_servers.py b/backend/api/mcp_servers.py new file mode 100644 index 0000000..cf1448e --- /dev/null +++ b/backend/api/mcp_servers.py @@ -0,0 +1,186 @@ +""" +MCP servers management endpoints. + +Provides API endpoints for reading and managing MCP server configurations +stored in ~/.claude/mcp.json. +""" + +import json +import logging +from pathlib import Path + +from fastapi import APIRouter, HTTPException + +from ..models.schemas import ( + AddMCPServerRequest, + AddMCPServerResponse, + DeleteMCPServerResponse, + ListMCPServersResponse, + MCPServer, +) + +logger = logging.getLogger(__name__) + +router = APIRouter() + +MCP_CONFIG_PATH = str(Path.home() / ".claude" / "mcp.json") + + +def _get_config_path() -> Path: + """Get the path to the MCP config file.""" + return Path(MCP_CONFIG_PATH) + + +def _read_mcp_config() -> dict: + """Read the MCP config file.""" + config_path = _get_config_path() + + if not config_path.exists(): + return {} + + try: + with open(config_path, "r") as f: + return json.load(f) + except json.JSONDecodeError as e: + logger.error(f"Invalid JSON in MCP config file: {e}") + raise HTTPException( + status_code=500, + detail=f"Invalid JSON in MCP config file: {e}", + ) + except Exception as e: + logger.error(f"Error reading MCP config: {e}") + raise HTTPException( + status_code=500, + detail=f"Failed to read MCP config: {e}", + ) + + +def _write_mcp_config(config_data: dict) -> None: + """Write the MCP config file.""" + config_path = _get_config_path() + + try: + config_path.parent.mkdir(parents=True, exist_ok=True) + + with open(config_path, "w") as f: + json.dump(config_data, f, indent=2) + except Exception as e: + logger.error(f"Error writing MCP config: {e}") + raise HTTPException( + status_code=500, + detail=f"Failed to write MCP config: {e}", + ) + + +@router.get("/mcp-servers", response_model=ListMCPServersResponse) +async def list_mcp_servers(): + """List all MCP servers from ~/.claude/mcp.json.""" + config_path = _get_config_path() + + if not config_path.exists(): + return ListMCPServersResponse( + servers={}, + mcp_config_path=MCP_CONFIG_PATH, + exists=False, + ) + + config_data = _read_mcp_config() + mcp_servers_raw = config_data.get("mcpServers", {}) + + servers = {} + for name, config in mcp_servers_raw.items(): + try: + servers[name] = MCPServer( + type=config.get("type", "stdio"), + command=config.get("command"), + args=config.get("args"), + env=config.get("env"), + url=config.get("url"), + ) + except Exception as e: + logger.error(f"Failed to parse MCP server '{name}': {e}") + + return ListMCPServersResponse( + servers=servers, + mcp_config_path=MCP_CONFIG_PATH, + exists=True, + ) + + +@router.post("/mcp-servers", response_model=AddMCPServerResponse) +async def add_mcp_server(request: AddMCPServerRequest): + """Add a new MCP server to ~/.claude/mcp.json.""" + # Validate required fields based on type + if request.type == "stdio": + if not request.command: + raise HTTPException( + status_code=400, + detail="'command' is required for stdio type", + ) + elif request.type in ["sse", "http"]: + if not request.url: + raise HTTPException( + status_code=400, + detail=f"'url' is required for {request.type} type", + ) + else: + raise HTTPException( + status_code=400, + detail=f"Invalid type '{request.type}'. Must be 'stdio', 'sse', or 'http'", + ) + + config_data = _read_mcp_config() + + if "mcpServers" not in config_data: + config_data["mcpServers"] = {} + + if request.name in config_data["mcpServers"]: + raise HTTPException( + status_code=400, + detail=f"MCP server '{request.name}' already exists", + ) + + # Build server config based on type + server_config = {"type": request.type} + + if request.type == "stdio": + server_config["command"] = request.command + server_config["args"] = request.args if request.args else [] + server_config["env"] = request.env if request.env else {} + elif request.type in ["sse", "http"]: + server_config["url"] = request.url + + config_data["mcpServers"][request.name] = server_config + _write_mcp_config(config_data) + + return AddMCPServerResponse( + status="success", + message=f"MCP server '{request.name}' added successfully", + server_name=request.name, + ) + + +@router.delete("/mcp-servers/{server_name}", response_model=DeleteMCPServerResponse) +async def delete_mcp_server(server_name: str): + """Delete an MCP server from ~/.claude/mcp.json.""" + config_path = _get_config_path() + + if not config_path.exists(): + raise HTTPException(status_code=404, detail="MCP config file not found") + + config_data = _read_mcp_config() + + if "mcpServers" not in config_data or server_name not in config_data["mcpServers"]: + raise HTTPException( + status_code=404, + detail=f"MCP server '{server_name}' not found", + ) + + del config_data["mcpServers"][server_name] + _write_mcp_config(config_data) + + return DeleteMCPServerResponse( + status="success", + message=f"MCP server '{server_name}' deleted successfully", + server_name=server_name, + ) diff --git a/backend/api/messages.py b/backend/api/messages.py index 03cb080..9ede2dd 100644 --- a/backend/api/messages.py +++ b/backend/api/messages.py @@ -110,7 +110,16 @@ async def send_message_stream(session_id: str, request: SendMessageRequest): print(f"[API] message: {request.message[:100] if isinstance(request.message, str) else request.message}") manager = get_session_manager() - session = await manager.get_session(session_id) + + # Use get_or_ensure_session to handle model/MCP config changes + if request.model or request.mcp_server_ids is not None: + session = await manager.get_or_ensure_session( + session_id, + model=request.model, + mcp_server_ids=request.mcp_server_ids, + ) + else: + session = await manager.get_session(session_id) async def event_generator(): """Generate SSE events from the agent response.""" diff --git a/backend/api/sessions.py b/backend/api/sessions.py index 4f83d90..3363144 100644 --- a/backend/api/sessions.py +++ b/backend/api/sessions.py @@ -46,6 +46,7 @@ async def create_session(request: CreateSessionRequest): resume_session_id=request.resume_session_id, model=request.model, cwd=request.cwd, + mcp_server_ids=request.mcp_server_ids, ) return CreateSessionResponse( diff --git a/backend/core/session.py b/backend/core/session.py index b1121be..601b249 100644 --- a/backend/core/session.py +++ b/backend/core/session.py @@ -77,6 +77,7 @@ def __init__( user_id: Optional[str] = None, model: Optional[str] = None, cwd: Optional[str] = None, + mcp_server_ids: Optional[list[str]] = None, ): """ Initialize an agent session. @@ -86,6 +87,7 @@ def __init__( user_id: User ID for tracking model: Optional model name (defaults to ANTHROPIC_MODEL env var) cwd: Working directory for the session + mcp_server_ids: List of MCP server names to enable """ self.session_id = session_id self.user_id = user_id @@ -106,6 +108,9 @@ def __init__( self.model = model or os.environ.get("ANTHROPIC_MODEL") self.current_model = self.model + # MCP servers configuration + self.mcp_server_ids = mcp_server_ids or [] + # Slide detection self.slide_detector = SlideDetector() @@ -137,24 +142,34 @@ async def connect(self, resume_session_id: Optional[str] = None): "preset": "claude_code", } - # Configure allowed tools for slide generation - allowed_tools = [ - "Read", - "Write", - "Edit", - "Glob", - "Grep", + # Configure allowed tools from environment variable + default_tools = [ + "Read", "Write", "Edit", + "Glob", "Grep", "Bash", + "NotebookEdit", "WebFetch", - "WebSearch", + "Task", "TodoWrite", + "BashOutput", "KillShell", + "AskUserQuestion", + "Skill", "SlashCommand", + "ExitPlanMode", + "ListMcpResourcesTool", "ReadMcpResourceTool", ] + allowed_tools_env = os.environ.get("ALLOWED_TOOLS", "").strip() + if allowed_tools_env: + allowed_tools = [tool.strip() for tool in allowed_tools_env.split(",") if tool.strip()] + else: + allowed_tools = default_tools + options_dict = { "allowed_tools": allowed_tools, "system_prompt": system_prompt_config, "max_turns": 0, "can_use_tool": self.permission_callback, "permission_mode": "default", + "setting_sources": ["user"], } if resume_session_id: @@ -166,6 +181,16 @@ async def connect(self, resume_session_id: Optional[str] = None): if self.cwd: options_dict["cwd"] = self.cwd + # Load MCP servers if specified + if self.mcp_server_ids: + print(f"[Session] Loading MCP servers: {self.mcp_server_ids}") + mcp_servers = await self._load_mcp_servers() + if mcp_servers: + options_dict["mcp_servers"] = mcp_servers + print(f"[Session] Loaded {len(mcp_servers)} MCP server(s)") + else: + print(f"[Session] No MCP servers loaded (config not found or invalid)") + print(f"[Session] SDK options: {list(options_dict.keys())}") options = ClaudeAgentOptions(**options_dict) @@ -197,6 +222,68 @@ async def disconnect(self): finally: self.status = "disconnected" + async def _load_mcp_servers(self) -> dict[str, Any]: + """ + Load MCP servers configuration from ~/.claude/mcp.json. + + Returns: + Dictionary of MCP server configurations keyed by server name + """ + import json + + mcp_config_path = Path.home() / ".claude" / "mcp.json" + + if not mcp_config_path.exists(): + print(f"[Session] MCP config file not found: {mcp_config_path}") + return {} + + try: + with open(mcp_config_path, "r") as f: + config_data = json.load(f) + + all_servers = config_data.get("mcpServers", {}) + mcp_servers = {} + + for server_name in self.mcp_server_ids: + if server_name not in all_servers: + print(f"[Session] Warning: MCP server '{server_name}' not found in config") + continue + + server_config = all_servers[server_name] + connection_type = server_config.get("type", "stdio") + + if connection_type == "stdio": + mcp_servers[server_name] = { + "type": "stdio", + "command": server_config.get("command"), + "args": server_config.get("args", []), + "env": server_config.get("env", {}), + } + print(f"[Session] Configured MCP server '{server_name}' (stdio)") + elif connection_type == "sse": + mcp_servers[server_name] = { + "type": "sse", + "url": server_config.get("url"), + } + print(f"[Session] Configured MCP server '{server_name}' (sse)") + elif connection_type == "http": + mcp_servers[server_name] = { + "type": "http", + "url": server_config.get("url"), + } + print(f"[Session] Configured MCP server '{server_name}' (http)") + else: + print(f"[Session] Warning: Unknown MCP server type '{connection_type}' for '{server_name}'") + + return mcp_servers + + except json.JSONDecodeError as e: + print(f"[Session] Error: Invalid JSON in MCP config file: {str(e)}") + return {} + except Exception as e: + print(f"[Session] Error loading MCP servers: {str(e)}") + return {} + async def permission_callback( self, tool_name: str, input_data: dict, context: ToolPermissionContext ) -> PermissionResultAllow | PermissionResultDeny: @@ -215,19 +302,26 @@ async def permission_callback( """ print(f"[Permission] Tool: {tool_name}") - # Auto-allow common tools for slide generation - auto_allow_tools = [ - "Read", - "Write", - "Edit", - "Glob", - "Grep", - "Bash", - "WebSearch", - "WebFetch", - "Task", - "TodoWrite", - ] + # Auto-allow all MCP tools (tools from MCP servers) + if tool_name.startswith("mcp__"): + print(f"[Permission] Auto-allow MCP tool: {tool_name}") + return PermissionResultAllow() + + # Auto-allow tools based on environment variable + auto_allow_tools_env = os.environ.get("AUTO_ALLOW_TOOLS", "").strip() + if auto_allow_tools_env: + auto_allow_tools = [tool.strip() for tool in auto_allow_tools_env.split(",") if tool.strip()] + else: + auto_allow_tools = [ + "Read", "Write", "Edit", "NotebookEdit", + "Glob", "Grep", + "Bash", "KillShell", + "WebSearch", "WebFetch", + "Task", "TaskOutput", "TodoWrite", + "AskUserQuestion", + "EnterPlanMode", "ExitPlanMode", + "Skill", + ] if tool_name in auto_allow_tools: print(f"[Permission] Auto-allow: {tool_name}") @@ -520,6 +614,14 @@ async def message_stream(): } print(f"[Session] send_message_stream END") + # Backup to S3 after task completion (if S3 sync is enabled) + s3_sync_enabled = os.environ.get("ENABLE_S3_SYNC", "true").lower() in ["true", "1", "yes"] + if s3_sync_enabled: + from .claude_sync_manager import get_claude_sync_manager + sync_manager = get_claude_sync_manager() + if sync_manager: + asyncio.create_task(sync_manager.backup_user_claude_dir(self.user_id)) + async def interrupt(self): """ Interrupt the current operation. diff --git a/backend/core/session_manager.py b/backend/core/session_manager.py index 5e3cd8f..f67e4b8 100644 --- a/backend/core/session_manager.py +++ b/backend/core/session_manager.py @@ -159,6 +159,7 @@ async def create_session( resume_session_id: Optional[str] = None, model: Optional[str] = None, cwd: Optional[str] = None, + mcp_server_ids: Optional[list[str]] = None, ) -> str: """ Create a new session or resume an existing one. @@ -168,6 +169,7 @@ async def create_session( resume_session_id: Optional session ID to resume model: Optional model name override cwd: Working directory for the session + mcp_server_ids: List of MCP server names to enable Returns: The session ID (new or resumed) @@ -182,6 +184,7 @@ async def create_session( user_id, model, cwd, + mcp_server_ids=mcp_server_ids, ) await session.connect(resume_session_id) @@ -292,6 +295,53 @@ def update_session_id(self, old_session_id: str, new_session_id: str): self.sessions[new_session_id] = session print(f"[SessionManager] Updated session ID: {old_session_id} -> {new_session_id}") + async def get_or_ensure_session( + self, + session_id: str, + model: Optional[str] = None, + mcp_server_ids: Optional[list[str]] = None, + ) -> AgentSession: + """ + Get session and ensure model and MCP servers match the request. + + If model or mcp_server_ids are provided and differ from current session, + the session will be disconnected and reconnected with new configuration. + + Args: + session_id: The session ID + model: Optional model to ensure + mcp_server_ids: Optional MCP server IDs to ensure + + Returns: + The AgentSession instance with correct configuration + + Raises: + HTTPException: If session not found + """ + session = await self.get_session(session_id) + + needs_reconnect = False + + # Check if model needs to be updated + if model and model != session.model: + print(f"[SessionManager] Model change: {session.model} -> {model}") + session.model = model + needs_reconnect = True + + # Check if MCP servers need to be updated + if mcp_server_ids is not None and mcp_server_ids != session.mcp_server_ids: + print(f"[SessionManager] MCP servers change: {session.mcp_server_ids} -> {mcp_server_ids}") + session.mcp_server_ids = mcp_server_ids + needs_reconnect = True + + if needs_reconnect: + print(f"[SessionManager] Configuration changed — reconnecting session {session_id}") + await session.disconnect() + await session.connect(resume_session_id=session_id) + print(f"[SessionManager] Session {session_id} reconnected") + + return session + async def close_session(self, session_id: str): """ Close and cleanup a session. diff --git a/backend/models/schemas.py b/backend/models/schemas.py index 04afaf3..8d0a1cb 100644 --- a/backend/models/schemas.py +++ b/backend/models/schemas.py @@ -17,6 +17,7 @@ class CreateSessionRequest(BaseModel): resume_session_id: Optional[str] = None model: Optional[str] = None cwd: Optional[str] = None + mcp_server_ids: Optional[list[str]] = None class CreateSessionResponse(BaseModel): @@ -33,6 +34,7 @@ class SendMessageRequest(BaseModel): message: str | dict[str, Any] model: Optional[str] = None enable_web_search: bool = True + mcp_server_ids: Optional[list[str]] = None class MessageBlock(BaseModel): @@ -122,3 +124,103 @@ class SlideCompleteEvent(BaseModel): slide_index: int html: str timestamp: int + + +# ============================================================================ +# MCP Server Schemas +# ============================================================================ + + +class MCPServer(BaseModel): + """MCP server configuration.""" + + type: str # "stdio", "sse", or "http" + command: Optional[str] = None + args: Optional[list[str]] = None + env: Optional[dict[str, str]] = None + url: Optional[str] = None + + +class ListMCPServersResponse(BaseModel): + """Response containing list of MCP servers.""" + + servers: dict[str, MCPServer] + mcp_config_path: str + exists: bool + + +class AddMCPServerRequest(BaseModel): + """Request to add a new MCP server.""" + + name: str + type: str + command: Optional[str] = None + args: Optional[list[str]] = None + env: Optional[dict[str, str]] = None + url: Optional[str] = None + + +class AddMCPServerResponse(BaseModel): + """Response from adding MCP server.""" + + status: str + message: str + server_name: str + + +class DeleteMCPServerResponse(BaseModel): + """Response from deleting MCP server.""" + + status: str + message: str + server_name: str + + +# ============================================================================ +# Environment Variables Schemas +# ============================================================================ + + +class GetEnvVarsResponse(BaseModel): + """Response containing environment variables from settings.json.""" + + env_vars: dict[str, str] + settings_path: str + exists: bool + + +class SetEnvVarRequest(BaseModel): + """Request to set a single environment variable.""" + + key: str + value: str + + +class SetEnvVarResponse(BaseModel): + """Response from setting an environment variable.""" + + status: str + message: str + key: str + + +class DeleteEnvVarResponse(BaseModel): + """Response from deleting an environment variable.""" + + status: str + message: str + key: str + + +class SetAllEnvVarsRequest(BaseModel): + """Request to set all environment variables at once.""" + + env_vars: dict[str, str] + + +class SetAllEnvVarsResponse(BaseModel): + """Response from setting all environment variables.""" + + status: str + message: str + count: int diff --git a/backend/server.py b/backend/server.py index 492843c..536516c 100644 --- a/backend/server.py +++ b/backend/server.py @@ -42,12 +42,15 @@ logger = logging.getLogger(__name__) from .api import ( + env_vars_router, invocations_router, + mcp_servers_router, messages_router, permissions_router, sessions_router, ) from .core import SessionManager +from .core.claude_sync_manager import initialize_claude_sync_manager # ============================================================================ # Global Session Manager @@ -55,6 +58,9 @@ session_manager = SessionManager() +# Claude sync manager (initialized during startup if S3 sync is enabled) +claude_sync_manager = None + # ============================================================================ # FastAPI Application @@ -64,6 +70,8 @@ @asynccontextmanager async def lifespan(app: FastAPI): """Application lifespan manager.""" + global claude_sync_manager + # Startup print("=" * 80) print("Slide Forge API Server Starting...") @@ -73,6 +81,24 @@ async def lifespan(app: FastAPI): logger.info("Starting Slide Forge API Server") logger.info(f"Log level set to: {LOG_LEVEL}") + # Initialize S3 sync manager if enabled + enable_s3_sync = os.environ.get("ENABLE_S3_SYNC", "true").lower() in ("true", "1", "yes") + s3_bucket = os.environ.get("S3_WORKSPACE_BUCKET") + + if enable_s3_sync and s3_bucket: + try: + claude_sync_manager = initialize_claude_sync_manager() + if claude_sync_manager: + claude_sync_manager.start_backup_task() + logger.info("Claude sync manager initialized and backup task started") + except Exception as e: + logger.warning(f"Failed to initialize Claude sync manager: {e}") + claude_sync_manager = None + elif enable_s3_sync and not s3_bucket: + logger.warning("S3 sync enabled but S3_WORKSPACE_BUCKET not set, skipping sync manager") + else: + logger.info("S3 sync disabled via ENABLE_S3_SYNC") + print("=" * 80) print("Server startup complete") print("=" * 80) @@ -80,6 +106,9 @@ async def lifespan(app: FastAPI): yield # Shutdown + if claude_sync_manager: + await claude_sync_manager.stop_backup_task() + print("Shutting down server...") for session_id in list(session_manager.sessions.keys()): await session_manager.close_session(session_id) @@ -119,6 +148,12 @@ async def lifespan(app: FastAPI): # Permission endpoints app.include_router(permissions_router, tags=["permissions"]) +# Environment variables endpoints +app.include_router(env_vars_router, tags=["env-vars"]) + +# MCP servers endpoints +app.include_router(mcp_servers_router, tags=["mcp-servers"]) + # ============================================================================ # Health Check From 92bdd62f3faed887c43a37605687664b76165b97 Mon Sep 17 00:00:00 2001 From: ianleely Date: Fri, 6 Feb 2026 02:49:21 +0000 Subject: [PATCH 2/4] feat: add S3 sync infrastructure and update deployment config - Add claude_sync_manager, s3_client, workspace_sync modules for S3 backup - Add pyjwt and s5cmd dependencies to pyproject.toml - Add Bedrock model and S3 sync config to CDK env-config and stack - Remove legacy deploy/ shell scripts (replaced by CDK infrastructure) Co-Authored-By: Claude Opus 4.6 --- backend/core/claude_sync_manager.py | 334 +++++++++++++++++ backend/core/s3_client.py | 310 ++++++++++++++++ backend/core/workspace_sync.py | 194 ++++++++++ backend/pyproject.toml | 2 + deploy/01_build_and_push.sh | 137 ------- deploy/02_deploy_agentcore.sh | 468 ------------------------ deploy/config.env.template | 119 ------ infrastructure/config/env-config.ts | 24 ++ infrastructure/lib/slide-forge-stack.ts | 8 + 9 files changed, 872 insertions(+), 724 deletions(-) create mode 100644 backend/core/claude_sync_manager.py create mode 100644 backend/core/s3_client.py create mode 100644 backend/core/workspace_sync.py delete mode 100755 deploy/01_build_and_push.sh delete mode 100755 deploy/02_deploy_agentcore.sh delete mode 100644 deploy/config.env.template diff --git a/backend/core/claude_sync_manager.py b/backend/core/claude_sync_manager.py new file mode 100644 index 0000000..8e2ca74 --- /dev/null +++ b/backend/core/claude_sync_manager.py @@ -0,0 +1,334 @@ +""" +Claude Directory Sync Manager. + +Manages synchronization and backup of user .claude directories to S3. +Tracks sync state to avoid duplicate initial syncs and provides periodic backup. +""" + +import asyncio +import logging +import os +from typing import Optional, Set + +from .workspace_sync import ( + backup_claude_dir_to_s3, + sync_claude_dir_from_s3, + WorkspaceSyncError, +) + +logger = logging.getLogger(__name__) + + +class ClaudeSyncManager: + """ + Manages .claude directory synchronization and backup for users. + + - Tracks which users have had their initial sync + - Prevents duplicate initial syncs + - Provides periodic backup functionality + """ + + def __init__( + self, + bucket_name: str, + s3_prefix: str = "user_data", + backup_interval_minutes: int = 5, + ): + """ + Initialize Claude Sync Manager. + + Args: + bucket_name: S3 bucket name for sync/backup + s3_prefix: S3 key prefix (default: "user_data") + backup_interval_minutes: Interval for periodic backups (default: 5) + """ + self.bucket_name = bucket_name + self.s3_prefix = s3_prefix + self.backup_interval_minutes = backup_interval_minutes + + # Track users who have completed initial sync + self._synced_users: Set[str] = set() + + # Background task handle + self._backup_task: Optional[asyncio.Task] = None + + # Flag to control backup loop + self._running = False + + async def ensure_initial_sync(self, user_id: str) -> dict: + """ + Ensure user's .claude directory is synced (first time only). + + If S3 has data: downloads from S3 to local + If S3 is empty but local has data: uploads local to S3 + + This method is idempotent - it will only sync once per user per + server lifetime. + + Args: + user_id: User ID + + Returns: + dict: Sync result with status, message, files_synced, etc. + """ + # Check if user already synced + if user_id in self._synced_users: + logger.debug(f"User {user_id} already synced in this session") + return { + "status": "already_synced", + "user_id": user_id, + "message": "User already synced in this session", + } + + logger.info(f"Starting initial .claude sync for user {user_id}") + + try: + # Attempt to sync from S3 + result = await sync_claude_dir_from_s3( + user_id=user_id, + bucket_name=self.bucket_name, + s3_prefix=self.s3_prefix, + ) + + # If S3 had no data, try to backup local data to S3 + if result.get("status") == "skipped": + s3_path = f"s3://{self.bucket_name}/{self.s3_prefix}/{user_id}/.claude/" + logger.info( + f"No S3 data found for user {user_id} at {s3_path}, " + f"checking for local .claude data to backup" + ) + + # Try to backup local .claude to S3 + backup_result = await self.backup_user_claude_dir(user_id) + + if backup_result.get("status") == "success": + logger.info( + f"Initial backup completed for user {user_id}: " + f"{backup_result.get('files_synced', 0)} files backed up" + ) + self._synced_users.add(user_id) + return backup_result + elif backup_result.get("status") == "skipped": + logger.info( + f"No local .claude data to backup for user {user_id}" + ) + # Still mark as synced to avoid repeated checks + self._synced_users.add(user_id) + return result + + # Mark user as synced (S3 data was downloaded successfully) + self._synced_users.add(user_id) + + if result.get("status") == "success": + logger.info( + f"Initial sync completed for user {user_id}: " + f"{result.get('files_synced', 0)} files synced from S3" + ) + + return result + + except WorkspaceSyncError as e: + logger.error(f"Failed to sync .claude for user {user_id}: {e}") + # Don't add to synced_users on error - allow retry + return { + "status": "error", + "user_id": user_id, + "message": f"Sync failed: {str(e)}", + } + + async def backup_user_claude_dir(self, user_id: str) -> dict: + """ + Backup a single user's .claude directory to S3. + + Args: + user_id: User ID + + Returns: + dict: Backup result + """ + try: + result = await backup_claude_dir_to_s3( + user_id=user_id, + bucket_name=self.bucket_name, + s3_prefix=self.s3_prefix, + ) + return result + + except WorkspaceSyncError as e: + logger.error(f"Failed to backup .claude for user {user_id}: {e}") + return { + "status": "error", + "user_id": user_id, + "message": f"Backup failed: {str(e)}", + } + + async def _backup_loop(self): + """Background loop for periodic backups of all synced users.""" + logger.info( + f"Starting .claude backup loop " + f"(interval: {self.backup_interval_minutes} minutes)" + ) + + while self._running: + try: + await asyncio.sleep(self.backup_interval_minutes * 60) + + if not self._running: + break + + users_to_backup = list(self._synced_users) + + if not users_to_backup: + logger.debug("No users to backup") + continue + + logger.info(f"Starting periodic backup for {len(users_to_backup)} users") + + success_count = 0 + skip_count = 0 + error_count = 0 + + for user_id in users_to_backup: + if not self._running: + break + + try: + result = await self.backup_user_claude_dir(user_id) + if result["status"] == "success": + success_count += 1 + logger.info( + f"Backed up .claude for user {user_id}: " + f"{result.get('files_synced', 0)} files" + ) + elif result["status"] == "skipped": + skip_count += 1 + logger.debug( + f"Skipped backup for user {user_id}: " + f"{result.get('message', 'No data')}" + ) + elif result["status"] == "error": + error_count += 1 + logger.warning( + f"Error backing up user {user_id}: " + f"{result.get('message', 'Unknown error')}" + ) + + except Exception as e: + error_count += 1 + logger.error( + f"Exception backing up user {user_id}: {e}", + exc_info=True + ) + + logger.info( + f"Periodic backup completed: " + f"{success_count} succeeded, {skip_count} skipped, {error_count} errors" + ) + + except asyncio.CancelledError: + logger.info("Backup loop cancelled") + break + except Exception as e: + logger.error(f"Error in backup loop: {e}", exc_info=True) + + logger.info("Backup loop stopped") + + def start_backup_task(self): + """Start the background backup task.""" + if self._backup_task is not None: + logger.warning("Backup task already running") + return + + self._running = True + self._backup_task = asyncio.create_task(self._backup_loop()) + logger.info("Background backup task started") + + async def stop_backup_task(self): + """Stop the background backup task.""" + if self._backup_task is None: + return + + logger.info("Stopping background backup task...") + self._running = False + + if self._backup_task and not self._backup_task.done(): + self._backup_task.cancel() + try: + await self._backup_task + except asyncio.CancelledError: + pass + + self._backup_task = None + logger.info("Background backup task stopped") + + def get_synced_users(self) -> Set[str]: + """Get set of users who have been synced.""" + return self._synced_users.copy() + + def get_stats(self) -> dict: + """Get sync manager statistics.""" + return { + "synced_user_count": len(self._synced_users), + "backup_running": self._backup_task is not None and not self._backup_task.done(), + "backup_interval_minutes": self.backup_interval_minutes, + "bucket_name": self.bucket_name, + "s3_prefix": self.s3_prefix, + } + + +# Global singleton instance +_claude_sync_manager: Optional[ClaudeSyncManager] = None + + +def get_claude_sync_manager() -> Optional[ClaudeSyncManager]: + """Get the global ClaudeSyncManager instance.""" + return _claude_sync_manager + + +def initialize_claude_sync_manager( + bucket_name: Optional[str] = None, + s3_prefix: str = "user_data", + backup_interval_minutes: Optional[int] = None, +) -> Optional[ClaudeSyncManager]: + """ + Initialize the global ClaudeSyncManager. + + Args: + bucket_name: S3 bucket name (from env if not provided) + s3_prefix: S3 key prefix + backup_interval_minutes: Backup interval (from env if not provided) + + Returns: + ClaudeSyncManager instance or None if not configured + """ + global _claude_sync_manager + + # Get bucket name from env if not provided + if bucket_name is None: + bucket_name = os.environ.get("S3_WORKSPACE_BUCKET") + + if not bucket_name: + logger.warning( + "S3_WORKSPACE_BUCKET not configured, .claude sync/backup will be disabled" + ) + return None + + # Get backup interval from env if not provided + if backup_interval_minutes is None: + backup_interval_minutes = int( + os.environ.get("CLAUDE_BACKUP_INTERVAL_MINUTES", "5") + ) + + logger.info( + f"Initializing Claude Sync Manager: " + f"bucket={bucket_name}, prefix={s3_prefix}, " + f"interval={backup_interval_minutes}m" + ) + + _claude_sync_manager = ClaudeSyncManager( + bucket_name=bucket_name, + s3_prefix=s3_prefix, + backup_interval_minutes=backup_interval_minutes, + ) + + return _claude_sync_manager diff --git a/backend/core/s3_client.py b/backend/core/s3_client.py new file mode 100644 index 0000000..b4243e9 --- /dev/null +++ b/backend/core/s3_client.py @@ -0,0 +1,310 @@ +""" +S3 Client using s5cmd. + +Provides unified interface for S3 operations using s5cmd for high-performance +parallel transfers. +""" + +import asyncio +import logging +import shutil +from pathlib import Path +from typing import Optional + +logger = logging.getLogger(__name__) + + +class S3ClientError(Exception): + """Exception raised when S3 operations fail.""" + pass + + +class S3Client: + """ + High-performance S3 client using s5cmd. + + Provides methods for checking existence, syncing, and listing S3 objects. + """ + + def __init__(self, bucket_name: str, s3_prefix: str = "user_data"): + """ + Initialize S3 client. + + Args: + bucket_name: S3 bucket name + s3_prefix: S3 key prefix (default: "user_data") + """ + self.bucket_name = bucket_name + self.s3_prefix = s3_prefix + + if not self._check_s5cmd_installed(): + raise S3ClientError( + "s5cmd is not installed. Please install it: " + "https://github.com/peak/s5cmd#installation" + ) + + def _check_s5cmd_installed(self) -> bool: + """Check if s5cmd is installed and available.""" + return shutil.which("s5cmd") is not None + + def build_s3_path(self, *path_parts: str) -> str: + """ + Build S3 path from parts. + + Args: + *path_parts: Path components to join + + Returns: + Full S3 path (s3://bucket/prefix/...) + """ + parts = [self.s3_prefix] + list(path_parts) + path = "/".join(parts) + return f"s3://{self.bucket_name}/{path}" + + async def check_exists(self, *path_parts: str) -> bool: + """ + Check if a directory/file exists in S3. + + Args: + *path_parts: Path components relative to s3_prefix + + Returns: + True if path exists and has objects, False otherwise + """ + s3_path = self.build_s3_path(*path_parts) + "/" + + try: + process = await asyncio.create_subprocess_exec( + "s5cmd", + "ls", + s3_path, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + stdout, stderr = await process.communicate() + stdout_text = stdout.decode() if stdout else "" + stderr_text = stderr.decode() if stderr else "" + + # Debug logging + logger.debug(f"S3 check: {s3_path}") + logger.debug(f" Return code: {process.returncode}") + logger.debug(f" Output length: {len(stdout_text)} chars") + + if process.returncode != 0: + logger.debug(f" Stderr: {stderr_text}") + return False + + exists = bool(stdout_text.strip()) + logger.debug(f" Result: {'EXISTS' if exists else 'NOT FOUND'}") + + return exists + + except Exception as e: + logger.error(f"Failed to check S3 path {s3_path}: {e}") + return False + + async def sync_from_s3( + self, + s3_path_parts: list[str], + local_path: Path, + dry_run: bool = False, + ) -> dict: + """ + Sync from S3 to local directory. + + Args: + s3_path_parts: Path components relative to s3_prefix + local_path: Local destination directory + dry_run: If True, only simulate the sync + + Returns: + Dict with status, files_synced, output, etc. + + Raises: + S3ClientError: If sync fails + """ + s3_path = self.build_s3_path(*s3_path_parts) + "/" + + # Ensure local directory exists + local_path.mkdir(parents=True, exist_ok=True) + + logger.info(f"Syncing from {s3_path} to {local_path}") + + # Build s5cmd command + cmd = ["s5cmd", "sync"] + + if dry_run: + cmd.append("--dry-run") + + cmd.extend([ + s3_path + "*", + str(local_path) + "/", + ]) + + try: + process = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + stdout, stderr = await process.communicate() + stdout_text = stdout.decode() if stdout else "" + stderr_text = stderr.decode() if stderr else "" + + if process.returncode != 0: + error_msg = f"s5cmd sync failed (exit {process.returncode}): {stderr_text}" + logger.error(error_msg) + raise S3ClientError(error_msg) + + # Count files synced + files_synced = len([line for line in stdout_text.strip().split('\n') if line]) + + logger.info(f"Synced {files_synced} files from S3") + + return { + "status": "success", + "s3_path": s3_path, + "local_path": str(local_path), + "files_synced": files_synced, + "dry_run": dry_run, + "output": stdout_text, + } + + except S3ClientError: + raise + except Exception as e: + error_msg = f"Failed to sync from S3: {str(e)}" + logger.error(error_msg, exc_info=True) + raise S3ClientError(error_msg) from e + + async def sync_to_s3( + self, + local_path: Path, + s3_path_parts: list[str], + dry_run: bool = False, + ) -> dict: + """ + Sync from local directory to S3. + + Args: + local_path: Local source directory + s3_path_parts: Path components relative to s3_prefix + dry_run: If True, only simulate the sync + + Returns: + Dict with status, files_synced, output, etc. + + Raises: + S3ClientError: If sync fails or local path doesn't exist + """ + if not local_path.exists(): + return { + "status": "skipped", + "local_path": str(local_path), + "message": "Local path does not exist", + "files_synced": 0, + } + + s3_path = self.build_s3_path(*s3_path_parts) + "/" + + logger.info(f"Syncing from {local_path} to {s3_path}") + + # Build s5cmd command + cmd = ["s5cmd", "sync"] + + if dry_run: + cmd.append("--dry-run") + + cmd.extend([ + str(local_path) + "/*", + s3_path, + ]) + + try: + process = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + stdout, stderr = await process.communicate() + stdout_text = stdout.decode() if stdout else "" + stderr_text = stderr.decode() if stderr else "" + + if process.returncode != 0: + error_msg = f"s5cmd sync failed (exit {process.returncode}): {stderr_text}" + logger.error(error_msg) + raise S3ClientError(error_msg) + + # Count files synced + files_synced = len([line for line in stdout_text.strip().split('\n') if line]) + + logger.info(f"Synced {files_synced} files to S3") + + return { + "status": "success", + "local_path": str(local_path), + "s3_path": s3_path, + "files_synced": files_synced, + "dry_run": dry_run, + "output": stdout_text, + } + + except S3ClientError: + raise + except Exception as e: + error_msg = f"Failed to sync to S3: {str(e)}" + logger.error(error_msg, exc_info=True) + raise S3ClientError(error_msg) from e + + async def list_directories(self, *path_parts: str) -> list[str]: + """ + List subdirectories at given S3 path. + + Args: + *path_parts: Path components relative to s3_prefix + + Returns: + List of directory names (not full paths) + """ + s3_path = self.build_s3_path(*path_parts) + "/" + + try: + process = await asyncio.create_subprocess_exec( + "s5cmd", + "ls", + s3_path, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + stdout, stderr = await process.communicate() + stdout_text = stdout.decode() if stdout else "" + + if not stdout_text.strip(): + logger.debug(f"No directories found at {s3_path}") + return [] + + # Parse s5cmd ls output to extract directory names + directories = [] + for line in stdout_text.strip().split('\n'): + line = line.strip() + if not line: + continue + + if "DIR" in line: + tokens = line.split() + if len(tokens) >= 2: + dir_path = tokens[-1].rstrip('/') + dir_name = dir_path.split('/')[-1] if '/' in dir_path else dir_path + if dir_name: + directories.append(dir_name) + + logger.debug(f"Found {len(directories)} directories at {s3_path}") + return directories + + except Exception as e: + logger.error(f"Failed to list directories: {e}") + return [] diff --git a/backend/core/workspace_sync.py b/backend/core/workspace_sync.py new file mode 100644 index 0000000..8396b28 --- /dev/null +++ b/backend/core/workspace_sync.py @@ -0,0 +1,194 @@ +""" +Workspace synchronization utilities for S3. + +Provides functions to sync user .claude directories from S3 to local filesystem +using s5cmd for high-performance parallel transfers. +""" + +import asyncio +import logging +import os +import shutil +from pathlib import Path +from typing import Optional + +from .s3_client import S3Client, S3ClientError + +logger = logging.getLogger(__name__) + + +class WorkspaceSyncError(Exception): + """Exception raised when workspace sync fails.""" + pass + + +def check_s5cmd_installed() -> bool: + """ + Check if s5cmd is installed and available. + + Returns: + bool: True if s5cmd is installed, False otherwise + """ + return shutil.which("s5cmd") is not None + + +async def check_s3_directory_exists( + bucket_name: str, + s3_prefix: str, +) -> bool: + """ + Check if a directory exists in S3. + + Args: + bucket_name: S3 bucket name + s3_prefix: S3 key prefix to check + + Returns: + bool: True if directory exists and has objects, False otherwise + """ + if not check_s5cmd_installed(): + logger.warning("s5cmd not installed, cannot check S3 directory") + return False + + s3_path = f"s3://{bucket_name}/{s3_prefix}/" + + try: + process = await asyncio.create_subprocess_exec( + "s5cmd", + "--log", "error", + "ls", + s3_path, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + stdout, stderr = await process.communicate() + stdout_text = stdout.decode() if stdout else "" + + return bool(stdout_text.strip()) + + except Exception as e: + logger.error(f"Failed to check S3 directory: {e}") + return False + + +async def sync_claude_dir_from_s3( + user_id: str, + bucket_name: str, + s3_prefix: str = "user_data", + local_home: Optional[str] = None, +) -> dict: + """ + Sync .claude directory from S3 to local ~/.claude for a user. + + Args: + user_id: User ID + bucket_name: S3 bucket name + s3_prefix: S3 key prefix (default: "user_data") + local_home: Local home directory (default: from HOME env var) + + Returns: + dict: Sync result with status, local_path, files_synced, etc. + + Raises: + WorkspaceSyncError: If sync fails + """ + try: + s3_client = S3Client(bucket_name, s3_prefix) + except S3ClientError as e: + raise WorkspaceSyncError(str(e)) from e + + # Get home directory + if local_home is None: + local_home = os.environ.get("HOME", "/root") + + local_claude_dir = Path(local_home) / ".claude" + + # Check if S3 directory exists + s3_exists = await s3_client.check_exists(user_id, ".claude") + s3_path = s3_client.build_s3_path(user_id, ".claude") + "/" + + logger.info(f"Checking if .claude data exists in S3: {s3_path}") + + if not s3_exists: + logger.info(f"No .claude data found in S3 for user {user_id}") + return { + "status": "skipped", + "user_id": user_id, + "s3_path": s3_path, + "local_path": str(local_claude_dir), + "message": "No .claude data found in S3", + "files_synced": 0, + } + + try: + result = await s3_client.sync_from_s3( + [user_id, ".claude"], + local_claude_dir, + ) + + result["user_id"] = user_id + result["message"] = f"Successfully synced {result['files_synced']} files from S3" + + logger.info(f".claude sync completed: {result['files_synced']} files from S3") + return result + + except S3ClientError as e: + error_msg = f"Failed to sync .claude directory: {str(e)}" + logger.error(error_msg) + raise WorkspaceSyncError(error_msg) from e + + +async def backup_claude_dir_to_s3( + user_id: str, + bucket_name: str, + s3_prefix: str = "user_data", + local_home: Optional[str] = None, +) -> dict: + """ + Backup .claude directory from local ~/.claude to S3. + + Args: + user_id: User ID + bucket_name: S3 bucket name + s3_prefix: S3 key prefix (default: "user_data") + local_home: Local home directory (default: from HOME env var) + + Returns: + dict: Backup result with status, s3_path, files_synced, etc. + + Raises: + WorkspaceSyncError: If backup fails + """ + try: + s3_client = S3Client(bucket_name, s3_prefix) + except S3ClientError as e: + raise WorkspaceSyncError(str(e)) from e + + # Get home directory + if local_home is None: + local_home = os.environ.get("HOME", "/root") + + local_claude_dir = Path(local_home) / ".claude" + + try: + result = await s3_client.sync_to_s3( + local_claude_dir, + [user_id, ".claude"], + ) + + if result["status"] == "skipped": + result["user_id"] = user_id + logger.debug(f"No .claude directory found for user {user_id}") + return result + + result["user_id"] = user_id + result["message"] = f"Successfully backed up {result['files_synced']} files to S3" + + logger.info(f".claude backup completed: {result['files_synced']} files to S3") + return result + + except S3ClientError as e: + error_msg = f"Failed to backup .claude directory: {str(e)}" + logger.error(error_msg) + raise WorkspaceSyncError(error_msg) from e diff --git a/backend/pyproject.toml b/backend/pyproject.toml index 7296098..3a0bac1 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -9,6 +9,8 @@ dependencies = [ "claude-agent-sdk>=0.1.19", "fastapi>=0.120.0", "httpx>=0.28.1", + "pyjwt>=2.10.1", + "s5cmd>=0.3.3", "uvicorn>=0.38.0", ] diff --git a/deploy/01_build_and_push.sh b/deploy/01_build_and_push.sh deleted file mode 100755 index 7186911..0000000 --- a/deploy/01_build_and_push.sh +++ /dev/null @@ -1,137 +0,0 @@ -#!/bin/bash -# ============================================================================= -# Slide Forge - Build and Push Docker Image to ECR -# ============================================================================= -# Step 1: Build ARM64 Docker image and push to Amazon ECR -# -# Prerequisites: -# - AWS CLI configured with appropriate permissions -# - Docker installed and running -# - config.env file with required values -# ============================================================================= - -set -e - -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -PROJECT_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)" - -# Load configuration -if [ -f "${SCRIPT_DIR}/config.env" ]; then - source "${SCRIPT_DIR}/config.env" -else - echo "Error: config.env not found. Please copy config.env.template to config.env and fill in the values." - exit 1 -fi - -# Colors for output -RED='\033[0;31m' -GREEN='\033[0;32m' -YELLOW='\033[1;33m' -BLUE='\033[0;34m' -NC='\033[0m' # No Color - -echo -e "${GREEN}=======================================${NC}" -echo -e "${GREEN}Step 1: Build and Push Docker Image${NC}" -echo -e "${GREEN}=======================================${NC}" -echo "" - -# Auto-detect AWS Account ID if not set -if [ -z "$AWS_ACCOUNT_ID" ]; then - echo -e "${YELLOW}AWS_ACCOUNT_ID not set in config, detecting...${NC}" - AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text) - echo -e "${GREEN}Detected AWS_ACCOUNT_ID: ${AWS_ACCOUNT_ID}${NC}" -fi - -# Auto-detect AWS Region if not set -if [ -z "$AWS_REGION" ]; then - echo -e "${YELLOW}AWS_REGION not set, using default...${NC}" - AWS_REGION=$(aws configure get region) - AWS_REGION=${AWS_REGION:-us-west-2} - echo -e "${GREEN}Using AWS_REGION: ${AWS_REGION}${NC}" -fi - -# Construct ECR URI and image name -ECR_URI="${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com" -FULL_IMAGE_NAME="${ECR_URI}/${ECR_REPOSITORY_NAME}:${DOCKER_IMAGE_VERSION}" - -echo "" -echo -e "${BLUE}Configuration:${NC}" -echo " Project Root: ${PROJECT_ROOT}" -echo " ECR Repository: ${ECR_REPOSITORY_NAME}" -echo " Image Version: ${DOCKER_IMAGE_VERSION}" -echo " Full Image URI: ${FULL_IMAGE_NAME}" -echo "" - -# ============================================================================= -# Create ECR Repository if it doesn't exist -# ============================================================================= -echo -e "${YELLOW}Checking if ECR repository exists...${NC}" -if ! aws ecr describe-repositories --region "${AWS_REGION}" --repository-names "${ECR_REPOSITORY_NAME}" > /dev/null 2>&1; then - echo -e "${YELLOW}Creating ECR repository: ${ECR_REPOSITORY_NAME}${NC}" - aws ecr create-repository \ - --region "${AWS_REGION}" \ - --repository-name "${ECR_REPOSITORY_NAME}" \ - --image-scanning-configuration scanOnPush=true \ - --tags \ - Key=Project,Value="${TAG_PROJECT}" \ - Key=Environment,Value="${TAG_ENVIRONMENT}" \ - Key=ManagedBy,Value="${TAG_MANAGED_BY}" \ - > /dev/null - echo -e "${GREEN}[OK]${NC} ECR repository created" -else - echo -e "${GREEN}[OK]${NC} ECR repository already exists" -fi - -# ============================================================================= -# Login to ECR -# ============================================================================= -echo -e "${YELLOW}Logging into ECR...${NC}" -aws ecr get-login-password --region "${AWS_REGION}" | \ - docker login --username AWS --password-stdin "${ECR_URI}" -echo -e "${GREEN}[OK]${NC} Logged into ECR" - -# ============================================================================= -# Build Docker Image for ARM64 -# ============================================================================= -echo "" -echo -e "${YELLOW}Building Docker image for ARM64 architecture...${NC}" -echo " This may take several minutes on first build..." -echo "" - -docker build \ - --platform linux/arm64 \ - -t "${FULL_IMAGE_NAME}" \ - -f "${SCRIPT_DIR}/Dockerfile" \ - "${PROJECT_ROOT}" - -echo -e "${GREEN}[OK]${NC} Docker image built (ARM64)" - -# ============================================================================= -# Push Image to ECR -# ============================================================================= -echo "" -echo -e "${YELLOW}Pushing image to ECR...${NC}" -docker push "${FULL_IMAGE_NAME}" -echo -e "${GREEN}[OK]${NC} Image pushed to ECR" - -# ============================================================================= -# Save Output for Next Step -# ============================================================================= -echo "" -echo -e "${GREEN}=======================================${NC}" -echo -e "${GREEN}Step 1 Complete!${NC}" -echo -e "${GREEN}=======================================${NC}" -echo "" -echo "Image URI: ${FULL_IMAGE_NAME}" -echo "" - -# Save output for next step -cat > "${SCRIPT_DIR}/.build_output" </dev/null || echo "") - - if [ -z "$S3_WORKSPACE_BUCKET" ] || [ "$S3_WORKSPACE_BUCKET" == "None" ]; then - echo -e "${RED}Error: Could not find S3 workspace bucket.${NC}" - echo "Please set S3_WORKSPACE_BUCKET in config.env" - exit 1 - fi -fi - -if aws s3 ls "s3://${S3_WORKSPACE_BUCKET}" &>/dev/null; then - echo -e "${GREEN}[OK]${NC} S3 bucket exists: ${S3_WORKSPACE_BUCKET}" -else - echo -e "${RED}Error: S3 bucket does not exist: ${S3_WORKSPACE_BUCKET}${NC}" - exit 1 -fi - -# ============================================================================= -# Lookup Existing Cognito Configuration from CDK Stack -# ============================================================================= -echo "" -echo -e "${YELLOW}Checking Cognito configuration...${NC}" - -STACK_NAME="${SLIDE_FORGE_STACK_NAME:-slide-forge}" - -# Function to lookup CloudFormation export -lookup_cfn_export() { - local export_name="$1" - aws cloudformation list-exports \ - --region "${AWS_REGION}" \ - --query "Exports[?Name=='${export_name}'].Value | [0]" \ - --output text 2>/dev/null || echo "" -} - -# Lookup Cognito User Pool ID if not provided -if [ -z "$COGNITO_USER_POOL_ID" ]; then - echo -e "${YELLOW}Looking up Cognito User Pool ID from CDK stack...${NC}" - - # Try CloudFormation export first - COGNITO_USER_POOL_ID=$(lookup_cfn_export "${STACK_NAME}-cognito-user-pool-id") - - # Fallback: try alternative export name - if [ -z "$COGNITO_USER_POOL_ID" ] || [ "$COGNITO_USER_POOL_ID" == "None" ]; then - COGNITO_USER_POOL_ID=$(lookup_cfn_export "${STACK_NAME}-user-pool-id") - fi - - # Fallback: try stack outputs directly - if [ -z "$COGNITO_USER_POOL_ID" ] || [ "$COGNITO_USER_POOL_ID" == "None" ]; then - COGNITO_USER_POOL_ID=$(aws cloudformation describe-stacks \ - --stack-name "${STACK_NAME}" \ - --region "${AWS_REGION}" \ - --query "Stacks[0].Outputs[?contains(OutputKey, 'UserPoolId') || contains(OutputKey, 'CognitoUserPoolId')].OutputValue | [0]" \ - --output text 2>/dev/null || echo "") - fi - - if [ -z "$COGNITO_USER_POOL_ID" ] || [ "$COGNITO_USER_POOL_ID" == "None" ]; then - echo -e "${RED}Error: Could not find Cognito User Pool ID.${NC}" - echo "Please set COGNITO_USER_POOL_ID in config.env" - exit 1 - fi -fi - -# Lookup Cognito Client ID if not provided -if [ -z "$COGNITO_CLIENT_ID" ]; then - echo -e "${YELLOW}Looking up Cognito Client ID from CDK stack...${NC}" - - # Try CloudFormation export first - COGNITO_CLIENT_ID=$(lookup_cfn_export "${STACK_NAME}-cognito-client-id") - - # Fallback: try stack outputs directly - if [ -z "$COGNITO_CLIENT_ID" ] || [ "$COGNITO_CLIENT_ID" == "None" ]; then - COGNITO_CLIENT_ID=$(aws cloudformation describe-stacks \ - --stack-name "${STACK_NAME}" \ - --region "${AWS_REGION}" \ - --query "Stacks[0].Outputs[?contains(OutputKey, 'ClientId') || contains(OutputKey, 'CognitoClientId')].OutputValue | [0]" \ - --output text 2>/dev/null || echo "") - fi - - if [ -z "$COGNITO_CLIENT_ID" ] || [ "$COGNITO_CLIENT_ID" == "None" ]; then - echo -e "${RED}Error: Could not find Cognito Client ID.${NC}" - echo "Please set COGNITO_CLIENT_ID in config.env" - exit 1 - fi -fi - -# Set Cognito region (usually same as AWS_REGION) -COGNITO_REGION="${COGNITO_REGION:-${AWS_REGION}}" - -# Construct discovery URL -COGNITO_DISCOVERY_URL="https://cognito-idp.${COGNITO_REGION}.amazonaws.com/${COGNITO_USER_POOL_ID}/.well-known/openid-configuration" - -echo -e "${GREEN}[OK]${NC} Using existing Cognito from slide-forge stack" -echo " User Pool ID: ${COGNITO_USER_POOL_ID}" -echo " Client ID: ${COGNITO_CLIENT_ID}" -echo " Discovery URL: ${COGNITO_DISCOVERY_URL}" - -# ============================================================================= -# Create IAM Execution Role -# ============================================================================= -echo "" -echo -e "${YELLOW}Checking IAM execution role...${NC}" - -FULL_ROLE_NAME="${IAM_ROLE_NAME}-${AWS_REGION}-${DEPLOYMENT_ENV:-prod}" - -if aws iam get-role --role-name "${FULL_ROLE_NAME}" &>/dev/null; then - ROLE_ARN=$(aws iam get-role --role-name "${FULL_ROLE_NAME}" --query 'Role.Arn' --output text) - echo -e "${GREEN}[OK]${NC} IAM role already exists: ${ROLE_ARN}" -else - echo -e "${YELLOW}Creating IAM role: ${FULL_ROLE_NAME}${NC}" - - # Create trust policy for AgentCore - cat > /tmp/trust-policy.json < /tmp/role-policy.json </dev/null || echo "") - -# Prepare environment variables -ENV_VARS="AWS_DEFAULT_REGION=${AWS_REGION}" -[ -n "${ANTHROPIC_MODEL}" ] && ENV_VARS="${ENV_VARS},ANTHROPIC_MODEL=${ANTHROPIC_MODEL}" -[ -n "${ANTHROPIC_SMALL_FAST_MODEL}" ] && ENV_VARS="${ENV_VARS},ANTHROPIC_SMALL_FAST_MODEL=${ANTHROPIC_SMALL_FAST_MODEL}" -[ -n "${ANTHROPIC_DEFAULT_HAIKU_MODEL}" ] && ENV_VARS="${ENV_VARS},ANTHROPIC_DEFAULT_HAIKU_MODEL=${ANTHROPIC_DEFAULT_HAIKU_MODEL}" -[ -n "${DISABLE_PROMPT_CACHING}" ] && ENV_VARS="${ENV_VARS},DISABLE_PROMPT_CACHING=${DISABLE_PROMPT_CACHING}" -[ -n "${CLAUDE_CODE_USE_BEDROCK}" ] && ENV_VARS="${ENV_VARS},CLAUDE_CODE_USE_BEDROCK=${CLAUDE_CODE_USE_BEDROCK}" -ENV_VARS="${ENV_VARS},S3_WORKSPACE_BUCKET=${S3_WORKSPACE_BUCKET}" - -# Prepare authorizer configuration (using existing Cognito) -AUTHORIZER_CONFIG="customJWTAuthorizer={discoveryUrl=${COGNITO_DISCOVERY_URL},allowedClients=[${COGNITO_CLIENT_ID}]}" - -if [ -n "$EXISTING_RUNTIME" ]; then - echo -e "${YELLOW}Updating existing AgentCore Runtime: ${EXISTING_RUNTIME}${NC}" - - aws bedrock-agentcore-control update-agent-runtime \ - --agent-runtime-id "${EXISTING_RUNTIME}" \ - --region "${AWS_REGION}" \ - --agent-runtime-artifact "containerConfiguration={containerUri=${DOCKER_IMAGE_URI}}" \ - --network-configuration "networkMode=PUBLIC" \ - --role-arn "${ROLE_ARN}" \ - --request-header-configuration "requestHeaderAllowlist=[Authorization]" \ - --environment-variables "${ENV_VARS}" \ - --authorizer-configuration "${AUTHORIZER_CONFIG}" \ - --output json > /tmp/runtime-output.json - - RUNTIME_ID="${EXISTING_RUNTIME}" - echo -e "${GREEN}[OK]${NC} AgentCore Runtime updated" -else - echo -e "${YELLOW}Creating new AgentCore Runtime: ${RUNTIME_NAME}${NC}" - - aws bedrock-agentcore-control create-agent-runtime \ - --agent-runtime-name "${RUNTIME_NAME}" \ - --region "${AWS_REGION}" \ - --agent-runtime-artifact "containerConfiguration={containerUri=${DOCKER_IMAGE_URI}}" \ - --network-configuration "networkMode=PUBLIC" \ - --role-arn "${ROLE_ARN}" \ - --request-header-configuration "requestHeaderAllowlist=[Authorization]" \ - --environment-variables "${ENV_VARS}" \ - --authorizer-configuration "${AUTHORIZER_CONFIG}" \ - --output json > /tmp/runtime-output.json - - RUNTIME_ID=$(jq -r '.agentRuntimeId' /tmp/runtime-output.json) - echo -e "${GREEN}[OK]${NC} AgentCore Runtime created: ${RUNTIME_ID}" -fi - -# Extract runtime details -RUNTIME_ARN=$(jq -r '.agentRuntimeArn' /tmp/runtime-output.json) -WORKLOAD_IDENTITY_ARN=$(jq -r '.workloadIdentityDetails.workloadIdentityArn // "N/A"' /tmp/runtime-output.json) -STATUS=$(jq -r '.status' /tmp/runtime-output.json) - -# Construct Runtime URL -ENCODED_ARN=$(echo "${RUNTIME_ARN}" | sed 's/:/%3A/g' | sed 's/\//%2F/g') -RUNTIME_URL="https://bedrock-agentcore.${AWS_REGION}.amazonaws.com/runtimes/${ENCODED_ARN}" - -rm /tmp/runtime-output.json - -# ============================================================================= -# Save Outputs -# ============================================================================= -cat > "${SCRIPT_DIR}/.agentcore_output" < Date: Fri, 6 Feb 2026 02:57:25 +0000 Subject: [PATCH 3/4] feat: add slide-theme-generator skill and enable project-level skills - Create .claude/skills/slide-theme-generator/SKILL.md: generates cohesive CSS custom-property themes from keywords or brand colors - Update setting_sources from ["user"] to ["user", "project"] so the SDK discovers skills in both ~/.claude/skills/ and {cwd}/.claude/skills/ Co-Authored-By: Claude Opus 4.6 --- .claude/skills/slide-theme-generator/SKILL.md | 53 +++++++++++++++++++ backend/core/session.py | 2 +- 2 files changed, 54 insertions(+), 1 deletion(-) create mode 100644 .claude/skills/slide-theme-generator/SKILL.md diff --git a/.claude/skills/slide-theme-generator/SKILL.md b/.claude/skills/slide-theme-generator/SKILL.md new file mode 100644 index 0000000..b90add2 --- /dev/null +++ b/.claude/skills/slide-theme-generator/SKILL.md @@ -0,0 +1,53 @@ +--- +name: slide-theme-generator +description: Generate a cohesive slide theme (color palette, typography, layout tokens) from a user-provided keyword, brand color, or mood. Outputs a reusable CSS variable block that can be injected into every slide. +--- + +# Slide Theme Generator + +When the user asks to create a presentation theme, set a color scheme, or define a visual style, generate a CSS custom-properties block that all subsequent slides can reference. + +## Input + +Accept any of the following: +- A keyword or mood (e.g. "corporate", "playful", "dark tech") +- A hex brand color (e.g. "#FF6B00") +- A reference image description + +## Output Format + +Return a single `