forked from LasLabs/python-cfssl
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcfssl.py
More file actions
412 lines (375 loc) · 18.4 KB
/
cfssl.py
File metadata and controls
412 lines (375 loc) · 18.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
# -*- coding: utf-8 -*-
# Copyright 2016 LasLabs Inc.
# License MIT (https://opensource.org/licenses/MIT).
import requests
import logging
from .exceptions import CFSSLException, CFSSLRemoteException
from .models.config_key import ConfigKey
log = logging.getLogger(__name__)
class CFSSL(object):
""" It provides Python bindings to a remote CFSSL server via HTTP(S).
Additional documentation regarding the API endpoints is available at
https://github.com/cloudflare/cfssl/tree/master/doc/api
"""
def __init__(self, host, port, ssl=True, verify_cert=True):
""" Initialize the CFSSL object.
Args:
host (str): Host or IP of remote CFSSL instance.
port (int): Port number of remote CFSSL instance.
ssl (bool): Whether to use SSL.
verify_cert (bool or str): File path of CA cert for verification,
`True` to use system certs, or `False` to disable certificate
verification.
"""
ssl = 'https' if ssl else 'http'
self.verify = verify_cert
self.uri_base = '%s://%s:%d' % (ssl, host, port)
def auth_sign(self, token, request, datetime=None, remote_address=None):
""" It provides returns a signed certificate.
Args:
token (str): The authentication token.
request (CertificateRequest): Signing request document.
datetime (datetime.datetime): Authentication timestamp.
remote_address (str): An address used in making the request.
Returns:
str: A PEM-encoded certificate that has been signed by the
server.
"""
data = self._clean_mapping({
'token': token,
'request': request.to_api(),
'datetime': datetime,
'remote_address': remote_address,
})
return self.call('authsign', 'POST', data=data)
def bundle(self, certificate, private_key=None,
flavor='ubiquitous', domain=None, ip=None):
""" It builds and returns certificate bundles.
Args:
certificate (str): The PEM-encoded certificate to be bundled.
If the ``certificate`` parameter is present, the following four
arguments are valid:
private_key (str): The PEM-encoded private key to be included with
the bundle. This is valid only if the server is not running in
``keyless`` mode.
flavor (str): One of ``ubiquitous``, ``force``, or ``optimal``,
with a default value of ``ubiquitous``. A ubiquitous bundle is
one that has a higher probability of being verified everywhere,
even by clients using outdated or unusual trust stores. Force will
cause the endpoint to use the bundle provided in the
``certificate`` parameter, and will only verify that the bundle
is a valid (verifiable) chain.
domain (str): The domain name to verify as the hostname of the
certificate.
ip (str): The IP address to verify against the certificate IP
SANs.
If only the ``domain`` parameter is present, the following
parameter is valid:
ip (str): The IP address of the remote host; this will fetch the
certificate from the IP, and verify that it is valid for the
domain name.
Returns:
dict: Object representing the bundle, with the following keys:
* bundle contains the concatenated list of PEM certificates
forming the certificate chain; this forms the actual
bundle. The remaining parameters are additional metadata
supporting the bundle.
* crl_support is true if CRL information is contained in the
certificate.
* crt contains the original certificate the bundle is built
from.
* expires contains the expiration date of the certificate.
* hostnames contains the SAN hostnames for the certificate.
* issuer contains the X.509 issuer information for the
certificate.
* key contains the private key for the certificate, if one
was presented.
* key_size contains the size of the key in bits for the
certificate. It will be present even if the private key wasn't
provided because this can be determined from the public key.
* key_type contains a textual description of the key type,
e.g. '2048-bit RSA'.
* ocsp contains the OCSP URLs for the certificate, if present.
* ocsp_support will be true if the certificate supports OCSP
revocation checking.
* signature contains the signature type used in the
certificate, e.g. ``SHA1WithRSA``.
* status contains a :type:`dict` of elements:
* code is bit-encoded error code. 1st bit indicates whether
there is a expiring certificate in the bundle. 2nd bit indicates
whether there is a ubiquity issue with the bundle.
* expiring_SKIs contains the SKIs (subject key identifiers)
for any certificates that might expire soon (within 30
days).
* messages is a list of human-readable warnings on bundle
ubiquity and certificate expiration. For example, an expiration
warning can be "The expiring cert is #1 in the chain",
indicating the leaf certificate is expiring. Ubiquity warnings
include SHA-1 deprecation warning (if the bundle triggers
any major browser's SHA-1 deprecation policy), SHA-2
compatibility warning (if the bundle contains signatures using
ECDSA SHA-2 hash algorithms, it will be rejected by Windows XP
SP2), compatibility warning (if the bundle contains ECDSA
certificates, it will be rejected by Windows XP, Android 2.2 and
Android 2.3 etc) and root trust warning (if the bundle cannot be
trusted by some major OSes or browsers).
* rebundled indicates whether the server had to rebundle the
certificate. The server will rebundle the uploaded
certificate as needed; for example, if the certificate
contains none of the required intermediates or a better set
of intermediates was found. In this case, the server will
mark rebundled as true.
* untrusted_root_stores contains the names of any major
OSes and browsers that doesn't trust the bundle. The names
are used to construct the root trust warnings in the messages
list
* subject contains the X.509 subject identifier from the
certificate.
"""
data = self._clean_mapping({
'certificate': certificate,
'domain': domain,
'private_key': private_key,
'flavor': flavor,
'ip': ip,
})
return self.call('bundle', 'POST', data=data)
def info(self, label, profile=None):
""" It returns information about the CA, including the cert.
Args:
label (str): A string specifying the signer.
profile (str): a string specifying the signing profile for the
signer. Signing profile specifies what key usages should be
used and how long the expiry should be set.
Returns:
dict: Mapping with three keys:
* certificate (str): a PEM-encoded certificate of the signer.
* usage (list of str): Key usages from the signing
profile.
* expiry (str): the expiry string from the signing profile.
"""
data = self._clean_mapping({
'label': label,
'profile': profile,
})
return self.call('info', 'POST', data=data)
def init_ca(self, certificate_request, ca=None):
""" It initializes a new certificate authority.
Args:
certificate_request (CertificateRequest): The certificate
request to use when creating the CA.
ca (ConfigServer, optional): The configuration of the
requested Certificate Authority.
Returns:
dict: Mapping with two keys:
* private key (str): a PEM-encoded CA private key.
* certificate (str): a PEM-encoded self-signed CA certificate.
"""
csr_api = certificate_request.to_api()
data = self._clean_mapping({
'hosts': csr_api['hosts'],
'names': csr_api['names'],
'CN': csr_api['CN'],
'key': csr_api['key'],
'ca': ca and ca.to_api() or None,
})
return self.call('init_ca', 'POST', data=data)
def new_key(self, hosts, names, common_name=None, key=None, ca=None):
""" It generates and returns a new private key + CSR.
Args:
hosts (tuple of Host): Subject Alternative Name(s) for the
requested certificate.
names (tuple of SubjectInfo): The Subject Info(s) for the
requested certificate.
CN (str): the common name for the certificate subject in the
requestedrequested CA certificate.
key (ConfigKey): Cipher and strength to use for certificate.
ca (ConfigServer): the CA configuration of the requested CA.
Returns:
dict: Mapping with three keys:
* private key (str): a PEM-encoded CA private key.
* certificate (str): a PEM-encoded self-signed CA certificate.
* sums (dict): Mapping holding both MD5 and SHA1 digests for
the certificate request
"""
data = self._clean_mapping({
'hosts': [
host.to_api() for host in hosts
],
'names': [
name.to_api() for name in names
],
'CN': common_name,
'key': key and key.to_api() or ConfigKey().to_api(),
'ca': ca and ca.to_api() or None,
})
return self.call('newkey', 'POST', data=data)
def new_cert(self, request, label=None, profile=None, bundle=None):
""" It generates and returns a new private key and certificate.
Args:
request (CertificateRequest): CSR to be used for
certificate creation.
label (str): Specifying which signer to be appointed to sign
the CSR, useful when interacting with cfssl server that stands
in front of a remote multi-root CA signer.
profile (str): Specifying the signing profile for the signer.
bundle (bool): Specifying whether to include an "optimal"
certificate bundle along with the certificate.
Returns:
dict: mapping with these keys:
* private key (str): a PEM-encoded private key.
* certificate_request (str): a PEM-encoded certificate request.
* certificate (str): a PEM-encoded certificate, signed by the server.
* sums (dict): Holding both MD5 and SHA1 digests for the
certificate request and the certificate.
* bundle (str): See the result of endpoint_bundle.txt (only included
if the bundle parameter was set).
"""
data = self._clean_mapping({
'request': request.to_api(),
'label': label,
'profile': profile,
'bundle': bundle,
})
return self.call('newcert', 'POST', data=data)
def revoke(self, serial, authority_key_id, reason):
""" It provides certificate revocation.
Args:
serial (str): Specifying the serial number of a certificate.
authority_key_id (str): Specifying the authority key identifier
of the certificate to be revoked; this is used to distinguish
which private key was used to sign the certificate.
reason (str): Identifying why the certificate was revoked; see,
for example, ReasonStringToCode in the ocsp package or section
4.2.1.13 of RFC 5280. The "reasons" used here are the ReasonFlag
names in said RFC.
"""
data = self._clean_mapping({
'serial': serial,
'authority_key_id': authority_key_id,
'reason': reason,
})
return self.call('revoke', 'POST', data=data)
def scan(self, host, ip=None, timeout=None, family=None, scanner=None):
""" It scans servers to determine the quality of their TLS setup.
Args:
host (Host): The host to scan.
ip (str): IP Address to override DNS lookup of host.
timeout (str): The amount of time allotted for the scan to complete
(default: 1 minute).
family (str): regular expression specifying scan famil(ies) to run.
scanner (str): regular expression specifying scanner(s) to run.
Returns:
dict: Mapping with keys for each scan family. Each of these
objects contains keys for each scanner run in that family
pointing to objects possibly containing the following keys:
* grade (str): Describing the exit status of the scan. Can be:
* "Good": host performing the expected state-of-the-art.
* "Warning": host with non-ideal configuration,
possibly maintaining support for legacy clients.
* "Bad": host with serious misconfiguration or vulnerability
* "Skipped": indicates that the scan was not performed for some
reason.
* error (str): Any error encountered during the scan process.
* output: (dict) Arbitrary data retrieved during the scan.
"""
data = self._clean_mapping({
'host': host.to_api(),
'ip': ip,
'timeout': timeout,
'family': family,
'scanner': scanner,
})
return self.call('scan', params=data)
def scan_info(self):
""" It lists options available for scanning.
Returns:
dict: Mapping with keys for each scan family. For each family,
there exists a `description` containing a string describing
the family and a `scanners` object mapping each of the family's
scanners to an object containing a `description` string.
"""
return self.call('scaninfo')
def sign(self, certificate_request, hosts=None, subject=None,
serial_sequence=None, label=None, profile=None):
""" It signs and returns a certificate.
Args:
certificate_request (str): the CSR bytes to be signed (in PEM).
hosts (tuple of Host): of SAN (subject alternative .names)
which overrides the ones in the CSR
subject (str): The certificate subject which overrides
the ones in the CSR.
serial_sequence (str): Specify the prefix which the generated
certificate serial should have.
label (str): Specifying which signer to be appointed to sign
the CSR, useful when interacting with a remote multi-root CA
signer.
profile (ConfigServer): Specifying the signing profile for
the signer, useful when interacting with a remote multi-root
CA signer.
Returns:
str: A PEM-encoded certificate that has been signed by the
server.
"""
data = self._clean_mapping({
'certificate_request': certificate_request.to_api(),
'hosts': [
host.to_api() for host in hosts
],
'subject': subject,
'serial_sequence': serial_sequence,
'label': label,
'profile': profile.to_api(),
})
result = self.call('sign', 'POST', data=data)
return result['certificate']
def call(self, endpoint, method='GET', params=None, data=None):
""" It calls the remote endpoint and returns the result, if success.
Args:
endpoint (str): CFSSL endpoint to call (e.g. ``newcert``).
method (str): HTTP method to utilize for the Request.
params: (dict|bytes) Data to be sent in the query string
for the Request.
data: (dict or bytes or file) Data to send in the body
of the Request.
Returns:
(mixed) Data contained in ``result`` key of the API response.
Raises:
CFSSLRemoteException: In the event of a ``False`` in the
``success`` key of the API response.
"""
endpoint = '%s/api/v1/cfssl/%s' % (self.uri_base, endpoint)
response = requests.request(
method=method,
url=endpoint,
params=params,
json=data,
verify=self.verify,
)
response = response.json()
if not response['success']:
raise CFSSLRemoteException(
'\n'.join([
'Errors:',
'\n'.join(map(CFSSL._format_response_message, response.get('errors', []))),
'Messages:'
'\n'.join(map(CFSSL._format_response_message, response.get('messages', []))),
])
)
if response['messages']:
for message in response['messages']:
log.warning(CFSSL._format_response_message(message))
return response['result']
@staticmethod
def _format_response_message(error):
message = ''
if 'message' in error:
message += error['message']
if 'code' in error:
message += ' (%s)' % error['code']
if not message:
message = str(error)
return message
def _clean_mapping(self, mapping):
""" It removes false entries from mapping """
return {k:v for k, v in mapping.items() if v}