Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/REFERENCE.md
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ Environment variables set in `.claude/settings.json` under `"env"`:
| Variable | Recommended Value | Why |
|----------|-------------------|-----|
| `CLAUDE_CODE_AUTO_COMPACT_WINDOW` | `400000` | Prompt cache TTL is 5 minutes (not 1 hour). Larger conversations miss cache frequently, making each turn reprocess the full context at full cost. Compacting at 400k keeps conversations in the cache-friendly zone. ([anthropics/claude-code#45756](https://github.com/anthropics/claude-code/issues/45756#issuecomment-4231739206)) |
| `CLAUDE_CODE_DISABLE_ADAPTIVE_THINKING` | `1` | **Deprecated on Opus 4.7 (no-op).** Opus 4.7 removed the fixed-budget option; this variable has no effect on the current model. See the deprecation notice in `docs/PHILOSOPHY.md` under "Prompt Phrasing Does Not Replace Domain Knowledge" (Experiment 4). Users migrating from Opus 4.6 can leave this set; it is harmless but inactive. |
| `CLAUDE_CODE_DISABLE_ADAPTIVE_THINKING` | `1` | **Deprecated (no-op).** The fixed-budget option no longer exists; this variable has no effect. It is safe to leave set — harmless but inactive. |

**Context on `AUTO_COMPACT_WINDOW`:** Anthropic's prompt caching currently uses a 5-minute TTL. When conversations grow large and cache entries expire between turns, each API call re-processes the full conversation at uncached token prices. Even though Claude supports a 1M token context window, using the full window without cache hits is prohibitively expensive. Setting `AUTO_COMPACT_WINDOW=400000` triggers compaction earlier, keeping the active context within a size that cache hits can cover. Anthropic is aware of this issue and exploring improvements. Credit: [@bcherny](https://github.com/bcherny).

Expand Down
164 changes: 164 additions & 0 deletions hooks/instruction-compliance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
#!/usr/bin/env python3
# hook-version: 1.0.0
"""PostToolUse Hook: Instruction Compliance Measurement

Fires after Agent tool dispatches to check whether MANDATORY instructions
(M01-M09 from ADR instruction-skip-rate-measurement) were followed.

Records compliance observations to learning.db for skip-rate dashboard.

Design Principles:
- Informational only (always exits 0, never blocks)
- Lightweight string-presence checks (<50ms)
- Multiple signal patterns per instruction for reduced false negatives
"""

import json
import os
import re
import sys
from pathlib import Path

# Add lib directory to path for imports
sys.path.insert(0, str(Path(__file__).parent / "lib"))

from hook_utils import empty_output, get_session_id, get_tool_output, get_tool_result
from learning_db_v2 import record_instruction_compliance_batch
from stdin_timeout import read_stdin

# Hook event this script is registered for; used to label empty output.
EVENT_NAME = "PostToolUse"

# ─── Instruction Definitions ─────────────────────────────────────

# Signal patterns for each instrumented MANDATORY instruction. An instruction
# counts as "complied with" if ANY of its patterns matches the combined agent
# prompt + output text (see check_compliance). Multiple alternative patterns
# per instruction reduce false negatives, per the module docstring.
# NOTE(review): only a subset of M01-M09 is instrumented here (M01, M03-M06);
# presumably the remaining IDs have no reliable textual signal — confirm
# against the ADR referenced in the module docstring.
INSTRUCTIONS: dict[str, dict[str, str | list[re.Pattern[str]]]] = {
    "M01": {
        # Output should contain phase banners like "## Phase 1" or "Phase 2:".
        "name": "Phase Banners",
        "patterns": [
            re.compile(r"##\s*Phase\s+\d", re.IGNORECASE),
            re.compile(r"Phase\s+\d\s*:", re.IGNORECASE),
        ],
    },
    "M03": {
        # Routing decision block: a "===" divider line, "ROUTING:", or "Selected:".
        "name": "Routing Decision",
        "patterns": [
            re.compile(r"^={3,}\s*$", re.MULTILINE),
            re.compile(r"(?:^|\s)ROUTING\s*:", re.IGNORECASE | re.MULTILINE),
            re.compile(r"Selected\s*:", re.IGNORECASE),
        ],
    },
    "M04": {
        # Evidence that reference files were loaded before work started.
        "name": "Reference Loading",
        "patterns": [
            re.compile(r"Reference\s+Loading", re.IGNORECASE),
            re.compile(r"reference.*table", re.IGNORECASE),
            re.compile(r"Before\s+starting\s+work", re.IGNORECASE),
            re.compile(r"Load\s+EVERY\s+reference\s+file", re.IGNORECASE),
        ],
    },
    "M05": {
        # Completeness phrasing ("deliver the finished product", "ship the
        # complete thing"); the last two patterns are prefix variants of the
        # first two and are redundant under IGNORECASE, but kept for safety.
        "name": "Completeness",
        "patterns": [
            re.compile(r"deliver\s+the\s+finished\s+product", re.IGNORECASE),
            re.compile(r"ship\s+the\s+complete\s+thing", re.IGNORECASE),
            re.compile(r"Ship\s+the\s+complete", re.IGNORECASE),
            re.compile(r"Deliver\s+the\s+finished", re.IGNORECASE),
        ],
    },
    "M06": {
        # Density-standard phrasing in the prompt or output.
        "name": "Density Standard",
        "patterns": [
            re.compile(r"write\s+dense", re.IGNORECASE),
            re.compile(r"high\s+fidelity,?\s+minimum\s+words", re.IGNORECASE),
        ],
    },
}


def check_compliance(text: str) -> dict[str, bool]:
    """Scan text for the signal patterns of every instrumented instruction.

    Args:
        text: Combined agent prompt and output text to scan.

    Returns:
        Dict mapping instruction ID to compliance boolean.
    """
    compliance: dict[str, bool] = {}
    for instruction_id, spec in INSTRUCTIONS.items():
        signal_patterns: list[re.Pattern[str]] = spec["patterns"]  # type: ignore[assignment]
        # A single matching pattern is enough to count as compliant.
        compliance[instruction_id] = any(
            pattern.search(text) is not None for pattern in signal_patterns
        )
    return compliance


def record_compliance_batch(
    results: dict[str, bool],
    session_id: str,
) -> None:
    """Persist all instruction compliance observations in one transaction.

    Args:
        results: Dict mapping instruction ID to compliance boolean.
        session_id: Current session identifier.
    """
    observations = []
    for instruction_id, is_compliant in results.items():
        observations.append((instruction_id, is_compliant, session_id))
    # Single batched write keeps the hook under its latency budget.
    record_instruction_compliance_batch(observations)


def main() -> None:
    """Process PostToolUse events for Agent instruction compliance.

    Flow:
    1. Read stdin JSON
    2. Extract agent output text
    3. Check each instruction for compliance signals
    4. Record observations to learning.db
    5. Exit silently (informational, never blocks)
    """
    try:
        # Bounded read so a hung stdin cannot stall the hook pipeline.
        event_data = read_stdin(timeout=2)
        if not event_data:
            # NOTE(review): print_and_exit presumably raises SystemExit, which
            # is NOT caught by the `except Exception` below — confirm in
            # hook_utils. Either way, the `finally` forces exit code 0.
            empty_output(EVENT_NAME).print_and_exit()

        event = json.loads(event_data)
        # Prefer the session id carried in the event; fall back to the
        # environment-derived one from hook_utils.
        session_id = event.get("session_id") or get_session_id()

        # Extract agent output text
        # tool_result may be a structured dict, a bare string, or absent.
        tool_result = get_tool_result(event)
        if isinstance(tool_result, dict):
            output_text = get_tool_output(tool_result)
        elif isinstance(tool_result, str):
            output_text = tool_result
        else:
            output_text = ""

        # Also check tool_input (agent prompt) for M04/M05/M06
        # "input" appears to be a legacy key for the same payload — verify
        # against the current hook event schema.
        tool_input = event.get("tool_input", event.get("input", ""))
        if isinstance(tool_input, dict):
            # Serialize dict prompts so the regex scan sees their values.
            tool_input = json.dumps(tool_input)
        elif not isinstance(tool_input, str):
            tool_input = ""

        combined_text = f"{tool_input}\n{output_text}"

        if not combined_text.strip():
            empty_output(EVENT_NAME).print_and_exit()

        # Check and record compliance for all instructions in one transaction
        results = check_compliance(combined_text)
        record_compliance_batch(results, session_id)

        empty_output(EVENT_NAME).print_and_exit()

    except Exception as e:
        # Informational hook: failures are only surfaced when debugging.
        if os.environ.get("CLAUDE_HOOKS_DEBUG"):
            import traceback

            print(f"[instruction-compliance] HOOK-ERROR: {type(e).__name__}: {e}", file=sys.stderr)
            traceback.print_exc(file=sys.stderr)
    finally:
        # Unconditional exit 0 replaces any in-flight SystemExit so this hook
        # can never block the tool pipeline.
        sys.exit(0)  # Never block


if __name__ == "__main__":
    main()
112 changes: 111 additions & 1 deletion hooks/lib/learning_db_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

_DEFAULT_DB_DIR = Path.home() / ".claude" / "learning"

_CURRENT_SCHEMA_VERSION = 3
_CURRENT_SCHEMA_VERSION = 4

CATEGORY_DEFAULTS = {
"error": 0.55,
Expand Down Expand Up @@ -152,6 +152,27 @@ def _run_migrations(conn: sqlite3.Connection) -> None:
"VALUES (3, 'add timestamp and cohort indexes for query performance')"
)

if current < 4:
# v3 -> v4: Add instruction_compliance table for per-observation tracking
conn.execute(
"""
CREATE TABLE IF NOT EXISTS instruction_compliance (
id INTEGER PRIMARY KEY AUTOINCREMENT,
instruction_id TEXT NOT NULL,
compliant BOOLEAN NOT NULL,
session_id TEXT,
timestamp TEXT NOT NULL DEFAULT (datetime('now'))
)
"""
)
conn.execute("CREATE INDEX IF NOT EXISTS idx_ic_instruction_id ON instruction_compliance(instruction_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_ic_timestamp ON instruction_compliance(timestamp)")
conn.execute("PRAGMA user_version = 4")
conn.execute(
"INSERT OR IGNORE INTO schema_migrations (version, description) "
"VALUES (4, 'add instruction_compliance table for per-observation tracking')"
)

conn.commit()


Expand Down Expand Up @@ -331,6 +352,17 @@ def _migrate_fts(pre_migration_version: int = 0) -> None:
CREATE INDEX IF NOT EXISTS idx_gov_severity ON governance_events(severity);
CREATE INDEX IF NOT EXISTS idx_gov_created ON governance_events(created_at);

CREATE TABLE IF NOT EXISTS instruction_compliance (
id INTEGER PRIMARY KEY AUTOINCREMENT,
instruction_id TEXT NOT NULL,
compliant BOOLEAN NOT NULL,
session_id TEXT,
timestamp TEXT NOT NULL DEFAULT (datetime('now'))
);

CREATE INDEX IF NOT EXISTS idx_ic_instruction_id ON instruction_compliance(instruction_id);
CREATE INDEX IF NOT EXISTS idx_ic_timestamp ON instruction_compliance(timestamp);

CREATE TABLE IF NOT EXISTS schema_migrations (
version INTEGER PRIMARY KEY,
applied_at TEXT DEFAULT (datetime('now')),
Expand Down Expand Up @@ -755,6 +787,84 @@ def record_activation(
record_activations([(topic, key)], session_id, outcome)


def record_instruction_compliance(
    instruction_id: str,
    compliant: bool,
    session_id: str | None = None,
) -> None:
    """Record one instruction compliance observation.

    Inserts a new row per call — observations accumulate and never overwrite
    earlier ones. When recording several observations at once, prefer
    record_instruction_compliance_batch().

    Args:
        instruction_id: Instruction identifier (e.g. "M01").
        compliant: Whether the instruction was followed.
        session_id: Current session identifier.
    """
    observation = (instruction_id, compliant, session_id)
    record_instruction_compliance_batch([observation])


def record_instruction_compliance_batch(
    records: list[tuple[str, bool, str | None]],
) -> None:
    """Record multiple instruction compliance observations in one transaction.

    Each tuple is inserted as a new row — observations accumulate, never
    overwrite.

    Args:
        records: List of (instruction_id, compliant, session_id) tuples.
    """
    if not records:
        return
    init_db()
    from datetime import timezone  # stdlib; local import keeps this fix self-contained

    # Fix: write timestamps as UTC in SQLite's canonical
    # "YYYY-MM-DD HH:MM:SS" format. The reader (query_instruction_skip_rate)
    # filters with datetime('now', '-N days'), which SQLite evaluates in UTC,
    # and the column DEFAULT is datetime('now') (also UTC). The previous
    # datetime.now().isoformat() stored naive *local* time in ISO-T format,
    # shifting the look-back window by the machine's UTC offset and mixing
    # two timestamp formats in one column.
    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    rows = [(instr_id, compliant, sid, now) for instr_id, compliant, sid in records]
    with get_connection() as conn:
        conn.executemany(
            "INSERT INTO instruction_compliance (instruction_id, compliant, session_id, timestamp) VALUES (?, ?, ?, ?)",
            rows,
        )
        conn.commit()


def query_instruction_skip_rate(days: int = 30) -> list[dict]:
    """Summarize per-instruction skip rates from the dedicated table.

    Args:
        days: Look back window in days (default 30).

    Returns:
        List of dicts with instruction_id, observations, non_compliant, skip_rate.
    """
    init_db()
    window = f"-{days} days"
    with get_connection() as conn:
        rows = conn.execute(
            """
            SELECT instruction_id,
                   COUNT(*) as observations,
                   SUM(CASE WHEN NOT compliant THEN 1 ELSE 0 END) as non_compliant
            FROM instruction_compliance
            WHERE timestamp > datetime('now', ?)
            GROUP BY instruction_id
            ORDER BY instruction_id
            """,
            (window,),
        ).fetchall()
    summaries: list[dict] = []
    for row in rows:
        total = row["observations"]
        skipped = row["non_compliant"]
        # COUNT(*) per group is always >= 1; guard kept for safety.
        rate = (skipped / total * 100) if total > 0 else 0.0
        summaries.append(
            {
                "instruction_id": row["instruction_id"],
                "observations": total,
                "non_compliant": skipped,
                "skip_rate": round(rate, 1),
            }
        )
    return summaries


def boost_confidence(topic: str, key: str, delta: float = 0.10) -> float:
"""Boost confidence for an entry. Returns new confidence."""
init_db()
Expand Down
Loading