safety: implement Phase 0 (F1-F5) and Phase 1 (safety layer + Stream 1)#1
Conversation
Phase 0 — closes all five PRD findings (F1–F5):
F1 (Critical) — BackupToken HMAC + BackupManager:
- BackupToken now carries hmac = HMAC_K(session_id ∥ device_path ∥
backup_sha256 ∥ timestamp) per Definition 3.2.
- New BackupManager class issues and registers tokens with WriteBlocker.
- write_gate() rejects: device-path mismatch, unregistered tokens, and
HMAC mismatch — a structurally valid but never-issued token is rejected.
- _issued_tokens is now populated (was declared but never used).
F2 (Critical) — Orchestration state machine:
- VALID_TRANSITIONS enforces the documented graph; INIT → EXECUTE is
now impossible; AUTH failure → DETECT is a first-class edge.
- InvalidTransitionError / MissingBackupTokenError raised on violation.
- Rejected transitions are logged to the audit log before raising
(security-relevant event per spec §6).
- SessionContext generates a 32-byte session_key used by AuditLog and
exposed for BackupManager / WriteBlocker.
F3 (High) — HMAC keying in AuditLog:
- AuditLog accepts session_key (default b"" for backward compat).
- Each LogEntry carries mac = HMAC_K(entry_hash).
- verify_chain() checks hash-chain linkage, payload integrity, and MAC
— the chain is no longer regeneratable without session key K.
F4 (Medium-High) — LUKS2 plugin import:
- from ...safety (3 levels, ImportError) → from ..safety (correct).
- from .._base (wrong depth) → from . (direct plugin package import).
F5 (Medium) — AuthToken docstring:
- Removed false "Zeroed via ctypes.memset" claim; replaced with honest
status note and Phase 1 deferral reference.
Phase 1 — safety layer completion + Stream 1:
- BackupManager.create_backup_from_device(): real ddrescue-backed
backup path; SHA-256 computed after write completes (TOCTOU-safe).
- safety/credentials.py: SecureBuffer with mlock() + ctypes.memset
zeroing for credential buffers per spec §7.2.
- stream1/generator.py: FDEProfileGenerator produces 150 synthetic
SCPR instances (≥140 per spec §14.4) covering 15 signals × 15
reasons across single-fault, named-scenario, two-reason combination,
and random multi-fault pools. Deterministically seeded; all feasible;
SCPR-engine compatible.
Tests: 111 passing (was 65); safety-layer modules now have non-zero
coverage (write_blocker 100%, audit_log 98%, orchestration 87%).
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Reviewer's GuideImplements Phase 0 safety fixes (F1–F5) and Phase 1 safety-layer features by hardening the orchestration state machine, adding HMAC-backed audit logging and backup tokens wired through a new BackupManager/WriteBlocker flow, introducing mlock-backed credential buffers, and adding a synthetic FDE profile generator with comprehensive tests and plugin import fixes. Sequence diagram for HMAC-signed backup and write gating flowsequenceDiagram
participant SessionContext
participant BackupManager
participant WriteBlocker
participant Plugin
Note over SessionContext,WriteBlocker: SessionContext creates WriteBlocker(session_key) and BackupManager(write_blocker, session_key, session_id)
SessionContext->>BackupManager: create_backup_from_device(device_path, backup_dest)
activate BackupManager
BackupManager->>BackupManager: create_backup(device_path, backup_sha256)
BackupManager->>WriteBlocker: _register_token(token)
deactivate BackupManager
Note over BackupManager,WriteBlocker: BackupToken.hmac = _compute_token_hmac(session_key, session_id, device_path, backup_sha256, timestamp)
Plugin->>WriteBlocker: write_gate(device_path, token)
activate WriteBlocker
WriteBlocker->>WriteBlocker: verify device_path match
WriteBlocker->>WriteBlocker: lookup _issued_tokens[device_path]
WriteBlocker->>WriteBlocker: recompute HMAC via _compute_token_hmac
WriteBlocker-->>Plugin: write permitted or PermissionError
deactivate WriteBlocker
Plugin->>Plugin: execute_recovery(device_path)
State diagram for updated orchestration SessionState machinestateDiagram-v2
[*] --> INIT
INIT --> ENUMERATE
INIT --> ABORTED
ENUMERATE --> DETECT
ENUMERATE --> ABORTED
DETECT --> DIAGNOSE
DETECT --> ABORTED
DIAGNOSE --> AUTH
DIAGNOSE --> ABORTED
AUTH --> SELECT
AUTH --> DETECT
AUTH --> ABORTED
SELECT --> CONFIRM
SELECT --> ABORTED
CONFIRM --> EXECUTE
CONFIRM --> ABORTED
EXECUTE --> REPORT
EXECUTE --> ABORTED
REPORT --> ABORTED
ABORTED --> [*]
note right of EXECUTE
Transition to EXECUTE
requires backup_token
or MissingBackupTokenError
is raised
end note
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Hey - I've found 6 issues
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location path="src/cipherrescue/orchestration/__init__.py" line_range="158" />
<code_context>
+ logger.warning(
+ "Session %s aborted: %s", self.context.session_id, reason
+ )
self.context.transition(SessionState.ABORTED)
</code_context>
<issue_to_address>
**issue (bug_risk):** abort() may now raise InvalidTransitionError when context is already ABORTED.
With the stricter VALID_TRANSITIONS graph, calling abort() when the session is already ABORTED will now raise InvalidTransitionError (since ABORTED has no successors), which is a behavior change from an idempotent abort and could surface unexpected exceptions in shutdown/error paths. Consider either short‑circuiting in abort() (skip transition if state is already ABORTED) or adding an ABORTED → ABORTED self‑loop if you want to preserve idempotency.
</issue_to_address>
### Comment 2
<location path="src/cipherrescue/safety/credentials.py" line_range="74-77" />
<code_context>
+ self._length,
+ )
+
+ @property
+ def value(self) -> bytes:
+ """Return the current buffer contents as bytes."""
+ return bytes(self._buf)
+
+ def zero(self) -> None:
</code_context>
<issue_to_address>
**issue (bug_risk):** SecureBuffer.value and lifecycle semantics don’t match the documented behavior.
The docstring says that after close()/__exit__, `buf.value` should return `b''` and `is_zeroed` should be True, but `zero()` only overwrites with NUL bytes and keeps the original length, so `value` will be NULs, not empty. If callers use `b''` as a signal, this discrepancy can cause subtle bugs. Please either make `zero()` also clear the buffer length (e.g. `self._buf[:] = b''`), or have `value` return `b''` whenever `is_zeroed` is True so behavior matches the docs.
</issue_to_address>
### Comment 3
<location path="tests/unit/test_audit_log.py" line_range="67" />
<code_context>
assert seqs == list(range(len(seqs)))
+
+
+class TestAuditLogHMAC:
+ """P0-3: HMAC keying makes the chain non-regeneratable without session key K."""
+
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding tests for isolated MAC tampering and the default session_key=b"" case
The new HMAC tests cover the main flows well. Two additional cases would strengthen coverage:
1. **MAC-only tampering:** Change `entry.mac` on an otherwise valid chain and assert `verify_chain()` returns `False`.
2. **Default `session_key=b""`:** Add a test that uses the default key, logs an event, asserts the entry still has a `mac` field, and that `verify_chain()` is `True` to guard backward compatibility for the default key behavior.
Suggested implementation:
```python
class TestAuditLogHMAC:
"""P0-3: HMAC keying makes the chain non-regeneratable without session key K."""
def test_mac_only_tampering_invalidates_chain(self):
audit_log = log()
audit_log.log_event("event-1")
# Sanity check: untouched chain should verify
assert audit_log.verify_chain()
# Tamper with MAC of the first entry while keeping the rest valid
entry = audit_log.entries[0]
original_mac = entry.mac
if isinstance(original_mac, bytes):
tampered_mac = bytes(~b & 0xFF for b in original_mac)
else:
# Assume hex/string-like MAC
tampered_mac = (
original_mac[:-1] + ("0" if original_mac[-1] != "0" else "1")
)
entry.mac = tampered_mac
# MAC-only tampering should break the chain verification
assert not audit_log.verify_chain()
def test_default_session_key_uses_mac_and_verifies(self):
# log() uses the default session_key=b"" by construction
audit_log = log()
audit_log.log_event("event-1")
exported = json.loads(audit_log.export_json())
assert "mac" in exported[0]
# Default key must still produce a verifiable chain
assert audit_log.verify_chain()
```
If your `AuditLog` API differs from what is assumed here, you will need to adjust:
1. Replace `audit_log.log_event("event-1")` with whatever method you currently use in tests to append a single log entry.
2. Replace `audit_log.entries[0]` and `entry.mac` access with the actual way entries and their MACs are exposed in your implementation.
3. Replace `audit_log.verify_chain()` with the correct verification function if it has a different name or is a module-level helper.
4. If `export_json()` returns a dict with a different structure (e.g., `{"entries": [...]}`), update `exported[0]` accordingly (e.g., `exported["entries"][0]`).
</issue_to_address>
### Comment 4
<location path="src/cipherrescue/safety/write_blocker.py" line_range="49" />
<code_context>
+ hmac: str
+
+
+def _compute_token_hmac(
+ session_key: bytes,
+ session_id: str,
</code_context>
<issue_to_address>
**issue (complexity):** Consider encapsulating HMAC handling in the BackupToken API and splitting write_gate into small helper checks to keep the control flow simpler and easier to follow.
You can simplify the HMAC + registration flow by:
1. Encapsulating the MAC shape in `BackupToken` (or a tiny helper) so `WriteBlocker` and `BackupManager` don’t both need to know the exact fields/order.
2. Splitting `write_gate` into small, linear checks so it’s easier to scan and reason about.
### 1. Move HMAC logic behind a `BackupToken` API
Instead of `_compute_token_hmac(...)` plus recomposition in `write_gate`, put the construction and verification in `BackupToken` (or a `BackupTokenSigner`). This removes duplication and the need for callers to know which fields are in the MAC.
```python
@dataclass(frozen=True)
class BackupToken:
device_path: str
backup_sha256: str
timestamp: float
session_id: str
hmac: str
def _message(self) -> bytes:
# single source of truth for HMAC shape
return f"{self.session_id}|{self.device_path}|{self.backup_sha256}|{self.timestamp}".encode()
def sign(self, session_key: bytes) -> "BackupToken":
"""Return a new token with HMAC computed under session_key."""
mac = _hmac.new(session_key, self._message(), hashlib.sha256).hexdigest()
return dataclasses.replace(self, hmac=mac)
def verify(self, session_key: bytes) -> None:
"""Raise PermissionError if HMAC is invalid."""
expected = _hmac.new(session_key, self._message(), hashlib.sha256).hexdigest()
if not _hmac.compare_digest(expected, self.hmac):
raise PermissionError(
"BackupToken HMAC verification failed. Token may be forged or use a different session key."
)
```
`BackupManager` then only calls `token.sign(session_key)`; `WriteBlocker` only calls `token.verify(session_key)` and doesn’t need `_compute_token_hmac` at all.
### 2. Make `write_gate` a linear orchestration method
With HMAC hidden behind `BackupToken.verify`, `write_gate` can delegate to small helpers for each concern: path, registration, cryptographic verification.
```python
class WriteBlocker:
def __init__(self, session_key: bytes) -> None:
self._session_key = session_key
self._issued_tokens: dict[str, BackupToken] = {}
def _register_token(self, token: BackupToken) -> None:
self._issued_tokens[token.device_path] = token
def _check_device_path(self, device_path: str, token: BackupToken) -> None:
if token.device_path != device_path:
raise ValueError(
f"Token device {token.device_path!r} does not match "
f"target device {device_path!r}."
)
def _check_registered(self, device_path: str, token: BackupToken) -> None:
registered = self._issued_tokens.get(device_path)
if registered is None or registered != token:
raise PermissionError(
f"No registered backup token for {device_path!r}. "
"BackupManager.create_backup() must be called first."
)
def write_gate(self, device_path: str, token: BackupToken) -> None:
self._check_device_path(device_path, token)
self._check_registered(device_path, token)
token.verify(self._session_key)
logger.info(
"WriteBlocker: write permitted to %s (backup=%s)",
device_path,
token.backup_sha256[:16] + "...",
)
```
This keeps all existing behavior (path check, registration requirement, HMAC verification) but makes:
- HMAC shape centralized and reusable.
- `write_gate` small and linear, with clearly separated responsibilities.
</issue_to_address>
### Comment 5
<location path="src/cipherrescue/safety/backup_manager.py" line_range="22" />
<code_context>
+import time
+from pathlib import Path
+
+from .write_blocker import BackupToken, WriteBlocker, _compute_token_hmac
+
+logger = logging.getLogger(__name__)
</code_context>
<issue_to_address>
**issue (complexity):** Consider encapsulating token signing and registration behind public factory and registration methods to decouple BackupManager from WriteBlocker’s internal details and HMAC wiring.
You can reduce the coupling here by pushing the HMAC construction and registration into public, higher‑level APIs, so `BackupManager` stops depending on private details.
### 1. Encapsulate HMAC construction in `BackupToken`
Instead of calling `_compute_token_hmac` and manually wiring fields, add a factory on `BackupToken` that handles the field ordering and HMAC:
```python
# write_blocker.py
class BackupToken:
# existing fields ...
@classmethod
def create_signed(
cls,
*,
session_key: bytes,
session_id: str,
device_path: str,
backup_sha256: str,
timestamp: float | None = None,
) -> "BackupToken":
if timestamp is None:
timestamp = time.time()
mac = _compute_token_hmac(
session_key,
session_id,
device_path,
backup_sha256,
timestamp,
)
return cls(
device_path=device_path,
backup_sha256=backup_sha256,
timestamp=timestamp,
session_id=session_id,
hmac=mac,
)
```
Then `BackupManager.create_backup` becomes:
```python
# backup_manager.py
from .write_blocker import BackupToken, WriteBlocker
def create_backup(self, device_path: str, backup_sha256: str) -> BackupToken:
token = BackupToken.create_signed(
session_key=self._session_key,
session_id=self._session_id,
device_path=device_path,
backup_sha256=backup_sha256,
)
self._wb.register_token(token)
logger.info(
"BackupManager: token issued for %s (sha256=%s...)",
device_path,
backup_sha256[:16],
)
return token
```
This removes the duplication of the HMAC field ordering from `BackupManager`; any change to the token format is confined to `BackupToken.create_signed` and `_compute_token_hmac`.
### 2. Make token registration a public API on `WriteBlocker`
Expose a small public method instead of calling the private `_register_token`:
```python
# write_blocker.py
class WriteBlocker:
# existing methods ...
def register_token(self, token: BackupToken) -> None:
self._register_token(token)
def _register_token(self, token: BackupToken) -> None:
# existing implementation
...
```
And update `BackupManager` to use it (as shown above):
```python
self._wb.register_token(token)
```
These two changes keep all behavior intact while:
- Removing `BackupManager`’s knowledge of the HMAC concatenation scheme.
- Eliminating cross‑module access to a private method.
- Making the token lifecycle (`create_signed` → `register_token`) explicit and easier to maintain.
</issue_to_address>
### Comment 6
<location path="src/cipherrescue/safety/audit_log.py" line_range="140" />
<code_context>
)
def verify_chain(self) -> bool:
- """Return True iff the entire chain is unmodified."""
+ """
</code_context>
<issue_to_address>
**issue (complexity):** Consider moving hash and MAC computation into LogEntry so AuditLog.verify_chain only orchestrates chain verification rather than duplicating low-level logic.
You can reduce the complexity and duplication by pushing the hash/MAC logic into `LogEntry` and letting `AuditLog.verify_chain` delegate to it.
### 1. Centralize hash computation on `LogEntry`
```python
@dataclass
class LogEntry:
sequence: int
timestamp: float
event_type: str
payload: dict[str, Any]
prev_hash: str
entry_hash: str = field(init=False)
mac: str = field(init=False, default="")
def _raw_json(self) -> str:
return json.dumps(
{
"seq": self.sequence,
"ts": self.timestamp,
"type": self.event_type,
"payload": self.payload,
"prev": self.prev_hash,
},
sort_keys=True,
)
def recompute_hash(self) -> str:
return hashlib.sha256(self._raw_json().encode()).hexdigest()
def __post_init__(self) -> None:
self.entry_hash = self.recompute_hash()
```
### 2. Encapsulate MAC computation/verification in `LogEntry`
```python
def set_mac(self, session_key: bytes) -> None:
self.mac = _hmac.new(
session_key, self.entry_hash.encode(), hashlib.sha256
).hexdigest()
def verify_mac(self, session_key: bytes) -> bool:
expected = _hmac.new(
session_key, self.entry_hash.encode(), hashlib.sha256
).hexdigest()
return _hmac.compare_digest(expected, self.mac)
```
### 3. Simplify `AuditLog.verify_chain`
```python
def verify_chain(self) -> bool:
"""Return True iff the entire chain is unmodified."""
for i, entry in enumerate(self._entries):
prev = self._entries[i - 1].entry_hash if i > 0 else self.GENESIS_HASH
if entry.prev_hash != prev:
return False
if entry.recompute_hash() != entry.entry_hash:
return False
if not entry.verify_mac(self._session_key):
return False
return True
```
This keeps all existing behavior (same JSON structure, same hash/MAC algorithms) but removes duplicated logic and makes `verify_chain` focus purely on chain semantics.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| logger.warning( | ||
| "Session %s aborted: %s", self.context.session_id, reason | ||
| ) | ||
| self.context.transition(SessionState.ABORTED) |
There was a problem hiding this comment.
issue (bug_risk): abort() may now raise InvalidTransitionError when context is already ABORTED.
With the stricter VALID_TRANSITIONS graph, calling abort() when the session is already ABORTED will now raise InvalidTransitionError (since ABORTED has no successors), which is a behavior change from an idempotent abort and could surface unexpected exceptions in shutdown/error paths. Consider either short‑circuiting in abort() (skip transition if state is already ABORTED) or adding an ABORTED → ABORTED self‑loop if you want to preserve idempotency.
| @property | ||
| def value(self) -> bytes: | ||
| """Return the current buffer contents as bytes.""" | ||
| return bytes(self._buf) |
There was a problem hiding this comment.
issue (bug_risk): SecureBuffer.value and lifecycle semantics don’t match the documented behavior.
The docstring says that after close()/exit, buf.value should return b'' and is_zeroed should be True, but zero() only overwrites with NUL bytes and keeps the original length, so value will be NULs, not empty. If callers use b'' as a signal, this discrepancy can cause subtle bugs. Please either make zero() also clear the buffer length (e.g. self._buf[:] = b''), or have value return b'' whenever is_zeroed is True so behavior matches the docs.
| assert seqs == list(range(len(seqs))) | ||
|
|
||
|
|
||
| class TestAuditLogHMAC: |
There was a problem hiding this comment.
suggestion (testing): Consider adding tests for isolated MAC tampering and the default session_key=b"" case
The new HMAC tests cover the main flows well. Two additional cases would strengthen coverage:
- MAC-only tampering: Change
entry.macon an otherwise valid chain and assertverify_chain()returnsFalse. - Default
session_key=b"": Add a test that uses the default key, logs an event, asserts the entry still has amacfield, and thatverify_chain()isTrueto guard backward compatibility for the default key behavior.
Suggested implementation:
class TestAuditLogHMAC:
"""P0-3: HMAC keying makes the chain non-regeneratable without session key K."""
def test_mac_only_tampering_invalidates_chain(self):
audit_log = log()
audit_log.log_event("event-1")
# Sanity check: untouched chain should verify
assert audit_log.verify_chain()
# Tamper with MAC of the first entry while keeping the rest valid
entry = audit_log.entries[0]
original_mac = entry.mac
if isinstance(original_mac, bytes):
tampered_mac = bytes(~b & 0xFF for b in original_mac)
else:
# Assume hex/string-like MAC
tampered_mac = (
original_mac[:-1] + ("0" if original_mac[-1] != "0" else "1")
)
entry.mac = tampered_mac
# MAC-only tampering should break the chain verification
assert not audit_log.verify_chain()
def test_default_session_key_uses_mac_and_verifies(self):
# log() uses the default session_key=b"" by construction
audit_log = log()
audit_log.log_event("event-1")
exported = json.loads(audit_log.export_json())
assert "mac" in exported[0]
# Default key must still produce a verifiable chain
assert audit_log.verify_chain()If your AuditLog API differs from what is assumed here, you will need to adjust:
- Replace
audit_log.log_event("event-1")with whatever method you currently use in tests to append a single log entry. - Replace
audit_log.entries[0]andentry.macaccess with the actual way entries and their MACs are exposed in your implementation. - Replace
audit_log.verify_chain()with the correct verification function if it has a different name or is a module-level helper. - If
export_json()returns a dict with a different structure (e.g.,{"entries": [...]}), updateexported[0]accordingly (e.g.,exported["entries"][0]).
| hmac: str | ||
|
|
||
|
|
||
| def _compute_token_hmac( |
There was a problem hiding this comment.
issue (complexity): Consider encapsulating HMAC handling in the BackupToken API and splitting write_gate into small helper checks to keep the control flow simpler and easier to follow.
You can simplify the HMAC + registration flow by:
- Encapsulating the MAC shape in
BackupToken(or a tiny helper) soWriteBlockerandBackupManagerdon’t both need to know the exact fields/order. - Splitting
write_gateinto small, linear checks so it’s easier to scan and reason about.
1. Move HMAC logic behind a BackupToken API
Instead of _compute_token_hmac(...) plus recomposition in write_gate, put the construction and verification in BackupToken (or a BackupTokenSigner). This removes duplication and the need for callers to know which fields are in the MAC.
@dataclass(frozen=True)
class BackupToken:
device_path: str
backup_sha256: str
timestamp: float
session_id: str
hmac: str
def _message(self) -> bytes:
# single source of truth for HMAC shape
return f"{self.session_id}|{self.device_path}|{self.backup_sha256}|{self.timestamp}".encode()
def sign(self, session_key: bytes) -> "BackupToken":
"""Return a new token with HMAC computed under session_key."""
mac = _hmac.new(session_key, self._message(), hashlib.sha256).hexdigest()
return dataclasses.replace(self, hmac=mac)
def verify(self, session_key: bytes) -> None:
"""Raise PermissionError if HMAC is invalid."""
expected = _hmac.new(session_key, self._message(), hashlib.sha256).hexdigest()
if not _hmac.compare_digest(expected, self.hmac):
raise PermissionError(
"BackupToken HMAC verification failed. Token may be forged or use a different session key."
)BackupManager then only calls token.sign(session_key); WriteBlocker only calls token.verify(session_key) and doesn’t need _compute_token_hmac at all.
2. Make write_gate a linear orchestration method
With HMAC hidden behind BackupToken.verify, write_gate can delegate to small helpers for each concern: path, registration, cryptographic verification.
class WriteBlocker:
def __init__(self, session_key: bytes) -> None:
self._session_key = session_key
self._issued_tokens: dict[str, BackupToken] = {}
def _register_token(self, token: BackupToken) -> None:
self._issued_tokens[token.device_path] = token
def _check_device_path(self, device_path: str, token: BackupToken) -> None:
if token.device_path != device_path:
raise ValueError(
f"Token device {token.device_path!r} does not match "
f"target device {device_path!r}."
)
def _check_registered(self, device_path: str, token: BackupToken) -> None:
registered = self._issued_tokens.get(device_path)
if registered is None or registered != token:
raise PermissionError(
f"No registered backup token for {device_path!r}. "
"BackupManager.create_backup() must be called first."
)
def write_gate(self, device_path: str, token: BackupToken) -> None:
self._check_device_path(device_path, token)
self._check_registered(device_path, token)
token.verify(self._session_key)
logger.info(
"WriteBlocker: write permitted to %s (backup=%s)",
device_path,
token.backup_sha256[:16] + "...",
)This keeps all existing behavior (path check, registration requirement, HMAC verification) but makes:
- HMAC shape centralized and reusable.
write_gatesmall and linear, with clearly separated responsibilities.
| import time | ||
| from pathlib import Path | ||
|
|
||
| from .write_blocker import BackupToken, WriteBlocker, _compute_token_hmac |
There was a problem hiding this comment.
issue (complexity): Consider encapsulating token signing and registration behind public factory and registration methods to decouple BackupManager from WriteBlocker’s internal details and HMAC wiring.
You can reduce the coupling here by pushing the HMAC construction and registration into public, higher‑level APIs, so BackupManager stops depending on private details.
1. Encapsulate HMAC construction in BackupToken
Instead of calling _compute_token_hmac and manually wiring fields, add a factory on BackupToken that handles the field ordering and HMAC:
# write_blocker.py
class BackupToken:
# existing fields ...
@classmethod
def create_signed(
cls,
*,
session_key: bytes,
session_id: str,
device_path: str,
backup_sha256: str,
timestamp: float | None = None,
) -> "BackupToken":
if timestamp is None:
timestamp = time.time()
mac = _compute_token_hmac(
session_key,
session_id,
device_path,
backup_sha256,
timestamp,
)
return cls(
device_path=device_path,
backup_sha256=backup_sha256,
timestamp=timestamp,
session_id=session_id,
hmac=mac,
)Then BackupManager.create_backup becomes:
# backup_manager.py
from .write_blocker import BackupToken, WriteBlocker
def create_backup(self, device_path: str, backup_sha256: str) -> BackupToken:
token = BackupToken.create_signed(
session_key=self._session_key,
session_id=self._session_id,
device_path=device_path,
backup_sha256=backup_sha256,
)
self._wb.register_token(token)
logger.info(
"BackupManager: token issued for %s (sha256=%s...)",
device_path,
backup_sha256[:16],
)
return tokenThis removes the duplication of the HMAC field ordering from BackupManager; any change to the token format is confined to BackupToken.create_signed and _compute_token_hmac.
2. Make token registration a public API on WriteBlocker
Expose a small public method instead of calling the private _register_token:
# write_blocker.py
class WriteBlocker:
# existing methods ...
def register_token(self, token: BackupToken) -> None:
self._register_token(token)
def _register_token(self, token: BackupToken) -> None:
# existing implementation
...And update BackupManager to use it (as shown above):
self._wb.register_token(token)These two changes keep all behavior intact while:
- Removing
BackupManager’s knowledge of the HMAC concatenation scheme. - Eliminating cross‑module access to a private method.
- Making the token lifecycle (
create_signed→register_token) explicit and easier to maintain.
| @@ -114,12 +138,19 @@ def log_write(self, device: str, backup_sha256: str, action: str) -> None: | |||
| ) | |||
|
|
|||
| def verify_chain(self) -> bool: | |||
There was a problem hiding this comment.
issue (complexity): Consider moving hash and MAC computation into LogEntry so AuditLog.verify_chain only orchestrates chain verification rather than duplicating low-level logic.
You can reduce the complexity and duplication by pushing the hash/MAC logic into LogEntry and letting AuditLog.verify_chain delegate to it.
1. Centralize hash computation on LogEntry
@dataclass
class LogEntry:
sequence: int
timestamp: float
event_type: str
payload: dict[str, Any]
prev_hash: str
entry_hash: str = field(init=False)
mac: str = field(init=False, default="")
def _raw_json(self) -> str:
return json.dumps(
{
"seq": self.sequence,
"ts": self.timestamp,
"type": self.event_type,
"payload": self.payload,
"prev": self.prev_hash,
},
sort_keys=True,
)
def recompute_hash(self) -> str:
return hashlib.sha256(self._raw_json().encode()).hexdigest()
def __post_init__(self) -> None:
self.entry_hash = self.recompute_hash()2. Encapsulate MAC computation/verification in LogEntry
def set_mac(self, session_key: bytes) -> None:
self.mac = _hmac.new(
session_key, self.entry_hash.encode(), hashlib.sha256
).hexdigest()
def verify_mac(self, session_key: bytes) -> bool:
expected = _hmac.new(
session_key, self.entry_hash.encode(), hashlib.sha256
).hexdigest()
return _hmac.compare_digest(expected, self.mac)3. Simplify AuditLog.verify_chain
def verify_chain(self) -> bool:
"""Return True iff the entire chain is unmodified."""
for i, entry in enumerate(self._entries):
prev = self._entries[i - 1].entry_hash if i > 0 else self.GENESIS_HASH
if entry.prev_hash != prev:
return False
if entry.recompute_hash() != entry.entry_hash:
return False
if not entry.verify_mac(self._session_key):
return False
return TrueThis keeps all existing behavior (same JSON structure, same hash/MAC algorithms) but removes duplicated logic and makes verify_chain focus purely on chain semantics.
Phase 0 — closes all five PRD findings (F1–F5):
F1 (Critical) — BackupToken HMAC + BackupManager:
F2 (Critical) — Orchestration state machine:
F3 (High) — HMAC keying in AuditLog:
F4 (Medium-High) — LUKS2 plugin import:
F5 (Medium) — AuthToken docstring:
Phase 1 — safety layer completion + Stream 1:
Tests: 111 passing (was 65); safety-layer modules now have non-zero coverage (write_blocker 100%, audit_log 98%, orchestration 87%).
Summary by Sourcery
Implement safety layer phases 0 and 1, strengthening backup enforcement, audit logging, orchestration state transitions, credential handling, and adding a synthetic FDE profile generator.
New Features:
Bug Fixes:
Enhancements:
Tests: