diff --git a/README.md b/README.md index 3538428..f4b46fa 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,7 @@ You first need a Scribe account and a client ID. Both can be requested at suppor This library interacts directly with our authentication provider [AWS Cognito](https://aws.amazon.com/cognito/) meaning that your username and password never transit through our servers. +If you want to use your own tokens, see [Self-signing tokens](#self-signing-tokens). ## Installation ```bash @@ -76,6 +77,37 @@ You can also use the package as follows for quick access to tokens: python -m scribeauth --client_id clientid --user_pool_id user_pool_id --username username --password password ``` +## Self-signing tokens + +If you have your own SSO system and want to manage your own scopes, you can sign your own tokens. In this case, you have to provide us with: +- your public key +- the value of your `iss` (issuer) claim +- your scope mapping for the following roles (it can be partial): + - read + - write + - delete + - read & write + - read & delete + - write & delete + - read, write & delete + +We will provide you with the `sub` claim that should be signed with the token. The `aud` claim has to be `https://apis.scribelabs.ai`. +You can also use the following helper: + +```python +from scribeauth import SelfManagedSigner + +private_key = "" # your private key corresponding to the public key communicated _to_ Scribe +issuer = "" # your issuer, communicated _to_ Scribe +sub = "" # Account id, communicated _by_ Scribe +signer = SelfManagedSigner(private_key, issuer, sub) + +scopes = [...some scopes] # the scopes, mapping communicated _to_ Scribe +exp = 0 # expiration timestamp +token = signer.sign(scopes, exp) +``` + + ## Development First step is to install poetry https://python-poetry.org/docs/. Then `poetry install` will install all the dependencies. Might require setting a virtualenv through poetry itself, or manually. diff --git a/docs_source/methods.rst b/docs_source/methods.rst index ed4a3ee..66ad460 100644 --- a/docs_source/methods.rst +++ b/docs_source/methods.rst @@ -48,3 +48,19 @@ With refresh token from scribeauth import ScribeAuth access = ScribeAuth(client_id) access.revoke_refresh_token('refresh_token') + +5. Self-signing tokens +--------------------- + +.. code:: python + + from scribeauth import SelfManagedSigner + + private_key = "" # your private key corresponding to the public key communicated _to_ Scribe + issuer = "" # your issuer, communicated _to_ Scribe + sub = "" # Account id, communicated _by_ Scribe + signer = SelfManagedSigner(private_key, issuer, sub) + + scopes = [...some scopes] # the scopes, mapping communicated _to_ Scribe + exp = 0 # expiration timestamp + token = signer.sign(scopes, exp) diff --git a/pyproject.toml b/pyproject.toml index 504a8e0..0e70e2f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,8 @@ readme = "README.md" license = "LICENSE.md" requires-python = ">=3.11" dependencies = [ - "boto3[cognito-idp] (>=1.37.11,<2.0.0)" + "boto3[cognito-idp] (>=1.37.11,<2.0.0)", + "pyjwt[crypto] (>=2.10.1,<3.0.0)" ] keywords = [ 'Development Status :: 4 - Beta', diff --git a/scribeauth/__init__.py b/scribeauth/__init__.py index a29731b..3c27a0e 100644 --- a/scribeauth/__init__.py +++ b/scribeauth/__init__.py @@ -3,7 +3,9 @@ MissingIdException, ResourceNotFoundException, ScribeAuth, + SelfManagedSigner, Tokens, TooManyRequestsException, UnauthorizedException, + decode_self_signed_jwt, ) diff --git a/scribeauth/scribeauth.py b/scribeauth/scribeauth.py index aaa3910..6e67742 100644 --- a/scribeauth/scribeauth.py +++ b/scribeauth/scribeauth.py @@ -4,6 +4,7 @@ import boto3 import botocore import botocore.session +import jwt from botocore.config import Config from botocore.exceptions import NoAuthTokenError @@ -363,3 +364,60 @@ def __revoke_token(self, refresh_token: str): Token=refresh_token, ClientId=self.client_id ) return response + + +@dataclass +class SelfManagedSigner: + private_key: str + """ + Private key related to the public key provided to Scribe. + """ + issuer: str + """ + Issuer as communicated to Scribe. Usually the company name. + """ + sub: str + """ + Account id. Provided by Scribe. + """ + + def sign(self, scopes: list[str], exp: int) -> str: + """Signs a JWT with the private key. + + :param scopes: The scopes to include in the JWT. + :type scopes: list[str] + :param exp: The expiration time of the JWT in seconds. + :type exp: int + + :return: The signed JWT. + :rtype: str + """ + + payload = { + "iss": self.issuer, + "sub": self.sub, + "aud": "https://apis.scribelabs.ai", + "scope": " ".join(scopes), + "exp": exp, + } + + return jwt.encode( + payload, + self.private_key, + algorithm="RS256", + ) + + +def decode_self_signed_jwt(token: str, public_key: str) -> dict: + """Decodes a JWT. + + :param token: The JWT to decode. + :type token: str + + :return: The decoded JWT. + :rtype: dict + """ + + return jwt.decode( + token, public_key, algorithms=["RS256"], audience="https://apis.scribelabs.ai" + ) diff --git a/tests/test_scribeauth.py b/tests/test_scribeauth.py index dfb8ba5..329ff2c 100644 --- a/tests/test_scribeauth.py +++ b/tests/test_scribeauth.py @@ -1,15 +1,19 @@ +import datetime import os -import re -import unittest from time import sleep +import jwt import pyotp import pytest -from botocore.awsrequest import AWSRequest from dotenv import load_dotenv from scribeauth import ScribeAuth, UnauthorizedException -from scribeauth.scribeauth import Challenge, Tokens +from scribeauth.scribeauth import ( + Challenge, + SelfManagedSigner, + Tokens, + decode_self_signed_jwt, +) load_dotenv(override=True) @@ -123,6 +127,48 @@ def test_revoke_refresh_token_invalid_and_use_valid_refresh_token_successfully( assert user_tokens.refresh_token == refresh_token +class TestScribeSelfManagedSigner: + private_key = "-----BEGIN RSA PRIVATE KEY-----\nMIIEpAIBAAKCAQEAwhvqCC+37A+UXgcvDl+7nbVjDI3QErdZBkI1VypVBMkKKWHM\nNLMdHk0bIKL+1aDYTRRsCKBy9ZmSSX1pwQlO/3+gRs/MWG27gdRNtf57uLk1+lQI\n6hBDozuyBR0YayQDIx6VsmpBn3Y8LS13p4pTBvirlsdX+jXrbOEaQphn0OdQo0WD\noOwwsPCNCKoIMbUOtUCowvjesFXlWkwG1zeMzlD1aDDS478PDZdckPjT96ICzqe4\nO1Ok6fRGnor2UTmuPy0f1tI0F7Ol5DHAD6pZbkhB70aTBuWDGLDR0iLenzyQecmD\n4aU19r1XC9AHsVbQzxHrP8FveZGlV/nJOBJwFwIDAQABAoIBAFCVFBA39yvJv/dV\nFiTqe1HahnckvFe4w/2EKO65xTfKWiyZzBOotBLrQbLH1/FJ5+H/82WVboQlMATQ\nSsH3olMRYbFj/NpNG8WnJGfEcQpb4Vu93UGGZP3z/1B+Jq/78E15Gf5KfFm91PeQ\nY5crJpLDU0CyGwTls4ms3aD98kNXuxhCGVbje5lCARizNKfm/+2qsnTYfKnAzN+n\nnm0WCjcHmvGYO8kGHWbFWMWvIlkoZ5YubSX2raNeg+YdMJUHz2ej1ocfW0A8/tmL\nwtFoBSuBe1Z2ykhX4t6mRHp0airhyc+MO0bIlW61vU/cPGPos16PoS7/V08S7ZED\nX64rkyECgYEA4iqeJZqny/PjOcYRuVOHBU9nEbsr2VJIf34/I9hta/mRq8hPxOdD\n/7ES/ZTZynTMnOdKht19Fi73Sf28NYE83y5WjGJV/JNj5uq2mLR7t2R0ZV8uK8tU\n4RR6b2bHBbhVLXZ9gqWtu9bWtsxWOkG1bs0iONgD3k5oZCXp+IWuklECgYEA27bA\n7UW+iBeB/2z4x1p/0wY+whBOtIUiZy6YCAOv/HtqppsUJM+W9GeaiMpPHlwDUWxr\n4xr6GbJSHrspkMtkX5bL9e7+9zBguqG5SiQVIzuues9Jio3ZHG1N2aNrr87+wMiB\nxX6Cyi0x1asmsmIBO7MdP/tSNB2ebr8qM6/6mecCgYBA82ZJfFm1+8uEuvo6E9/R\nyZTbBbq5BaVmX9Y4MB50hM6t26/050mi87J1err1Jofgg5fmlVMn/MLtz92uK/hU\nS9V1KYRyLc3h8gQQZLym1UWMG0KCNzmgDiZ/Oa/sV5y2mrG+xF/ZcwBkrNgSkO5O\n7MBoPLkXrcLTCARiZ9nTkQKBgQCsaBGnnkzOObQWnIny1L7s9j+UxHseCEJguR0v\nXMVh1+5uYc5CvGp1yj5nDGldJ1KrN+rIwMh0FYt+9dq99fwDTi8qAqoridi9Wl4t\nIXc8uH5HfBT3FivBtLucBjJgOIuK90ttj8JNp30tbynkXCcfk4NmS23L21oRCQyy\nlmqNDQKBgQDRvzEB26isJBr7/fwS0QbuIlgzEZ9T3ZkrGTFQNfUJZWcUllYI0ptv\ny7ShHOqyvjsC3LPrKGyEjeufaM5J8EFrqwtx6UB/tkGJ2bmd1YwOWFHvfHgHCZLP\n34ZNURCvxRV9ZojS1zmDRBJrSo7+/K0t28hXbiaTOjJA18XAyyWmGg==\n-----END RSA PRIVATE KEY-----\n" + public_key = "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAwhvqCC+37A+UXgcvDl+7\nnbVjDI3QErdZBkI1VypVBMkKKWHMNLMdHk0bIKL+1aDYTRRsCKBy9ZmSSX1pwQlO\n/3+gRs/MWG27gdRNtf57uLk1+lQI6hBDozuyBR0YayQDIx6VsmpBn3Y8LS13p4pT\nBvirlsdX+jXrbOEaQphn0OdQo0WDoOwwsPCNCKoIMbUOtUCowvjesFXlWkwG1zeM\nzlD1aDDS478PDZdckPjT96ICzqe4O1Ok6fRGnor2UTmuPy0f1tI0F7Ol5DHAD6pZ\nbkhB70aTBuWDGLDR0iLenzyQecmD4aU19r1XC9AHsVbQzxHrP8FveZGlV/nJOBJw\nFwIDAQAB\n-----END PUBLIC KEY-----\n" + + def test_signer_successfully(self): + signer = SelfManagedSigner(self.private_key, "issuer", "sub") + exp = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta( + seconds=60 + ) + token = signer.sign(["read", "write"], int(exp.timestamp())) + assert isinstance(token, str) + claims = jwt.decode( + token, self.public_key, algorithms=["RS256"], options={"verify_aud": False} + ) + assert claims["sub"] == "sub" + assert claims["iss"] == "issuer" + assert claims["scope"] == "read write" + assert claims["aud"] == "https://apis.scribelabs.ai" + + +class TestDecodeSelfSignedToken: + expired_signed_token = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJpc3N1ZXIiLCJzdWIiOiJzdWIiLCJhdWQiOiJodHRwczovL2FwaXMuc2NyaWJlbGFicy5haSIsInNjb3BlIjoicmVhZCB3cml0ZSIsImV4cCI6MTc0MTgxNzgxMn0.LLzyN0ggqfmurfHs-fZeiCNemJgIOhlIWcxj97vYQjv6CN_deRcL6lTnrQh7G0_y2XpPkRlZu_aW6RufHi7GMVY6DvJHLuiPdKY6TnsMb7gUY9FFT5y3WawYjwQXot0x6DoeHC12D-LbLiZTPmFworEq-jDJJeM2ScdOzm03QTuARob7T7IcaE1aSPuox1TvxnaGcZzrZZ3z_odF2PQYKVxvgof2Yu3o67oXQ_FCPbdfbHn-RlAlmbl7MvkAit5-v-8BHBYSzznB1VZw3Y30n27eGBdZf4PsgNnHSpJzQmvBW59DM8CW_zdQz-oj5zcDMIV2Lw16qhTQZsuN3u1lcg" + + def test_decode_token_successfully(self): + token = generate_signed_token() + claims = decode_self_signed_jwt(token, TestScribeSelfManagedSigner.public_key) + assert claims["sub"] == "sub" + assert claims["iss"] == "issuer" + assert claims["scope"] == "read write" + assert claims["aud"] == "https://apis.scribelabs.ai" + + def test_decode_token_expired(self): + with pytest.raises(jwt.ExpiredSignatureError): + decode_self_signed_jwt( + self.expired_signed_token, TestScribeSelfManagedSigner.public_key + ) + + def test_decode_token_wrong_public_key(self): + with pytest.raises(jwt.InvalidKeyError): + decode_self_signed_jwt(self.expired_signed_token, "key") + + def generate_refresh_token_for_test(): tokens_or_challenge = access.get_tokens(username=username, password=password) if isinstance(tokens_or_challenge, Tokens): @@ -155,3 +201,10 @@ def assert_tokens(self, user_tokens: Tokens | Challenge) -> Tokens: assert user_tokens.refresh_token != user_tokens.id_token assert user_tokens.id_token != user_tokens.access_token return user_tokens + + +def generate_signed_token(): + signer = SelfManagedSigner(TestScribeSelfManagedSigner.private_key, "issuer", "sub") + exp = int(datetime.datetime.now(datetime.timezone.utc).timestamp()) + 60 + token = signer.sign(["read", "write"], exp) + return token