-
Notifications
You must be signed in to change notification settings - Fork 75
/
Copy pathbrowser_azure_oauth2_credentials_provider.py
311 lines (259 loc) · 12.5 KB
/
browser_azure_oauth2_credentials_provider.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
import logging
import socket
import typing
from enum import Enum
from redshift_connector.error import InterfaceError
from redshift_connector.plugin.credential_provider_constants import azure_headers
from redshift_connector.plugin.jwt_credentials_provider import JwtCredentialsProvider
from redshift_connector.redshift_property import RedshiftProperty
if typing.TYPE_CHECKING:
from redshift_connector.plugin.i_native_plugin import INativePlugin
logging.getLogger(__name__).addHandler(logging.NullHandler())
_logger: logging.Logger = logging.getLogger(__name__)
class BrowserAzureOAuth2CredentialsProvider(JwtCredentialsProvider):
"""
Class to get JWT Token from any IDP using OAuth 2.0 API
"""
class OAuthParamNames(Enum):
"""
Defines OAuth parameter names used when requesting JWT token from the IdP
"""
STATE: str = "state"
REDIRECT: str = "redirect_uri"
IDP_CODE: str = "code"
CLIENT_ID: str = "client_id"
RESPONSE_TYPE: str = "response_type"
REQUESTED_TOKEN_TYPE: str = "requested_token_type"
GRANT_TYPE: str = "grant_type"
SCOPE: str = "scope"
RESPONSE_MODE: str = "response_mode"
RESOURCE: str = "resource"
MICROSOFT_IDP_HOST: str = "login.microsoftonline.com"
CURRENT_INTERACTION_SCHEMA: str = "https"
def __init__(self: "BrowserAzureOAuth2CredentialsProvider") -> None:
super().__init__()
self.idp_tenant: typing.Optional[str] = None
self.client_id: typing.Optional[str] = None
self.scope: str = ""
self.idp_response_timeout: int = 120
self.listen_port: int = 0
def add_parameter(
self: "BrowserAzureOAuth2CredentialsProvider",
info: RedshiftProperty,
) -> None:
super().add_parameter(info)
self.idp_tenant = info.idp_tenant
self.client_id = info.client_id
self.scope = info.scope
if info.idp_response_timeout:
self.idp_response_timeout = info.idp_response_timeout
if info.listen_port:
self.listen_port = info.listen_port
def check_required_parameters(self: "BrowserAzureOAuth2CredentialsProvider") -> None:
super().check_required_parameters()
if not self.idp_tenant:
BrowserAzureOAuth2CredentialsProvider.handle_missing_required_property("idp_tenant")
if not self.client_id:
BrowserAzureOAuth2CredentialsProvider.handle_missing_required_property("client_id")
if not self.idp_response_timeout or int(self.idp_response_timeout) < 10:
BrowserAzureOAuth2CredentialsProvider.handle_invalid_property_value(
"idp_response_timeout", "Must be 10 seconds or greater"
)
def get_jwt_assertion(self: "BrowserAzureOAuth2CredentialsProvider") -> str:
_logger.debug("BrowserAzureOAuth2CredentialsProvider.get_jwt_assertion")
self.check_required_parameters()
if self.listen_port == 0:
_logger.debug("Listen port set to 0. Will pick random port")
token: str = self.fetch_authorization_token()
content: str = self.fetch_jwt_response(token)
jwt_assertion: str = self.extract_jwt_assertion(content)
return jwt_assertion
def get_cache_key(self: "BrowserAzureOAuth2CredentialsProvider") -> str:
return "{}{}".format(self.idp_tenant if self.idp_tenant else "", self.client_id if self.client_id else "")
def run_server(
self: "BrowserAzureOAuth2CredentialsProvider",
listen_socket: socket.socket,
idp_response_timeout: int,
state: str,
):
"""
Runs a server on localhost to listen for the IdP's response to our HTTP POST request for JWT assertion.
Parameters
----------
:param listen_socket: socket.socket
The socket on which the method listens for a response
:param idp_response_timeout: int
The maximum time to listen on the socket, specified in seconds
:param state: str
The state generated by the client. This must match the state received from the IdP server
Returns
-------
The IdP's response, including JWT assertion
"""
_logger.debug("BrowserAzureOAuth2CredentialsProvider.run_server")
conn, addr = listen_socket.accept()
_logger.debug("local server socket connection established")
conn.settimeout(float(idp_response_timeout))
size: int = 102400
with conn:
while True:
part: bytes = conn.recv(size)
decoded_part = part.decode()
state_idx: int = decoded_part.find(
"{}=".format(BrowserAzureOAuth2CredentialsProvider.OAuthParamNames.STATE.value)
)
if state_idx > -1:
received_state: str = decoded_part[state_idx + 6 : decoded_part.find("&", state_idx)]
if received_state != state:
exec_msg = "Incoming state {received} does not match the outgoing state {expected}".format(
received=received_state, expected=state
)
_logger.debug(exec_msg)
raise InterfaceError(exec_msg)
code_idx: int = decoded_part.find(
"{}=".format(BrowserAzureOAuth2CredentialsProvider.OAuthParamNames.IDP_CODE.value)
)
if code_idx < 0:
exec_msg = "No code found"
_logger.debug(exec_msg)
raise InterfaceError(exec_msg)
received_code: str = decoded_part[code_idx + 5 : state_idx - 1]
if received_code == "":
exec_msg = "No valid code found"
_logger.debug(exec_msg)
raise InterfaceError(exec_msg)
conn.send(self.close_window_http_resp())
return received_code
def open_browser(self: "BrowserAzureOAuth2CredentialsProvider", state: str) -> None:
"""
Opens the default browser to allow user authentication with the IdP
Parameters
----------
:param state: str
The state generated by the client
Returns
-------
None
"""
_logger.debug("BrowserAzureOAuth2CredentialsProvider.open_browser")
import webbrowser
url: str = self.get_authorization_token_url(state=state)
if url is None:
BrowserAzureOAuth2CredentialsProvider.handle_missing_required_property("login_url")
self.validate_url(url)
webbrowser.open(url)
def get_listen_socket(self: "BrowserAzureOAuth2CredentialsProvider") -> socket.socket:
"""
Returns a listen socket used for user authentication
"""
s: socket.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
_logger.debug("attempting socket bind on localhost with any free port")
s.bind(("127.0.0.1", 0)) # bind to any free port
s.listen()
self.listen_port = s.getsockname()[1]
return s
def get_authorization_token_url(self, state: str) -> str:
"""
Returns a URL used for requesting authentication token from IdP
"""
_logger.debug("BrowserAzureOAuth2CredentialsProvider.get_authorization_token_url")
from urllib.parse import urlencode, urlunsplit
params: typing.Dict[str, str] = {
BrowserAzureOAuth2CredentialsProvider.OAuthParamNames.RESPONSE_TYPE.value: "code",
BrowserAzureOAuth2CredentialsProvider.OAuthParamNames.RESPONSE_MODE.value: "form_post",
BrowserAzureOAuth2CredentialsProvider.OAuthParamNames.CLIENT_ID.value: typing.cast(str, self.client_id),
BrowserAzureOAuth2CredentialsProvider.OAuthParamNames.REDIRECT.value: self.redirectUri,
BrowserAzureOAuth2CredentialsProvider.OAuthParamNames.STATE.value: state,
BrowserAzureOAuth2CredentialsProvider.OAuthParamNames.SCOPE.value: "openid {}".format(self.scope),
}
encoded_params: str = urlencode(params)
return urlunsplit(
(
BrowserAzureOAuth2CredentialsProvider.CURRENT_INTERACTION_SCHEMA,
BrowserAzureOAuth2CredentialsProvider.MICROSOFT_IDP_HOST,
"/{}/oauth2/v2.0/authorize".format(self.idp_tenant),
encoded_params,
"",
)
)
def fetch_authorization_token(self: "BrowserAzureOAuth2CredentialsProvider") -> str:
"""
Returns authorization token retrieved from IdP following user authentication in web browser.
"""
_logger.debug("BrowserAzureOAuth2CredentialsProvider.fetch_authorization_token")
import concurrent
import random
import socket
alphabet: str = "abcdefghijklmnopqrstuvwxyz"
state: str = "".join(random.sample(alphabet, 10))
listen_socket: socket.socket = self.get_listen_socket()
self.redirectUri = "http://localhost:{port}/redshift/".format(port=self.listen_port)
_logger.debug("redirectUri=%s", self.redirectUri)
try:
return_value: str = ""
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(self.run_server, listen_socket, self.idp_response_timeout, state)
self.open_browser(state)
return_value = future.result()
return str(return_value)
except socket.error as e:
_logger.debug("Socket error: %s", e)
raise e
except Exception as e:
_logger.debug("Other Exception: %s", e)
raise e
finally:
listen_socket.close()
def get_jwt_post_request_url(self: "BrowserAzureOAuth2CredentialsProvider") -> str:
"""
Returns URL used for sending HTTP POST request to retrieve JWT assertion.
"""
return "{}://{}{}".format(
BrowserAzureOAuth2CredentialsProvider.CURRENT_INTERACTION_SCHEMA,
BrowserAzureOAuth2CredentialsProvider.MICROSOFT_IDP_HOST,
"/{}/oauth2/v2.0/token".format(self.idp_tenant),
)
def fetch_jwt_response(self: "BrowserAzureOAuth2CredentialsProvider", token: str) -> str:
"""
Returns JWT Response from IdP POST request.
"""
_logger.debug("BrowserAzureOAuth2CredentialsProvider.fetch_jwt_response")
import requests
url: str = self.get_jwt_post_request_url()
self.validate_url(url)
params: typing.Dict[str, str] = {
BrowserAzureOAuth2CredentialsProvider.OAuthParamNames.IDP_CODE.value: token,
BrowserAzureOAuth2CredentialsProvider.OAuthParamNames.GRANT_TYPE.value: "authorization_code",
BrowserAzureOAuth2CredentialsProvider.OAuthParamNames.SCOPE.value: typing.cast(str, self.scope),
BrowserAzureOAuth2CredentialsProvider.OAuthParamNames.CLIENT_ID.value: typing.cast(str, self.client_id),
BrowserAzureOAuth2CredentialsProvider.OAuthParamNames.REDIRECT.value: self.redirectUri,
}
_logger.debug("Issuing POST request uri=%s verify=%s", url, self.do_verify_ssl_cert())
response: requests.Response = requests.post(
url, data=params, headers=azure_headers, verify=self.do_verify_ssl_cert()
)
_logger.debug("Response code: %s", response.status_code)
response.raise_for_status()
return response.text
def extract_jwt_assertion(self: "BrowserAzureOAuth2CredentialsProvider", content: str) -> str:
"""
Returns encoded JWT assertion extracted from IdP response content
"""
_logger.debug("BrowserAzureOAuth2CredentialsProvider.extract_jwt_assertion")
import json
_logger.debug("parsing payload JSON response")
response_content: typing.Dict[str, str] = json.loads(content)
if "access_token" not in response_content:
exec_msg = "Failed to find access_token"
_logger.debug(exec_msg)
raise InterfaceError(exec_msg)
encoded_jwt_assertion: str = response_content["access_token"]
if not encoded_jwt_assertion:
exec_msg = "Invalid access_token value"
_logger.debug(exec_msg)
raise InterfaceError(exec_msg)
return encoded_jwt_assertion
def get_idp_token(self: "BrowserAzureOAuth2CredentialsProvider") -> str:
return super().get_idp_token()
def get_sub_type(self: "BrowserAzureOAuth2CredentialsProvider") -> int:
return super().get_sub_type()