diff --git a/tplink_smartplug.py b/tplink_smartplug.py index e40a0ca..0b80a93 100755 --- a/tplink_smartplug.py +++ b/tplink_smartplug.py @@ -23,6 +23,22 @@ import socket from struct import pack +import binascii +import socket +import requests + +# pycryptodome +from Crypto import Random +from Crypto.Cipher import AES +from Crypto.Util import Counter, Padding + +import logging +import hashlib +import time +import json + +import http.client as http_client + version = 0.4 # Check if hostname is valid @@ -85,6 +101,97 @@ def decrypt(string): result += chr(a) return result +# New TP-Link HS110 handshake protocol +# Classes EncryptionSession and Handshake, and function encrypt2 based on: +# https://gist.github.com/chriswheeldon/3b17d974db3817613c69191c0480fe55 + +class EncryptionSession: + def __init__(self, local_seed, remote_seed, user_hash): + self._key = self._key_derive(local_seed, remote_seed, user_hash) + (self._iv, self._seq) = self._iv_derive(local_seed, remote_seed, user_hash) + self._sig = self._sig_derive(local_seed, remote_seed, user_hash) + + def _key_derive(self, local_seed, remote_seed, user_hash): + payload = 'lsk'.encode('utf-8') + local_seed + remote_seed + user_hash + return hashlib.sha256(payload).digest()[:16] + + def _iv_derive(self, local_seed, remote_seed, user_hash): + # iv is first 16 bytes of sha256, where the last 4 bytes forms the + # sequence number used in requests and is incremented on each request + payload = 'iv'.encode('utf-8') + local_seed + remote_seed + user_hash + iv = hashlib.sha256(payload).digest()[:16] + return (iv[:12], (int.from_bytes(iv[12:16], 'big') & 0x7fffffff)) + + def _sig_derive(self, local_seed, remote_seed, user_hash): + # used to create a hash with which to prefix each request + payload = 'ldk'.encode('utf-8') + local_seed + remote_seed + user_hash + return hashlib.sha256(payload).digest()[:28] + + def iv(self): + seq = self._seq.to_bytes(4, 'big') + iv = self._iv + seq + assert(len(iv) == 16) + return iv + + def encrypt(self, msg): + self._seq = self._seq + 1 + if (type(msg) == str): + msg = msg.encode('utf-8') + assert(type(msg) == bytes) + cipher = AES.new(self._key, AES.MODE_CBC, self.iv()) + ciphertext = cipher.encrypt(Padding.pad(msg, AES.block_size)) + signature = hashlib.sha256(self._sig + self._seq.to_bytes(4, 'big') + ciphertext).digest() + return (signature + ciphertext, self._seq) + + def decrypt(self, msg): + assert(type(msg) == bytes) + cipher = AES.new(self._key, AES.MODE_CBC, self.iv()) + plaintext = Padding.unpad(cipher.decrypt(msg[32:]), AES.block_size) + return plaintext + +class Handshake: + def __init__(self, ip): + self.ip = ip + self.local_seed = Random.get_random_bytes(16) + + def user_hash(self): + # md5(md5(email) + md5(pass)) + # device is not connected to tplink cloud i.e. email and pass are empty + # may need to include your email and password below if app/plug are associated with a tplink account? + # i.e. user_hash = hashlib.md5(b'').digest() + hashlib.md5(b'').digest() + user_hash = hashlib.md5(b'').digest() + hashlib.md5(b'').digest() + return hashlib.md5(user_hash).digest() + + def perform(self, http_session): + # step 1 - send our seed + result = http_session.post('http://{}:80/app/handshake1'.format(self.ip), data=self.local_seed) + assert(result.status_code == 200) + body = result.content + self.remote_seed = body[:16] + assert(hashlib.sha256(self.local_seed + self.user_hash()).digest() == body[16:]) # device responds with hash of seed + user hash + + # step 2 - send hash of remote seed + user hash + payload = hashlib.sha256(self.remote_seed + self.user_hash()).digest() + result = http_session.post('http://{}:80/app/handshake2'.format(self.ip), data=payload) + assert(result.status_code == 200) + + return EncryptionSession(self.local_seed, self.remote_seed, self.user_hash()) + +def encrypt2(session, ip, string): + handshake = Handshake(ip) + retry = 0 + + while retry < 9: + encryption = handshake.perform(session) + (msg, seq) = encryption.encrypt(string) + res = session.post('http://{}:80/app/request'.format(ip), params={'seq': seq}, data=msg) + if res.status_code == 200: + break + retry += 1 + time.sleep(0.25) + assert(res.status_code == 200) + return(encryption.decrypt(res.content).decode("utf-8")) + # Parse commandline arguments parser = argparse.ArgumentParser(description=f"TP-Link Wi-Fi Smart Plug Client v{version}") @@ -132,4 +239,14 @@ def decrypt(string): print("Received: ", decrypted) except socket.error: - quit(f"Could not connect to host {ip}:{port}") + try: + session = requests.Session() + port = 80 + response = encrypt2(session, ip, cmd) + if args.quiet: + print(response) + else: + print("Sent: ", cmd) + print("Received: ", response) + except socket.error: + quit(f"Could not connect to host {ip}:{port}")