-
Notifications
You must be signed in to change notification settings - Fork 0
safety: implement Phase 0 (F1-F5) and Phase 1 (safety layer + Stream 1) #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,6 @@ | ||
| """Layer 3 — Safety & Audit Layer.""" | ||
|
|
||
| from .backup_manager import BackupError, BackupManager | ||
| from .write_blocker import BackupToken, WriteBlocker | ||
|
|
||
| __all__ = ["WriteBlocker", "BackupToken"] | ||
| __all__ = ["WriteBlocker", "BackupToken", "BackupManager", "BackupError"] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,23 +3,26 @@ | |
|
|
||
| Implements Theorem 3.3 (Log Tamper Evidence): | ||
|
|
||
| The audit log is hash-chained: each entry eₙ includes | ||
| H(eₙ₋₁ ‖ payload), making retrospective alteration detectable. | ||
| The audit log is HMAC-keyed and hash-chained: each entry eₙ includes | ||
| H(eₙ₋₁ ‖ payload) and mac_n = HMAC_K(entry_hash_n), making | ||
| retrospective alteration detectable even by an attacker who knows | ||
| the chain structure but not the session key K. | ||
|
|
||
| Each session produces a self-contained, append-only log that records: | ||
| - Authority declaration (device owner / authorised representative / | ||
| law enforcement / corporate IT) | ||
| - Every state machine transition | ||
| - Every state machine transition (including rejected transitions) | ||
| - All write operations and their BackupTokens | ||
| - The full SCPRSolution (optimal reasons + dual weights) | ||
| - Any uncovered signals flagged for operator review | ||
|
|
||
| Status: STUB — hash chaining and DFXML export pending implementation. | ||
| Status: hash chaining and HMAC keying implemented. DFXML export pending. | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import hashlib | ||
| import hmac as _hmac | ||
| import json | ||
| import time | ||
| from dataclasses import dataclass, field | ||
|
|
@@ -42,6 +45,7 @@ class LogEntry: | |
| payload: dict[str, Any] | ||
| prev_hash: str | ||
| entry_hash: str = field(init=False) | ||
| mac: str = field(init=False, default="") | ||
|
|
||
| def __post_init__(self) -> None: | ||
| raw = json.dumps( | ||
|
|
@@ -56,20 +60,33 @@ def __post_init__(self) -> None: | |
| ) | ||
| self.entry_hash = hashlib.sha256(raw.encode()).hexdigest() | ||
|
|
||
| def set_mac(self, session_key: bytes) -> None: | ||
| """Compute and set HMAC_K(entry_hash). Called by AuditLog._append().""" | ||
| self.mac = _hmac.new( | ||
| session_key, self.entry_hash.encode(), hashlib.sha256 | ||
| ).hexdigest() | ||
|
|
||
|
|
||
| class AuditLog: | ||
| """ | ||
| Tamper-evident, append-only audit log for a recovery session. | ||
|
|
||
| The chain can be verified at any time with verify_chain(). | ||
| Any modification to a historical entry breaks all subsequent hashes. | ||
| Each entry is SHA-256 hash-chained and HMAC'd under the session key K | ||
| (Definition 3.1). verify_chain() checks both the hash linkage and the | ||
| per-entry MAC, so the chain is not regeneratable without K. | ||
| """ | ||
|
|
||
| GENESIS_HASH = "0" * 64 | ||
|
|
||
| def __init__(self, session_id: str, authority: Authority) -> None: | ||
| def __init__( | ||
| self, | ||
| session_id: str, | ||
| authority: Authority, | ||
| session_key: bytes = b"", | ||
| ) -> None: | ||
| self.session_id = session_id | ||
| self.authority = authority | ||
| self._session_key = session_key | ||
| self._entries: list[LogEntry] = [] | ||
| self._append( | ||
| "SESSION_OPEN", | ||
|
|
@@ -88,12 +105,19 @@ def _append(self, event_type: str, payload: dict[str, Any]) -> LogEntry: | |
| payload=payload, | ||
| prev_hash=prev, | ||
| ) | ||
| entry.set_mac(self._session_key) | ||
| self._entries.append(entry) | ||
| return entry | ||
|
|
||
| def log_state_transition(self, from_state: str, to_state: str) -> None: | ||
| self._append("STATE_TRANSITION", {"from": from_state, "to": to_state}) | ||
|
|
||
| def log_rejected_transition(self, from_state: str, to_state: str) -> None: | ||
| """Log an attempted transition that was rejected by the state machine.""" | ||
| self._append( | ||
| "REJECTED_TRANSITION", {"from": from_state, "to": to_state} | ||
| ) | ||
|
|
||
| def log_diagnosis(self, solution_repr: str, uncovered_count: int) -> None: | ||
| self._append( | ||
| "SCPR_DIAGNOSIS", | ||
|
|
@@ -114,12 +138,19 @@ def log_write(self, device: str, backup_sha256: str, action: str) -> None: | |
| ) | ||
|
|
||
| def verify_chain(self) -> bool: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue (complexity): Consider moving hash and MAC computation into LogEntry so AuditLog.verify_chain only orchestrates chain verification rather than duplicating low-level logic. You can reduce the complexity and duplication by pushing the hash/MAC logic into 1. Centralize hash computation on
|
||
| """Return True iff the entire chain is unmodified.""" | ||
| """ | ||
| Return True iff the entire chain is unmodified. | ||
|
|
||
| Checks per entry (in order): | ||
| 1. Hash-chain linkage: entry.prev_hash == previous entry's entry_hash. | ||
| 2. Payload integrity: recomputed entry_hash matches stored entry_hash. | ||
| 3. MAC integrity: HMAC_K(entry_hash) matches stored entry.mac. | ||
| """ | ||
| for i, entry in enumerate(self._entries): | ||
| prev = self._entries[i - 1].entry_hash if i > 0 else self.GENESIS_HASH | ||
| if entry.prev_hash != prev: | ||
| return False | ||
| # Recompute hash and compare | ||
|
|
||
| raw = json.dumps( | ||
| { | ||
| "seq": entry.sequence, | ||
|
|
@@ -132,6 +163,13 @@ def verify_chain(self) -> bool: | |
| ) | ||
| if hashlib.sha256(raw.encode()).hexdigest() != entry.entry_hash: | ||
| return False | ||
|
|
||
| expected_mac = _hmac.new( | ||
| self._session_key, entry.entry_hash.encode(), hashlib.sha256 | ||
| ).hexdigest() | ||
| if not _hmac.compare_digest(expected_mac, entry.mac): | ||
| return False | ||
|
|
||
| return True | ||
|
|
||
| def export_json(self) -> str: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (bug_risk): abort() may now raise InvalidTransitionError when context is already ABORTED.
With the stricter VALID_TRANSITIONS graph, calling abort() when the session is already ABORTED will now raise InvalidTransitionError (since ABORTED has no successors), which is a behavior change from an idempotent abort and could surface unexpected exceptions in shutdown/error paths. Consider either short‑circuiting in abort() (skip transition if state is already ABORTED) or adding an ABORTED → ABORTED self‑loop if you want to preserve idempotency.