Skip to content

rekor: use sigstore_rekor_types for models #788

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Oct 6, 2023
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ dependencies = [
"requests",
"securesystemslib",
"sigstore-protobuf-specs ~= 0.2.0",
"sigstore-rekor-types >= 0.0.11",
"tuf >= 2.1,< 4.0",
]
requires-python = ">=3.8"
Expand Down
53 changes: 7 additions & 46 deletions sigstore/_internal/rekor/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,18 @@

from __future__ import annotations

import base64
import logging
from abc import ABC
from dataclasses import dataclass
from typing import Any, Dict, NewType, Optional
from urllib.parse import urljoin

import requests
from cryptography.x509 import Certificate
import sigstore_rekor_types

from sigstore._internal.ctfe import CTKeyring
from sigstore._internal.keyring import Keyring
from sigstore._internal.tuf import TrustUpdater
from sigstore._utils import B64Str, base64_encode_pem_cert
from sigstore.transparency import LogEntry

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -139,29 +137,15 @@ def get(

def post(
self,
b64_artifact_signature: B64Str,
sha256_artifact_hash: str,
b64_cert: B64Str,
proposed_entry: sigstore_rekor_types.Hashedrekord,
) -> LogEntry:
"""
Submit a new entry for inclusion in the Rekor log.
"""
# TODO(ww): Dedupe this payload construction with the retrieve endpoint below.
data = {
"kind": "hashedrekord",
"apiVersion": "0.0.1",
"spec": {
"signature": {
"content": b64_artifact_signature,
"publicKey": {"content": b64_cert},
},
"data": {
"hash": {"algorithm": "sha256", "value": sha256_artifact_hash}
},
},
}

resp: requests.Response = self.session.post(self.url, json=data)
resp: requests.Response = self.session.post(
self.url, json=proposed_entry.model_dump(mode="json", by_alias=True)
)
try:
resp.raise_for_status()
except requests.HTTPError as http_error:
Expand All @@ -186,9 +170,7 @@ class RekorEntriesRetrieve(_Endpoint):

def post(
self,
signature: bytes,
artifact_hash: str,
certificate: Certificate,
expected_entry: sigstore_rekor_types.Hashedrekord,
) -> Optional[LogEntry]:
"""
Retrieves an extant Rekor entry, identified by its artifact signature,
Expand All @@ -197,28 +179,7 @@ def post(
Returns None if Rekor has no entry corresponding to the signing
materials.
"""
data = {
"entries": [
{
"kind": "hashedrekord",
"apiVersion": "0.0.1",
"spec": {
"signature": {
"content": B64Str(base64.b64encode(signature).decode()),
"publicKey": {
"content": B64Str(base64_encode_pem_cert(certificate)),
},
},
"data": {
"hash": {
"algorithm": "sha256",
"value": artifact_hash,
}
},
},
}
]
}
data = {"entries": [expected_entry.model_dump(mode="json", by_alias=True)]}

resp: requests.Response = self.session.post(self.url, json=data)
try:
Expand Down
23 changes: 19 additions & 4 deletions sigstore/sign.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from typing import IO, Iterator, Optional

import cryptography.x509 as x509
import sigstore_rekor_types
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.utils import Prehashed
Expand Down Expand Up @@ -210,11 +211,25 @@ def sign(
)

# Create the transparency log entry
entry = self._signing_ctx._rekor.log.entries.post(
b64_artifact_signature=B64Str(b64_artifact_signature),
sha256_artifact_hash=input_digest.hex(),
b64_cert=B64Str(b64_cert.decode()),
proposed_entry = sigstore_rekor_types.Hashedrekord(
kind="hashedrekord",
api_version="0.0.1",
spec=sigstore_rekor_types.HashedrekordV001Schema(
signature=sigstore_rekor_types.Signature1(
content=b64_artifact_signature,
public_key=sigstore_rekor_types.PublicKey1(
content=b64_cert.decode()
),
),
data=sigstore_rekor_types.Data(
hash=sigstore_rekor_types.Hash(
algorithm=sigstore_rekor_types.Algorithm.SHA256,
value=input_digest.hex(),
)
),
),
)
entry = self._signing_ctx._rekor.log.entries.post(proposed_entry)

logger.debug(f"Transparency log entry created with index: {entry.log_index}")

Expand Down
66 changes: 29 additions & 37 deletions sigstore/verify/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from textwrap import dedent
from typing import IO

import sigstore_rekor_types
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.x509 import (
Certificate,
Expand Down Expand Up @@ -403,6 +404,28 @@ def rekor_entry(self, client: RekorClient) -> LogEntry:
f"has_inclusion_promise={has_inclusion_promise}"
)

# This "expected" entry is used both to retrieve the Rekor entry
# (if we don't have one) *and* to cross-check whatever response
# we receive. See below.
expected_entry = sigstore_rekor_types.Hashedrekord(
kind="hashedrekord",
api_version="0.0.1",
spec=sigstore_rekor_types.HashedrekordV001Schema(
signature=sigstore_rekor_types.Signature1(
content=base64.b64encode(self.signature).decode(),
public_key=sigstore_rekor_types.PublicKey1(
content=base64_encode_pem_cert(self.certificate)
),
),
data=sigstore_rekor_types.Data(
hash=sigstore_rekor_types.Hash(
algorithm=sigstore_rekor_types.Algorithm.SHA256,
value=self.input_digest.hex(),
),
),
),
)

entry: LogEntry | None = None
if offline:
logger.debug("offline mode; using offline log entry")
Expand All @@ -415,53 +438,22 @@ def rekor_entry(self, client: RekorClient) -> LogEntry:
# entry doesn't have one, then we perform a lookup.
if not has_inclusion_proof:
logger.debug("retrieving transparency log entry")
entry = client.log.entries.retrieve.post(
self.signature,
self.input_digest.hex(),
self.certificate,
)
entry = client.log.entries.retrieve.post(expected_entry)
else:
entry = self._rekor_entry

# No matter what we do above, we must end up with a Rekor entry.
if entry is None:
raise RekorEntryMissing

# To verify that an entry matches our other signing materials,
# we transform our signature, artifact hash, and certificate
# into a "hashedrekord" style payload and compare it against the
# entry's own body.
#
# This is done by:
#
# * Serializing the certificate as PEM, and then base64-encoding it;
# * base64-encoding the signature;
# * Packing the resulting cert, signature, and hash into the
# hashedrekord body format;
# * Comparing that body against the entry's own body, which
# is extracted from its base64(json(...)) encoding.

logger.debug("Rekor entry: ensuring contents match signing materials")

expected_body = {
"kind": "hashedrekord",
"apiVersion": "0.0.1",
"spec": {
"signature": {
"content": B64Str(base64.b64encode(self.signature).decode()),
"publicKey": {
"content": B64Str(base64_encode_pem_cert(self.certificate))
},
},
"data": {
"hash": {"algorithm": "sha256", "value": self.input_digest.hex()}
},
},
}

# To catch a potentially dishonest or compromised Rekor instance, we compare
# the expected entry (generated above) with the JSON structure returned
# by Rekor. If the two don't match, then we have an invalid entry
# and can't proceed.
actual_body = json.loads(base64.b64decode(entry.body))

if expected_body != actual_body:
if actual_body != expected_entry.model_dump(mode="json", by_alias=True):
raise InvalidRekorEntry

return entry
Expand Down
2 changes: 1 addition & 1 deletion test/unit/verify/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def test_rekor_entry_missing(self, signing_materials):
a_materials._rekor_entry = None
client = pretend.stub(
log=pretend.stub(
entries=pretend.stub(retrieve=pretend.stub(post=lambda a, b, c: None))
entries=pretend.stub(retrieve=pretend.stub(post=lambda a: None))
)
)

Expand Down