-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtokens.py
111 lines (93 loc) · 3.58 KB
/
tokens.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
from flask import request, redirect, render_template
import requests
import os
from app import app
from logger import logger
# URL of the landing page, which also refreshes stored tokens when present.
TOKEN_REFRESH_ENDPOINT = '/'


def _is_local_redirect_target(target):
    """
    Return True when *target* is a relative URL that stays on this site.

    Targets with a scheme or host, scheme-relative targets (``//host``) and
    targets containing characters that browsers rewrite or strip (backslash,
    tab, CR, LF, surrounding whitespace) are rejected.
    """
    from urllib.parse import urlparse

    if target != target.strip() or any(ch in target for ch in '\\\t\r\n'):
        return False
    if target.startswith('//'):
        return False
    try:
        parsed = urlparse(target)
    except ValueError:
        # urlparse raises on malformed hosts; treat those as unsafe.
        return False
    return not parsed.scheme and not parsed.netloc


def _redirect_after_auth():
    """
    Redirect to the ``redirect_to`` query parameter (default ``update_files``).

    The parameter is user-controlled, so anything that is not a local relative
    URL is replaced by the default to avoid an open redirect.
    """
    default_target = 'update_files'
    redirect_to = request.args.get('redirect_to', default_target, type=str)
    if not _is_local_redirect_target(redirect_to):
        logger.debug("Ignoring non-local redirect_to target.")
        redirect_to = default_target
    return redirect(redirect_to)


@app.route(TOKEN_REFRESH_ENDPOINT)
def index():
    """
    Refresh the stored tokens, or show the sign-in page when none are stored.

    When read_tokens() succeeds, a refresh is requested: on success the client
    is redirected to ``redirect_to``; on failure the error value returned by
    request_tokens() is sent back as the response. When no tokens are stored,
    ``index.html`` is rendered with the tenant, application id, redirect URL
    and permission scope.
    """
    if read_tokens():
        status, err = request_tokens(refresh=True)
        if status:
            return _redirect_after_auth()
        return err
    return render_template(
        'index.html',
        tenant=app.config['TENANT'],
        application_id=app.config['APPLICATION_ID'],
        redirect_url=app.config['REDIRECT_URL'],
        scope=permission_scope(),
    )


@app.route('/register_token')
def register_token():
    """
    Exchange the ``code`` query parameter for tokens.

    On success the client is redirected to ``redirect_to``; on failure the
    error description (or, if absent, the error code) is returned as the
    response body.
    """
    status, err = request_tokens(refresh=False, code=request.args.get('code', type=str))
    if status:
        return _redirect_after_auth()
    # Only the 'error' key is guaranteed on failure, so do not index
    # 'error_description' directly.
    return err.get('error_description') or err.get('error') or 'Token request failed.'
"""
Send a POST request to obtain access and refresh tokens from Microsoft.
code:
"""
def request_tokens(refresh=False, code=''):
url = 'https://login.microsoftonline.com/' + app.config['TENANT'] + '/oauth2/v2.0/token'
# It is not clear whether we need to include the scope in this request.
# In https://docs.microsoft.com/en-us/onedrive/developer/rest-api/getting-started/graph-oauth ,
# it does not say that scope is required.
# However, in https://docs.microsoft.com/en-us/graph/auth-v2-user , the scope is required for the same endpoint.
# It seems like it works without the scope.
data = {
'client_id': app.config['APPLICATION_ID'],
'grant_type': 'refresh_token' if refresh else 'authorization_code',
'redirect_uri': app.config['REDIRECT_URL'],
'client_secret': app.config['CLIENT_SECRET']
}
if refresh:
data['refresh_token'] = app.config['REFRESH_TOKEN']
else:
data['code'] = code
r = requests.post(url, data=data)
res = r.json()
if 'error' in res:
logger.warn(res)
return False, res
app.config['ACCESS_TOKEN'] = res['access_token']
if 'refresh_token' in res:
app.config['REFRESH_TOKEN'] = res['refresh_token']
write_tokens()
return True, None
"""
Create permission scope string.
"""
def permission_scope():
permissions = ['offline_access', 'user.read', 'files.readwrite']
return "%20".join(permissions)
"""
Read refresh and access tokens from the refresh_token and access_token files.
If they do not exist or are empty, return False.
Otherwise, read the tokens into the configuration files and return True.
"""
def read_tokens():
if 'refresh_token' in os.listdir() and 'access_token' in os.listdir():
with open('refresh_token', 'r') as f:
t = f.readline()
if t:
app.config['REFRESH_TOKEN'] = t
else:
logger.debug("Refresh token not obtained.")
return False
with open('access_token', 'r') as f:
t = f.readline()
if t:
app.config['ACCESS_TOKEN'] = t
else:
logger.debug("Access token not obtained.")
return False
logger.debug("Access token successfully obtained.")
return True
logger.debug("Token(s) missing.")
return False
"""
Write refresh and access tokens into the refresh_token and access_token files.
"""
def write_tokens():
with open('access_token', 'w+') as f:
f.write(app.config['ACCESS_TOKEN'])
if 'REFRESH_TOKEN' in app.config:
with open('refresh_token', 'w+') as f:
f.write(app.config['REFRESH_TOKEN'])