Skip to content

Commit b3385e7

Browse files
Allow to persist the created CA
1 parent f09ebfa commit b3385e7

File tree

4 files changed

+245
-2
lines changed

4 files changed

+245
-2
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ uv.lock
44
.pytest_cache
55
.ruff_cache
66
docs/_*
7+
*.pem
78

89
.serena
910
CLAUDE.md

docs/custom-ca.md

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,80 @@ To intercept HTTPS traffic, you need to create your own CA certificate and confi
1717
to trust this CA. This allows the proxy to generate and sign certificates for the target domains on-the-fly,
1818
allowing it to decrypt and inspect the HTTPS traffic.
1919

20+
By default the library create a temporary Root CA on the fly but if you want to trust the CA in your
21+
browser you will need the persist the CA.
22+
23+
## Using Custom CA Keys with TLSStore
24+
25+
The `TLSStore` class can accept an existing CA private key and certificate instead of generating new ones. This is useful when you want to:
26+
27+
- Reuse the same CA across multiple proxy runs
28+
- Use a pre-existing CA certificate
29+
- Maintain certificate consistency
30+
31+
### Basic Usage
32+
33+
```python
34+
from asyncio_https_proxy import TLSStore
35+
36+
# Load existing CA key and certificate from files using the proper method
37+
tls_store = TLSStore.load_ca_from_disk("ca_private_key.pem", "ca_certificate.pem")
38+
```
39+
40+
### Persisting CA to Disk
41+
42+
To persist a CA to disk for reuse across multiple proxy runs:
43+
44+
```python
45+
from asyncio_https_proxy import TLSStore
46+
47+
# Create a new TLS store (will generate a new CA)
48+
tls_store = TLSStore()
49+
50+
# Save the CA to disk for future use
51+
tls_store.save_ca_to_disk("ca_private_key.pem", "ca_certificate.pem")
52+
```
53+
54+
### Complete Persistence Example
55+
56+
```python
57+
from asyncio_https_proxy import TLSStore
58+
import os
59+
60+
# Check if CA files already exist
61+
if os.path.exists("ca_private_key.pem") and os.path.exists("ca_certificate.pem"):
62+
# Load existing CA from disk
63+
tls_store = TLSStore.load_ca_from_disk("ca_private_key.pem", "ca_certificate.pem")
64+
print("Loaded existing CA from disk")
65+
else:
66+
# Create new CA and save it to disk
67+
tls_store = TLSStore()
68+
tls_store.save_ca_to_disk("ca_private_key.pem", "ca_certificate.pem")
69+
print("Created new CA and saved to disk")
70+
```
71+
72+
### Persistent CA Example
73+
74+
For a complete example of creating and reusing a CA across multiple proxy runs, see [`examples/persistent_ca_usage.py`](../examples/persistent_ca_usage.py). This example demonstrates:
75+
76+
- Generating a CA and saving it to disk if it doesn't exist
77+
- Loading an existing CA from disk on subsequent runs
78+
- Proper error handling for CA file operations
79+
80+
```python
81+
# Run the persistent CA example
82+
python examples/persistent_ca_usage.py
83+
```
84+
85+
The example will create `ca_private_key.pem` and `ca_certificate.pem` files in the current directory and reuse them on subsequent runs.
86+
87+
### Important Considerations
88+
89+
- Both `ca_key` and `ca_cert` parameters must be provided together or neither
90+
- The CA key must be an EllipticCurve private key (SECP256R1)
91+
- Store CA private keys securely and restrict file permissions appropriately
92+
- Consider the certificate validity period when reusing CA certificates
93+
2094
## Usage of custom CA in your applications
2195

2296
### Curl

examples/persistent_ca_usage.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Persistent CA usage example for asyncio-https-proxy.
4+
5+
This example demonstrates how to create and reuse a Certificate Authority (CA)
6+
across multiple proxy runs. If CA files don't exist, creates a default TLSStore
7+
and saves its CA to disk for reuse on subsequent runs.
8+
"""
9+
10+
import asyncio
11+
from pathlib import Path
12+
13+
from asyncio_https_proxy import HTTPSForwardProxyHandler, TLSStore, start_proxy_server
14+
15+
CA_KEY_FILE = "ca_private_key.pem"
16+
CA_CERT_FILE = "ca_certificate.pem"
17+
18+
19+
def get_or_create_ca():
20+
"""Get existing CA from disk or create a new TLSStore and persist its CA."""
21+
ca_files_exist = Path(CA_KEY_FILE).exists() and Path(CA_CERT_FILE).exists()
22+
23+
if ca_files_exist:
24+
print("Loading existing CA from disk...")
25+
tls_store = TLSStore.load_ca_from_disk(CA_KEY_FILE, CA_CERT_FILE)
26+
print("✅ CA loaded from disk")
27+
return tls_store
28+
else:
29+
print("No existing CA files found.")
30+
31+
# Create new TLSStore with default CA generation
32+
print("Creating new TLS store...")
33+
tls_store = TLSStore()
34+
35+
# Save the generated CA to disk for future use
36+
print("Saving CA to disk for future reuse...")
37+
tls_store.save_ca_to_disk(CA_KEY_FILE, CA_CERT_FILE)
38+
print(f"✅ CA key saved to: {CA_KEY_FILE}")
39+
print(f"✅ CA certificate saved to: {CA_CERT_FILE}")
40+
41+
return tls_store
42+
43+
44+
class LoggingForwardProxyHandler(HTTPSForwardProxyHandler):
45+
"""Example forward proxy handler with logging."""
46+
47+
async def on_client_connected(self):
48+
print(f"Client connected: {self.request}")
49+
await super().on_client_connected()
50+
51+
async def on_request_received(self):
52+
print(f"Request: {self.request.method} {self.request.url()}")
53+
await super().on_request_received()
54+
55+
56+
async def main():
57+
"""Run a proxy with persistent CA."""
58+
59+
host = "127.0.0.1"
60+
port = 8888
61+
62+
print("=" * 60)
63+
print("HTTPS Forward Proxy with Persistent CA")
64+
print("=" * 60)
65+
66+
# Get existing CA from disk or create new TLSStore and persist its CA
67+
tls_store = get_or_create_ca()
68+
69+
print(f"\nStarting HTTPS forward proxy on {host}:{port}")
70+
print("\nTest the proxy with:")
71+
print(f" curl --cacert {CA_CERT_FILE} --proxy http://{host}:{port} https://httpbin.org/get")
72+
print(f" curl --cacert {CA_CERT_FILE} --proxy http://{host}:{port} http://httpbin.org/get")
73+
print("\nPress Ctrl+C to stop the proxy")
74+
print("=" * 60)
75+
76+
server = await start_proxy_server(
77+
handler_builder=lambda: LoggingForwardProxyHandler(),
78+
host=host,
79+
port=port,
80+
tls_store=tls_store,
81+
)
82+
83+
async with server:
84+
try:
85+
await server.serve_forever()
86+
except KeyboardInterrupt:
87+
print("\nShutting down proxy...")
88+
server.close()
89+
await server.wait_closed()
90+
print("Proxy shut down.")
91+
92+
93+
if __name__ == "__main__":
94+
asyncio.run(main())

src/asyncio_https_proxy/tls_store.py

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import datetime
22
import ssl
33
import tempfile
4+
from pathlib import Path
5+
from typing import Optional, Union
46

57
from cryptography import x509
68
from cryptography.hazmat.primitives import hashes, serialization
@@ -13,10 +15,25 @@
1315
class TLSStore:
1416
"""
1517
A simple in-memory TLS store that generates a CA and signs certificates for domains on the fly.
18+
19+
Args:
20+
ca_key: Optional CA private key. If provided, ca_cert must also be provided.
21+
ca_cert: Optional CA certificate. If provided, ca_key must also be provided.
1622
"""
1723

18-
def __init__(self):
19-
self._ca = self._generate_ca()
24+
def __init__(
25+
self,
26+
ca_key: Optional[ec.EllipticCurvePrivateKey] = None,
27+
ca_cert: Optional[x509.Certificate] = None,
28+
):
29+
if (ca_key is None) != (ca_cert is None):
30+
raise ValueError("Both ca_key and ca_cert must be provided together, or neither")
31+
32+
if ca_key is not None and ca_cert is not None:
33+
self._ca = (ca_key, ca_cert)
34+
else:
35+
self._ca = self._generate_ca()
36+
2037
self._store = {}
2138

2239
def _generate_ca(self):
@@ -181,3 +198,60 @@ def get_ssl_context(self, domain: str) -> ssl.SSLContext:
181198
ssl_context.load_cert_chain(certfile=cert_file.name, keyfile=key_file.name)
182199

183200
return ssl_context
201+
202+
def save_ca_to_disk(self, key_file: Union[str, Path], cert_file: Union[str, Path]) -> None:
203+
"""
204+
Save the CA private key and certificate to disk files.
205+
206+
Args:
207+
key_file: Path where to save the CA private key (PEM format)
208+
cert_file: Path where to save the CA certificate (PEM format)
209+
"""
210+
ca_key, ca_cert = self._ca
211+
212+
# Save private key to disk
213+
with open(key_file, "wb") as f:
214+
f.write(ca_key.private_bytes(
215+
encoding=serialization.Encoding.PEM,
216+
format=serialization.PrivateFormat.PKCS8,
217+
encryption_algorithm=serialization.NoEncryption(),
218+
))
219+
220+
# Save certificate to disk
221+
with open(cert_file, "wb") as f:
222+
f.write(ca_cert.public_bytes(serialization.Encoding.PEM))
223+
224+
@classmethod
225+
def load_ca_from_disk(
226+
cls,
227+
key_file: Union[str, Path],
228+
cert_file: Union[str, Path]
229+
) -> "TLSStore":
230+
"""
231+
Load CA private key and certificate from disk and create a TLSStore instance.
232+
233+
Args:
234+
key_file: Path to the CA private key file (PEM format)
235+
cert_file: Path to the CA certificate file (PEM format)
236+
237+
Returns:
238+
TLSStore instance using the loaded CA
239+
240+
Raises:
241+
ValueError: If the key file doesn't contain an EllipticCurve private key
242+
FileNotFoundError: If either file doesn't exist
243+
ValueError: If the files cannot be parsed
244+
"""
245+
# Load private key
246+
with open(key_file, "rb") as f:
247+
ca_key = serialization.load_pem_private_key(f.read(), password=None)
248+
249+
# Verify it's an EC key
250+
if not isinstance(ca_key, ec.EllipticCurvePrivateKey):
251+
raise ValueError(f"CA key must be an EllipticCurve private key, got {type(ca_key)}")
252+
253+
# Load certificate
254+
with open(cert_file, "rb") as f:
255+
ca_cert = x509.load_pem_x509_certificate(f.read())
256+
257+
return cls(ca_key=ca_key, ca_cert=ca_cert)

0 commit comments

Comments
 (0)