diff --git a/tests/conftest.py b/tests/conftest.py index 56f769e..6542320 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -45,6 +45,9 @@ def _isolate_data(tmp_path, monkeypatch): monkeypatch.setattr(proxy_mod, "SERVICES_DIR", tmp_path / "data" / "services") monkeypatch.setattr(proxy_mod, "TRUST_LAYER_BASE_URL", "https://test.arkforge.fr") + import trust_layer.app as app_mod + monkeypatch.setattr(app_mod, "TRUST_LAYER_BASE_URL", "https://test.arkforge.fr") + @pytest.fixture def test_api_key(tmp_path): diff --git a/tests/test_triptyque.py b/tests/test_triptyque.py new file mode 100644 index 0000000..8b19d96 --- /dev/null +++ b/tests/test_triptyque.py @@ -0,0 +1,426 @@ +"""Tests for the Triptyque de la Preuve — 3 levels of ArkForge watermark.""" + +import pytest +from unittest.mock import patch, AsyncMock, MagicMock + +from trust_layer.proxy import _inject_digital_stamp, execute_proxy +from trust_layer.proofs import verify_proof_integrity, store_proof, load_proof +from trust_layer.templates import _esc, render_proof_page +from trust_layer.payments.base import ChargeResult + + +# --- Helpers --- + +def _make_proof_record(proof_id="prf_20260225_120000_abc123"): + """Create a minimal proof record for testing.""" + return { + "proof_id": proof_id, + "verification_url": f"https://test.arkforge.fr/v1/proof/{proof_id}", + "verification_algorithm": "https://test.arkforge.fr/docs/verification", + "hashes": { + "request": "sha256:aaa111", + "response": "sha256:bbb222", + "chain": "sha256:ccc333", + }, + "parties": { + "buyer_fingerprint": "buyer_hash_xyz", + "seller": "example.com", + "agent_identity": None, + "agent_version": None, + }, + "payment": { + "provider": "stripe", + "transaction_id": "pi_test_triptyque", + "amount": 0.50, + "currency": "eur", + "status": "succeeded", + "receipt_url": "https://pay.stripe.com/receipts/test", + }, + "timestamp": "2026-02-25T12:00:00Z", + "opentimestamps": {"status": "pending", "ots_url": f"https://test.arkforge.fr/v1/proof/{proof_id}/ots"}, + "identity_consistent": None, + } + + +def _mock_full_proxy(): + """Return context managers for a full proxy mock (payment + service OK).""" + mock_charge = ChargeResult( + provider="stripe", + transaction_id="pi_test_triptyque", + amount=0.50, + currency="eur", + status="succeeded", + receipt_url="https://pay.stripe.com/receipts/test", + ) + mock_provider = AsyncMock() + mock_provider.charge.return_value = mock_charge + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"result": "scan_complete", "score": 85} + + mock_client = AsyncMock() + mock_client.post.return_value = mock_response + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=None) + + return mock_provider, mock_client + + +def _mock_error_proxy(): + """Return context managers for a proxy mock where service returns 500.""" + mock_charge = ChargeResult( + provider="stripe", + transaction_id="pi_test_err", + amount=0.50, + currency="eur", + status="succeeded", + receipt_url=None, + ) + mock_provider = AsyncMock() + mock_provider.charge.return_value = mock_charge + + mock_response = MagicMock() + mock_response.status_code = 500 + mock_response.json.return_value = {"error": "internal server error"} + + mock_client = AsyncMock() + mock_client.post.return_value = mock_response + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=None) + + return mock_provider, mock_client + + +# ============================================================ +# Level 1 — Digital Stamp +# ============================================================ + +class TestLevel1DigitalStamp: + + @pytest.mark.asyncio + async def test_attestation_present_in_success(self, test_api_key): + """Attestation is injected into successful proxy responses.""" + mock_provider, mock_client = _mock_full_proxy() + + with patch("trust_layer.proxy.get_provider", return_value=mock_provider), \ + patch("httpx.AsyncClient", return_value=mock_client), \ + patch("trust_layer.proxy.submit_hash", return_value=None), \ + patch("trust_layer.proxy.send_proof_email"): + + result = await execute_proxy( + target="https://example.com/api/scan", + method="POST", + payload={"repo_url": "https://github.com/test/repo"}, + amount=0.50, + currency="eur", + api_key=test_api_key, + ) + + body = result["service_response"]["body"] + assert "_arkforge_attestation" in body + att = body["_arkforge_attestation"] + assert att["id"].startswith("prf_") + assert "seal" in att + assert att["status"] == "VERIFIED_TRANSACTION" + + @pytest.mark.asyncio + async def test_no_attestation_on_error_upstream(self, test_api_key): + """No attestation when upstream service returns 500.""" + mock_provider, mock_client = _mock_error_proxy() + + with patch("trust_layer.proxy.get_provider", return_value=mock_provider), \ + patch("httpx.AsyncClient", return_value=mock_client), \ + patch("trust_layer.proxy.submit_hash", return_value=None), \ + patch("trust_layer.proxy.send_proof_email"): + + result = await execute_proxy( + target="https://example.com/api/fail", + method="POST", + payload={}, + amount=0.50, + currency="eur", + api_key=test_api_key, + ) + + # Error path — no attestation injected (even if service_response exists) + sr = result.get("service_response", {}) + body = sr.get("body", {}) + assert "_arkforge_attestation" not in body + + @pytest.mark.asyncio + async def test_no_attestation_on_raw_text(self, test_api_key): + """No attestation when response is _raw_text (non-JSON).""" + mock_charge = ChargeResult( + provider="stripe", transaction_id="pi_test_raw", + amount=0.50, currency="eur", status="succeeded", receipt_url=None, + ) + mock_provider = AsyncMock() + mock_provider.charge.return_value = mock_charge + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.side_effect = Exception("not JSON") + mock_response.text = "Hello" + + mock_client = AsyncMock() + mock_client.post.return_value = mock_response + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=None) + + with patch("trust_layer.proxy.get_provider", return_value=mock_provider), \ + patch("httpx.AsyncClient", return_value=mock_client), \ + patch("trust_layer.proxy.submit_hash", return_value=None), \ + patch("trust_layer.proxy.send_proof_email"): + + result = await execute_proxy( + target="https://example.com/api/html", + method="POST", + payload={}, + amount=0.50, + currency="eur", + api_key=test_api_key, + ) + + body = result["service_response"]["body"] + assert "_raw_text" in body + assert "_arkforge_attestation" not in body + + @pytest.mark.asyncio + async def test_chain_hash_not_affected(self, test_api_key): + """Chain hash integrity is NOT affected by attestation injection.""" + mock_provider, mock_client = _mock_full_proxy() + + with patch("trust_layer.proxy.get_provider", return_value=mock_provider), \ + patch("httpx.AsyncClient", return_value=mock_client), \ + patch("trust_layer.proxy.submit_hash", return_value=None), \ + patch("trust_layer.proxy.send_proof_email"): + + result = await execute_proxy( + target="https://example.com/api/scan", + method="POST", + payload={"test": True}, + amount=0.50, + currency="eur", + api_key=test_api_key, + ) + + # Attestation is present + assert "_arkforge_attestation" in result["service_response"]["body"] + + # But proof integrity is still valid (hash was computed BEFORE injection) + proof_id = result["proof"]["proof_id"] + stored = load_proof(proof_id) + assert stored is not None + assert verify_proof_integrity(stored) + + def test_inject_digital_stamp_nominal(self): + """Unit test: _inject_digital_stamp injects correctly.""" + proof_record = _make_proof_record() + result = { + "proof": proof_record, + "service_response": { + "status_code": 200, + "body": {"data": "ok"}, + }, + } + _inject_digital_stamp(result, proof_record) + att = result["service_response"]["body"]["_arkforge_attestation"] + assert att["id"] == "prf_20260225_120000_abc123" + assert att["seal"] == "https://test.arkforge.fr/v1/proof/prf_20260225_120000_abc123" + assert att["status"] == "VERIFIED_TRANSACTION" + + def test_inject_digital_stamp_skip_no_service_response(self): + """Unit test: _inject_digital_stamp skips when no service_response.""" + proof_record = _make_proof_record() + result = {"proof": proof_record} + _inject_digital_stamp(result, proof_record) + assert "service_response" not in result + + def test_inject_digital_stamp_skip_on_error(self): + """Unit test: _inject_digital_stamp skips on error results.""" + proof_record = _make_proof_record() + result = { + "error": {"code": "service_error", "message": "fail", "status": 502}, + "service_response": {"status_code": 500, "body": {"error": "oops"}}, + } + _inject_digital_stamp(result, proof_record) + assert "_arkforge_attestation" not in result["service_response"]["body"] + + +# ============================================================ +# Level 2 — Ghost Stamp +# ============================================================ + +class TestLevel2GhostStamp: + + def test_four_headers_on_success(self, client, test_api_key): + """4 X-ArkForge-* headers present on successful proxy response.""" + mock_provider, mock_client = _mock_full_proxy() + + with patch("trust_layer.proxy.get_provider", return_value=mock_provider), \ + patch("httpx.AsyncClient", return_value=mock_client), \ + patch("trust_layer.proxy.submit_hash", return_value=None), \ + patch("trust_layer.proxy.send_proof_email"): + + resp = client.post("/v1/proxy", json={ + "target": "https://example.com/api", + "amount": 0.50, + "currency": "eur", + "payload": {"test": True}, + }, headers={"X-Api-Key": test_api_key}) + + assert resp.status_code == 200 + assert "x-arkforge-proof" in resp.headers + assert resp.headers["x-arkforge-verified"] == "true" + assert resp.headers["x-arkforge-proof-id"].startswith("prf_") + assert "/v/" in resp.headers["x-arkforge-trust-link"] + + def test_verified_false_on_service_error(self, client, test_api_key): + """X-ArkForge-Verified is false when upstream returns 500.""" + mock_provider, mock_client = _mock_error_proxy() + + with patch("trust_layer.proxy.get_provider", return_value=mock_provider), \ + patch("httpx.AsyncClient", return_value=mock_client), \ + patch("trust_layer.proxy.submit_hash", return_value=None), \ + patch("trust_layer.proxy.send_proof_email"): + + resp = client.post("/v1/proxy", json={ + "target": "https://example.com/api/fail", + "amount": 0.50, + "currency": "eur", + "payload": {}, + }, headers={"X-Api-Key": test_api_key}) + + assert resp.headers["x-arkforge-verified"] == "false" + + def test_backward_compat_proof_header(self, client, test_api_key): + """X-ArkForge-Proof header still present (backward compat).""" + mock_provider, mock_client = _mock_full_proxy() + + with patch("trust_layer.proxy.get_provider", return_value=mock_provider), \ + patch("httpx.AsyncClient", return_value=mock_client), \ + patch("trust_layer.proxy.submit_hash", return_value=None), \ + patch("trust_layer.proxy.send_proof_email"): + + resp = client.post("/v1/proxy", json={ + "target": "https://example.com/api", + "amount": 0.50, + "currency": "eur", + "payload": {}, + }, headers={"X-Api-Key": test_api_key}) + + assert "x-arkforge-proof" in resp.headers + assert "/v1/proof/prf_" in resp.headers["x-arkforge-proof"] + + +# ============================================================ +# Level 3 — Visual Stamp +# ============================================================ + +class TestLevel3VisualStamp: + + def _store_test_proof(self, proof_id="prf_test_visual"): + """Store a proof and return its ID.""" + proof_record = _make_proof_record(proof_id) + store_proof(proof_id, proof_record) + return proof_id + + def test_html_on_accept_text_html(self, client): + """Accept: text/html returns HTML response.""" + pid = self._store_test_proof() + resp = client.get(f"/v1/proof/{pid}", headers={"Accept": "text/html"}) + assert resp.status_code == 200 + assert "text/html" in resp.headers["content-type"] + assert "ArkForge Trust Layer" in resp.text + assert pid in resp.text + + def test_json_on_accept_application_json(self, client): + """Accept: application/json returns JSON response.""" + pid = self._store_test_proof("prf_test_json") + resp = client.get(f"/v1/proof/{pid}", headers={"Accept": "application/json"}) + assert resp.status_code == 200 + data = resp.json() + assert data["proof_id"] == pid + + def test_json_on_no_accept(self, client): + """No Accept header returns JSON (backward compat).""" + pid = self._store_test_proof("prf_test_noaccept") + resp = client.get(f"/v1/proof/{pid}") + assert resp.status_code == 200 + data = resp.json() + assert data["proof_id"] == pid + + def test_json_wins_over_html(self, client): + """Accept: application/json, text/html — JSON wins.""" + pid = self._store_test_proof("prf_test_both") + resp = client.get(f"/v1/proof/{pid}", headers={"Accept": "application/json, text/html"}) + assert resp.status_code == 200 + data = resp.json() + assert data["proof_id"] == pid + + def test_green_badge_verified(self, client): + """Badge is green (#22c55e) on verified proof with OTS verified.""" + pid = "prf_test_green" + proof = _make_proof_record(pid) + # Make OTS verified and set real hashes for integrity to pass + proof["opentimestamps"]["status"] = "verified" + # We need integrity to pass — store raw data so verify_proof_integrity works + from trust_layer.proofs import sha256_hex, canonical_json + req_data = {"target": "https://example.com", "method": "POST", "payload": {}, "amount": 0.5, "currency": "eur"} + resp_data = {"result": "ok"} + req_hash = sha256_hex(canonical_json(req_data)) + resp_hash = sha256_hex(canonical_json(resp_data)) + chain_input = req_hash + resp_hash + "pi_test_triptyque" + "2026-02-25T12:00:00Z" + "buyer_hash_xyz" + "example.com" + chain_hash = sha256_hex(chain_input) + proof["hashes"] = { + "request": f"sha256:{req_hash}", + "response": f"sha256:{resp_hash}", + "chain": f"sha256:{chain_hash}", + } + store_proof(pid, proof) + + resp = client.get(f"/v1/proof/{pid}", headers={"Accept": "text/html"}) + assert resp.status_code == 200 + assert "#22c55e" in resp.text + assert "VERIFIED" in resp.text + + def test_html_shows_payment_info(self, client): + """HTML page displays payment information.""" + pid = self._store_test_proof("prf_test_payment") + resp = client.get(f"/v1/proof/{pid}", headers={"Accept": "text/html"}) + assert resp.status_code == 200 + assert "0.5" in resp.text + assert "EUR" in resp.text + assert "stripe" in resp.text.lower() + + def test_short_url_redirects(self, client): + """GET /v/{proof_id} returns 302 redirect to full path.""" + pid = self._store_test_proof("prf_test_redirect") + resp = client.get(f"/v/{pid}", follow_redirects=False) + assert resp.status_code == 302 + assert resp.headers["location"] == f"/v1/proof/{pid}" + + def test_short_url_404(self, client): + """GET /v/prf_nonexistent returns 404.""" + resp = client.get("/v/prf_nonexistent") + assert resp.status_code == 404 + + +# ============================================================ +# Unit: _esc() anti-XSS +# ============================================================ + +class TestEscape: + + def test_esc_html_entities(self): + assert _esc('') == '<script>alert("xss")</script>' + + def test_esc_none(self): + assert _esc(None) == "" + + def test_esc_number(self): + assert _esc(42) == "42" + + def test_esc_ampersand(self): + assert _esc("a&b") == "a&b" diff --git a/trust_layer/app.py b/trust_layer/app.py index e297397..2c7e124 100644 --- a/trust_layer/app.py +++ b/trust_layer/app.py @@ -7,7 +7,7 @@ import stripe from fastapi import FastAPI, Request, HTTPException, Header -from fastapi.responses import JSONResponse, Response +from fastapi.responses import JSONResponse, Response, HTMLResponse, RedirectResponse from fastapi.middleware.cors import CORSMiddleware from . import __version__ @@ -27,6 +27,7 @@ from .proofs import load_proof, store_proof, get_public_proof, verify_proof_integrity from .timestamps import upgrade_pending from .proxy import execute_proxy, ProxyError +from .templates import render_proof_page from .rate_limit import get_usage from .email_notify import send_welcome_email @@ -124,11 +125,19 @@ async def proxy_endpoint( else: status_code = 200 - # Add proof header + # Level 2 — Ghost Stamp: inject proof headers proof = result.get("proof") or result.get("error", {}).get("proof_data") headers = {} - if proof and proof.get("verification_url"): - headers["X-ArkForge-Proof"] = proof["verification_url"] + if proof: + verification_url = proof.get("verification_url", "") + proof_id = proof.get("proof_id", "") + service_ok = "error" not in result + if verification_url: + headers["X-ArkForge-Proof"] = verification_url + headers["X-ArkForge-Verified"] = "true" if service_ok else "false" + if proof_id: + headers["X-ArkForge-Proof-ID"] = proof_id + headers["X-ArkForge-Trust-Link"] = f"{TRUST_LAYER_BASE_URL}/v/{proof_id}" return JSONResponse(status_code=status_code, content=result, headers=headers) @@ -136,8 +145,13 @@ async def proxy_endpoint( # --- GET /v1/proof/{proof_id} --- @app.get("/v1/proof/{proof_id}") -async def get_proof(proof_id: str): - """Public proof verification — no auth required. Lazy-upgrades OTS on access.""" +async def get_proof(proof_id: str, request: Request): + """Public proof verification — no auth required. Lazy-upgrades OTS on access. + + Content negotiation (Level 3 — Visual Stamp): + - Accept: text/html (without application/json) → HTML proof page + - Otherwise → JSON (backward compat) + """ proof = load_proof(proof_id) if not proof: return _error_response("not_found", f"Proof '{proof_id}' not found", 404) @@ -158,10 +172,33 @@ async def get_proof(proof_id: str): logger.debug("OTS upgrade attempt for %s: %s", proof_id, e) public = get_public_proof(proof) - public["integrity_verified"] = verify_proof_integrity(proof) + integrity_verified = verify_proof_integrity(proof) + public["integrity_verified"] = integrity_verified + + # Level 3 — Visual Stamp: content negotiation + accept = request.headers.get("accept", "") + if "text/html" in accept and "application/json" not in accept: + html_content = render_proof_page(public, integrity_verified) + return HTMLResponse(content=html_content) + return public +# --- GET /v/{proof_id} — Short URL redirect --- + +@app.get("/v/{proof_id}") +async def short_proof_url(proof_id: str): + """Short URL redirect to full proof endpoint. 302 with cache.""" + proof = load_proof(proof_id) + if not proof: + return _error_response("not_found", f"Proof '{proof_id}' not found", 404) + return RedirectResponse( + url=f"/v1/proof/{proof_id}", + status_code=302, + headers={"Cache-Control": "public, max-age=86400"}, + ) + + # --- GET /v1/proof/{proof_id}/ots --- @app.get("/v1/proof/{proof_id}/ots") diff --git a/trust_layer/proxy.py b/trust_layer/proxy.py index df15b09..14b2453 100644 --- a/trust_layer/proxy.py +++ b/trust_layer/proxy.py @@ -32,6 +32,36 @@ logger = logging.getLogger("trust_layer.proxy") + +def _inject_digital_stamp(result: dict, proof_record: dict) -> None: + """Level 1 — Digital Stamp: inject _arkforge_attestation into successful response body. + + Skips injection if: + - No service_response in result + - Body is not a dict (non-JSON response) + - Body contains _raw_text (non-parseable response) + - Result contains an error path + """ + sr = result.get("service_response") + if not sr or not isinstance(sr, dict): + return + body = sr.get("body") + if not isinstance(body, dict): + return + if "_raw_text" in body: + return + if "error" in result: + return + + proof_id = proof_record.get("proof_id", "") + verification_url = proof_record.get("verification_url", "") + body["_arkforge_attestation"] = { + "id": proof_id, + "seal": verification_url, + "status": "VERIFIED_TRANSACTION", + "msg": "Payment confirmed, execution anchored.", + } + # Private IP ranges to block _PRIVATE_NETWORKS = [ ipaddress.ip_network("127.0.0.0/8"), @@ -400,6 +430,9 @@ async def execute_proxy( }, } + # 14b. Level 1 — Digital Stamp (AFTER hashing, does NOT affect chain hash) + _inject_digital_stamp(result, proof_record) + # 15. Cache idempotency _cache_idempotency(idempotency_key, result) diff --git a/trust_layer/templates.py b/trust_layer/templates.py new file mode 100644 index 0000000..a548459 --- /dev/null +++ b/trust_layer/templates.py @@ -0,0 +1,130 @@ +"""HTML templates for proof visualization — zero dependencies, self-contained.""" + +import html + + +def _esc(value) -> str: + """Escape user data for safe HTML rendering (anti-XSS).""" + if value is None: + return "" + return html.escape(str(value)) + + +def render_proof_page(proof: dict, integrity_verified: bool) -> str: + """Render a self-contained HTML page for a proof record. + + Badge colors: + - Green #22c55e: integrity verified + - Orange #f59e0b: OTS pending (integrity OK but timestamp not confirmed) + - Red #ef4444: integrity check failed + """ + proof_id = _esc(proof.get("proof_id", "")) + timestamp = _esc(proof.get("timestamp", "")) + hashes = proof.get("hashes", {}) + parties = proof.get("parties", {}) + payment = proof.get("payment", {}) + ots = proof.get("opentimestamps", {}) + identity_consistent = proof.get("identity_consistent") + + ots_status = ots.get("status", "unknown") + + if not integrity_verified: + badge_color = "#ef4444" + badge_text = "INTEGRITY FAILED" + badge_desc = "The chain hash does not match. This proof may have been tampered with." + elif ots_status == "verified": + badge_color = "#22c55e" + badge_text = "VERIFIED" + badge_desc = "Integrity verified. Timestamp confirmed on Bitcoin blockchain." + else: + badge_color = "#f59e0b" + badge_text = "VERIFIED — TIMESTAMP PENDING" + badge_desc = "Integrity verified. OpenTimestamps confirmation in progress." + + # Build identity section (conditional) + identity_html = "" + agent_identity = parties.get("agent_identity") + if agent_identity: + consistent_badge = "" + if identity_consistent is True: + consistent_badge = 'Consistent' + elif identity_consistent is False: + consistent_badge = 'Mismatch detected' + identity_html = f""" +
+

Identity

+
Agent{_esc(agent_identity)}
+
Version{_esc(parties.get("agent_version", ""))}
+
Consistency{consistent_badge}
+
""" + + return f""" + + + + +ArkForge Proof — {proof_id} + + + +
+
+

ArkForge Trust Layer

+
Cryptographic Proof of Transaction
+
{badge_text}
+
{badge_desc}
+
+ +
+

Proof

+
Proof ID{proof_id}
+
Timestamp{timestamp}
+
OTS Status{_esc(ots_status)}
+
+ +
+

Hashes

+
Request{_esc(hashes.get("request", ""))}
+
Response{_esc(hashes.get("response", ""))}
+
Chain{_esc(hashes.get("chain", ""))}
+
+ +
+

Parties

+
Buyer{_esc(parties.get("buyer_fingerprint", ""))}
+
Seller{_esc(parties.get("seller", ""))}
+
+{identity_html} +
+

Payment

+
Amount{_esc(payment.get("amount", ""))} {_esc(payment.get("currency", "").upper())}
+
Status{_esc(payment.get("status", ""))}
+
Provider{_esc(payment.get("provider", ""))}
+
Transaction{_esc(payment.get("transaction_id", ""))}
+
+ + +
+ +"""