-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtplink.py
358 lines (267 loc) · 11.5 KB
/
tplink.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
'''
TP-Link Archer C2300 API client v1.1.0
Compatible (tested) with versions:
Firmware: 1.1.1 Build 20200918 rel.67850(4555)
Hardware: Archer C2300 v2.0
Copyright (c) 2021 Michal Chvila <[email protected]>.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
'''
import requests
import json
import binascii
import time
import random
import logging
from Crypto.Cipher import AES
from Crypto.Cipher import PKCS1_v1_5
from Crypto.PublicKey import RSA
from Crypto.Util.Padding import pad, unpad
from Crypto.Hash import MD5
from base64 import b64encode, b64decode
class LoginException(Exception):
    '''Raised when logging in to the router does not succeed.'''
class UserConflictException(LoginException):
    '''Raised when the router reports a 'user conflict' on login.'''
class TPLinkClient:
    '''
    Client for the encrypted LuCI web API of a TP-Link router.

    Typical usage: construct the client with the router host, call
    connect() with the admin password, issue requests such as
    get_client_list(), and finally call logout().
    '''

    # HTTP headers sent with every request
    HEADERS = {
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:90.0) Gecko/20100101 Firefox/90.0',
        'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
        'X-Requested-With': 'XMLHttpRequest',
    }

    def __init__(self, host, log_level = logging.INFO, timeout = None):
        '''
        host: host name or IP address of the router, used to build URLs
        log_level: level applied to this module's logger
        timeout: optional timeout (seconds) passed to every HTTP request;
                 None (the default) means no timeout
        '''
        logging.basicConfig()
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(log_level)
        self.req = requests.Session()
        self.host = host
        self.timeout = timeout
        # session state, filled in by connect()
        self.token = None
        self.rsa_key_pw = None
        self.rsa_key_auth = None
        self.md5_hash_pw = None
        self.aes_key = None

    def get_url(self, endpoint, form):
        '''
        Build the URL for the given endpoint and form.
        The session token is embedded when one is set, otherwise it is
        left empty.
        '''
        stok = self.token if self.token is not None else ''
        return 'http://{}/cgi-bin/luci/;stok={}/{}?form={}'.format(self.host, stok, endpoint, form)

    def connect(self, password, logout_others = False):
        '''
        Log in with the given password and store the session token.

        password: router password (hashed together with the 'admin' user name)
        logout_others: when True, a 'user conflict' response is answered by
                       repeating the login with the confirm flag set

        Raises LoginException (or UserConflictException) when login fails.
        '''
        # hash the password
        self.md5_hash_pw = self.__hash_pw('admin', password)
        # request public RSA keys from the router
        self.rsa_key_pw = self.__req_rsa_key_password()
        self.rsa_key_auth = self.__req_rsa_key_auth()
        # generate AES key
        self.aes_key = self.__gen_aes_key()
        # encrypt the password
        encrypted_pw = self.__encrypt_pw(password)
        # authenticate
        try:
            self.token = self.__req_login(encrypted_pw)
        except UserConflictException:
            if not logout_others:
                # bare raise keeps the original traceback intact
                raise
            self.token = self.__req_login(encrypted_pw, True)

    def logout(self):
        '''
        Log out of the router and forget the session token.

        Returns False when there is no token, otherwise the 'success' value
        from the router's response.
        '''
        if self.token is None:
            return False
        success = self.__req_logout()
        self.token = None
        return success

    def get_client_list(self):
        '''
        Send an encrypted 'read' request to admin/status?form=client_status
        and return the decoded JSON response.
        '''
        url = self.get_url('admin/status', 'client_status')
        data = {
            'operation': 'read'
        }
        return self.__request(url, data, encrypt = True)

    def __request(self, url, data, encrypt = False, is_login = False):
        '''
        POST data to url and return the parsed JSON response.

        encrypt: when True the body is AES-encrypted and signed, and the
                 'data' field of the response is decrypted before parsing
        is_login: forwarded to the signature builder (the login signature
                  also carries the AES key)

        Raises LoginException when an encrypted request is attempted before
        an AES key exists; AssertionError on an empty or malformed response.
        '''
        if encrypt:
            if self.aes_key is None:
                raise LoginException('Not connected: call connect() before sending encrypted requests.')
            data_str = self.__format_body_to_encrypt(data)
            # pad to a multiple of 16 with pkcs7
            data_padded = pad(data_str.encode('utf8'), 16, 'pkcs7')
            # encrypt the body
            aes_encryptor = self.__gen_aes_cipher(self.aes_key)
            encrypted_data_bytes = aes_encryptor.encrypt(data_padded)
            # encode encrypted binary data to base64
            encrypted_data = b64encode(encrypted_data_bytes).decode('utf8')
            # get encrypted signature
            signature = self.__get_signature(len(encrypted_data), is_login)
            # order matters here! signature needs to go first (or we get empty 403 response)
            form_data = {
                'sign': signature,
                'data': encrypted_data
            }
        else:
            form_data = data
        r = self.req.post(url, data = form_data, headers = self.HEADERS, timeout = self.timeout)
        self.logger.debug('<Request {}>'.format(r.url))
        self.logger.debug(r)
        self.logger.debug(r.text)
        assert r.text != '', 'empty response body from {}'.format(url)
        if not encrypt:
            return json.loads(r.text)
        # parse the json response
        raw_response_json = json.loads(r.text)
        assert 'data' in raw_response_json, 'encrypted response has no "data" field'  # base64
        # decode base64 string
        encrypted_response_data = b64decode(raw_response_json['data'])
        # decrypt the response using our AES key (fresh cipher: CBC state is not reusable)
        aes_decryptor = self.__gen_aes_cipher(self.aes_key)
        response = aes_decryptor.decrypt(encrypted_response_data)
        # unpad using pkcs7
        j = unpad(response, 16, 'pkcs7').decode('utf8')
        return json.loads(j)

    def __format_body_to_encrypt(self, data):
        '''
        Format form data into a 'key=value&key=value' string.
        Values are inserted as-is (no URL-encoding is applied).
        '''
        return '&'.join('{}={}'.format(attr, value) for attr, value in data.items())

    def __hash_pw(self, arg1, arg2 = None):
        '''
        Return the MD5 hex digest of arg1 (concatenated with arg2 when given).
        Text input is encoded as UTF-8 before hashing; bytes are hashed as-is.
        '''
        payload = arg1 if arg2 is None else arg1 + arg2
        if isinstance(payload, str):
            # the hash object only accepts bytes
            payload = payload.encode('utf8')
        md5 = MD5.new()
        md5.update(payload)
        result = md5.hexdigest()
        assert len(result) == 32
        return result

    def __encrypt_pw(self, password):
        '''
        Encrypt the password with the router's password RSA key
        (PKCS#1 v1.5, type 2 random padding) and return it as a hex string.
        '''
        pub_key = self.__make_rsa_pub_key(self.rsa_key_pw)
        rsa = PKCS1_v1_5.new(pub_key)
        binpw = password.encode('utf8')
        encrypted = rsa.encrypt(binpw)
        as_string = binascii.hexlify(encrypted).decode('utf8')
        assert len(as_string) == 256
        # hex(n) carries a '0x' prefix, hence the - 2
        assert len(as_string) == (len(hex(pub_key.n)) - 2)
        return as_string

    def __make_rsa_pub_key(self, key):
        '''
        Build an RSA public key from a sequence whose first two items are the
        modulus and the exponent as hex strings (extra items are ignored).
        '''
        n = int(key[0], 16)
        e = int(key[1], 16)
        return RSA.construct((n, e))

    def __gen_aes_key(self):
        '''
        Return a (key, iv) pair of 16-character decimal strings.

        Each value is the current time in milliseconds followed by a random
        9-digit number, truncated to 16 characters.
        NOTE(review): most of each value is therefore the timestamp, and
        `random` is not a cryptographically secure generator - confirm this
        is what the router expects before strengthening it.
        '''
        KEY_LEN = 128 // 8
        IV_LEN = 16
        ts = str(round(time.time() * 1000))
        key = (ts + str(random.randint(100000000, 1000000000-1)))[:KEY_LEN]
        iv = (ts + str(random.randint(100000000, 1000000000-1)))[:IV_LEN]
        assert len(key) == 16
        assert len(iv) == 16
        return (key, iv)

    def __gen_aes_cipher(self, aes_key):
        '''
        Create a new AES cipher object from a (key, iv) pair of strings.
        '''
        key, iv = aes_key
        # CBC mode, PKCS7 padding
        return AES.new(key.encode('utf8'), AES.MODE_CBC, iv = iv.encode('utf8'))

    def __get_signature(self, body_data_len, is_login = False):
        '''
        Build the RSA-encrypted, hex-encoded signature for an encrypted request.

        body_data_len: length of the encrypted (base64) body message
        is_login: set to True for the login request

        Reads from the instance:
        aes_key: generated pseudo-random AES key (CBC, PKCS7)
        rsa_key_auth: RSA public key + sequence number from login?form=auth
        md5_hash_pw: MD5 hash of the username+password as string
        '''
        # 64-byte modulus (128 hex chars) minus 11 bytes of PKCS#1 v1.5 padding
        CHUNK_LEN = 53
        rsa_seq = self.rsa_key_auth[2]
        sign_data = 'h={}&s={}'.format(self.md5_hash_pw, rsa_seq + body_data_len)
        if is_login:
            # on login we also send our AES key, which is subsequently
            # used for E2E encrypted communication
            aes_key, aes_iv = self.aes_key
            sign_data = 'k={}&i={}&{}'.format(aes_key, aes_iv, sign_data)
        # encrypt the signature chunk by chunk using the RSA auth public key
        rsa = PKCS1_v1_5.new(self.__make_rsa_pub_key(self.rsa_key_auth))
        return ''.join(
            binascii.hexlify(rsa.encrypt(sign_data[pos : pos + CHUNK_LEN].encode('utf8'))).decode('utf8')
            for pos in range(0, len(sign_data), CHUNK_LEN)
        )

    def __req_rsa_key_password(self):
        '''
        Return value:
        (n, e) RSA public key for encrypting the password
        '''
        url = self.get_url('login', 'keys')
        data = {
            'operation': 'read'
        }
        response = self.__request(url, data, encrypt = False)
        assert response['success'] == True
        pw_pub_key = response['data']['password']
        assert len(pw_pub_key[0]) == 256
        assert len(pw_pub_key[1]) == 6
        return (pw_pub_key[0], pw_pub_key[1])

    def __req_rsa_key_auth(self):
        '''
        Return value:
        (n, e, seq) RSA public key for encrypting the signature
        '''
        url = self.get_url('login', 'auth')
        data = {
            'operation': 'read'
        }
        response = self.__request(url, data, encrypt = False)
        assert response['success'] == True
        auth_pub_key = response['data']['key']
        assert len(auth_pub_key[0]) == 128
        assert len(auth_pub_key[1]) == 6
        return (auth_pub_key[0], auth_pub_key[1], response['data']['seq'])

    def __req_login(self, encrypted_pw, force_login = False):
        '''
        Send the login request.

        encrypted_pw: hex string produced by the password encryption step
        force_login: when True, 'confirm': 'true' is added to the request

        Return value (on successful login):
        stok - API auth token

        Raises UserConflictException on 'user conflict' and LoginException
        on any other unsuccessful response.

        Example responses:
        {'errorcode': 'login failed', 'success': False, 'data': {'failureCount': 1, 'errorcode': '-5002', 'attemptsAllowed': 9}}
        {'errorcode': 'exceeded max attempts', 'success': False, 'data': {'failureCount': 10, 'attemptsAllowed': 0}}
        {'errorcode': 'user conflict', 'success': False, 'data': {}}
        {'success': True, 'data': {'stok': '94640fd8887fb5750d6a426345581b87'}}
        '''
        url = self.get_url('login', 'login')
        data = {
            'operation': 'login',
            'password': encrypted_pw
        }
        if force_login:
            data['confirm'] = 'true'
        response = self.__request(url, data, encrypt = True, is_login = True)
        # NOTE(review): on success this logs the session token at INFO level
        self.logger.info(response)
        assert 'success' in response
        if response['success'] is False:
            assert 'errorcode' in response
            errorcode = response['errorcode']
            if errorcode == 'login failed':
                attempts_allowed = response['data']['attemptsAllowed']
                attempts_total = response['data']['failureCount'] + attempts_allowed
                raise LoginException('Login failed, wrong password. Remaining attempts: {}/{}'.format(attempts_allowed, attempts_total))
            if errorcode == 'exceeded max attempts':
                raise LoginException('Login failed, maximum login attempts exceeded. Please wait for 60-120 minutes.')
            if errorcode == 'user conflict':
                raise UserConflictException('Login conflict. Someone else is logged in.')
            raise LoginException(response)
        assert response['success'] == True
        return response['data']['stok']

    def __req_logout(self):
        '''
        Send the logout request and return the 'success' value of the response.
        Requires a session token to be set.
        '''
        assert self.token is not None
        url = self.get_url('admin/system', 'logout')
        data = {
            'operation': 'write'
        }
        response = self.__request(url, data, encrypt = True)
        self.logger.info(response)
        assert 'success' in response
        return response['success']