7
7
import hashlib
8
8
import logging
9
9
import os
10
+ from pathlib import Path
10
11
import shutil
11
12
import struct
12
13
from datetime import datetime , timedelta
13
- from tempfile import TemporaryDirectory
14
+ from tempfile import TemporaryDirectory , mkstemp
15
+ from typing import Iterable , Literal , Optional , Union , cast
14
16
from uuid import UUID
15
17
16
18
from cryptography import x509
17
19
from cryptography .hazmat .primitives import hashes , serialization
18
20
from cryptography .hazmat .primitives .serialization import Encoding , pkcs7
21
+ from cryptography .hazmat .primitives .serialization .pkcs7 import PKCS7PrivateKeyTypes
19
22
20
23
import lib .commands as commands
21
24
22
25
26
+ class _EfiGlobalTempdir :
27
+ _instance = None
28
+
29
+ def _safe_cleanup (self ):
30
+ if self ._instance is not None :
31
+ try :
32
+ self ._instance .cleanup ()
33
+ except OSError :
34
+ pass
35
+
36
+ def get (self ):
37
+ if self ._instance is None :
38
+ self ._instance = TemporaryDirectory ()
39
+ atexit .register (self ._safe_cleanup )
40
+ return self ._instance
41
+
42
+ def getfile (self , suffix = None , prefix = None ):
43
+ fd , path = mkstemp (suffix = suffix , prefix = prefix , dir = self .get ().name )
44
+ os .close (fd )
45
+ return path
46
+
47
+
48
+ _tempdir = _EfiGlobalTempdir ()
49
+
50
+
23
51
class GUID (UUID ):
24
52
def as_bytes (self ):
25
53
return self .bytes_le
@@ -128,8 +156,14 @@ def get_secure_boot_guid(variable: str) -> GUID:
128
156
def cert_to_efi_sig_list (cert ):
129
157
"""Return an ESL from a PEM cert."""
130
158
with open (cert , 'rb' ) as f :
131
- pem = f .read ()
132
- cert = x509 .load_pem_x509_certificate (pem )
159
+ cert_raw = f .read ()
160
+ # Cert files can come in either PEM or DER form, and we can't assume
161
+ # that they come in a specific form. Since `cryptography` doesn't have
162
+ # a way to detect cert format, we have to detect it ourselves.
163
+ try :
164
+ cert = x509 .load_pem_x509_certificate (cert_raw )
165
+ except ValueError :
166
+ cert = x509 .load_der_x509_certificate (cert_raw )
133
167
der = cert .public_bytes (Encoding .DER )
134
168
135
169
signature_type = EFI_CERT_X509_GUID
@@ -164,7 +198,13 @@ def certs_to_sig_db(certs) -> bytes:
164
198
return db
165
199
166
200
167
- def sign_efi_sig_db (sig_db , var , key , cert , time = None , guid = None ):
201
+ def sign_efi_sig_db (
202
+ sig_db : bytes ,
203
+ var : str ,
204
+ key : str ,
205
+ cert : str ,
206
+ time : Optional [datetime ] = None ,
207
+ guid : Optional [GUID ] = None ):
168
208
"""Return a pkcs7 SignedData from a UEFI signature database."""
169
209
global p7_out
170
210
@@ -214,10 +254,10 @@ def sign_efi_sig_db(sig_db, var, key, cert, time=None, guid=None):
214
254
return create_auth2_header (p7 , timestamp ) + p7 + sig_db
215
255
216
256
217
- def sign (payload , key_file , cert_file ):
257
+ def sign (payload : bytes , key_file : str , cert_file : str ):
218
258
"""Returns a signed PKCS7 of payload signed by key and cert."""
219
259
with open (key_file , 'rb' ) as f :
220
- priv_key = serialization .load_pem_private_key (f .read (), password = None )
260
+ priv_key = cast ( PKCS7PrivateKeyTypes , serialization .load_pem_private_key (f .read (), password = None ) )
221
261
222
262
with open (cert_file , 'rb' ) as f :
223
263
cert = x509 .load_pem_x509_certificate (f .read ())
@@ -231,12 +271,12 @@ def sign(payload, key_file, cert_file):
231
271
return (
232
272
pkcs7 .PKCS7SignatureBuilder ()
233
273
.set_data (payload )
234
- .add_signer (cert , priv_key , hashes .SHA256 ()) # type: ignore
274
+ .add_signer (cert , priv_key , hashes .SHA256 ())
235
275
.sign (serialization .Encoding .DER , options )
236
276
)
237
277
238
278
239
- def create_auth2_header (sig_db , timestamp ):
279
+ def create_auth2_header (sig_db : bytes , timestamp : bytes ):
240
280
"""Return an EFI_AUTHENTICATE_VARIABLE_2 from a signature database."""
241
281
length = len (sig_db ) + WIN_CERTIFICATE_UEFI_GUID_offset
242
282
revision = 0x200
@@ -293,52 +333,67 @@ def pesign(key, cert, name, image):
293
333
294
334
295
335
class Certificate :
296
- def __init__ (self , common_name = 'XCP-ng Test Common Name' , init_keys = True ):
297
- self .common_name = common_name
298
- self .name = common_name .replace (' ' , '_' ).lower ()
299
- self .tempdir = TemporaryDirectory (prefix = 'cert_' + self .name )
300
- atexit .register (self .tempdir .cleanup )
301
- self .key = os .path .join (self .tempdir .name , '%s.key' % self .name )
302
- self .pub = os .path .join (self .tempdir .name , 'tmp.crt' )
303
-
304
- if init_keys :
305
- commands .local_cmd ([
306
- 'openssl' , 'req' , '-new' , '-x509' , '-newkey' , 'rsa:2048' ,
307
- '-subj' , '/CN=%s/' % self .common_name , '-nodes' , '-keyout' ,
308
- self .key , '-sha256' , '-days' , '3650' , '-out' , self .pub
309
- ])
336
+ def __init__ (self , pub : str , key : Optional [str ]):
337
+ self .pub = pub
338
+ self .key = key
339
+
340
+ @classmethod
341
+ def self_signed (cls , common_name = 'XCP-ng Test Common Name' ):
342
+ pub = _tempdir .getfile (suffix = '.pem' )
343
+ key = _tempdir .getfile (suffix = '.pem' )
344
+
345
+ commands .local_cmd ([
346
+ 'openssl' , 'req' , '-new' , '-x509' , '-newkey' , 'rsa:2048' ,
347
+ '-subj' , '/CN=%s/' % common_name , '-nodes' , '-keyout' ,
348
+ key , '-sha256' , '-days' , '3650' , '-out' , pub
349
+ ])
350
+
351
+ return cls (pub , key )
310
352
311
- def sign_data (self , var , data , guid ):
353
+ def sign_efi_sig_db (self , var : str , data : bytes , guid : Optional [GUID ]):
354
+ assert self .key is not None
312
355
return sign_efi_sig_db (
313
356
data , var , self .key , self .pub , time = timestamp (), guid = guid
314
357
)
315
358
316
- def _get_cert_path (self ):
317
- return os .path .join (
318
- self .tempdir .name , '_' .join (self .common_name .split ()) + '.crt'
319
- )
320
-
321
359
def copy (self ):
322
- obj = Certificate (common_name = self .common_name , init_keys = False )
323
- shutil .copyfile (self .key , obj .key )
324
- shutil .copyfile (self .pub , obj .pub )
325
- return obj
360
+ newpub = _tempdir .getfile (suffix = '.pem' )
361
+ shutil .copyfile (self .pub , newpub )
362
+
363
+ newkey = None
364
+ if self .key is not None :
365
+ newkey = _tempdir .getfile (suffix = '.pem' )
366
+ shutil .copyfile (self .key , newkey )
367
+
368
+ return Certificate (newpub , newkey )
326
369
327
370
328
371
class EFIAuth :
329
- def __init__ (self , name , is_null = False ):
330
- if name not in SECURE_BOOT_VARIABLES :
331
- raise RuntimeError (f"{ name } is not a secure boot variable" )
372
+ _auth_data : Optional [bytes ]
373
+ name : Literal ["PK" , "KEK" , "db" , "dbx" ]
374
+
375
+ def __init__ (
376
+ self ,
377
+ name : Literal ["PK" , "KEK" , "db" , "dbx" ],
378
+ owner_cert : Optional [Certificate ] = None ,
379
+ other_certs : Optional [Iterable [Union [Certificate , str ]]] = None ):
380
+ assert name in SECURE_BOOT_VARIABLES
381
+ # No point having an owner cert without a matching private key
382
+ assert owner_cert is None or owner_cert .key is not None
332
383
self .name = name
333
- self .is_null = is_null
334
384
self .guid = get_secure_boot_guid (self .name )
335
- self .key = ''
336
- self .cert = Certificate ()
337
- self .tempdir = TemporaryDirectory (prefix = name + '_' )
338
- atexit .register (self .tempdir .cleanup )
339
- self .efi_signature_list = self ._get_efi_signature_list ()
340
- self .auth_data = None
341
- self .auth = os .path .join (self .tempdir .name , '%s.auth' % self .name )
385
+ self ._owner_cert = owner_cert
386
+ self ._other_certs = list (other_certs or [])
387
+ self ._efi_signature_list = self ._get_efi_signature_list ()
388
+ self ._auth_data = None
389
+ self ._auth = _tempdir .getfile (suffix = '.auth' )
390
+
391
+ @classmethod
392
+ def self_signed (
393
+ cls ,
394
+ name : Literal ["PK" , "KEK" , "db" , "dbx" ],
395
+ other_certs : Optional [Iterable [Union [Certificate , str ]]] = None ):
396
+ return cls (name , owner_cert = Certificate .self_signed (name + " Owner" ), other_certs = other_certs )
342
397
343
398
def is_signed (self ):
344
399
return self ._auth_data is not None
@@ -351,19 +406,22 @@ def auth(self):
351
406
assert self .is_signed ()
352
407
return self ._auth
353
408
354
- def sign_auth (self , other : 'EFIAuth' ):
409
+ def sign_auth (self , to_be_signed : 'EFIAuth' ):
355
410
"""
356
411
Sign another EFIAuth object.
357
412
358
413
The other EFIAuth's member `auth` will be set to
359
414
the path of the .auth file.
360
415
"""
361
- other .auth_data = self .cert .sign_data (
362
- other .name , other .efi_signature_list , other .guid
416
+ assert self ._owner_cert is not None
417
+
418
+ auth_data = self ._owner_cert .sign_efi_sig_db (
419
+ to_be_signed .name , to_be_signed ._efi_signature_list , to_be_signed .guid
363
420
)
421
+ to_be_signed ._auth_data = auth_data
364
422
365
- with open (other . auth , 'wb' ) as f :
366
- f .write (other . auth_data )
423
+ with open (to_be_signed . _auth , 'wb' ) as f :
424
+ f .write (auth_data )
367
425
368
426
def sign_image (self , image : str ) -> str :
369
427
"""
@@ -376,19 +434,19 @@ def sign_image(self, image: str) -> str:
376
434
377
435
Returns path to signed image.
378
436
"""
437
+ assert self ._owner_cert is not None
379
438
if shutil .which ('sbsign' ):
380
439
signed = get_signed_name (image )
381
440
commands .local_cmd ([
382
- 'sbsign' , '--key' , self .cert .key , '--cert' , self .cert .pub ,
441
+ 'sbsign' , '--key' , self ._owner_cert .key , '--cert' , self ._owner_cert .pub ,
383
442
image , '--output' , signed
384
443
])
385
444
else :
386
- signed = pesign (self .cert .key , self .cert .pub , self .name , image )
445
+ signed = pesign (self ._owner_cert .key , self ._owner_cert .pub , self .name , image )
387
446
388
447
return signed
389
448
390
- @classmethod
391
- def copy (cls , other , name = None ):
449
+ def copy (self , name : Optional [Literal ["PK" , "KEK" , "db" , "dbx" ]] = None ):
392
450
"""
393
451
Make a copy of an existing EFIAuth object.
394
452
@@ -408,24 +466,35 @@ def copy(cls, other, name=None):
408
466
409
467
This is ONLY useful for creating a new handle.
410
468
"""
411
- if name is None :
412
- name = other .name
469
+ assert self ._owner_cert is not None
470
+
471
+ newname = name or self .name
413
472
414
- obj = cls (name = name , is_null = other .is_null )
415
- obj .cert = other .cert .copy ()
416
- obj .efi_signature_list = other .efi_signature_list
473
+ copied = EFIAuth (
474
+ name = newname ,
475
+ owner_cert = self ._owner_cert .copy (),
476
+ other_certs = self ._other_certs .copy ())
477
+ copied ._efi_signature_list = self ._efi_signature_list
417
478
418
- if other .is_signed ():
419
- obj . auth_data = copy .copy (other . auth_data )
420
- shutil .copyfile (other . auth , obj . auth )
479
+ if self .is_signed ():
480
+ copied . _auth_data = copy .copy (self . _auth_data )
481
+ shutil .copyfile (self . _auth , copied . _auth )
421
482
422
- return obj
483
+ return copied
423
484
424
485
def _get_efi_signature_list (self ) -> bytes :
425
- if self .is_null :
426
- return b''
486
+ certs = []
487
+ if self ._owner_cert is not None :
488
+ certs .append (self ._owner_cert .pub )
489
+ for other_cert in self ._other_certs :
490
+ if isinstance (other_cert , str ):
491
+ certs .append (other_cert )
492
+ elif isinstance (other_cert , Certificate ):
493
+ certs .append (other_cert .pub )
494
+ else :
495
+ raise TypeError ('other_cert is not Certificate or str' )
427
496
428
- return certs_to_sig_db (self . cert . pub )
497
+ return certs_to_sig_db (certs )
429
498
430
499
431
500
def esl_from_auth_file (auth : str ) -> bytes :
@@ -442,16 +511,16 @@ def esl_from_auth_file(auth: str) -> bytes:
442
511
return esl_from_auth_bytes (data )
443
512
444
513
445
- def esl_from_auth_bytes (auth : bytes ) -> bytes :
514
+ def esl_from_auth_bytes (auth_data : bytes ) -> bytes :
446
515
"""
447
516
Return the ESL contained inside the AUTH2 structure.
448
517
449
518
Warning: This will break if used on any ESL containing certs of non-X509 GUID type.
450
519
All of the certs used in Secure Boot are X509 GUID type.
451
520
"""
452
- return auth [ auth .index (EFI_CERT_X509_GUID ):]
521
+ return auth_data [ auth_data .index (EFI_CERT_X509_GUID ):]
453
522
454
- def get_md5sum_from_auth (auth ):
523
+ def get_md5sum_from_auth (auth : str ):
455
524
return hashlib .md5 (esl_from_auth_file (auth )).hexdigest ()
456
525
457
526
if __name__ == '__main__' :
0 commit comments