diff --git a/pyproject.toml b/pyproject.toml index 69c7445a..340651b2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,7 @@ dependencies = [ "requests", "securesystemslib", "sigstore-protobuf-specs ~= 0.2.0", + "sigstore-rekor-types >= 0.0.11", "tuf >= 2.1,< 4.0", ] requires-python = ">=3.8" diff --git a/sigstore/_internal/rekor/client.py b/sigstore/_internal/rekor/client.py index 478a494a..576a6cb7 100644 --- a/sigstore/_internal/rekor/client.py +++ b/sigstore/_internal/rekor/client.py @@ -18,7 +18,6 @@ from __future__ import annotations -import base64 import logging from abc import ABC from dataclasses import dataclass @@ -26,12 +25,11 @@ from urllib.parse import urljoin import requests -from cryptography.x509 import Certificate +import sigstore_rekor_types from sigstore._internal.ctfe import CTKeyring from sigstore._internal.keyring import Keyring from sigstore._internal.tuf import TrustUpdater -from sigstore._utils import B64Str, base64_encode_pem_cert from sigstore.transparency import LogEntry logger = logging.getLogger(__name__) @@ -139,29 +137,15 @@ def get( def post( self, - b64_artifact_signature: B64Str, - sha256_artifact_hash: str, - b64_cert: B64Str, + proposed_entry: sigstore_rekor_types.Hashedrekord, ) -> LogEntry: """ Submit a new entry for inclusion in the Rekor log. """ - # TODO(ww): Dedupe this payload construction with the retrieve endpoint below. - data = { - "kind": "hashedrekord", - "apiVersion": "0.0.1", - "spec": { - "signature": { - "content": b64_artifact_signature, - "publicKey": {"content": b64_cert}, - }, - "data": { - "hash": {"algorithm": "sha256", "value": sha256_artifact_hash} - }, - }, - } - resp: requests.Response = self.session.post(self.url, json=data) + resp: requests.Response = self.session.post( + self.url, json=proposed_entry.model_dump(mode="json", by_alias=True) + ) try: resp.raise_for_status() except requests.HTTPError as http_error: @@ -186,9 +170,7 @@ class RekorEntriesRetrieve(_Endpoint): def post( self, - signature: bytes, - artifact_hash: str, - certificate: Certificate, + expected_entry: sigstore_rekor_types.Hashedrekord, ) -> Optional[LogEntry]: """ Retrieves an extant Rekor entry, identified by its artifact signature, @@ -197,28 +179,7 @@ def post( Returns None if Rekor has no entry corresponding to the signing materials. """ - data = { - "entries": [ - { - "kind": "hashedrekord", - "apiVersion": "0.0.1", - "spec": { - "signature": { - "content": B64Str(base64.b64encode(signature).decode()), - "publicKey": { - "content": B64Str(base64_encode_pem_cert(certificate)), - }, - }, - "data": { - "hash": { - "algorithm": "sha256", - "value": artifact_hash, - } - }, - }, - } - ] - } + data = {"entries": [expected_entry.model_dump(mode="json", by_alias=True)]} resp: requests.Response = self.session.post(self.url, json=data) try: diff --git a/sigstore/sign.py b/sigstore/sign.py index 61935d18..bbde5cd2 100644 --- a/sigstore/sign.py +++ b/sigstore/sign.py @@ -46,6 +46,7 @@ from typing import IO, Iterator, Optional import cryptography.x509 as x509 +import sigstore_rekor_types from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.asymmetric.utils import Prehashed @@ -210,11 +211,25 @@ def sign( ) # Create the transparency log entry - entry = self._signing_ctx._rekor.log.entries.post( - b64_artifact_signature=B64Str(b64_artifact_signature), - sha256_artifact_hash=input_digest.hex(), - b64_cert=B64Str(b64_cert.decode()), + proposed_entry = sigstore_rekor_types.Hashedrekord( + kind="hashedrekord", + api_version="0.0.1", + spec=sigstore_rekor_types.HashedrekordV001Schema( + signature=sigstore_rekor_types.Signature1( + content=b64_artifact_signature, + public_key=sigstore_rekor_types.PublicKey1( + content=b64_cert.decode() + ), + ), + data=sigstore_rekor_types.Data( + hash=sigstore_rekor_types.Hash( + algorithm=sigstore_rekor_types.Algorithm.SHA256, + value=input_digest.hex(), + ) + ), + ), ) + entry = self._signing_ctx._rekor.log.entries.post(proposed_entry) logger.debug(f"Transparency log entry created with index: {entry.log_index}") diff --git a/sigstore/verify/models.py b/sigstore/verify/models.py index 9f45518c..8d36e4b1 100644 --- a/sigstore/verify/models.py +++ b/sigstore/verify/models.py @@ -25,6 +25,7 @@ from textwrap import dedent from typing import IO +import sigstore_rekor_types from cryptography.hazmat.primitives.serialization import Encoding from cryptography.x509 import ( Certificate, @@ -403,6 +404,28 @@ def rekor_entry(self, client: RekorClient) -> LogEntry: f"has_inclusion_promise={has_inclusion_promise}" ) + # This "expected" entry is used both to retrieve the Rekor entry + # (if we don't have one) *and* to cross-check whatever response + # we receive. See below. + expected_entry = sigstore_rekor_types.Hashedrekord( + kind="hashedrekord", + api_version="0.0.1", + spec=sigstore_rekor_types.HashedrekordV001Schema( + signature=sigstore_rekor_types.Signature1( + content=base64.b64encode(self.signature).decode(), + public_key=sigstore_rekor_types.PublicKey1( + content=base64_encode_pem_cert(self.certificate) + ), + ), + data=sigstore_rekor_types.Data( + hash=sigstore_rekor_types.Hash( + algorithm=sigstore_rekor_types.Algorithm.SHA256, + value=self.input_digest.hex(), + ), + ), + ), + ) + entry: LogEntry | None = None if offline: logger.debug("offline mode; using offline log entry") @@ -415,11 +438,7 @@ def rekor_entry(self, client: RekorClient) -> LogEntry: # entry doesn't have one, then we perform a lookup. if not has_inclusion_proof: logger.debug("retrieving transparency log entry") - entry = client.log.entries.retrieve.post( - self.signature, - self.input_digest.hex(), - self.certificate, - ) + entry = client.log.entries.retrieve.post(expected_entry) else: entry = self._rekor_entry @@ -427,41 +446,14 @@ def rekor_entry(self, client: RekorClient) -> LogEntry: if entry is None: raise RekorEntryMissing - # To verify that an entry matches our other signing materials, - # we transform our signature, artifact hash, and certificate - # into a "hashedrekord" style payload and compare it against the - # entry's own body. - # - # This is done by: - # - # * Serializing the certificate as PEM, and then base64-encoding it; - # * base64-encoding the signature; - # * Packing the resulting cert, signature, and hash into the - # hashedrekord body format; - # * Comparing that body against the entry's own body, which - # is extracted from its base64(json(...)) encoding. - logger.debug("Rekor entry: ensuring contents match signing materials") - expected_body = { - "kind": "hashedrekord", - "apiVersion": "0.0.1", - "spec": { - "signature": { - "content": B64Str(base64.b64encode(self.signature).decode()), - "publicKey": { - "content": B64Str(base64_encode_pem_cert(self.certificate)) - }, - }, - "data": { - "hash": {"algorithm": "sha256", "value": self.input_digest.hex()} - }, - }, - } - + # To catch a potentially dishonest or compromised Rekor instance, we compare + # the expected entry (generated above) with the JSON structure returned + # by Rekor. If the two don't match, then we have an invalid entry + # and can't proceed. actual_body = json.loads(base64.b64decode(entry.body)) - - if expected_body != actual_body: + if actual_body != expected_entry.model_dump(mode="json", by_alias=True): raise InvalidRekorEntry return entry diff --git a/test/unit/verify/test_models.py b/test/unit/verify/test_models.py index 97e7638d..a22b5bd8 100644 --- a/test/unit/verify/test_models.py +++ b/test/unit/verify/test_models.py @@ -57,7 +57,7 @@ def test_rekor_entry_missing(self, signing_materials): a_materials._rekor_entry = None client = pretend.stub( log=pretend.stub( - entries=pretend.stub(retrieve=pretend.stub(post=lambda a, b, c: None)) + entries=pretend.stub(retrieve=pretend.stub(post=lambda a: None)) ) )