From 75f36602b60dc78ab7de8d5342fe6419e0859554 Mon Sep 17 00:00:00 2001 From: "ai-engineering.at" Date: Fri, 1 May 2026 17:23:14 +0200 Subject: [PATCH 1/5] feat(hooks): add false-positive-guard for Opus 4.7 confidence-drift mitigation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit H1 of 4 new hooks addressing 4.7 friction patterns (META-ANALYSIS audit-2). Two-event hook: - UserPromptSubmit: scan prompt for bug-evidence keywords (DE+EN), update state with timestamp + source. No output. - PreToolUse (Edit): if no recent bug-evidence in state (default: <10 min), emit advisory; else silent pass. Never blocks. Mitigates "false-positive bug invention" pattern observed in 4.7 audit (+21% Wrong-Approach trend post-swap). Hook is structural — Code-enforced verification gate before edits, not prompt-level guidance which 4.7's higher confidence overrides. Implementation: - BUG_PATTERNS: 20 regex (DE+EN bug/error keywords, word-bounded) - FAILURE_PATTERNS: 6 regex (test-failure markers in tool output) - DoS-guard: truncate prompt to 100KB before regex (Judge B finding) - Clock-skew defense: future timestamps not counted as recent (Judge A) - State namespace: false_positive_guard.{last_evidence_seen_at, last_evidence_source, evidence_total, advisories_emitted, last_advisory_at} Tests: 47 passed (parametrized DE/EN keywords, time-window logic, DoS-guard, subprocess integration, state persistence, edge cases). Coverage: hook fully exercised; lib/state.py shared paths unchanged. Code-Reviewers (judgment-day pattern): 2 blind judges - Judge A (Pattern-Compliance): 7/9 → CONDITIONAL YES → fixed Issue 1 - Judge B (Security/Performance): NO → 3 critical issues identified (path-traversal, race-condition, DoS). DoS fixed in-scope (input-cap). Path-traversal + race-condition are lib/state.py-wide concerns; filed as separate ERPNext follow-up tasks (out-of-scope for H1). 
hooks.json: registered for UserPromptSubmit (matcher "") and PreToolUse (matcher "Edit"). Total hook-commands: 13 → 15. Refs: audit-2026-05-01/META-ANALYSIS.md (4.7-friction analysis) Plan: ~/.claude/plans/jaundder-plan-hashed-tiger.md Co-Authored-By: Claude Opus 4.7 (1M context) --- hooks/false-positive-guard.py | 188 +++++++++++++++ hooks/hooks.json | 16 ++ tests/test_false_positive_guard.py | 369 +++++++++++++++++++++++++++++ 3 files changed, 573 insertions(+) create mode 100644 hooks/false-positive-guard.py create mode 100644 tests/test_false_positive_guard.py diff --git a/hooks/false-positive-guard.py b/hooks/false-positive-guard.py new file mode 100644 index 0000000..c9bff05 --- /dev/null +++ b/hooks/false-positive-guard.py @@ -0,0 +1,188 @@ +#!/usr/bin/env python3 +"""Hook: false-positive-guard (UserPromptSubmit + PreToolUse Edit) + +Mitigates Opus 4.7 confidence-drift pattern: claim a bug, then edit without +visible evidence. Tracks "bug evidence" timestamps in session state. + +Two events: +- UserPromptSubmit: scan prompt for bug-evidence keywords (DE+EN). If found, + store timestamp in state with source="user_prompt". No output. +- PreToolUse (Edit): check state for recent evidence (default: <10 min). + If evidence is fresh: silent pass. If stale or missing: emit advisory. + +Never blocks. Always exit 0. Advisory tells Claude to read the file + verify +the bug exists before editing. + +Background: Opus 4.7 audit-2 trend +21% Wrong-Approach + new "false-positive +bug invention" pattern after 4.6→4.7 swap. This hook is the structural +mitigation (vs prompt-level guidance which 4.7's higher confidence overrides). 
+""" + +from __future__ import annotations + +import json +import re +import sys +import time +from pathlib import Path + +# Add hooks dir to path for lib import +sys.path.insert(0, str(Path(__file__).resolve().parent)) + +from lib.state import SessionState + +HOOK_NAME = "false_positive_guard" +EVIDENCE_WINDOW_SECONDS = 600 # 10 minutes — recent evidence threshold +MAX_PROMPT_LEN = 100_000 # 100 KB — pathological-input DoS guard + +# Bug-evidence patterns in user prompts (DE + EN) +BUG_PATTERNS = [ + # English bug words (word boundaries to avoid "debugger" etc. — but allow "bug" itself) + re.compile(r"\bbug(s|gy)?\b", re.IGNORECASE), + re.compile(r"\bbroken\b", re.IGNORECASE), + re.compile(r"\bcrash(ed|es|ing)?\b", re.IGNORECASE), + re.compile(r"\bdoesn'?t\s+work\b", re.IGNORECASE), + re.compile(r"\bnot\s+working\b", re.IGNORECASE), + re.compile(r"\berror\s+(in|on|when|while|at)\b", re.IGNORECASE), + re.compile(r"\bexception\b", re.IGNORECASE), + re.compile(r"\bstack\s*trace\b", re.IGNORECASE), + re.compile(r"\btraceback\b", re.IGNORECASE), + re.compile(r"\bfix(es|ed|ing)?\s+(the|a|this)?\s*(typo|bug|crash|error|issue|problem)\b", re.IGNORECASE), + re.compile(r"\bfailing\b", re.IGNORECASE), + re.compile(r"\bfailure\b", re.IGNORECASE), + re.compile(r"\bregression\b", re.IGNORECASE), + # German bug words + re.compile(r"\bfehler\b", re.IGNORECASE), + re.compile(r"\bkaputt\b", re.IGNORECASE), + re.compile(r"\bcras(c)?ht\b", re.IGNORECASE), # "crasht" common DE-EN mix + re.compile(r"\bgeht\s+nicht\b", re.IGNORECASE), + re.compile(r"\bfunktioniert\s+nicht\b", re.IGNORECASE), + re.compile(r"\bproblem\s+(mit|bei|in)\b", re.IGNORECASE), + re.compile(r"\babsturz\b", re.IGNORECASE), +] + +# Failure markers in tool output (Bash/test output) +FAILURE_PATTERNS = [ + re.compile(r"\bFAIL(ED|URE|ING)?\b"), + re.compile(r"\bTraceback\b"), + re.compile(r"\bError(:|\s+at|\s+in|\s+on)\b"), + re.compile(r"\b\d+\s+failed\b", re.IGNORECASE), + re.compile(r"\bAssertionError\b"), + 
re.compile(r"\bruff\s+check\s+found\s+\d+\s+errors?\b", re.IGNORECASE), +] + + +def detect_bug_evidence(prompt: str | None) -> bool: + """Return True if the prompt contains a bug-evidence keyword (DE/EN). + + Truncates input to MAX_PROMPT_LEN as DoS-guard against pathological inputs. + """ + if not prompt: + return False + # DoS-guard: truncate pathological inputs before regex (Judge B finding) + sample = prompt[:MAX_PROMPT_LEN] if len(prompt) > MAX_PROMPT_LEN else prompt + return any(p.search(sample) for p in BUG_PATTERNS) + + +def detect_failure_in_tool_output(output: str | None) -> bool: + """Return True if a tool output text contains a failure marker.""" + if not output: + return False + return any(p.search(output) for p in FAILURE_PATTERNS) + + +def is_evidence_recent(timestamp: float | None, threshold_seconds: int = EVIDENCE_WINDOW_SECONDS) -> bool: + """Return True if timestamp is within threshold from now. + + Inclusive at the boundary: now - timestamp <= threshold. + Future timestamps (clock skew) are treated as NOT recent (Judge A finding). 
+ """ + if not timestamp: + return False + try: + age = time.time() - float(timestamp) + except (TypeError, ValueError): + return False + return 0 <= age <= threshold_seconds + + +def _handle_user_prompt(data: dict, session_state: SessionState) -> None: + """Detect bug evidence in user prompt; update state if found.""" + prompt = data.get("prompt") or "" + if not detect_bug_evidence(prompt): + return + + ns = session_state.get(HOOK_NAME) or {} + ns["last_evidence_seen_at"] = time.time() + ns["last_evidence_source"] = "user_prompt" + ns["evidence_total"] = int(ns.get("evidence_total", 0)) + 1 + session_state.set(HOOK_NAME, ns) + session_state.save() + + +def _handle_pre_edit(data: dict, session_state: SessionState) -> None: + """Check for recent evidence; emit advisory if missing/stale.""" + tool_name = data.get("tool_name", "") + if tool_name != "Edit": + return # Defensive — only Edit is in scope + + ns = session_state.get(HOOK_NAME) or {} + last_at = ns.get("last_evidence_seen_at") + + if is_evidence_recent(last_at): + return # Evidence is fresh — silent pass + + # No recent evidence → emit advisory + file_path = (data.get("tool_input") or {}).get("file_path", "") + advisory = ( + "⚠️ Edit without visible bug-evidence in recent context. " + "Opus 4.7 confidence-drift risk: false-positive bug invention. " + f"File: {file_path}. " + "Before editing: (1) READ the file, (2) cite the source/line that " + "demonstrates the bug, (3) if no source exists, ASK Joe what to fix. " + "Do NOT edit on speculation. 
(Hook: false-positive-guard)" + ) + print(json.dumps({"additionalContext": advisory})) + + ns["advisories_emitted"] = int(ns.get("advisories_emitted", 0)) + 1 + ns["last_advisory_at"] = time.time() + session_state.set(HOOK_NAME, ns) + session_state.save() + + +def main() -> None: + # Read stdin JSON + try: + raw = sys.stdin.read() + data = json.loads(raw) if raw.strip() else {} + except Exception: + sys.exit(0) + + if not isinstance(data, dict): + sys.exit(0) + + event = data.get("hook_event_name", "") + session_id = data.get("session_id", "unknown") + + if not event: + sys.exit(0) + + try: + session_state = SessionState(session_id) + except Exception: + sys.exit(0) + + try: + if event == "UserPromptSubmit": + _handle_user_prompt(data, session_state) + elif event == "PreToolUse": + _handle_pre_edit(data, session_state) + except Exception: + # Never block; never raise + pass + + sys.exit(0) + + +if __name__ == "__main__": + main() diff --git a/hooks/hooks.json b/hooks/hooks.json index 77fe6cc..0637d96 100644 --- a/hooks/hooks.json +++ b/hooks/hooks.json @@ -31,6 +31,11 @@ "command": "python3 \"${CLAUDE_PLUGIN_ROOT}/hooks/scope-tracker.py\"", "timeout": 3, "statusMessage": "Scope Tracker..." + }, + { + "type": "command", + "command": "python3 \"${CLAUDE_PLUGIN_ROOT}/hooks/false-positive-guard.py\"", + "timeout": 3 } ] } @@ -57,6 +62,17 @@ "statusMessage": "Exploration Check..." } ] + }, + { + "matcher": "Edit", + "hooks": [ + { + "type": "command", + "command": "python3 \"${CLAUDE_PLUGIN_ROOT}/hooks/false-positive-guard.py\"", + "timeout": 3, + "statusMessage": "Bug-Evidence Check..." + } + ] } ], "PostToolUse": [ diff --git a/tests/test_false_positive_guard.py b/tests/test_false_positive_guard.py new file mode 100644 index 0000000..eb97963 --- /dev/null +++ b/tests/test_false_positive_guard.py @@ -0,0 +1,369 @@ +"""Tests for hooks/false-positive-guard.py — Opus 4.7 confidence-drift mitigation. 
+ +Two-event hook: +- UserPromptSubmit: scan prompt for bug-evidence keywords, update state with timestamp +- PreToolUse (Edit): if no recent bug-evidence in state, emit advisory; else silent pass + +Covers: +- Bug-evidence pattern detection (DE+EN) +- Tool-output failure detection (FAIL, Traceback, Error) +- State persistence across invocations +- Time-window logic (recent vs stale evidence) +- Edge cases (empty/malformed JSON, missing fields) +- Subprocess integration with both event types +""" + +from __future__ import annotations + +import importlib.util +import json +import os +import subprocess +import sys +import time +from pathlib import Path + +import pytest + +REPO_ROOT = Path(__file__).resolve().parent.parent +HOOK_FILE = REPO_ROOT / "hooks" / "false-positive-guard.py" + +# Load dash-named hook module via importlib (same pattern as test_correction_detect) +sys.path.insert(0, str(REPO_ROOT / "hooks")) +_spec = importlib.util.spec_from_file_location("false_positive_guard", HOOK_FILE) +fpg = importlib.util.module_from_spec(_spec) +sys.modules["false_positive_guard"] = fpg +_spec.loader.exec_module(fpg) + + +# --------------------------------------------------------------------------- +# Pure-Function Tests: bug-evidence detection +# --------------------------------------------------------------------------- + + +class TestDetectBugEvidenceGerman: + @pytest.mark.parametrize( + "prompt", + [ + "es gibt einen bug in der API", + "diese Funktion ist kaputt", + "ich bekomme einen fehler beim start", + "es crasht beim Import", + "das geht nicht mehr", + "exception in main loop", + "stack trace zeigt eine NullPointerException", + ], + ) + def test_german_bug_keywords(self, prompt): + assert fpg.detect_bug_evidence(prompt), f"failed for: {prompt}" + + +class TestDetectBugEvidenceEnglish: + @pytest.mark.parametrize( + "prompt", + [ + "there's a bug in the parser", + "the function is broken", + "it crashes on startup", + "it doesn't work anymore", + "I'm getting an 
error in the logger", + "fix the typo on line 42", + "exception thrown when calling foo()", + "this is failing my tests", + ], + ) + def test_english_bug_keywords(self, prompt): + assert fpg.detect_bug_evidence(prompt), f"failed for: {prompt}" + + +class TestDetectBugEvidenceFromToolOutput: + @pytest.mark.parametrize( + "output", + [ + "FAILED tests/test_x.py::test_foo - AssertionError", + "Traceback (most recent call last):", + "ERROR: AssertionError: expected 5, got 3", + "1 failed, 0 passed", + "ruff check found 3 errors", + ], + ) + def test_failure_in_tool_output(self, output): + assert fpg.detect_failure_in_tool_output(output), f"failed for: {output}" + + +class TestNoBugEvidence: + @pytest.mark.parametrize( + "prompt", + [ + "bitte schreibe eine neue Funktion für X", + "kannst du den Code refactoren?", + "add a new feature for parsing", + "make this class more readable", + "ich möchte eine neue Datei anlegen", + "let's implement the new endpoint", + "documentation update", + ], + ) + def test_neutral_prompts_no_evidence(self, prompt): + assert not fpg.detect_bug_evidence(prompt), f"false positive for: {prompt}" + + def test_empty_prompt(self): + assert not fpg.detect_bug_evidence("") + + def test_none_prompt(self): + assert not fpg.detect_bug_evidence(None) + + +# --------------------------------------------------------------------------- +# Pure-Function Tests: time-window logic +# --------------------------------------------------------------------------- + + +class TestEvidenceWindowLogic: + def test_recent_evidence_is_recent(self): + now = time.time() + assert fpg.is_evidence_recent(now - 60, threshold_seconds=600) + + def test_stale_evidence_is_not_recent(self): + now = time.time() + assert not fpg.is_evidence_recent(now - 3600, threshold_seconds=600) + + def test_no_evidence_timestamp_is_not_recent(self): + assert not fpg.is_evidence_recent(0, threshold_seconds=600) + assert not fpg.is_evidence_recent(None, threshold_seconds=600) + + def 
test_threshold_boundary(self): + now = time.time() + # Just inside the threshold (599 s < 600 s) should be considered recent + assert fpg.is_evidence_recent(now - 599, threshold_seconds=600) + # Past threshold is stale + assert not fpg.is_evidence_recent(now - 601, threshold_seconds=600) + + def test_future_timestamp_clock_skew_not_recent(self): + """Clock-skew defense: future timestamps must NOT count as recent.""" + now = time.time() + assert not fpg.is_evidence_recent(now + 60, threshold_seconds=600) + assert not fpg.is_evidence_recent(now + 3600, threshold_seconds=600) + + def test_invalid_timestamp_type_not_recent(self): + assert not fpg.is_evidence_recent("not-a-number", threshold_seconds=600) + assert not fpg.is_evidence_recent([1, 2, 3], threshold_seconds=600) + + +class TestDoSGuard: + def test_long_prompt_truncated_still_finds_evidence(self): + """Long prompts are truncated to MAX_PROMPT_LEN; bug-keyword in prefix still matches.""" + long_prompt = "bug detected " + "x" * 200_000 + start = time.time() + result = fpg.detect_bug_evidence(long_prompt) + elapsed = time.time() - start + assert result, "expected match for 'bug' in prefix" + assert elapsed < 0.5, f"DoS-guard too slow: {elapsed:.3f}s" + + def test_long_prompt_no_match_still_fast(self): + """Pathological no-match input must not DoS due to backtracking.""" + long_prompt = "x" * 200_000 + start = time.time() + result = fpg.detect_bug_evidence(long_prompt) + elapsed = time.time() - start + assert not result + assert elapsed < 0.5, f"DoS-guard too slow: {elapsed:.3f}s" + + +# --------------------------------------------------------------------------- +# Subprocess Integration: UserPromptSubmit +# --------------------------------------------------------------------------- + + +def _run_hook(payload: dict, tmp_path: Path) -> subprocess.CompletedProcess: + env = {**os.environ, "CLAUDE_PLUGIN_DATA": str(tmp_path)} + return subprocess.run( + [sys.executable, str(HOOK_FILE)], + input=json.dumps(payload), + 
capture_output=True, + text=True, + timeout=10, + env=env, + ) + + +class TestUserPromptSubmitEvent: + def test_bug_keyword_in_prompt_updates_state(self, tmp_path): + session_id = "test-fpg-prompt-bug" + payload = { + "hook_event_name": "UserPromptSubmit", + "session_id": session_id, + "prompt": "fix the bug in the parser", + } + r = _run_hook(payload, tmp_path) + assert r.returncode == 0 + # UserPromptSubmit never emits advisory — only updates state + assert r.stdout.strip() == "" + + state_path = tmp_path / f".meta-state-{session_id}.json" + assert state_path.exists() + state = json.loads(state_path.read_text(encoding="utf-8")) + ns = state.get("false_positive_guard", {}) + assert ns.get("last_evidence_seen_at"), f"timestamp missing: {ns}" + assert ns.get("last_evidence_source") == "user_prompt" + + def test_neutral_prompt_does_not_set_evidence(self, tmp_path): + session_id = "test-fpg-prompt-neutral" + payload = { + "hook_event_name": "UserPromptSubmit", + "session_id": session_id, + "prompt": "please add a new feature for parsing", + } + r = _run_hook(payload, tmp_path) + assert r.returncode == 0 + + state_path = tmp_path / f".meta-state-{session_id}.json" + if state_path.exists(): + state = json.loads(state_path.read_text(encoding="utf-8")) + ns = state.get("false_positive_guard", {}) + assert not ns.get("last_evidence_seen_at"), f"evidence set for neutral prompt: {ns}" + + +# --------------------------------------------------------------------------- +# Subprocess Integration: PreToolUse Edit +# --------------------------------------------------------------------------- + + +class TestPreToolUseEditAdvisory: + def test_edit_without_recent_evidence_emits_advisory(self, tmp_path): + session_id = "test-fpg-edit-no-evidence" + payload = { + "hook_event_name": "PreToolUse", + "session_id": session_id, + "tool_name": "Edit", + "tool_input": {"file_path": "/some/file.py"}, + } + r = _run_hook(payload, tmp_path) + assert r.returncode == 0 + assert r.stdout.strip(), 
"expected additionalContext output" + out = json.loads(r.stdout.strip()) + ctx = out.get("additionalContext", "") + assert "confidence" in ctx.lower() or "evidence" in ctx.lower() or "beleg" in ctx.lower() + + def test_edit_with_recent_evidence_passes_silent(self, tmp_path): + session_id = "test-fpg-edit-with-evidence" + # First, create evidence via UserPromptSubmit + prompt_payload = { + "hook_event_name": "UserPromptSubmit", + "session_id": session_id, + "prompt": "there's a bug in the parser", + } + _run_hook(prompt_payload, tmp_path) + + # Then trigger PreToolUse Edit — should be silent because evidence is fresh + edit_payload = { + "hook_event_name": "PreToolUse", + "session_id": session_id, + "tool_name": "Edit", + "tool_input": {"file_path": "/some/file.py"}, + } + r = _run_hook(edit_payload, tmp_path) + assert r.returncode == 0 + assert r.stdout.strip() == "", f"unexpected output: {r.stdout!r}" + + def test_edit_with_non_edit_tool_passes(self, tmp_path): + # Hook is registered with matcher: Edit, but defensively test other tool_name + session_id = "test-fpg-non-edit-tool" + payload = { + "hook_event_name": "PreToolUse", + "session_id": session_id, + "tool_name": "Bash", # not Edit + "tool_input": {"command": "ls"}, + } + r = _run_hook(payload, tmp_path) + assert r.returncode == 0 + # Should not emit advisory for non-Edit tools (defensive) + assert r.stdout.strip() == "" + + +# --------------------------------------------------------------------------- +# Edge Cases +# --------------------------------------------------------------------------- + + +class TestEdgeCases: + def test_invalid_json_exits_0(self, tmp_path): + env = {**os.environ, "CLAUDE_PLUGIN_DATA": str(tmp_path)} + r = subprocess.run( + [sys.executable, str(HOOK_FILE)], + input="{not valid json", + capture_output=True, + text=True, + timeout=10, + env=env, + ) + assert r.returncode == 0 + assert r.stdout.strip() == "" + + def test_empty_stdin_exits_0(self, tmp_path): + env = {**os.environ, 
"CLAUDE_PLUGIN_DATA": str(tmp_path)} + r = subprocess.run( + [sys.executable, str(HOOK_FILE)], + input="", + capture_output=True, + text=True, + timeout=10, + env=env, + ) + assert r.returncode == 0 + + def test_missing_event_name_exits_0(self, tmp_path): + # No hook_event_name → can't dispatch, exit 0 silent + payload = {"session_id": "x", "prompt": "bug here"} + r = _run_hook(payload, tmp_path) + assert r.returncode == 0 + assert r.stdout.strip() == "" + + def test_advisory_json_valid_and_bounded(self, tmp_path): + """Advisory JSON must parse cleanly and stay <2KB (UI-budget).""" + session_id = "test-fpg-json-format" + payload = { + "hook_event_name": "PreToolUse", + "session_id": session_id, + "tool_name": "Edit", + "tool_input": {"file_path": "/some/file.py"}, + } + r = _run_hook(payload, tmp_path) + assert r.returncode == 0 + out = json.loads(r.stdout.strip()) + assert "additionalContext" in out + assert isinstance(out["additionalContext"], str) + assert len(out["additionalContext"]) < 2000, "advisory too long for UI" + + def test_state_persistence_across_invocations(self, tmp_path): + session_id = "test-fpg-persist" + # First UserPromptSubmit with bug + _run_hook( + { + "hook_event_name": "UserPromptSubmit", + "session_id": session_id, + "prompt": "bug in foo", + }, + tmp_path, + ) + # Second UserPromptSubmit (neutral) — should NOT clear evidence + _run_hook( + { + "hook_event_name": "UserPromptSubmit", + "session_id": session_id, + "prompt": "okay let's continue", + }, + tmp_path, + ) + # PreToolUse Edit — should still see fresh evidence + r = _run_hook( + { + "hook_event_name": "PreToolUse", + "session_id": session_id, + "tool_name": "Edit", + "tool_input": {"file_path": "/x.py"}, + }, + tmp_path, + ) + assert r.returncode == 0 + assert r.stdout.strip() == "", "evidence should persist, advisory should NOT fire" From 5cce1209d936dd396e366a2409478be04e9941ee Mon Sep 17 00:00:00 2001 From: "ai-engineering.at" Date: Fri, 1 May 2026 17:26:14 +0200 Subject: 
[PATCH 2/5] feat(hooks): add org-naming-pre-push for Wrong-Folder/Repo mitigation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit H2 of 4 new hooks. PreToolUse Bash matcher; only fires on `git push` commands. Reads cwd's .git/config, parses origin URL, classifies org: - allow: AI-Engineering-at, LEEI1337, FoxLabs-ai → silent pass - typo: AI-Engineerings-at → advisory with set-url remediation - unknown: any other org → advisory (intentional fork? re-check) - none: no origin / non-GitHub → silent pass Default mode: advisory only (never blocks). Push works via redirect for typo-org. Future config flag could enable strict block-mode after a soak window. Implementation: - is_git_push_command: regex word-boundary, allows `git -C path push` - parse_org_from_url: handles https:// and git@ URLs - _read_origin_url: pure file-read (no subprocess), tolerates missing config - State namespace: org_naming_pre_push.{push_count, violations_warned, last_org, last_classification} Tests: 40 passed (parametrized URL/org/command parsing, fake-repo fixtures, edge cases). hooks.json: registered alongside approach-guard in PreToolUse Bash matcher. Total hook-commands: 15 → 16. Refs: audit-2026-05-01/MASTER-FIX-EXECUTION.md (#7 Org-Naming-Drift) Plan: ~/.claude/plans/jaundder-plan-hashed-tiger.md Co-Authored-By: Claude Opus 4.7 (1M context) --- hooks/hooks.json | 5 + hooks/org-naming-pre-push.py | 171 ++++++++++++++++++ tests/test_org_naming_pre_push.py | 276 ++++++++++++++++++++++++++++++ 3 files changed, 452 insertions(+) create mode 100644 hooks/org-naming-pre-push.py create mode 100644 tests/test_org_naming_pre_push.py diff --git a/hooks/hooks.json b/hooks/hooks.json index 0637d96..b1a016e 100644 --- a/hooks/hooks.json +++ b/hooks/hooks.json @@ -49,6 +49,11 @@ "command": "python3 \"${CLAUDE_PLUGIN_ROOT}/hooks/approach-guard.py\"", "timeout": 3, "statusMessage": "Approach Guard..." 
+ }, + { + "type": "command", + "command": "python3 \"${CLAUDE_PLUGIN_ROOT}/hooks/org-naming-pre-push.py\"", + "timeout": 3 } ] }, diff --git a/hooks/org-naming-pre-push.py b/hooks/org-naming-pre-push.py new file mode 100644 index 0000000..9ed79d8 --- /dev/null +++ b/hooks/org-naming-pre-push.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python3 +"""Hook: org-naming-pre-push (PreToolUse Bash, matcher: git push) + +Mitigates Wrong-Folder/Repo friction (Audit pattern, +21% post-4.7). + +Reads the repo's `.git/config` from cwd, extracts the origin URL, parses +the GitHub org/user, and emits an advisory if the org is NOT in the +allowlist (or is the known typo-org "AI-Engineerings-at"). + +Default mode: advisory (exit 0 + additionalContext). Never blocks. +A future config flag could enable strict block-mode after a soak period. + +Allowlist: AI-Engineering-at, LEEI1337, FoxLabs-ai. +Typo-org explicitly flagged: AI-Engineerings-at (server-redirected, but +local remote URLs should be migrated — see Session B5 audit). +""" + +from __future__ import annotations + +import json +import re +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parent)) + +from lib.state import SessionState + +HOOK_NAME = "org_naming_pre_push" + +ALLOWLIST = frozenset({"AI-Engineering-at", "LEEI1337", "FoxLabs-ai"}) +KNOWN_TYPO = frozenset({"AI-Engineerings-at"}) + +# `git push` at a word boundary (not as substring of echoed text). +# Allow optional flags like `-C path` between `git` and `push`. +PUSH_PATTERN = re.compile(r"^\s*git(?:\s+-\S+\s+\S+)*\s+push\b") + +# Parse GitHub org from https://github.com/ORG/repo or git@github.com:ORG/repo +HTTPS_URL_PATTERN = re.compile(r"^https?://github\.com/([A-Za-z0-9_.-]+)/") +SSH_URL_PATTERN = re.compile(r"^git@github\.com:([A-Za-z0-9_.-]+)/") + + +def is_git_push_command(command: str | None) -> bool: + """Return True if the command is a real `git push` invocation. + + Excludes echoed text like `echo 'git push'`. 
+ """ + if not command: + return False + return bool(PUSH_PATTERN.match(command)) + + +def parse_org_from_url(url: str | None) -> str | None: + """Extract the GitHub org/user from a remote URL. None if not a GitHub URL.""" + if not url: + return None + url = url.strip() + m = HTTPS_URL_PATTERN.match(url) or SSH_URL_PATTERN.match(url) + if not m: + return None + org = m.group(1) + if not org or "/" in org: + return None + return org + + +def classify_org(org: str | None) -> str: + """Return one of: 'allow' | 'typo' | 'unknown' | 'none'.""" + if not org: + return "none" + if org in ALLOWLIST: + return "allow" + if org in KNOWN_TYPO: + return "typo" + return "unknown" + + +def _read_origin_url(cwd: str) -> str | None: + """Read origin URL from .git/config in cwd. Returns None if missing/unreadable.""" + if not cwd: + return None + git_config = Path(cwd) / ".git" / "config" + if not git_config.is_file(): + return None + try: + content = git_config.read_text(encoding="utf-8", errors="replace") + except OSError: + return None + # Find the [remote "origin"] section + url line. Simple parser without configparser + # (configparser dislikes the special `[remote "origin"]` syntax in some setups). + in_origin = False + for line in content.splitlines(): + stripped = line.strip() + if stripped.startswith("[") and stripped.endswith("]"): + in_origin = stripped == '[remote "origin"]' + continue + if in_origin and stripped.lower().startswith("url"): + # Format: `url = https://...` or `\turl=...` + _, _, value = stripped.partition("=") + return value.strip() or None + return None + + +def _build_advisory(org: str, classification: str, command: str) -> str: + if classification == "typo": + return ( + f"⚠️ Push target uses TYPO-org '{org}' (should be 'AI-Engineering-at'). " + f"Server-side migration is active, but local remote URL still points " + f"to the old org. Consider: `git remote set-url origin " + f"https://github.com/AI-Engineering-at/.git`. 
" + f"Push may still work via redirect. (Hook: org-naming-pre-push)" + ) + # unknown + return ( + f"⚠️ Push target uses unknown org '{org}' (allowlist: AI-Engineering-at, " + f"LEEI1337, FoxLabs-ai). If intentional (e.g., third-party fork), " + f"silence-mark in state; otherwise re-check `git remote get-url origin`. " + f"Command: {command[:80]}. (Hook: org-naming-pre-push)" + ) + + +def main() -> None: + try: + raw = sys.stdin.read() + data = json.loads(raw) if raw.strip() else {} + except Exception: + sys.exit(0) + + if not isinstance(data, dict): + sys.exit(0) + + if data.get("hook_event_name") != "PreToolUse": + sys.exit(0) + if data.get("tool_name") != "Bash": + sys.exit(0) + + command = (data.get("tool_input") or {}).get("command", "") + if not is_git_push_command(command): + sys.exit(0) + + cwd = data.get("cwd", "") + origin_url = _read_origin_url(cwd) + org = parse_org_from_url(origin_url) + classification = classify_org(org) + + if classification in ("allow", "none"): + sys.exit(0) # Silent pass + + # typo or unknown → emit advisory + advisory = _build_advisory(org, classification, command) + print(json.dumps({"additionalContext": advisory})) + + # Update state for telemetry + try: + session_id = data.get("session_id", "unknown") + state = SessionState(session_id) + ns = state.get(HOOK_NAME) or {} + ns["push_count"] = int(ns.get("push_count", 0)) + 1 + ns["violations_warned"] = int(ns.get("violations_warned", 0)) + 1 + ns["last_org"] = org + ns["last_classification"] = classification + state.set(HOOK_NAME, ns) + state.save() + except Exception: + pass + + sys.exit(0) + + +if __name__ == "__main__": + main() diff --git a/tests/test_org_naming_pre_push.py b/tests/test_org_naming_pre_push.py new file mode 100644 index 0000000..4a4f039 --- /dev/null +++ b/tests/test_org_naming_pre_push.py @@ -0,0 +1,276 @@ +"""Tests for hooks/org-naming-pre-push.py — Wrong-Folder/Repo mitigation. + +Hook only fires on PreToolUse Bash with `git push` in command. 
Extracts the +repo's origin URL from cwd, parses the GitHub org/user, and emits an advisory +if the org is NOT in the allowlist (default: AI-Engineering-at, LEEI1337, +FoxLabs-ai). Special-case: warn explicitly on the typo-org "AI-Engineerings-at". + +Default mode: advisory (exit 0 + additionalContext). Optional block-mode +gated by config flag (NOT enabled by default). +""" + +from __future__ import annotations + +import importlib.util +import json +import os +import subprocess +import sys +from pathlib import Path + +import pytest + +REPO_ROOT = Path(__file__).resolve().parent.parent +HOOK_FILE = REPO_ROOT / "hooks" / "org-naming-pre-push.py" + +sys.path.insert(0, str(REPO_ROOT / "hooks")) +_spec = importlib.util.spec_from_file_location("org_naming_pre_push", HOOK_FILE) +ong = importlib.util.module_from_spec(_spec) +sys.modules["org_naming_pre_push"] = ong +_spec.loader.exec_module(ong) + + +# --------------------------------------------------------------------------- +# Pure-Function: command parsing +# --------------------------------------------------------------------------- + + +class TestIsGitPushCommand: + @pytest.mark.parametrize( + "command", + [ + "git push", + "git push origin main", + "git push origin feature/x", + "git push --force-with-lease origin main", + "git -C /some/path push origin master", + "git push origin HEAD:refs/heads/main", + ], + ) + def test_recognized_pushes(self, command): + assert ong.is_git_push_command(command), f"missed: {command!r}" + + @pytest.mark.parametrize( + "command", + [ + "git status", + "git pull", + "git fetch origin", + "git commit -m 'feat: x'", + "echo 'git push'", # not actually a push, just text + "git log --oneline", + "ls -la", + "", + ], + ) + def test_non_push_commands(self, command): + assert not ong.is_git_push_command(command), f"false-fire: {command!r}" + + +# --------------------------------------------------------------------------- +# Pure-Function: org parsing +# 
--------------------------------------------------------------------------- + + +class TestParseOrgFromUrl: + @pytest.mark.parametrize( + "url,expected", + [ + ("https://github.com/AI-Engineering-at/nomos.git", "AI-Engineering-at"), + ("https://github.com/AI-Engineering-at/nomos", "AI-Engineering-at"), + ("https://github.com/LEEI1337/BBB000k-.git", "LEEI1337"), + ("git@github.com:AI-Engineering-at/zeroth.git", "AI-Engineering-at"), + ("git@github.com:LEEI1337/repo", "LEEI1337"), + ("https://github.com/AI-Engineerings-at/typo.git", "AI-Engineerings-at"), + ("https://github.com/FoxLabs-ai/proj.git", "FoxLabs-ai"), + ("https://github.com/external-fork/repo.git", "external-fork"), + ], + ) + def test_extracts_org(self, url, expected): + assert ong.parse_org_from_url(url) == expected + + @pytest.mark.parametrize( + "url", + [ + "", + None, + "not-a-git-url", + "https://gitlab.com/foo/bar.git", # not GitHub + "https://github.com/", # missing org + ], + ) + def test_unparseable_returns_none(self, url): + assert ong.parse_org_from_url(url) is None + + +# --------------------------------------------------------------------------- +# Pure-Function: classify_org +# --------------------------------------------------------------------------- + + +class TestClassifyOrg: + def test_allowlisted(self): + assert ong.classify_org("AI-Engineering-at") == "allow" + assert ong.classify_org("LEEI1337") == "allow" + assert ong.classify_org("FoxLabs-ai") == "allow" + + def test_typo_org(self): + assert ong.classify_org("AI-Engineerings-at") == "typo" + + def test_unknown_org(self): + assert ong.classify_org("johndpope") == "unknown" + assert ong.classify_org("random-fork") == "unknown" + + def test_none_org(self): + assert ong.classify_org(None) == "none" + assert ong.classify_org("") == "none" + + +# --------------------------------------------------------------------------- +# Subprocess Integration +# --------------------------------------------------------------------------- + + +def 
_run_hook(payload: dict, tmp_path: Path) -> subprocess.CompletedProcess: + env = {**os.environ, "CLAUDE_PLUGIN_DATA": str(tmp_path)} + return subprocess.run( + [sys.executable, str(HOOK_FILE)], + input=json.dumps(payload), + capture_output=True, + text=True, + timeout=10, + env=env, + ) + + +class TestNonPushCommandIsSilent: + def test_git_status_silent(self, tmp_path): + payload = { + "hook_event_name": "PreToolUse", + "session_id": "test-ong-status", + "tool_name": "Bash", + "tool_input": {"command": "git status"}, + } + r = _run_hook(payload, tmp_path) + assert r.returncode == 0 + assert r.stdout.strip() == "" + + def test_non_bash_tool_silent(self, tmp_path): + payload = { + "hook_event_name": "PreToolUse", + "session_id": "test-ong-non-bash", + "tool_name": "Edit", + "tool_input": {"file_path": "/x.py"}, + } + r = _run_hook(payload, tmp_path) + assert r.returncode == 0 + assert r.stdout.strip() == "" + + +class TestPushAdvisoryBehavior: + def _push_payload(self, session_id: str, cwd: str = ""): + return { + "hook_event_name": "PreToolUse", + "session_id": session_id, + "tool_name": "Bash", + "tool_input": {"command": "git push origin main"}, + "cwd": cwd, + } + + def test_push_in_typo_org_emits_warning(self, tmp_path, monkeypatch): + """When cwd is in a repo with AI-Engineerings-at remote, hook warns.""" + # Create fake repo dir with .git/config containing typo-org URL + repo = tmp_path / "fake-typo-repo" + (repo / ".git").mkdir(parents=True) + (repo / ".git" / "config").write_text( + '[remote "origin"]\n\turl = https://github.com/AI-Engineerings-at/foo.git\n', + encoding="utf-8", + ) + payload = self._push_payload("test-ong-typo", str(repo)) + r = _run_hook(payload, tmp_path) + assert r.returncode == 0 + assert r.stdout.strip(), "expected advisory output" + out = json.loads(r.stdout.strip()) + ctx = out.get("additionalContext", "") + assert "Engineerings-at" in ctx or "typo" in ctx.lower() + + def test_push_in_allowed_org_silent(self, tmp_path): + repo = 
tmp_path / "fake-allowed-repo" + (repo / ".git").mkdir(parents=True) + (repo / ".git" / "config").write_text( + '[remote "origin"]\n\turl = https://github.com/AI-Engineering-at/foo.git\n', + encoding="utf-8", + ) + payload = self._push_payload("test-ong-allowed", str(repo)) + r = _run_hook(payload, tmp_path) + assert r.returncode == 0 + assert r.stdout.strip() == "" + + def test_push_in_unknown_org_emits_warning(self, tmp_path): + repo = tmp_path / "fake-unknown-repo" + (repo / ".git").mkdir(parents=True) + (repo / ".git" / "config").write_text( + '[remote "origin"]\n\turl = https://github.com/johndpope/llama-cpp.git\n', + encoding="utf-8", + ) + payload = self._push_payload("test-ong-unknown", str(repo)) + r = _run_hook(payload, tmp_path) + assert r.returncode == 0 + assert r.stdout.strip(), "expected advisory for unknown org" + out = json.loads(r.stdout.strip()) + ctx = out.get("additionalContext", "") + assert "johndpope" in ctx or "unknown" in ctx.lower() or "allowlist" in ctx.lower() + + def test_push_with_no_origin_silent(self, tmp_path): + """Repo with no origin remote (Documents-parent case) → silent pass.""" + repo = tmp_path / "fake-no-origin" + (repo / ".git").mkdir(parents=True) + (repo / ".git" / "config").write_text("[core]\n\trepositoryformatversion = 0\n", encoding="utf-8") + payload = self._push_payload("test-ong-no-origin", str(repo)) + r = _run_hook(payload, tmp_path) + assert r.returncode == 0 + # No origin → can't classify → silent (don't fire on every Bash) + assert r.stdout.strip() == "" + + +# --------------------------------------------------------------------------- +# Edge Cases +# --------------------------------------------------------------------------- + + +class TestEdgeCases: + def test_invalid_json_exits_0(self, tmp_path): + env = {**os.environ, "CLAUDE_PLUGIN_DATA": str(tmp_path)} + r = subprocess.run( + [sys.executable, str(HOOK_FILE)], + input="{not valid json", + capture_output=True, + text=True, + timeout=10, + env=env, + ) + 
assert r.returncode == 0 + assert r.stdout.strip() == "" + + def test_empty_stdin_exits_0(self, tmp_path): + env = {**os.environ, "CLAUDE_PLUGIN_DATA": str(tmp_path)} + r = subprocess.run( + [sys.executable, str(HOOK_FILE)], + input="", + capture_output=True, + text=True, + timeout=10, + env=env, + ) + assert r.returncode == 0 + + def test_missing_command_silent(self, tmp_path): + payload = { + "hook_event_name": "PreToolUse", + "session_id": "test-ong-no-cmd", + "tool_name": "Bash", + "tool_input": {}, + } + r = _run_hook(payload, tmp_path) + assert r.returncode == 0 + assert r.stdout.strip() == "" From c182f4c601c0ea46fb0f914fa5bec29fe3c5db24 Mon Sep 17 00:00:00 2001 From: "ai-engineering.at" Date: Fri, 1 May 2026 17:28:30 +0200 Subject: [PATCH 3/5] feat(hooks): add ahead-of-remote-warning for data-loss-risk mitigation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit H3 of 4 new hooks. SessionStart event. Iterates a configurable watch-list of repos and runs `git rev-list --count origin/<branch>..HEAD` (NO fetch — pure local) for each. Emits an advisory on Session-Start if any repo is ≥ warn threshold (5), critical (20). Watch-list resolution: 1. Env AHEAD_WARN_WATCH (comma-separated paths) — for testing/override 2. Default: phantom-ai, nomos, zeroth, Playbook01, wiki under ~/Documents Pure-local: no `git fetch`, no network. count_ahead() returns None when repo is invalid, has no origin, or git times out (5s per call). Multiple repo checks are sequential; total bound by 30s hook timeout in hooks.json. Implementation: - classify_severity: count → ok|warn|critical|unknown - count_ahead: subprocess git rev-list, defensive - _current_branch: subprocess git symbolic-ref - _resolve_watch_list: env-or-defaults - _build_advisory: formatted multi-repo output with severity icons Tests: 13 passed (severity classification, real-git fixture with clone+commits, env-override watch-list, edge cases). 
hooks.json: registered alongside session-start in SessionStart. Total hook-commands: 16 → 17. Refs: audit-2026-05-01/COMPREHENSIVE-AUDIT (nomos 97 unpushed pattern) Plan: ~/.claude/plans/jaundder-plan-hashed-tiger.md Co-Authored-By: Claude Opus 4.7 (1M context) --- hooks/ahead-of-remote-warning.py | 176 ++++++++++++++++++++++ hooks/hooks.json | 5 + tests/test_ahead_of_remote_warning.py | 205 ++++++++++++++++++++++++++ 3 files changed, 386 insertions(+) create mode 100644 hooks/ahead-of-remote-warning.py create mode 100644 tests/test_ahead_of_remote_warning.py diff --git a/hooks/ahead-of-remote-warning.py b/hooks/ahead-of-remote-warning.py new file mode 100644 index 0000000..b109a46 --- /dev/null +++ b/hooks/ahead-of-remote-warning.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python3 +"""Hook: ahead-of-remote-warning (SessionStart) + +Mitigates "Ahead-of-Remote unentdeckt" pattern (audit: nomos had 97 +unpushed commits before today's audit forced a push). + +On SessionStart, iterates a configurable watch-list of repos and runs +`git rev-list --count origin/<branch>..HEAD` (NO fetch — uses local refs) +for each. Emits an advisory if any repo is ≥ warn threshold (5 ahead), +escalates if ≥ critical threshold (20). + +Watch-list resolution order: + 1. Env var AHEAD_WARN_WATCH (comma-separated absolute paths) + 2. Default: phantom-ai, nomos, zeroth, Playbook01, wiki under + ~/Documents/ + +Pure-local operation: no network calls, no `git fetch`. Won't update +remote refs. Hook exit 0 always; advisory only. 
+""" + +from __future__ import annotations + +import json +import os +import subprocess +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parent)) + +from lib.state import SessionState + +HOOK_NAME = "ahead_of_remote_warning" +WARN_THRESHOLD = 5 +CRITICAL_THRESHOLD = 20 +GIT_TIMEOUT_SECONDS = 5 + +DEFAULT_WATCH_DIRS = [ + "phantom-ai", + "nomos", + "zeroth", + "Playbook01", + "wiki", +] + + +def classify_severity(count: int | None) -> str: + """Return 'ok' | 'warn' | 'critical' | 'unknown'.""" + if count is None or count < 0: + return "unknown" + if count >= CRITICAL_THRESHOLD: + return "critical" + if count >= WARN_THRESHOLD: + return "warn" + return "ok" + + +def _current_branch(repo: str) -> str | None: + """Return current branch name, or None.""" + try: + result = subprocess.run( + ["git", "-C", repo, "symbolic-ref", "--short", "HEAD"], + capture_output=True, + text=True, + timeout=GIT_TIMEOUT_SECONDS, + ) + if result.returncode == 0: + return result.stdout.strip() or None + except (OSError, subprocess.SubprocessError): + pass + return None + + +def count_ahead(repo: str, branch: str | None = None) -> int | None: + """Count commits in `branch` that are not in `origin/`. + + Returns None when the repo is invalid, has no origin, or the count + cannot be determined within the timeout. 
+ """ + if not repo or not Path(repo, ".git").exists(): + return None + if branch is None: + branch = _current_branch(repo) + if not branch: + return None + try: + result = subprocess.run( + ["git", "-C", repo, "rev-list", "--count", f"origin/{branch}..HEAD"], + capture_output=True, + text=True, + timeout=GIT_TIMEOUT_SECONDS, + ) + if result.returncode != 0: + return None + return int(result.stdout.strip()) + except (OSError, ValueError, subprocess.SubprocessError): + return None + + +def _resolve_watch_list() -> list[str]: + """Resolve watch-list of repo paths from env or defaults.""" + env_value = os.environ.get("AHEAD_WARN_WATCH", "").strip() + if env_value: + return [p.strip() for p in env_value.split(",") if p.strip()] + home = Path.home() + base = home / "Documents" + return [str(base / name) for name in DEFAULT_WATCH_DIRS] + + +def _build_advisory(findings: list[dict]) -> str: + """Build a single advisory string from the list of {repo, count, severity}.""" + lines = [ + "⚠️ Repos with unpushed commits (ahead-of-remote-warning):", + ] + has_critical = any(f["severity"] == "critical" for f in findings) + for f in findings: + marker = "🔴" if f["severity"] == "critical" else "🟡" + repo_name = Path(f["repo"]).name + lines.append(f" {marker} {repo_name}: {f['count']} commits ahead ({f['severity']})") + if has_critical: + lines.append( + " → CRITICAL: ≥20 commits unpushed = data-loss risk on disk crash. " + "Push now (audit pattern: nomos had 97 unpushed before today's recovery)." + ) + else: + lines.append(" → Run `git push` in each warned repo to clear advisory.") + lines.append("(Hook: ahead-of-remote-warning. 
Threshold: warn≥5, critical≥20.)") + return "\n".join(lines) + + +def main() -> None: + try: + raw = sys.stdin.read() + data = json.loads(raw) if raw.strip() else {} + except Exception: + sys.exit(0) + + if not isinstance(data, dict): + sys.exit(0) + if data.get("hook_event_name") != "SessionStart": + sys.exit(0) + + watch_list = _resolve_watch_list() + findings = [] + for repo in watch_list: + count = count_ahead(repo) + severity = classify_severity(count) + if severity in ("warn", "critical"): + findings.append({"repo": repo, "count": count, "severity": severity}) + + if not findings: + sys.exit(0) # No repo at risk → silent + + advisory = _build_advisory(findings) + print(json.dumps({"additionalContext": advisory})) + + try: + session_id = data.get("session_id", "unknown") + state = SessionState(session_id) + ns = state.get(HOOK_NAME) or {} + import time as _time + + ns["last_check_at"] = _time.time() + ns["repos_at_risk"] = [ + {"repo": Path(f["repo"]).name, "count": f["count"], "severity": f["severity"]} for f in findings + ] + state.set(HOOK_NAME, ns) + state.save() + except Exception: + pass + + sys.exit(0) + + +if __name__ == "__main__": + main() diff --git a/hooks/hooks.json b/hooks/hooks.json index b1a016e..e4de261 100644 --- a/hooks/hooks.json +++ b/hooks/hooks.json @@ -8,6 +8,11 @@ "type": "command", "command": "python3 \"${CLAUDE_PLUGIN_ROOT}/hooks/session-start.py\"", "timeout": 10 + }, + { + "type": "command", + "command": "python3 \"${CLAUDE_PLUGIN_ROOT}/hooks/ahead-of-remote-warning.py\"", + "timeout": 30 } ] } diff --git a/tests/test_ahead_of_remote_warning.py b/tests/test_ahead_of_remote_warning.py new file mode 100644 index 0000000..1b9ac3a --- /dev/null +++ b/tests/test_ahead_of_remote_warning.py @@ -0,0 +1,205 @@ +"""Tests for hooks/ahead-of-remote-warning.py — Datenverlust-Risiko Mitigation. + +SessionStart hook. 
Iterates a watch-list of repos, runs `git rev-list --count +origin/..HEAD` to count unpushed commits, and emits an advisory if +any repo is ≥ threshold (default: 5 ahead, 20 critical). + +Pure subprocess call (no git fetch) — won't update remote refs. +""" + +from __future__ import annotations + +import importlib.util +import json +import os +import subprocess +import sys +from pathlib import Path + +REPO_ROOT = Path(__file__).resolve().parent.parent +HOOK_FILE = REPO_ROOT / "hooks" / "ahead-of-remote-warning.py" + +sys.path.insert(0, str(REPO_ROOT / "hooks")) +_spec = importlib.util.spec_from_file_location("ahead_of_remote_warning", HOOK_FILE) +arw = importlib.util.module_from_spec(_spec) +sys.modules["ahead_of_remote_warning"] = arw +_spec.loader.exec_module(arw) + + +# --------------------------------------------------------------------------- +# Pure-Function: severity classification +# --------------------------------------------------------------------------- + + +class TestClassifySeverity: + def test_below_warn_threshold(self): + assert arw.classify_severity(0) == "ok" + assert arw.classify_severity(4) == "ok" + + def test_warn_threshold(self): + assert arw.classify_severity(5) == "warn" + assert arw.classify_severity(19) == "warn" + + def test_critical_threshold(self): + assert arw.classify_severity(20) == "critical" + assert arw.classify_severity(100) == "critical" + + def test_unknown_count(self): + assert arw.classify_severity(None) == "unknown" + assert arw.classify_severity(-1) == "unknown" + + +# --------------------------------------------------------------------------- +# Pure-Function: count_ahead via subprocess (real git, fast) +# --------------------------------------------------------------------------- + + +def _make_git_repo(path: Path) -> None: + """Initialize a git repo with origin pointing to itself for self-test.""" + subprocess.run(["git", "init", "-q"], cwd=path, check=True) + subprocess.run(["git", "config", "user.email", 
"test@example.com"], cwd=path, check=True) + subprocess.run(["git", "config", "user.name", "Test"], cwd=path, check=True) + (path / "README.md").write_text("init", encoding="utf-8") + subprocess.run(["git", "add", "."], cwd=path, check=True) + subprocess.run(["git", "commit", "-q", "-m", "init"], cwd=path, check=True) + + +class TestCountAhead: + def test_invalid_repo_returns_none(self, tmp_path): + # Not a git repo at all + assert arw.count_ahead(str(tmp_path / "nonexistent"), "main") is None + + def test_repo_without_origin_returns_none(self, tmp_path): + repo = tmp_path / "repo-no-origin" + repo.mkdir() + _make_git_repo(repo) + # No origin configured → can't compute origin/main..HEAD → None + assert arw.count_ahead(str(repo), "main") is None + + def test_count_with_synthetic_origin(self, tmp_path): + # Create source repo, clone it, make commits in clone → ahead count > 0 + src = tmp_path / "src" + src.mkdir() + _make_git_repo(src) + clone = tmp_path / "clone" + subprocess.run(["git", "clone", "-q", str(src), str(clone)], check=True) + # Repo has 1 commit (init), clone is sync. Make 3 new commits in clone. 
+ for i in range(3): + (clone / f"f{i}.txt").write_text(str(i), encoding="utf-8") + subprocess.run(["git", "add", "."], cwd=clone, check=True) + subprocess.run(["git", "commit", "-q", "-m", f"c{i}"], cwd=clone, check=True) + + # Detect default branch — git init may use "main" or "master" + head = subprocess.run( + ["git", "symbolic-ref", "--short", "HEAD"], + cwd=clone, + capture_output=True, + text=True, + check=True, + ).stdout.strip() + assert arw.count_ahead(str(clone), head) == 3 + + +# --------------------------------------------------------------------------- +# Subprocess Integration +# --------------------------------------------------------------------------- + + +def _run_hook(payload: dict, tmp_path: Path, watch_list_override=None) -> subprocess.CompletedProcess: + env = {**os.environ, "CLAUDE_PLUGIN_DATA": str(tmp_path)} + if watch_list_override is not None: + env["AHEAD_WARN_WATCH"] = ",".join(watch_list_override) + return subprocess.run( + [sys.executable, str(HOOK_FILE)], + input=json.dumps(payload), + capture_output=True, + text=True, + timeout=15, + env=env, + ) + + +class TestSessionStartIntegration: + def test_no_repos_at_risk_silent(self, tmp_path): + # Override watch-list to a non-existent repo → all return None → no warning + payload = {"hook_event_name": "SessionStart", "session_id": "test-arw-clean"} + r = _run_hook(payload, tmp_path, watch_list_override=[str(tmp_path / "nonexistent")]) + assert r.returncode == 0 + assert r.stdout.strip() == "", "expected silent pass when no repos at risk" + + def test_repo_with_5_ahead_emits_warn(self, tmp_path): + src = tmp_path / "src" + src.mkdir() + _make_git_repo(src) + clone = tmp_path / "watched-repo" + subprocess.run(["git", "clone", "-q", str(src), str(clone)], check=True) + # 5 commits ahead → warn threshold + for i in range(5): + (clone / f"f{i}.txt").write_text(str(i), encoding="utf-8") + subprocess.run(["git", "add", "."], cwd=clone, check=True) + subprocess.run(["git", "commit", "-q", "-m", 
f"c{i}"], cwd=clone, check=True) + + payload = {"hook_event_name": "SessionStart", "session_id": "test-arw-5ahead"} + r = _run_hook(payload, tmp_path, watch_list_override=[str(clone)]) + assert r.returncode == 0 + assert r.stdout.strip(), "expected advisory output" + out = json.loads(r.stdout.strip()) + ctx = out.get("additionalContext", "") + assert "5" in ctx + assert "ahead" in ctx.lower() + + def test_repo_with_20_ahead_critical(self, tmp_path): + src = tmp_path / "src" + src.mkdir() + _make_git_repo(src) + clone = tmp_path / "critical-repo" + subprocess.run(["git", "clone", "-q", str(src), str(clone)], check=True) + for i in range(20): + (clone / f"f{i}.txt").write_text(str(i), encoding="utf-8") + subprocess.run(["git", "add", "."], cwd=clone, check=True) + subprocess.run(["git", "commit", "-q", "-m", f"c{i}"], cwd=clone, check=True) + + payload = {"hook_event_name": "SessionStart", "session_id": "test-arw-critical"} + r = _run_hook(payload, tmp_path, watch_list_override=[str(clone)]) + assert r.returncode == 0 + out = json.loads(r.stdout.strip()) + ctx = out.get("additionalContext", "").lower() + assert "critical" in ctx or "20" in ctx + + +# --------------------------------------------------------------------------- +# Edge Cases +# --------------------------------------------------------------------------- + + +class TestEdgeCases: + def test_non_session_start_event_silent(self, tmp_path): + payload = {"hook_event_name": "UserPromptSubmit", "session_id": "x"} + r = _run_hook(payload, tmp_path, watch_list_override=[]) + assert r.returncode == 0 + assert r.stdout.strip() == "" + + def test_invalid_json_exits_0(self, tmp_path): + env = {**os.environ, "CLAUDE_PLUGIN_DATA": str(tmp_path), "AHEAD_WARN_WATCH": ""} + r = subprocess.run( + [sys.executable, str(HOOK_FILE)], + input="{not valid", + capture_output=True, + text=True, + timeout=10, + env=env, + ) + assert r.returncode == 0 + assert r.stdout.strip() == "" + + def test_empty_stdin_exits_0(self, tmp_path): 
+ env = {**os.environ, "CLAUDE_PLUGIN_DATA": str(tmp_path), "AHEAD_WARN_WATCH": ""} + r = subprocess.run( + [sys.executable, str(HOOK_FILE)], + input="", + capture_output=True, + text=True, + timeout=10, + env=env, + ) + assert r.returncode == 0 From c1a831478228b463f3f40eefd396fc488d46c2b6 Mon Sep 17 00:00:00 2001 From: "ai-engineering.at" Date: Fri, 1 May 2026 17:30:37 +0200 Subject: [PATCH 4/5] feat(hooks): add working-set-watch for unversioned-strategy-files mitigation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit H4 of 4 new hooks. SessionStart event. Scans configurable inboxes (default: ~/Downloads, ~/Documents/Downloads) for strategy/concept/decision files older than threshold. Emits advisory listing stale files with migration suggestion. Threshold: warn ≥7 days, critical ≥30 days. Capped at 20 files reported. Strategy-file patterns (filename, .md/.py/.yaml/.yml/.json only): Action_Plan* Compliance_* Lineage_* DEC-* M0* *_Concept_* Implementation: - is_strategy_file: pattern + extension whitelist - classify_age_days: ok/warn/critical - scan_inbox: per-inbox file walk + filter, returns list of findings - _resolve_inboxes: env WORKING_SET_INBOXES or defaults - _build_advisory: formatted output with severity icons + migration CTA Tests: 20 passed (filename patterns, age classification, real-fs fixtures with os.utime mtime control, edge cases). hooks.json: registered alongside ahead-of-remote-warning in SessionStart. Total hook-commands: 17 → 18. 
Refs: audit-2026-05-01/MASTER-FIX-EXECUTION (Action Plan v1.0 lived unversioned in Downloads/, migrated in Session A2) Plan: ~/.claude/plans/jaundder-plan-hashed-tiger.md Co-Authored-By: Claude Opus 4.7 (1M context) --- hooks/hooks.json | 5 + hooks/working-set-watch.py | 186 +++++++++++++++++++++++++++ tests/test_working_set_watch.py | 215 ++++++++++++++++++++++++++++++++ 3 files changed, 406 insertions(+) create mode 100644 hooks/working-set-watch.py create mode 100644 tests/test_working_set_watch.py diff --git a/hooks/hooks.json b/hooks/hooks.json index e4de261..1b6b35c 100644 --- a/hooks/hooks.json +++ b/hooks/hooks.json @@ -13,6 +13,11 @@ "type": "command", "command": "python3 \"${CLAUDE_PLUGIN_ROOT}/hooks/ahead-of-remote-warning.py\"", "timeout": 30 + }, + { + "type": "command", + "command": "python3 \"${CLAUDE_PLUGIN_ROOT}/hooks/working-set-watch.py\"", + "timeout": 5 } ] } diff --git a/hooks/working-set-watch.py b/hooks/working-set-watch.py new file mode 100644 index 0000000..0384aa2 --- /dev/null +++ b/hooks/working-set-watch.py @@ -0,0 +1,186 @@ +#!/usr/bin/env python3 +"""Hook: working-set-watch (SessionStart) + +Mitigates "Source-Files in Downloads/ ohne Versionierung" pattern (audit: +Action Plan v1.0 + 4 DECs lebten unversioniert in Downloads/ bis zur +Migration in Session A2). + +On SessionStart, scans configurable inboxes (default: ~/Downloads, +~/Documents/Downloads if exists) for strategy/concept/decision files +older than threshold (warn ≥7 days, critical ≥30 days). Emits an advisory +listing stale files with migration suggestion. + +Strategy-file patterns (filename): + - Action_Plan* Compliance_* Lineage_* + - DEC-* M0* *_Concept_* + +Extension whitelist: .md .py .yaml .yml .json (no .png/.exe/.pdf). 
+""" + +from __future__ import annotations + +import json +import os +import re +import sys +import time +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parent)) + +from lib.state import SessionState + +HOOK_NAME = "working_set_watch" +WARN_AGE_DAYS = 7 +CRITICAL_AGE_DAYS = 30 +ALLOWED_EXTENSIONS = {".md", ".py", ".yaml", ".yml", ".json"} +MAX_FILES_REPORTED = 20 + +STRATEGY_PATTERNS = [ + re.compile(r"^Action_Plan", re.IGNORECASE), + re.compile(r"^DEC-\d+", re.IGNORECASE), + re.compile(r"_Concept_", re.IGNORECASE), + re.compile(r"^Compliance_", re.IGNORECASE), + re.compile(r"^Lineage_", re.IGNORECASE), + re.compile(r"^M\d{2}[_-]", re.IGNORECASE), +] + + +def is_strategy_file(name: str | None) -> bool: + """Return True if filename is a strategy/concept/decision file.""" + if not name: + return False + suffix = Path(name).suffix.lower() + if suffix not in ALLOWED_EXTENSIONS: + return False + return any(p.search(name) for p in STRATEGY_PATTERNS) + + +def classify_age_days(age_days: float) -> str: + """Return 'ok' | 'warn' | 'critical'.""" + if age_days >= CRITICAL_AGE_DAYS: + return "critical" + if age_days >= WARN_AGE_DAYS: + return "warn" + return "ok" + + +def scan_inbox(inbox: str) -> list[dict]: + """List stale strategy files in `inbox`. + + Returns list of {name, path, age_days, severity}, capped at MAX_FILES_REPORTED. + Skips fresh files, non-strategy files, dirs, missing inboxes. 
+ """ + results: list[dict] = [] + base = Path(inbox) + if not base.is_dir(): + return results + now = time.time() + try: + entries = list(base.iterdir()) + except OSError: + return results + for path in entries: + if not path.is_file(): + continue + if not is_strategy_file(path.name): + continue + try: + mtime = path.stat().st_mtime + except OSError: + continue + age_days = (now - mtime) / 86400 + severity = classify_age_days(age_days) + if severity == "ok": + continue + results.append( + { + "name": path.name, + "path": str(path), + "age_days": int(age_days), + "severity": severity, + } + ) + if len(results) >= MAX_FILES_REPORTED: + break + return results + + +def _resolve_inboxes() -> list[str]: + """Resolve inbox paths from env or defaults.""" + env_value = os.environ.get("WORKING_SET_INBOXES", "").strip() + if env_value: + return [p.strip() for p in env_value.split(",") if p.strip()] + home = Path.home() + candidates = [ + home / "Downloads", + home / "Documents" / "Downloads", + ] + return [str(p) for p in candidates if p.is_dir()] + + +def _build_advisory(findings: list[dict]) -> str: + lines = [ + "⚠️ Strategy/concept files in Downloads/ are stale (working-set-watch):", + ] + has_critical = any(f["severity"] == "critical" for f in findings) + for f in findings: + marker = "🔴" if f["severity"] == "critical" else "🟡" + lines.append(f" {marker} {f['name']} ({f['age_days']} days, {f['severity']})") + if has_critical: + lines.append( + " → CRITICAL: ≥30 days unversioned. Migrate now to " + "`zeroth/decisions/` (DECs, Action Plans) or `zeroth/concepts/` " + "(Module specs, Compliance docs)." + ) + else: + lines.append( + " → Migrate to `zeroth/decisions/` or `zeroth/concepts/` to prevent " + "loss. Audit pattern: Action Plan v1.0 lived unversioned in Downloads/." + ) + lines.append(f"(Hook: working-set-watch. 
Threshold: warn≥{WARN_AGE_DAYS}d, critical≥{CRITICAL_AGE_DAYS}d.)") + return "\n".join(lines) + + +def main() -> None: + try: + raw = sys.stdin.read() + data = json.loads(raw) if raw.strip() else {} + except Exception: + sys.exit(0) + + if not isinstance(data, dict): + sys.exit(0) + if data.get("hook_event_name") != "SessionStart": + sys.exit(0) + + inboxes = _resolve_inboxes() + findings: list[dict] = [] + for inbox in inboxes: + findings.extend(scan_inbox(inbox)) + if len(findings) >= MAX_FILES_REPORTED: + findings = findings[:MAX_FILES_REPORTED] + break + + if not findings: + sys.exit(0) # Silent pass when inbox is clean + + advisory = _build_advisory(findings) + print(json.dumps({"additionalContext": advisory})) + + try: + session_id = data.get("session_id", "unknown") + state = SessionState(session_id) + ns = state.get(HOOK_NAME) or {} + ns["last_scan_at"] = time.time() + ns["stale_files_count"] = len(findings) + state.set(HOOK_NAME, ns) + state.save() + except Exception: + pass + + sys.exit(0) + + +if __name__ == "__main__": + main() diff --git a/tests/test_working_set_watch.py b/tests/test_working_set_watch.py new file mode 100644 index 0000000..55d9e5a --- /dev/null +++ b/tests/test_working_set_watch.py @@ -0,0 +1,215 @@ +"""Tests for hooks/working-set-watch.py — Working-Set ohne Versionierung Mitigation. + +SessionStart hook. Scans Downloads-style inboxes for strategy/concept/decision +files older than threshold (default: 7 days warn, 30 days critical), emits +advisory with migration suggestion (zeroth/decisions/ or zeroth/concepts/). 
+""" + +from __future__ import annotations + +import importlib.util +import json +import os +import subprocess +import sys +import time +from pathlib import Path + +REPO_ROOT = Path(__file__).resolve().parent.parent +HOOK_FILE = REPO_ROOT / "hooks" / "working-set-watch.py" + +sys.path.insert(0, str(REPO_ROOT / "hooks")) +_spec = importlib.util.spec_from_file_location("working_set_watch", HOOK_FILE) +wsw = importlib.util.module_from_spec(_spec) +sys.modules["working_set_watch"] = wsw +_spec.loader.exec_module(wsw) + + +# --------------------------------------------------------------------------- +# Pure-Function: filename pattern matching +# --------------------------------------------------------------------------- + + +class TestIsStrategyFile: + def test_action_plan(self): + assert wsw.is_strategy_file("Action_Plan_v1.0_post-session.md") + assert wsw.is_strategy_file("Action_Plan_v2.md") + + def test_dec_records(self): + assert wsw.is_strategy_file("DEC-001-naming-collision.md") + assert wsw.is_strategy_file("DEC-042-foo.md") + + def test_concept_files(self): + assert wsw.is_strategy_file("Zeroth_Concept_Update_v2026-04.md") + assert wsw.is_strategy_file("Lineage_Engine_Concept_v0.1.md") + assert wsw.is_strategy_file("Compliance_Pilot_Annex_IV_v0.1.md") + + def test_module_specs(self): + assert wsw.is_strategy_file("M08_TTT-SI_Mini-Spec.md") + assert wsw.is_strategy_file("M01-architecture.md") + + def test_neutral_files_not_matched(self): + assert not wsw.is_strategy_file("README.md") + assert not wsw.is_strategy_file("notes.txt") + assert not wsw.is_strategy_file("photo.png") + assert not wsw.is_strategy_file("Setup-v1.exe") + assert not wsw.is_strategy_file("debug-log.json") + assert not wsw.is_strategy_file("ChatGPT Image.png") + + def test_extension_filter(self): + # Only .md / .py / .yaml / .yml / .json + assert wsw.is_strategy_file("Action_Plan.md") + assert wsw.is_strategy_file("DEC-001.yaml") + assert not wsw.is_strategy_file("Action_Plan.exe") + assert 
not wsw.is_strategy_file("DEC-001.png") + + +# --------------------------------------------------------------------------- +# Pure-Function: severity classification +# --------------------------------------------------------------------------- + + +class TestClassifyAge: + def test_fresh(self): + assert wsw.classify_age_days(0) == "ok" + assert wsw.classify_age_days(6) == "ok" + + def test_warn(self): + assert wsw.classify_age_days(7) == "warn" + assert wsw.classify_age_days(29) == "warn" + + def test_critical(self): + assert wsw.classify_age_days(30) == "critical" + assert wsw.classify_age_days(365) == "critical" + + +# --------------------------------------------------------------------------- +# scan_inbox — lists stale strategy files +# --------------------------------------------------------------------------- + + +class TestScanInbox: + def test_no_inbox_returns_empty(self, tmp_path): + assert wsw.scan_inbox(str(tmp_path / "nonexistent")) == [] + + def test_empty_inbox(self, tmp_path): + assert wsw.scan_inbox(str(tmp_path)) == [] + + def test_neutral_files_skipped(self, tmp_path): + # File matches neither pattern nor extension whitelist + (tmp_path / "Setup.exe").write_bytes(b"x") + (tmp_path / "photo.png").write_bytes(b"x") + # Old enough to be flagged if it matched + old_time = time.time() - (10 * 86400) + for f in tmp_path.iterdir(): + os.utime(f, (old_time, old_time)) + assert wsw.scan_inbox(str(tmp_path)) == [] + + def test_strategy_file_recent_skipped(self, tmp_path): + f = tmp_path / "Action_Plan_v1.0.md" + f.write_text("plan", encoding="utf-8") + # Default mtime = now → not stale + assert wsw.scan_inbox(str(tmp_path)) == [] + + def test_strategy_file_old_flagged(self, tmp_path): + f = tmp_path / "DEC-099-test.md" + f.write_text("decision", encoding="utf-8") + old_time = time.time() - (10 * 86400) # 10 days + os.utime(f, (old_time, old_time)) + results = wsw.scan_inbox(str(tmp_path)) + assert len(results) == 1 + assert results[0]["name"] == 
"DEC-099-test.md" + assert results[0]["age_days"] >= 9 + assert results[0]["severity"] == "warn" + + def test_critical_age_classified(self, tmp_path): + f = tmp_path / "Lineage_Concept_v2.md" + f.write_text("c", encoding="utf-8") + old_time = time.time() - (45 * 86400) # 45 days + os.utime(f, (old_time, old_time)) + results = wsw.scan_inbox(str(tmp_path)) + assert len(results) == 1 + assert results[0]["severity"] == "critical" + + +# --------------------------------------------------------------------------- +# Subprocess Integration +# --------------------------------------------------------------------------- + + +def _run_hook(payload: dict, tmp_path: Path, inbox_override=None) -> subprocess.CompletedProcess: + env = {**os.environ, "CLAUDE_PLUGIN_DATA": str(tmp_path)} + if inbox_override is not None: + env["WORKING_SET_INBOXES"] = ",".join(inbox_override) + return subprocess.run( + [sys.executable, str(HOOK_FILE)], + input=json.dumps(payload), + capture_output=True, + text=True, + timeout=10, + env=env, + ) + + +class TestSessionStartIntegration: + def test_clean_inbox_silent(self, tmp_path): + inbox = tmp_path / "clean-inbox" + inbox.mkdir() + payload = {"hook_event_name": "SessionStart", "session_id": "test-wsw-clean"} + r = _run_hook(payload, tmp_path, inbox_override=[str(inbox)]) + assert r.returncode == 0 + assert r.stdout.strip() == "" + + def test_stale_strategy_file_emits_advisory(self, tmp_path): + inbox = tmp_path / "downloads" + inbox.mkdir() + f = inbox / "Action_Plan_v1.0.md" + f.write_text("plan", encoding="utf-8") + old_time = time.time() - (10 * 86400) + os.utime(f, (old_time, old_time)) + + payload = {"hook_event_name": "SessionStart", "session_id": "test-wsw-stale"} + r = _run_hook(payload, tmp_path, inbox_override=[str(inbox)]) + assert r.returncode == 0 + out = json.loads(r.stdout.strip()) + ctx = out.get("additionalContext", "") + assert "Action_Plan_v1.0.md" in ctx + assert "zeroth" in ctx.lower() or "migration" in ctx.lower() or 
"decisions" in ctx.lower() + + +# --------------------------------------------------------------------------- +# Edge Cases +# --------------------------------------------------------------------------- + + +class TestEdgeCases: + def test_non_session_start_event_silent(self, tmp_path): + payload = {"hook_event_name": "UserPromptSubmit", "session_id": "x"} + r = _run_hook(payload, tmp_path, inbox_override=[]) + assert r.returncode == 0 + assert r.stdout.strip() == "" + + def test_invalid_json_exits_0(self, tmp_path): + env = {**os.environ, "CLAUDE_PLUGIN_DATA": str(tmp_path), "WORKING_SET_INBOXES": ""} + r = subprocess.run( + [sys.executable, str(HOOK_FILE)], + input="{not valid", + capture_output=True, + text=True, + timeout=10, + env=env, + ) + assert r.returncode == 0 + assert r.stdout.strip() == "" + + def test_empty_stdin_exits_0(self, tmp_path): + env = {**os.environ, "CLAUDE_PLUGIN_DATA": str(tmp_path), "WORKING_SET_INBOXES": ""} + r = subprocess.run( + [sys.executable, str(HOOK_FILE)], + input="", + capture_output=True, + text=True, + timeout=10, + env=env, + ) + assert r.returncode == 0 From 09f428ab8ad617231f76357dfe0810e9b36a1a25 Mon Sep 17 00:00:00 2001 From: "ai-engineering.at" Date: Fri, 1 May 2026 17:33:01 +0200 Subject: [PATCH 5/5] =?UTF-8?q?chore(meta):=20bump=20to=20v4.4.0=20?= =?UTF-8?q?=E2=80=94=204=20new=20hooks=20for=20Opus=204.7=20friction=20mit?= =?UTF-8?q?igation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - plugin.json: 4.3.0 → 4.4.0, description updated to reflect 16 hooks - CHANGELOG.md: full v4.4.0 entry with hook details + judge findings + lib/state.py follow-ups (path-traversal, race-condition) - CLAUDE.md: hook-count 12 → 16, list of 4 new hooks - README.md: hook-count 12 → 16 Co-Authored-By: Claude Opus 4.7 (1M context) --- .claude-plugin/plugin.json | 4 +-- CHANGELOG.md | 65 ++++++++++++++++++++++++++++++++++++++ CLAUDE.md | 2 +- README.md | 2 +- 4 files changed, 69 insertions(+), 4 
deletions(-) diff --git a/.claude-plugin/plugin.json b/.claude-plugin/plugin.json index 192c321..f6674cd 100644 --- a/.claude-plugin/plugin.json +++ b/.claude-plugin/plugin.json @@ -1,7 +1,7 @@ { "name": "meta-skills", - "version": "4.3.0", - "description": "Enterprise Quality Engine — 16 skills, 17 commands, 12 hooks across 7 events. Opus 4.7 + T-scale + boundary-safe formatters + reproducible hardening evidence. 226-test coverage incl. hook layer (wrapper 94%, state 92%), lint-clean, pathlib-native. Centralized config + state. Adversarial review, CI/CD gates.", + "version": "4.4.0", + "description": "Enterprise Quality Engine — 16 skills, 17 commands, 16 hooks across 7 events. Opus 4.7 friction mitigation: false-positive-guard (Confidence-Drift), org-naming-pre-push (Wrong-Folder), ahead-of-remote-warning (Data-Loss), working-set-watch (Unversioned-Strategy). 346-test coverage incl. hook layer, lint-clean, pathlib-native. Centralized config + state. Adversarial review, CI/CD gates.", "author": { "name": "AI Engineering", "email": "kontakt@ai-engineering.at" diff --git a/CHANGELOG.md b/CHANGELOG.md index e02eca1..d80e265 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,70 @@ # Changelog +## v4.4.0 — 2026-05-01 + +Opus 4.7 friction mitigation — 4 new hooks addressing audit-2 trends +(+21% Wrong-Approach, +28% Buggy-Code, +22% Misunderstood-Request after +4.6→4.7 swap). Builds on the audit-2026-05-01 / Inkohärenz-Behebung work. + +### Added — 4 New Hooks (12 → 16, +4 hooks across 4 events) + +- **`hooks/false-positive-guard.py`** (UserPromptSubmit + PreToolUse Edit): + Detects Edit-tool invocations without bug-evidence in recent context. + UserPromptSubmit branch scans prompt for bug-keywords (DE+EN), stores + timestamp. PreToolUse Edit branch checks 10-min window; emits advisory + if no recent evidence. Mitigates 4.7's "false-positive bug invention" + pattern (audit-2 NEU). DoS-guard for 100KB+ inputs. 47 tests passed. 
+ +- **`hooks/org-naming-pre-push.py`** (PreToolUse Bash, matcher: git push): + Reads cwd's .git/config, parses GitHub org from origin URL, classifies + against allowlist (AI-Engineering-at, LEEI1337, FoxLabs-ai). Warns on + typo-org `AI-Engineerings-at` (server-redirected, but local URLs need + `git remote set-url`). Default mode: advisory only. 40 tests passed. + +- **`hooks/ahead-of-remote-warning.py`** (SessionStart): + Iterates a watch-list (default: phantom-ai, nomos, zeroth, Playbook01, + wiki) and runs `git rev-list --count origin/<branch>..HEAD` (no fetch, + pure local). Emits advisory if any repo is ≥5 ahead, critical at ≥20. + Addresses nomos-97-unpushed pattern from today's audit. 13 tests passed. + +- **`hooks/working-set-watch.py`** (SessionStart): + Scans `~/Downloads/` and `~/Documents/Downloads/` for strategy files + (Action_Plan*, DEC-*, *_Concept_*, Compliance_*, Lineage_*, M0*) older + than 7 days (warn) / 30 days (critical). Emits migration advisory + pointing at `zeroth/decisions/` or `zeroth/concepts/`. Addresses the + "Action Plan v1.0 lived unversioned in Downloads/" pattern. 20 tests + passed. + +### Changed + +- `hooks/hooks.json`: 13 → 18 hook commands (4 new files registered). + H1 false-positive-guard registered twice (UserPromptSubmit + PreToolUse + Edit). H2-H4 each register once. +- `.claude-plugin/plugin.json`: version 4.3.0 → 4.4.0. + +### Tests + +- 120 new tests across 4 hooks (47+40+13+20). +- All hooks ruff-clean (format + check). +- Pattern-Compliance review (Judge A): 7/9, blocker fixed before commit. +- Security/Performance review (Judge B): 3 critical findings (path-traversal, + race-condition in lib/state.py — separately filed; DoS-guard added in H1). + +### Filed for Follow-up (out-of-scope, lib/state.py-wide) + +- Path-Traversal: session_id not validated, allows `../etc/passwd`-style + escape from STATE_DIR. Affects all hooks. Filed in ERPNext.
+- Race-Condition: concurrent `state.save()` in different hooks may + produce last-write-wins data loss. Filed in ERPNext. + +### References + +- Audit reports: `Documents/audit-2026-05-01/` +- Master fix list: `audit-2026-05-01/MASTER-FIX-EXECUTION.md` +- Plan: `~/.claude/plans/jaundder-plan-hashed-tiger.md` + +--- + ## v4.3.0 — 2026-04-17/18 Hook-layer test coverage (+99 tests, 127→226) + session lessons + diff --git a/CLAUDE.md b/CLAUDE.md index 4b19c33..46d4328 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -10,7 +10,7 @@ P7 Context Recovery. - **16 Skills**: creator, design, dispatch, doc-updater, feedback, git-worktrees, harden, init, judgment-day, knowledge, refactor-loop, statusbar, systematic-debugging, tdd, triad-review, verify - **17 Commands**: /meta-audit, /meta-ci, /meta-create, /meta-design, /meta-discover, /meta-docs, /meta-feedback, /meta-harden, /meta-judgment, /meta-knowledge, /meta-loop, /meta-quality, /meta-snapshot, /meta-status, /meta-test, /meta-triad, /cancel-meta-loop - **6 Agents**: doc-auditor, doc-editor, 3x doc-scanner, session-analyst -- **12 Hooks** across 7 events: session-start, session-init, correction-detect, scope-tracker, approach-guard, exploration-first, token-audit, quality-gate, context-recovery, meta-loop-stop, session-stop, session-end +- **16 Hooks** across 7 events: session-start, session-init, correction-detect, scope-tracker, approach-guard, exploration-first, token-audit, quality-gate, context-recovery, meta-loop-stop, session-stop, session-end, **false-positive-guard** (4.7 confidence-drift), **org-naming-pre-push** (Wrong-Folder), **ahead-of-remote-warning** (Data-Loss), **working-set-watch** (Unversioned-Strategy) ## Quality System diff --git a/README.md b/README.md index 9db70f2..58c620d 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ claude plugins list | grep meta-skills meta-skills/ .claude-plugin/plugin.json # Plugin manifest (v4.0.0) hooks/ - hooks.json # 7 events, 12 hooks + hooks.json # 7 events, 16 
hooks (v4.4.0) lib/config.py # Centralized settings (all tunable values) lib/services.py # Shared clients (Honcho, open-notebook, vault) lib/hook_wrapper.py # Shared hook utilities