1
1
"""Gitlab authenticator."""
2
2
3
- import base64
4
3
import contextlib
5
- import json
6
- import re
7
4
import urllib .parse as parse
8
5
from contextlib import suppress
9
6
from dataclasses import dataclass
10
7
from datetime import datetime
11
- from typing import Any
12
8
13
9
import gitlab
14
10
from sanic import Request
@@ -30,6 +26,7 @@ class GitlabAuthenticator:
30
26
gitlab_url : str
31
27
32
28
token_field : str = "Gitlab-Access-Token"
29
+ expires_at_field : str = "Gitlab-Access-Token-Expires-At"
33
30
34
31
def __post_init__ (self ) -> None :
35
32
"""Properly set gitlab url."""
@@ -76,7 +73,12 @@ async def _get_gitlab_api_user(self, access_token: str, headers: Header) -> base
76
73
if len (name_parts ) >= 1 :
77
74
last_name = " " .join (name_parts )
78
75
79
- _ , _ , _ , expires_at = self .git_creds_from_headers (headers )
76
+ expires_at : datetime | None = None
77
+ expires_at_raw : str | None = headers .get (self .expires_at_field )
78
+ if expires_at_raw is not None and len (expires_at_raw ) > 0 :
79
+ with suppress (ValueError ):
80
+ expires_at = datetime .fromtimestamp (float (expires_at_raw ))
81
+
80
82
return base_models .APIUser (
81
83
id = str (user_id ),
82
84
access_token = access_token ,
@@ -86,25 +88,3 @@ async def _get_gitlab_api_user(self, access_token: str, headers: Header) -> base
86
88
full_name = full_name ,
87
89
access_token_expires_at = expires_at ,
88
90
)
89
-
90
- @staticmethod
91
- def git_creds_from_headers (headers : Header ) -> tuple [Any , Any , Any , datetime | None ]:
92
- """Extract git credentials from the encoded header sent by the gateway."""
93
- parsed_dict = json .loads (base64 .decodebytes (headers ["Renku-Auth-Git-Credentials" ].encode ()))
94
- git_url , git_credentials = next (iter (parsed_dict .items ()))
95
- token_match = re .match (r"^[^\s]+\ ([^\s]+)$" , git_credentials ["AuthorizationHeader" ])
96
- git_token = token_match .group (1 ) if token_match is not None else None
97
- git_token_expires_at_raw = git_credentials ["AccessTokenExpiresAt" ]
98
- git_token_expires_at_num : float | None = None
99
- with suppress (ValueError , TypeError ):
100
- git_token_expires_at_num = float (git_token_expires_at_raw )
101
- git_token_expires_at : datetime | None = None
102
- if git_token_expires_at_num is not None and git_token_expires_at_num > 0 :
103
- with suppress (ValueError ):
104
- git_token_expires_at = datetime .fromtimestamp (git_token_expires_at_num )
105
- return (
106
- git_url ,
107
- git_credentials ["AuthorizationHeader" ],
108
- git_token ,
109
- git_token_expires_at ,
110
- )
0 commit comments