Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 118 additions & 1 deletion tplink_smartplug.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,22 @@
import socket
from struct import pack

import binascii
import socket
import requests

# pycryptodome
from Crypto import Random
from Crypto.Cipher import AES
from Crypto.Util import Counter, Padding

import logging
import hashlib
import time
import json

import http.client as http_client

version = 0.4

# Check if hostname is valid
Expand Down Expand Up @@ -85,6 +101,97 @@ def decrypt(string):
result += chr(a)
return result

# New TP-Link HS110 handshake protocol
# Classes EncryptionSession and Handshake, and function encrypt2 based on:
# https://gist.github.com/chriswheeldon/3b17d974db3817613c69191c0480fe55

class EncryptionSession:
def __init__(self, local_seed, remote_seed, user_hash):
self._key = self._key_derive(local_seed, remote_seed, user_hash)
(self._iv, self._seq) = self._iv_derive(local_seed, remote_seed, user_hash)
self._sig = self._sig_derive(local_seed, remote_seed, user_hash)

def _key_derive(self, local_seed, remote_seed, user_hash):
payload = 'lsk'.encode('utf-8') + local_seed + remote_seed + user_hash
return hashlib.sha256(payload).digest()[:16]

def _iv_derive(self, local_seed, remote_seed, user_hash):
# iv is first 16 bytes of sha256, where the last 4 bytes forms the
# sequence number used in requests and is incremented on each request
payload = 'iv'.encode('utf-8') + local_seed + remote_seed + user_hash
iv = hashlib.sha256(payload).digest()[:16]
return (iv[:12], (int.from_bytes(iv[12:16], 'big') & 0x7fffffff))

def _sig_derive(self, local_seed, remote_seed, user_hash):
# used to create a hash with which to prefix each request
payload = 'ldk'.encode('utf-8') + local_seed + remote_seed + user_hash
return hashlib.sha256(payload).digest()[:28]

def iv(self):
seq = self._seq.to_bytes(4, 'big')
iv = self._iv + seq
assert(len(iv) == 16)
return iv

def encrypt(self, msg):
self._seq = self._seq + 1
if (type(msg) == str):
msg = msg.encode('utf-8')
assert(type(msg) == bytes)
cipher = AES.new(self._key, AES.MODE_CBC, self.iv())
ciphertext = cipher.encrypt(Padding.pad(msg, AES.block_size))
signature = hashlib.sha256(self._sig + self._seq.to_bytes(4, 'big') + ciphertext).digest()
return (signature + ciphertext, self._seq)

def decrypt(self, msg):
assert(type(msg) == bytes)
cipher = AES.new(self._key, AES.MODE_CBC, self.iv())
plaintext = Padding.unpad(cipher.decrypt(msg[32:]), AES.block_size)
return plaintext

class Handshake:
def __init__(self, ip):
self.ip = ip
self.local_seed = Random.get_random_bytes(16)

def user_hash(self):
# md5(md5(email) + md5(pass))
# device is not connected to tplink cloud i.e. email and pass are empty
# may need to include your email and password below if app/plug are associated with a tplink account?
# i.e. user_hash = hashlib.md5(b'<email>').digest() + hashlib.md5(b'<pass>').digest()
user_hash = hashlib.md5(b'').digest() + hashlib.md5(b'').digest()
return hashlib.md5(user_hash).digest()

def perform(self, http_session):
# step 1 - send our seed
result = http_session.post('http://{}:80/app/handshake1'.format(self.ip), data=self.local_seed)
assert(result.status_code == 200)
body = result.content
self.remote_seed = body[:16]
assert(hashlib.sha256(self.local_seed + self.user_hash()).digest() == body[16:]) # device responds with hash of seed + user hash

# step 2 - send hash of remote seed + user hash
payload = hashlib.sha256(self.remote_seed + self.user_hash()).digest()
result = http_session.post('http://{}:80/app/handshake2'.format(self.ip), data=payload)
assert(result.status_code == 200)

return EncryptionSession(self.local_seed, self.remote_seed, self.user_hash())

def encrypt2(session, ip, string):
handshake = Handshake(ip)
retry = 0

while retry < 9:
encryption = handshake.perform(session)
(msg, seq) = encryption.encrypt(string)
res = session.post('http://{}:80/app/request'.format(ip), params={'seq': seq}, data=msg)
if res.status_code == 200:
break
retry += 1
time.sleep(0.25)
assert(res.status_code == 200)
return(encryption.decrypt(res.content).decode("utf-8"))


# Parse commandline arguments
parser = argparse.ArgumentParser(description=f"TP-Link Wi-Fi Smart Plug Client v{version}")
Expand Down Expand Up @@ -132,4 +239,14 @@ def decrypt(string):
print("Received: ", decrypted)

except socket.error:
quit(f"Could not connect to host {ip}:{port}")
try:
session = requests.Session()
port = 80
response = encrypt2(session, ip, cmd)
if args.quiet:
print(response)
else:
print("Sent: ", cmd)
print("Received: ", response)
except socket.error:
quit(f"Could not connect to host {ip}:{port}")