|
14 | 14 |
|
15 | 15 | class TLSStore: |
16 | 16 | """ |
17 | | - A simple in-memory TLS store that generates a CA and signs certificates for domains on the fly. |
| 17 | + A simple in-memory TLS store that signs certificates for domains on the fly using a provided CA. |
| 18 | + |
| 19 | + Use TLSStore.generate_ca() to create a new CA, or TLSStore.load_ca_from_disk() to load an existing one. |
18 | 20 | |
19 | 21 | Args: |
20 | | - ca_key: Optional CA private key. If provided, ca_cert must also be provided. |
21 | | - ca_cert: Optional CA certificate. If provided, ca_key must also be provided. |
| 22 | + ca_key: The CA private key (must be EllipticCurve) |
| 23 | + ca_cert: The CA certificate |
22 | 24 | """ |
23 | 25 |
|
24 | 26 | def __init__( |
25 | 27 | self, |
26 | | - ca_key: Optional[ec.EllipticCurvePrivateKey] = None, |
27 | | - ca_cert: Optional[x509.Certificate] = None, |
| 28 | + ca_key: ec.EllipticCurvePrivateKey, |
| 29 | + ca_cert: x509.Certificate, |
28 | 30 | ): |
29 | | - if (ca_key is None) != (ca_cert is None): |
30 | | - raise ValueError("Both ca_key and ca_cert must be provided together, or neither") |
31 | | - |
32 | | - if ca_key is not None and ca_cert is not None: |
33 | | - self._ca = (ca_key, ca_cert) |
34 | | - else: |
35 | | - self._ca = self._generate_ca() |
| 31 | + """ |
| 32 | + Initialize TLSStore with an existing CA key and certificate. |
36 | 33 | |
| 34 | + Args: |
| 35 | + ca_key: The CA private key (must be EllipticCurve) |
| 36 | + ca_cert: The CA certificate |
| 37 | + """ |
| 38 | + self._ca = (ca_key, ca_cert) |
37 | 39 | self._store = {} |
38 | 40 |
|
39 | | - def _generate_ca(self): |
| 41 | + @classmethod |
| 42 | + def generate_ca( |
| 43 | + cls, |
| 44 | + country: str, |
| 45 | + state: str, |
| 46 | + locality: str, |
| 47 | + organization: str, |
| 48 | + common_name: str, |
| 49 | + ) -> "TLSStore": |
| 50 | + """ |
| 51 | + Generate a new CA and create a TLSStore instance with it. |
| 52 | + |
| 53 | + Args: |
| 54 | + country: Two-letter country code (e.g., "FR", "US") |
| 55 | + state: State or province name (e.g., "Ile-de-France", "California") |
| 56 | + locality: City or locality name (e.g., "Paris", "San Francisco") |
| 57 | + organization: Organization name (e.g., "My Company") |
| 58 | + common_name: Common name for the CA (e.g., "My Company CA") |
| 59 | + |
| 60 | + Returns: |
| 61 | + TLSStore instance with the generated CA |
| 62 | + """ |
40 | 63 | root_key = ec.generate_private_key(ec.SECP256R1()) |
41 | 64 | subject = issuer = x509.Name( |
42 | 65 | [ |
43 | | - x509.NameAttribute(NameOID.COUNTRY_NAME, "FR"), |
44 | | - x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Ile-de-France"), |
45 | | - x509.NameAttribute(NameOID.LOCALITY_NAME, "Paris"), |
46 | | - x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Asyncio HTTPS Proxy"), |
47 | | - x509.NameAttribute(NameOID.COMMON_NAME, "Asyncio HTTPS Proxy CA"), |
| 66 | + x509.NameAttribute(NameOID.COUNTRY_NAME, country), |
| 67 | + x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, state), |
| 68 | + x509.NameAttribute(NameOID.LOCALITY_NAME, locality), |
| 69 | + x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization), |
| 70 | + x509.NameAttribute(NameOID.COMMON_NAME, common_name), |
48 | 71 | ] |
49 | 72 | ) |
50 | 73 | root_cert = ( |
@@ -83,7 +106,8 @@ def _generate_ca(self): |
83 | 106 | ) |
84 | 107 | .sign(root_key, hashes.SHA256()) |
85 | 108 | ) |
86 | | - return (root_key, root_cert) |
| 109 | + return cls(ca_key=root_key, ca_cert=root_cert) |
| 110 | + |
87 | 111 |
|
88 | 112 | def _generate_cert( |
89 | 113 | self, domain |
|
0 commit comments