-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathauth.py
91 lines (77 loc) · 2.73 KB
/
auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import datetime as dt
import requests
import json
import os
def req_token(ct, iam_auth_url):
"""func for senfing request to huawei cloud
Arguments:
ct {[datetime]} -- current datetime
iam_auth_token_url {[str]} -- iam url
Returns:
token -- token string
"""
print('===== func req_token =====')
token = None
current_time = format(ct).replace(' ', '').replace(
'-', '').replace(':', '').replace('.', '')
print('>> Current time: {}'.format(format(ct)))
auth_path = './data/iam/auth_data'
f_name = "./data/iam/{}.aidate".format(current_time)
f = open(auth_path, "r")
body_data = f.read()
body_data = json.dumps(json.loads(body_data))
headers_data = {'Content-Type': 'application/json;charset=utf8'}
f.close()
print('>> Sending requests...')
r = requests.post(url=iam_auth_url, headers=headers_data, data=body_data)
print('>> Sending requests SUC')
print('>> Getting token...')
token = r.headers.get('X-Subject-Token')
assert token != None
print('>> Getting token SUC')
print('>> Saving token {}'.format(f_name))
f = open(f_name, "w+")
f.write(token)
f.close()
print('>> Saving token SUC')
return token
def get_token(iam_auth_token_url):
"""get latest token from cache or new req
Arguments:
iam_auth_token_url {[str]} -- iam url
Returns:
token -- token string
"""
print('===== func get_token =====')
dayday = dt.timedelta(days=1) - dt.timedelta(hours=1)
current_time = dt.datetime.now()
token_path = './data/iam/'
token = None
files = os.listdir(token_path)
if len(files) == 0:
print('>> No file in token folder')
token = req_token(current_time, iam_auth_token_url)
else:
print('>> Looking for valid token...')
token_existence = False
token_expire = True
for f in files:
if f.endswith('.aidate'):
token_existence = True
saved_token_time = f.split('.')[0]
saved_token_time = dt.datetime.strptime(
saved_token_time,
'%Y%m%d%H%M%S%f'
)
if current_time - saved_token_time > dayday:
print('>> [DEL] expired token {}'.format(f))
os.remove(token_path + f)
else:
token_expire = False
print('>> [READ] valid token {}'.format(f))
read_token = open(token_path + f, "r")
token = read_token.read()
if not token_existence or token_expire:
print('>> No valid token or all expired')
token = req_token(current_time, iam_auth_token_url)
return token