Skip to content

Commit

Permalink
Fixed OTAA handling and added an example how to derive the nwskey and…
Browse files Browse the repository at this point in the history
… appskey
  • Loading branch information
jeroennijhof committed May 23, 2017
1 parent 1771e82 commit 5cf4eea
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 39 deletions.
2 changes: 1 addition & 1 deletion LoRaWAN/DataPayload.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def compute_mic(self, key, direction, mhdr):
computed_mic = cmac.encode(bytes(key), bytes(mic))[:4]
return list(map(int, computed_mic))

def decrypt_payload(self, key, direction):
def decrypt_payload(self, key, direction, mic):
k = int(math.ceil(len(self.payload) / 16.0))

a = []
Expand Down
5 changes: 4 additions & 1 deletion LoRaWAN/FHDR.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ def read(self, mac_payload):
def create(self, mtype, args):
self.devaddr = [0x00, 0x00, 0x00, 0x00]
self.fctrl = 0x00
self.fcnt = args['fcnt'].to_bytes(2, byteorder='little')
if 'fcnt' in args:
self.fcnt = args['fcnt'].to_bytes(2, byteorder='little')
else:
self.fcnt = [0x00, 0x00]
self.fopts = []
if mtype == MHDR.UNCONF_DATA_UP or mtype == MHDR.UNCONF_DATA_DOWN or\
mtype == MHDR.CONF_DATA_UP or mtype == MHDR.CONF_DATA_DOWN:
Expand Down
73 changes: 48 additions & 25 deletions LoRaWAN/JoinAcceptPayload.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,19 @@
#
# frm_payload: appnonce(3) netid(3) devaddr(4) dlsettings(1) rxdelay(1) cflist(0..16)
#
from .MalformedPacketException import MalformedPacketException
from .AES_CMAC import AES_CMAC
from Crypto.Cipher import AES

class JoinAcceptPayload:

def __init__(self, payload):
if len(payload) < 16:
def read(self, payload):
if len(payload) < 12:
raise MalformedPacketException("Invalid join accept");
self.encrypted_payload = payload
self.payload = self.decrypt_payload(payload)

self.appnonce = self.payload[:3]
self.netid = self.payload[3:6]
self.devaddr = self.payload[6:10]
self.dlsettings = self.payload[10]
self.rxdelay = self.payload[11]
self.cflist = None
if self.payload[12:]:
self.cflist = self.payload[12:]
def create(self, args):
pass

def length(self):
return len(self.encrypted_payload)
Expand Down Expand Up @@ -50,27 +44,56 @@ def get_cflist(self):

def compute_mic(self, key, direction, mhdr):
mic = []
mic += self.to_clear_raw()
mic += [mhdr.to_raw()]
mic += self.to_clear_raw()

cmac = AES_CMAC()
computed_mic = cmac.encode(str(bytearray(key)), str(bytearray(mic)))[:4]
return map(int, bytearray(computed_mic))
computed_mic = cmac.encode(bytes(key), bytes(mic))[:4]
return list(map(int, computed_mic))

def decrypt_payload(self, key, direction):
def decrypt_payload(self, key, direction, mic):
a = []
a += self.encrypted_payload
a += self.mic
a += mic

cipher = AES.new(str(bytearray(key)))
s = map(ord, cipher.encrypt(str(bytearray(a))))
return s
cipher = AES.new(bytes(key))
self.payload = cipher.encrypt(bytes(a))[:-4]

def encrypt_payload(self, key):
self.appnonce = self.payload[:3]
self.netid = self.payload[3:6]
self.devaddr = self.payload[6:10]
self.dlsettings = self.payload[10]
self.rxdelay = self.payload[11]
self.cflist = None
if self.payload[12:]:
self.cflist = self.payload[12:]

return list(map(int, self.payload))

def encrypt_payload(self, key, direction, mhdr):
a = []
a += self.to_clear_raw()
a += self.compute_mic()

cipher = AES.new(str(bytearray(key)))
s = map(ord, cipher.decrypt(str(bytearray(a))))
return s[:-4], s[-4:]
a += self.compute_mic(key, direction, mhdr)

cipher = AES.new(bytes(key))
return list(map(int, cipher.decrypt(bytes(a))))

def derive_nwkey(self, key, devnonce):
a = [0x01]
a += self.get_appnonce()
a += self.get_netid()
a += devnonce
a += [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]

cipher = AES.new(bytes(key))
return list(map(hex, cipher.encrypt(bytes(a))))

def derive_appkey(self, key, devnonce):
a = [0x02]
a += self.get_appnonce()
a += self.get_netid()
a += devnonce
a += [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]

cipher = AES.new(bytes(key))
return list(map(hex, cipher.encrypt(bytes(a))))
13 changes: 7 additions & 6 deletions LoRaWAN/JoinRequestPayload.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#
# frm_payload: appeui(8) deveui(8) devnonce(2)
#
from .MalformedPacketException import MalformedPacketException
from .AES_CMAC import AES_CMAC
from Crypto.Cipher import AES

Expand All @@ -9,13 +10,13 @@ class JoinRequestPayload:
def read(self, payload):
if len(payload) != 18:
raise MalformedPacketException("Invalid join request");
self.appeui = payload[:8]
self.deveui = payload[8:16]
self.appeui = payload[:8]
self.devnonce = payload[16:18]

def create(self, args):
self.appeui = args['appeui']
self.deveui = args['deveui']
self.deveui = list(reversed(args['deveui']))
self.appeui = list(reversed(args['appeui']))
self.devnonce = args['devnonce']

def length(self):
Expand All @@ -42,8 +43,8 @@ def compute_mic(self, key, direction, mhdr):
mic += self.to_raw()

cmac = AES_CMAC()
computed_mic = cmac.encode(str(bytearray(key)), str(bytearray(mic)))[:4]
return map(int, bytearray(computed_mic))
computed_mic = cmac.encode(bytes(key), bytes(mic))[:4]
return list(map(int, computed_mic))

def decrypt_payload(self, key, direction):
def decrypt_payload(self, key, direction, mic):
return self.to_raw()
4 changes: 2 additions & 2 deletions LoRaWAN/MacPayload.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ def read(self, mtype, mac_payload):
self.frm_payload = None
if mtype == MHDR.JOIN_REQUEST:
self.frm_payload = JoinRequestPayload()
self.frm_payload.read(mac_payload[self.fhdr.length() + 1:])
self.frm_payload.read(mac_payload)
if mtype == MHDR.JOIN_ACCEPT:
self.frm_payload = JoinAcceptPayload()
self.frm_payload.read(mac_payload[self.fhdr.length() + 1:])
self.frm_payload.read(mac_payload)
if mtype == MHDR.UNCONF_DATA_UP or mtype == MHDR.UNCONF_DATA_DOWN or\
mtype == MHDR.CONF_DATA_UP or mtype == MHDR.CONF_DATA_DOWN:
self.frm_payload = DataPayload()
Expand Down
18 changes: 15 additions & 3 deletions LoRaWAN/PhyPayload.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,22 @@ def set_mic(self, mic):
self.mic = mic

def compute_mic(self):
return self.mac_payload.frm_payload.compute_mic(self.nwkey, self.get_direction(), self.get_mhdr())
if self.get_mhdr().get_mtype() == MHDR.JOIN_ACCEPT:
return self.mac_payload.frm_payload.encrypt_payload(self.appkey, self.get_direction(), self.get_mhdr())[-4:]
else:
return self.mac_payload.frm_payload.compute_mic(self.nwkey, self.get_direction(), self.get_mhdr())

def valid_mic(self):
return self.get_mic() == self.mac_payload.frm_payload.compute_mic(self.nwkey, self.get_direction(), self.get_mhdr())
if self.get_mhdr().get_mtype() == MHDR.JOIN_ACCEPT:
return self.get_mic() == self.mac_payload.frm_payload.encrypt_payload(self.appkey, self.get_direction(), self.get_mhdr())[-4:]
else:
return self.get_mic() == self.mac_payload.frm_payload.compute_mic(self.nwkey, self.get_direction(), self.get_mhdr())

def get_payload(self):
return self.mac_payload.frm_payload.decrypt_payload(self.appkey, self.get_direction())
return self.mac_payload.frm_payload.decrypt_payload(self.appkey, self.get_direction(), self.mic)

def derive_nwkey(self, devnonce):
return self.mac_payload.frm_payload.derive_nwkey(self.appkey, devnonce)

def derive_appkey(self, devnonce):
return self.mac_payload.frm_payload.derive_appkey(self.appkey, devnonce)
2 changes: 1 addition & 1 deletion LoRaWAN/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .PhyPayload import PhyPayload

def new(nwkey,appkey):
def new(nwkey = [], appkey = []):
return PhyPayload(nwkey, appkey)
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ See: https://www.lora-alliance.org/portals/0/specs/LoRaWAN%20Specification%201R0
## Installation
Just git clone and check rx_ttn.py for reading LoRaWAN messages and tx_ttn.py for sending LoRaWAN messages.

## TODO
Make code more readable and easier to use
113 changes: 113 additions & 0 deletions otaa_ttn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#!/usr/bin/env python3
import sys
from time import sleep
from SX127x.LoRa import *
from SX127x.LoRaArgumentParser import LoRaArgumentParser
from SX127x.board_config import BOARD
import LoRaWAN
from LoRaWAN.MHDR import MHDR

BOARD.setup()
parser = LoRaArgumentParser("LoRaWAN sender")

class LoRaWANsend(LoRa):
def __init__(self, deveui = [], appeui = [], appkey = [], devnonce = [], verbose = False):
super(LoRaWANsend, self).__init__(verbose)
self.deveui = deveui
self.appeui = appeui
self.appkey = appkey
self.devnonce = devnonce

def on_rx_done(self):
print("RxDone")

self.clear_irq_flags(RxDone=1)
payload = self.read_payload(nocheck=True)

lorawan = LoRaWAN.new([], appkey)
lorawan.read(payload)
print(lorawan.get_payload())
print(lorawan.get_mhdr().get_mversion())
print(lorawan.get_mhdr().get_mtype())
print(lorawan.get_mic())
print(lorawan.compute_mic())
print(lorawan.valid_mic())
print(lorawan.derive_nwkey(devnonce))
print(lorawan.derive_appkey(devnonce))
print("\n")
sys.exit(0)

def on_tx_done(self):
self.clear_irq_flags(TxDone=1)
print("TxDone")

self.set_mode(MODE.STDBY)
self.set_dio_mapping([0,0,0,0,0,0])
self.set_invert_iq(1)
self.reset_ptr_rx()
self.set_mode(MODE.RXCONT)

def on_cad_done(self):
print("on_CadDone")
print(self.get_irq_flags())

def on_rx_timeout(self):
print("on_RxTimeout")
print(self.get_irq_flags())

def on_valid_header(self):
print("on_ValidHeader")
print(self.get_irq_flags())

def on_payload_crc_error(self):
print("on_PayloadCrcError")
print(self.get_irq_flags())

def on_fhss_change_channel(self):
print("on_FhssChangeChannel")
print(self.get_irq_flags())

def start(self):
self.tx_counter = 1

lorawan = LoRaWAN.new(self.appkey)
lorawan.create(MHDR.JOIN_REQUEST, {'deveui': self.deveui, 'appeui': self.appeui, 'devnonce': self.devnonce})

self.write_payload(lorawan.to_raw())
self.set_mode(MODE.TX)
while True:
sleep(1)


# Init
deveui = [0x00, 0x82, 0xAA, 0x0D, 0x42, 0x9C, 0x79, 0x34]
appeui = [0x70, 0xB3, 0xD5, 0x7E, 0xF0, 0x00, 0x4D, 0xBC]
appkey = [0x13, 0x1C, 0x8A, 0xF7, 0xA3, 0xE4, 0x35, 0xD0, 0xD5, 0xE9, 0x47, 0x6B, 0x04, 0xB9, 0x16, 0x39]
devnonce = [0x01, 0x25]
lora = LoRaWANsend(deveui, appeui, appkey, devnonce, False)

# Setup
lora.set_mode(MODE.SLEEP)
lora.set_dio_mapping([1,0,0,0,0,0])
lora.set_freq(868.1)
lora.set_pa_config(pa_select=1)
lora.set_spreading_factor(7)
lora.set_pa_config(max_power=0x0F, output_power=0x0E)
lora.set_sync_word(0x34)
lora.set_rx_crc(True)

print(lora)
assert(lora.get_agc_auto_on() == 1)

try:
print("Sending LoRaWAN message\n")
lora.start()
except KeyboardInterrupt:
sys.stdout.flush()
print("\nKeyboardInterrupt")
finally:
sys.stdout.flush()
print("")
lora.set_mode(MODE.SLEEP)
print(lora)
BOARD.teardown()

0 comments on commit 5cf4eea

Please sign in to comment.