Skip to content

Commit 8f09297

Browse files
authored
fix: NanoTDF support (#114)
1 parent 9a3b4ae commit 8f09297

24 files changed

+2767
-445
lines changed

src/otdf_python/asym_crypto.py

Lines changed: 135 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22
Asymmetric encryption and decryption utilities for RSA keys in PEM format.
33
"""
44

5+
import base64
6+
import re
7+
58
from cryptography.hazmat.backends import default_backend
69
from cryptography.hazmat.primitives import hashes, serialization
710
from cryptography.hazmat.primitives.asymmetric import padding, rsa
@@ -13,18 +16,68 @@
1316
class AsymDecryption:
1417
"""
1518
Provides functionality for asymmetric decryption using an RSA private key.
19+
20+
Supports both PEM string and key object initialization for flexibility.
1621
"""
1722

18-
def __init__(self, private_key_pem: str):
19-
try:
20-
self.private_key = serialization.load_pem_private_key(
21-
private_key_pem.encode(), password=None, backend=default_backend()
22-
)
23-
except Exception as e:
24-
raise SDKException(f"Failed to load private key: {e}")
23+
CIPHER_TRANSFORM = "RSA/ECB/OAEPWithSHA-1AndMGF1Padding"
24+
PRIVATE_KEY_HEADER = "-----BEGIN PRIVATE KEY-----"
25+
PRIVATE_KEY_FOOTER = "-----END PRIVATE KEY-----"
26+
27+
def __init__(self, private_key_pem: str | None = None, private_key_obj=None):
28+
"""
29+
Initialize with either a PEM string or a key object.
30+
31+
Args:
32+
private_key_pem: Private key in PEM format (with or without headers)
33+
private_key_obj: Pre-loaded private key object from cryptography library
34+
35+
Raises:
36+
SDKException: If key loading fails
37+
"""
38+
if private_key_obj is not None:
39+
self.private_key = private_key_obj
40+
elif private_key_pem is not None:
41+
try:
42+
# Try direct PEM loading first (most common case)
43+
try:
44+
self.private_key = serialization.load_pem_private_key(
45+
private_key_pem.encode(),
46+
password=None,
47+
backend=default_backend(),
48+
)
49+
except Exception:
50+
# Fallback: strip headers and load as DER (for base64-only keys)
51+
private_key_pem = (
52+
private_key_pem.replace(self.PRIVATE_KEY_HEADER, "")
53+
.replace(self.PRIVATE_KEY_FOOTER, "")
54+
.replace("\n", "")
55+
.replace("\r", "")
56+
.replace(" ", "")
57+
)
58+
decoded = base64.b64decode(private_key_pem)
59+
self.private_key = serialization.load_der_private_key(
60+
decoded, password=None, backend=default_backend()
61+
)
62+
except Exception as e:
63+
raise SDKException(f"Failed to load private key: {e}")
64+
else:
65+
self.private_key = None
2566

2667
def decrypt(self, data: bytes) -> bytes:
27-
if not self.private_key:
68+
"""
69+
Decrypt data using RSA OAEP with SHA-1.
70+
71+
Args:
72+
data: Encrypted bytes to decrypt
73+
74+
Returns:
75+
Decrypted bytes
76+
77+
Raises:
78+
SDKException: If decryption fails or key is not set
79+
"""
80+
if self.private_key is None:
2881
raise SDKException("Failed to decrypt, private key is empty")
2982
try:
3083
return self.private_key.decrypt(
@@ -42,26 +95,77 @@ def decrypt(self, data: bytes) -> bytes:
4295
class AsymEncryption:
4396
"""
4497
Provides functionality for asymmetric encryption using an RSA public key or certificate in PEM format.
98+
99+
Supports PEM public keys, X.509 certificates, and pre-loaded key objects.
100+
Also handles base64-encoded keys without PEM headers.
45101
"""
46102

47-
def __init__(self, public_key_pem: str):
48-
try:
49-
if "BEGIN CERTIFICATE" in public_key_pem:
50-
cert = load_pem_x509_certificate(
51-
public_key_pem.encode(), default_backend()
52-
)
53-
self.public_key = cert.public_key()
54-
else:
55-
self.public_key = serialization.load_pem_public_key(
56-
public_key_pem.encode(), backend=default_backend()
57-
)
58-
except Exception as e:
59-
raise SDKException(f"Failed to load public key: {e}")
103+
PUBLIC_KEY_HEADER = "-----BEGIN PUBLIC KEY-----"
104+
PUBLIC_KEY_FOOTER = "-----END PUBLIC KEY-----"
105+
CIPHER_TRANSFORM = "RSA/ECB/OAEPWithSHA-1AndMGF1Padding"
106+
107+
def __init__(self, public_key_pem: str | None = None, public_key_obj=None):
108+
"""
109+
Initialize with either a PEM string or a key object.
110+
111+
Args:
112+
public_key_pem: Public key in PEM format, X.509 certificate, or base64 string
113+
public_key_obj: Pre-loaded public key object from cryptography library
114+
115+
Raises:
116+
SDKException: If key loading fails or key is not RSA
117+
"""
118+
if public_key_obj is not None:
119+
self.public_key = public_key_obj
120+
elif public_key_pem is not None:
121+
try:
122+
if "BEGIN CERTIFICATE" in public_key_pem:
123+
# Load from X.509 certificate
124+
cert = load_pem_x509_certificate(
125+
public_key_pem.encode(), default_backend()
126+
)
127+
self.public_key = cert.public_key()
128+
else:
129+
# Try direct PEM loading first (most common case)
130+
try:
131+
self.public_key = serialization.load_pem_public_key(
132+
public_key_pem.encode(), backend=default_backend()
133+
)
134+
except Exception:
135+
# Fallback: strip headers and load as DER (for base64-only keys)
136+
pem_body = re.sub(r"-----BEGIN (.*)-----", "", public_key_pem)
137+
pem_body = re.sub(r"-----END (.*)-----", "", pem_body)
138+
pem_body = re.sub(r"\s", "", pem_body)
139+
decoded = base64.b64decode(pem_body)
140+
self.public_key = serialization.load_der_public_key(
141+
decoded, backend=default_backend()
142+
)
143+
except Exception as e:
144+
raise SDKException(f"Failed to load public key: {e}")
145+
else:
146+
self.public_key = None
60147

61-
if not isinstance(self.public_key, rsa.RSAPublicKey):
148+
# Validate that it's an RSA key
149+
if self.public_key is not None and not isinstance(
150+
self.public_key, rsa.RSAPublicKey
151+
):
62152
raise SDKException("Not an RSA PEM formatted public key")
63153

64154
def encrypt(self, data: bytes) -> bytes:
155+
"""
156+
Encrypt data using RSA OAEP with SHA-1.
157+
158+
Args:
159+
data: Plaintext bytes to encrypt
160+
161+
Returns:
162+
Encrypted bytes
163+
164+
Raises:
165+
SDKException: If encryption fails or key is not set
166+
"""
167+
if self.public_key is None:
168+
raise SDKException("Failed to encrypt, public key is empty")
65169
try:
66170
return self.public_key.encrypt(
67171
data,
@@ -75,6 +179,15 @@ def encrypt(self, data: bytes) -> bytes:
75179
raise SDKException(f"Error performing encryption: {e}")
76180

77181
def public_key_in_pem_format(self) -> str:
182+
"""
183+
Export the public key to PEM format.
184+
185+
Returns:
186+
Public key as PEM-encoded string
187+
188+
Raises:
189+
SDKException: If export fails
190+
"""
78191
try:
79192
pem = self.public_key.public_bytes(
80193
encoding=serialization.Encoding.PEM,

src/otdf_python/asym_decryption.py

Lines changed: 0 additions & 53 deletions
This file was deleted.

src/otdf_python/asym_encryption.py

Lines changed: 0 additions & 75 deletions
This file was deleted.

src/otdf_python/cli.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,13 @@ def create_nano_tdf_config(sdk: SDK, args) -> NanoTDFConfig:
201201
kas_endpoints = parse_kas_endpoints(args.kas_endpoint)
202202
kas_info_list = [KASInfo(url=kas_url) for kas_url in kas_endpoints]
203203
config.kas_info_list.extend(kas_info_list)
204+
elif args.platform_url:
205+
# If no explicit KAS endpoint provided, derive from platform URL
206+
# This matches the default KAS path convention
207+
kas_url = args.platform_url.rstrip("/") + "/kas"
208+
logger.debug(f"Deriving KAS endpoint from platform URL: {kas_url}")
209+
kas_info = KASInfo(url=kas_url)
210+
config.kas_info_list.append(kas_info)
204211

205212
if hasattr(args, "policy_binding") and args.policy_binding:
206213
if args.policy_binding.lower() == "ecdsa":
@@ -554,7 +561,7 @@ def main():
554561
sys.exit(1)
555562
except Exception as e:
556563
logger.error(f"Unexpected error: {e}")
557-
logger.debug("", exc_info=True)
564+
logger.error("", exc_info=True) # Always print traceback for unexpected errors
558565
sys.exit(1)
559566

560567

0 commit comments

Comments
 (0)