diff --git a/.github/workflows/release-please.yml b/.github/workflows/release-please.yml index 970e501..e654efc 100644 --- a/.github/workflows/release-please.yml +++ b/.github/workflows/release-please.yml @@ -17,16 +17,14 @@ jobs: id: release - uses: actions/checkout@v4 if: ${{ steps.release.outputs.release_created }} + - name: Install poetry + if: ${{ steps.release.outputs.release_created }} + run: pipx install poetry - uses: actions/setup-python@v5 if: ${{ steps.release.outputs.release_created }} with: - python-version: '3.10' - cache: 'pip' - - run: pip install -r requirements.txt - if: ${{ steps.release.outputs.release_created }} - - run: python3 -m build + python-version: '3.11' + - run: poetry install --without dev if: ${{ steps.release.outputs.release_created }} - - uses: pypa/gh-action-pypi-publish@release/v1 + - run: poetry publish --build --username __token__ --password ${{ secrets.PYPI_PASSWORD }} if: ${{ steps.release.outputs.release_created }} - with: - password: ${{ secrets.PYPI_PASSWORD }} diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 35c9257..aca2c0d 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -15,14 +15,15 @@ jobs: steps: - name: Clone uses: actions/checkout@v4 + - name: Install poetry + run: pipx install poetry - uses: actions/setup-python@v5 with: - python-version: '3.10' - cache: 'pip' + python-version: '3.11' - name: Install deps - run: pip install -r requirements.txt + run: poetry install - name: Run tests - run: python -m coverage run -m pytest + run: poetry run coverage run -m pytest env: CLIENT_ID: ${{ secrets.CLIENT_ID }} USERNAME: ${{ secrets.USERNAME }} @@ -30,10 +31,9 @@ jobs: PASSWORD: ${{ secrets.PASSWORD }} OTPCODE: ${{ secrets.OTPCODE }} USER_POOL_ID: ${{ secrets.USER_POOL_ID }} - FEDERATED_POOL_ID: ${{ secrets.FEDERATED_POOL_ID }} - name: Publish code coverage - uses: paambaati/codeclimate-action@v6.0.0 + uses: paambaati/codeclimate-action@v9.0.0 env: CC_TEST_REPORTER_ID: ${{ secrets.CC_TEST_REPORTER_ID }} with: - coverageCommand: coverage xml + coverageCommand: poetry run coverage xml diff --git a/.github/workflows/update-doc.yml b/.github/workflows/update-doc.yml index 9715957..142fc2e 100644 --- a/.github/workflows/update-doc.yml +++ b/.github/workflows/update-doc.yml @@ -11,8 +11,13 @@ jobs: steps: - name: Checkout uses: actions/checkout@v4 + - name: Install poetry + run: pipx install poetry + - uses: actions/setup-python@v5 + with: + python-version: '3.11' - name: Install deps - run: pip install -r requirements.txt + run: poetry install - name: Create md files run: sphinx-build -M markdown ./docs_source ./docs - name: Update docs website diff --git a/.gitignore b/.gitignore index 3797236..4ad52d2 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ auth venv .env docs +poetry.lock \ No newline at end of file diff --git a/LICENSE.md b/LICENSE.md index d238010..6378188 100644 --- a/LICENSE.md +++ b/LICENSE.md @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2022 Scribe Labs Limited +Copyright (c) 2025 Scribe Labs Limited Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index 566b2c2..3538428 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ This library interacts directly with our authentication provider [AWS Cognito](h pip install scribeauth ``` -This library requires Python >= 3.10 that supports typing. +This library requires Python >= 3.11 that supports typing. ## Methods @@ -58,30 +58,6 @@ access = ScribeAuth(client_id) access.revoke_refresh_token('refresh_token') ``` -### 5. Getting federated id - -```python -from scribeauth import ScribeAuth -access = ScribeAuth({'client_id': your_client_id, 'user_pool_id': your_user_pool_id, 'identity_pool_id': your_identity_pool_id}) -access.get_federated_id('your_id_token') -``` - -### 6. Getting federated credentials - -```python -from scribeauth import ScribeAuth -access = ScribeAuth({'client_id': your_client_id, 'user_pool_id': your_user_pool_id, 'identity_pool_id': your_identity_pool_id}) -access.get_federated_credentials('your_federated_id', 'your_id_token') -``` - -### 7. Getting signature for request - -```python -from scribeauth import ScribeAuth -access = ScribeAuth({'client_id': your_client_id, 'user_pool_id': your_user_pool_id, 'identity_pool_id': your_identity_pool_id}) -access.get_signature_for_request(request='your_request', credentials='your_federated_credentials') -``` - ## Flow - If you never have accessed your Scribe account, it probably still contains the temporary password we generated for you. You can change it directly on the [platform](https://platform.scribelabs.ai) or with the `change_password` method. You won't be able to access anything else until the temporary password has been changed. @@ -92,10 +68,6 @@ access.get_signature_for_request(request='your_request', credentials='your_feder - In case you suspect that your refresh token has been leaked, you can revoke it with `revoke_token`. This will also invalidate any access/id token that has been issued with it. In order to get a new one, you'll need to use your username and password again. -- You can get your federated id by using `get_federated_id` and providing your id token. The federated id will allow you to use `get_federated_credentials` to get an access key id, secret key and session token. - -- Every API call to be made to Scribe's API Gateway needs to have a signature. You can get the signature for your request by using `get_signature_for_request`. Provide the request you'll be using and your credentials (use `get_federated_credentials` to get them). - ## Command line You can also use the package as follows for quick access to tokens: @@ -104,6 +76,12 @@ You can also use the package as follows for quick access to tokens: python -m scribeauth --client_id clientid --user_pool_id user_pool_id --username username --password password ``` +## Development + +First step is to install poetry https://python-poetry.org/docs/. Then `poetry install` will install all the dependencies. Might require setting a virtualenv through poetry itself, or manually. + +Run the tests with `poetry run pytest`. + --- To flag an issue, open a ticket on [Github](https://github.com/ScribeLabsAI/ScribeAuth/issues) and contact us on Intercom through the platform. diff --git a/docs_source/conf.py b/docs_source/conf.py index e032a09..c261a7d 100644 --- a/docs_source/conf.py +++ b/docs_source/conf.py @@ -6,10 +6,13 @@ # -- Project information ----------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information +import json + project = "Scribe Auth" -copyright = "2023, Scribe Labs Limited" +copyright = "2025, Scribe Labs Limited" author = "Ailin Venerus" -release = "1.0.2" +with open("../.release-please-manifest.json") as f: + release = json.load(f)["."] # -- General configuration --------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration diff --git a/docs_source/installation.rst b/docs_source/installation.rst index 809a086..f31b37c 100644 --- a/docs_source/installation.rst +++ b/docs_source/installation.rst @@ -7,7 +7,7 @@ To use Scribe Auth, first install it using pip: (.venv) $ pip install scribeauth -This library requires Python >= 3.10 that supports typings. +This library requires Python >= 3.11 that supports typings. -------------- diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..504a8e0 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,36 @@ +[project] +name = "scribeauth" +version = "1.2.0" +description = "Library to authenticate to Scribe's platform" +readme = "README.md" +license = "LICENSE.md" +requires-python = ">=3.11" +dependencies = [ + "boto3[cognito-idp] (>=1.37.11,<2.0.0)" +] +keywords = [ + 'Development Status :: 4 - Beta', + 'Intended Audience :: Developers', + 'Operating System :: OS Independent', + 'License :: OSI Approved :: MIT License', + 'Natural Language :: English', + 'Programming Language :: Python :: 3.11', + 'Topic :: Security', + 'Typing :: Typed' +] +url = 'https://github.com/ScribeLabsAI/ScribeAuth' + + +[build-system] +requires = ["poetry-core>=2.0.0,<3.0.0"] +build-backend = "poetry.core.masonry.api" + +[tool.poetry.group.dev.dependencies] +types-boto3 = {extras = ["cognito-idp"], version = "^1.37.11"} +pytest = "^8.3.5" +sphinx = "^8.2.3" +pyotp = "^2.9.0" +dotenv = "^0.9.9" +sphinx-markdown-builder = "^0.6.8" +coverage = "^7.6.12" + diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index eda9337..0000000 --- a/requirements.txt +++ /dev/null @@ -1,62 +0,0 @@ -alabaster==0.7.16 -Babel==2.14.0 -boto3==1.34.93 -boto3-stubs==1.34.93 -botocore==1.34.93 -botocore-stubs==1.34.93 -build==1.2.1 -certifi==2024.2.2 -charset-normalizer==3.3.2 -coverage==7.5.0 -docutils==0.21.2 -execnet==2.1.1 -idna==3.7 -imagesize==1.4.1 -importlib-metadata==7.1.0 -iniconfig==2.0.0 -jaraco.classes==3.4.0 -Jinja2==3.1.3 -jmespath==1.0.1 -keyring==25.2.0 -markdown-it-py==3.0.0 -MarkupSafe==2.1.5 -mdurl==0.1.2 -more-itertools==10.2.0 -mypy-boto3-cognito-identity==1.34.0 -mypy-boto3-cognito-idp==1.34.93 -nh3==0.2.17 -packaging==24.0 -pkginfo==1.10.0 -pluggy==1.5.0 -Pygments==2.17.2 -pyotp==2.9.0 -pyproject_hooks==1.0.0 -pytest==8.2.0 -pytest-cov==5.0.0 -pytest-xdist==3.6.1 -python-dateutil==2.9.0.post0 -python-dotenv==1.0.1 -readme-renderer==43.0 -requests==2.31.0 -requests-toolbelt==1.0.0 -rfc3986==2.0.0 -rich==13.7.1 -s3transfer==0.10.1 -six==1.16.0 -snowballstemmer==2.2.0 -Sphinx==7.3.7 -sphinx-markdown-builder==0.6.6 -sphinxcontrib-applehelp==1.0.8 -sphinxcontrib-devhelp==1.0.6 -sphinxcontrib-htmlhelp==2.0.5 -sphinxcontrib-jsmath==1.0.1 -sphinxcontrib-qthelp==1.0.7 -sphinxcontrib-serializinghtml==1.1.10 -tabulate==0.9.0 -twine==5.0.0 -types-awscrt==0.20.9 -types-s3transfer==0.10.1 -typing==3.7.4.3 -typing_extensions==4.11.0 -urllib3<2.3 -zipp==3.18.1 diff --git a/scribeauth/__init__.py b/scribeauth/__init__.py index 1e79d45..a29731b 100644 --- a/scribeauth/__init__.py +++ b/scribeauth/__init__.py @@ -1,7 +1,9 @@ from .scribeauth import ( + Challenge, MissingIdException, ResourceNotFoundException, ScribeAuth, Tokens, + TooManyRequestsException, UnauthorizedException, ) diff --git a/scribeauth/__main__.py b/scribeauth/__main__.py index 3c767a8..42b71c8 100644 --- a/scribeauth/__main__.py +++ b/scribeauth/__main__.py @@ -1,6 +1,7 @@ import argparse from scribeauth import ScribeAuth +from scribeauth.scribeauth import Challenge if __name__ == "__main__": parser = argparse.ArgumentParser("scribeauth") @@ -12,17 +13,17 @@ parser.add_argument("--password", help="Password", type=str) parser.add_argument("--refresh_token", help="Refresh Token", type=str) args = parser.parse_args() - auth = ScribeAuth({"client_id": args.client_id, "user_pool_id": args.user_pool_id}) + auth = ScribeAuth(client_id=args.client_id, user_pool_id=args.user_pool_id) if args.refresh_token: tokens = auth.get_tokens(refresh_token=args.refresh_token) else: tokens = auth.get_tokens(username=args.username, password=args.password) if ( - "challenge_name" in tokens - and tokens["challenge_name"] == "SOFTWARE_TOKEN_MFA" + isinstance(tokens, Challenge) + and tokens.challenge_name == "SOFTWARE_TOKEN_MFA" ): code = input("Enter MFA code: ") tokens = auth.respond_to_auth_challenge_mfa( - args.username, tokens["session"], code + username=args.username, session=tokens.session, code=code ) print(tokens) diff --git a/scribeauth/scribeauth.py b/scribeauth/scribeauth.py index 2d41a4f..aaa3910 100644 --- a/scribeauth/scribeauth.py +++ b/scribeauth/scribeauth.py @@ -1,49 +1,26 @@ -from datetime import datetime +from dataclasses import dataclass from typing import overload import boto3 import botocore import botocore.session -from botocore.auth import SigV4Auth -from botocore.awsrequest import AWSRequest from botocore.config import Config from botocore.exceptions import NoAuthTokenError -from typing_extensions import NotRequired, TypedDict, Unpack -class Tokens(TypedDict): +@dataclass +class Tokens: refresh_token: str access_token: str id_token: str -class Challenge(TypedDict): +@dataclass +class Challenge: challenge_name: str session: str -class RefreshToken(TypedDict): - refresh_token: str - - -class UsernamePassword(TypedDict): - username: str - password: str - - -class Credentials(TypedDict): - AccessKeyId: str - SecretKey: str - SessionToken: str - Expiration: datetime - - -class PoolConfiguration(TypedDict): - client_id: str - user_pool_id: str - identity_pool_id: NotRequired[str | None] - - class UnauthorizedException(Exception): """ Exception raised when a user cannot perform an action. @@ -81,32 +58,21 @@ class UnknownException(Exception): pass -def is_complete_credentials(cred: Credentials) -> bool: - return "AccessKeyId" in cred and "SecretKey" in cred and "SessionToken" in cred - - class ScribeAuth: - def __init__(self, param: PoolConfiguration): + def __init__(self, client_id: str, user_pool_id: str): """Constructs an authorisation client. - PoolConfiguration: - :param client_id: The client ID of the application provided by Scribe. :param user_pool_id: The user pool ID provided by Scribe. - - :param identity_pool_id: The identity pool ID provided by Scribe. (Optional) """ config = Config(signature_version=botocore.UNSIGNED) self.client_unsigned = boto3.client( "cognito-idp", config=config, region_name="eu-west-2" ) self.client_signed = boto3.client("cognito-idp", region_name="eu-west-2") - self.client_id = param.get("client_id") - self.user_pool_id = param.get("user_pool_id") - self.identity_pool_id = param.get("identity_pool_id") - if param.get("identity_pool_id"): - self.fed_client = boto3.client("cognito-identity", region_name="eu-west-2") + self.client_id = client_id + self.user_pool_id = user_pool_id def change_password( self, username: str, password: str, new_password: str @@ -198,17 +164,19 @@ def forgot_password( raise err @overload - def get_tokens(self, **param: Unpack[UsernamePassword]) -> Tokens | Challenge: - ... + def get_tokens( + self, *, username: str = "", password: str = "" + ) -> Tokens | Challenge: ... @overload - def get_tokens(self, **param: Unpack[RefreshToken]) -> Tokens | Challenge: - ... + def get_tokens(self, *, refresh_token: str = "") -> Tokens | Challenge: ... - def get_tokens(self, **param) -> Tokens | Challenge: + def get_tokens( + self, *, username: str = "", refresh_token: str = "", password: str = "" + ) -> Tokens | Challenge: """A user gets their tokens (refresh_token, access_token and id_token). - It is possible to pass a UsernamePassword or a RefreshToken: + It is possible to pass a username/password pair or a refresh token: :param username: Username (usually an email address). :type username: str @@ -222,18 +190,13 @@ def get_tokens(self, **param) -> Tokens | Challenge: It returns Tokens or a Challenge: - :return: Tokens -- Dictionary {"refresh_token": "str", "access_token": "str", "id_token": "str"} - - :return: Challenge -- Dictionary { "challenge_name": "str", "session": "str"} + :return: Tokens or Challenge + :rtype: Tokens | Challenge """ - refresh_token = param.get("refresh_token") - username = param.get("username") - password = param.get("password") - if refresh_token == None: - if isinstance(username, str) and isinstance(password, str): - return self.__get_tokens_with_pair(username, password) - elif isinstance(refresh_token, str): + if refresh_token: return self.__get_tokens_with_refresh(refresh_token) + elif username and password: + return self.__get_tokens_with_pair(username, password) raise UnauthorizedException( "Username and/or Password are missing or refresh_token is missing" ) @@ -252,16 +215,17 @@ def respond_to_auth_challenge_mfa( :param code: Code generated from the auth app. :type code: str - :return: Tokens -- Dictionary {"refresh_token": "str", "access_token": "str", "id_token": "str"} + :return: Tokens + :rtype: Tokens """ try: response = self.__respond_to_mfa_challenge(username, session, code) result = response.get("AuthenticationResult") - return { - "refresh_token": result.get("RefreshToken", ""), - "access_token": result.get("AccessToken", ""), - "id_token": result.get("IdToken", ""), - } + return Tokens( + refresh_token=result.get("RefreshToken", ""), + access_token=result.get("AccessToken", ""), + id_token=result.get("IdToken", ""), + ) except self.client_signed.exceptions.CodeMismatchException: raise UnauthorizedException("Wrong MFA code") except self.client_signed.exceptions.ExpiredCodeException: @@ -288,144 +252,27 @@ def revoke_refresh_token(self, refresh_token: str) -> bool: except Exception: raise Exception("InternalServerError: Try again later") - def get_federated_id(self, id_token: str) -> str: - """A user gets their federated id. - - :param id_token: Id token to use. - - :return: str - """ - if not hasattr(self, "user_pool_id"): - raise MissingIdException("Missing user pool ID") - if not hasattr(self, "fed_client"): - raise MissingIdException( - "Federated pool ID is not provided. Create a new ScribeAuth object using identity_pool_id" - ) - if self.identity_pool_id is not None: - try: - response = self.fed_client.get_id( - IdentityPoolId=self.identity_pool_id, - Logins={ - f"cognito-idp.eu-west-2.amazonaws.com/{self.user_pool_id}": id_token - }, - ) - if not response.get("IdentityId"): - raise UnknownException("Could not retrieve federated id") - return response.get("IdentityId") - except self.fed_client.exceptions.NotAuthorizedException: - raise UnauthorizedException("Could not retrieve federated id") - except self.fed_client.exceptions.TooManyRequestsException: - raise TooManyRequestsException("Too many requests. Try again later") - except Exception as err: - raise err - else: - raise Exception( - "Federated pool ID is not provided. Create a new ScribeAuth object using identity_pool_id" - ) - - def get_federated_credentials(self, id: str, id_token: str) -> Credentials: - """A user gets their federated credentials (AccessKeyId, SecretKey and SessionToken). - - :param id: Federated id. - - :param id_token: Id token to use. - - :return: Credentials -- Dictionary {"AccessKeyId": "str", "SecretKey": "str", "SessionToken": "str", "Expiration": "str"} - """ - if not hasattr(self, "user_pool_id"): - raise MissingIdException("Missing user pool ID") - if not hasattr(self, "fed_client"): - raise MissingIdException( - "Federated pool ID is not provided. Create a new ScribeAuth object using identity_pool_id" - ) - try: - response = self.fed_client.get_credentials_for_identity( - IdentityId=id, - Logins={ - f"cognito-idp.eu-west-2.amazonaws.com/{self.user_pool_id}": id_token - }, - ) - all_credentials = response.get("Credentials") - accessKeyId = all_credentials.get("AccessKeyId") - secretKey = all_credentials.get("SecretKey") - sessionToken = all_credentials.get("SessionToken") - expiration = all_credentials.get("Expiration") - if ( - accessKeyId != None - and secretKey != None - and sessionToken != None - and expiration != None - ): - credentials = Credentials( - AccessKeyId=accessKeyId, - SecretKey=secretKey, - SessionToken=sessionToken, - Expiration=expiration, - ) - - if not is_complete_credentials(credentials): - raise UnknownException("Could not retrieve tokens") - return credentials - else: - raise UnknownException("Could not retrieve federated credentials") - except self.fed_client.exceptions.NotAuthorizedException: - raise UnauthorizedException("Could not retrieve federated credentials") - except self.fed_client.exceptions.TooManyRequestsException: - raise TooManyRequestsException("Too many requests. Try again later") - except self.fed_client.exceptions.ResourceNotFoundException: - raise ResourceNotFoundException("Invalid federated_id") - except Exception as err: - raise err - - def get_signature_for_request(self, request: AWSRequest, credentials: Credentials): - """A user gets a signature for a request. - - :param request: Request to send. - - :param credentials: Credentials for the signature creation. - - :return: Headers -- Headers containing the signature for the request. - """ - try: - session = botocore.session.Session() - session.set_credentials( - access_key=credentials["AccessKeyId"], - secret_key=credentials["SecretKey"], - token=credentials["SessionToken"], - ) - signer = SigV4Auth( - credentials=session.get_credentials(), - service_name="execute-api", - region_name="eu-west-2", - ) - request.context["payload_signing_enabled"] = False - signer.add_auth(request=request) - prepped = request.prepare() - return prepped.headers - except Exception as err: - raise err - def __get_tokens_with_pair( self, username: str, password: str ) -> Tokens | Challenge: auth_result = "AuthenticationResult" - if username != None and password != None: + if username is not None and password is not None: try: response = self.__initiate_auth(username, password) result = response.get(auth_result) if "ChallengeName" in response: - return { - "challenge_name": response.get("ChallengeName"), - "session": response.get("Session"), - } + return Challenge( + challenge_name=response.get("ChallengeName"), + session=response.get("Session"), + ) else: refresh_token_resp = result.get("RefreshToken") access_token_resp = result.get("AccessToken") id_token_resp = result.get("IdToken") if ( - refresh_token_resp != None - and access_token_resp != None - and id_token_resp != None + refresh_token_resp is not None + and access_token_resp is not None + and id_token_resp is not None ): return Tokens( refresh_token=refresh_token_resp, @@ -454,7 +301,7 @@ def __get_tokens_with_refresh(self, refresh_token: str) -> Tokens: result = response.get(auth_result) access_token_resp = result.get("AccessToken") id_token_resp = result.get("IdToken") - if access_token_resp != None and id_token_resp != None: + if access_token_resp is not None and id_token_resp is not None: return Tokens( refresh_token=refresh_token, access_token=access_token_resp, diff --git a/setup.py b/setup.py deleted file mode 100644 index d37f968..0000000 --- a/setup.py +++ /dev/null @@ -1,29 +0,0 @@ -from setuptools import setup - -def readme(): - with open('README.md') as f: - return f.read() - -setup( - name='scribeauth', - python_requires='>=3.10.0', - version='1.2.0', - description="Library to authenticate to Scribe's platform", - long_description=readme(), - url='https://github.com/ScribeLabsAI/ScribeAuth', - long_description_content_type='text/markdown', - author='Ailin Venerus', - author_email='ailin@scribelabs.ai', - packages=['scribeauth'], - install_requires=['boto3', 'typing-extensions'], - classifiers=[ - 'Development Status :: 4 - Beta', - 'Intended Audience :: Developers', - 'Operating System :: OS Independent', - 'License :: OSI Approved :: MIT License', - 'Natural Language :: English', - 'Programming Language :: Python :: 3.10', - 'Topic :: Security', - 'Typing :: Typed' - ], -) diff --git a/tests/test_scribeauth.py b/tests/test_scribeauth.py index 3ad855d..dfb8ba5 100644 --- a/tests/test_scribeauth.py +++ b/tests/test_scribeauth.py @@ -1,17 +1,15 @@ import os +import re import unittest from time import sleep import pyotp +import pytest from botocore.awsrequest import AWSRequest from dotenv import load_dotenv -from scribeauth import ( - MissingIdException, - ResourceNotFoundException, - ScribeAuth, - UnauthorizedException, -) +from scribeauth import ScribeAuth, UnauthorizedException +from scribeauth.scribeauth import Challenge, Tokens load_dotenv(override=True) @@ -20,62 +18,56 @@ username2: str = os.environ.get("USERNAME2", "") password: str = os.environ.get("PASSWORD", "") user_pool_id: str = os.environ.get("USER_POOL_ID", "") -federated_pool_id: str = os.environ.get("FEDERATED_POOL_ID", "") otp = pyotp.TOTP(os.environ.get("OTPCODE", "")) -access = ScribeAuth({"client_id": client_id, "user_pool_id": user_pool_id}) -pool_access = ScribeAuth( - { - "client_id": client_id, - "user_pool_id": user_pool_id, - "identity_pool_id": federated_pool_id, - } -) +access = ScribeAuth(client_id=client_id, user_pool_id=user_pool_id) -class TestScribeAuthGetTokensNoMFA(unittest.TestCase): +class TestScribeAuthGetTokensNoMFA: def test_get_tokens_username_password_successfully(self): user_tokens = access.get_tokens(username=username, password=password) assert_tokens(self, user_tokens) def test_get_tokens_wrong_username_fails(self): - with self.assertRaises(UnauthorizedException): + with pytest.raises(UnauthorizedException): access.get_tokens(username="username", password=password) def test_get_tokens_wrong_password_fails(self): - with self.assertRaises(UnauthorizedException): + with pytest.raises(UnauthorizedException): access.get_tokens(username=username, password="password") def test_get_tokens_refresh_token_successfully(self): refresh_token = generate_refresh_token_for_test() user_tokens = access.get_tokens(refresh_token=refresh_token) - assert_tokens(self, user_tokens) - self.assertEqual(refresh_token, user_tokens.get("refresh_token")) + user_tokens = assert_tokens(self, user_tokens) + assert refresh_token == user_tokens.refresh_token def test_get_tokens_refresh_token_fails(self): - with self.assertRaises(UnauthorizedException): + with pytest.raises(UnauthorizedException): access.get_tokens(refresh_token="refresh_token") def test_get_tokens_refresh_token_multiple_params_successfully(self): refresh_token = generate_refresh_token_for_test() - user_tokens = access.get_tokens(**{"refresh_token": refresh_token}) - assert_tokens(self, user_tokens) - self.assertEqual(refresh_token, user_tokens.get("refresh_token")) + user_tokens = access.get_tokens(refresh_token=refresh_token) + user_tokens = assert_tokens(self, user_tokens) + assert refresh_token == user_tokens.refresh_token def test_get_tokens_refresh_token_multiple_params_fails(self): - with self.assertRaises(UnauthorizedException): - access.get_tokens(**{"refresh_token": "refresh_token"}) + with pytest.raises(UnauthorizedException): + access.get_tokens(refresh_token="refresh_token") -class TestScribeAuthGetTokensMFA(unittest.TestCase): +class TestScribeAuthGetTokensMFA: def test_get_tokens_asks_mfa(self): challenge = access.get_tokens(username=username2, password=password) - self.assertEqual(challenge.get("challenge_name"), "SOFTWARE_TOKEN_MFA") + assert isinstance(challenge, Challenge) + assert challenge.challenge_name == "SOFTWARE_TOKEN_MFA" def test_get_tokens_username_password_successfully(self): challenge = access.get_tokens(username=username2, password=password) + assert isinstance(challenge, Challenge) user_tokens = access.respond_to_auth_challenge_mfa( - username=username2, session=challenge.get("session", ""), code=otp.now() + username=username2, session=challenge.session, code=otp.now() ) sleep(61) assert_tokens(self, user_tokens) @@ -84,125 +76,82 @@ def test_get_tokens_refresh_token_successfully(self): refresh_token = generate_refresh_token_for_test_with_mfa() sleep(61) user_tokens = access.get_tokens(refresh_token=refresh_token) - assert_tokens(self, user_tokens) - self.assertEqual(refresh_token, user_tokens.get("refresh_token")) + user_tokens = assert_tokens(self, user_tokens) + assert refresh_token == user_tokens.refresh_token def test_get_tokens_fails_with_wrong_mfa_code(self): challenge = access.get_tokens(username=username2, password=password) - with self.assertRaises(UnauthorizedException): + with pytest.raises(UnauthorizedException): + assert isinstance(challenge, Challenge) access.respond_to_auth_challenge_mfa( - username=username2, session=challenge.get("session", ""), code="000000" + username=username2, session=challenge.session, code="000000" ) def test_get_tokens_fails_with_expired_mfa_code(self): challenge = access.get_tokens(username=username2, password=password) code = otp.now() sleep(61) - with self.assertRaises(UnauthorizedException): + with pytest.raises(UnauthorizedException): + assert isinstance(challenge, Challenge) access.respond_to_auth_challenge_mfa( - username=username2, session=challenge.get("session", ""), code=code + username=username2, session=challenge.session, code=code ) -class TestScribeAuthRevokeRefreshTokens(unittest.TestCase): +class TestScribeAuthRevokeRefreshTokens: def test_revoke_refresh_token_successfully(self): refresh_token = generate_refresh_token_for_test() - self.assertTrue(access.revoke_refresh_token(refresh_token)) + + assert access.revoke_refresh_token(refresh_token) def test_revoke_refresh_token_unexistent_successfully(self): - self.assertTrue(access.revoke_refresh_token("refresh_token")) + assert access.revoke_refresh_token("refresh_token") def test_revoke_refresh_token_and_use_old_refresh_token_fails(self): refresh_token = generate_refresh_token_for_test() - self.assertTrue(access.revoke_refresh_token(refresh_token)) - with self.assertRaises(UnauthorizedException): + assert access.revoke_refresh_token(refresh_token) + with pytest.raises(UnauthorizedException): access.get_tokens(refresh_token=refresh_token) def test_revoke_refresh_token_invalid_and_use_valid_refresh_token_successfully( self, ): refresh_token = generate_refresh_token_for_test() - self.assertTrue(access.revoke_refresh_token("refresh_token")) + assert access.revoke_refresh_token("refresh_token") user_tokens = access.get_tokens(refresh_token=refresh_token) - assert_tokens(self, user_tokens) - self.assertEqual(refresh_token, user_tokens.get("refresh_token")) - - -class TestScribeAuthFederatedCredentials(unittest.TestCase): - def test_get_federated_id_successfully(self): - id_token = generate_id_token_for_test() - federated_id = pool_access.get_federated_id(id_token) - self.assertTrue(federated_id) - - def test_get_federated_id_fails(self): - with self.assertRaises(UnauthorizedException): - pool_access.get_federated_id("id_token") - - def test_get_federated_id_with_NO_identityPoolId_fails(self): - id_token = generate_id_token_for_test() - with self.assertRaises(MissingIdException): - access.get_federated_id(id_token) - - def test_get_federated_credentials_successfully(self): - id_token = generate_id_token_for_test() - federated_id = pool_access.get_federated_id(id_token) - federated_credentials = pool_access.get_federated_credentials( - federated_id, id_token - ) - self.assertTrue(federated_credentials.get("AccessKeyId")) - self.assertTrue(federated_credentials.get("SecretKey")) - self.assertTrue(federated_credentials.get("SessionToken")) - self.assertTrue(federated_credentials.get("Expiration")) - - def test_get_federated_credentials_fails(self): - id_token = generate_id_token_for_test() - id = "eu-west-2:00000000-1111-2abc-3def-4444aaaa5555" - with self.assertRaises(ResourceNotFoundException): - pool_access.get_federated_credentials(id, id_token) - - -class TestScribeAuthGetSignatureForRequest(unittest.TestCase): - def test_get_signature_for_request_successfully(self): - id_token = generate_id_token_for_test() - federated_id = pool_access.get_federated_id(id_token) - federated_credentials = pool_access.get_federated_credentials( - federated_id, id_token - ) - request = AWSRequest("GET", url="http://google.com") - signature = pool_access.get_signature_for_request( - request=request, credentials=federated_credentials - ) - self.assertTrue(signature) + user_tokens = assert_tokens(self, user_tokens) + assert user_tokens.refresh_token == refresh_token def generate_refresh_token_for_test(): tokens_or_challenge = access.get_tokens(username=username, password=password) - if "refresh_token" in tokens_or_challenge: - return tokens_or_challenge.get("refresh_token") + if isinstance(tokens_or_challenge, Tokens): + return tokens_or_challenge.refresh_token raise Exception("Could not get refresh_token") def generate_id_token_for_test(): tokens_or_challenge = access.get_tokens(username=username, password=password) - if "id_token" in tokens_or_challenge: - return tokens_or_challenge.get("id_token") + if isinstance(tokens_or_challenge, Tokens): + return tokens_or_challenge.id_token raise Exception("Could not get id_token") def generate_refresh_token_for_test_with_mfa(): challenge = access.get_tokens(username=username2, password=password) + assert isinstance(challenge, Challenge) return access.respond_to_auth_challenge_mfa( - username=username2, session=challenge.get("session", ""), code=otp.now() - ).get("refresh_token") - - -def assert_tokens(self, user_tokens): - self.assertIsNone(user_tokens.get("challenge_name")) - self.assertIsNotNone(user_tokens.get("refresh_token")) - self.assertIsNotNone(user_tokens.get("access_token")) - self.assertIsNotNone(user_tokens.get("id_token")) - self.assertNotEqual( - user_tokens.get("refresh_token"), user_tokens.get("access_token") - ) - self.assertNotEqual(user_tokens.get("refresh_token"), user_tokens.get("id_token")) - self.assertNotEqual(user_tokens.get("id_token"), user_tokens.get("access_token")) + username=username2, session=challenge.session, code=otp.now() + ).refresh_token + + +def assert_tokens(self, user_tokens: Tokens | Challenge) -> Tokens: + if isinstance(user_tokens, Challenge): + pytest.fail() + assert user_tokens is not None + assert user_tokens.access_token is not None + assert user_tokens.id_token is not None + assert user_tokens.refresh_token != user_tokens.access_token + assert user_tokens.refresh_token != user_tokens.id_token + assert user_tokens.id_token != user_tokens.access_token + return user_tokens