15
15
_logger = logging .getLogger (__name__ )
16
16
17
17
18
+ def token_is_valid (token ) -> bool :
19
+ """
20
+ returns true iff the token expiration date is far enough in the future. By "enough" I mean:
21
+ more than 1 minute (because the clients' request using the token shouldn't take longer than that)
22
+ """
23
+ try :
24
+ decoded_token = jwt .decode (token , algorithms = ["HS256" ], options = {"verify_signature" : False })
25
+ expiration_timestamp = decoded_token .get ("exp" )
26
+ expiration_datetime = datetime .fromtimestamp (expiration_timestamp )
27
+ _logger .debug ("Token is valid until %s" , expiration_datetime .isoformat ())
28
+ current_datetime = datetime .utcnow ()
29
+ token_is_valid_one_minute_into_the_future = expiration_datetime > current_datetime + timedelta (minutes = 1 )
30
+ return token_is_valid_one_minute_into_the_future
31
+ except jwt .ExpiredSignatureError :
32
+ _logger .info ("The token is expired" , exc_info = True )
33
+ return False
34
+ except jwt .InvalidTokenError :
35
+ _logger .info ("The token is invalid" , exc_info = True )
36
+ return False
37
+
38
+
18
39
class _ValidateTokenMixin : # pylint:disable=too-few-public-methods
19
40
"""
20
41
Mixin for classes which need to validate tokens
@@ -23,26 +44,6 @@ class _ValidateTokenMixin: # pylint:disable=too-few-public-methods
23
44
def __init__ (self ):
24
45
self ._session_lock = asyncio .Lock ()
25
46
26
- def _token_is_valid (self , token ) -> bool :
27
- """
28
- returns true iff the token expiration date is far enough in the future. By "enough" I mean:
29
- more than 1 minute (because the clients' request using the token shouldn't take longer than that)
30
- """
31
- try :
32
- decoded_token = jwt .decode (token , algorithms = ["HS256" ], options = {"verify_signature" : False })
33
- expiration_timestamp = decoded_token .get ("exp" )
34
- expiration_datetime = datetime .fromtimestamp (expiration_timestamp )
35
- _logger .debug ("Token is valid until %s" , expiration_datetime .isoformat ())
36
- current_datetime = datetime .utcnow ()
37
- token_is_valid_one_minute_into_the_future = expiration_datetime > current_datetime + timedelta (minutes = 1 )
38
- return token_is_valid_one_minute_into_the_future
39
- except jwt .ExpiredSignatureError :
40
- _logger .info ("The token is expired" , exc_info = True )
41
- return False
42
- except jwt .InvalidTokenError :
43
- _logger .info ("The token is invalid" , exc_info = True )
44
- return False
45
-
46
47
47
48
class _OAuthHttpClient (_ValidateTokenMixin , ABC ): # pylint:disable=too-few-public-methods
48
49
"""
@@ -86,7 +87,7 @@ async def _get_oauth_token(self) -> str:
86
87
if self ._token is None :
87
88
_logger .info ("Initially retrieving a new token" )
88
89
self ._token = await self ._get_new_token ()
89
- elif not self . _token_is_valid (self ._token ):
90
+ elif not token_is_valid (self ._token ):
90
91
_logger .info ("Token is not valid anymore, retrieving a new token" )
91
92
self ._token = await self ._get_new_token ()
92
93
else :
0 commit comments