Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 220 additions & 1 deletion api/ledger_anchor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
"""Ledger anchoring — content hash computation + CLI-based broadcast to Regen Ledger (mainnet `regen-1`).

Computes BLAKE2b-256 hash of canonical claim serialization, derives IRI via
regen CLI, and broadcasts MsgAnchor to the Regen Ledger mainnet.
regen CLI, and broadcasts MsgAnchor / MsgAttest to the Regen Ledger.

Graph IRI generation (Phase 3): JSON-LD → URDNA2015 canonicalization → BLAKE2b-256
→ base58check → regen:*.rdf IRI. Mirrors regen-server's generateIRIFromGraph.
"""

import base64
Expand Down Expand Up @@ -143,6 +146,121 @@ def compute_attestation_hash(row) -> str:
return h.hexdigest()


# ---------------------------------------------------------------------------
# Graph IRI generation (Phase 3)
# Mirrors regen-server/iri-gen/iri-gen.ts: generateIRIFromGraph
# ---------------------------------------------------------------------------

# IRI prefix constants from regen-server iri-gen.constants.ts
_IRI_PREFIX_GRAPH = 1
_GRAPH_CANON_URDNA2015 = 1
_GRAPH_MERKLE_TREE_UNSPECIFIED = 0
_DIGEST_ALGORITHM_BLAKE2B_256 = 1
_IRI_VERSION_0 = 0

# Inline JSON-LD context for attestation documents (avoids remote URL fetch)
ATTESTATION_JSONLD_CONTEXT = {
"rfs": "https://framework.regen.network/schema/",
"attestation_rid": "rfs:attestation_rid",
"claim_rid": "rfs:claim_rid",
"reviewer_uri": "rfs:reviewer_uri",
"verdict": "rfs:verdict",
"rationale": "rfs:rationale",
"evidence_uris": {"@id": "rfs:evidence_uris", "@container": "@set"},
}


def _base58check_encode(payload: bytes, version: int) -> str:
"""Base58Check encoding (btcsuite-compatible).

Format: base58(version_byte || payload || checksum)
where checksum = SHA256(SHA256(version_byte || payload))[:4]
"""
versioned = bytes([version]) + payload
checksum = hashlib.sha256(hashlib.sha256(versioned).digest()).digest()[:4]
return _base58_encode(versioned + checksum)


def _base58_encode(data: bytes) -> str:
"""Pure-Python base58 encoding (Bitcoin alphabet)."""
alphabet = b"123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"
n = int.from_bytes(data, "big")
result = bytearray()
while n > 0:
n, remainder = divmod(n, 58)
result.append(alphabet[remainder])
# Preserve leading zero bytes
for byte in data:
if byte == 0:
result.append(alphabet[0])
else:
break
return bytes(reversed(result)).decode("ascii")


def _content_hash_graph_to_iri(blake2b_hash: bytes) -> str:
"""Convert a BLAKE2b-256 hash of graph content to a regen: IRI.

Pattern: regen:{base58check(concat(
byte(IriPrefixGraph=1),
byte(URDNA2015=1),
byte(MerkleTreeUnspecified=0),
byte(BLAKE2b256=1),
hash
))}.rdf

Mirrors regen-server contentHashGraphToIRI.
"""
prefix = bytes([
_IRI_PREFIX_GRAPH,
_GRAPH_CANON_URDNA2015,
_GRAPH_MERKLE_TREE_UNSPECIFIED,
_DIGEST_ALGORITHM_BLAKE2B_256,
])
encoded = _base58check_encode(prefix + blake2b_hash, _IRI_VERSION_0)
return f"regen:{encoded}.rdf"


def generate_graph_iri(jsonld_doc: dict) -> str:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems to work as expected

"""Generate a graph content IRI from a JSON-LD document.

1. Canonicalize via URDNA2015 (to n-quads)
2. Hash with BLAKE2b-256 (32 bytes)
3. Encode as regen: graph IRI

Mirrors regen-server generateIRIFromGraph.
Raises ValueError if the document produces empty canonicalization.
"""
from pyld import jsonld

canonized = jsonld.normalize(
jsonld_doc,
{"algorithm": "URDNA2015", "format": "application/n-quads"},
)
if not canonized or not canonized.strip():
raise ValueError("Invalid JSON-LD document: empty canonicalization")

blake2b_hash = hashlib.blake2b(canonized.encode("utf-8"), digest_size=32).digest()
return _content_hash_graph_to_iri(blake2b_hash)


def build_attestation_jsonld(row) -> dict:
"""Build a JSON-LD document for an attestation record.

Uses inline @context to avoid remote URL fetches during canonicalization.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's totally fine
the below context is not yet available online afaik so inline context is the only viable solution for now anyway

"""
return {
"@context": ATTESTATION_JSONLD_CONTEXT,
"@type": "rfs:Attestation",
"attestation_rid": row["attestation_rid"],
"claim_rid": row["claim_rid"],
"reviewer_uri": row["reviewer_uri"],
"verdict": row["verdict"],
"rationale": row.get("rationale") or "",
"evidence_uris": sorted(row.get("evidence_uris") or []),
}


# Cached signing address (deterministic for a given key name)
_signing_address: str | None = None

Expand Down Expand Up @@ -331,6 +449,107 @@ async def broadcast_anchor(claim_rid: str, content_hash: str) -> dict:
}


async def broadcast_attest(attestation_rid: str, graph_iri: str,
signer: str | None = None) -> dict:
"""Broadcast MsgAttest to Regen Ledger for a graph-native attestation.

MsgAttest attests to the veracity of anchored graph data. If the data
is not yet anchored, MsgAttest auto-anchors it (one tx instead of two).

Args:
attestation_rid: The attestation RID (for logging/tracking)
graph_iri: Graph IRI (regen:*.rdf) from generate_graph_iri()
signer: Regen address of the attestor. Defaults to service account.

Returns dict matching broadcast_anchor() pattern:
ready_to_anchor, ledger_iri, tx_hash, ledger_timestamp, reason
"""
regen_bin = _check_regen_cli()
signer = signer or REGEN_KEY_NAME

logger.info(f"ledger_anchor.broadcast_attest att={attestation_rid} iri={graph_iri} signer={signer}")

tx_result = subprocess.run(
[regen_bin, "tx", "data", "attest", graph_iri,
"--from", signer,
"--chain-id", REGEN_CHAIN_ID,
"--node", REGEN_RPC_URL,
"--keyring-backend", "test",
"--fees", "5000uregen",
"--output", "json",
"--yes"],
capture_output=True, text=True, timeout=30,
)

if tx_result.returncode != 0:
stderr = tx_result.stderr.strip()
if "key not found" in stderr.lower():
return {
"attestation_rid": attestation_rid,
"ready_to_anchor": False,
"reason": f"Key '{signer}' not found in keyring.",
"ledger_iri": graph_iri,
"ledger_timestamp": None,
}
if "insufficient funds" in stderr.lower() or "insufficient fee" in stderr.lower():
return {
"attestation_rid": attestation_rid,
"ready_to_anchor": False,
"reason": f"Insufficient funds for '{signer}'.",
"ledger_iri": graph_iri,
"ledger_timestamp": None,
}
raise RuntimeError(f"Attest broadcast failed: {stderr or tx_result.stdout}")

tx_data = json.loads(tx_result.stdout)
tx_hash = tx_data.get("txhash")
if not tx_hash:
raise RuntimeError(f"No txhash in broadcast response: {tx_result.stdout}")

logger.info(f"ledger_anchor.attest_sent att={attestation_rid} txhash={tx_hash}")

# Poll for tx confirmation (up to 30s)
ledger_timestamp = None
for attempt in range(6):
time.sleep(5)
query_result = subprocess.run(
[regen_bin, "query", "tx", tx_hash,
"--node", REGEN_RPC_URL,
"--output", "json"],
capture_output=True, text=True, timeout=15,
)
if query_result.returncode == 0:
query_data = json.loads(query_result.stdout)
code = query_data.get("code", -1)
if code == 0:
ledger_timestamp = query_data.get("timestamp") or query_data.get("height")
logger.info(f"ledger_anchor.attest_confirmed att={attestation_rid} txhash={tx_hash}")
break
else:
raise RuntimeError(
f"Attest tx failed on-chain: code={code} log={query_data.get('raw_log', '')}"
)

if ledger_timestamp is None:
logger.warning(f"ledger_anchor.attest_timeout att={attestation_rid} txhash={tx_hash}")
return {
"attestation_rid": attestation_rid,
"ready_to_anchor": False,
"reason": f"Attest tx broadcast but confirmation timed out. tx_hash={tx_hash}.",
"ledger_iri": graph_iri,
"ledger_timestamp": None,
"tx_hash": tx_hash,
}

return {
"attestation_rid": attestation_rid,
"ready_to_anchor": True,
"ledger_iri": graph_iri,
"ledger_timestamp": str(ledger_timestamp) if ledger_timestamp else None,
"tx_hash": tx_hash,
}


def verify_anchor_onchain(ledger_iri: str) -> bool:
"""Check if an anchor exists on-chain via the Regen REST API.

Expand Down
Loading