From 5cf4eea3a3e6f2ab0fdecf757235c8f6d7b76190 Mon Sep 17 00:00:00 2001 From: Jeroen Nijhof Date: Tue, 23 May 2017 21:59:30 +0200 Subject: [PATCH] Fixed OTAA handling and added an example how to derive the nwskey and appskey --- LoRaWAN/DataPayload.py | 2 +- LoRaWAN/FHDR.py | 5 +- LoRaWAN/JoinAcceptPayload.py | 73 ++++++++++++++-------- LoRaWAN/JoinRequestPayload.py | 13 ++-- LoRaWAN/MacPayload.py | 4 +- LoRaWAN/PhyPayload.py | 18 +++++- LoRaWAN/__init__.py | 2 +- README.md | 2 + otaa_ttn.py | 113 ++++++++++++++++++++++++++++++++++ 9 files changed, 193 insertions(+), 39 deletions(-) create mode 100755 otaa_ttn.py diff --git a/LoRaWAN/DataPayload.py b/LoRaWAN/DataPayload.py index b057363..6a35513 100644 --- a/LoRaWAN/DataPayload.py +++ b/LoRaWAN/DataPayload.py @@ -41,7 +41,7 @@ def compute_mic(self, key, direction, mhdr): computed_mic = cmac.encode(bytes(key), bytes(mic))[:4] return list(map(int, computed_mic)) - def decrypt_payload(self, key, direction): + def decrypt_payload(self, key, direction, mic): k = int(math.ceil(len(self.payload) / 16.0)) a = [] diff --git a/LoRaWAN/FHDR.py b/LoRaWAN/FHDR.py index 48f2bd0..6d78b49 100644 --- a/LoRaWAN/FHDR.py +++ b/LoRaWAN/FHDR.py @@ -19,7 +19,10 @@ def read(self, mac_payload): def create(self, mtype, args): self.devaddr = [0x00, 0x00, 0x00, 0x00] self.fctrl = 0x00 - self.fcnt = args['fcnt'].to_bytes(2, byteorder='little') + if 'fcnt' in args: + self.fcnt = args['fcnt'].to_bytes(2, byteorder='little') + else: + self.fcnt = [0x00, 0x00] self.fopts = [] if mtype == MHDR.UNCONF_DATA_UP or mtype == MHDR.UNCONF_DATA_DOWN or\ mtype == MHDR.CONF_DATA_UP or mtype == MHDR.CONF_DATA_DOWN: diff --git a/LoRaWAN/JoinAcceptPayload.py b/LoRaWAN/JoinAcceptPayload.py index b53d4e9..8ca346f 100644 --- a/LoRaWAN/JoinAcceptPayload.py +++ b/LoRaWAN/JoinAcceptPayload.py @@ -1,25 +1,19 @@ # # frm_payload: appnonce(3) netid(3) devaddr(4) dlsettings(1) rxdelay(1) cflist(0..16) # +from .MalformedPacketException import MalformedPacketException from .AES_CMAC import AES_CMAC from Crypto.Cipher import AES class JoinAcceptPayload: - def __init__(self, payload): - if len(payload) < 16: + def read(self, payload): + if len(payload) < 12: raise MalformedPacketException("Invalid join accept"); self.encrypted_payload = payload - self.payload = self.decrypt_payload(payload) - self.appnonce = self.payload[:3] - self.netid = self.payload[3:6] - self.devaddr = self.payload[6:10] - self.dlsettings = self.payload[10] - self.rxdelay = self.payload[11] - self.cflist = None - if self.payload[12:]: - self.cflist = self.payload[12:] + def create(self, args): + pass def length(self): return len(self.encrypted_payload) @@ -50,27 +44,56 @@ def get_cflist(self): def compute_mic(self, key, direction, mhdr): mic = [] - mic += self.to_clear_raw() mic += [mhdr.to_raw()] + mic += self.to_clear_raw() cmac = AES_CMAC() - computed_mic = cmac.encode(str(bytearray(key)), str(bytearray(mic)))[:4] - return map(int, bytearray(computed_mic)) + computed_mic = cmac.encode(bytes(key), bytes(mic))[:4] + return list(map(int, computed_mic)) - def decrypt_payload(self, key, direction): + def decrypt_payload(self, key, direction, mic): a = [] a += self.encrypted_payload - a += self.mic + a += mic - cipher = AES.new(str(bytearray(key))) - s = map(ord, cipher.encrypt(str(bytearray(a)))) - return s + cipher = AES.new(bytes(key)) + self.payload = cipher.encrypt(bytes(a))[:-4] - def encrypt_payload(self, key): + self.appnonce = self.payload[:3] + self.netid = self.payload[3:6] + self.devaddr = self.payload[6:10] + self.dlsettings = self.payload[10] + self.rxdelay = self.payload[11] + self.cflist = None + if self.payload[12:]: + self.cflist = self.payload[12:] + + return list(map(int, self.payload)) + + def encrypt_payload(self, key, direction, mhdr): a = [] a += self.to_clear_raw() - a += self.compute_mic() - - cipher = AES.new(str(bytearray(key))) - s = map(ord, cipher.decrypt(str(bytearray(a)))) - return s[:-4], s[-4:] + a += self.compute_mic(key, direction, mhdr) + + cipher = AES.new(bytes(key)) + return list(map(int, cipher.decrypt(bytes(a)))) + + def derive_nwkey(self, key, devnonce): + a = [0x01] + a += self.get_appnonce() + a += self.get_netid() + a += devnonce + a += [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00] + + cipher = AES.new(bytes(key)) + return list(map(hex, cipher.encrypt(bytes(a)))) + + def derive_appkey(self, key, devnonce): + a = [0x02] + a += self.get_appnonce() + a += self.get_netid() + a += devnonce + a += [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00] + + cipher = AES.new(bytes(key)) + return list(map(hex, cipher.encrypt(bytes(a)))) diff --git a/LoRaWAN/JoinRequestPayload.py b/LoRaWAN/JoinRequestPayload.py index 9e2a821..f3a7a3e 100644 --- a/LoRaWAN/JoinRequestPayload.py +++ b/LoRaWAN/JoinRequestPayload.py @@ -1,6 +1,7 @@ # # frm_payload: appeui(8) deveui(8) devnonce(2) # +from .MalformedPacketException import MalformedPacketException from .AES_CMAC import AES_CMAC from Crypto.Cipher import AES @@ -9,13 +10,13 @@ class JoinRequestPayload: def read(self, payload): if len(payload) != 18: raise MalformedPacketException("Invalid join request"); - self.appeui = payload[:8] self.deveui = payload[8:16] + self.appeui = payload[:8] self.devnonce = payload[16:18] def create(self, args): - self.appeui = args['appeui'] - self.deveui = args['deveui'] + self.deveui = list(reversed(args['deveui'])) + self.appeui = list(reversed(args['appeui'])) self.devnonce = args['devnonce'] def length(self): @@ -42,8 +43,8 @@ def compute_mic(self, key, direction, mhdr): mic += self.to_raw() cmac = AES_CMAC() - computed_mic = cmac.encode(str(bytearray(key)), str(bytearray(mic)))[:4] - return map(int, bytearray(computed_mic)) + computed_mic = cmac.encode(bytes(key), bytes(mic))[:4] + return list(map(int, computed_mic)) - def decrypt_payload(self, key, direction): + def decrypt_payload(self, key, direction, mic): return self.to_raw() diff --git a/LoRaWAN/MacPayload.py b/LoRaWAN/MacPayload.py index 6560cf4..0af3ce7 100644 --- a/LoRaWAN/MacPayload.py +++ b/LoRaWAN/MacPayload.py @@ -20,10 +20,10 @@ def read(self, mtype, mac_payload): self.frm_payload = None if mtype == MHDR.JOIN_REQUEST: self.frm_payload = JoinRequestPayload() - self.frm_payload.read(mac_payload[self.fhdr.length() + 1:]) + self.frm_payload.read(mac_payload) if mtype == MHDR.JOIN_ACCEPT: self.frm_payload = JoinAcceptPayload() - self.frm_payload.read(mac_payload[self.fhdr.length() + 1:]) + self.frm_payload.read(mac_payload) if mtype == MHDR.UNCONF_DATA_UP or mtype == MHDR.UNCONF_DATA_DOWN or\ mtype == MHDR.CONF_DATA_UP or mtype == MHDR.CONF_DATA_DOWN: self.frm_payload = DataPayload() diff --git a/LoRaWAN/PhyPayload.py b/LoRaWAN/PhyPayload.py index 280e589..057290b 100644 --- a/LoRaWAN/PhyPayload.py +++ b/LoRaWAN/PhyPayload.py @@ -65,10 +65,22 @@ def set_mic(self, mic): self.mic = mic def compute_mic(self): - return self.mac_payload.frm_payload.compute_mic(self.nwkey, self.get_direction(), self.get_mhdr()) + if self.get_mhdr().get_mtype() == MHDR.JOIN_ACCEPT: + return self.mac_payload.frm_payload.encrypt_payload(self.appkey, self.get_direction(), self.get_mhdr())[-4:] + else: + return self.mac_payload.frm_payload.compute_mic(self.nwkey, self.get_direction(), self.get_mhdr()) def valid_mic(self): - return self.get_mic() == self.mac_payload.frm_payload.compute_mic(self.nwkey, self.get_direction(), self.get_mhdr()) + if self.get_mhdr().get_mtype() == MHDR.JOIN_ACCEPT: + return self.get_mic() == self.mac_payload.frm_payload.encrypt_payload(self.appkey, self.get_direction(), self.get_mhdr())[-4:] + else: + return self.get_mic() == self.mac_payload.frm_payload.compute_mic(self.nwkey, self.get_direction(), self.get_mhdr()) def get_payload(self): - return self.mac_payload.frm_payload.decrypt_payload(self.appkey, self.get_direction()) + return self.mac_payload.frm_payload.decrypt_payload(self.appkey, self.get_direction(), self.mic) + + def derive_nwkey(self, devnonce): + return self.mac_payload.frm_payload.derive_nwkey(self.appkey, devnonce) + + def derive_appkey(self, devnonce): + return self.mac_payload.frm_payload.derive_appkey(self.appkey, devnonce) diff --git a/LoRaWAN/__init__.py b/LoRaWAN/__init__.py index b395e4d..9a87c68 100644 --- a/LoRaWAN/__init__.py +++ b/LoRaWAN/__init__.py @@ -1,4 +1,4 @@ from .PhyPayload import PhyPayload -def new(nwkey,appkey): +def new(nwkey = [], appkey = []): return PhyPayload(nwkey, appkey) diff --git a/README.md b/README.md index f4da01b..237339f 100644 --- a/README.md +++ b/README.md @@ -8,3 +8,5 @@ See: https://www.lora-alliance.org/portals/0/specs/LoRaWAN%20Specification%201R0 ## Installation Just git clone and check rx_ttn.py for reading LoRaWAN messages and tx_ttn.py for sending LoRaWAN messages. +## TODO +Make code more readable and easier to use diff --git a/otaa_ttn.py b/otaa_ttn.py new file mode 100755 index 0000000..a2b4e4f --- /dev/null +++ b/otaa_ttn.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python3 +import sys +from time import sleep +from SX127x.LoRa import * +from SX127x.LoRaArgumentParser import LoRaArgumentParser +from SX127x.board_config import BOARD +import LoRaWAN +from LoRaWAN.MHDR import MHDR + +BOARD.setup() +parser = LoRaArgumentParser("LoRaWAN sender") + +class LoRaWANsend(LoRa): + def __init__(self, deveui = [], appeui = [], appkey = [], devnonce = [], verbose = False): + super(LoRaWANsend, self).__init__(verbose) + self.deveui = deveui + self.appeui = appeui + self.appkey = appkey + self.devnonce = devnonce + + def on_rx_done(self): + print("RxDone") + + self.clear_irq_flags(RxDone=1) + payload = self.read_payload(nocheck=True) + + lorawan = LoRaWAN.new([], appkey) + lorawan.read(payload) + print(lorawan.get_payload()) + print(lorawan.get_mhdr().get_mversion()) + print(lorawan.get_mhdr().get_mtype()) + print(lorawan.get_mic()) + print(lorawan.compute_mic()) + print(lorawan.valid_mic()) + print(lorawan.derive_nwkey(devnonce)) + print(lorawan.derive_appkey(devnonce)) + print("\n") + sys.exit(0) + + def on_tx_done(self): + self.clear_irq_flags(TxDone=1) + print("TxDone") + + self.set_mode(MODE.STDBY) + self.set_dio_mapping([0,0,0,0,0,0]) + self.set_invert_iq(1) + self.reset_ptr_rx() + self.set_mode(MODE.RXCONT) + + def on_cad_done(self): + print("on_CadDone") + print(self.get_irq_flags()) + + def on_rx_timeout(self): + print("on_RxTimeout") + print(self.get_irq_flags()) + + def on_valid_header(self): + print("on_ValidHeader") + print(self.get_irq_flags()) + + def on_payload_crc_error(self): + print("on_PayloadCrcError") + print(self.get_irq_flags()) + + def on_fhss_change_channel(self): + print("on_FhssChangeChannel") + print(self.get_irq_flags()) + + def start(self): + self.tx_counter = 1 + + lorawan = LoRaWAN.new(self.appkey) + lorawan.create(MHDR.JOIN_REQUEST, {'deveui': self.deveui, 'appeui': self.appeui, 'devnonce': self.devnonce}) + + self.write_payload(lorawan.to_raw()) + self.set_mode(MODE.TX) + while True: + sleep(1) + + +# Init +deveui = [0x00, 0x82, 0xAA, 0x0D, 0x42, 0x9C, 0x79, 0x34] +appeui = [0x70, 0xB3, 0xD5, 0x7E, 0xF0, 0x00, 0x4D, 0xBC] +appkey = [0x13, 0x1C, 0x8A, 0xF7, 0xA3, 0xE4, 0x35, 0xD0, 0xD5, 0xE9, 0x47, 0x6B, 0x04, 0xB9, 0x16, 0x39] +devnonce = [0x01, 0x25] +lora = LoRaWANsend(deveui, appeui, appkey, devnonce, False) + +# Setup +lora.set_mode(MODE.SLEEP) +lora.set_dio_mapping([1,0,0,0,0,0]) +lora.set_freq(868.1) +lora.set_pa_config(pa_select=1) +lora.set_spreading_factor(7) +lora.set_pa_config(max_power=0x0F, output_power=0x0E) +lora.set_sync_word(0x34) +lora.set_rx_crc(True) + +print(lora) +assert(lora.get_agc_auto_on() == 1) + +try: + print("Sending LoRaWAN message\n") + lora.start() +except KeyboardInterrupt: + sys.stdout.flush() + print("\nKeyboardInterrupt") +finally: + sys.stdout.flush() + print("") + lora.set_mode(MODE.SLEEP) + print(lora) + BOARD.teardown()