diff --git a/.gitignore b/.gitignore index 2d3e391089..d303a8e56b 100644 --- a/.gitignore +++ b/.gitignore @@ -175,3 +175,6 @@ OPUS_ANALYSIS_AND_IDEAS.md /shared_docs logs/security/ Agents.md +desktop.env +auto-claude-desktop.sh +images/ diff --git a/apps/backend/cli/batch_commands.py b/apps/backend/cli/batch_commands.py index 68ed33536b..5282c86bfd 100644 --- a/apps/backend/cli/batch_commands.py +++ b/apps/backend/cli/batch_commands.py @@ -6,6 +6,7 @@ """ import json +import re import shutil import subprocess from pathlib import Path @@ -60,8 +61,19 @@ def handle_batch_create_command(batch_file: str, project_dir: str) -> bool: for idx, task in enumerate(tasks, 1): spec_id = f"{next_id:03d}" task_title = task.get("title", f"Task {idx}") - task_slug = task_title.lower().replace(" ", "-")[:50] - spec_name = f"{spec_id}-{task_slug}" + + # Extract category tag like [sec-001] from title if present + tag_match = re.match(r"^\[(\w+-\d+)\]\s*(.*)", task_title) + if tag_match: + tag = tag_match.group(1) # e.g. "sec-001" + title_rest = tag_match.group(2) # e.g. "Remove hardcoded API key..." 
+ title_slug = re.sub(r"[^\w\-]", "-", title_rest.lower()) + title_slug = re.sub(r"-+", "-", title_slug).strip("-")[:50] + spec_name = f"{spec_id}-[{tag}]-{title_slug}" + else: + task_slug = re.sub(r"[^\w\-]", "-", task_title.lower()) + task_slug = re.sub(r"-+", "-", task_slug).strip("-")[:50] + spec_name = f"{spec_id}-{task_slug}" spec_dir = specs_dir / spec_name spec_dir.mkdir(exist_ok=True) diff --git a/apps/backend/cli/main.py b/apps/backend/cli/main.py index dc1f6a9c32..112c451aec 100644 --- a/apps/backend/cli/main.py +++ b/apps/backend/cli/main.py @@ -285,6 +285,10 @@ def parse_args() -> argparse.Namespace: def main() -> None: """Main CLI entry point.""" + # Preflight self-healing checks (token refresh, ollama, stale locks) + from preflight_hook import run_preflight + run_preflight() + # Set up environment first setup_environment() diff --git a/apps/backend/core/worktree.py b/apps/backend/core/worktree.py index f8cbbfb965..d8fb25e7c4 100644 --- a/apps/backend/core/worktree.py +++ b/apps/backend/core/worktree.py @@ -344,7 +344,11 @@ def get_worktree_path(self, spec_name: str) -> Path: def get_branch_name(self, spec_name: str) -> str: """Get the branch name for a spec.""" - return f"auto-claude/{spec_name}" + # Sanitize spec_name: remove characters invalid in git branch names + sanitized = re.sub(r'[\[\]~^:?*\\{}]', '', spec_name) + # Collapse repeated dashes from removal + sanitized = re.sub(r'-{2,}', '-', sanitized) + return f"auto-claude/{sanitized}" def worktree_exists(self, spec_name: str) -> bool: """Check if a worktree exists for a spec.""" diff --git a/apps/backend/preflight.py b/apps/backend/preflight.py new file mode 100644 index 0000000000..084f101f77 --- /dev/null +++ b/apps/backend/preflight.py @@ -0,0 +1,276 @@ +#!/usr/bin/env python3 +"""Auto-Claude Preflight Check & Self-Healing Script. + +Run before any Auto-Claude command to detect and fix common issues. 
+Usage: python preflight.py [--fix] +""" +import json +import os +import sys +import subprocess +import time +from pathlib import Path + +BACKEND_DIR = Path(__file__).parent +ENV_FILE = BACKEND_DIR / ".env" +VENV_PYTHON = BACKEND_DIR / ".venv" / "bin" / "python" + +RED = "\033[91m" +GREEN = "\033[92m" +YELLOW = "\033[93m" +BLUE = "\033[94m" +RESET = "\033[0m" + +def ok(msg): + print(f" {GREEN}✓{RESET} {msg}") + +def warn(msg): + print(f" {YELLOW}⚠{RESET} {msg}") + +def fail(msg): + print(f" {RED}✗{RESET} {msg}") + +def info(msg): + print(f" {BLUE}ℹ{RESET} {msg}") + + +class PreflightCheck: + def __init__(self, auto_fix=False): + self.auto_fix = auto_fix + self.issues = [] + self.fixed = [] + + def check_env_file(self): + """Verify .env exists and has required fields.""" + print(f"\n{BLUE}[1/6] Checking .env configuration{RESET}") + if not ENV_FILE.exists(): + fail(".env file not found") + self.issues.append("missing_env") + return + + env = {} + with open(ENV_FILE) as f: + for line in f: + line = line.strip() + if line and not line.startswith("#") and "=" in line: + key, _, val = line.partition("=") + env[key.strip()] = val.strip() + + # Check OAuth token + token = env.get("CLAUDE_CODE_OAUTH_TOKEN", "") + if not token or token.startswith("sk-ant-oat01-cnqsmZU"): + fail("OAuth token missing or known-expired") + self.issues.append("expired_token") + if self.auto_fix: + self._fix_token(env) + else: + ok(f"OAuth token present ({token[:20]}...)") + + # Check Ollama URL + ollama_url = env.get("OLLAMA_BASE_URL", "") + if not ollama_url: + fail("OLLAMA_BASE_URL not set") + self.issues.append("missing_ollama_url") + else: + ok(f"Ollama URL: {ollama_url}") + + # Check required providers + for key in ["GRAPHITI_LLM_PROVIDER", "GRAPHITI_EMBEDDER_PROVIDER"]: + if key in env: + ok(f"{key}={env[key]}") + else: + warn(f"{key} not set") + + def _fix_token(self, env): + """Auto-fix expired OAuth token from ~/.claude/.credentials.json.""" + creds_file = Path.home() / ".claude" / 
".credentials.json" + if not creds_file.exists(): + fail("Cannot auto-fix: ~/.claude/.credentials.json not found") + return + + try: + with open(creds_file) as f: + creds = json.load(f) + new_token = creds.get("claudeAiOauth", {}).get("accessToken", "") + if not new_token: + fail("No access token in credentials file") + return + + # Read and update .env + content = ENV_FILE.read_text() + old_token = env.get("CLAUDE_CODE_OAUTH_TOKEN", "") + if old_token: + content = content.replace(old_token, new_token) + else: + content = f"CLAUDE_CODE_OAUTH_TOKEN={new_token}\n" + content + ENV_FILE.write_text(content) + ok(f"Token auto-fixed from ~/.claude/.credentials.json ({new_token[:20]}...)") + self.fixed.append("expired_token") + except Exception as e: + fail(f"Auto-fix failed: {e}") + + def check_ollama(self): + """Verify Ollama is reachable and models are available.""" + print(f"\n{BLUE}[2/6] Checking Ollama connectivity{RESET}") + + # Read URL from .env + ollama_url = "http://192.168.0.234:11434" + if ENV_FILE.exists(): + with open(ENV_FILE) as f: + for line in f: + if line.strip().startswith("OLLAMA_BASE_URL="): + ollama_url = line.strip().split("=", 1)[1] + + try: + result = subprocess.run( + ["curl", "-s", "-m", "5", f"{ollama_url}/api/tags"], + capture_output=True, text=True, timeout=10 + ) + if result.returncode != 0: + fail(f"Ollama unreachable at {ollama_url}") + self.issues.append("ollama_unreachable") + return + + data = json.loads(result.stdout) + models = [m["name"] for m in data.get("models", [])] + ok(f"Ollama responding ({len(models)} models)") + + # Check required models + required = ["qwen2.5-coder:14b", "nomic-embed-text"] + for model in required: + found = any(model in m for m in models) + if found: + ok(f"Model available: {model}") + else: + fail(f"Model missing: {model}") + self.issues.append(f"missing_model_{model}") + except Exception as e: + fail(f"Ollama check failed: {e}") + self.issues.append("ollama_error") + + def check_venv(self): + """Verify 
Python venv and dependencies.""" + print(f"\n{BLUE}[3/6] Checking Python environment{RESET}") + if not VENV_PYTHON.exists(): + fail(f"venv not found at {VENV_PYTHON}") + self.issues.append("missing_venv") + if self.auto_fix: + info("Run: cd apps/backend && python3 -m venv .venv && pip install -r requirements.txt") + return + + ok(f"venv exists at {VENV_PYTHON}") + + # Check key imports + try: + result = subprocess.run( + [str(VENV_PYTHON), "-c", "from core.client import create_client; print('OK')"], + capture_output=True, text=True, timeout=10, cwd=str(BACKEND_DIR) + ) + if "OK" in result.stdout: + ok("Core imports working") + else: + fail(f"Import error: {result.stderr[:100]}") + self.issues.append("import_error") + except Exception as e: + fail(f"venv check failed: {e}") + + def check_stuck_specs(self): + """Find and optionally clear stuck specs/locks.""" + print(f"\n{BLUE}[4/6] Checking for stuck specs/locks{RESET}") + + # Check common project locations + project_dirs = [ + Path.home() / "projects", + Path("/aidata/projects"), + ] + + stuck_count = 0 + for pdir in project_dirs: + if not pdir.exists(): + continue + for spec_dir in pdir.glob("*/.auto-claude/specs/*/.state"): + stuck_count += 1 + warn(f"State cache: {spec_dir}") + + for lock_file in pdir.glob("*/.auto-claude/specs/*/.lock"): + stuck_count += 1 + warn(f"Lock file: {lock_file}") + if self.auto_fix: + lock_file.unlink() + ok(f"Removed stale lock: {lock_file}") + self.fixed.append(f"lock_{lock_file.name}") + + if stuck_count == 0: + ok("No stuck specs or locks found") + else: + info(f"Found {stuck_count} items (use --fix to clean)") + + def check_node(self): + """Verify Node.js version for Claude Code.""" + print(f"\n{BLUE}[5/6] Checking Node.js{RESET}") + try: + result = subprocess.run( + ["node", "--version"], capture_output=True, text=True, timeout=5 + ) + version = result.stdout.strip() + major = int(version.lstrip("v").split(".")[0]) + if major >= 24: + ok(f"Node.js {version}") + else: + 
warn(f"Node.js {version} - Auto-Claude needs v24+") + self.issues.append("old_node") + except Exception: + warn("Node.js not found in PATH") + + def check_git_status(self): + """Check for uncommitted Auto-Claude changes in projects.""" + print(f"\n{BLUE}[6/6] Checking git status{RESET}") + try: + result = subprocess.run( + ["git", "status", "--porcelain"], capture_output=True, text=True, + timeout=5, cwd=str(BACKEND_DIR) + ) + if result.stdout.strip(): + lines = result.stdout.strip().split("\n") + warn(f"Auto-Claude repo has {len(lines)} uncommitted changes") + else: + ok("Auto-Claude repo clean") + except Exception: + warn("Could not check git status") + + def run(self): + print(f"\n{'='*60}") + print(f" Auto-Claude Preflight Check {'(+ Auto-Fix)' if self.auto_fix else ''}") + print(f"{'='*60}") + + self.check_env_file() + self.check_ollama() + self.check_venv() + self.check_stuck_specs() + self.check_node() + self.check_git_status() + + # Summary + print(f"\n{'='*60}") + if not self.issues: + print(f" {GREEN}All checks passed! Auto-Claude is ready.{RESET}") + else: + print(f" {YELLOW}{len(self.issues)} issue(s) found", end="") + if self.fixed: + print(f", {len(self.fixed)} auto-fixed", end="") + print(f"{RESET}") + remaining = [i for i in self.issues if i not in self.fixed] + if remaining: + print(f" {RED}Remaining: {', '.join(remaining)}{RESET}") + if not self.auto_fix: + print(f"\n Run with --fix to attempt auto-repair") + print(f"{'='*60}\n") + + return len(self.issues) - len(self.fixed) == 0 + + +if __name__ == "__main__": + auto_fix = "--fix" in sys.argv + checker = PreflightCheck(auto_fix=auto_fix) + success = checker.run() + sys.exit(0 if success else 1) diff --git a/apps/backend/preflight_hook.py b/apps/backend/preflight_hook.py new file mode 100644 index 0000000000..64407f7fe7 --- /dev/null +++ b/apps/backend/preflight_hook.py @@ -0,0 +1,169 @@ +"""Auto-Claude Preflight Hook. + +Lightweight wrapper around preflight.py that runs checks before any runner. 
+Designed to be called once at the start of main() in each runner. + +Usage in runners: + from preflight_hook import run_preflight + run_preflight() # Returns True if OK, exits with message if critical failure + +Checks: + - OAuth token present and not known-expired (auto-fixes from ~/.claude/.credentials.json) + - Ollama reachable (warns but continues if down - only needed for local LLM tasks) + - .env file exists + +Skips checks if: + - SKIP_PREFLIGHT=1 env var is set (for CI/testing) + - Already ran this session (deduplication) +""" + +import json +import os +import subprocess +import sys +from pathlib import Path + +_PREFLIGHT_RAN = False +BACKEND_DIR = Path(__file__).parent +ENV_FILE = BACKEND_DIR / ".env" + + +def _load_env_vars() -> dict: + """Read .env file into dict without importing dotenv.""" + env = {} + if ENV_FILE.exists(): + with open(ENV_FILE) as f: + for line in f: + line = line.strip() + if line and not line.startswith("#") and "=" in line: + key, _, val = line.partition("=") + env[key.strip()] = val.strip() + return env + + +def _check_and_fix_token(env: dict) -> bool: + """Check OAuth token, auto-fix from credentials if expired. 
Returns True if OK.""" + token = env.get("CLAUDE_CODE_OAUTH_TOKEN", "") + if not token: + # Try auto-fix + return _auto_fix_token() + + # Check for known-expired tokens (add patterns as discovered) + known_expired = ["sk-ant-oat01-cnqsmZU"] + for expired in known_expired: + if token.startswith(expired): + print("[preflight] OAuth token is known-expired, attempting auto-fix...") + return _auto_fix_token() + + return True + + +def _auto_fix_token() -> bool: + """Pull fresh token from ~/.claude/.credentials.json and update .env.""" + creds_file = Path.home() / ".claude" / ".credentials.json" + if not creds_file.exists(): + print("[preflight] ERROR: No OAuth token and ~/.claude/.credentials.json not found") + print("[preflight] Run 'claude /login' to authenticate, then try again") + return False + + try: + with open(creds_file) as f: + creds = json.load(f) + new_token = creds.get("claudeAiOauth", {}).get("accessToken", "") + if not new_token: + print("[preflight] ERROR: No access token in credentials file") + return False + + # Update .env + content = ENV_FILE.read_text() + # Find and replace existing token line + lines = content.split("\n") + updated = False + for i, line in enumerate(lines): + if line.strip().startswith("CLAUDE_CODE_OAUTH_TOKEN="): + lines[i] = f"CLAUDE_CODE_OAUTH_TOKEN={new_token}" + updated = True + break + if not updated: + lines.insert(0, f"CLAUDE_CODE_OAUTH_TOKEN={new_token}") + + ENV_FILE.write_text("\n".join(lines)) + os.environ["CLAUDE_CODE_OAUTH_TOKEN"] = new_token + print(f"[preflight] Token auto-fixed from ~/.claude/.credentials.json ({new_token[:20]}...)") + return True + except Exception as e: + print(f"[preflight] Token auto-fix failed: {e}") + return False + + +def _check_ollama(env: dict) -> bool: + """Check Ollama connectivity. 
Warns but doesn't fail (not always needed).""" + ollama_url = env.get("OLLAMA_BASE_URL", "http://192.168.0.234:11434") + try: + result = subprocess.run( + ["curl", "-s", "-m", "3", f"{ollama_url}/api/tags"], + capture_output=True, text=True, timeout=5 + ) + if result.returncode != 0: + print(f"[preflight] WARNING: Ollama unreachable at {ollama_url}") + print("[preflight] Local LLM tasks (embeddings, complexity) may fail") + return True # Warn but don't block + return True + except Exception: + print(f"[preflight] WARNING: Could not reach Ollama at {ollama_url}") + return True # Warn but don't block + + +def _check_stale_locks() -> None: + """Remove stale .lock files from spec directories.""" + project_dirs = [Path.home() / "projects", Path("/aidata/projects")] + for pdir in project_dirs: + if not pdir.exists(): + continue + for lock_file in pdir.glob("*/.auto-claude/specs/*/.lock"): + try: + # Only remove locks older than 1 hour + import time + age = time.time() - lock_file.stat().st_mtime + if age > 3600: + lock_file.unlink() + print(f"[preflight] Removed stale lock: {lock_file}") + except Exception: + pass + + +def run_preflight() -> bool: + """Run preflight checks. Call at the start of each runner's main(). + + Returns True if all critical checks pass. + Exits with error message if critical checks fail. 
+ """ + global _PREFLIGHT_RAN + + # Skip if already ran this process, or explicitly disabled + if _PREFLIGHT_RAN: + return True + if os.environ.get("SKIP_PREFLIGHT") == "1": + return True + + _PREFLIGHT_RAN = True + + # Check .env exists + if not ENV_FILE.exists(): + print("[preflight] ERROR: .env file not found at", ENV_FILE) + print("[preflight] Copy .env.example to .env and configure it") + sys.exit(1) + + env = _load_env_vars() + + # Critical: OAuth token + if not _check_and_fix_token(env): + sys.exit(1) + + # Non-critical: Ollama + _check_ollama(env) + + # Non-critical: Stale locks + _check_stale_locks() + + return True diff --git a/apps/backend/runners/ideation_runner.py b/apps/backend/runners/ideation_runner.py index 1ec3412aaf..a18e370687 100644 --- a/apps/backend/runners/ideation_runner.py +++ b/apps/backend/runners/ideation_runner.py @@ -62,6 +62,9 @@ def main(): """CLI entry point.""" + from preflight_hook import run_preflight + run_preflight() + import argparse parser = argparse.ArgumentParser( diff --git a/apps/backend/runners/insights_runner.py b/apps/backend/runners/insights_runner.py index 891a4d84ed..46b57d643f 100644 --- a/apps/backend/runners/insights_runner.py +++ b/apps/backend/runners/insights_runner.py @@ -338,6 +338,9 @@ def run_simple(project_dir: str, message: str, history: list) -> None: def main(): + from preflight_hook import run_preflight + run_preflight() + parser = argparse.ArgumentParser(description="Insights AI Chat Runner") parser.add_argument("--project-dir", required=True, help="Project directory path") parser.add_argument("--message", required=True, help="User message") diff --git a/apps/backend/runners/roadmap_runner.py b/apps/backend/runners/roadmap_runner.py index 185dcc5f76..40cb05d20f 100644 --- a/apps/backend/runners/roadmap_runner.py +++ b/apps/backend/runners/roadmap_runner.py @@ -45,6 +45,9 @@ def main(): """CLI entry point.""" + from preflight_hook import run_preflight + run_preflight() + import argparse parser = 
argparse.ArgumentParser( diff --git a/apps/backend/runners/spec_runner.py b/apps/backend/runners/spec_runner.py index 70d6e755d7..314d764de3 100644 --- a/apps/backend/runners/spec_runner.py +++ b/apps/backend/runners/spec_runner.py @@ -116,6 +116,9 @@ def main(): """CLI entry point.""" + from preflight_hook import run_preflight + run_preflight() + debug_section("spec_runner", "Spec Runner CLI") import argparse diff --git a/apps/frontend/src/renderer/components/github-issues/utils/github-error-parser.ts b/apps/frontend/src/renderer/components/github-issues/utils/github-error-parser.ts index f0521773e1..0c7e26ab92 100644 --- a/apps/frontend/src/renderer/components/github-issues/utils/github-error-parser.ts +++ b/apps/frontend/src/renderer/components/github-issues/utils/github-error-parser.ts @@ -138,7 +138,7 @@ function extractRateLimitResetTime(error: string): Date | undefined { } // Then try absolute timestamp pattern - const absolutePattern = /(?:reset[s]?\s*at[:\s]*|X-RateLimit-Reset[:\s]*)(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z?|\d+)/i; + const absolutePattern = /(?:reset[s]?\s*at[:\s]*|X-RateLimit-Reset[:\s]*)(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z?|\d+)/i; const match = error.match(absolutePattern); if (!match) { return undefined; diff --git a/package-lock.json b/package-lock.json index 2c05a728ed..d84b210bb8 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "auto-claude", - "version": "2.7.6-beta.3", + "version": "2.7.6-beta.5", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "auto-claude", - "version": "2.7.6-beta.3", + "version": "2.7.6-beta.5", "license": "AGPL-3.0", "workspaces": [ "apps/*", @@ -25,7 +25,7 @@ }, "apps/frontend": { "name": "auto-claude-ui", - "version": "2.7.6-beta.3", + "version": "2.7.6-beta.5", "hasInstallScript": true, "license": "AGPL-3.0", "dependencies": { diff --git a/scripts/REFERENCE.md b/scripts/REFERENCE.md new file mode 100644 index 0000000000..6a2f793c61 --- /dev/null 
+++ b/scripts/REFERENCE.md @@ -0,0 +1,142 @@ +# Auto-Claude Quick Reference + +## Aliases (add to ~/.zshrc) + +```bash +alias ac-batch="python ~/projects/Auto-Claude/scripts/ac-batch.py" +alias ac-phase="python ~/projects/Auto-Claude/scripts/ac-phase.py" +``` + +## The Two Main Commands + +| Command | Purpose | +|---------|---------| +| `ac-batch` | Interactive menu — discover, create specs, build | +| `ac-phase` | Execute specs in dependency-aware phases | + +--- + +## Workflow 1: New Project / Full Analysis + +```bash +cd ~/projects/my-app + +# All-in-one interactive menu +ac-batch + +# Or step by step: +ac-batch --insights # 1. Ask questions, understand the codebase +ac-batch --ideation # 2. Brainstorm improvements, bugs, security +ac-batch --roadmap # 3. Prioritize into strategic plan +ac-batch --discover # 4. Pick ideas → create specs +ac-phase # 5. Build specs in phased order +``` + +## Workflow 2: Quick Single Task + +```bash +~/auto-claude.sh spec ~/projects/my-app --task "Add dark mode support" +~/auto-claude.sh run ~/projects/my-app --spec 001 +``` + +## Workflow 3: Bug Fix / Issue Response + +```bash +ac-batch --insights "What does this auth code do? Why might it break?" 
+~/auto-claude.sh github ~/projects/my-app triage +~/auto-claude.sh github ~/projects/my-app auto-fix 456 +``` + +## Workflow 4: Ongoing Maintenance + +```bash +ac-batch --ideation # Weekly scan +ac-batch --roadmap # Refresh priorities +ac-batch --discover # Create specs from findings +ac-phase # Build in order +~/auto-claude.sh worktrees ~/projects/my-app --cleanup +``` + +--- + +## ac-batch CLI Reference + +``` +Discover: + ac-batch --insights Interactive codebase Q&A + ac-batch --insights "question" One-shot question + ac-batch --ideation Brainstorm improvements + ac-batch --roadmap Strategic feature roadmap + +Create: + ac-batch --discover Create specs from discovery outputs + ac-batch --create tasks.json Create specs from JSON file + +Build: + ac-batch --build Build all pending specs + ac-batch --build --spec 003 Build specific spec + ac-batch --build --qa Build all + run QA + ac-batch --qa QA all built specs + +Manage: + ac-batch --status Show all spec statuses + ac-batch --cleanup Preview cleanup + ac-batch --cleanup --confirm Delete completed specs +``` + +## ac-phase CLI Reference + +``` + ac-phase Interactive menu + ac-phase --status Show phase progress + ac-phase --run Run next pending phase + ac-phase --run --phase 2 Run specific phase + ac-phase --run --all Run all remaining phases + ac-phase --init Regenerate phases from specs +``` + +## auto-claude.sh Reference (individual commands) + +``` + ~/auto-claude.sh ideation Brainstorm improvements + ~/auto-claude.sh roadmap Create implementation roadmap + ~/auto-claude.sh insights "question" Ask about the codebase + ~/auto-claude.sh spec --task "..." 
Create spec for a task + ~/auto-claude.sh run --spec 001 Execute a spec build + ~/auto-claude.sh github review-pr 42 Review a PR + ~/auto-claude.sh github triage Triage issues + ~/auto-claude.sh github auto-fix 456 Auto-fix an issue + ~/auto-claude.sh list List specs + ~/auto-claude.sh worktrees --cleanup Clean up worktrees + ~/auto-claude.sh ui Start desktop app +``` + +## Insights Example Questions + +```bash +# Architecture +ac-batch --insights "What is the overall architecture?" +ac-batch --insights "How does authentication work?" + +# Before changes +ac-batch --insights "What would I need to change to add a new feature?" +ac-batch --insights "What API endpoints exist?" + +# Risk assessment +ac-batch --insights "Are there any security concerns or hardcoded credentials?" +ac-batch --insights "What are the biggest technical debt items?" + +# Production readiness +ac-batch --insights "What features are missing for production readiness?" +ac-batch --insights "What error handling patterns are used?" +``` + +--- + +## Quick Mental Model + +``` +ac-batch = WHAT to build (discover → create → build) +ac-phase = HOW to build it (phased execution with review gates) +auto-claude.sh = individual commands for one-off tasks +``` diff --git a/scripts/ac-batch.py b/scripts/ac-batch.py new file mode 100755 index 0000000000..264ed380c5 --- /dev/null +++ b/scripts/ac-batch.py @@ -0,0 +1,868 @@ +#!/usr/bin/env python3 +""" +ac-batch — Interactive batch task manager for Auto-Claude +========================================================== + +Create, manage, build, and track batches of specs from discovery +outputs (ideation, roadmap, insights) or manual batch JSON files. 
+ +Usage: + ac-batch # Interactive menu (auto-detects project) + ac-batch --insights # Interactive codebase Q&A chat + ac-batch --insights "question" # One-shot codebase question + ac-batch --ideation # Brainstorm improvements and features + ac-batch --roadmap # Generate strategic feature roadmap + ac-batch --status # Show all spec statuses + ac-batch --create tasks.json # Create specs from batch JSON file + ac-batch --discover # Create batch from discovery outputs + ac-batch --build # Build all pending specs sequentially + ac-batch --build --spec 016 # Build a specific spec + ac-batch --qa # QA all built specs + ac-batch --cleanup # Show what would be cleaned up + ac-batch --cleanup --confirm # Actually delete completed specs +""" + +import argparse +import json +import os +import re +import signal +import subprocess +import sys +import time +from collections import defaultdict +from pathlib import Path + + +# --------------------------------------------------------------------------- +# Graceful exit on Ctrl+C +# --------------------------------------------------------------------------- + +def _handle_sigint(sig, frame): + print(f"\n\n Interrupted. 
Progress saved — run ac-batch again to resume.") + sys.exit(0) + +signal.signal(signal.SIGINT, _handle_sigint) + + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +AUTO_CLAUDE_ROOT = Path(__file__).resolve().parent.parent +RUN_PY = AUTO_CLAUDE_ROOT / "apps" / "backend" / "run.py" +SPEC_RUNNER_PY = AUTO_CLAUDE_ROOT / "apps" / "backend" / "runners" / "spec_runner.py" +INSIGHTS_RUNNER_PY = AUTO_CLAUDE_ROOT / "apps" / "backend" / "runners" / "insights_runner.py" +VENV_PYTHON = AUTO_CLAUDE_ROOT / "apps" / "backend" / ".venv" / "bin" / "python" +BATCH_FROM_DISCOVERY = Path(__file__).resolve().parent / "batch-from-discovery.py" + +# Colors +C_RESET = "\033[0m" +C_BOLD = "\033[1m" +C_DIM = "\033[2m" +C_GREEN = "\033[32m" +C_YELLOW = "\033[33m" +C_RED = "\033[31m" +C_CYAN = "\033[36m" +C_BLUE = "\033[34m" +C_MAGENTA = "\033[35m" + +STATUS_ICONS = { + "qa_passed": f"{C_GREEN}✅{C_RESET}", + "built": f"{C_BLUE}⚙️{C_RESET}", + "spec_ready": f"{C_CYAN}📋{C_RESET}", + "planned": f"{C_YELLOW}📐{C_RESET}", + "pending": f"{C_YELLOW}⏳{C_RESET}", + "failed": f"{C_RED}❌{C_RESET}", +} + +STATUS_LABELS = { + "qa_passed": "QA Passed", + "built": "Built", + "spec_ready": "Spec Ready", + "planned": "Planned", + "pending": "Pending", + "failed": "Failed", +} + +# Status marker files (checked in order — first match wins) +STATUS_FILES = [ + ("qa_report.md", "qa_passed"), + ("build_log.md", "built"), + ("spec.md", "spec_ready"), + ("implementation_plan.json", "planned"), + ("requirements.json", "pending"), +] + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def get_python(): + return str(VENV_PYTHON) if VENV_PYTHON.exists() else "python3" + + +def find_project_dir(): + """Walk up from cwd to find a directory with .auto-claude/.""" + d = Path.cwd() 
+ while d != d.parent: + if (d / ".auto-claude").is_dir(): + return d + d = d.parent + return None + + +def get_specs_dir(project_dir): + return project_dir / ".auto-claude" / "specs" + + +def parse_spec_id(spec_name): + match = re.match(r"^(\d+)", spec_name) + return match.group(1) if match else None + + +def parse_category(spec_name): + match = re.search(r"\[(\w+)-\d+\]", spec_name) + return match.group(1) if match else "other" + + +def get_spec_status(spec_dir): + for filename, status in STATUS_FILES: + if (spec_dir / filename).exists(): + return status + return "pending" + + +def get_spec_title(spec_dir): + req_file = spec_dir / "requirements.json" + if req_file.exists(): + try: + with open(req_file) as f: + data = json.load(f) + desc = data.get("task_description", "") + if "\n" in desc: + desc = desc.split("\n")[0] + return desc[:100] + except (json.JSONDecodeError, KeyError): + pass + name = spec_dir.name + name = re.sub(r"^\d+-", "", name) + name = re.sub(r"\[[\w-]+\]-?", "", name) + return name.replace("-", " ").strip().title() + + +def load_all_specs(project_dir): + specs_dir = get_specs_dir(project_dir) + if not specs_dir.exists(): + return [] + specs = [] + for d in sorted(specs_dir.iterdir()): + if not d.is_dir(): + continue + spec_id = parse_spec_id(d.name) + if not spec_id: + continue + specs.append({ + "id": spec_id, + "name": d.name, + "category": parse_category(d.name), + "status": get_spec_status(d), + "title": get_spec_title(d), + "dir": d, + }) + return specs + + +def prompt_yn(question, default=True): + hint = "Y/n" if default else "y/N" + raw = input(f" {C_BOLD}{question}{C_RESET} [{hint}] ").strip().lower() + if not raw: + return default + return raw in ("y", "yes") + + +def prompt_choice(question, options, allow_multi=False): + """Numbered menu. 
Returns list of selected indices.""" + print(f" {C_BOLD}{question}{C_RESET}") + print() + for i, (label, detail) in enumerate(options, 1): + detail_str = f" {C_DIM}— {detail}{C_RESET}" if detail else "" + print(f" {C_CYAN}{i:>3}{C_RESET}) {label}{detail_str}") + print(f" {C_CYAN} q{C_RESET}) Cancel") + print() + + while True: + raw = input(f" {C_BOLD}>{C_RESET} ").strip().lower() + if raw == "q": + return [] + try: + if allow_multi and ("," in raw or " " in raw): + parts = raw.replace(",", " ").split() + indices = [int(p) - 1 for p in parts] + if all(0 <= i < len(options) for i in indices): + return indices + else: + idx = int(raw) - 1 + if 0 <= idx < len(options): + return [idx] + except ValueError: + pass + print(f" {C_YELLOW}Invalid choice.{C_RESET}") + + +# --------------------------------------------------------------------------- +# Display +# --------------------------------------------------------------------------- + +def print_header(text): + width = 64 + print() + print(f" {C_BOLD}{'═' * width}{C_RESET}") + print(f" {C_BOLD} {text}{C_RESET}") + print(f" {C_BOLD}{'═' * width}{C_RESET}") + print() + + +def print_status_table(specs): + """Display spec status table with summary.""" + if not specs: + print(f" {C_DIM}No specs found.{C_RESET}") + return + + # Group by status for summary + by_status = defaultdict(list) + for s in specs: + by_status[s["status"]].append(s) + + # Summary bar + total = len(specs) + parts = [] + for status in ["qa_passed", "built", "spec_ready", "planned", "pending"]: + count = len(by_status.get(status, [])) + if count > 0: + label = STATUS_LABELS.get(status, status) + parts.append(f"{label}: {count}") + print(f" {C_BOLD}{total} specs{C_RESET} — {', '.join(parts)}") + print() + + # Progress bar + done = len(by_status.get("qa_passed", [])) + len(by_status.get("built", [])) + pct = int(done / total * 100) if total > 0 else 0 + bar_width = 40 + filled = int(bar_width * pct / 100) + bar = f"{'█' * filled}{'░' * (bar_width - filled)}" + 
print(f" Progress: [{bar}] {pct}% ({done}/{total} complete)") + print() + + # Group by category + by_category = defaultdict(list) + for s in specs: + by_category[s["category"]].append(s) + + for cat in sorted(by_category.keys()): + cat_specs = by_category[cat] + print(f" {C_CYAN}{C_BOLD}{cat.upper()}{C_RESET} ({len(cat_specs)} specs)") + for s in cat_specs: + icon = STATUS_ICONS.get(s["status"], "?") + title = s["title"][:65] + print(f" {icon} {C_DIM}{s['id']}{C_RESET} {title}") + print() + + +# --------------------------------------------------------------------------- +# Actions +# --------------------------------------------------------------------------- + +def action_create_from_file(project_dir, batch_file): + """Create specs from a batch JSON file using run.py --batch-create.""" + batch_path = Path(batch_file) + if not batch_path.exists(): + print(f" {C_RED}Batch file not found: {batch_file}{C_RESET}") + return False + + try: + with open(batch_path) as f: + data = json.load(f) + task_count = len(data.get("tasks", [])) + except (json.JSONDecodeError, KeyError): + print(f" {C_RED}Invalid batch JSON file{C_RESET}") + return False + + print(f" {C_CYAN}Creating {task_count} specs from {batch_path.name}...{C_RESET}") + print() + + python = get_python() + cmd = [python, str(RUN_PY), "--project-dir", str(project_dir), + "--batch-create", str(batch_path)] + print(f" {C_DIM}$ {' '.join(cmd)}{C_RESET}") + print() + + result = subprocess.run(cmd, cwd=str(project_dir)) + return result.returncode == 0 + + +def action_discover(project_dir): + """Launch the interactive discovery-to-batch workflow.""" + if not BATCH_FROM_DISCOVERY.exists(): + print(f" {C_RED}batch-from-discovery.py not found at:{C_RESET}") + print(f" {BATCH_FROM_DISCOVERY}") + return False + + python = get_python() + cmd = [python, str(BATCH_FROM_DISCOVERY), str(project_dir), + "--auto-claude-dir", str(AUTO_CLAUDE_ROOT)] + result = subprocess.run(cmd, cwd=str(project_dir)) + return result.returncode == 0 + + 
+def action_build_spec(project_dir, spec_id, generate_spec=True, run_qa=False): + """Build a single spec (optionally generate spec.md first, optionally QA).""" + python = get_python() + + # Step 1: Generate spec.md if needed + if generate_spec: + specs_dir = get_specs_dir(project_dir) + spec_dir = None + for d in specs_dir.iterdir(): + if d.is_dir() and d.name.startswith(spec_id + "-"): + spec_dir = d + break + + if spec_dir and not (spec_dir / "spec.md").exists(): + print(f" {C_CYAN}Generating spec for {spec_id}...{C_RESET}") + gen_cmd = [python, str(SPEC_RUNNER_PY), + "--project-dir", str(project_dir), + "--continue", spec_dir.name, + "--auto-approve"] + print(f" {C_DIM}$ {' '.join(gen_cmd)}{C_RESET}") + try: + result = subprocess.run(gen_cmd, cwd=str(project_dir)) + except KeyboardInterrupt: + print(f"\n {C_YELLOW}Spec generation interrupted{C_RESET}") + return False + if result.returncode != 0: + print(f" {C_RED}Spec generation failed for {spec_id}{C_RESET}") + return False + + # Step 2: Build + print(f" {C_CYAN}Building spec {spec_id}...{C_RESET}") + build_cmd = [python, str(RUN_PY), "--project-dir", str(project_dir), + "--spec", spec_id, "--force"] + print(f" {C_DIM}$ {' '.join(build_cmd)}{C_RESET}") + + try: + result = subprocess.run(build_cmd, cwd=str(project_dir)) + except KeyboardInterrupt: + print(f"\n {C_YELLOW}Build interrupted for {spec_id}{C_RESET}") + return False + + if result.returncode != 0: + print(f" {C_RED}Build failed for {spec_id}{C_RESET}") + return False + + # Step 3: QA (optional) + if run_qa: + print(f" {C_CYAN}Running QA for {spec_id}...{C_RESET}") + qa_cmd = [python, str(RUN_PY), "--project-dir", str(project_dir), + "--spec", spec_id, "--qa"] + print(f" {C_DIM}$ {' '.join(qa_cmd)}{C_RESET}") + try: + subprocess.run(qa_cmd, cwd=str(project_dir)) + except KeyboardInterrupt: + print(f"\n {C_YELLOW}QA interrupted for {spec_id}{C_RESET}") + return False + + print(f" {C_GREEN}✓ Spec {spec_id} done{C_RESET}") + return True + + +def 
action_build_all(project_dir, run_qa=False, statuses=("pending", "spec_ready")):
    """Build all specs that match the given statuses."""
    specs = load_all_specs(project_dir)
    targets = [s for s in specs if s["status"] in statuses]

    if not targets:
        print(f" {C_GREEN}No specs need building.{C_RESET}")
        return True

    # Preview the work list and confirm before launching builds.
    print(f" {C_BOLD}Building {len(targets)} spec(s):{C_RESET}")
    for s in targets:
        print(f" {s['id']} — {s['title'][:60]}")
    print()

    if not prompt_yn(f"Proceed with building {len(targets)} specs?"):
        print(" Cancelled.")
        return False

    succeeded = []
    failed = []

    for i, s in enumerate(targets, 1):
        print()
        print(f" {C_BOLD}[{i}/{len(targets)}]{C_RESET} {s['id']} — {s['title'][:50]}")
        print(f" {'─' * 50}")

        ok = action_build_spec(project_dir, s["id"], generate_spec=True, run_qa=run_qa)
        if ok:
            succeeded.append(s["id"])
        else:
            failed.append(s["id"])
            # Only ask whether to continue after a failure; successes roll on.
            if not prompt_yn("Continue with remaining specs?"):
                break

    # Summary
    print()
    print(f" {'═' * 50}")
    print(f" {C_GREEN}Succeeded: {len(succeeded)}{C_RESET}", end="")
    if failed:
        print(f" {C_RED}Failed: {len(failed)} ({', '.join(failed)}){C_RESET}")
    else:
        print()

    return len(failed) == 0


def action_qa_all(project_dir):
    """Run QA on all built (but not QA'd) specs."""
    specs = load_all_specs(project_dir)
    targets = [s for s in specs if s["status"] == "built"]

    if not targets:
        print(f" {C_GREEN}No specs awaiting QA.{C_RESET}")
        return True

    python = get_python()
    print(f" {C_BOLD}Running QA on {len(targets)} spec(s):{C_RESET}")
    for s in targets:
        print(f" {s['id']} — {s['title'][:60]}")
    print()

    # QA exit codes are not checked here — each run's output is the report.
    for i, s in enumerate(targets, 1):
        print(f" {C_CYAN}[{i}/{len(targets)}] QA for {s['id']}...{C_RESET}")
        qa_cmd = [python, str(RUN_PY), "--project-dir", str(project_dir),
                  "--spec", s["id"], "--qa"]
        print(f" {C_DIM}$ {' '.join(qa_cmd)}{C_RESET}")
        try:
            subprocess.run(qa_cmd, cwd=str(project_dir))
        except KeyboardInterrupt:
print(f"\n {C_YELLOW}QA interrupted{C_RESET}") + return False + + return True + + +def action_ideation(project_dir): + """Run ideation to brainstorm features and improvements for the project.""" + python = get_python() + backend_dir = AUTO_CLAUDE_ROOT / "apps" / "backend" + + print(f" {C_CYAN}Running ideation for {project_dir.name}...{C_RESET}") + print(f" {C_DIM}This will analyze the codebase and suggest improvements,{C_RESET}") + print(f" {C_DIM}security fixes, performance optimizations, and new features.{C_RESET}") + print() + + cmd = [python, "-m", "runners.ideation_runner", + "--project", str(project_dir)] + print(f" {C_DIM}$ {' '.join(cmd)}{C_RESET}") + print() + + try: + result = subprocess.run(cmd, cwd=str(backend_dir)) + except KeyboardInterrupt: + print(f"\n {C_YELLOW}Ideation interrupted{C_RESET}") + return False + + if result.returncode == 0: + # Check if output was created + ideation_file = project_dir / ".auto-claude" / "ideation" / "ideation.json" + if ideation_file.exists(): + try: + data = json.loads(ideation_file.read_text()) + count = len(data.get("ideas", [])) + print() + print(f" {C_GREEN}✓ Ideation complete — {count} ideas generated{C_RESET}") + print(f" {C_DIM}Output: {ideation_file}{C_RESET}") + print() + print(f" {C_BOLD}Next steps:{C_RESET}") + print(f" • Run {C_CYAN}ac-batch --discover{C_RESET} to create specs from these ideas") + print(f" • Or select option 4 from the menu") + except (json.JSONDecodeError, KeyError): + pass + return True + + print(f" {C_RED}Ideation failed{C_RESET}") + return False + + +def action_roadmap(project_dir): + """Run roadmap generation to create an implementation plan for the project.""" + python = get_python() + backend_dir = AUTO_CLAUDE_ROOT / "apps" / "backend" + + print(f" {C_CYAN}Generating roadmap for {project_dir.name}...{C_RESET}") + print(f" {C_DIM}This will create a strategic roadmap with prioritized features{C_RESET}") + print(f" {C_DIM}and implementation phases.{C_RESET}") + print() + + cmd = 
[python, "-m", "runners.roadmap_runner", + "--project", str(project_dir)] + print(f" {C_DIM}$ {' '.join(cmd)}{C_RESET}") + print() + + try: + result = subprocess.run(cmd, cwd=str(backend_dir)) + except KeyboardInterrupt: + print(f"\n {C_YELLOW}Roadmap generation interrupted{C_RESET}") + return False + + if result.returncode == 0: + # Check if output was created + roadmap_file = project_dir / ".auto-claude" / "roadmap" / "roadmap.json" + if roadmap_file.exists(): + try: + data = json.loads(roadmap_file.read_text()) + count = len(data.get("features", [])) + phases = len(data.get("phases", [])) + print() + print(f" {C_GREEN}✓ Roadmap complete — {count} features across {phases} phases{C_RESET}") + print(f" {C_DIM}Output: {roadmap_file}{C_RESET}") + print() + print(f" {C_BOLD}Next steps:{C_RESET}") + print(f" • Run {C_CYAN}ac-batch --discover{C_RESET} to create specs from this roadmap") + print(f" • Or select option 4 from the menu") + except (json.JSONDecodeError, KeyError): + pass + return True + + print(f" {C_RED}Roadmap generation failed{C_RESET}") + return False + + +def action_insights(project_dir, message=None): + """Run an insights query against the project codebase.""" + python = get_python() + backend_dir = AUTO_CLAUDE_ROOT / "apps" / "backend" + + if message: + # One-shot mode + print(f" {C_CYAN}Asking about {project_dir.name}...{C_RESET}") + print(f" {C_DIM}Q: {message}{C_RESET}") + print() + cmd = [python, "-m", "runners.insights_runner", + "--project-dir", str(project_dir), + "--message", message] + try: + result = subprocess.run(cmd, cwd=str(backend_dir)) + except KeyboardInterrupt: + print(f"\n {C_YELLOW}Interrupted{C_RESET}") + return False + return result.returncode == 0 + + # Interactive chat mode + print_header(f"Insights — {project_dir.name}") + print(f" {C_BOLD}Ask questions about your codebase.{C_RESET}") + print(f" {C_DIM}Type your question and press Enter. 
Type 'q' to go back.{C_RESET}") + print() + + # Suggested questions + print(f" {C_BOLD}Suggested questions:{C_RESET}") + suggestions = [ + "What is the overall architecture?", + "What are the main API endpoints?", + "Are there any security concerns?", + "What features are missing for production readiness?", + "What are the biggest technical debt items?", + "How does authentication work?", + "What error handling patterns are used?", + "What would need to change to add a new feature?", + ] + for i, q in enumerate(suggestions, 1): + print(f" {C_CYAN}{i}){C_RESET} {q}") + print() + + history_file = project_dir / ".auto-claude" / "insights" / "ac-batch-history.json" + history = [] + + while True: + try: + raw = input(f" {C_BOLD}?{C_RESET} ").strip() + except (KeyboardInterrupt, EOFError): + print() + break + + if not raw or raw.lower() == "q": + break + + # Allow selecting a suggested question by number + try: + idx = int(raw) - 1 + if 0 <= idx < len(suggestions): + raw = suggestions[idx] + print(f" {C_DIM}Q: {raw}{C_RESET}") + except ValueError: + pass + + # Build command + cmd = [python, "-m", "runners.insights_runner", + "--project-dir", str(project_dir), + "--message", raw] + + # Pass conversation history if we have prior messages + if history: + history_file.parent.mkdir(parents=True, exist_ok=True) + history_file.write_text(json.dumps(history)) + cmd.extend(["--history-file", str(history_file)]) + + print() + try: + result = subprocess.run(cmd, cwd=str(backend_dir), + capture_output=False) + except KeyboardInterrupt: + print(f"\n {C_YELLOW}Interrupted{C_RESET}") + continue + + # Track conversation for context + history.append({"role": "user", "content": raw}) + history.append({"role": "assistant", "content": "(see above)"}) + + print() + + # Clean up temp history file + if history_file.exists(): + history_file.unlink() + + return True + + +def action_cleanup(project_dir, confirm=False): + """Clean up completed specs.""" + python = get_python() + cmd = [python, 
str(RUN_PY), "--project-dir", str(project_dir), "--batch-cleanup"] + if confirm: + cmd.append("--no-dry-run") + + result = subprocess.run(cmd, cwd=str(project_dir)) + return result.returncode == 0 + + +# --------------------------------------------------------------------------- +# Interactive mode +# --------------------------------------------------------------------------- + +def interactive(project_dir): + """Main interactive loop.""" + project_name = project_dir.name + + while True: + specs = load_all_specs(project_dir) + print_header(f"ac-batch — {project_name}") + print_status_table(specs) + + # Menu + print(f" {C_BOLD}What would you like to do?{C_RESET}") + print() + print(f" {C_DIM} Discover{C_RESET}") + print(f" {C_CYAN} 1){C_RESET} Ask about the codebase (insights)") + print(f" {C_CYAN} 2){C_RESET} Run ideation (brainstorm improvements)") + print(f" {C_CYAN} 3){C_RESET} Run roadmap (strategic feature planning)") + print() + print(f" {C_DIM} Create{C_RESET}") + print(f" {C_CYAN} 4){C_RESET} Create batch from discovery outputs") + print(f" {C_CYAN} 5){C_RESET} Create batch from JSON file") + print() + print(f" {C_DIM} Build{C_RESET}") + print(f" {C_CYAN} 6){C_RESET} Build all pending specs") + print(f" {C_CYAN} 7){C_RESET} Build a specific spec") + print(f" {C_CYAN} 8){C_RESET} QA all built specs") + print() + print(f" {C_DIM} Manage{C_RESET}") + print(f" {C_CYAN} 9){C_RESET} View status") + print(f" {C_CYAN}10){C_RESET} Cleanup completed specs") + print(f" {C_CYAN} q){C_RESET} Quit") + print() + + try: + choice = input(f" {C_BOLD}>{C_RESET} ").strip().lower() + except (KeyboardInterrupt, EOFError): + print(f"\n Bye!") + break + + if choice == "q": + print(" Bye!") + break + + elif choice == "1": + action_insights(project_dir) + + elif choice == "2": + action_ideation(project_dir) + + elif choice == "3": + action_roadmap(project_dir) + + elif choice == "4": + action_discover(project_dir) + + elif choice == "5": + try: + path = input(f" {C_BOLD}Path to 
batch JSON file:{C_RESET} ").strip() + except (KeyboardInterrupt, EOFError): + continue + if path: + action_create_from_file(project_dir, path) + + elif choice == "6": + qa_too = prompt_yn("Also run QA after each build?", default=False) + action_build_all(project_dir, run_qa=qa_too) + + elif choice == "7": + if not specs: + print(f" {C_YELLOW}No specs found.{C_RESET}") + continue + spec_options = [ + (f"{s['id']} — {s['title'][:50]}", STATUS_LABELS.get(s["status"], s["status"])) + for s in specs + ] + sel = prompt_choice("Which spec to build?", spec_options) + if sel: + s = specs[sel[0]] + qa_too = prompt_yn("Run QA after build?", default=False) + action_build_spec(project_dir, s["id"], run_qa=qa_too) + + elif choice == "8": + action_qa_all(project_dir) + + elif choice == "9": + # Just loops back to top which shows status + pass + + elif choice == "10": + print() + action_cleanup(project_dir, confirm=False) + print() + if prompt_yn("Proceed with cleanup?", default=False): + action_cleanup(project_dir, confirm=True) + + else: + print(f" {C_DIM}Unknown option. Try 1-10 or q.{C_RESET}") + + print() + + +# --------------------------------------------------------------------------- +# CLI entry +# --------------------------------------------------------------------------- + +def main(): + parser = argparse.ArgumentParser( + description="Interactive batch task manager for Auto-Claude", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + ac-batch Interactive menu + ac-batch --insights Interactive codebase Q&A chat + ac-batch --insights "How does auth work?" 
One-shot question + ac-batch --ideation Brainstorm improvements + ac-batch --roadmap Generate feature roadmap + ac-batch --discover Create specs from discovery outputs + ac-batch --status Show all spec statuses + ac-batch --create tasks.json Create specs from batch JSON + ac-batch --build Build all pending specs + ac-batch --build --spec 016 Build specific spec + ac-batch --build --qa Build all + run QA + ac-batch --qa QA all built specs + ac-batch --cleanup Show cleanup preview + ac-batch --cleanup --confirm Actually clean up + """, + ) + parser.add_argument("--project-dir", type=Path, default=None, + help="Project directory (default: auto-detect from cwd)") + parser.add_argument("--insights", nargs="?", const="", metavar="QUESTION", + help="Ask about the codebase (interactive if no question given)") + parser.add_argument("--ideation", action="store_true", + help="Run ideation to brainstorm improvements and features") + parser.add_argument("--roadmap", action="store_true", + help="Generate a strategic feature roadmap") + parser.add_argument("--status", action="store_true", + help="Show status of all specs") + parser.add_argument("--create", metavar="FILE", + help="Create specs from batch JSON file") + parser.add_argument("--discover", action="store_true", + help="Interactive batch creation from discovery outputs") + parser.add_argument("--build", action="store_true", + help="Build specs (all pending by default)") + parser.add_argument("--spec", metavar="ID", + help="Specific spec ID to build (with --build)") + parser.add_argument("--qa", action="store_true", + help="Run QA (on all built specs, or after --build)") + parser.add_argument("--cleanup", action="store_true", + help="Clean up completed specs") + parser.add_argument("--confirm", action="store_true", + help="Actually perform cleanup (with --cleanup)") + + args = parser.parse_args() + + # Find project + if args.project_dir: + project_dir = args.project_dir.resolve() + else: + project_dir = find_project_dir() 
+ + if not project_dir: + print(f" {C_RED}Error: No .auto-claude/ directory found.{C_RESET}") + print(" Run from inside a project with Auto-Claude,") + print(" or pass --project-dir /path/to/project") + sys.exit(1) + + # Handle CLI modes + if args.insights is not None: + if args.insights: + action_insights(project_dir, message=args.insights) + else: + action_insights(project_dir) + return + + if args.ideation: + action_ideation(project_dir) + return + + if args.roadmap: + action_roadmap(project_dir) + return + + if args.status: + specs = load_all_specs(project_dir) + print_header(f"ac-batch — {project_dir.name}") + print_status_table(specs) + return + + if args.create: + action_create_from_file(project_dir, args.create) + return + + if args.discover: + action_discover(project_dir) + return + + if args.build: + if args.spec: + action_build_spec(project_dir, args.spec, run_qa=args.qa) + else: + action_build_all(project_dir, run_qa=args.qa) + return + + if args.qa: + action_qa_all(project_dir) + return + + if args.cleanup: + action_cleanup(project_dir, confirm=args.confirm) + return + + # Default: interactive mode + interactive(project_dir) + + +if __name__ == "__main__": + main() diff --git a/scripts/ac-phase.py b/scripts/ac-phase.py new file mode 100755 index 0000000000..cb77cf9579 --- /dev/null +++ b/scripts/ac-phase.py @@ -0,0 +1,759 @@ +#!/usr/bin/env python3 +""" +ac-phase — Interactive phased spec executor for Auto-Claude +============================================================ + +Run from any repo with .auto-claude/specs/ to execute specs in +dependency-aware phases with review gates between them. 
+ +Usage: + ac-phase # Interactive menu (auto-detects project) + ac-phase --status # Show phase & spec status + ac-phase --run # Run next pending phase + ac-phase --run --phase 2 # Run specific phase + ac-phase --run --all # Run all phases (pause between each) + ac-phase --run --no-pause # Run all without pausing + ac-phase --init # Auto-generate phases.json from specs + ac-phase --edit # Open phases.json for manual editing +""" + +import json +import os +import re +import signal +import subprocess +import sys +import time +from collections import defaultdict +from pathlib import Path + + +# --------------------------------------------------------------------------- +# Graceful exit on Ctrl+C +# --------------------------------------------------------------------------- + +def _handle_sigint(sig, frame): + """Handle Ctrl+C gracefully without tracebacks.""" + print(f"\n\n Interrupted. Progress has been saved — run ac-phase again to resume.") + sys.exit(0) + +signal.signal(signal.SIGINT, _handle_sigint) + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +AUTO_CLAUDE_ROOT = Path(__file__).resolve().parent.parent +RUN_PY = AUTO_CLAUDE_ROOT / "apps" / "backend" / "run.py" +SPEC_RUNNER_PY = AUTO_CLAUDE_ROOT / "apps" / "backend" / "runners" / "spec_runner.py" +VENV_PYTHON = AUTO_CLAUDE_ROOT / "apps" / "backend" / ".venv" / "bin" / "python" + +# Category → recommended execution order (lower = earlier) +CATEGORY_PRIORITY = { + "sec": 1, # Security first + "cq": 2, # Code quality / refactors + "ci": 3, # Code improvements / utilities + "perf": 4, # Performance + "uiux": 5, # UI/UX features + "doc": 6, # Documentation last +} + +# Phase display names +CATEGORY_NAMES = { + "sec": "Security Hardening", + "cq": "Code Quality", + "ci": "Code Improvements", + "perf": "Performance", + "uiux": "UI/UX Improvements", + "doc": "Documentation", +} + +# Status 
markers (what files indicate spec state)
# Ordered most-advanced-first: the first marker file found wins.
STATUS_FILES = [
    ("qa_report.md", "qa_passed"),
    ("build_log.md", "built"),
    ("spec.md", "spec_ready"),
    ("implementation_plan.json", "planned"),
    ("requirements.json", "pending"),
]

# Colors (ANSI escape codes for terminal output)
C_RESET = "\033[0m"
C_BOLD = "\033[1m"
C_DIM = "\033[2m"
C_GREEN = "\033[32m"
C_YELLOW = "\033[33m"
C_RED = "\033[31m"
C_CYAN = "\033[36m"
C_BLUE = "\033[34m"
C_MAGENTA = "\033[35m"

# Per-spec status → display glyph
STATUS_ICONS = {
    "qa_passed": f"{C_GREEN}✅{C_RESET}",
    "built": f"{C_BLUE}⚙️{C_RESET}",
    "spec_ready": f"{C_CYAN}📋{C_RESET}",
    "planned": f"{C_YELLOW}📐{C_RESET}",
    "pending": f"{C_YELLOW}⏳{C_RESET}",
    "failed": f"{C_RED}❌{C_RESET}",
    "running": f"{C_MAGENTA}▶{C_RESET}",
}

# Per-phase status → display glyph
PHASE_STATUS_ICONS = {
    "complete": f"{C_GREEN}✅{C_RESET}",
    "in_progress": f"{C_MAGENTA}▶{C_RESET}",
    "pending": f"{C_DIM}○{C_RESET}",
}


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def find_project_dir():
    """Walk up from cwd to find a directory with .auto-claude/specs/."""
    d = Path.cwd()
    # Stops at filesystem root (where d == d.parent); returns None if not found.
    while d != d.parent:
        if (d / ".auto-claude" / "specs").is_dir():
            return d
        d = d.parent
    return None


def get_specs_dir(project_dir):
    """Return the project's .auto-claude/specs directory path."""
    return project_dir / ".auto-claude" / "specs"


def get_phases_file(project_dir):
    """Return the path of the phase-definition file (phases.json)."""
    return project_dir / ".auto-claude" / "phases.json"


def get_phase_state_file(project_dir):
    """Return the path of the phase execution-state file (phase_state.json)."""
    return project_dir / ".auto-claude" / "phase_state.json"


def parse_spec_id(spec_name):
    """Extract numeric ID from spec directory name like '016-[sec-001]-fix-...'."""
    match = re.match(r"^(\d+)", spec_name)
    return match.group(1) if match else None


def parse_category(spec_name):
    """Extract category prefix from spec name like '016-[sec-001]-...'
→ 'sec'."""
    match = re.search(r"\[(\w+)-\d+\]", spec_name)
    return match.group(1) if match else "other"


def get_spec_status(spec_dir):
    """Determine current status of a spec based on marker files."""
    # STATUS_FILES is ordered most-advanced-first, so the first hit wins.
    for filename, status in STATUS_FILES:
        if (spec_dir / filename).exists():
            return status
    return "pending"


def get_spec_title(spec_dir):
    """Get human-readable title from requirements.json."""
    req_file = spec_dir / "requirements.json"
    if req_file.exists():
        try:
            with open(req_file) as f:
                data = json.load(f)
            desc = data.get("task_description", "")
            # Truncate to the first line, then cap at 100 chars
            if "\n" in desc:
                desc = desc.split("\n")[0]
            return desc[:100]
        except (json.JSONDecodeError, KeyError):
            pass
    # Fallback: clean up directory name
    name = spec_dir.name
    name = re.sub(r"^\d+-", "", name)
    name = re.sub(r"\[[\w-]+\]-?", "", name)
    return name.replace("-", " ").strip().title()


def load_all_specs(project_dir):
    """Load all specs with metadata."""
    specs_dir = get_specs_dir(project_dir)
    specs = []
    for d in sorted(specs_dir.iterdir()):
        if not d.is_dir():
            continue
        spec_id = parse_spec_id(d.name)
        # Directories without a numeric ID prefix are not specs; skip them.
        if not spec_id:
            continue
        specs.append({
            "id": spec_id,
            "name": d.name,
            "category": parse_category(d.name),
            "status": get_spec_status(d),
            "title": get_spec_title(d),
            "dir": d,
        })
    return specs


def load_phases(project_dir):
    """Load phases.json or return None if not found."""
    pf = get_phases_file(project_dir)
    if not pf.exists():
        return None
    with open(pf) as f:
        return json.load(f)


def save_phases(project_dir, phases_data):
    """Save phases.json."""
    pf = get_phases_file(project_dir)
    with open(pf, "w") as f:
        json.dump(phases_data, f, indent=2)
    print(f" {C_GREEN}✓{C_RESET} Saved {pf}")


def load_phase_state(project_dir):
    """Load phase execution state (which phases completed, failures, etc.)."""
    sf = get_phase_state_file(project_dir)
    if sf.exists():
        with open(sf) as f:
            return json.load(f)
    # No state file yet: fresh, empty progress record.
    return {"completed_phases": [], "failed_specs": [], "current_phase": None}


def save_phase_state(project_dir, state):
    """Persist phase execution state to phase_state.json."""
    sf = get_phase_state_file(project_dir)
    with open(sf, "w") as f:
        json.dump(state, f, indent=2)


# ---------------------------------------------------------------------------
# Phase generation
# ---------------------------------------------------------------------------

def auto_generate_phases(project_dir, specs):
    """Auto-generate phases.json by grouping specs by category."""
    groups = defaultdict(list)
    for s in specs:
        groups[s["category"]].append(s["id"])

    # Sort categories by recommended priority (unknown categories sort last)
    sorted_cats = sorted(
        groups.keys(),
        key=lambda c: CATEGORY_PRIORITY.get(c, 99)
    )

    phases = []
    for idx, cat in enumerate(sorted_cats, 1):
        spec_ids = sorted(groups[cat])
        name = CATEGORY_NAMES.get(cat, cat.upper())
        phases.append({
            "phase": idx,
            "name": name,
            "category": cat,
            "specs": spec_ids,
            "parallel": False,
        })

    phases_data = {
        "version": 1,
        "pause_between_phases": True,
        "auto_qa": True,
        "phases": phases,
    }

    save_phases(project_dir, phases_data)
    return phases_data


# ---------------------------------------------------------------------------
# Display
# ---------------------------------------------------------------------------

def print_header(text):
    """Print a boxed section header."""
    width = 64
    print()
    print(f" {C_BOLD}{'═' * width}{C_RESET}")
    print(f" {C_BOLD} {text}{C_RESET}")
    print(f" {C_BOLD}{'═' * width}{C_RESET}")
    print()


def print_phase_status(project_dir, phases_data, specs):
    """Display full status of all phases and their specs."""
    spec_map = {s["id"]: s for s in specs}
    state = load_phase_state(project_dir)
    completed_phases = state.get("completed_phases", [])

    # "Done" counts both QA-passed and merely built specs.
    total_specs = sum(len(p["specs"]) for p in phases_data["phases"])
    done_specs = sum(1 for s in specs if s["status"] in ("qa_passed", "built"))
    pct = int(done_specs / total_specs * 
100) if total_specs > 0 else 0 + + # Progress bar + bar_width = 40 + filled = int(bar_width * pct / 100) + bar = f"{'█' * filled}{'░' * (bar_width - filled)}" + print(f" Progress: [{bar}] {pct}% ({done_specs}/{total_specs} specs)") + print() + + for phase in phases_data["phases"]: + phase_num = phase["phase"] + phase_name = phase["name"] + phase_specs = phase["specs"] + + # Determine phase status + spec_statuses = [spec_map.get(sid, {}).get("status", "pending") for sid in phase_specs] + all_done = all(s in ("qa_passed", "built") for s in spec_statuses) + any_started = any(s != "pending" for s in spec_statuses) + + if all_done or phase_num in completed_phases: + phase_icon = PHASE_STATUS_ICONS["complete"] + phase_color = C_GREEN + elif any_started: + phase_icon = PHASE_STATUS_ICONS["in_progress"] + phase_color = C_MAGENTA + else: + phase_icon = PHASE_STATUS_ICONS["pending"] + phase_color = C_DIM + + done_in_phase = sum(1 for s in spec_statuses if s in ("qa_passed", "built")) + print(f" {phase_icon} {phase_color}Phase {phase_num}: {phase_name}{C_RESET} ({done_in_phase}/{len(phase_specs)})") + + for sid in phase_specs: + s = spec_map.get(sid) + if not s: + print(f" {C_RED}?{C_RESET} {sid} — spec not found") + continue + icon = STATUS_ICONS.get(s["status"], "?") + title = s["title"][:70] + print(f" {icon} {C_DIM}{sid}{C_RESET} {title}") + + print() + + +def print_menu(phases_data, project_dir, specs): + """Print interactive menu options.""" + state = load_phase_state(project_dir) + completed = state.get("completed_phases", []) + + # Find next phase + next_phase = None + for p in phases_data["phases"]: + if p["phase"] not in completed: + next_phase = p + break + + print(f" {C_BOLD}What would you like to do?{C_RESET}") + print() + if next_phase: + print(f" {C_CYAN}1){C_RESET} Run next phase → {C_BOLD}Phase {next_phase['phase']}: {next_phase['name']}{C_RESET}") + else: + print(f" {C_GREEN}1) All phases complete!{C_RESET}") + print(f" {C_CYAN}2){C_RESET} Run a specific 
phase") + print(f" {C_CYAN}3){C_RESET} Run all remaining phases") + print(f" {C_CYAN}4){C_RESET} View detailed status") + print(f" {C_CYAN}5){C_RESET} Regenerate phases (re-read specs)") + print(f" {C_CYAN}6){C_RESET} Reset phase progress") + print(f" {C_CYAN}q){C_RESET} Quit") + print() + + return next_phase + + +# --------------------------------------------------------------------------- +# Execution +# --------------------------------------------------------------------------- + +def find_spec_dir_name(project_dir, spec_id): + """Find the full directory name for a spec ID (e.g. '016' → '016-[sec-001]-fix-...').""" + specs_dir = get_specs_dir(project_dir) + for d in specs_dir.iterdir(): + if d.is_dir() and d.name.startswith(spec_id + "-"): + return d.name + return None + + +def spec_needs_generation(project_dir, spec_id): + """Check if a spec still needs spec.md generated (only has requirements.json).""" + specs_dir = get_specs_dir(project_dir) + for d in specs_dir.iterdir(): + if d.is_dir() and d.name.startswith(spec_id + "-"): + return not (d / "spec.md").exists() + return False + + +def run_spec(project_dir, spec_id, auto_qa=True): + """Run a single spec through Auto-Claude (generate spec → build → optional QA).""" + python = str(VENV_PYTHON) if VENV_PYTHON.exists() else "python3" + run_py = str(RUN_PY) + spec_runner_py = str(SPEC_RUNNER_PY) + + spec_dir_name = find_spec_dir_name(project_dir, spec_id) + if not spec_dir_name: + print(f" {C_RED}✗ Spec {spec_id} directory not found{C_RESET}") + return False + + # Step 1: Generate spec.md if it doesn't exist yet + if spec_needs_generation(project_dir, spec_id): + print(f"\n {C_CYAN}▸ Generating spec for {spec_id}...{C_RESET}") + gen_cmd = [ + python, spec_runner_py, + "--project-dir", str(project_dir), + "--continue", spec_dir_name, + "--auto-approve", + ] + print(f" {C_DIM}$ {' '.join(gen_cmd)}{C_RESET}") + + try: + result = subprocess.run(gen_cmd, cwd=str(project_dir)) + except KeyboardInterrupt: + print(f"\n 
def run_phase(project_dir, phase, phases_data, specs):
    """Execute all pending specs in a single phase.

    Runs each spec via run_spec(), records failures in the persisted phase
    state, and marks the phase completed only when every spec succeeded.

    Args:
        project_dir: Path to the project root.
        phase: Phase dict with "phase" (number), "name", and "specs" (ids).
        phases_data: Parsed phases.json (read for the "auto_qa" flag).
        specs: List of spec dicts, each with "id" and "status".

    Returns:
        True when no spec failed, False otherwise.
    """
    state = load_phase_state(project_dir)
    auto_qa = phases_data.get("auto_qa", True)
    spec_map = {s["id"]: s for s in specs}

    phase_num = phase["phase"]
    phase_name = phase["name"]
    phase_specs = phase["specs"]

    # Filter to only pending/unbuilt specs
    pending = [
        sid for sid in phase_specs
        if spec_map.get(sid, {}).get("status", "pending") not in ("qa_passed", "built")
    ]

    if not pending:
        print(f"\n {C_GREEN}✓ Phase {phase_num}: {phase_name} — all specs already complete{C_RESET}")
        if phase_num not in state["completed_phases"]:
            state["completed_phases"].append(phase_num)
            save_phase_state(project_dir, state)
        return True

    print_header(f"Phase {phase_num}: {phase_name}")
    print(f" Running {len(pending)} spec(s): {', '.join(pending)}")
    print()

    # Persist "in progress" so an interrupted run can resume here.
    state["current_phase"] = phase_num
    save_phase_state(project_dir, state)

    failed = []
    succeeded = []

    for sid in pending:
        success = run_spec(project_dir, sid, auto_qa=auto_qa)
        if success:
            succeeded.append(sid)
        else:
            failed.append(sid)
            if sid not in state["failed_specs"]:
                state["failed_specs"].append(sid)
                save_phase_state(project_dir, state)

    # Phase summary
    print()
    print(f" {'─' * 50}")
    print(f" Phase {phase_num} summary: {C_GREEN}{len(succeeded)} passed{C_RESET}", end="")
    if failed:
        print(f", {C_RED}{len(failed)} failed ({', '.join(failed)}){C_RESET}")
    else:
        print()

    if not failed:
        # Guard against duplicate entries on repeated runs (the early-return
        # branch above already guards; this path previously did not).
        if phase_num not in state["completed_phases"]:
            state["completed_phases"].append(phase_num)
        state["current_phase"] = None
        save_phase_state(project_dir, state)

    return len(failed) == 0


def run_all_phases(project_dir, phases_data, specs, pause=True):
    """Run all phases not yet recorded as completed, in order.

    After a phase with failures, asks whether to continue (default: stop).
    When `pause` is True, also pauses for confirmation between phases
    (default: continue). Spec statuses are reloaded after each phase.
    """
    state = load_phase_state(project_dir)
    completed = state.get("completed_phases", [])

    remaining = [p for p in phases_data["phases"] if p["phase"] not in completed]

    if not remaining:
        print(f"\n {C_GREEN}✓ All phases already complete!{C_RESET}")
        return

    for i, phase in enumerate(remaining):
        success = run_phase(project_dir, phase, phases_data, specs)

        # Refresh specs after each phase (statuses may have changed)
        specs = load_all_specs(project_dir)

        if not success:
            print(f"\n {C_YELLOW}⚠ Phase {phase['phase']} had failures. Continue? [y/N]{C_RESET} ", end="")
            ans = input().strip().lower()
            if ans != "y":
                print(" Stopping. Fix failures and re-run.")
                return

        # Pause between phases for review
        if pause and i < len(remaining) - 1:
            next_p = remaining[i + 1]
            print(f"\n {C_CYAN}▸ Next up: Phase {next_p['phase']}: {next_p['name']}{C_RESET}")
            print(f" {C_BOLD}Continue to next phase? [Y/n]{C_RESET} ", end="")
            ans = input().strip().lower()
            if ans == "n":
                print(" Paused. Run ac-phase again to continue.")
                return

    print(f"\n {C_GREEN}{'═' * 50}{C_RESET}")
    print(f" {C_GREEN} All phases complete!{C_RESET}")
    print(f" {C_GREEN}{'═' * 50}{C_RESET}")


# ---------------------------------------------------------------------------
# Interactive mode
# ---------------------------------------------------------------------------

def interactive(project_dir):
    """Main interactive loop: menu-driven phase execution.

    Options: 1 run next phase, 2 run a specific phase, 3 run all,
    4 show status, 5 regenerate phases.json, 6 reset progress, q quit.
    """
    specs = load_all_specs(project_dir)

    if not specs:
        print(f" {C_RED}No specs found in {get_specs_dir(project_dir)}{C_RESET}")
        print(" Run ac-batch --discover first to create specs from ideation/roadmap.")
        return

    # Load or generate phases
    phases_data = load_phases(project_dir)
    if not phases_data:
        print(f" {C_YELLOW}No phases.json found. Generating from spec categories...{C_RESET}")
        phases_data = auto_generate_phases(project_dir, specs)
        print()

    project_name = project_dir.name
    print_header(f"ac-phase — {project_name}")
    print_phase_status(project_dir, phases_data, specs)

    while True:
        next_phase = print_menu(phases_data, project_dir, specs)

        try:
            choice = input(f" {C_BOLD}>{C_RESET} ").strip().lower()
        except (KeyboardInterrupt, EOFError):
            # State is saved incrementally by run_phase, so quitting is safe.
            print(f"\n Progress saved. Run ac-phase again to resume.")
            break

        if choice == "q":
            print(" Bye!")
            break

        elif choice == "1":
            if next_phase:
                run_phase(project_dir, next_phase, phases_data, specs)
                specs = load_all_specs(project_dir)  # Refresh
                print()
                print_phase_status(project_dir, phases_data, specs)
            else:
                print(f" {C_GREEN}All phases are complete!{C_RESET}")

        elif choice == "2":
            print(f"\n Which phase? (1-{len(phases_data['phases'])}): ", end="")
            try:
                pnum = int(input().strip())
                phase = next((p for p in phases_data["phases"] if p["phase"] == pnum), None)
                if phase:
                    run_phase(project_dir, phase, phases_data, specs)
                    specs = load_all_specs(project_dir)
                    print()
                    print_phase_status(project_dir, phases_data, specs)
                else:
                    print(f" {C_RED}Phase {pnum} not found{C_RESET}")
            except ValueError:
                print(f" {C_RED}Invalid input{C_RESET}")

        elif choice == "3":
            pause = phases_data.get("pause_between_phases", True)
            run_all_phases(project_dir, phases_data, specs, pause=pause)
            specs = load_all_specs(project_dir)
            print()
            print_phase_status(project_dir, phases_data, specs)

        elif choice == "4":
            print()
            print_phase_status(project_dir, phases_data, specs)

        elif choice == "5":
            phases_data = auto_generate_phases(project_dir, specs)
            print()
            print_phase_status(project_dir, phases_data, specs)

        elif choice == "6":
            save_phase_state(project_dir, {
                "completed_phases": [],
                "failed_specs": [],
                "current_phase": None,
            })
            print(f" {C_GREEN}✓ Phase progress reset{C_RESET}")
            print()
            specs = load_all_specs(project_dir)
            print_phase_status(project_dir, phases_data, specs)

        else:
            print(f" {C_DIM}Unknown option. Try 1-6 or q.{C_RESET}")

        print()
# ---------------------------------------------------------------------------
# CLI entry
# ---------------------------------------------------------------------------

def main():
    """CLI entry point for ac-phase.

    Flags: --status (show phase/spec status), --run [--phase N | --all
    [--no-pause]] (execute phases), --init (regenerate phases.json),
    --edit (open phases.json in $EDITOR). With no flags, drops into the
    interactive menu.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Interactive phased spec executor for Auto-Claude",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # NOTE(review): exact column alignment of the original epilog was
        # lost in extraction; text content preserved.
        epilog="""
Examples:
  ac-phase                   Interactive menu
  ac-phase --status          Show phase status
  ac-phase --run             Run next pending phase
  ac-phase --run --phase 2   Run phase 2
  ac-phase --run --all       Run all remaining phases
  ac-phase --init            Generate phases.json from specs
""",
    )
    parser.add_argument("--project-dir", type=Path, default=None,
                        help="Project directory (default: auto-detect from cwd)")
    parser.add_argument("--status", action="store_true",
                        help="Show phase and spec status")
    parser.add_argument("--run", action="store_true",
                        help="Run phases (next by default)")
    parser.add_argument("--phase", type=int, default=None,
                        help="Specific phase number to run (with --run)")
    parser.add_argument("--all", action="store_true",
                        help="Run all remaining phases (with --run)")
    parser.add_argument("--no-pause", action="store_true",
                        help="Don't pause between phases (with --run --all)")
    parser.add_argument("--init", action="store_true",
                        help="Generate/regenerate phases.json from specs")
    parser.add_argument("--edit", action="store_true",
                        help="Open phases.json in $EDITOR")

    args = parser.parse_args()

    # Find project
    if args.project_dir:
        project_dir = args.project_dir.resolve()
    else:
        project_dir = find_project_dir()

    if not project_dir:
        print(f" {C_RED}Error: No .auto-claude/specs/ found.{C_RESET}")
        print(" Run this from inside a project with Auto-Claude specs,")
        print(" or pass --project-dir /path/to/project")
        sys.exit(1)

    specs = load_all_specs(project_dir)

    # --init: generate phases
    if args.init:
        auto_generate_phases(project_dir, specs)
        return

    # --edit: open in editor
    if args.edit:
        pf = get_phases_file(project_dir)
        if not pf.exists():
            auto_generate_phases(project_dir, specs)
        editor = os.environ.get("EDITOR", "nano")
        # execvp replaces this process with the editor; the return below
        # is unreachable in practice but kept for clarity.
        os.execvp(editor, [editor, str(pf)])
        return

    # Ensure phases exist
    phases_data = load_phases(project_dir)
    if not phases_data:
        if args.status or args.run:
            print(f" {C_YELLOW}No phases.json found. Generating...{C_RESET}")
            phases_data = auto_generate_phases(project_dir, specs)
        else:
            # Interactive mode will handle it
            interactive(project_dir)
            return

    # --status: just display
    if args.status:
        print_header(f"ac-phase — {project_dir.name}")
        print_phase_status(project_dir, phases_data, specs)
        return

    # --run: execute phases
    if args.run:
        if args.all:
            run_all_phases(project_dir, phases_data, specs, pause=not args.no_pause)
        elif args.phase:
            phase = next((p for p in phases_data["phases"] if p["phase"] == args.phase), None)
            if not phase:
                print(f" {C_RED}Phase {args.phase} not found{C_RESET}")
                sys.exit(1)
            run_phase(project_dir, phase, phases_data, specs)
        else:
            # Run next pending phase
            state = load_phase_state(project_dir)
            completed = state.get("completed_phases", [])
            next_phase = next(
                (p for p in phases_data["phases"] if p["phase"] not in completed),
                None
            )
            if next_phase:
                run_phase(project_dir, next_phase, phases_data, specs)
            else:
                print(f" {C_GREEN}✓ All phases complete!{C_RESET}")
        return

    # Default: interactive mode
    interactive(project_dir)


if __name__ == "__main__":
    main()
# ===========================================================================
# batch-from-discovery.py — preamble: imports, ANSI styling, prompt helpers
# (module docstring reconstructed as a comment; original head was cut)
# ===========================================================================

import argparse
import json
import os
import subprocess
import sys
from pathlib import Path

# ---------------------------------------------------------------------------
# ANSI colors
# ---------------------------------------------------------------------------
BOLD = "\033[1m"
DIM = "\033[2m"
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
CYAN = "\033[96m"
RESET = "\033[0m"

# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def clear_screen():
    """Clear the terminal (cross-platform)."""
    os.system("clear" if os.name != "nt" else "cls")


def header(text):
    """Print a full-width banner with `text`, capped at 72 columns."""
    try:
        width = min(os.get_terminal_size().columns, 72)
    except OSError:
        # Not attached to a terminal (e.g. piped) — fall back to 72.
        width = 72
    print(f"\n{CYAN}{'═' * width}{RESET}")
    print(f"{CYAN}{BOLD} {text}{RESET}")
    print(f"{CYAN}{'═' * width}{RESET}\n")


def subheader(text):
    """Print a secondary section heading."""
    print(f"\n{BLUE}{BOLD} ▸ {text}{RESET}\n")


def success(text):
    """Print a green check-marked status line."""
    print(f" {GREEN}✓{RESET} {text}")


def warn(text):
    """Print a yellow warning line."""
    print(f" {YELLOW}⚠{RESET} {text}")


def error(text):
    """Print a red error line."""
    print(f" {RED}✗{RESET} {text}")


def info(text):
    """Print a dim informational line."""
    print(f" {DIM}→{RESET} {text}")


def prompt_choice(question, options, allow_multi=False, allow_all=True):
    """Interactive numbered menu. Returns list of selected indices.

    Args:
        question: Prompt text shown above the menu.
        options: List of (label, detail) tuples; detail may be empty.
        allow_multi: Accept comma/space-separated multi-selection.
        allow_all: Offer an 'a' option selecting every entry.

    Returns:
        List of 0-based indices; empty list when the user cancels ('q').
    """
    print(f" {BOLD}{question}{RESET}")
    print()
    for i, (label, detail) in enumerate(options, 1):
        detail_str = f" {DIM}— {detail}{RESET}" if detail else ""
        print(f" {CYAN}{i:>3}{RESET}) {label}{detail_str}")

    if allow_all:
        print(f" {CYAN} a{RESET}) All of the above")
    # 'q' is always offered — cancelling must remain possible even when
    # allow_all is False. (NOTE(review): original nesting here was lost in
    # extraction; this placement matches the loop below, which honors 'q'
    # unconditionally.)
    print(f" {CYAN} q{RESET}) Cancel / go back")
    print()

    while True:
        raw = input(f" {BOLD}>{RESET} ").strip().lower()
        if raw == "q":
            return []
        if raw == "a" and allow_all:
            return list(range(len(options)))
        try:
            if allow_multi and ("," in raw or " " in raw):
                parts = raw.replace(",", " ").split()
                indices = [int(p) - 1 for p in parts]
                if all(0 <= i < len(options) for i in indices):
                    return indices
            else:
                idx = int(raw) - 1
                if 0 <= idx < len(options):
                    return [idx]
        except ValueError:
            pass
        warn("Invalid choice. Enter a number, 'a' for all, or 'q' to cancel.")


def prompt_yn(question, default=True):
    """Ask a yes/no question; empty input returns `default`."""
    hint = "Y/n" if default else "y/N"
    raw = input(f" {BOLD}{question}{RESET} [{hint}] ").strip().lower()
    if not raw:
        return default
    return raw in ("y", "yes")
count = len(data.get("features", [])) + sources.append({ + "type": "roadmap", + "file": roadmap_file, + "data": data, + "count": count, + "label": f"Roadmap ({count} features)", + }) + except (json.JSONDecodeError, KeyError): + pass + + # Insights — check for saved chat messages with suggestedTasks + insights_dir = ac_dir / "insights" + if insights_dir.exists(): + tasks = _collect_insights_tasks(insights_dir) + if tasks: + sources.append({ + "type": "insights", + "file": insights_dir, + "data": {"tasks": tasks}, + "count": len(tasks), + "label": f"Insights ({len(tasks)} suggested tasks)", + }) + + return sources + + +def _collect_insights_tasks(insights_dir: Path): + """Collect task suggestions from insights output files.""" + tasks = [] + for f in sorted(insights_dir.glob("*.json")): + try: + data = json.loads(f.read_text()) + # Could be a chat messages file or a direct suggestions file + if isinstance(data, list): + for msg in data: + for t in msg.get("suggestedTasks", []): + if t.get("title"): + tasks.append(t) + elif isinstance(data, dict): + for t in data.get("suggestedTasks", data.get("tasks", [])): + if isinstance(t, dict) and t.get("title"): + tasks.append(t) + except (json.JSONDecodeError, KeyError, TypeError): + continue + return tasks + + +# --------------------------------------------------------------------------- +# Item extraction (normalize all sources to common format) +# --------------------------------------------------------------------------- + +def extract_items(source): + """Return list of normalized items from any source type.""" + src_type = source["type"] + data = source["data"] + + if src_type == "ideation": + return [_normalize_ideation_idea(i) for i in data.get("ideas", [])] + elif src_type == "roadmap": + return [_normalize_roadmap_feature(f, data) for f in data.get("features", [])] + elif src_type == "insights": + return [_normalize_insights_task(t) for t in data.get("tasks", [])] + return [] + + +def _normalize_ideation_idea(idea): + 
effort = (idea.get("estimated_effort") or idea.get("estimatedEffort", "medium")).lower() + severity = idea.get("severity", "") + + # Build rich description + parts = [idea.get("description", "")] + if idea.get("rationale"): + parts.append(f"\nRationale: {idea['rationale']}") + if idea.get("implementation_approach"): + parts.append(f"\nApproach: {idea['implementation_approach']}") + if idea.get("implementation"): + parts.append(f"\nImplementation: {idea['implementation']}") + if idea.get("remediation"): + parts.append(f"\nRemediation: {idea['remediation']}") + if idea.get("proposedChange"): + parts.append(f"\nProposed change: {idea['proposedChange']}") + files = idea.get("affected_files") or idea.get("affectedFiles") or idea.get("affectedAreas", []) + if files: + parts.append(f"\nAffected files: {', '.join(files)}") + + return { + "id": idea.get("id", "?"), + "title": idea.get("title", "Untitled"), + "description": "\n".join(parts), + "category": idea.get("type", "general"), + "effort": effort, + "severity": severity, + "source": "ideation", + } + + +def _normalize_roadmap_feature(feature, roadmap_data): + phases = {p["id"]: p.get("name", p["id"]) for p in roadmap_data.get("phases", [])} + phase_name = phases.get(feature.get("phase_id", ""), "") + + parts = [feature.get("description", "")] + if feature.get("rationale"): + parts.append(f"\nRationale: {feature['rationale']}") + if feature.get("acceptance_criteria"): + criteria = "\n".join(f" - {c}" for c in feature["acceptance_criteria"]) + parts.append(f"\nAcceptance criteria:\n{criteria}") + if feature.get("user_stories"): + stories = "\n".join(f" - {s}" for s in feature["user_stories"]) + parts.append(f"\nUser stories:\n{stories}") + + complexity = feature.get("complexity", "medium").lower() + effort_map = {"low": "small", "medium": "medium", "high": "large"} + + return { + "id": feature.get("id", "?"), + "title": feature.get("title", "Untitled"), + "description": "\n".join(parts), + "category": 
f"roadmap/{phase_name}" if phase_name else "roadmap", + "effort": effort_map.get(complexity, "medium"), + "severity": "", + "priority_label": feature.get("priority", ""), + "source": "roadmap", + } + + +def _normalize_insights_task(task): + meta = task.get("metadata", {}) + return { + "id": meta.get("category", "insight"), + "title": task.get("title", "Untitled"), + "description": task.get("description", ""), + "category": meta.get("category", "general"), + "effort": meta.get("complexity", "medium"), + "severity": meta.get("impact", ""), + "source": "insights", + } + + +# --------------------------------------------------------------------------- +# Filtering UI +# --------------------------------------------------------------------------- + +EFFORT_ORDER = ["trivial", "small", "medium", "large", "high", "complex"] + + +def filter_items_interactive(items): + """Walk user through filtering items. Returns filtered list.""" + if not items: + return items + + # Show summary by category + categories = {} + for item in items: + cat = item["category"] + categories.setdefault(cat, []).append(item) + + subheader("Available categories") + cat_options = [] + for cat, cat_items in sorted(categories.items()): + efforts = [i["effort"] for i in cat_items] + effort_summary = ", ".join(f"{e}:{efforts.count(e)}" for e in dict.fromkeys(efforts)) + cat_options.append((f"{cat} ({len(cat_items)} items)", effort_summary)) + + selected_cats = prompt_choice( + "Which categories do you want to include?", + cat_options, + allow_multi=True, + ) + if not selected_cats: + return [] + + cat_keys = list(sorted(categories.keys())) + chosen_cats = {cat_keys[i] for i in selected_cats} + filtered = [item for item in items if item["category"] in chosen_cats] + info(f"{len(filtered)} items selected") + + # Filter by effort? 
# ---------------------------------------------------------------------------
# Batch task conversion
# ---------------------------------------------------------------------------

# Maps a discovery category onto an Auto-Claude workflow type.
TYPE_TO_WORKFLOW = {
    "code_improvements": "feature",
    "ui_ux_improvements": "feature",
    "documentation_gaps": "documentation",
    "security_hardening": "bugfix",
    "performance_optimizations": "feature",
    "code_quality": "refactor",
    "feature": "feature",
    "bug_fix": "bugfix",
    "refactoring": "refactor",
    "documentation": "documentation",
    "security": "bugfix",
    "performance": "feature",
    "ui_ux": "feature",
    "infrastructure": "feature",
    "testing": "feature",
}

# Effort level → numeric priority (lower number = higher priority slot here;
# NOTE(review): priority direction is not documented in-source — confirm).
EFFORT_TO_PRIORITY = {
    "trivial": 1,
    "small": 3,
    "medium": 5,
    "large": 7,
    "high": 7,
    "complex": 9,
}

# Effort level → rough hour estimate.
EFFORT_TO_HOURS = {
    "trivial": 1,
    "small": 2,
    "medium": 4,
    "large": 8,
    "high": 8,
    "complex": 16,
}


def item_to_batch_task(item):
    """Convert one normalized discovery item into a batch-create task dict.

    The category's leading segment (before any "/") selects the workflow
    type; effort drives priority, complexity, and the hour estimate.
    """
    effort = item.get("effort", "medium")
    cat = item.get("category", "general").split("/")[0]  # strip roadmap/phase prefix

    return {
        "title": f"[{item['id']}] {item['title']}",
        "description": item["description"],
        "workflow_type": TYPE_TO_WORKFLOW.get(cat, "feature"),
        # NOTE(review): services is hardcoded to ["frontend"] — confirm this
        # is intentional for all projects.
        "services": ["frontend"],
        "priority": EFFORT_TO_PRIORITY.get(effort, 5),
        "complexity": "quick" if effort in ("trivial", "small") else "standard",
        "estimated_hours": EFFORT_TO_HOURS.get(effort, 4),
    }


# ---------------------------------------------------------------------------
# Main interactive flow
# ---------------------------------------------------------------------------

def find_auto_claude_dir():
    """Try to find the Auto-Claude installation.

    Probes a fixed set of well-known locations (plus the cwd when it looks
    like the Auto-Claude repo itself) for apps/backend/run.py.

    Returns:
        Path to the installation, or None when not found.
    """
    candidates = [
        Path("/aidata/projects/Auto-Claude"),
        Path.home() / "Auto-Claude",
        Path.home() / "auto-claude",
    ]
    # Also check if we're inside Auto-Claude itself
    cwd = Path.cwd()
    if (cwd / "apps" / "backend" / "run.py").exists():
        candidates.insert(0, cwd)

    for p in candidates:
        if (p / "apps" / "backend" / "run.py").exists():
            return p
    return None


def main():
    """Interactive entry point: detect sources, filter, write batch file,
    and optionally invoke Auto-Claude's --batch-create."""
    parser = argparse.ArgumentParser(
        description="Interactive batch task creator from Auto-Claude discovery outputs"
    )
    parser.add_argument("project", nargs="?", default=".", help="Project directory (default: cwd)")
    parser.add_argument("--auto-claude-dir", help="Path to Auto-Claude installation")
    parser.add_argument("--dry-run", action="store_true", help="Generate batch file but don't run batch-create")
    args = parser.parse_args()

    project_dir = Path(args.project).resolve()
    if not project_dir.exists():
        error(f"Project directory not found: {project_dir}")
        sys.exit(1)

    ac_dir = Path(args.auto_claude_dir) if args.auto_claude_dir else find_auto_claude_dir()
    run_py = ac_dir / "apps" / "backend" / "run.py" if ac_dir else None

    # ── Welcome ──────────────────────────────────────────────────────
    clear_screen()
    header("Auto-Claude: Batch Task Creator")
    info(f"Project: {project_dir}")
    if ac_dir:
        info(f"Auto-Claude: {ac_dir}")
    else:
        warn("Auto-Claude installation not found — will generate batch file only")
    print()

    # ── Step 1: Detect sources ───────────────────────────────────────
    subheader("Step 1: Detecting discovery outputs")

    sources = detect_sources(project_dir)
    if not sources:
        error("No discovery outputs found in .auto-claude/")
        print()
        info("Run one of these first:")
        info(" python run.py --project . --ideation")
        info(" python run.py --project . --roadmap")
        info(" python run.py --project . --insights")
        sys.exit(1)

    for s in sources:
        success(f"{s['label']} → {s['file']}")

    # ── Step 2: Choose source ────────────────────────────────────────
    print()
    if len(sources) == 1:
        chosen_source = sources[0]
        info(f"Only one source found, using: {chosen_source['label']}")
    else:
        subheader("Step 2: Choose a source")
        src_options = [(s["label"], str(s["file"])) for s in sources]
        sel = prompt_choice("Which discovery output do you want to use?", src_options, allow_all=False)
        if not sel:
            info("Cancelled.")
            sys.exit(0)
        chosen_source = sources[sel[0]]

    success(f"Using: {chosen_source['label']}")

    # ── Step 3: Extract & display items ──────────────────────────────
    items = extract_items(chosen_source)
    if not items:
        error("No items found in this source.")
        sys.exit(1)

    subheader(f"Step 3: Review items ({len(items)} total)")
    print()
    for i, item in enumerate(items, 1):
        sev = f" {RED}[{item['severity']}]{RESET}" if item.get("severity") else ""
        print(f" {DIM}{i:>3}.{RESET} [{item['id']}] {item['title']}{sev}")
        print(f" {DIM}{item['category']} · {item['effort']} effort{RESET}")
    print()

    # ── Step 4: Filter ───────────────────────────────────────────────
    subheader("Step 4: Filter & select")
    filtered = filter_items_interactive(items)

    if not filtered:
        info("No items selected. Exiting.")
        sys.exit(0)

    print()
    success(f"{len(filtered)} items selected for batch creation:")
    print()
    for item in filtered:
        print(f" • [{item['id']}] {item['title']} {DIM}({item['effort']}){RESET}")

    # ── Step 5: Confirm & generate ───────────────────────────────────
    print()
    if not prompt_yn(f"Generate batch file with {len(filtered)} tasks?"):
        info("Cancelled.")
        sys.exit(0)

    tasks = [item_to_batch_task(item) for item in filtered]
    batch = {"tasks": tasks}

    output_dir = project_dir / ".auto-claude" / "ideation"
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / "batch_tasks.json"

    output_file.write_text(json.dumps(batch, indent=2))
    print()
    success(f"Batch file written: {output_file}")
    info(f"Contains {len(tasks)} tasks")

    # ── Step 6: Run batch-create ─────────────────────────────────────
    if args.dry_run or not run_py or not run_py.exists():
        print()
        subheader("Next step — run manually:")
        if run_py and run_py.exists():
            print(f" python {run_py} \\")
        else:
            print(f" python /path/to/Auto-Claude/apps/backend/run.py \\")
        print(f" --project {project_dir} \\")
        print(f" --batch-create {output_file}")
        print()
        print(f" {DIM}Then build each spec:{RESET}")
        # NOTE(review): "--spec " with no id — a "<spec-id>" placeholder was
        # likely lost in extraction; preserved as-is.
        print(f" python run.py --project {project_dir} --spec --build")
        return

    print()
    if not prompt_yn("Run batch-create now to generate all specs?"):
        info("Batch file saved. You can run it later:")
        print(f" python {run_py} --project {project_dir} --batch-create {output_file}")
        return

    subheader("Step 6: Creating specs")
    print()

    cmd = [
        sys.executable, str(run_py),
        "--project", str(project_dir),
        "--batch-create", str(output_file),
    ]

    result = subprocess.run(cmd, cwd=str(project_dir))

    if result.returncode == 0:
        print()
        success("Batch creation complete!")
        print()
        subheader("What's next?")
        print(f" {BOLD}Check status:{RESET}")
        print(f" python {run_py} --project {project_dir} --batch-status")
        print()
        print(f" {BOLD}Generate full spec + build for a task:{RESET}")
        print(f" python {run_py} --project {project_dir} --spec ")
        print(f" python {run_py} --project {project_dir} --spec --build")
        print()
        print(f" {BOLD}QA validate:{RESET}")
        print(f" python {run_py} --project {project_dir} --spec --qa")
        print()
        print(f" {BOLD}Clean up when done:{RESET}")
        print(f" python {run_py} --project {project_dir} --batch-cleanup")
    else:
        error(f"Batch creation failed with exit code {result.returncode}")


if __name__ == "__main__":
    main()