This repository was archived by the owner on Mar 9, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathverify_attestation.py
More file actions
237 lines (193 loc) · 8.53 KB
/
verify_attestation.py
File metadata and controls
237 lines (193 loc) · 8.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
import base64
import hashlib
import json
import logging
import subprocess
import sys
from collections import namedtuple
from typing import Optional

import cbor2
import cose
from cose import EC2, CoseAlgorithms, CoseEllipticCurves
from Crypto.Util.number import long_to_bytes
from OpenSSL import crypto
# File holding the expected PCR measurements (fill in the path to yours).
measurement_path = "measurements.txt"
# Root certificate given by AWS, stored as PEM.
root_cert_path = "aws_root_cert.pem"
# Attestation endpoint of the enclave; replace the placeholder with the TEE IP.
enclave_url = "https://<INSERT_TEE_IP_HERE>/enclave/attestation"
# Nonce chosen by the user; the attestation document must echo it back.
nonce = "0123456789abcdef0123456789abcdef01234567"

# Everything from DEBUG upwards goes to verification_logs.log.
logging.basicConfig(
    level=logging.DEBUG,
    filename="verification_logs.log",
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s - %(levelname)s - %(message)s",
)

# Container for the three expected PCR values, in index order.
PCR_tuple = namedtuple("PCRs", "PCR0 PCR1 PCR2")
# This library is based on richardfan1126: nitro-enclave-python-demo
# (https://github.com/richardfan1126/nitro-enclave-python-demo/tree/master)
def get_pcrs() -> PCR_tuple:
    """
    Load the expected PCR0, PCR1 and PCR2 values from the measurements file.

    The file at ``measurement_path`` is parsed as JSON and the three values
    are read from its "Measurements" object.

    Returns:
        A PCR_tuple holding the expected PCR0, PCR1 and PCR2 values.

    Raises:
        ValueError: if the file content is not valid JSON.
    """
    with open(measurement_path, 'r') as measurement_file:
        raw_text = measurement_file.read()

    # Only the JSON parse can raise JSONDecodeError, so only it is guarded.
    try:
        parsed = json.loads(raw_text)
    except json.JSONDecodeError as e:
        raise ValueError("Error reading measurement file for PCRs: %s" % e)

    measurements = parsed["Measurements"]
    expected = PCR_tuple(
        *(measurements[key] for key in ("PCR0", "PCR1", "PCR2"))
    )
    logging.debug("Given PCR measurements:\n"
                  "PCR0 %s\n"
                  "PCR1 %s\n"
                  "PCR2 %s\n",
                  *expected)
    return expected
def get_root_cert_pem() -> str:
    """Return the full text of the root certificate file at ``root_cert_path``."""
    with open(root_cert_path, 'r') as pem_file:
        pem_text = pem_file.read()
    return pem_text
def get_attestation(url: str, nonce: str) -> Optional[str]:
    """
    Fetch the attestation document from the enclave by running curl.

    Args:
        url: Attestation endpoint to query.
        nonce: Value sent as the ``nonce`` query parameter.

    Returns:
        The response body as text, or None if curl could not be started,
        exited with a non-zero status, or produced no output. The reason is
        printed before None is returned.
    """
    curl_command = [
        "curl",
        # NOTE(review): -k turns off TLS certificate verification. Presumably
        # deliberate (trust is meant to come from the attestation document
        # itself, not from the TLS certificate) -- confirm before relying on it.
        "-k",
        # Without --fail curl exits 0 on HTTP 4xx/5xx and the error page would
        # be returned as if it were an attestation document.
        "--fail",
        # No progress meter on stderr, but keep real error messages.
        "--silent",
        "--show-error",
        # Send the --data-urlencode value as a GET query string.
        "-G",
        url,
        "--data-urlencode",
        f"nonce={nonce}",
    ]

    # Run the curl command and capture its output as text.
    try:
        result = subprocess.run(
            curl_command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except OSError as e:
        # curl could not be started (e.g. it is not installed).
        print(f"Error: {e}")
        return None

    if result.returncode != 0:
        print(f"Error: {result.stderr}")
        return None

    # With stdout piped in text mode the output is never None, so test for
    # an empty body instead.
    if not result.stdout:
        print("Error: curl returned no output")
        return None

    return result.stdout
def _check_pcrs(expected_pcrs, document_pcrs) -> None:
    """Raise if an expected PCR is missing from, or differs from, the document's PCRs."""
    for index, expected_pcr in enumerate(expected_pcrs):
        # Attestation document doesn't have the specified PCR
        if index not in document_pcrs or document_pcrs[index] is None:
            raise Exception("PCR%s not found" % index)

        # The expected values are compared as hex strings.
        doc_pcr = document_pcrs[index].hex()
        logging.debug("PCR%s:\n"
                      "Attestation value: %s\n"
                      "Expected PCR: %s",
                      index,
                      doc_pcr,
                      expected_pcr)

        if expected_pcr != doc_pcr:
            # logging.warn is a deprecated alias of logging.warning.
            logging.warning("PCRs do not match:\n"
                            "Attestation PCR%s: %s\n"
                            "Expected PCR%s: %s",
                            index, doc_pcr,
                            index, expected_pcr)
            raise Exception("PCR%s does not match" % index)


def _check_signature(data, doc, cert) -> None:
    """Raise unless the Sign1 signature in data verifies under cert's public key."""
    logging.debug("Creating EC2 key from the signing certificates public key")
    public_numbers = cert.get_pubkey().to_cryptography_key().public_numbers()

    # Pad both coordinates to the curve's full byte length. long_to_bytes
    # without a block size drops leading zero bytes, while COSE EC2 keys are
    # specified with fixed-length coordinates.
    coordinate_size = (public_numbers.curve.key_size + 7) // 8
    x = long_to_bytes(public_numbers.x, coordinate_size)
    y = long_to_bytes(public_numbers.y, coordinate_size)

    # Create the EC2 key from public key parameters
    key = EC2(alg=CoseAlgorithms.ES384, x=x, y=y, crv=CoseEllipticCurves.P_384)

    # data is [protected header, unprotected header, payload, signature]
    phdr = cbor2.loads(data[0])

    logging.debug("Constructing Sign1 message from the attestation document")
    msg = cose.Sign1Message(phdr=phdr, uhdr=data[1], payload=doc)
    msg.signature = data[3]

    logging.debug("Comparing EC2 key against the Sign1")
    if not msg.verify_signature(key):
        raise Exception("Wrong signature")


def _check_certificate_chain(cert, cabundle, root_cert_pem) -> None:
    """Raise unless cert chains up to the root certificate given as PEM."""
    # Only the locally supplied root certificate is a trust anchor.
    store = crypto.X509Store()
    store.add_cert(crypto.load_certificate(crypto.FILETYPE_PEM, root_cert_pem))

    # The CA bundle comes from the document being verified, so its
    # certificates are handed over as untrusted chain-building material
    # rather than added to the trusted store (where a self-signed entry
    # would become a trust anchor). The first entry is the root certificate
    # and is skipped.
    intermediates = [
        crypto.load_certificate(crypto.FILETYPE_ASN1, cert_binary)
        for cert_binary in cabundle[1:]
    ]

    store_ctx = crypto.X509StoreContext(store, cert, chain=intermediates or None)
    # If the cert is invalid, this raises an exception
    store_ctx.verify_certificate()


def verify_attestation_doc(attestation_string: str) -> None:
    """
    Verify the attestation document.

    This uses the expected PCR values stored in measurements.txt,
    and the root_cert_pem provided by AWS Nitro Attestation PKI.
    The checks run in this order: PCR values, nonce (against the module-level
    ``nonce``), document signature, signing-certificate chain. On success
    "Verification successful" is printed.

    Args:
        attestation_string: Base64 text of the CBOR-encoded attestation
            document.

    Raises:
        Exception: if a PCR is missing or does not match, the nonce is absent
            or does not match, or the signature does not verify. An invalid
            certificate chain propagates the exception raised by
            ``X509StoreContext.verify_certificate()``.
    """
    # Load in PCR and root cert pem
    logging.debug("Loading expected PCR values and root cert pem")
    expected_pcrs = get_pcrs()
    root_cert_pem = get_root_cert_pem()

    # Decode CBOR attestation document
    decoded_data = base64.b64decode(attestation_string)
    data = cbor2.loads(decoded_data)

    # Load and decode document payload
    doc = data[2]
    doc_obj = cbor2.loads(doc)
    logging.debug("Loaded an attestation document")

    # Get PCRs from attestation document
    document_pcrs = doc_obj['pcrs']

    # Expose user data
    user_data = doc_obj['user_data']
    logging.debug("Enclave generated user data: %s", user_data)
    if user_data is not None:
        # Layout: 2-byte prefix + SHA-256 hash, twice (TLS key hash, then
        # app key hash).
        prefix_length = 2  # Taken from Nitriding documentation
        hash_length = hashlib.sha256().digest_size  # 32 bytes for a SHA256 hash

        tls_key_start = prefix_length
        tls_key_end = tls_key_start + hash_length
        tls_key_hash = user_data[tls_key_start:tls_key_end]

        app_key_start = tls_key_end + prefix_length
        app_key_end = app_key_start + hash_length
        app_key_hash = user_data[app_key_start:app_key_end]

        logging.info("Enclave returned TLS key: %s", base64.b64encode(tls_key_hash).decode('utf-8'))
        logging.info("Enclave returned app key: %s", base64.b64encode(app_key_hash).decode('utf-8'))

    # Expose public key
    # TODO (kyle): Write API to expose public key to inference node
    #              This will be needed for the sequencer to encrypt
    #              input data.
    public_key = doc_obj['public_key']
    logging.debug("Enclave generated public key: %s", public_key)

    ## Validating Attestation document ##

    # 1. Validate PCRs and Nonce
    logging.debug("Validating PCRs")
    _check_pcrs(expected_pcrs, document_pcrs)

    logging.debug("Validating nonce")
    document_nonce = doc_obj['nonce']
    if document_nonce is None:
        # Fail with a clear message instead of an AttributeError on .hex()
        raise Exception("Attestation document does not contain a nonce")
    attestation_nonce = document_nonce.hex()
    logging.info("Received nonce is %s", attestation_nonce)
    logging.info("Given nonce is %s", nonce)
    if attestation_nonce != nonce:
        raise Exception(f"Attestation nonce: {attestation_nonce}, did not match given nonce: {nonce}")

    # 2. Validate Signature
    logging.debug("Validating signature of attestation document")
    logging.debug("Getting signing certificate from attestation document:\n %s", doc_obj['certificate'])
    cert = crypto.load_certificate(crypto.FILETYPE_ASN1, doc_obj['certificate'])
    _check_signature(data, doc, cert)
    logging.debug("Signature of attestation document verified")

    # 3. Validate signing certificate PKI
    logging.debug("Verifying the certificate of the attestation document "
                  "is signed by the root certificate of the AWS Nitro Attestation PKI")
    if root_cert_pem is not None:
        _check_certificate_chain(cert, doc_obj['cabundle'], root_cert_pem)
        logging.debug("Certificate verified by AWS Nitro Attestation PKI")

    print("Verification successful")
if __name__ == "__main__":
    # Fetch the attestation document from the enclave, then verify it.
    print("Starting verification")
    attestation_str = get_attestation(enclave_url, nonce)
    if not attestation_str:
        # Nothing was fetched; stop here instead of passing None on to the
        # verifier, where it would fail inside base64 decoding.
        sys.exit("Error: no attestation document received")
    print("Attestation string:\n", attestation_str)
    verify_attestation_doc(attestation_str)