diff --git a/core/framework/skills/__init__.py b/core/framework/skills/__init__.py index 1ec48d9970..170a4901f1 100644 --- a/core/framework/skills/__init__.py +++ b/core/framework/skills/__init__.py @@ -9,27 +9,42 @@ from framework.skills.config import DefaultSkillConfig, SkillsConfig from framework.skills.defaults import DefaultSkillManager from framework.skills.discovery import DiscoveryConfig, SkillDiscovery +from framework.skills.installer import ( + fork_skill, + install_from_git, + install_from_registry, + remove_skill, +) from framework.skills.manager import SkillsManager, SkillsManagerConfig from framework.skills.models import TrustStatus from framework.skills.parser import ParsedSkill, parse_skill_md +from framework.skills.registry import RegistryClient from framework.skills.skill_errors import SkillError, SkillErrorCode, log_skill_error from framework.skills.trust import TrustedRepoStore, TrustGate +from framework.skills.validator import ValidationResult, validate_strict __all__ = [ "DefaultSkillConfig", "DefaultSkillManager", "DiscoveryConfig", "ParsedSkill", + "RegistryClient", "SkillCatalog", "SkillDiscovery", + "SkillError", + "SkillErrorCode", "SkillsConfig", "SkillsManager", "SkillsManagerConfig", "TrustGate", "TrustedRepoStore", "TrustStatus", - "parse_skill_md", - "SkillError", - "SkillErrorCode", + "ValidationResult", + "fork_skill", + "install_from_git", + "install_from_registry", "log_skill_error", + "parse_skill_md", + "remove_skill", + "validate_strict", ] diff --git a/core/framework/skills/cli.py b/core/framework/skills/cli.py index 1d69f078ea..bbcf295b44 100644 --- a/core/framework/skills/cli.py +++ b/core/framework/skills/cli.py @@ -1,18 +1,64 @@ -"""CLI commands for the Hive skill system. +"""CLI commands for the Hive skill system (CLI-1 through CLI-13). -Phase 1 commands (AS-13): - hive skill list — list discovered skills across all scopes - hive skill trust — permanently trust a project repo's skills - -Full CLI suite (CLI-1 through CLI-13) is Phase 2. +Commands: + hive skill list — list discovered skills (all scopes) + hive skill install — install from registry or git URL + hive skill remove — uninstall a skill + hive skill info — show skill details + hive skill init — scaffold a new SKILL.md + hive skill validate — strict-validate a SKILL.md + hive skill doctor — health-check skills / default skills + hive skill update — refresh registry cache or re-install a skill + hive skill search — search registry by name/tag/description + hive skill fork — create local editable copy of a skill + hive skill test — run skill in isolation or execute its eval suite + hive skill trust — permanently trust a project repo's skills """ from __future__ import annotations +import json as _json +import os +import shutil import subprocess import sys from pathlib import Path +_SKILL_MD_TEMPLATE = """\ +--- +name: {name} +description: +version: 0.1.0 +license: MIT +author: "" +compatibility: + - claude-code + - hive +metadata: + tags: [] +# allowed-tools: +# - tool_name +--- + +## Instructions + +Describe what the agent should do when this skill is activated. + +### When to Use This Skill + +Describe the conditions under which the agent should activate this skill. + +### Step-by-Step Protocol + +1. First, ... +2. Then, ... +3. Finally, ... + +### Output Format + +Describe the expected output format or deliverable. +""" + def register_skill_commands(subparsers) -> None: """Register the ``hive skill`` subcommand group.""" @@ -27,8 +73,184 @@ def register_skill_commands(subparsers) -> None: metavar="PATH", help="Project directory to scan (default: current directory)", ) + list_parser.add_argument("--json", action="store_true", help="Output as JSON") list_parser.set_defaults(func=cmd_skill_list) + # hive skill install + install_parser = skill_sub.add_parser( + "install", + help="Install a skill from the registry or a git URL", + ) + install_parser.add_argument( + "name_or_url", + nargs="?", + help="Skill name (from registry) or git URL", + ) + install_parser.add_argument( + "--version", + default=None, + metavar="REF", + help="Git ref (branch/tag) to install", + ) + install_parser.add_argument( + "--from", + dest="from_url", + default=None, + metavar="URL", + help="Install from this git URL directly", + ) + install_parser.add_argument( + "--pack", + default=None, + metavar="PACK", + help="Install a starter pack by name", + ) + install_parser.add_argument( + "--name", + dest="install_name", + default=None, + metavar="NAME", + help="Override the skill directory name on install", + ) + install_parser.add_argument("--json", action="store_true", help="Output as JSON") + install_parser.set_defaults(func=cmd_skill_install) + + # hive skill remove + remove_parser = skill_sub.add_parser("remove", help="Uninstall a skill") + remove_parser.add_argument("name", help="Skill name to remove") + remove_parser.add_argument("--json", action="store_true", help="Output as JSON") + remove_parser.set_defaults(func=cmd_skill_remove) + + # hive skill info + info_parser = skill_sub.add_parser("info", help="Show skill details") + info_parser.add_argument("name", help="Skill name") + info_parser.add_argument( + "--project-dir", + default=None, + metavar="PATH", + help="Project directory to scan (default: current directory)", + ) + info_parser.add_argument("--json", action="store_true", help="Output as JSON") + info_parser.set_defaults(func=cmd_skill_info) + + # hive skill init + init_parser = skill_sub.add_parser( + "init", help="Scaffold a new skill directory with a SKILL.md template" + ) + init_parser.add_argument("--name", dest="skill_name", default=None, metavar="NAME") + init_parser.add_argument( + "--dir", + dest="target_dir", + default=None, + metavar="PATH", + help="Parent directory for the new skill (default: current directory)", + ) + init_parser.set_defaults(func=cmd_skill_init) + + # hive skill validate + validate_parser = skill_sub.add_parser( + "validate", help="Strictly validate a SKILL.md against the Agent Skills spec" + ) + validate_parser.add_argument("path", help="Path to SKILL.md or its parent directory") + validate_parser.add_argument("--json", action="store_true", help="Output as JSON") + validate_parser.set_defaults(func=cmd_skill_validate) + + # hive skill doctor + doctor_parser = skill_sub.add_parser( + "doctor", help="Health-check skills (parseable, scripts executable, tools available)" + ) + doctor_parser.add_argument( + "name", + nargs="?", + default=None, + help="Skill name to check (default: all discovered skills)", + ) + doctor_parser.add_argument( + "--defaults", + action="store_true", + help="Check all 6 framework default skills", + ) + doctor_parser.add_argument( + "--project-dir", + default=None, + metavar="PATH", + ) + doctor_parser.add_argument("--json", action="store_true", help="Output as JSON") + doctor_parser.set_defaults(func=cmd_skill_doctor) + + # hive skill update + update_parser = skill_sub.add_parser( + "update", + help="Refresh registry cache or re-install a specific skill", + ) + update_parser.add_argument( + "name", + nargs="?", + default=None, + help="Skill name to update (default: refresh registry cache only)", + ) + update_parser.add_argument("--json", action="store_true", help="Output as JSON") + update_parser.set_defaults(func=cmd_skill_update) + + # hive skill search + search_parser = skill_sub.add_parser( + "search", help="Search the skill registry by name, tag, or description" + ) + search_parser.add_argument("query", help="Search query string") + search_parser.add_argument("--json", action="store_true", help="Output as JSON") + search_parser.set_defaults(func=cmd_skill_search) + + # hive skill fork + fork_parser = skill_sub.add_parser("fork", help="Create a local editable copy of a skill") + fork_parser.add_argument("name", help="Skill name to fork") + fork_parser.add_argument( + "--name", + dest="new_name", + default=None, + metavar="NEW_NAME", + help="Name for the forked skill (default: -fork)", + ) + fork_parser.add_argument( + "--dir", + dest="target_dir", + default=None, + metavar="PATH", + help="Parent directory for the fork (default: ~/.hive/skills/)", + ) + fork_parser.add_argument( + "--yes", + action="store_true", + help="Skip confirmation prompt", + ) + fork_parser.add_argument( + "--project-dir", + default=None, + metavar="PATH", + ) + fork_parser.add_argument("--json", action="store_true", help="Output as JSON") + fork_parser.set_defaults(func=cmd_skill_fork) + + # hive skill test + test_parser = skill_sub.add_parser( + "test", help="Run a skill in isolation or execute its eval suite (CLI-9)" + ) + test_parser.add_argument("path", help="Path to SKILL.md or its parent directory") + test_parser.add_argument( + "--input", + dest="input_json", + default=None, + metavar="JSON", + help='JSON input to pass to the skill, e.g. \'{"prompt": "..."}\'', + ) + test_parser.add_argument( + "--model", + default=None, + metavar="MODEL", + help="Override the LLM model (default: claude-haiku-4-5-20251001)", + ) + test_parser.add_argument("--json", action="store_true", help="Output as JSON") + test_parser.set_defaults(func=cmd_skill_test) + # hive skill trust trust_parser = skill_sub.add_parser( "trust", @@ -41,6 +263,11 @@ def register_skill_commands(subparsers) -> None: trust_parser.set_defaults(func=cmd_skill_trust) +# --------------------------------------------------------------------------- +# Command handlers +# --------------------------------------------------------------------------- + + def cmd_skill_list(args) -> int: """List all discovered skills grouped by scope.""" from framework.skills.discovery import DiscoveryConfig, SkillDiscovery @@ -48,6 +275,24 @@ def cmd_skill_list(args) -> int: project_dir = Path(args.project_dir).resolve() if args.project_dir else Path.cwd() skills = SkillDiscovery(DiscoveryConfig(project_root=project_dir)).discover() + if getattr(args, "json", False): + print( + _json.dumps( + { + "skills": [ + { + "name": s.name, + "description": s.description, + "scope": s.source_scope, + "location": s.location, + } + for s in skills + ] + } + ) + ) + return 0 + if not skills: print("No skills discovered.") return 0 @@ -72,6 +317,959 @@ def cmd_skill_list(args) -> int: return 0 +def cmd_skill_install(args) -> int: + """Install a skill from the registry or a git URL.""" + from framework.skills.installer import ( + USER_SKILLS_DIR, + install_from_git, + install_from_registry, + maybe_show_install_notice, + ) + from framework.skills.registry import RegistryClient + from framework.skills.skill_errors import SkillError + + maybe_show_install_notice() + sys.stdout.flush() + + target_dir = USER_SKILLS_DIR + + # hive skill install --pack + if args.pack: + return _install_pack(args.pack, target_dir, args.version) + + use_json = getattr(args, "json", False) + + # hive skill install --from [--name ] + if args.from_url: + skill_name = args.install_name or _derive_name_from_url(args.from_url) + if not use_json: + print(f"Installing '{skill_name}' from {args.from_url} ...", flush=True) + try: + dest = install_from_git( + git_url=args.from_url, + skill_name=skill_name, + version=args.version, + target_dir=target_dir, + ) + except SkillError as exc: + if use_json: + print(_json.dumps({"error": exc.what, "why": exc.why, "fix": exc.fix})) + else: + print(f"Error: {exc.what}", file=sys.stderr) + print(f" Why: {exc.why}", file=sys.stderr) + print(f" Fix: {exc.fix}", file=sys.stderr) + return 1 + if use_json: + print(_json.dumps({"name": skill_name, "location": str(dest)})) + else: + print(f"✓ Installed: {skill_name}") + print(f" Location: {dest}") + return 0 + + # hive skill install (registry lookup) + if args.name_or_url: + name = args.install_name or args.name_or_url + client = RegistryClient() + entry = client.get_skill_entry(args.name_or_url) + if entry is None: + if use_json: + print( + _json.dumps( + { + "error": f"skill '{args.name_or_url}' not found in registry", + "why": "Registry may be unavailable or skill name is incorrect.", + "fix": "hive skill install --from ", + } + ) + ) + else: + print( + f"Error: skill '{args.name_or_url}' not found in registry.", + file=sys.stderr, + ) + print( + " The registry may be unavailable, or the skill name is incorrect.", + file=sys.stderr, + ) + print( + " Install from a git URL directly: hive skill install --from ", + file=sys.stderr, + ) + return 1 + if not use_json: + print(f"Installing '{name}' from registry ...") + try: + dest = install_from_registry(entry, target_dir=target_dir, version=args.version) + except SkillError as exc: + if use_json: + print(_json.dumps({"error": exc.what, "why": exc.why, "fix": exc.fix})) + else: + print(f"Error: {exc.what}", file=sys.stderr) + print(f" Why: {exc.why}", file=sys.stderr) + print(f" Fix: {exc.fix}", file=sys.stderr) + return 1 + if use_json: + print(_json.dumps({"name": name, "location": str(dest)})) + else: + print(f"✓ Installed: {name}") + print(f" Location: {dest}") + return 0 + + if use_json: + print( + _json.dumps( + { + "error": "No install target specified", + "why": "Provide a skill name, --from , or --pack .", + "fix": "hive skill install --help", + } + ) + ) + else: + print("Error: specify a skill name, --from , or --pack .", file=sys.stderr) + print(" Usage: hive skill install ", file=sys.stderr) + print(" hive skill install --from ", file=sys.stderr) + print(" hive skill install --pack ", file=sys.stderr) + return 1 + + +def cmd_skill_remove(args) -> int: + """Uninstall a skill from ~/.hive/skills/.""" + from framework.skills.installer import remove_skill + from framework.skills.skill_errors import SkillError + + use_json = getattr(args, "json", False) + + try: + removed = remove_skill(args.name) + except SkillError as exc: + if use_json: + print(_json.dumps({"error": exc.what, "why": exc.why, "fix": exc.fix})) + else: + print(f"Error: {exc.what}", file=sys.stderr) + print(f" Why: {exc.why}", file=sys.stderr) + print(f" Fix: {exc.fix}", file=sys.stderr) + return 1 + + if not removed: + if use_json: + print( + _json.dumps( + { + "error": f"skill '{args.name}' not found", + "why": "Skill is not installed in ~/.hive/skills/.", + "fix": "hive skill list", + } + ) + ) + else: + print(f"Error: skill '{args.name}' not found in ~/.hive/skills/.", file=sys.stderr) + print(" Use 'hive skill list' to see installed skills.", file=sys.stderr) + return 1 + + if use_json: + print(_json.dumps({"name": args.name, "removed": True})) + else: + print(f"✓ Removed: {args.name}") + return 0 + + +def cmd_skill_info(args) -> int: + """Show details for a skill by name.""" + from framework.skills.discovery import DiscoveryConfig, SkillDiscovery + from framework.skills.registry import RegistryClient + + use_json = getattr(args, "json", False) + project_dir = Path(args.project_dir).resolve() if args.project_dir else Path.cwd() + skills = SkillDiscovery(DiscoveryConfig(project_root=project_dir)).discover() + match = next((s for s in skills if s.name == args.name), None) + + if match: + base = Path(match.base_dir) + sub_files: dict[str, list[str]] = {} + for sub in ("scripts", "references", "assets"): + sub_dir = base / sub + if sub_dir.is_dir(): + files = sorted(f.name for f in sub_dir.iterdir() if f.is_file()) + if files: + sub_files[sub] = files + + if use_json: + print( + _json.dumps( + { + "name": match.name, + "description": match.description, + "scope": match.source_scope, + "location": match.location, + "installed": True, + "license": match.license, + "compatibility": match.compatibility or [], + "allowed_tools": match.allowed_tools or [], + "tags": list(match.metadata.get("tags", [])) if match.metadata else [], + **dict(sub_files), + } + ) + ) + return 0 + + print(f"\n{match.name}") + print("─" * 40) + print(f" Description: {match.description}") + print(f" Scope: {match.source_scope}") + print(f" Location: {match.location}") + if match.license: + print(f" License: {match.license}") + if match.compatibility: + print(f" Compatibility: {', '.join(match.compatibility)}") + if match.allowed_tools: + print(f" Allowed tools: {', '.join(match.allowed_tools)}") + if match.metadata: + tags = match.metadata.get("tags", []) + if tags: + print(f" Tags: {', '.join(str(t) for t in tags)}") + for sub, files in sub_files.items(): + print(f" {sub.capitalize():13s}: {', '.join(files)}") + return 0 + + # Not installed locally — try registry + client = RegistryClient() + entry = client.get_skill_entry(args.name) + if entry: + if use_json: + print( + _json.dumps( + { + "name": entry.get("name", args.name), + "description": entry.get("description", ""), + "installed": False, + "version": entry.get("version", "unknown"), + "author": entry.get("author", "unknown"), + "trust_tier": entry.get("trust_tier", "community"), + "license": entry.get("license"), + "tags": entry.get("tags", []), + } + ) + ) + return 0 + + print(f"\n{entry.get('name', args.name)} (not installed)") + print("─" * 40) + print(f" Description: {entry.get('description', '')}") + print(f" Version: {entry.get('version', 'unknown')}") + print(f" Author: {entry.get('author', 'unknown')}") + print(f" Trust tier: {entry.get('trust_tier', 'community')}") + if entry.get("license"): + print(f" License: {entry['license']}") + if entry.get("tags"): + print(f" Tags: {', '.join(entry['tags'])}") + print(f"\n Install with: hive skill install {args.name}") + return 0 + + if use_json: + print( + _json.dumps( + { + "error": f"skill '{args.name}' not found locally or in registry", + } + ) + ) + else: + print(f"Error: skill '{args.name}' not found locally or in registry.", file=sys.stderr) + return 1 + + +def cmd_skill_init(args) -> int: + """Scaffold a new skill directory with a SKILL.md template.""" + name = args.skill_name + if not name: + # Prompt interactively if not provided + if sys.stdin.isatty(): + name = input("Skill name (e.g. my-research-skill): ").strip() + if not name: + print("Error: provide a skill name with --name .", file=sys.stderr) + return 1 + + parent = Path(args.target_dir).resolve() if args.target_dir else Path.cwd() + skill_dir = parent / name + + if skill_dir.exists(): + print(f"Error: directory already exists: {skill_dir}", file=sys.stderr) + print( + " Choose a different --name or use --dir to place it elsewhere.", + file=sys.stderr, + ) + return 1 + + skill_dir.mkdir(parents=True) + skill_md = skill_dir / "SKILL.md" + skill_md.write_text(_SKILL_MD_TEMPLATE.format(name=name), encoding="utf-8") + + print(f"✓ Created: {skill_md}") + print(" Next steps:") + print(" 1. Edit SKILL.md — fill in description and instructions") + print(f" 2. Run: hive skill validate {skill_md}") + print(f" 3. Move to ~/.hive/skills/{name}/ to make it available to all agents") + return 0 + + +def cmd_skill_validate(args) -> int: + """Strictly validate a SKILL.md against the Agent Skills spec.""" + from framework.skills.validator import validate_strict + + path = Path(args.path) + # Accept either the file or its parent directory + if path.is_dir(): + path = path / "SKILL.md" + + result = validate_strict(path) + + if getattr(args, "json", False): + print( + _json.dumps( + { + "path": str(path), + "passed": result.passed, + "errors": result.errors, + "warnings": result.warnings, + } + ) + ) + return 0 if result.passed else 1 + + for warning in result.warnings: + print(f" [WARN] {warning}") + for error in result.errors: + print(f" [ERROR] {error}") + + if result.passed: + if not result.warnings: + print(f"✓ {path} — valid") + else: + print(f"✓ {path} — valid ({len(result.warnings)} warning(s))") + return 0 + else: + print( + f"✗ {path} — invalid ({len(result.errors)} error(s), {len(result.warnings)} warning(s))" + ) + return 1 + + +def cmd_skill_doctor(args) -> int: + """Health-check skills: parseable, scripts executable, tools available.""" + from framework.skills.defaults import _DEFAULT_SKILLS_DIR, SKILL_REGISTRY + from framework.skills.discovery import DiscoveryConfig, SkillDiscovery + from framework.skills.parser import parse_skill_md + + use_json = getattr(args, "json", False) + overall_errors = 0 + + if args.defaults: + if not use_json: + print("\nFRAMEWORK DEFAULT SKILLS") + print("─" * 40) + skill_results = [] + for skill_name, dir_name in SKILL_REGISTRY.items(): + skill_md = _DEFAULT_SKILLS_DIR / dir_name / "SKILL.md" + if use_json: + report = _doctor_skill_file( + skill_name, skill_md, parse_skill_md, json_mode=True, scope="framework" + ) + overall_errors += len(report["errors"]) + skill_results.append(report) + else: + overall_errors += _doctor_skill_file(skill_name, skill_md, parse_skill_md) + if use_json: + print(_json.dumps({"skills": skill_results, "total_errors": overall_errors})) + return 0 if overall_errors == 0 else 1 + + # Discover skills for doctor + project_dir = Path(args.project_dir).resolve() if args.project_dir else Path.cwd() + skills = SkillDiscovery(DiscoveryConfig(project_root=project_dir)).discover() + + if args.name: + skills = [s for s in skills if s.name == args.name] + if not skills: + # Skill failed to parse (e.g. missing description) — look for the file directly + from framework.skills.installer import USER_SKILLS_DIR + + candidate = USER_SKILLS_DIR / args.name / "SKILL.md" + if candidate.exists(): + if use_json: + report = _doctor_skill_file( + args.name, candidate, parse_skill_md, json_mode=True, scope="user" + ) + print(_json.dumps({"skills": [report], "total_errors": len(report["errors"])})) + return 1 if report["errors"] else 0 + print(f"\nChecking skill: {args.name} [user]") + overall_errors += _doctor_skill_file(args.name, candidate, parse_skill_md) + print() + print(f"✗ {overall_errors} error(s) found.") + return 1 + if use_json: + print(_json.dumps({"error": f"skill '{args.name}' not found"})) + else: + print(f"Error: skill '{args.name}' not found.", file=sys.stderr) + return 1 + + if not skills: + if use_json: + print(_json.dumps({"skills": [], "total_errors": 0})) + else: + print("No skills discovered.") + return 0 + + skill_results = [] + for skill in skills: + if use_json: + report = _doctor_skill_file( + skill.name, + Path(skill.location), + parse_skill_md, + json_mode=True, + scope=skill.source_scope, + ) + overall_errors += len(report["errors"]) + skill_results.append(report) + else: + print(f"\nChecking skill: {skill.name} [{skill.source_scope}]") + overall_errors += _doctor_skill_file(skill.name, Path(skill.location), parse_skill_md) + + if use_json: + print(_json.dumps({"skills": skill_results, "total_errors": overall_errors})) + return 0 if overall_errors == 0 else 1 + + print() + if overall_errors == 0: + print("✓ All skills healthy.") + else: + print(f"✗ {overall_errors} error(s) found.") + return 0 if overall_errors == 0 else 1 + + +def cmd_skill_update(args) -> int: + """Refresh registry cache or re-install a specific skill.""" + from framework.skills.installer import ( + USER_SKILLS_DIR, + install_from_registry, + remove_skill, + ) + from framework.skills.registry import RegistryClient + from framework.skills.skill_errors import SkillError + + use_json = getattr(args, "json", False) + client = RegistryClient() + + if not args.name: + # Refresh cache only + if not use_json: + print("Refreshing registry cache ...") + index = client.fetch_index(force_refresh=True) + if index is None: + if use_json: + print( + _json.dumps( + { + "status": "unavailable", + "warning": "registry unavailable — could not refresh cache", + } + ) + ) + else: + print("Warning: registry unavailable — could not refresh cache.", file=sys.stderr) + return 0 # Non-fatal + count = len(index.get("skills", [])) + if use_json: + print(_json.dumps({"status": "refreshed", "skill_count": count})) + else: + print(f"✓ Registry cache updated ({count} skills).") + return 0 + + # Update a specific skill + entry = client.get_skill_entry(args.name) + if entry is None: + if use_json: + print( + _json.dumps( + { + "error": f"skill '{args.name}' not found in registry", + "why": "Registry may be unavailable or skill name is incorrect.", + "fix": "Check your network connection or verify the skill name.", + } + ) + ) + else: + print( + f"Error: skill '{args.name}' not found in registry — cannot update.", + file=sys.stderr, + ) + print(" Check your network connection or verify the skill name.", file=sys.stderr) + return 1 + + registry_version = entry.get("version") + installed_dir = USER_SKILLS_DIR / args.name + installed_skill_md = installed_dir / "SKILL.md" + + if installed_skill_md.exists(): + import yaml + + try: + content = installed_skill_md.read_text(encoding="utf-8") + parts = content.split("---", 2) + fm = yaml.safe_load(parts[1]) if len(parts) >= 3 else {} + installed_version = fm.get("version") if isinstance(fm, dict) else None + except Exception: + installed_version = None + + if installed_version and installed_version == registry_version: + if use_json: + print( + _json.dumps( + { + "name": args.name, + "status": "up_to_date", + "version": registry_version, + } + ) + ) + else: + print(f"✓ '{args.name}' is already at version {registry_version}.") + return 0 + + if not installed_version and not use_json: + print( + f"Warning: installed skill '{args.name}' has no version field — " + "cannot compare. Re-installing.", + file=sys.stderr, + ) + + # Remove and reinstall + if not use_json: + print(f"Updating '{args.name}' ...") + try: + remove_skill(args.name) + dest = install_from_registry(entry, target_dir=USER_SKILLS_DIR) + except SkillError as exc: + if use_json: + print(_json.dumps({"error": exc.what, "why": exc.why, "fix": exc.fix})) + else: + print(f"Error: {exc.what}", file=sys.stderr) + print(f" Why: {exc.why}", file=sys.stderr) + print(f" Fix: {exc.fix}", file=sys.stderr) + return 1 + + new_version = registry_version or "unknown" + if use_json: + print( + _json.dumps( + { + "name": args.name, + "status": "updated", + "version": new_version, + "location": str(dest), + } + ) + ) + else: + print(f"✓ Updated '{args.name}' to version {new_version}.") + print(f" Location: {dest}") + return 0 + + +def cmd_skill_search(args) -> int: + """Search the skill registry by name, tag, or description.""" + from framework.skills.registry import RegistryClient + + use_json = getattr(args, "json", False) + client = RegistryClient() + # Trigger a fetch to check availability + index = client.fetch_index() + if index is None: + if use_json: + print( + _json.dumps( + { + "error": "registry unavailable", + "query": args.query, + "fix": "hive skill install --from ", + } + ) + ) + else: + print( + f"Error: registry unavailable — cannot search for '{args.query}'.", + file=sys.stderr, + ) + print( + " Install from a git URL directly: hive skill install --from ", + file=sys.stderr, + ) + return 1 + + results = client.search(args.query) + + if use_json: + print( + _json.dumps( + { + "query": args.query, + "results": [ + { + "name": e.get("name", ""), + "description": e.get("description", ""), + "trust_tier": e.get("trust_tier", "community"), + "tags": e.get("tags", []), + } + for e in results + ], + } + ) + ) + return 0 + + if not results: + print(f"No skills found matching '{args.query}'.") + return 0 + + print(f"\n{len(results)} result(s) for '{args.query}':\n") + for entry in results: + name = entry.get("name", "") + tier = entry.get("trust_tier", "community") + description = entry.get("description", "") + print(f" • {name} [{tier}]") + print(f" {description}") + print() + return 0 + + +def cmd_skill_fork(args) -> int: + """Create a local editable copy of a skill.""" + from framework.skills.discovery import DiscoveryConfig, SkillDiscovery + from framework.skills.installer import USER_SKILLS_DIR, fork_skill + from framework.skills.skill_errors import SkillError + + use_json = getattr(args, "json", False) + project_dir = Path(args.project_dir).resolve() if args.project_dir else Path.cwd() + skills = SkillDiscovery(DiscoveryConfig(project_root=project_dir)).discover() + source = next((s for s in skills if s.name == args.name), None) + + if source is None: + if use_json: + print(_json.dumps({"error": f"skill '{args.name}' not found"})) + else: + print(f"Error: skill '{args.name}' not found.", file=sys.stderr) + print(" Use 'hive skill list' to see available skills.", file=sys.stderr) + return 1 + + new_name = args.new_name or f"{args.name}-fork" + target_dir = Path(args.target_dir).resolve() if args.target_dir else USER_SKILLS_DIR + dest = target_dir / new_name + + if not args.yes and not use_json: + answer = _prompt_yes_no(f"Fork '{args.name}' to {dest}? [y/N] ") + if not answer: + print("Aborted.") + return 0 + + try: + result = fork_skill(source, new_name, target_dir) + except SkillError as exc: + if use_json: + print(_json.dumps({"error": exc.what, "why": exc.why, "fix": exc.fix})) + else: + print(f"Error: {exc.what}", file=sys.stderr) + print(f" Why: {exc.why}", file=sys.stderr) + print(f" Fix: {exc.fix}", file=sys.stderr) + return 1 + + if use_json: + print(_json.dumps({"source": args.name, "new_name": new_name, "location": str(result)})) + else: + print(f"✓ Forked '{args.name}' → '{new_name}'") + print(f" Location: {result}") + print(" Edit SKILL.md to customise, then run: hive skill validate") + return 0 + + +def cmd_skill_test(args) -> int: + """Run a skill in isolation or execute its eval suite (CLI-9). + + Three progressive modes: + 1. Structural (always): validate_strict + doctor checks — no API key needed. + 2. Invocation (--input): inject skill body as system, run prompt through Claude. + 3. Eval suite (evals/ present): run each eval case + LLM-judge assertions. + """ + from framework.skills.parser import parse_skill_md + from framework.skills.validator import validate_strict + + use_json = getattr(args, "json", False) + + # ── 1. Resolve path ────────────────────────────────────────────────────── + path = Path(args.path) + if path.is_dir(): + path = path / "SKILL.md" + + # ── 2. Structural validation (always) ──────────────────────────────────── + vresult = validate_strict(path) + structural = { + "passed": vresult.passed, + "errors": vresult.errors, + "warnings": vresult.warnings, + } + + if not use_json: + for w in vresult.warnings: + print(f" [WARN] {w}") + for e in vresult.errors: + print(f" [ERROR] {e}") + + if not vresult.passed: + if use_json: + print(_json.dumps({"path": str(path), "skill": None, "structural": structural})) + else: + print(f"✗ {path} — structural validation failed. Fix errors before testing.") + return 1 + + # ── 3. Parse the skill ─────────────────────────────────────────────────── + skill = parse_skill_md(path, source_scope="user") + if skill is None: + if use_json: + print( + _json.dumps( + { + "path": str(path), + "skill": None, + "structural": { + "passed": False, + "errors": ["parse_skill_md returned None"], + "warnings": [], + }, + } + ) + ) + else: + print(f"✗ {path} — skill could not be parsed.", file=sys.stderr) + return 1 + + evals_dir = path.parent / "evals" + has_evals = evals_dir.is_dir() and any(evals_dir.glob("*.json")) + has_input = args.input_json is not None + + # ── 4. Structural-only mode (no LLM needed) ─────────────────────────────── + if not has_input and not has_evals: + doctor_errors = _doctor_skill_file( + skill.name, path, parse_skill_md, json_mode=use_json, scope="user" + ) + if use_json: + print( + _json.dumps( + { + "path": str(path), + "skill": skill.name, + "structural": structural, + "doctor": doctor_errors, + } + ) + ) + return 0 if (structural["passed"] and not doctor_errors.get("errors")) else 1 + if doctor_errors == 0: + print(f"✓ {skill.name} — structurally valid and healthy.") + print(" No evals/ directory found. Use --input for a live invocation test.") + else: + print(f"✗ {skill.name} — {doctor_errors} doctor error(s) found.") + return 0 if doctor_errors == 0 else 1 + + # ── 5. Initialize LLM provider ──────────────────────────────────────────── + provider = None + provider_error = None + try: + from framework.llm.anthropic import AnthropicProvider + + model = getattr(args, "model", None) or "claude-haiku-4-5-20251001" + provider = AnthropicProvider(model=model) + except Exception as exc: + provider_error = str(exc) + + if provider is None and has_input: + # --input was explicitly requested but we have no provider — hard error + if use_json: + print( + _json.dumps( + { + "path": str(path), + "skill": skill.name, + "error": f"Cannot initialize LLM provider: {provider_error}", + "fix": "Set ANTHROPIC_API_KEY to enable live invocation.", + } + ) + ) + else: + print(f"Error: Cannot initialize LLM provider: {provider_error}", file=sys.stderr) + print(" Set ANTHROPIC_API_KEY to enable live invocation.", file=sys.stderr) + return 1 + + result: dict = { + "path": str(path), + "skill": skill.name, + "structural": structural, + } + overall_failed = 0 + + # ── 6. Invocation mode (--input) ────────────────────────────────────────── + if has_input and provider is not None: + raw = args.input_json + try: + data = _json.loads(raw) + except ValueError: + data = raw + prompt = data.get("prompt", raw) if isinstance(data, dict) else str(data) + + if not use_json: + print(f"\nRunning '{skill.name}' with provided input ...") + try: + response = provider.complete( + messages=[{"role": "user", "content": prompt}], + system=skill.body, + max_tokens=2048, + ) + if not use_json: + print("\n── Response ──────────────────────────────────────────────────") + print(response.content) + print("──────────────────────────────────────────────────────────────") + result["invocation"] = { + "prompt": prompt, + "response": response.content, + "model": response.model, + } + except Exception as exc: + if not use_json: + print(f"Error during invocation: {exc}", file=sys.stderr) + result["invocation"] = {"prompt": prompt, "error": str(exc)} + overall_failed += 1 + + # ── 7. Eval suite ───────────────────────────────────────────────────────── + if has_evals: + if provider is None: + # Degrade gracefully: structural passed, just warn about evals + if not use_json: + n = len(list(evals_dir.glob("*.json"))) + print( + f"\nWarning: ANTHROPIC_API_KEY not set — skipping {n} eval file(s).", + file=sys.stderr, + ) + else: + from framework.testing.llm_judge import LLMJudge + + judge = LLMJudge(llm_provider=provider) + eval_results = [] + + for eval_file in sorted(evals_dir.glob("*.json")): + try: + eval_data = _json.loads(eval_file.read_text(encoding="utf-8")) + except Exception as exc: + if not use_json: + print(f" [ERROR] Cannot parse {eval_file.name}: {exc}", file=sys.stderr) + overall_failed += 1 + continue + + for eval_case in eval_data.get("evals", []): + case_id = eval_case.get("id", "?") + eval_prompt = eval_case.get("prompt", "") + + if not use_json: + truncated = eval_prompt[:60] + ("..." if len(eval_prompt) > 60 else "") + print(f"\nEval #{case_id}: {truncated}") + + try: + response = provider.complete( + messages=[{"role": "user", "content": eval_prompt}], + system=skill.body, + max_tokens=2048, + ) + skill_response = response.content + except Exception as exc: + if not use_json: + print(f" [ERROR] Invocation failed: {exc}", file=sys.stderr) + eval_results.append( + { + "id": case_id, + "prompt": eval_prompt, + "error": str(exc), + "passed": False, + } + ) + overall_failed += 1 + continue + + assertion_results = [] + case_failed = False + for assertion in eval_case.get("assertions", []): + try: + judged = judge.evaluate( + constraint=assertion, + source_document=eval_prompt, + summary=skill_response, + criteria=( + "Evaluate whether the skill response satisfies the assertion." + ), + ) + passes = judged.get("passes", False) + explanation = judged.get("explanation", "") + except Exception as exc: + passes = False + explanation = f"Judge error: {exc}" + + assertion_results.append( + { + "text": assertion, + "passes": passes, + "explanation": explanation, + } + ) + if not passes: + case_failed = True + overall_failed += 1 + + if not use_json: + icon = "✓" if passes else "✗" + print(f" {icon} {assertion}") + if not passes: + print(f" → {explanation}") + + eval_results.append( + { + "id": case_id, + "prompt": eval_prompt, + "response": skill_response, + "assertions": assertion_results, + "passed": not case_failed, + } + ) + + passed_count = sum(1 for e in eval_results if e.get("passed")) + failed_count = len(eval_results) - passed_count + result["evals"] = eval_results + result["total_evals"] = len(eval_results) + result["total_passed"] = passed_count + result["total_failed"] = failed_count + + if not use_json: + print(f"\n{passed_count}/{len(eval_results)} eval(s) passed.") + + # ── 8. Output ───────────────────────────────────────────────────────────── + if use_json: + print(_json.dumps(result)) + + if not use_json: + print() + if overall_failed == 0: + print(f"✓ {skill.name} — all tests passed.") + else: + print(f"✗ {skill.name} — {overall_failed} failure(s).") + + return 0 if overall_failed == 0 else 1 + + def cmd_skill_trust(args) -> int: """Permanently trust a project repository's skills.""" from framework.skills.trust import TrustedRepoStore, _normalize_remote_url @@ -118,3 +1316,151 @@ def cmd_skill_trust(args) -> int: print(" Stored in ~/.hive/trusted_repos.json") print(" Skills from this repository will load without prompting in future runs.") return 0 + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + + +def _install_pack(pack_name: str, target_dir: Path, version: str | None) -> int: + """Install all skills in a registry starter pack.""" + from framework.skills.installer import install_from_registry + from framework.skills.registry import RegistryClient + from framework.skills.skill_errors import SkillError + + client = RegistryClient() + skill_names = client.get_pack(pack_name) + + if skill_names is None: + print(f"Error: pack '{pack_name}' not found in registry.", file=sys.stderr) + print( + " The registry may be unavailable. Check your network connection.", + file=sys.stderr, + ) + return 1 + + if not skill_names: + print(f"Warning: pack '{pack_name}' contains no skills.", file=sys.stderr) + return 0 + + print(f"Installing pack '{pack_name}' ({len(skill_names)} skills) ...") + errors = 0 + for name in skill_names: + entry = client.get_skill_entry(name) + if not entry: + print(f" ✗ {name} — not found in registry, skipping", file=sys.stderr) + errors += 1 + continue + try: + dest = install_from_registry(entry, target_dir=target_dir, version=version) + print(f" ✓ {name} → {dest}") + except SkillError as exc: + print(f" ✗ {name} — {exc.why}", file=sys.stderr) + errors += 1 + + print() + if errors == 0: + print(f"✓ Pack '{pack_name}' installed successfully.") + else: + print(f"✗ Pack install completed with {errors} error(s).") + return 0 if errors == 0 else 1 + + +def _derive_name_from_url(url: str) -> str: + """Derive a skill directory name from a git URL. + + github.com/org/deep-research.git → deep-research + github.com/org/skills → skills + """ + last = url.rstrip("/").split("/")[-1] + return last[:-4] if last.endswith(".git") else last + + +def _doctor_skill_file( + skill_name: str, + skill_md: Path, + parse_fn, + json_mode: bool = False, + scope: str = "unknown", +): + """Run doctor checks on a single skill file. + + Returns int (error count) when json_mode=False, or a dict report when json_mode=True. + """ + errors: list[str] = [] + warnings: list[str] = [] + + # Check 1: SKILL.md parseable + parsed = parse_fn(skill_md) + if parsed is None: + msg = f"SKILL.md not parseable: {skill_md}" + if json_mode: + errors.append(msg) + return { + "name": skill_name, + "scope": scope, + "parseable": False, + "errors": errors, + "warnings": warnings, + } + print(f" ✗ {msg}") + return 1 + if not json_mode: + print(" ✓ SKILL.md parseable") + + base_dir = skill_md.parent + + # Check 2: scripts exist and are executable + scripts_dir = base_dir / "scripts" + if scripts_dir.is_dir(): + for script in sorted(scripts_dir.iterdir()): + if script.is_file(): + if not script.exists(): + msg = f"Script missing: {script.name}" + errors.append(msg) if json_mode else print(f" ✗ {msg}") + elif not os.access(script, os.X_OK): + msg = f"Script not executable: {script.name} (run: chmod +x {script})" + errors.append(msg) if json_mode else print(f" ✗ {msg}") + elif not json_mode: + print(f" ✓ Script executable: {script.name}") + + # Check 3: references readable + references_dir = base_dir / "references" + if references_dir.is_dir(): + for ref in sorted(references_dir.iterdir()): + if ref.is_file(): + if not os.access(ref, os.R_OK): + msg = f"Reference not readable: {ref.name}" + errors.append(msg) if json_mode else print(f" ✗ {msg}") + elif not json_mode: + print(f" ✓ Reference readable: {ref.name}") + + # Check 4: allowed-tools available on PATH (warning, not error) + if parsed.allowed_tools: + for tool in parsed.allowed_tools: + tool_name = tool.split("/")[-1].split("(")[0].strip() + if tool_name and shutil.which(tool_name) is None: + msg = f"Tool not found in PATH: {tool_name} (may be an MCP tool — OK)" + warnings.append(msg) if json_mode else print(f" ! {msg}") + + if json_mode: + return { + "name": skill_name, + "scope": scope, + "parseable": True, + "errors": errors, + "warnings": warnings, + } + return len(errors) + + +def _prompt_yes_no(prompt: str) -> bool: + """Prompt the user for yes/no. Returns True for y/Y. Non-interactive → False.""" + if not sys.stdin.isatty(): + return False + try: + answer = input(prompt).strip().lower() + return answer in ("y", "yes") + except (EOFError, KeyboardInterrupt): + return False diff --git a/core/framework/skills/installer.py b/core/framework/skills/installer.py new file mode 100644 index 0000000000..4a002a7401 --- /dev/null +++ b/core/framework/skills/installer.py @@ -0,0 +1,348 @@ +"""Skill install, remove, and fork operations. + +Handles filesystem operations for the hive skill CLI: + - install_from_git: git clone --depth=1 → copy to target directory + - install_from_registry: resolve registry entry → delegate to install_from_git + - remove_skill: delete a skill from ~/.hive/skills/ + - fork_skill: copy a skill to a new location with a new name + - maybe_show_install_notice: one-time security notice on first install (NFR-5) +""" + +from __future__ import annotations + +import shutil +import subprocess +import tempfile +from pathlib import Path + +from framework.skills.parser import ParsedSkill +from framework.skills.skill_errors import SkillError, SkillErrorCode + +# Default install destination for user-scope skills +USER_SKILLS_DIR = Path.home() / ".hive" / "skills" + +# Sentinel file for the one-time security notice on first install (NFR-5) +INSTALL_NOTICE_SENTINEL = Path.home() / ".hive" / ".install_notice_shown" + +_INSTALL_NOTICE = """\ +───────────────────────────────────────────────────────────── + Security Notice: Installing Third-Party Skills +───────────────────────────────────────────────────────────── + Skills are instructions executed by AI agents. A malicious + skill can manipulate agent behavior, exfiltrate data, or + cause unintended actions. + + Only install skills from sources you trust. Review the + SKILL.md before running it in a production environment. + + This notice is shown once. Use 'hive skill doctor' to audit + installed skills at any time. +───────────────────────────────────────────────────────────── +""" + + +def maybe_show_install_notice() -> None: + """Print a one-time security notice before the first skill install (NFR-5). + + Touches a sentinel file in ~/.hive/ after showing the notice so it is + only displayed once across all future installs. + """ + if INSTALL_NOTICE_SENTINEL.exists(): + return + print(_INSTALL_NOTICE, flush=True) + try: + INSTALL_NOTICE_SENTINEL.parent.mkdir(parents=True, exist_ok=True) + INSTALL_NOTICE_SENTINEL.touch() + except OSError: + pass # If we can't write the sentinel, just show the notice every time + + +def install_from_git( + git_url: str, + skill_name: str, + subdirectory: str | None = None, + version: str | None = None, + target_dir: Path | None = None, +) -> Path: + """Install a skill from a git repository. + + Clones the repository with --depth=1 into a temporary directory, then + copies the skill subdirectory (or repo root) to the target location. + + Args: + git_url: Git repository URL to clone. + skill_name: Name of the skill — used as the install directory name. + subdirectory: Relative path within the repo to the skill directory. + If None, the repo root is treated as the skill directory. + version: Git ref to checkout (tag, branch, or commit). Defaults to + the remote's default branch. + target_dir: Where to install the skill. Defaults to + ~/.hive/skills//. + + Returns: + Path to the installed skill directory (the parent of SKILL.md). + + Raises: + SkillError: On any failure (git not found, clone failed, SKILL.md missing). + """ + if shutil.which("git") is None: + raise SkillError( + code=SkillErrorCode.SKILL_ACTIVATION_FAILED, + what=f"Cannot install '{skill_name}' from {git_url}", + why="git is not installed or not on PATH.", + fix="Install git (https://git-scm.com/) and retry.", + ) + + dest = (target_dir or USER_SKILLS_DIR) / skill_name + if dest.exists(): + raise SkillError( + code=SkillErrorCode.SKILL_ACTIVATION_FAILED, + what=f"Cannot install '{skill_name}'", + why=f"Directory already exists: {dest}", + fix=f"Run 'hive skill remove {skill_name}' first, or use a different --name.", + ) + + tmp_dir = tempfile.mkdtemp(prefix="hive-skill-install-") + try: + _git_clone_shallow(git_url, Path(tmp_dir), version=version) + + # Locate the skill within the cloned repo + source_dir = Path(tmp_dir) / subdirectory if subdirectory else Path(tmp_dir) + skill_md = source_dir / "SKILL.md" + if not skill_md.exists(): + raise SkillError( + code=SkillErrorCode.SKILL_NOT_FOUND, + what=f"No SKILL.md found in '{subdirectory or '/'}' of {git_url}", + why="The expected SKILL.md file is not present at the given path.", + fix=( + "Check the repository structure and use " + "'hive skill install --from ' with the correct subdirectory." + ), + ) + + dest.parent.mkdir(parents=True, exist_ok=True) + _copy_skill_dir(source_dir, dest) + return dest + + except SkillError: + raise + except Exception as exc: + raise SkillError( + code=SkillErrorCode.SKILL_ACTIVATION_FAILED, + what=f"Failed to install '{skill_name}' from {git_url}", + why=str(exc), + fix="Check the URL, your network connection, and git configuration.", + ) from exc + finally: + shutil.rmtree(tmp_dir, ignore_errors=True) + + +def install_from_registry( + registry_entry: dict, + target_dir: Path | None = None, + version: str | None = None, +) -> Path: + """Install a skill using a registry index entry. + + Resolves the git_url and subdirectory from the registry entry and + delegates to install_from_git. + + Args: + registry_entry: A skill entry dict from skill_index.json. + target_dir: Override install destination. + version: Override version (defaults to entry's 'version' field). + + Returns: + Path to the installed skill directory. + + Raises: + SkillError: If the registry entry is missing required fields or install fails. + """ + name = registry_entry.get("name") + git_url = registry_entry.get("git_url") + + if not name or not git_url: + raise SkillError( + code=SkillErrorCode.SKILL_NOT_FOUND, + what="Incomplete registry entry — missing 'name' or 'git_url'.", + why="The registry index entry does not contain all required fields.", + fix="Report this issue to the registry maintainer.", + ) + + resolved_version = version or registry_entry.get("version") + subdirectory = registry_entry.get("subdirectory") + + return install_from_git( + git_url=git_url, + skill_name=str(name), + subdirectory=subdirectory, + version=resolved_version, + target_dir=target_dir, + ) + + +def remove_skill(name: str, skills_dir: Path | None = None) -> bool: + """Remove an installed skill from the user skills directory. + + Args: + name: Skill directory name to remove. + skills_dir: Override the search directory (default: ~/.hive/skills/). + + Returns: + True if removed, False if not found. + + Raises: + SkillError: If the directory exists but cannot be removed. + """ + target = (skills_dir or USER_SKILLS_DIR) / name + if not target.exists(): + return False + try: + shutil.rmtree(target) + return True + except OSError as exc: + raise SkillError( + code=SkillErrorCode.SKILL_ACTIVATION_FAILED, + what=f"Failed to remove skill '{name}' at {target}", + why=str(exc), + fix="Check file permissions and try again.", + ) from exc + + +def fork_skill( + source: ParsedSkill, + new_name: str, + target_dir: Path, +) -> Path: + """Create a local editable copy of a skill with a new name. + + Copies the skill's base directory to target_dir/new_name/ and rewrites + the 'name' field in the copied SKILL.md frontmatter. + + Args: + source: The source skill to fork (from SkillDiscovery). + new_name: Name for the forked skill. + target_dir: Parent directory for the fork (e.g. ~/.hive/skills/). + + Returns: + Path to the forked skill directory. + + Raises: + SkillError: If the target already exists or the copy fails. + """ + dest = target_dir / new_name + if dest.exists(): + raise SkillError( + code=SkillErrorCode.SKILL_ACTIVATION_FAILED, + what=f"Cannot fork to '{dest}'", + why="Target directory already exists.", + fix=f"Choose a different --name or remove '{dest}' first.", + ) + + source_dir = Path(source.base_dir) + try: + dest.parent.mkdir(parents=True, exist_ok=True) + _copy_skill_dir(source_dir, dest) + except OSError as exc: + raise SkillError( + code=SkillErrorCode.SKILL_ACTIVATION_FAILED, + what=f"Failed to fork skill '{source.name}' to '{dest}'", + why=str(exc), + fix="Check file permissions and available disk space.", + ) from exc + + # Rewrite the name in the forked SKILL.md via YAML round-trip (safe) + forked_skill_md = dest / "SKILL.md" + if forked_skill_md.exists(): + _rewrite_name_in_skill_md(forked_skill_md, new_name) + + return dest + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + + +def _git_clone_shallow(git_url: str, target: Path, version: str | None = None) -> None: + """Clone a git repo at --depth=1 into target directory. + + Args: + git_url: Repository URL. + target: Destination directory (will be created by git). + version: Optional git ref (branch/tag) to clone. + + Raises: + SkillError: If the clone fails. + """ + cmd = ["git", "clone", "--depth=1"] + if version: + cmd += ["--branch", version] + cmd += [git_url, str(target)] + + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=60, + ) + except subprocess.TimeoutExpired: + raise SkillError( + code=SkillErrorCode.SKILL_ACTIVATION_FAILED, + what=f"git clone timed out for {git_url}", + why="The clone operation took longer than 60 seconds.", + fix="Check your network connection and retry.", + ) from None + except (FileNotFoundError, OSError) as exc: + raise SkillError( + code=SkillErrorCode.SKILL_ACTIVATION_FAILED, + what=f"Cannot run git for {git_url}", + why=str(exc), + fix="Ensure git is installed and on PATH.", + ) from exc + + if result.returncode != 0: + stderr = result.stderr.strip() + raise SkillError( + code=SkillErrorCode.SKILL_ACTIVATION_FAILED, + what=f"git clone failed for {git_url}", + why=stderr or f"git exited with code {result.returncode}", + fix="Check the URL is correct and the repository is publicly accessible.", + ) + + +def _copy_skill_dir(src: Path, dst: Path) -> None: + """Copy a skill directory, ignoring VCS and cache artifacts.""" + ignore = shutil.ignore_patterns(".git", "__pycache__", "*.pyc", ".venv", "venv", "node_modules") + shutil.copytree(src, dst, ignore=ignore) + + +def _rewrite_name_in_skill_md(skill_md: Path, new_name: str) -> None: + """Rewrite the 'name' field in a SKILL.md frontmatter via YAML round-trip. + + Parses the frontmatter with yaml.safe_load, updates 'name', re-serializes + with yaml.dump, and reconstructs the file as: + --- + + --- + + + Falls back to no-op if the file can't be parsed (the copy is still usable). + """ + import yaml + + try: + content = skill_md.read_text(encoding="utf-8") + parts = content.split("---", 2) + if len(parts) < 3: + return + frontmatter = yaml.safe_load(parts[1].strip()) + if not isinstance(frontmatter, dict): + return + frontmatter["name"] = new_name + new_yaml = yaml.dump(frontmatter, default_flow_style=False, allow_unicode=True) + new_content = f"---\n{new_yaml}---\n{parts[2]}" + skill_md.write_text(new_content, encoding="utf-8") + except Exception: + pass # Degraded: forked copy works, name just isn't updated diff --git a/core/framework/skills/registry.py b/core/framework/skills/registry.py new file mode 100644 index 0000000000..c3170f9f42 --- /dev/null +++ b/core/framework/skills/registry.py @@ -0,0 +1,206 @@ +"""Registry client for the Hive community skill registry. + +Fetches the skill index from the hive-skill-registry GitHub repo, caches it +locally, and provides search and resolution utilities. + +The registry repo (Phase 3) may not exist yet. All public methods degrade +gracefully — returning None or [] on any network or parse failure. + +Configure a custom registry URL via the HIVE_REGISTRY_URL environment variable. +""" + +from __future__ import annotations + +import json +import logging +import os +from datetime import UTC, datetime +from pathlib import Path +from urllib.error import URLError +from urllib.request import urlopen + +logger = logging.getLogger(__name__) + +# Default registry index URL (Phase 3 repo, may not exist yet) +_DEFAULT_REGISTRY_URL = ( + "https://raw.githubusercontent.com/hive-skill-registry/" + "hive-skill-registry/main/skill_index.json" +) + +_CACHE_DIR = Path.home() / ".hive" / "registry_cache" +_CACHE_INDEX_PATH = _CACHE_DIR / "skill_index.json" +_CACHE_METADATA_PATH = _CACHE_DIR / "metadata.json" +_CACHE_TTL_SECONDS = 3600 # 1 hour + + +class RegistryClient: + """Client for the Hive community skill registry. + + All public methods return None / [] on any failure — never raise. + Network errors, parse failures, and missing registries are all + treated as graceful degradation. + """ + + def __init__( + self, + registry_url: str | None = None, + cache_dir: Path | None = None, + ) -> None: + self._url = registry_url or os.environ.get("HIVE_REGISTRY_URL", _DEFAULT_REGISTRY_URL) + cache_root = cache_dir or _CACHE_DIR + self._index_path = cache_root / "skill_index.json" + self._metadata_path = cache_root / "metadata.json" + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def fetch_index(self, force_refresh: bool = False) -> dict | None: + """Return the registry index dict. + + Uses the local cache if it is fresh (within TTL) unless + force_refresh=True. Returns None on any failure. + """ + if not force_refresh and self._is_cache_fresh(): + cached = self._load_cache() + if cached is not None: + return cached + + raw = self._http_fetch(self._url) + if raw is None: + # Network unavailable — fall back to stale cache if present + stale = self._load_cache() + if stale is not None: + logger.debug("registry: network unavailable, using stale cache") + return stale + + try: + data = json.loads(raw.decode("utf-8")) + except (json.JSONDecodeError, UnicodeDecodeError) as exc: + logger.warning("registry: failed to parse index JSON: %s", exc) + return self._load_cache() + + if not isinstance(data, dict): + logger.warning("registry: index is not a JSON object") + return self._load_cache() + + self._save_cache(data) + return data + + def search(self, query: str) -> list[dict]: + """Search registry skills by name, description, or tags. + + Case-insensitive substring match. Returns [] if index unavailable. + """ + index = self.fetch_index() + if not index: + return [] + skills = index.get("skills", []) + if not isinstance(skills, list): + return [] + q = query.lower() + results = [] + for entry in skills: + if not isinstance(entry, dict): + continue + name = str(entry.get("name", "")).lower() + description = str(entry.get("description", "")).lower() + tags = " ".join(str(t) for t in entry.get("tags", [])).lower() + if q in name or q in description or q in tags: + results.append(entry) + return results + + def get_skill_entry(self, name: str) -> dict | None: + """Look up a single skill by exact name. Returns None if not found.""" + index = self.fetch_index() + if not index: + return None + for entry in index.get("skills", []): + if isinstance(entry, dict) and entry.get("name") == name: + return entry + return None + + def get_pack(self, pack_name: str) -> list[str] | None: + """Return the list of skill names in a starter pack. + + Returns None if the pack is not found or the index is unavailable. + """ + index = self.fetch_index() + if not index: + return None + for pack in index.get("packs", []): + if isinstance(pack, dict) and pack.get("name") == pack_name: + skills = pack.get("skills", []) + if isinstance(skills, list): + return [s for s in skills if isinstance(s, str)] + return None + + def resolve_git_url(self, name: str) -> tuple[str, str | None] | None: + """Return (git_url, subdirectory) for a skill name. + + Returns None if the skill is not in the registry or the index + is unavailable. + """ + entry = self.get_skill_entry(name) + if not entry: + return None + git_url = entry.get("git_url") + if not git_url: + return None + subdirectory = entry.get("subdirectory") or None + return str(git_url), subdirectory + + # ------------------------------------------------------------------ + # Cache internals + # ------------------------------------------------------------------ + + def _load_cache(self) -> dict | None: + """Read cached index from disk. Returns None if absent or unreadable.""" + try: + data = json.loads(self._index_path.read_text(encoding="utf-8")) + return data if isinstance(data, dict) else None + except FileNotFoundError: + return None + except Exception as exc: + logger.debug("registry: could not read cache: %s", exc) + return None + + def _save_cache(self, data: dict) -> None: + """Write index to disk atomically (.tmp then rename).""" + try: + self._index_path.parent.mkdir(parents=True, exist_ok=True) + tmp = self._index_path.with_suffix(".tmp") + tmp.write_text(json.dumps(data, indent=2), encoding="utf-8") + tmp.replace(self._index_path) + # Update metadata + meta = {"last_fetched": datetime.now(tz=UTC).isoformat()} + meta_tmp = self._metadata_path.with_suffix(".tmp") + meta_tmp.write_text(json.dumps(meta, indent=2), encoding="utf-8") + meta_tmp.replace(self._metadata_path) + except Exception as exc: + logger.debug("registry: could not write cache: %s", exc) + + def _is_cache_fresh(self) -> bool: + """Return True if the cached index was fetched within the TTL.""" + try: + meta = json.loads(self._metadata_path.read_text(encoding="utf-8")) + last_fetched = datetime.fromisoformat(meta["last_fetched"]) + age = (datetime.now(tz=UTC) - last_fetched).total_seconds() + return age < _CACHE_TTL_SECONDS + except Exception: + return False + + def _http_fetch(self, url: str, timeout: int = 10) -> bytes | None: + """Fetch URL contents. Returns None on any network error — never raises.""" + try: + with urlopen(url, timeout=timeout) as resp: # noqa: S310 + return resp.read() + except URLError as exc: + logger.debug("registry: HTTP fetch failed for %s: %s", url, exc) + return None + except TimeoutError as exc: + logger.debug("registry: HTTP fetch timed out for %s: %s", url, exc) + return None + except Exception as exc: + logger.debug("registry: unexpected error fetching %s: %s", url, exc) + return None diff --git a/core/framework/skills/validator.py b/core/framework/skills/validator.py new file mode 100644 index 0000000000..865b7171a8 --- /dev/null +++ b/core/framework/skills/validator.py @@ -0,0 +1,176 @@ +"""Strict SKILL.md validation for contributor tooling (hive skill validate). + +Unlike the lenient parser used at runtime, this module applies hard-error rules +that match the Agent Skills specification exactly. Intended for contributor +tooling, CI gates, and hive skill doctor. +""" + +from __future__ import annotations + +import stat +from dataclasses import dataclass, field +from pathlib import Path + +from framework.skills.parser import _MAX_NAME_LENGTH + + +@dataclass +class ValidationResult: + """Result of a strict SKILL.md validation run.""" + + passed: bool + errors: list[str] = field(default_factory=list) + warnings: list[str] = field(default_factory=list) + + +def validate_strict(path: Path) -> ValidationResult: + """Run all strict checks against a SKILL.md file. + + Applies hard-error rules that go beyond the lenient runtime parser: + - name must be explicit (no directory-name fallback) + - YAML must parse without fixup + - name/directory mismatch is an error, not a warning + - empty body is an error + - scripts must be executable + + Args: + path: Path to the SKILL.md file to validate. + + Returns: + ValidationResult with passed=True if no errors, plus any warnings. + """ + errors: list[str] = [] + warnings: list[str] = [] + + # 1. File exists and is readable + try: + content = path.read_text(encoding="utf-8") + except FileNotFoundError: + return ValidationResult(passed=False, errors=[f"File not found: {path}"]) + except PermissionError: + return ValidationResult(passed=False, errors=[f"Permission denied reading: {path}"]) + except OSError as exc: + return ValidationResult(passed=False, errors=[f"Cannot read file: {exc}"]) + + # 2. File not empty + if not content.strip(): + return ValidationResult(passed=False, errors=["File is empty."]) + + # 3. YAML frontmatter present + parts = content.split("---", 2) + if len(parts) < 3: + return ValidationResult( + passed=False, + errors=["Missing YAML frontmatter — wrap frontmatter with --- delimiters."], + ) + + raw_yaml = parts[1].strip() + body = parts[2].strip() + + if not raw_yaml: + return ValidationResult( + passed=False, + errors=["Frontmatter delimiters present but YAML block is empty."], + ) + + # 4. YAML parses WITHOUT fixup (strict: unquoted colons are an error) + import yaml + + frontmatter: dict | None = None + try: + frontmatter = yaml.safe_load(raw_yaml) + except yaml.YAMLError as exc: + errors.append( + f"YAML parse error: {exc}. " + 'Wrap values containing colons in quotes, e.g. description: "Use for: research".' + ) + return ValidationResult(passed=False, errors=errors, warnings=warnings) + + if not isinstance(frontmatter, dict): + return ValidationResult( + passed=False, + errors=["Frontmatter is not a YAML key-value mapping."], + ) + + # 5. description present and non-empty + description = frontmatter.get("description") + if not description or not str(description).strip(): + errors.append("Missing required field: 'description' must be present and non-empty.") + + # 6. name present and non-empty (no directory-name fallback in strict mode) + name = frontmatter.get("name") + if not name or not str(name).strip(): + errors.append( + "Missing required field: 'name' must be present. " + "Add 'name: your-skill-name' to the frontmatter." + ) + else: + name = str(name).strip() + parent_dir_name = path.parent.name + + # 7. name length <= 64 chars + if len(name) > _MAX_NAME_LENGTH: + errors.append( + f"Skill name '{name}' is {len(name)} characters — " + f"maximum is {_MAX_NAME_LENGTH}. Shorten the name." + ) + + # 8. name matches parent directory (dot-namespace prefix allowed: hive.X with dir X) + if name != parent_dir_name and not name.endswith(f".{parent_dir_name}"): + errors.append( + f"Name '{name}' does not match directory '{parent_dir_name}'. " + f"Rename the directory to '{name}' or set name to '{parent_dir_name}'." + ) + + # 9. body non-empty + if not body: + errors.append( + "Skill body (instructions) is empty. " + "Add markdown instructions after the closing --- delimiter." + ) + + # 10. license present — warning only + if not frontmatter.get("license"): + warnings.append("No 'license' field — consider adding a license (e.g. MIT, Apache-2.0).") + + # 11. Scripts in scripts/ exist and are executable + base_dir = path.parent + scripts_dir = base_dir / "scripts" + if scripts_dir.is_dir(): + for script_path in sorted(scripts_dir.iterdir()): + if script_path.is_file(): + if not (script_path.stat().st_mode & (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)): + errors.append( + f"Script not executable: {script_path.name}. Run: chmod +x {script_path}" + ) + + # 12. allowed-tools entries are non-empty strings — warning if malformed + allowed_tools = frontmatter.get("allowed-tools") + if allowed_tools is not None: + if not isinstance(allowed_tools, list): + warnings.append("'allowed-tools' should be a list of strings.") + else: + for tool in allowed_tools: + if not isinstance(tool, str) or not tool.strip(): + warnings.append(f"'allowed-tools' entry {tool!r} is not a non-empty string.") + + # 13. compatibility is a list of strings — error if malformed + compatibility = frontmatter.get("compatibility") + if compatibility is not None: + if not isinstance(compatibility, list): + errors.append("'compatibility' must be a list of strings.") + else: + for item in compatibility: + if not isinstance(item, str): + errors.append(f"'compatibility' entry {item!r} is not a string.") + + # 14. metadata is a dict — error if malformed + metadata = frontmatter.get("metadata") + if metadata is not None and not isinstance(metadata, dict): + errors.append("'metadata' must be a YAML mapping (dict), not a scalar or list.") + + return ValidationResult( + passed=len(errors) == 0, + errors=errors, + warnings=warnings, + ) diff --git a/core/tests/test_skill_cli_commands.py b/core/tests/test_skill_cli_commands.py new file mode 100644 index 0000000000..09da830cef --- /dev/null +++ b/core/tests/test_skill_cli_commands.py @@ -0,0 +1,579 @@ +"""Integration tests for hive skill CLI command handlers. + +Uses argparse.Namespace objects directly (not argv parsing) for concise tests. +""" + +from __future__ import annotations + +import json +from argparse import Namespace +from pathlib import Path +from unittest.mock import patch + +from framework.skills.cli import ( + cmd_skill_doctor, + cmd_skill_info, + cmd_skill_init, + cmd_skill_install, + cmd_skill_list, + cmd_skill_remove, + cmd_skill_search, + cmd_skill_test, + cmd_skill_validate, +) + + +def _make_valid_skill(parent: Path, name: str) -> Path: + """Create a minimal valid skill in parent/name/SKILL.md.""" + d = parent / name + d.mkdir(parents=True, exist_ok=True) + (d / "SKILL.md").write_text( + f"---\nname: {name}\ndescription: A test skill.\nlicense: MIT\n---\n\n## Body\n", + encoding="utf-8", + ) + return d + + +class TestCmdSkillInit: + def test_creates_skill_md(self, tmp_path): + args = Namespace(skill_name="test-skill", target_dir=str(tmp_path)) + result = cmd_skill_init(args) + assert result == 0 + assert (tmp_path / "test-skill" / "SKILL.md").exists() + + def test_skill_md_contains_name(self, tmp_path): + args = Namespace(skill_name="my-skill", target_dir=str(tmp_path)) + cmd_skill_init(args) + content = (tmp_path / "my-skill" / "SKILL.md").read_text() + assert "name: my-skill" in content + + def test_error_when_dir_exists(self, tmp_path, capsys): + (tmp_path / "existing").mkdir() + args = Namespace(skill_name="existing", target_dir=str(tmp_path)) + result = cmd_skill_init(args) + assert result == 1 + assert "already exists" in capsys.readouterr().err + + def test_error_when_no_name(self, tmp_path, monkeypatch, capsys): + # Non-interactive (stdin not a tty in test env) → error + monkeypatch.setattr("sys.stdin.isatty", lambda: False) + args = Namespace(skill_name=None, target_dir=str(tmp_path)) + result = cmd_skill_init(args) + assert result == 1 + + +class TestCmdSkillValidate: + def test_exits_0_on_valid_skill(self, tmp_path): + skill_dir = _make_valid_skill(tmp_path, "my-skill") + args = Namespace(path=str(skill_dir / "SKILL.md")) + result = cmd_skill_validate(args) + assert result == 0 + + def test_accepts_directory_path(self, tmp_path): + skill_dir = _make_valid_skill(tmp_path, "my-skill") + args = Namespace(path=str(skill_dir)) + result = cmd_skill_validate(args) + assert result == 0 + + def test_exits_1_on_invalid_skill(self, tmp_path, capsys): + skill_dir = tmp_path / "bad-skill" + skill_dir.mkdir() + (skill_dir / "SKILL.md").write_text("no frontmatter here", encoding="utf-8") + args = Namespace(path=str(skill_dir / "SKILL.md")) + result = cmd_skill_validate(args) + assert result == 1 + assert "[ERROR]" in capsys.readouterr().out + + def test_shows_warnings_on_valid_skill_without_license(self, tmp_path, capsys): + skill_dir = tmp_path / "my-skill" + skill_dir.mkdir() + (skill_dir / "SKILL.md").write_text( + "---\nname: my-skill\ndescription: No license.\n---\n\n## Body\n", + encoding="utf-8", + ) + args = Namespace(path=str(skill_dir / "SKILL.md")) + result = cmd_skill_validate(args) + assert result == 0 + assert "[WARN]" in capsys.readouterr().out + + +class TestCmdSkillDoctor: + def test_defaults_pass_against_real_framework_skills(self): + """All 6 framework default skills should be healthy (no mocking).""" + args = Namespace(defaults=True, name=None, project_dir=None) + result = cmd_skill_doctor(args) + assert result == 0 + + def test_named_skill_not_found_exits_1(self, tmp_path, capsys): + args = Namespace(name="nonexistent-skill", defaults=False, project_dir=str(tmp_path)) + result = cmd_skill_doctor(args) + assert result == 1 + assert "not found" in capsys.readouterr().err + + def test_healthy_skill_exits_0(self, tmp_path): + _make_valid_skill(tmp_path, "my-skill") + args = Namespace(name=None, defaults=False, project_dir=str(tmp_path)) + with patch("framework.skills.discovery.SkillDiscovery.discover") as mock_discover: + from framework.skills.parser import ParsedSkill + + mock_discover.return_value = [ + ParsedSkill( + name="my-skill", + description="Test.", + location=str(tmp_path / "my-skill" / "SKILL.md"), + base_dir=str(tmp_path / "my-skill"), + source_scope="user", + body="## Body", + ) + ] + result = cmd_skill_doctor(args) + assert result == 0 + + +class TestCmdSkillInstall: + def test_shows_security_notice_on_first_use(self, tmp_path, monkeypatch, capsys): + sentinel = tmp_path / ".install_notice_shown" + monkeypatch.setattr("framework.skills.installer.INSTALL_NOTICE_SENTINEL", sentinel) + + installed_path = tmp_path / "skills" / "my-skill" + installed_path.mkdir(parents=True) + + args = Namespace( + name_or_url=None, + from_url="https://example.com/skill.git", + pack=None, + install_name="my-skill", + version=None, + ) + + with patch("framework.skills.installer.install_from_git", return_value=installed_path): + with patch("shutil.which", return_value="/usr/bin/git"): + result = cmd_skill_install(args) + + captured = capsys.readouterr() + assert "Security Notice" in captured.out + assert result == 0 + + def test_install_from_url_calls_install_from_git(self, tmp_path, monkeypatch): + sentinel = tmp_path / ".install_notice_shown" + sentinel.parent.mkdir(parents=True, exist_ok=True) + sentinel.touch() + monkeypatch.setattr("framework.skills.installer.INSTALL_NOTICE_SENTINEL", sentinel) + + installed_path = tmp_path / "skills" / "my-skill" + installed_path.mkdir(parents=True) + + args = Namespace( + name_or_url=None, + from_url="https://github.com/org/my-skill.git", + pack=None, + install_name=None, + version=None, + ) + + with patch( + "framework.skills.installer.install_from_git", return_value=installed_path + ) as mock_install: + result = cmd_skill_install(args) + + mock_install.assert_called_once() + assert result == 0 + + def test_registry_not_found_exits_1(self, tmp_path, monkeypatch, capsys): + sentinel = tmp_path / ".install_notice_shown" + sentinel.parent.mkdir(parents=True, exist_ok=True) + sentinel.touch() + monkeypatch.setattr("framework.skills.installer.INSTALL_NOTICE_SENTINEL", sentinel) + + args = Namespace( + name_or_url="nonexistent-skill", + from_url=None, + pack=None, + install_name=None, + version=None, + ) + + with patch("framework.skills.registry.RegistryClient.get_skill_entry", return_value=None): + result = cmd_skill_install(args) + + assert result == 1 + assert "not found in registry" in capsys.readouterr().err + + def test_no_args_exits_1(self, tmp_path, monkeypatch, capsys): + sentinel = tmp_path / ".install_notice_shown" + sentinel.parent.mkdir(parents=True, exist_ok=True) + sentinel.touch() + monkeypatch.setattr("framework.skills.installer.INSTALL_NOTICE_SENTINEL", sentinel) + + args = Namespace( + name_or_url=None, from_url=None, pack=None, install_name=None, version=None + ) + result = cmd_skill_install(args) + assert result == 1 + + +class TestCmdSkillRemove: + def test_removes_installed_skill(self, tmp_path, capsys): + skills_dir = tmp_path / "skills" + skill_dir = skills_dir / "my-skill" + skill_dir.mkdir(parents=True) + + with patch("framework.skills.installer.USER_SKILLS_DIR", skills_dir): + with patch("framework.skills.installer.remove_skill", return_value=True): + args = Namespace(name="my-skill") + result = cmd_skill_remove(args) + + assert result == 0 + assert "Removed" in capsys.readouterr().out + + def test_exits_1_when_not_found(self, tmp_path, capsys): + with patch("framework.skills.installer.remove_skill", return_value=False): + args = Namespace(name="missing-skill") + result = cmd_skill_remove(args) + + assert result == 1 + assert "not found" in capsys.readouterr().err + + +class TestCmdSkillSearch: + def test_exits_1_when_registry_unavailable(self, capsys): + with patch("framework.skills.registry.RegistryClient.fetch_index", return_value=None): + args = Namespace(query="research") + result = cmd_skill_search(args) + + assert result == 1 + assert "registry unavailable" in capsys.readouterr().err.lower() + + def test_prints_results_when_found(self, capsys): + mock_index = { + "skills": [ + { + "name": "deep-research", + "description": "Multi-step research.", + "tags": ["research"], + "trust_tier": "official", + } + ] + } + with patch("framework.skills.registry.RegistryClient.fetch_index", return_value=mock_index): + args = Namespace(query="research") + result = cmd_skill_search(args) + + out = capsys.readouterr().out + assert result == 0 + assert "deep-research" in out + + def test_no_results_message(self, capsys): + mock_index = {"skills": []} + with patch("framework.skills.registry.RegistryClient.fetch_index", return_value=mock_index): + args = Namespace(query="xyzzy-nothing") + result = cmd_skill_search(args) + + assert result == 0 + assert "No skills found" in capsys.readouterr().out + + +class TestCmdSkillInfo: + def test_shows_locally_installed_skill(self, tmp_path, capsys): + skill_dir = _make_valid_skill(tmp_path, "my-skill") + from framework.skills.parser import ParsedSkill + + mock_skill = ParsedSkill( + name="my-skill", + description="A test skill.", + location=str(skill_dir / "SKILL.md"), + base_dir=str(skill_dir), + source_scope="user", + body="## Body", + license="MIT", + ) + + with patch("framework.skills.discovery.SkillDiscovery.discover", return_value=[mock_skill]): + args = Namespace(name="my-skill", project_dir=str(tmp_path)) + result = cmd_skill_info(args) + + out = capsys.readouterr().out + assert result == 0 + assert "my-skill" in out + assert "A test skill." in out + + def test_falls_back_to_registry_when_not_installed(self, capsys): + registry_entry = { + "name": "deep-research", + "description": "Multi-step research.", + "version": "1.0.0", + "author": "anthropics", + "trust_tier": "official", + } + + with patch("framework.skills.discovery.SkillDiscovery.discover", return_value=[]): + with patch( + "framework.skills.registry.RegistryClient.get_skill_entry", + return_value=registry_entry, + ): + args = Namespace(name="deep-research", project_dir=None) + result = cmd_skill_info(args) + + out = capsys.readouterr().out + assert result == 0 + assert "not installed" in out + assert "deep-research" in out + + def test_exits_1_when_not_found_anywhere(self, tmp_path, capsys): + with patch("framework.skills.discovery.SkillDiscovery.discover", return_value=[]): + with patch( + "framework.skills.registry.RegistryClient.get_skill_entry", return_value=None + ): + args = Namespace(name="ghost-skill", project_dir=str(tmp_path)) + result = cmd_skill_info(args) + + assert result == 1 + + +class TestJsonFlag: + def test_list_json_produces_valid_json(self, tmp_path, capsys): + args = Namespace(project_dir=str(tmp_path), json=True) + with patch("framework.skills.discovery.SkillDiscovery.discover", return_value=[]): + result = cmd_skill_list(args) + out = capsys.readouterr().out + data = json.loads(out) + assert result == 0 + assert "skills" in data + assert isinstance(data["skills"], list) + + def test_validate_json_valid_skill(self, tmp_path, capsys): + from framework.skills.cli import cmd_skill_validate + + skill_dir = _make_valid_skill(tmp_path, "my-skill") + args = Namespace(path=str(skill_dir / "SKILL.md"), json=True) + result = cmd_skill_validate(args) + out = capsys.readouterr().out + data = json.loads(out) + assert result == 0 + assert data["passed"] is True + assert data["errors"] == [] + assert "warnings" in data + + def test_doctor_defaults_json(self, capsys): + args = Namespace(defaults=True, name=None, project_dir=None, json=True) + result = cmd_skill_doctor(args) + out = capsys.readouterr().out + data = json.loads(out) + assert result == 0 + assert "skills" in data + assert len(data["skills"]) == 6 # 6 framework default skills + assert data["total_errors"] == 0 + + def test_search_json_registry_unavailable_exits_1(self, capsys): + with patch("framework.skills.registry.RegistryClient.fetch_index", return_value=None): + args = Namespace(query="research", json=True) + result = cmd_skill_search(args) + out = capsys.readouterr().out + data = json.loads(out) + assert result == 1 + assert "error" in data + + def test_remove_json_not_found_exits_1(self, capsys): + with patch("framework.skills.installer.remove_skill", return_value=False): + args = Namespace(name="ghost-skill", json=True) + result = cmd_skill_remove(args) + out = capsys.readouterr().out + data = json.loads(out) + assert result == 1 + assert "error" in data + + +class TestCmdSkillTest: + """Tests for hive skill test (CLI-9).""" + + def test_structural_only_valid_exits_0(self, tmp_path): + skill_dir = _make_valid_skill(tmp_path, "my-skill") + args = Namespace(path=str(skill_dir), input_json=None, model=None, json=False) + result = cmd_skill_test(args) + assert result == 0 + + def test_structural_invalid_exits_1(self, tmp_path, capsys): + skill_dir = tmp_path / "bad-skill" + skill_dir.mkdir() + (skill_dir / "SKILL.md").write_text("no frontmatter", encoding="utf-8") + args = Namespace(path=str(skill_dir), input_json=None, model=None, json=False) + result = cmd_skill_test(args) + assert result == 1 + assert "[ERROR]" in capsys.readouterr().out + + def test_invocation_mode_calls_provider_with_skill_body(self, tmp_path): + skill_dir = _make_valid_skill(tmp_path, "my-skill") + from unittest.mock import MagicMock + + from framework.llm.provider import LLMResponse + + mock_response = LLMResponse(content="Hello!", model="claude-haiku-4-5-20251001") + mock_provider = MagicMock() + mock_provider.complete.return_value = mock_response + + args = Namespace( + path=str(skill_dir), input_json='{"prompt": "say hello"}', model=None, json=False + ) + with patch("framework.llm.anthropic.AnthropicProvider", return_value=mock_provider): + result = cmd_skill_test(args) + + assert result == 0 + call_kwargs = mock_provider.complete.call_args + assert call_kwargs is not None + # system should be the skill body + assert "system" in call_kwargs.kwargs or len(call_kwargs.args) >= 2 + + def test_invocation_extracts_prompt_from_json(self, tmp_path): + skill_dir = _make_valid_skill(tmp_path, "my-skill") + from unittest.mock import MagicMock + + from framework.llm.provider import LLMResponse + + mock_provider = MagicMock() + mock_provider.complete.return_value = LLMResponse( + content="response", model="claude-haiku-4-5-20251001" + ) + + args = Namespace( + path=str(skill_dir), input_json='{"prompt": "extracted prompt"}', model=None, json=False + ) + with patch("framework.llm.anthropic.AnthropicProvider", return_value=mock_provider): + cmd_skill_test(args) + + call = mock_provider.complete.call_args + messages = call.kwargs.get("messages") or (call.args[0] if call.args else []) + assert any("extracted prompt" in m.get("content", "") for m in messages) + + def test_eval_suite_all_pass_exits_0(self, tmp_path): + skill_dir = _make_valid_skill(tmp_path, "my-skill") + evals_dir = skill_dir / "evals" + evals_dir.mkdir() + (evals_dir / "evals.json").write_text( + json.dumps( + { + "skill_name": "my-skill", + "evals": [ + {"id": 1, "prompt": "Say hi.", "assertions": ["Response is a greeting"]} + ], + } + ), + encoding="utf-8", + ) + + from unittest.mock import MagicMock + + from framework.llm.provider import LLMResponse + + mock_provider = MagicMock() + mock_provider.complete.return_value = LLMResponse( + content="Hello!", model="claude-haiku-4-5-20251001" + ) + mock_judge = MagicMock() + mock_judge.evaluate.return_value = {"passes": True, "explanation": "Looks good."} + + args = Namespace(path=str(skill_dir), input_json=None, model=None, json=False) + with patch("framework.llm.anthropic.AnthropicProvider", return_value=mock_provider): + with patch("framework.testing.llm_judge.LLMJudge", return_value=mock_judge): + result = cmd_skill_test(args) + + assert result == 0 + + def test_eval_any_fail_exits_1(self, tmp_path): + skill_dir = _make_valid_skill(tmp_path, "my-skill") + evals_dir = skill_dir / "evals" + evals_dir.mkdir() + (evals_dir / "evals.json").write_text( + json.dumps( + { + "skill_name": "my-skill", + "evals": [ + {"id": 1, "prompt": "Say hi.", "assertions": ["Impossible assertion"]} + ], + } + ), + encoding="utf-8", + ) + + from unittest.mock import MagicMock + + from framework.llm.provider import LLMResponse + + mock_provider = MagicMock() + mock_provider.complete.return_value = LLMResponse( + content="Hello!", model="claude-haiku-4-5-20251001" + ) + mock_judge = MagicMock() + mock_judge.evaluate.return_value = {"passes": False, "explanation": "Did not satisfy."} + + args = Namespace(path=str(skill_dir), input_json=None, model=None, json=False) + with patch("framework.llm.anthropic.AnthropicProvider", return_value=mock_provider): + with patch("framework.testing.llm_judge.LLMJudge", return_value=mock_judge): + result = cmd_skill_test(args) + + assert result == 1 + + def test_json_flag_structural_output(self, tmp_path, capsys): + skill_dir = _make_valid_skill(tmp_path, "my-skill") + args = Namespace(path=str(skill_dir), input_json=None, model=None, json=True) + result = cmd_skill_test(args) + out = capsys.readouterr().out + data = json.loads(out) + assert result == 0 + assert "structural" in data + assert data["structural"]["passed"] is True + assert data["skill"] == "my-skill" + + def test_json_flag_eval_results(self, tmp_path, capsys): + skill_dir = _make_valid_skill(tmp_path, "my-skill") + evals_dir = skill_dir / "evals" + evals_dir.mkdir() + (evals_dir / "evals.json").write_text( + json.dumps( + { + "skill_name": "my-skill", + "evals": [{"id": 1, "prompt": "Hi.", "assertions": ["Is a greeting"]}], + } + ), + encoding="utf-8", + ) + + from unittest.mock import MagicMock + + from framework.llm.provider import LLMResponse + + mock_provider = MagicMock() + mock_provider.complete.return_value = LLMResponse( + content="Hello!", model="claude-haiku-4-5-20251001" + ) + mock_judge = MagicMock() + mock_judge.evaluate.return_value = {"passes": True, "explanation": "Yes."} + + args = Namespace(path=str(skill_dir), input_json=None, model=None, json=True) + with patch("framework.llm.anthropic.AnthropicProvider", return_value=mock_provider): + with patch("framework.testing.llm_judge.LLMJudge", return_value=mock_judge): + result = cmd_skill_test(args) + + out = capsys.readouterr().out + data = json.loads(out) + assert result == 0 + assert "evals" in data + assert data["total_passed"] == 1 + assert data["total_failed"] == 0 + + def test_no_api_key_with_evals_degrades_gracefully(self, tmp_path, capsys): + """No API key + evals present → structural checks pass, skip LLM, exit 0.""" + skill_dir = _make_valid_skill(tmp_path, "my-skill") + (skill_dir / "evals").mkdir() + (skill_dir / "evals" / "evals.json").write_text( + json.dumps({"skill_name": "my-skill", "evals": []}), encoding="utf-8" + ) + + args = Namespace(path=str(skill_dir), input_json=None, model=None, json=False) + with patch( + "framework.llm.anthropic.AnthropicProvider", + side_effect=ValueError("ANTHROPIC_API_KEY not set"), + ): + result = cmd_skill_test(args) + + assert result == 0 + assert "ANTHROPIC_API_KEY" in capsys.readouterr().err diff --git a/core/tests/test_skill_installer.py b/core/tests/test_skill_installer.py new file mode 100644 index 0000000000..566c69da36 --- /dev/null +++ b/core/tests/test_skill_installer.py @@ -0,0 +1,248 @@ +"""Tests for skill install, remove, and fork operations.""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import patch + +import pytest + +from framework.skills.installer import ( + fork_skill, + install_from_git, + maybe_show_install_notice, + remove_skill, +) +from framework.skills.parser import ParsedSkill +from framework.skills.skill_errors import SkillError + + +def _make_skill_dir(parent: Path, name: str, body: str = "## Instructions\n\nDo things.") -> Path: + """Create a minimal skill directory with a valid SKILL.md.""" + skill_dir = parent / name + skill_dir.mkdir(parents=True, exist_ok=True) + (skill_dir / "SKILL.md").write_text( + f"---\nname: {name}\ndescription: A test skill.\n---\n\n{body}\n", + encoding="utf-8", + ) + return skill_dir + + +def _make_parsed_skill(base_dir: Path, name: str) -> ParsedSkill: + """Create a ParsedSkill pointing to base_dir.""" + return ParsedSkill( + name=name, + description="Test skill.", + location=str(base_dir / "SKILL.md"), + base_dir=str(base_dir), + source_scope="user", + body="## Instructions", + ) + + +class TestInstallFromGit: + def test_copies_skill_dir_to_target(self, tmp_path): + """Successful clone copies skill directory to target.""" + source_repo = tmp_path / "repo" + _make_skill_dir(source_repo, ".") # SKILL.md at repo root + + target = tmp_path / "skills" + + def fake_clone(git_url, target_path, version=None): + # Simulate git clone by copying source_repo into target_path + import shutil + + if target_path.exists(): + shutil.rmtree(target_path) + shutil.copytree(source_repo, target_path) + + with patch("framework.skills.installer._git_clone_shallow", side_effect=fake_clone): + with patch("shutil.which", return_value="/usr/bin/git"): + dest = install_from_git( + git_url="https://example.com/skill.git", + skill_name="my-skill", + target_dir=target, + ) + + assert (dest / "SKILL.md").exists() + assert dest == target / "my-skill" + + def test_raises_when_git_not_found(self, tmp_path): + with patch("shutil.which", return_value=None): + with pytest.raises(SkillError) as exc_info: + install_from_git( + git_url="https://example.com/skill.git", + skill_name="my-skill", + target_dir=tmp_path / "skills", + ) + assert "git is not installed" in exc_info.value.why + + def test_raises_when_skill_md_missing(self, tmp_path): + """Clone succeeds but no SKILL.md in the subdirectory → error.""" + empty_repo = tmp_path / "empty_repo" + empty_repo.mkdir() + + def fake_clone(git_url, target_path, version=None): + import shutil + + if target_path.exists(): + shutil.rmtree(target_path) + shutil.copytree(empty_repo, target_path) + + with patch("framework.skills.installer._git_clone_shallow", side_effect=fake_clone): + with patch("shutil.which", return_value="/usr/bin/git"): + with pytest.raises(SkillError) as exc_info: + install_from_git( + git_url="https://example.com/skill.git", + skill_name="my-skill", + subdirectory="deep-research", + target_dir=tmp_path / "skills", + ) + assert exc_info.value.code.value == "SKILL_NOT_FOUND" + + def test_raises_when_target_already_exists(self, tmp_path): + skills_dir = tmp_path / "skills" + (skills_dir / "existing-skill").mkdir(parents=True) + + with patch("shutil.which", return_value="/usr/bin/git"): + with pytest.raises(SkillError) as exc_info: + install_from_git( + git_url="https://example.com/skill.git", + skill_name="existing-skill", + target_dir=skills_dir, + ) + assert "already exists" in exc_info.value.why + + def test_cleans_temp_dir_on_clone_failure(self, tmp_path): + """Temporary directory is cleaned up even when clone fails.""" + created_tmp_dirs = [] + original_mkdtemp = __import__("tempfile").mkdtemp + + def tracking_mkdtemp(**kwargs): + d = original_mkdtemp(**kwargs) + created_tmp_dirs.append(d) + return d + + def failing_clone(git_url, target_path, version=None): + from framework.skills.skill_errors import SkillErrorCode as SEC + + raise SkillError( + code=SEC.SKILL_ACTIVATION_FAILED, + what="clone failed", + why="network error", + fix="check network", + ) + + with patch("tempfile.mkdtemp", side_effect=tracking_mkdtemp): + with patch("framework.skills.installer._git_clone_shallow", side_effect=failing_clone): + with patch("shutil.which", return_value="/usr/bin/git"): + with pytest.raises(SkillError): + install_from_git( + git_url="https://example.com/skill.git", + skill_name="my-skill", + target_dir=tmp_path / "skills", + ) + + # All created temp dirs should be cleaned up + for d in created_tmp_dirs: + assert not Path(d).exists(), f"Temp dir not cleaned: {d}" + + +class TestRemoveSkill: + def test_removes_existing_skill(self, tmp_path): + skills_dir = tmp_path / "skills" + skill_dir = _make_skill_dir(skills_dir, "my-skill") + assert skill_dir.exists() + + result = remove_skill("my-skill", skills_dir=skills_dir) + assert result is True + assert not skill_dir.exists() + + def test_returns_false_when_not_found(self, tmp_path): + skills_dir = tmp_path / "skills" + skills_dir.mkdir() + + result = remove_skill("nonexistent", skills_dir=skills_dir) + assert result is False + + def test_raises_on_permission_error(self, tmp_path): + skills_dir = tmp_path / "skills" + _make_skill_dir(skills_dir, "locked-skill") + + with patch("shutil.rmtree", side_effect=OSError("permission denied")): + with pytest.raises(SkillError) as exc_info: + remove_skill("locked-skill", skills_dir=skills_dir) + assert "permission" in exc_info.value.why.lower() + + +class TestForkSkill: + def test_copies_skill_to_new_name(self, tmp_path): + source_dir = _make_skill_dir(tmp_path / "sources", "my-skill") + source = _make_parsed_skill(source_dir, "my-skill") + target_parent = tmp_path / "skills" + + dest = fork_skill(source, "my-skill-fork", target_parent) + + assert dest.exists() + assert (dest / "SKILL.md").exists() + + def test_rewrites_name_in_skill_md(self, tmp_path): + source_dir = _make_skill_dir(tmp_path / "sources", "original") + source = _make_parsed_skill(source_dir, "original") + target_parent = tmp_path / "skills" + + dest = fork_skill(source, "forked", target_parent) + + import yaml + + content = (dest / "SKILL.md").read_text(encoding="utf-8") + parts = content.split("---", 2) + fm = yaml.safe_load(parts[1]) + assert fm["name"] == "forked" + + def test_raises_when_dest_already_exists(self, tmp_path): + source_dir = _make_skill_dir(tmp_path / "sources", "my-skill") + source = _make_parsed_skill(source_dir, "my-skill") + target_parent = tmp_path / "skills" + (target_parent / "my-skill-fork").mkdir(parents=True) + + with pytest.raises(SkillError) as exc_info: + fork_skill(source, "my-skill-fork", target_parent) + assert "already exists" in exc_info.value.why + + def test_preserves_scripts_and_references(self, tmp_path): + source_dir = _make_skill_dir(tmp_path / "sources", "my-skill") + (source_dir / "scripts").mkdir() + (source_dir / "scripts" / "run.sh").write_text("#!/bin/sh\necho hi") + (source_dir / "references").mkdir() + (source_dir / "references" / "guide.md").write_text("# Guide") + source = _make_parsed_skill(source_dir, "my-skill") + target_parent = tmp_path / "skills" + + dest = fork_skill(source, "fork", target_parent) + + assert (dest / "scripts" / "run.sh").exists() + assert (dest / "references" / "guide.md").exists() + + +class TestInstallNotice: + def test_shown_on_first_call(self, tmp_path, monkeypatch, capsys): + sentinel = tmp_path / ".install_notice_shown" + monkeypatch.setattr("framework.skills.installer.INSTALL_NOTICE_SENTINEL", sentinel) + + maybe_show_install_notice() + + captured = capsys.readouterr() + assert "Security Notice" in captured.out + assert sentinel.exists() + + def test_not_shown_on_second_call(self, tmp_path, monkeypatch, capsys): + sentinel = tmp_path / ".install_notice_shown" + sentinel.parent.mkdir(parents=True, exist_ok=True) + sentinel.touch() + monkeypatch.setattr("framework.skills.installer.INSTALL_NOTICE_SENTINEL", sentinel) + + maybe_show_install_notice() + + captured = capsys.readouterr() + assert "Security Notice" not in captured.out diff --git a/core/tests/test_skill_registry.py b/core/tests/test_skill_registry.py new file mode 100644 index 0000000000..7b2b2a5ade --- /dev/null +++ b/core/tests/test_skill_registry.py @@ -0,0 +1,244 @@ +"""Tests for the RegistryClient skill registry client.""" + +from __future__ import annotations + +import json +from datetime import UTC, datetime, timedelta +from pathlib import Path +from unittest.mock import patch +from urllib.error import URLError + +import pytest + +from framework.skills.registry import _CACHE_TTL_SECONDS, RegistryClient + +_SAMPLE_INDEX = { + "version": 1, + "skills": [ + { + "name": "deep-research", + "description": "Multi-step web research with source verification.", + "version": "1.0.0", + "author": "anthropics", + "license": "MIT", + "tags": ["research", "web"], + "git_url": "https://github.com/anthropics/skills", + "subdirectory": "deep-research", + "trust_tier": "official", + }, + { + "name": "code-review", + "description": "Automated code review for style and correctness.", + "version": "0.9.0", + "author": "contributor", + "tags": ["code", "review"], + "git_url": "https://github.com/contributor/code-review", + "subdirectory": None, + "trust_tier": "community", + }, + ], + "packs": [ + { + "name": "research-starter", + "description": "Research-focused skill bundle", + "skills": ["deep-research"], + } + ], +} + + +@pytest.fixture +def cache_dir(tmp_path): + return tmp_path / "registry_cache" + + +@pytest.fixture +def client(cache_dir): + return RegistryClient(registry_url="https://example.com/skill_index.json", cache_dir=cache_dir) + + +class TestFetchIndex: + def test_returns_none_on_network_error(self, client): + with patch.object(client, "_http_fetch", return_value=None): + result = client.fetch_index() + assert result is None + + def test_returns_none_on_url_error(self, client): + with patch("framework.skills.registry.urlopen", side_effect=URLError("connection refused")): + result = client.fetch_index() + assert result is None + + def test_fetches_and_caches_index(self, client): + raw = json.dumps(_SAMPLE_INDEX).encode() + with patch.object(client, "_http_fetch", return_value=raw): + result = client.fetch_index() + assert result is not None + assert len(result["skills"]) == 2 + # Cache should be written + assert client._index_path.exists() + + def test_uses_fresh_cache_without_network(self, client, cache_dir): + # Write fresh cache + cache_dir.mkdir(parents=True, exist_ok=True) + (cache_dir / "skill_index.json").write_text(json.dumps(_SAMPLE_INDEX)) + meta = {"last_fetched": datetime.now(tz=UTC).isoformat()} + (cache_dir / "metadata.json").write_text(json.dumps(meta)) + + fetch_called = [] + + def _no_fetch(*a, **kw): + fetch_called.append(1) + + with patch.object(client, "_http_fetch", side_effect=_no_fetch): + result = client.fetch_index() + + assert not fetch_called, "Should not hit network when cache is fresh" + assert result is not None + + def test_refreshes_when_cache_is_stale(self, client, cache_dir): + # Write stale cache (older than TTL) + cache_dir.mkdir(parents=True, exist_ok=True) + (cache_dir / "skill_index.json").write_text(json.dumps(_SAMPLE_INDEX)) + old_time = (datetime.now(tz=UTC) - timedelta(seconds=_CACHE_TTL_SECONDS + 60)).isoformat() + meta = {"last_fetched": old_time} + (cache_dir / "metadata.json").write_text(json.dumps(meta)) + + raw = json.dumps(_SAMPLE_INDEX).encode() + with patch.object(client, "_http_fetch", return_value=raw) as mock_fetch: + client.fetch_index() + mock_fetch.assert_called_once() + + def test_force_refresh_bypasses_fresh_cache(self, client, cache_dir): + cache_dir.mkdir(parents=True, exist_ok=True) + (cache_dir / "skill_index.json").write_text(json.dumps(_SAMPLE_INDEX)) + meta = {"last_fetched": datetime.now(tz=UTC).isoformat()} + (cache_dir / "metadata.json").write_text(json.dumps(meta)) + + raw = json.dumps(_SAMPLE_INDEX).encode() + with patch.object(client, "_http_fetch", return_value=raw) as mock_fetch: + client.fetch_index(force_refresh=True) + mock_fetch.assert_called_once() + + def test_falls_back_to_stale_cache_on_network_error(self, client, cache_dir): + cache_dir.mkdir(parents=True, exist_ok=True) + (cache_dir / "skill_index.json").write_text(json.dumps(_SAMPLE_INDEX)) + # No metadata → stale + + with patch.object(client, "_http_fetch", return_value=None): + result = client.fetch_index() + + assert result is not None + assert result["version"] == 1 + + +class TestSearch: + def test_filters_by_name(self, client): + with patch.object(client, "fetch_index", return_value=_SAMPLE_INDEX): + results = client.search("deep") + assert len(results) == 1 + assert results[0]["name"] == "deep-research" + + def test_filters_by_description(self, client): + with patch.object(client, "fetch_index", return_value=_SAMPLE_INDEX): + results = client.search("source verification") + assert any(r["name"] == "deep-research" for r in results) + + def test_filters_by_tag(self, client): + with patch.object(client, "fetch_index", return_value=_SAMPLE_INDEX): + results = client.search("review") + assert any(r["name"] == "code-review" for r in results) + + def test_case_insensitive(self, client): + with patch.object(client, "fetch_index", return_value=_SAMPLE_INDEX): + results = client.search("DEEP") + assert len(results) == 1 + + def test_returns_empty_when_unavailable(self, client): + with patch.object(client, "fetch_index", return_value=None): + results = client.search("anything") + assert results == [] + + def test_returns_empty_on_no_match(self, client): + with patch.object(client, "fetch_index", return_value=_SAMPLE_INDEX): + results = client.search("xyzzy-no-match") + assert results == [] + + +class TestGetSkillEntry: + def test_finds_by_exact_name(self, client): + with patch.object(client, "fetch_index", return_value=_SAMPLE_INDEX): + entry = client.get_skill_entry("deep-research") + assert entry is not None + assert entry["name"] == "deep-research" + + def test_returns_none_when_not_found(self, client): + with patch.object(client, "fetch_index", return_value=_SAMPLE_INDEX): + entry = client.get_skill_entry("nonexistent") + assert entry is None + + def test_returns_none_when_index_unavailable(self, client): + with patch.object(client, "fetch_index", return_value=None): + entry = client.get_skill_entry("deep-research") + assert entry is None + + +class TestGetPack: + def test_returns_skill_names(self, client): + with patch.object(client, "fetch_index", return_value=_SAMPLE_INDEX): + skills = client.get_pack("research-starter") + assert skills == ["deep-research"] + + def test_returns_none_when_pack_not_found(self, client): + with patch.object(client, "fetch_index", return_value=_SAMPLE_INDEX): + result = client.get_pack("nonexistent-pack") + assert result is None + + def test_returns_none_when_index_unavailable(self, client): + with patch.object(client, "fetch_index", return_value=None): + result = client.get_pack("research-starter") + assert result is None + + +class TestResolveGitUrl: + def test_returns_git_url_and_subdirectory(self, client): + with patch.object(client, "fetch_index", return_value=_SAMPLE_INDEX): + result = client.resolve_git_url("deep-research") + assert result == ("https://github.com/anthropics/skills", "deep-research") + + def test_returns_none_subdirectory_when_absent(self, client): + with patch.object(client, "fetch_index", return_value=_SAMPLE_INDEX): + result = client.resolve_git_url("code-review") + git_url, subdir = result + assert subdir is None + + def test_returns_none_when_not_in_registry(self, client): + with patch.object(client, "fetch_index", return_value=_SAMPLE_INDEX): + result = client.resolve_git_url("not-there") + assert result is None + + +class TestCacheAtomicWrite: + def test_atomic_write_uses_tmp_then_replace(self, client, cache_dir, monkeypatch): + written_paths = [] + original_write = Path.write_text + + def tracking_write(self, data, encoding=None): + written_paths.append(str(self)) + return original_write(self, data, encoding=encoding or "utf-8") + + monkeypatch.setattr(Path, "write_text", tracking_write) + client._save_cache(_SAMPLE_INDEX) + + # .tmp file should have been written (then replaced — may not exist now) + assert any(".tmp" in p for p in written_paths) + # Final index file should exist + assert client._index_path.exists() + + def test_save_and_load_round_trip(self, client): + client._save_cache(_SAMPLE_INDEX) + loaded = client._load_cache() + assert loaded == _SAMPLE_INDEX + + def test_load_returns_none_when_absent(self, client): + result = client._load_cache() + assert result is None diff --git a/core/tests/test_skill_validator.py b/core/tests/test_skill_validator.py new file mode 100644 index 0000000000..2429aad596 --- /dev/null +++ b/core/tests/test_skill_validator.py @@ -0,0 +1,401 @@ +"""Tests for strict SKILL.md validation (hive skill validate). + +One test per strict check — happy path plus each individual failure mode. +""" + +from __future__ import annotations + +from pathlib import Path + +from framework.skills.validator import validate_strict + + +def _write_skill(tmp_path: Path, content: str, dir_name: str = "my-skill") -> Path: + """Write a SKILL.md in a named subdirectory and return the path.""" + skill_dir = tmp_path / dir_name + skill_dir.mkdir(parents=True, exist_ok=True) + skill_md = skill_dir / "SKILL.md" + skill_md.write_text(content, encoding="utf-8") + return skill_md + + +_VALID_CONTENT = """\ +--- +name: my-skill +description: A test skill for validation. +version: 0.1.0 +license: MIT +compatibility: + - claude-code + - hive +metadata: + tags: [] +--- + +## Instructions + +Do the thing properly. +""" + + +class TestHappyPath: + def test_valid_skill_passes(self, tmp_path): + path = _write_skill(tmp_path, _VALID_CONTENT) + result = validate_strict(path) + assert result.passed is True + assert result.errors == [] + + def test_namespace_prefix_name_allowed(self, tmp_path): + """hive.my-skill with directory my-skill is valid.""" + content = """\ +--- +name: hive.my-skill +description: A namespaced skill. +license: MIT +--- + +## Body +""" + path = _write_skill(tmp_path, content, dir_name="my-skill") + result = validate_strict(path) + assert result.passed is True + + def test_warning_on_missing_license(self, tmp_path): + content = """\ +--- +name: my-skill +description: No license field. +--- + +## Body +""" + path = _write_skill(tmp_path, content) + result = validate_strict(path) + assert result.passed is True + assert any("license" in w.lower() for w in result.warnings) + + +class TestCheck1FileExists: + def test_error_on_missing_file(self, tmp_path): + path = tmp_path / "nonexistent" / "SKILL.md" + result = validate_strict(path) + assert result.passed is False + assert any("not found" in e.lower() for e in result.errors) + + +class TestCheck2FileNotEmpty: + def test_error_on_empty_file(self, tmp_path): + skill_dir = tmp_path / "my-skill" + skill_dir.mkdir() + path = skill_dir / "SKILL.md" + path.write_text(" \n", encoding="utf-8") + result = validate_strict(path) + assert result.passed is False + assert any("empty" in e.lower() for e in result.errors) + + +class TestCheck3FrontmatterPresent: + def test_error_on_missing_delimiters(self, tmp_path): + path = _write_skill(tmp_path, "name: my-skill\ndescription: no delimiters\n") + result = validate_strict(path) + assert result.passed is False + assert any("frontmatter" in e.lower() or "---" in e for e in result.errors) + + +class TestCheck4YamlNoFixup: + def test_error_on_yaml_requiring_fixup(self, tmp_path): + """Unquoted colon in value — lenient parser accepts, strict rejects.""" + content = """\ +--- +name: my-skill +description: Use for: research tasks +--- + +## Body +""" + path = _write_skill(tmp_path, content) + result = validate_strict(path) + assert result.passed is False + assert any("YAML" in e or "parse" in e.lower() for e in result.errors) + + def test_quoted_colon_passes(self, tmp_path): + content = """\ +--- +name: my-skill +description: "Use for: research tasks" +license: MIT +--- + +## Body +""" + path = _write_skill(tmp_path, content) + result = validate_strict(path) + assert result.passed is True + + +class TestCheck5Description: + def test_error_on_missing_description(self, tmp_path): + content = """\ +--- +name: my-skill +license: MIT +--- + +## Body +""" + path = _write_skill(tmp_path, content) + result = validate_strict(path) + assert result.passed is False + assert any("description" in e.lower() for e in result.errors) + + def test_error_on_empty_description(self, tmp_path): + content = """\ +--- +name: my-skill +description: "" +license: MIT +--- + +## Body +""" + path = _write_skill(tmp_path, content) + result = validate_strict(path) + assert result.passed is False + + +class TestCheck6NamePresent: + def test_error_on_missing_name(self, tmp_path): + content = """\ +--- +description: A skill without a name. +license: MIT +--- + +## Body +""" + path = _write_skill(tmp_path, content) + result = validate_strict(path) + assert result.passed is False + assert any("name" in e.lower() for e in result.errors) + + +class TestCheck7NameLength: + def test_error_on_name_too_long(self, tmp_path): + long_name = "a" * 65 + skill_dir = tmp_path / long_name + skill_dir.mkdir(parents=True) + content = f"---\nname: {long_name}\ndescription: Too long.\nlicense: MIT\n---\n\n## Body\n" + path = skill_dir / "SKILL.md" + path.write_text(content, encoding="utf-8") + + result = validate_strict(path) + assert result.passed is False + assert any("64" in e or "characters" in e.lower() for e in result.errors) + + def test_exactly_64_chars_passes(self, tmp_path): + name = "a" * 64 + skill_dir = tmp_path / name + skill_dir.mkdir(parents=True) + content = f"---\nname: {name}\ndescription: Exactly 64.\nlicense: MIT\n---\n\n## Body\n" + path = skill_dir / "SKILL.md" + path.write_text(content, encoding="utf-8") + + result = validate_strict(path) + # May have other warnings but should not error on length + assert not any("64" in e or "characters" in e.lower() for e in result.errors) + + +class TestCheck8NameDirectoryMatch: + def test_error_on_name_dir_mismatch(self, tmp_path): + content = """\ +--- +name: other-skill +description: Wrong name. +license: MIT +--- + +## Body +""" + # Directory is my-skill but name is other-skill + path = _write_skill(tmp_path, content, dir_name="my-skill") + result = validate_strict(path) + assert result.passed is False + assert any("other-skill" in e or "my-skill" in e for e in result.errors) + + def test_exact_match_passes(self, tmp_path): + content = """\ +--- +name: my-skill +description: Exact match. +license: MIT +--- + +## Body +""" + path = _write_skill(tmp_path, content, dir_name="my-skill") + result = validate_strict(path) + assert result.passed is True + + def test_dot_namespace_prefix_passes(self, tmp_path): + """hive.my-skill with dir my-skill is valid (namespace prefix).""" + content = """\ +--- +name: org.my-skill +description: Namespaced. +license: MIT +--- + +## Body +""" + path = _write_skill(tmp_path, content, dir_name="my-skill") + result = validate_strict(path) + # Should not error on name/dir mismatch for namespace prefix + assert not any("my-skill" in e and "other" in e for e in result.errors) + # Check no dir mismatch error specifically + name_mismatch_errors = [e for e in result.errors if "my-skill" in e and "org.my-skill" in e] + assert len(name_mismatch_errors) == 0 + + +class TestCheck9BodyNotEmpty: + def test_error_on_empty_body(self, tmp_path): + content = """\ +--- +name: my-skill +description: No body. +license: MIT +--- +""" + path = _write_skill(tmp_path, content) + result = validate_strict(path) + assert result.passed is False + assert any("body" in e.lower() or "instructions" in e.lower() for e in result.errors) + + +class TestCheck11Scripts: + def test_error_on_non_executable_script(self, tmp_path): + path = _write_skill(tmp_path, _VALID_CONTENT) + scripts_dir = path.parent / "scripts" + scripts_dir.mkdir() + script = scripts_dir / "run.sh" + script.write_text("#!/bin/sh\necho hi") + # Ensure NOT executable + script.chmod(0o644) + + result = validate_strict(path) + assert result.passed is False + assert any("executable" in e.lower() for e in result.errors) + + def test_passes_with_executable_script(self, tmp_path): + path = _write_skill(tmp_path, _VALID_CONTENT) + scripts_dir = path.parent / "scripts" + scripts_dir.mkdir() + script = scripts_dir / "run.sh" + script.write_text("#!/bin/sh\necho hi") + script.chmod(0o755) + + result = validate_strict(path) + assert result.passed is True + + +class TestCheck12AllowedTools: + def test_warning_on_malformed_allowed_tools(self, tmp_path): + content = """\ +--- +name: my-skill +description: Skill with bad tools. +license: MIT +allowed-tools: "not a list" +--- + +## Body +""" + path = _write_skill(tmp_path, content) + result = validate_strict(path) + assert any("allowed-tools" in w.lower() for w in result.warnings) + + def test_valid_allowed_tools_no_warning(self, tmp_path): + content = """\ +--- +name: my-skill +description: Valid tools list. +license: MIT +allowed-tools: + - web_search + - file_read +--- + +## Body +""" + path = _write_skill(tmp_path, content) + result = validate_strict(path) + assert not any("allowed-tools" in w.lower() for w in result.warnings) + + +class TestCheck13Compatibility: + def test_error_on_non_list_compatibility(self, tmp_path): + content = """\ +--- +name: my-skill +description: Bad compat. +license: MIT +compatibility: "claude-code" +--- + +## Body +""" + path = _write_skill(tmp_path, content) + result = validate_strict(path) + assert result.passed is False + assert any("compatibility" in e.lower() for e in result.errors) + + def test_valid_compatibility_passes(self, tmp_path): + content = """\ +--- +name: my-skill +description: Good compat. +license: MIT +compatibility: + - claude-code + - hive +--- + +## Body +""" + path = _write_skill(tmp_path, content) + result = validate_strict(path) + assert result.passed is True + + +class TestCheck14Metadata: + def test_error_on_non_dict_metadata(self, tmp_path): + content = """\ +--- +name: my-skill +description: Bad metadata. +license: MIT +metadata: "not a dict" +--- + +## Body +""" + path = _write_skill(tmp_path, content) + result = validate_strict(path) + assert result.passed is False + assert any("metadata" in e.lower() for e in result.errors) + + def test_valid_metadata_passes(self, tmp_path): + content = """\ +--- +name: my-skill +description: Good metadata. +license: MIT +metadata: + tags: + - research +--- + +## Body +""" + path = _write_skill(tmp_path, content) + result = validate_strict(path) + assert result.passed is True