Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/release-notes.d/add-imgtool-pkcs11-ecdsa.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Added support for PKCS#11 URIs and ECDSA-P384 keys.
5 changes: 3 additions & 2 deletions scripts/imgtool/image.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Copyright 2018 Nordic Semiconductor ASA
# Copyright 2017-2020 Linaro Limited
# Copyright 2019-2024 Arm Limited
# Copyright 2019-2025 Arm Limited
#
# SPDX-License-Identifier: Apache-2.0
#
Expand Down Expand Up @@ -184,6 +184,7 @@ def tlv_sha_to_sha(tlv):
ALLOWED_KEY_SHA = {
keys.ECDSA384P1 : ['384'],
keys.ECDSA384P1Public : ['384'],
keys.PKCS11 : ['384'],
keys.ECDSA256P1 : ['256'],
keys.ECDSA256P1Public : ['256'],
keys.RSA : ['256'],
Expand Down Expand Up @@ -220,7 +221,7 @@ def key_and_user_sha_to_alg_and_tlv(key, user_sha, is_pure = False):
allowed = allowed_key_ssh[type(key)]

except KeyError:
raise click.UsageError("Colud not find allowed hash algorithms for {}"
raise click.UsageError("Could not find allowed hash algorithms for {}"
.format(type(key)))

# Pure enforces auto, and user selection is ignored
Expand Down
19 changes: 19 additions & 0 deletions scripts/imgtool/keys/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,18 @@
from cryptography.hazmat.primitives.asymmetric.x25519 import (
X25519PrivateKey, X25519PublicKey)

import pkcs11
import pkcs11.exceptions
import sys

from .rsa import RSA, RSAPublic, RSAUsageError, RSA_KEY_SIZES
from .ecdsa import (ECDSA256P1, ECDSA256P1Public,
ECDSA384P1, ECDSA384P1Public, ECDSAUsageError)
from .ed25519 import Ed25519, Ed25519Public, Ed25519UsageError
from .x25519 import X25519, X25519Public, X25519UsageError

from .imgtool_keys_pkcs11 import PKCS11


class PasswordRequired(Exception):
"""Raised to indicate that the key is password protected, but a
Expand All @@ -44,6 +50,19 @@ class PasswordRequired(Exception):


def load(path, passwd=None):
if path.startswith("pkcs11:"):
try:
return PKCS11(path) # assume a PKCS #11 URI according to RFC7512
except pkcs11.exceptions.PinIncorrect:
print('ERROR: WRONG PIN')
sys.exit(1)
except pkcs11.exceptions.PinLocked:
print('ERROR: WRONG PIN, MAX ATTEMPTS REACHED. CONTACT YOUR SECURITY OFFICER.')
sys.exit(1)
except pkcs11.exceptions.DataLenRange:
print('ERROR: PIN IS TOO SHORT OR TOO LONG')
sys.exit(1)

"""Try loading a key from the given path.
Returns None if the password wasn't specified."""
with open(path, 'rb') as f:
Expand Down
202 changes: 202 additions & 0 deletions scripts/imgtool/keys/imgtool_keys_pkcs11.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
"""
PKCS11 key management
"""
# SPDX-License-Identifier: Apache-2.0

import hashlib
import os
import pkcs11
import pkcs11.util.ec

from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.serialization import (
load_der_public_key,
Encoding,
PublicFormat
)
from cryptography.hazmat.primitives.asymmetric.ec import (
ECDSA, SECP256R1, SECP384R1,
EllipticCurvePublicKey
)
from urllib.parse import unquote, urlparse

from .general import KeyClass


def unquote_to_bytes(urlencoded_string):
"""Replace %xx escapes by their single-character equivalent,
using the “iso-8859-1” encoding to decode all 8-bit values.
"""
return bytes(
unquote(urlencoded_string, encoding='iso-8859-1'),
encoding='iso-8859-1'
)

def get_pkcs11_uri_params(uri):
"""Return a dict of decoded URI key=val pairs
"""
uri_tokens = urlparse(uri)
assert uri_tokens.scheme == 'pkcs11'
assert uri_tokens.query == ''
assert uri_tokens.fragment == ''
return {
unquote_to_bytes(key): unquote_to_bytes(value)
for key, value
in [
line.split('=')
for line
in uri_tokens.path.split(';')
]
}

class PKCS11UsageError(Exception):
pass


class PKCS11(KeyClass):
"""
Wrapper around an ECDSA P384 key accessed via PKCS#11 URIs
"""
def __init__(self, uri, env=None):
if env is None:
env = os.environ
if 'PKCS11_PIN' not in env:
raise RuntimeError("Environment variable PKCS11_PIN not set. Set it to the user PIN.")
params = get_pkcs11_uri_params(uri)
assert b'serial' in params
assert b'id' in params or b'label' in params
self.user_pin = env['PKCS11_PIN']

# Fall back to OpenSC
pkcs11_module_path = env.get('PKCS11_MODULE', 'opensc-pkcs11.so')

try:
lib = pkcs11.lib(pkcs11_module_path)
except RuntimeError:
raise RuntimeError(f"PKCS11 module {pkcs11_module_path} not loaded.")

self.token = lib.get_token(token_serial=params[b'serial'])
# try to open a session to see if the PIN is valid
with self.token.open(user_pin=self.user_pin) as _:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if PIN is not valid? Should we try.. except the code to print a user friendly message?

pass
self.key_id = params.get(b'id', None)
self.key_label = params.get(b'label', None)
self.key_label = self.key_label.decode('utf-8') if self.key_label else None

def shortname(self):
return "ecdsa"

def _unsupported(self, name):
raise PKCS11UsageError(f"Operation {name} requires private key")

def get_public_bytes(self):
with self.token.open(user_pin=self.user_pin) as session:
pub = session.get_key(
id=self.key_id,
label=self.key_label,
key_type=pkcs11.KeyType.EC,
object_class=pkcs11.ObjectClass.PUBLIC_KEY
)
key = pkcs11.util.ec.encode_ec_public_key(pub)
return key

def get_private_bytes(self, minimal):
self._unsupported('get_private_bytes')

def export_private(self, path, passwd=None):
self._unsupported('export_private')

def export_public(self, path):
"""Write the public key to the given file."""
with self.token.open(user_pin=self.user_pin) as session:
pub = session.get_key(
id=self.key_id,
label=self.key_label,
key_type=pkcs11.KeyType.EC,
object_class=pkcs11.ObjectClass.PUBLIC_KEY
)
# Encode to DER
der_bytes = pkcs11.util.ec.encode_ec_public_key(pub)

# Convert to PEM using cryptography
public_key = load_der_public_key(der_bytes)
pem = public_key.public_bytes(
encoding=Encoding.PEM,
format=PublicFormat.SubjectPublicKeyInfo
)

with open(path, 'wb') as f:
f.write(pem)

def sig_type(self):
return "ECDSA384_SHA384"

def sig_tlv(self):
return "ECDSASIG"

def sig_len(self):
# Early versions of MCUboot (< v1.5.0) required ECDSA
# signatures to be padded to a fixed length. Because the DER
# encoding is done with signed integers, the size of the
# signature will vary depending on whether the high bit is set
# in each value. This padding was done in a
# not-easily-reversible way (by just adding zeros).
#
# The signing code no longer requires this padding, and newer
# versions of MCUboot don't require it. But, continue to
# return the total length so that the padding can be done if
# requested.
return 103

def raw_sign(self, payload):
"""Return the actual signature"""
with self.token.open(user_pin=self.user_pin) as session:
priv = session.get_key(
id=self.key_id,
label=self.key_label,
key_type=pkcs11.KeyType.EC,
object_class=pkcs11.ObjectClass.PRIVATE_KEY
)
sig = priv.sign(
hashlib.sha384(payload).digest(),
mechanism=pkcs11.Mechanism.ECDSA
)
return pkcs11.util.ec.encode_ecdsa_signature(sig)

def sign(self, payload):
"""Return signature with legacy padding"""
# To make fixed length, pad with one or two zeros.
while True:
sig = self.raw_sign(payload)
if sig[-1] != 0x00:
break

sig += b'\000' * (self.sig_len() - len(sig))
return sig

def verify(self, signature, payload):
"""Verify the signature of the payload"""
# strip possible paddings added during sign
signature = signature[:signature[1] + 2]

# Load public key from DER bytes
public_key = load_der_public_key(self.get_public_bytes())

if not isinstance(public_key, EllipticCurvePublicKey):
raise TypeError(f"Unsupported key type: {type(public_key).__name__}")

# Determine correct hash algorithm based on curve
if isinstance(public_key.curve, SECP256R1):
hash_alg = hashes.SHA256()
elif isinstance(public_key.curve, SECP384R1):
hash_alg = hashes.SHA384()
else:
raise ValueError(f"Unsupported curve: {public_key.curve.name}")

try:
# Attempt ECDSA verification
public_key.verify(signature, payload, ECDSA(hash_alg))
return True
except InvalidSignature:
return False
Loading