Skip to content

Commit

Permalink
Ensure jwt_token is used if present
Browse files Browse the repository at this point in the history
  • Loading branch information
rjprins committed Nov 11, 2020
1 parent e69f552 commit 5574138
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 11 deletions.
4 changes: 3 additions & 1 deletion cabby/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ def prepare_generic_session(self):
key_file=self.key_file,
key_password=self.key_password,
ca_cert=self.ca_cert,
verify_ssl=self.verify_ssl)
verify_ssl=self.verify_ssl,
jwt_token=self.jwt_token,
)

def _execute_request(self, request, uri=None, service_type=None):
'''
Expand Down
23 changes: 13 additions & 10 deletions cabby/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,16 +304,17 @@ def __call__(self, r):


def get_generic_session(
proxies=None,
headers=None,
username=None,
password=None,
cert_file=None,
key_file=None,
key_password=None,
ca_cert=None,
verify_ssl=True):

proxies=None,
headers=None,
username=None,
password=None,
cert_file=None,
key_file=None,
key_password=None,
ca_cert=None,
verify_ssl=True,
jwt_token=None,
):
session = requests.Session()
if ca_cert:
session.verify = ca_cert
Expand All @@ -328,6 +329,8 @@ def get_generic_session(
session.auth = HTTPBasicAuth(username, password)
if cert_file and key_file:
session.cert = (cert_file, key_file)
if jwt_token:
session.auth = JWTAuth(jwt_token)
session._cabby_key_password = key_password
return session

Expand Down
25 changes: 25 additions & 0 deletions tests/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,3 +306,28 @@ def poll_callback(request):

list(client.poll(collection_name="X", uri="/poll"))
assert client.jwt_token == second_token


@pytest.mark.parametrize("version", [11, 10])
@responses.activate
def test_jwt_token_when_set_directly(version):
given_token = "abcd"
client = make_client(version)

# The purpose of this test is to check that this assignment has effect:
client.jwt_token = given_token

def poll_callback(request):
_, _, token = request.headers["Authorization"].partition("Bearer ")
assert token == given_token
return (200, make_taxii_headers(version), get_fix(version).POLL_RESPONSE)

responses.mock._matches.append(
responses.CallbackResponse(
responses.POST,
url="http://example.localhost/poll",
callback=poll_callback,
stream=True,
)
)
list(client.poll(collection_name="X", uri="/poll"))

0 comments on commit 5574138

Please sign in to comment.