diff --git a/AES_CMAC.py b/LoRaWAN/AES_CMAC.py similarity index 100% rename from AES_CMAC.py rename to LoRaWAN/AES_CMAC.py diff --git a/DataPayload.py b/LoRaWAN/DataPayload.py similarity index 100% rename from DataPayload.py rename to LoRaWAN/DataPayload.py diff --git a/Direction.py b/LoRaWAN/Direction.py similarity index 100% rename from Direction.py rename to LoRaWAN/Direction.py diff --git a/FHDR.py b/LoRaWAN/FHDR.py similarity index 100% rename from FHDR.py rename to LoRaWAN/FHDR.py diff --git a/JoinAcceptPayload.py b/LoRaWAN/JoinAcceptPayload.py similarity index 100% rename from JoinAcceptPayload.py rename to LoRaWAN/JoinAcceptPayload.py diff --git a/JoinRequestPayload.py b/LoRaWAN/JoinRequestPayload.py similarity index 100% rename from JoinRequestPayload.py rename to LoRaWAN/JoinRequestPayload.py diff --git a/MHDR.py b/LoRaWAN/MHDR.py similarity index 100% rename from MHDR.py rename to LoRaWAN/MHDR.py diff --git a/MacPayload.py b/LoRaWAN/MacPayload.py similarity index 100% rename from MacPayload.py rename to LoRaWAN/MacPayload.py diff --git a/MalformedPacketException.py b/LoRaWAN/MalformedPacketException.py similarity index 100% rename from MalformedPacketException.py rename to LoRaWAN/MalformedPacketException.py diff --git a/PhyPayload.py b/LoRaWAN/PhyPayload.py similarity index 100% rename from PhyPayload.py rename to LoRaWAN/PhyPayload.py diff --git a/__init__.py b/LoRaWAN/__init__.py similarity index 100% rename from __init__.py rename to LoRaWAN/__init__.py diff --git a/README.md b/README.md index 3dcb0b7..f4da01b 100644 --- a/README.md +++ b/README.md @@ -1,37 +1,10 @@ # LoRaWAN This is a LoRaWAN v1.0 implementation in python. -You still need a LoRa driver to actually get or send a packet at radio level. -Testing has been done with https://github.com/mayeranalytics/pySX127x icw a RFM95 attached to a Raspberry PI. +It uses https://github.com/mayeranalytics/pySX127x and it's currently being tested with a RFM95 attached to a Raspberry PI. See: https://www.lora-alliance.org/portals/0/specs/LoRaWAN%20Specification%201R0.pdf ## Installation -Just git clone to your python source directory where you want to use this module. +Just git clone and check rx_ttn.py for reading LoRaWAN messages and tx_ttn.py for sending LoRaWAN messages. -## Example -``` -#!/usr/bin/env python3 -import LoRaWAN -from LoRaWAN.MHDR import MHDR - -nwkey = [0x2b, 0x7e, 0x15, 0x16, 0x28, 0xae, 0xd2, 0xa6, 0xab, 0xf7, 0x15, 0x88, 0x09, 0xcf, 0x4f, 0x3c] -appkey = [0x2b, 0x7e, 0x15, 0x16, 0x28, 0xae, 0xd2, 0xa6, 0xab, 0xf7, 0x15, 0x88, 0x09, 0xcf, 0x4f, 0x3c] -packet = [0x80,0x8f,0x77,0xbb,0x07,0x00,0x02,0x00,0x06,0xbd,0x33,0x42,0xa1,0x9f,0xcc,0x3c,0x8d,0x6b,0xcb,0x5f,0xdb,0x05,0x48,0xdb,0x4d,0xc8,0x50,0x14,0xae,0xeb,0xfe,0x0b,0x54,0xb1,0xc9,0x98,0xde,0xf5,0x3e,0x97,0x9b,0x70,0x1d,0xab,0xb0,0x45,0x30,0x0e,0xf8,0x69,0x9c,0x38,0xfc,0x1a,0x34,0xd5] - -print("Reading lorawan packet:") -lorawan = LoRaWAN.new(nwkey, appkey) -lorawan.read(packet) -print(lorawan.get_mhdr().get_mversion()) -print(lorawan.get_mhdr().get_mtype()) -print(lorawan.get_mic()) -print(lorawan.compute_mic()) -print(lorawan.valid_mic()) -print(lorawan.get_payload()) - -print("\nWriting lorawan packet:") -lorawan = LoRaWAN.new(nwkey, appkey) -lorawan.create(MHDR.JOIN_REQUEST, {'appeui': [1,2,3,4,5,6,7,8], 'deveui': [1,2,3,4,5,6,7,8], 'devnonce': [23,12]}) -print(lorawan.get_mic()) -print(lorawan.to_raw()) -``` diff --git a/SX127x/LoRa.py b/SX127x/LoRa.py new file mode 100644 index 0000000..43ff31d --- /dev/null +++ b/SX127x/LoRa.py @@ -0,0 +1,950 @@ +""" Defines the SX127x class and a few utility functions. """ + +# Copyright 2015 Mayer Analytics Ltd. +# +# This file is part of pySX127x. +# +# pySX127x is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public +# License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# pySX127x is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied +# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You can be released from the requirements of the license by obtaining a commercial license. Such a license is +# mandatory as soon as you develop commercial activities involving pySX127x without disclosing the source code of your +# own applications, or shipping pySX127x with a closed source product. +# +# You should have received a copy of the GNU General Public License along with pySX127. If not, see +# . + + +import sys +from .constants import * +from .board_config import BOARD + + +################################################## Some utility functions ############################################## + +def set_bit(value, index, new_bit): + """ Set the index'th bit of value to new_bit, and return the new value. + :param value: The integer to set the new_bit in + :type value: int + :param index: 0-based index + :param new_bit: New value the bit shall have (0 or 1) + :return: Changed value + :rtype: int + """ + mask = 1 << index + value &= ~mask + if new_bit: + value |= mask + return value + + +def getter(register_address): + """ The getter decorator reads the register content and calls the decorated function to do + post-processing. + :param register_address: Register address + :return: Register value + :rtype: int + """ + def decorator(func): + def wrapper(self): + return func(self, self.spi.xfer([register_address, 0])[1]) + return wrapper + return decorator + + +def setter(register_address): + """ The setter decorator calls the decorated function for pre-processing and + then writes the result to the register + :param register_address: Register address + :return: New register value + :rtype: int + """ + def decorator(func): + def wrapper(self, val): + return self.spi.xfer([register_address | 0x80, func(self, val)])[1] + return wrapper + return decorator + + +############################################### Definition of the LoRa class ########################################### + +class LoRa(object): + + spi = BOARD.SpiDev() # init and get the baord's SPI + mode = None # the mode is backed up here + backup_registers = [] + verbose = True + dio_mapping = [None] * 6 # store the dio mapping here + + def __init__(self, verbose=True, do_calibration=True, calibration_freq=868): + """ Init the object + + Send the device to sleep, read all registers, and do the calibration (if do_calibration=True) + :param verbose: Set the verbosity True/False + :param calibration_freq: call rx_chain_calibration with this parameter. Default is 868 + :param do_calibration: Call rx_chain_calibration, default is True. + """ + self.verbose = verbose + # set the callbacks for DIO0..5 IRQs. + BOARD.add_events(self._dio0, self._dio1, self._dio2, self._dio3, self._dio4, self._dio5) + # set mode to sleep and read all registers + self.set_mode(MODE.SLEEP) + self.backup_registers = self.get_all_registers() + # more setup work: + if do_calibration: + self.rx_chain_calibration(calibration_freq) + # the FSK registers are set up exactly as modtronix do it: + lookup_fsk = [ + #[REG.FSK.LNA , 0x23], + #[REG.FSK.RX_CONFIG , 0x1E], + #[REG.FSK.RSSI_CONFIG , 0xD2], + #[REG.FSK.PREAMBLE_DETECT, 0xAA], + #[REG.FSK.OSC , 0x07], + #[REG.FSK.SYNC_CONFIG , 0x12], + #[REG.FSK.SYNC_VALUE_1 , 0xC1], + #[REG.FSK.SYNC_VALUE_2 , 0x94], + #[REG.FSK.SYNC_VALUE_3 , 0xC1], + #[REG.FSK.PACKET_CONFIG_1, 0xD8], + #[REG.FSK.FIFO_THRESH , 0x8F], + #[REG.FSK.IMAGE_CAL , 0x02], + #[REG.FSK.DIO_MAPPING_1 , 0x00], + #[REG.FSK.DIO_MAPPING_2 , 0x30] + ] + self.set_mode(MODE.FSK_STDBY) + for register_address, value in lookup_fsk: + self.set_register(register_address, value) + self.set_mode(MODE.SLEEP) + # set the dio_ mapping by calling the two get_dio_mapping_* functions + self.get_dio_mapping_1() + self.get_dio_mapping_2() + + + # Overridable functions: + + def on_rx_done(self): + pass + + def on_tx_done(self): + pass + + def on_cad_done(self): + pass + + def on_rx_timeout(self): + pass + + def on_valid_header(self): + pass + + def on_payload_crc_error(self): + pass + + def on_fhss_change_channel(self): + pass + + # Internal callbacks for add_events() + + def _dio0(self, channel): + # DIO0 00: RxDone + # DIO0 01: TxDone + # DIO0 10: CadDone + if self.dio_mapping[0] == 0: + self.on_rx_done() + elif self.dio_mapping[0] == 1: + self.on_tx_done() + elif self.dio_mapping[0] == 2: + self.on_cad_done() + else: + raise RuntimeError("unknown dio0mapping!") + + def _dio1(self, channel): + # DIO1 00: RxTimeout + # DIO1 01: FhssChangeChannel + # DIO1 10: CadDetected + if self.dio_mapping[1] == 0: + self.on_rx_timeout() + elif self.dio_mapping[1] == 1: + self.on_fhss_change_channel() + elif self.dio_mapping[1] == 2: + self.on_CadDetected() + else: + raise RuntimeError("unknown dio1mapping!") + + def _dio2(self, channel): + # DIO2 00: FhssChangeChannel + # DIO2 01: FhssChangeChannel + # DIO2 10: FhssChangeChannel + self.on_fhss_change_channel() + + def _dio3(self, channel): + # DIO3 00: CadDone + # DIO3 01: ValidHeader + # DIO3 10: PayloadCrcError + if self.dio_mapping[3] == 0: + self.on_cad_done() + elif self.dio_mapping[3] == 1: + self.on_valid_header() + elif self.dio_mapping[3] == 2: + self.on_payload_crc_error() + else: + raise RuntimeError("unknown dio3 mapping!") + + def _dio4(self, channel): + raise RuntimeError("DIO4 is not used") + + def _dio5(self, channel): + raise RuntimeError("DIO5 is not used") + + # All the set/get/read/write functions + + def get_mode(self): + """ Get the mode + :return: New mode + """ + self.mode = self.spi.xfer([REG.LORA.OP_MODE, 0])[1] + return self.mode + + def set_mode(self, mode): + """ Set the mode + :param mode: Set the mode. Use constants.MODE class + :return: New mode + """ + # the mode is backed up in self.mode + if mode == self.mode: + return mode + if self.verbose: + sys.stderr.write("Mode <- %s\n" % MODE.lookup[mode]) + self.mode = mode + return self.spi.xfer([REG.LORA.OP_MODE | 0x80, mode])[1] + + def write_payload(self, payload): + """ Get FIFO ready for TX: Set FifoAddrPtr to FifoTxBaseAddr. The transceiver is put into STDBY mode. + :param payload: Payload to write (list) + :return: Written payload + """ + payload_size = len(payload) + self.set_payload_length(payload_size) + + self.set_mode(MODE.STDBY) + base_addr = self.get_fifo_tx_base_addr() + self.set_fifo_addr_ptr(base_addr) + return self.spi.xfer([REG.LORA.FIFO | 0x80] + payload)[1:] + + def reset_ptr_rx(self): + """ Get FIFO ready for RX: Set FifoAddrPtr to FifoRxBaseAddr. The transceiver is put into STDBY mode. """ + self.set_mode(MODE.STDBY) + base_addr = self.get_fifo_rx_base_addr() + self.set_fifo_addr_ptr(base_addr) + + def rx_is_good(self): + """ Check the IRQ flags for RX errors + :return: True if no errors + :rtype: bool + """ + flags = self.get_irq_flags() + return not any([flags[s] for s in ['valid_header', 'crc_error', 'rx_done', 'rx_timeout']]) + + def read_payload(self , nocheck = False): + """ Read the payload from FIFO + :param nocheck: If True then check rx_is_good() + :return: Payload + :rtype: list[int] + """ + if not nocheck and not self.rx_is_good(): + return None + rx_nb_bytes = self.get_rx_nb_bytes() + fifo_rx_current_addr = self.get_fifo_rx_current_addr() + self.set_fifo_addr_ptr(fifo_rx_current_addr) + payload = self.spi.xfer([REG.LORA.FIFO] + [0] * rx_nb_bytes)[1:] + return payload + + def get_freq(self): + """ Get the frequency (MHz) + :return: Frequency in MHz + :rtype: float + """ + msb, mid, lsb = self.spi.xfer([REG.LORA.FR_MSB, 0, 0, 0])[1:] + f = lsb + 256*(mid + 256*msb) + return f / 16384. + + def set_freq(self, f): + """ Set the frequency (MHz) + :param f: Frequency in MHz + "type f: float + :return: New register settings (3 bytes [msb, mid, lsb]) + :rtype: list[int] + """ + assert self.mode == MODE.SLEEP or self.mode == MODE.STDBY or self.mode == MODE.FSK_STDBY + i = int(f * 16384.) # choose floor + msb = i // 65536 + i -= msb * 65536 + mid = i // 256 + i -= mid * 256 + lsb = i + return self.spi.xfer([REG.LORA.FR_MSB | 0x80, msb, mid, lsb]) + + def get_pa_config(self, convert_dBm=False): + v = self.spi.xfer([REG.LORA.PA_CONFIG, 0])[1] + pa_select = v >> 7 + max_power = v >> 4 & 0b111 + output_power = v & 0b1111 + if convert_dBm: + max_power = max_power * .6 + 10.8 + output_power = max_power - (15 - output_power) + return dict( + pa_select = pa_select, + max_power = max_power, + output_power = output_power + ) + + def set_pa_config(self, pa_select=None, max_power=None, output_power=None): + """ Configure the PA + :param pa_select: Selects PA output pin, 0->RFO, 1->PA_BOOST + :param max_power: Select max output power Pmax=10.8+0.6*MaxPower + :param output_power: Output power Pout=Pmax-(15-OutputPower) if PaSelect = 0, + Pout=17-(15-OutputPower) if PaSelect = 1 (PA_BOOST pin) + :return: new register value + """ + loc = locals() + current = self.get_pa_config() + loc = {s: current[s] if loc[s] is None else loc[s] for s in loc} + val = (loc['pa_select'] << 7) | (loc['max_power'] << 4) | (loc['output_power']) + return self.spi.xfer([REG.LORA.PA_CONFIG | 0x80, val])[1] + + @getter(REG.LORA.PA_RAMP) + def get_pa_ramp(self, val): + return val & 0b1111 + + @setter(REG.LORA.PA_RAMP) + def set_pa_ramp(self, val): + return val & 0b1111 + + def get_ocp(self, convert_mA=False): + v = self.spi.xfer([REG.LORA.OCP, 0])[1] + ocp_on = v >> 5 & 0x01 + ocp_trim = v & 0b11111 + if convert_mA: + if ocp_trim <= 15: + ocp_trim = 45. + 5. * ocp_trim + elif ocp_trim <= 27: + ocp_trim = -30. + 10. * ocp_trim + else: + assert ocp_trim <= 27 + return dict( + ocp_on = ocp_on, + ocp_trim = ocp_trim + ) + + def set_ocp_trim(self, I_mA): + assert(I_mA >= 45 and I_mA <= 240) + ocp_on = self.spi.xfer([REG.LORA.OCP, 0])[1] >> 5 & 0x01 + if I_mA <= 120: + v = int(round((I_mA-45.)/5.)) + else: + v = int(round((I_mA+30.)/10.)) + v = set_bit(v, 5, ocp_on) + return self.spi.xfer([REG.LORA.OCP | 0x80, v])[1] + + def get_lna(self): + v = self.spi.xfer([REG.LORA.LNA, 0])[1] + return dict( + lna_gain = v >> 5, + lna_boost_lf = v >> 3 & 0b11, + lna_boost_hf = v & 0b11 + ) + + def set_lna(self, lna_gain=None, lna_boost_lf=None, lna_boost_hf=None): + assert lna_boost_hf is None or lna_boost_hf == 0b00 or lna_boost_hf == 0b11 + self.set_mode(MODE.STDBY) + if lna_gain is not None: + # Apparently agc_auto_on must be 0 in order to set lna_gain + self.set_agc_auto_on(lna_gain == GAIN.NOT_USED) + loc = locals() + current = self.get_lna() + loc = {s: current[s] if loc[s] is None else loc[s] for s in loc} + val = (loc['lna_gain'] << 5) | (loc['lna_boost_lf'] << 3) | (loc['lna_boost_hf']) + retval = self.spi.xfer([REG.LORA.LNA | 0x80, val])[1] + if lna_gain is not None: + # agc_auto_on must track lna_gain: GAIN=NOT_USED -> agc_auto=ON, otherwise =OFF + self.set_agc_auto_on(lna_gain == GAIN.NOT_USED) + return retval + + def set_lna_gain(self, lna_gain): + self.set_lna(lna_gain=lna_gain) + + def get_fifo_addr_ptr(self): + return self.spi.xfer([REG.LORA.FIFO_ADDR_PTR, 0])[1] + + def set_fifo_addr_ptr(self, ptr): + return self.spi.xfer([REG.LORA.FIFO_ADDR_PTR | 0x80, ptr])[1] + + def get_fifo_tx_base_addr(self): + return self.spi.xfer([REG.LORA.FIFO_TX_BASE_ADDR, 0])[1] + + def set_fifo_tx_base_addr(self, ptr): + return self.spi.xfer([REG.LORA.FIFO_TX_BASE_ADDR | 0x80, ptr])[1] + + def get_fifo_rx_base_addr(self): + return self.spi.xfer([REG.LORA.FIFO_RX_BASE_ADDR, 0])[1] + + def set_fifo_rx_base_addr(self, ptr): + return self.spi.xfer([REG.LORA.FIFO_RX_BASE_ADDR | 0x80, ptr])[1] + + def get_fifo_rx_current_addr(self): + return self.spi.xfer([REG.LORA.FIFO_RX_CURR_ADDR, 0])[1] + + def get_fifo_rx_byte_addr(self): + return self.spi.xfer([REG.LORA.FIFO_RX_BYTE_ADDR, 0])[1] + + def get_irq_flags_mask(self): + v = self.spi.xfer([REG.LORA.IRQ_FLAGS_MASK, 0])[1] + return dict( + rx_timeout = v >> 7 & 0x01, + rx_done = v >> 6 & 0x01, + crc_error = v >> 5 & 0x01, + valid_header = v >> 4 & 0x01, + tx_done = v >> 3 & 0x01, + cad_done = v >> 2 & 0x01, + fhss_change_ch = v >> 1 & 0x01, + cad_detected = v >> 0 & 0x01, + ) + + def set_irq_flags_mask(self, + rx_timeout=None, rx_done=None, crc_error=None, valid_header=None, tx_done=None, + cad_done=None, fhss_change_ch=None, cad_detected=None): + loc = locals() + v = self.spi.xfer([REG.LORA.IRQ_FLAGS_MASK, 0])[1] + for i, s in enumerate(['cad_detected', 'fhss_change_ch', 'cad_done', 'tx_done', 'valid_header', + 'crc_error', 'rx_done', 'rx_timeout']): + this_bit = locals()[s] + if this_bit is not None: + v = set_bit(v, i, this_bit) + return self.spi.xfer([REG.LORA.IRQ_FLAGS_MASK | 0x80, v])[1] + + def get_irq_flags(self): + v = self.spi.xfer([REG.LORA.IRQ_FLAGS, 0])[1] + return dict( + rx_timeout = v >> 7 & 0x01, + rx_done = v >> 6 & 0x01, + crc_error = v >> 5 & 0x01, + valid_header = v >> 4 & 0x01, + tx_done = v >> 3 & 0x01, + cad_done = v >> 2 & 0x01, + fhss_change_ch = v >> 1 & 0x01, + cad_detected = v >> 0 & 0x01, + ) + + def set_irq_flags(self, + rx_timeout=None, rx_done=None, crc_error=None, valid_header=None, tx_done=None, + cad_done=None, fhss_change_ch=None, cad_detected=None): + v = self.spi.xfer([REG.LORA.IRQ_FLAGS, 0])[1] + for i, s in enumerate(['cad_detected', 'fhss_change_ch', 'cad_done', 'tx_done', 'valid_header', + 'crc_error', 'rx_done', 'rx_timeout']): + this_bit = locals()[s] + if this_bit is not None: + v = set_bit(v, i, this_bit) + return self.spi.xfer([REG.LORA.IRQ_FLAGS | 0x80, v])[1] + + def clear_irq_flags(self, + RxTimeout=None, RxDone=None, PayloadCrcError=None, + ValidHeader=None, TxDone=None, CadDone=None, + FhssChangeChannel=None, CadDetected=None): + v = 0 + for i, s in enumerate(['CadDetected', 'FhssChangeChannel', 'CadDone', + 'TxDone', 'ValidHeader', 'PayloadCrcError', + 'RxDone', 'RxTimeout']): + this_bit = locals()[s] + if this_bit is not None: + v = set_bit(v, eval('MASK.IRQ_FLAGS.' + s), this_bit) + return self.spi.xfer([REG.LORA.IRQ_FLAGS | 0x80, v])[1] + + + def get_rx_nb_bytes(self): + return self.spi.xfer([REG.LORA.RX_NB_BYTES, 0])[1] + + def get_rx_header_cnt(self): + msb, lsb = self.spi.xfer([REG.LORA.RX_HEADER_CNT_MSB, 0, 0])[1:] + return lsb + 256 * msb + + def get_rx_packet_cnt(self): + msb, lsb = self.spi.xfer([REG.LORA.RX_PACKET_CNT_MSB, 0, 0])[1:] + return lsb + 256 * msb + + def get_modem_status(self): + status = self.spi.xfer([REG.LORA.MODEM_STAT, 0])[1] + return dict( + rx_coding_rate = status >> 5 & 0x03, + modem_clear = status >> 4 & 0x01, + header_info_valid = status >> 3 & 0x01, + rx_ongoing = status >> 2 & 0x01, + signal_sync = status >> 1 & 0x01, + signal_detected = status >> 0 & 0x01 + ) + + def get_pkt_snr_value(self): + v = self.spi.xfer([REG.LORA.PKT_SNR_VALUE, 0])[1] + return float(256-v) / 4. + + def get_pkt_rssi_value(self): + v = self.spi.xfer([REG.LORA.PKT_RSSI_VALUE, 0])[1] + return v - 157 + + def get_rssi_value(self): + v = self.spi.xfer([REG.LORA.RSSI_VALUE, 0])[1] + return v - 157 + + def get_hop_channel(self): + v = self.spi.xfer([REG.LORA.HOP_CHANNEL, 0])[1] + return dict( + pll_timeout = v >> 7, + crc_on_payload = v >> 6 & 0x01, + fhss_present_channel = v >> 5 & 0b111111 + ) + + def get_modem_config_1(self): + val = self.spi.xfer([REG.LORA.MODEM_CONFIG_1, 0])[1] + return dict( + bw = val >> 4 & 0x0F, + coding_rate = val >> 1 & 0x07, + implicit_header_mode = val & 0x01 + ) + + def set_modem_config_1(self, bw=None, coding_rate=None, implicit_header_mode=None): + loc = locals() + current = self.get_modem_config_1() + loc = {s: current[s] if loc[s] is None else loc[s] for s in loc} + val = loc['implicit_header_mode'] | (loc['coding_rate'] << 1) | (loc['bw'] << 4) + return self.spi.xfer([REG.LORA.MODEM_CONFIG_1 | 0x80, val])[1] + + def set_bw(self, bw): + """ Set the bandwidth 0=7.8kHz ... 9=500kHz + :param bw: A number 0,2,3,...,9 + :return: + """ + self.set_modem_config_1(bw=bw) + + def set_coding_rate(self, coding_rate): + """ Set the coding rate 4/5, 4/6, 4/7, 4/8 + :param coding_rate: A number 1,2,3,4 + :return: New register value + """ + self.set_modem_config_1(coding_rate=coding_rate) + + def set_implicit_header_mode(self, implicit_header_mode): + self.set_modem_config_1(implicit_header_mode=implicit_header_mode) + + def get_modem_config_2(self, include_symb_timout_lsb=False): + val = self.spi.xfer([REG.LORA.MODEM_CONFIG_2, 0])[1] + d = dict( + spreading_factor = val >> 4 & 0x0F, + tx_cont_mode = val >> 3 & 0x01, + rx_crc = val >> 2 & 0x01, + ) + if include_symb_timout_lsb: + d['symb_timout_lsb'] = val & 0x03 + return d + + def set_modem_config_2(self, spreading_factor=None, tx_cont_mode=None, rx_crc=None): + loc = locals() + # RegModemConfig2 contains the SymbTimout MSB bits. We tack the back on when writing this register. + current = self.get_modem_config_2(include_symb_timout_lsb=True) + loc = {s: current[s] if loc[s] is None else loc[s] for s in loc} + val = (loc['spreading_factor'] << 4) | (loc['tx_cont_mode'] << 3) | (loc['rx_crc'] << 2) | current['symb_timout_lsb'] + return self.spi.xfer([REG.LORA.MODEM_CONFIG_2 | 0x80, val])[1] + + def set_spreading_factor(self, spreading_factor): + self.set_modem_config_2(spreading_factor=spreading_factor) + + def set_rx_crc(self, rx_crc): + self.set_modem_config_2(rx_crc=rx_crc) + + def get_modem_config_3(self): + val = self.spi.xfer([REG.LORA.MODEM_CONFIG_3, 0])[1] + return dict( + low_data_rate_optim = val >> 3 & 0x01, + agc_auto_on = val >> 2 & 0x01 + ) + + def set_modem_config_3(self, low_data_rate_optim=None, agc_auto_on=None): + loc = locals() + current = self.get_modem_config_3() + loc = {s: current[s] if loc[s] is None else loc[s] for s in loc} + val = (loc['low_data_rate_optim'] << 3) | (loc['agc_auto_on'] << 2) + return self.spi.xfer([REG.LORA.MODEM_CONFIG_3 | 0x80, val])[1] + + @setter(REG.LORA.INVERT_IQ) + def set_invert_iq(self, invert): + """ Invert the LoRa I and Q signals + :param invert: 0: normal mode, 1: I and Q inverted + :return: New value of register + """ + return 0x27 | (invert & 0x01) << 6 + + @getter(REG.LORA.INVERT_IQ) + def get_invert_iq(self, val): + """ Get the invert the I and Q setting + :return: 0: normal mode, 1: I and Q inverted + """ + return (val >> 6) & 0x01 + + def get_agc_auto_on(self): + return self.get_modem_config_3()['agc_auto_on'] + + def set_agc_auto_on(self, agc_auto_on): + self.set_modem_config_3(agc_auto_on=agc_auto_on) + + def get_low_data_rate_optim(self): + return self.set_modem_config_3()['low_data_rate_optim'] + + def set_low_data_rate_optim(self, low_data_rate_optim): + self.set_modem_config_3(low_data_rate_optim=low_data_rate_optim) + + def get_symb_timeout(self): + SYMB_TIMEOUT_MSB = REG.LORA.MODEM_CONFIG_2 + msb, lsb = self.spi.xfer([SYMB_TIMEOUT_MSB, 0, 0])[1:] # the MSB bits are stored in REG.LORA.MODEM_CONFIG_2 + msb = msb & 0b11 + return lsb + 256 * msb + + def set_symb_timeout(self, timeout): + bkup_reg_modem_config_2 = self.spi.xfer([REG.LORA.MODEM_CONFIG_2, 0])[1] + msb = timeout >> 8 & 0b11 # bits 8-9 + lsb = timeout - 256 * msb # bits 0-7 + reg_modem_config_2 = bkup_reg_modem_config_2 & 0xFC | msb # bits 2-7 of bkup_reg_modem_config_2 ORed with the two msb bits + old_msb = self.spi.xfer([REG.LORA.MODEM_CONFIG_2 | 0x80, reg_modem_config_2])[1] & 0x03 + old_lsb = self.spi.xfer([REG.LORA.SYMB_TIMEOUT_LSB | 0x80, lsb])[1] + return old_lsb + 256 * old_msb + + def get_preamble(self): + msb, lsb = self.spi.xfer([REG.LORA.PREAMBLE_MSB, 0, 0])[1:] + return lsb + 256 * msb + + def set_preamble(self, preamble): + msb = preamble >> 8 + lsb = preamble - msb * 256 + old_msb, old_lsb = self.spi.xfer([REG.LORA.PREAMBLE_MSB | 0x80, msb, lsb])[1:] + return old_lsb + 256 * old_msb + + @getter(REG.LORA.PAYLOAD_LENGTH) + def get_payload_length(self, val): + return val + + @setter(REG.LORA.PAYLOAD_LENGTH) + def set_payload_length(self, payload_length): + return payload_length + + @getter(REG.LORA.MAX_PAYLOAD_LENGTH) + def get_max_payload_length(self, val): + return val + + @setter(REG.LORA.MAX_PAYLOAD_LENGTH) + def set_max_payload_length(self, max_payload_length): + return max_payload_length + + @getter(REG.LORA.HOP_PERIOD) + def get_hop_period(self, val): + return val + + @setter(REG.LORA.HOP_PERIOD) + def set_hop_period(self, hop_period): + return hop_period + + def get_fei(self): + msb, mid, lsb = self.spi.xfer([REG.LORA.FEI_MSB, 0, 0, 0])[1:] + msb &= 0x0F + freq_error = lsb + 256 * (mid + 256 * msb) + return freq_error + + @getter(REG.LORA.DETECT_OPTIMIZE) + def get_detect_optimize(self, val): + """ Get LoRa detection optimize setting + :return: detection optimize setting 0x03: SF7-12, 0x05: SF6 + + """ + return val & 0b111 + + @setter(REG.LORA.DETECT_OPTIMIZE) + def set_detect_optimize(self, detect_optimize): + """ Set LoRa detection optimize + :param detect_optimize 0x03: SF7-12, 0x05: SF6 + :return: New register value + """ + assert detect_optimize == 0x03 or detect_optimize == 0x05 + return detect_optimize & 0b111 + + @getter(REG.LORA.DETECTION_THRESH) + def get_detection_threshold(self, val): + """ Get LoRa detection threshold setting + :return: detection threshold 0x0A: SF7-12, 0x0C: SF6 + + """ + return val + + @setter(REG.LORA.DETECTION_THRESH) + def set_detection_threshold(self, detect_threshold): + """ Set LoRa detection optimize + :param detect_threshold 0x0A: SF7-12, 0x0C: SF6 + :return: New register value + """ + assert detect_threshold == 0x0A or detect_threshold == 0x0C + return detect_threshold + + @getter(REG.LORA.SYNC_WORD) + def get_sync_word(self, sync_word): + return sync_word + + @setter(REG.LORA.SYNC_WORD) + def set_sync_word(self, sync_word): + return sync_word + + @getter(REG.LORA.DIO_MAPPING_1) + def get_dio_mapping_1(self, mapping): + """ Get mapping of pins DIO0 to DIO3. Object variable dio_mapping will be set. + :param mapping: Register value + :type mapping: int + :return: Value of the mapping list + :rtype: list[int] + """ + self.dio_mapping = [mapping>>6 & 0x03, mapping>>4 & 0x03, mapping>>2 & 0x03, mapping>>0 & 0x03] \ + + self.dio_mapping[4:6] + return self.dio_mapping + + @setter(REG.LORA.DIO_MAPPING_1) + def set_dio_mapping_1(self, mapping): + """ Set mapping of pins DIO0 to DIO3. Object variable dio_mapping will be set. + :param mapping: Register value + :type mapping: int + :return: New value of the register + :rtype: int + """ + self.dio_mapping = [mapping>>6 & 0x03, mapping>>4 & 0x03, mapping>>2 & 0x03, mapping>>0 & 0x03] \ + + self.dio_mapping[4:6] + return mapping + + @getter(REG.LORA.DIO_MAPPING_2) + def get_dio_mapping_2(self, mapping): + """ Get mapping of pins DIO4 to DIO5. Object variable dio_mapping will be set. + :param mapping: Register value + :type mapping: int + :return: Value of the mapping list + :rtype: list[int] + """ + self.dio_mapping = self.dio_mapping[0:4] + [mapping>>6 & 0x03, mapping>>4 & 0x03] + return self.dio_mapping + + @setter(REG.LORA.DIO_MAPPING_2) + def set_dio_mapping_2(self, mapping): + """ Set mapping of pins DIO4 to DIO5. Object variable dio_mapping will be set. + :param mapping: Register value + :type mapping: int + :return: New value of the register + :rtype: int + """ + assert mapping & 0b00001110 == 0 + self.dio_mapping = self.dio_mapping[0:4] + [mapping>>6 & 0x03, mapping>>4 & 0x03] + return mapping + + def get_dio_mapping(self): + """ Utility function that returns the list of current DIO mappings. Object variable dio_mapping will be set. + :return: List of current DIO mappings + :rtype: list[int] + """ + self.get_dio_mapping_1() + return self.get_dio_mapping_2() + + def set_dio_mapping(self, mapping): + """ Utility function that returns the list of current DIO mappings. Object variable dio_mapping will be set. + :param mapping: DIO mapping list + :type mapping: list[int] + :return: New DIO mapping list + :rtype: list[int] + """ + mapping_1 = (mapping[0] & 0x03) << 6 | (mapping[1] & 0x03) << 4 | (mapping[2] & 0x3) << 2 | mapping[3] & 0x3 + mapping_2 = (mapping[4] & 0x03) << 6 | (mapping[5] & 0x03) << 4 + self.set_dio_mapping_1(mapping_1) + return self.set_dio_mapping_2(mapping_2) + + @getter(REG.LORA.VERSION) + def get_version(self, version): + """ Version code of the chip. + Bits 7-4 give the full revision number; bits 3-0 give the metal mask revision number. + :return: Version code + :rtype: int + """ + return version + + @getter(REG.LORA.TCXO) + def get_tcxo(self, tcxo): + """ Get TCXO or XTAL input setting + 0 -> "XTAL": Crystal Oscillator with external Crystal + 1 -> "TCXO": External clipped sine TCXO AC-connected to XTA pin + :param tcxo: 1=TCXO or 0=XTAL input setting + :return: TCXO or XTAL input setting + :type: int (0 or 1) + """ + return tcxo & 0b00010000 + + @setter(REG.LORA.TCXO) + def set_tcxo(self, tcxo): + """ Make TCXO or XTAL input setting. + 0 -> "XTAL": Crystal Oscillator with external Crystal + 1 -> "TCXO": External clipped sine TCXO AC-connected to XTA pin + :param tcxo: 1=TCXO or 0=XTAL input setting + :return: new TCXO or XTAL input setting + """ + return (tcxo >= 1) << 4 | 0x09 # bits 0-3 must be 0b1001 + + @getter(REG.LORA.PA_DAC) + def get_pa_dac(self, pa_dac): + """ Enables the +20dBm option on PA_BOOST pin + False -> Default value + True -> +20dBm on PA_BOOST when OutputPower=1111 + :return: True/False if +20dBm option on PA_BOOST on/off + :rtype: bool + """ + pa_dac &= 0x07 # only bits 0-2 + if pa_dac == 0x04: + return False + elif pa_dac == 0x07: + return True + else: + raise RuntimeError("Bad PA_DAC value %s" % hex(pa_dac)) + + @setter(REG.LORA.PA_DAC) + def set_pa_dac(self, pa_dac): + """ Enables the +20dBm option on PA_BOOST pin + False -> Default value + True -> +20dBm on PA_BOOST when OutputPower=1111 + :param pa_dac: 1/0 if +20dBm option on PA_BOOST on/off + :return: New pa_dac register value + :rtype: int + """ + return 0x87 if pa_dac else 0x84 + + def rx_chain_calibration(self, freq=868.): + """ Run the image calibration (see Semtech documentation section 4.2.3.8) + :param freq: Frequency for the HF calibration + :return: None + """ + # backup some registers + op_mode_bkup = self.get_mode() + pa_config_bkup = self.get_register(REG.LORA.PA_CONFIG) + freq_bkup = self.get_freq() + # for image calibration device must be in FSK standby mode + self.set_mode(MODE.FSK_STDBY) + # cut the PA + self.set_register(REG.LORA.PA_CONFIG, 0x00) + # calibration for the LF band + image_cal = (self.get_register(REG.FSK.IMAGE_CAL) & 0xBF) | 0x40 + self.set_register(REG.FSK.IMAGE_CAL, image_cal) + while (self.get_register(REG.FSK.IMAGE_CAL) & 0x20) == 0x20: + pass + # Set a Frequency in HF band + self.set_freq(freq) + # calibration for the HF band + image_cal = (self.get_register(REG.FSK.IMAGE_CAL) & 0xBF) | 0x40 + self.set_register(REG.FSK.IMAGE_CAL, image_cal) + while (self.get_register(REG.FSK.IMAGE_CAL) & 0x20) == 0x20: + pass + # put back the saved parameters + self.set_mode(op_mode_bkup) + self.set_register(REG.LORA.PA_CONFIG, pa_config_bkup) + self.set_freq(freq_bkup) + + def dump_registers(self): + """ Returns a list of [reg_addr, reg_name, reg_value] tuples. Chip is put into mode SLEEP. + :return: List of [reg_addr, reg_name, reg_value] tuples + :rtype: list[tuple] + """ + self.set_mode(MODE.SLEEP) + values = self.get_all_registers() + skip_set = set([REG.LORA.FIFO]) + result_list = [] + for i, s in REG.LORA.lookup.iteritems(): + if i in skip_set: + continue + v = values[i] + result_list.append((i, s, v)) + return result_list + + def get_register(self, register_address): + return self.spi.xfer([register_address & 0x7F, 0])[1] + + def set_register(self, register_address, val): + return self.spi.xfer([register_address | 0x80, val])[1] + + def get_all_registers(self): + # read all registers + reg = [0] + self.spi.xfer([1]+[0]*0x3E)[1:] + self.mode = reg[1] + return reg + + def __del__(self): + self.set_mode(MODE.SLEEP) + if self.verbose: + sys.stderr.write("MODE=SLEEP\n") + + def __str__(self): + # don't use __str__ while in any mode other that SLEEP or STDBY + assert(self.mode == MODE.SLEEP or self.mode == MODE.STDBY) + + onoff = lambda i: 'ON' if i else 'OFF' + f = self.get_freq() + cfg1 = self.get_modem_config_1() + cfg2 = self.get_modem_config_2() + cfg3 = self.get_modem_config_3() + pa_config = self.get_pa_config(convert_dBm=True) + ocp = self.get_ocp(convert_mA=True) + lna = self.get_lna() + s = "SX127x LoRa registers:\n" + s += " mode %s\n" % MODE.lookup[self.get_mode()] + s += " freq %f MHz\n" % f + s += " coding_rate %s\n" % CODING_RATE.lookup[cfg1['coding_rate']] + s += " bw %s\n" % BW.lookup[cfg1['bw']] + s += " spreading_factor %s chips/symb\n" % (1 << cfg2['spreading_factor']) + s += " implicit_hdr_mode %s\n" % onoff(cfg1['implicit_header_mode']) + s += " rx_payload_crc %s\n" % onoff(cfg2['rx_crc']) + s += " tx_cont_mode %s\n" % onoff(cfg2['tx_cont_mode']) + s += " preamble %d\n" % self.get_preamble() + s += " low_data_rate_opti %s\n" % onoff(cfg3['low_data_rate_optim']) + s += " agc_auto_on %s\n" % onoff(cfg3['agc_auto_on']) + s += " symb_timeout %s\n" % self.get_symb_timeout() + s += " freq_hop_period %s\n" % self.get_hop_period() + s += " hop_channel %s\n" % self.get_hop_channel() + s += " payload_length %s\n" % self.get_payload_length() + s += " max_payload_length %s\n" % self.get_max_payload_length() + s += " irq_flags_mask %s\n" % self.get_irq_flags_mask() + s += " irq_flags %s\n" % self.get_irq_flags() + s += " rx_nb_byte %d\n" % self.get_rx_nb_bytes() + s += " rx_header_cnt %d\n" % self.get_rx_header_cnt() + s += " rx_packet_cnt %d\n" % self.get_rx_packet_cnt() + s += " pkt_snr_value %f\n" % self.get_pkt_snr_value() + s += " pkt_rssi_value %d\n" % self.get_pkt_rssi_value() + s += " rssi_value %d\n" % self.get_rssi_value() + s += " fei %d\n" % self.get_fei() + s += " pa_select %s\n" % PA_SELECT.lookup[pa_config['pa_select']] + s += " max_power %f dBm\n" % pa_config['max_power'] + s += " output_power %f dBm\n" % pa_config['output_power'] + s += " ocp %s\n" % onoff(ocp['ocp_on']) + s += " ocp_trim %f mA\n" % ocp['ocp_trim'] + s += " lna_gain %s\n" % GAIN.lookup[lna['lna_gain']] + s += " lna_boost_lf %s\n" % bin(lna['lna_boost_lf']) + s += " lna_boost_hf %s\n" % bin(lna['lna_boost_hf']) + s += " detect_optimize %#02x\n" % self.get_detect_optimize() + s += " detection_thresh %#02x\n" % self.get_detection_threshold() + s += " sync_word %#02x\n" % self.get_sync_word() + s += " dio_mapping 0..5 %s\n" % self.get_dio_mapping() + s += " tcxo %s\n" % ['XTAL', 'TCXO'][self.get_tcxo()] + s += " pa_dac %s\n" % ['default', 'PA_BOOST'][self.get_pa_dac()] + s += " fifo_addr_ptr %#02x\n" % self.get_fifo_addr_ptr() + s += " fifo_tx_base_addr %#02x\n" % self.get_fifo_tx_base_addr() + s += " fifo_rx_base_addr %#02x\n" % self.get_fifo_rx_base_addr() + s += " fifo_rx_curr_addr %#02x\n" % self.get_fifo_rx_current_addr() + s += " fifo_rx_byte_addr %#02x\n" % self.get_fifo_rx_byte_addr() + s += " status %s\n" % self.get_modem_status() + s += " version %#02x\n" % self.get_version() + return s diff --git a/SX127x/LoRaArgumentParser.py b/SX127x/LoRaArgumentParser.py new file mode 100644 index 0000000..0e2451e --- /dev/null +++ b/SX127x/LoRaArgumentParser.py @@ -0,0 +1,75 @@ +""" Defines LoRaArgumentParser which extends argparse.ArgumentParser with standard config parameters for the SX127x. """ + +# Copyright 2015 Mayer Analytics Ltd. +# +# This file is part of pySX127x. +# +# pySX127x is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public +# License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# pySX127x is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied +# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You can be released from the requirements of the license by obtaining a commercial license. Such a license is +# mandatory as soon as you develop commercial activities involving pySX127x without disclosing the source code of your +# own applications, or shipping pySX127x with a closed source product. +# +# You should have received a copy of the GNU General Public License along with pySX127. If not, see +# . + + +import argparse + + +class LoRaArgumentParser(argparse.ArgumentParser): + """ This class extends argparse.ArgumentParser. + Some commonly used LoRa config parameters are defined + * ocp + * spreading factor + * frequency + * bandwidth + * preamble + Call the parse_args with an additional parameter referencing a LoRa object. The args will be used to configure + the LoRa. + """ + + bw_lookup = dict(BW7_8=0, BW10_4=1, BW15_6=2, BW20_8=3, BW31_25=4, BW41_7=5, BW62_5=6, BW125=7, BW250=8, BW500=9) + cr_lookup = dict(CR4_5=1, CR4_6=2,CR4_7=3,CR4_8=4) + + def __init__(self, description): + argparse.ArgumentParser.__init__(self, description=description) + self.add_argument('--ocp', '-c', dest='ocp', default=100, action="store", type=float, + help="Over current protection in mA (45 .. 240 mA)") + self.add_argument('--sf', '-s', dest='sf', default=7, action="store", type=int, + help="Spreading factor (6...12). Default is 7.") + self.add_argument('--freq', '-f', dest='freq', default=869., action="store", type=float, + help="Frequency") + self.add_argument('--bw', '-b', dest='bw', default='BW125', action="store", type=str, + help="Bandwidth (one of BW7_8 BW10_4 BW15_6 BW20_8 BW31_25 BW41_7 BW62_5 BW125 BW250 BW500).\nDefault is BW125.") + self.add_argument('--cr', '-r', dest='coding_rate', default='CR4_5', action="store", type=str, + help="Coding rate (one of CR4_5 CR4_6 CR4_7 CR4_8).\nDefault is CR4_5.") + self.add_argument('--preamble', '-p', dest='preamble', default=8, action="store", type=int, + help="Preamble length. Default is 8.") + + def parse_args(self, lora): + """ Parse the args, perform some sanity checks and configure the LoRa accordingly. + :param lora: Reference to LoRa object + :return: args + """ + args = argparse.ArgumentParser.parse_args(self) + args.bw = self.bw_lookup.get(args.bw, None) + args.coding_rate = self.cr_lookup.get(args.coding_rate, None) + # some sanity checks + assert(args.bw is not None) + assert(args.coding_rate is not None) + assert(args.sf >=6 and args.sf <= 12) + # set the LoRa object + lora.set_freq(args.freq) + lora.set_preamble(args.preamble) + lora.set_spreading_factor(args.sf) + lora.set_bw(args.bw) + lora.set_coding_rate(args.coding_rate) + lora.set_ocp_trim(args.ocp) + return args diff --git a/SX127x/__init__.py b/SX127x/__init__.py new file mode 100644 index 0000000..78c5d36 --- /dev/null +++ b/SX127x/__init__.py @@ -0,0 +1 @@ +__all__ = ['SX127x'] diff --git a/SX127x/board_config.py b/SX127x/board_config.py new file mode 100644 index 0000000..9055acb --- /dev/null +++ b/SX127x/board_config.py @@ -0,0 +1,126 @@ +""" Defines the BOARD class that contains the board pin mappings. """ + +# Copyright 2015 Mayer Analytics Ltd. +# +# This file is part of pySX127x. +# +# pySX127x is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public +# License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# pySX127x is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied +# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You can be released from the requirements of the license by obtaining a commercial license. Such a license is +# mandatory as soon as you develop commercial activities involving pySX127x without disclosing the source code of your +# own applications, or shipping pySX127x with a closed source product. +# +# You should have received a copy of the GNU General Public License along with pySX127. If not, see +# . + + +import RPi.GPIO as GPIO +import spidev + +import time + + +class BOARD: + """ Board initialisation/teardown and pin configuration is kept here. + This is the Raspberry Pi board with one LED and a modtronix inAir9B + """ + # Note that the BCOM numbering for the GPIOs is used. + DIO0 = 22 # RaspPi GPIO 22 + DIO1 = 23 # RaspPi GPIO 23 + DIO2 = 24 # RaspPi GPIO 24 + DIO3 = 25 # RaspPi GPIO 25 + LED = 18 # RaspPi GPIO 18 connects to the LED on the proto shield + SWITCH = 4 # RaspPi GPIO 4 connects to a switch + + # The spi object is kept here + spi = None + + @staticmethod + def setup(): + """ Configure the Raspberry GPIOs + :rtype : None + """ + GPIO.setmode(GPIO.BCM) + # LED + GPIO.setup(BOARD.LED, GPIO.OUT) + GPIO.output(BOARD.LED, 0) + # switch + GPIO.setup(BOARD.SWITCH, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) + # DIOx + for gpio_pin in [BOARD.DIO0, BOARD.DIO1, BOARD.DIO2, BOARD.DIO3]: + GPIO.setup(gpio_pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) + # blink 2 times to signal the board is set up + BOARD.blink(.1, 2) + + @staticmethod + def teardown(): + """ Cleanup GPIO and SpiDev """ + GPIO.cleanup() + BOARD.spi.close() + + @staticmethod + def SpiDev(spi_bus=0, spi_cs=0): + """ Init and return the SpiDev object + :return: SpiDev object + :param spi_bus: The RPi SPI bus to use: 0 or 1 + :param spi_cs: The RPi SPI chip select to use: 0 or 1 + :rtype: SpiDev + """ + BOARD.spi = spidev.SpiDev() + BOARD.spi.open(spi_bus, spi_cs) + return BOARD.spi + + @staticmethod + def add_event_detect(dio_number, callback): + """ Wraps around the GPIO.add_event_detect function + :param dio_number: DIO pin 0...5 + :param callback: The function to call when the DIO triggers an IRQ. + :return: None + """ + GPIO.add_event_detect(dio_number, GPIO.RISING, callback=callback) + + @staticmethod + def add_events(cb_dio0, cb_dio1, cb_dio2, cb_dio3, cb_dio4, cb_dio5, switch_cb=None): + BOARD.add_event_detect(BOARD.DIO0, callback=cb_dio0) + BOARD.add_event_detect(BOARD.DIO1, callback=cb_dio1) + BOARD.add_event_detect(BOARD.DIO2, callback=cb_dio2) + BOARD.add_event_detect(BOARD.DIO3, callback=cb_dio3) + # the modtronix inAir9B does not expose DIO4 and DIO5 + if switch_cb is not None: + GPIO.add_event_detect(BOARD.SWITCH, GPIO.RISING, callback=switch_cb, bouncetime=300) + + @staticmethod + def led_on(value=1): + """ Switch the proto shields LED + :param value: 0/1 for off/on. Default is 1. + :return: value + :rtype : int + """ + GPIO.output(BOARD.LED, value) + return value + + @staticmethod + def led_off(): + """ Switch LED off + :return: 0 + """ + GPIO.output(BOARD.LED, 0) + return 0 + + @staticmethod + def blink(time_sec, n_blink): + if n_blink == 0: + return + BOARD.led_on() + for i in range(n_blink): + time.sleep(time_sec) + BOARD.led_off() + time.sleep(time_sec) + BOARD.led_on() + BOARD.led_off() diff --git a/SX127x/constants.py b/SX127x/constants.py new file mode 100644 index 0000000..910783b --- /dev/null +++ b/SX127x/constants.py @@ -0,0 +1,190 @@ +""" Defines constants (modes, bandwidths, registers, etc.) needed by SX127x. """ + + +# Copyright 2015 Mayer Analytics Ltd. +# +# This file is part of pySX127x. +# +# pySX127x is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public +# License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# pySX127x is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied +# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You can be released from the requirements of the license by obtaining a commercial license. Such a license is +# mandatory as soon as you develop commercial activities involving pySX127x without disclosing the source code of your +# own applications, or shipping pySX127x with a closed source product. +# +# You should have received a copy of the GNU General Public License along with pySX127. If not, see +# . + + +def add_lookup(cls): + """ A decorator that adds a lookup dictionary to the class. + The lookup dictionary maps the codes back to the names. This is used for pretty-printing. """ + varnames = filter(str.isupper, cls.__dict__.keys()) + lookup = dict(map(lambda varname: (cls.__dict__.get(varname, None), varname), varnames)) + setattr(cls, 'lookup', lookup) + return cls + + +@add_lookup +class MODE: + SLEEP = 0x80 + STDBY = 0x81 + FSTX = 0x82 + TX = 0x83 + FSRX = 0x84 + RXCONT = 0x85 + RXSINGLE = 0x86 + CAD = 0x87 + FSK_STDBY= 0x01 # needed for calibration + + +@add_lookup +class BW: + BW7_8 = 0 + BW10_4 = 1 + BW15_6 = 2 + BW20_8 = 3 + BW31_25 = 4 + BW41_7 = 5 + BW62_5 = 6 + BW125 = 7 + BW250 = 8 + BW500 = 9 + + +@add_lookup +class CODING_RATE: + CR4_5 = 1 + CR4_6 = 2 + CR4_7 = 3 + CR4_8 = 4 + + +@add_lookup +class GAIN: + NOT_USED = 0b000 + G1 = 0b001 + G2 = 0b010 + G3 = 0b011 + G4 = 0b100 + G5 = 0b101 + G6 = 0b110 + + +@add_lookup +class PA_SELECT: + RFO = 0 + PA_BOOST = 1 + + +@add_lookup +class PA_RAMP: + RAMP_3_4_ms = 0 + RAMP_2_ms = 1 + RAMP_1_ms = 2 + RAMP_500_us = 3 + RAMP_250_us = 4 + RAMP_125_us = 5 + RAMP_100_us = 6 + RAMP_62_us = 7 + RAMP_50_us = 8 + RAMP_40_us = 9 + RAMP_31_us = 10 + RAMP_25_us = 11 + RAMP_20_us = 12 + RAMP_15_us = 13 + RAMP_12_us = 14 + RAMP_10_us = 15 + + +class MASK: + class IRQ_FLAGS: + RxTimeout = 7 + RxDone = 6 + PayloadCrcError = 5 + ValidHeader = 4 + TxDone = 3 + CadDone = 2 + FhssChangeChannel = 1 + CadDetected = 0 + + +class REG: + + @add_lookup + class LORA: + FIFO = 0x00 + OP_MODE = 0x01 + FR_MSB = 0x06 + FR_MID = 0x07 + FR_LSB = 0x08 + PA_CONFIG = 0x09 + PA_RAMP = 0x0A + OCP = 0x0B + LNA = 0x0C + FIFO_ADDR_PTR = 0x0D + FIFO_TX_BASE_ADDR = 0x0E + FIFO_RX_BASE_ADDR = 0x0F + FIFO_RX_CURR_ADDR = 0x10 + IRQ_FLAGS_MASK = 0x11 + IRQ_FLAGS = 0x12 + RX_NB_BYTES = 0x13 + RX_HEADER_CNT_MSB = 0x14 + RX_PACKET_CNT_MSB = 0x16 + MODEM_STAT = 0x18 + PKT_SNR_VALUE = 0x19 + PKT_RSSI_VALUE = 0x1A + RSSI_VALUE = 0x1B + HOP_CHANNEL = 0x1C + MODEM_CONFIG_1 = 0x1D + MODEM_CONFIG_2 = 0x1E + SYMB_TIMEOUT_LSB = 0x1F + PREAMBLE_MSB = 0x20 + PAYLOAD_LENGTH = 0x22 + MAX_PAYLOAD_LENGTH = 0x23 + HOP_PERIOD = 0x24 + FIFO_RX_BYTE_ADDR = 0x25 + MODEM_CONFIG_3 = 0x26 + PPM_CORRECTION = 0x27 + FEI_MSB = 0x28 + DETECT_OPTIMIZE = 0X31 + INVERT_IQ = 0x33 + DETECTION_THRESH = 0X37 + SYNC_WORD = 0X39 + DIO_MAPPING_1 = 0x40 + DIO_MAPPING_2 = 0x41 + VERSION = 0x42 + TCXO = 0x4B + PA_DAC = 0x4D + AGC_REF = 0x61 + AGC_THRESH_1 = 0x62 + AGC_THRESH_2 = 0x63 + AGC_THRESH_3 = 0x64 + PLL = 0x70 + + @add_lookup + class FSK: + LNA = 0x0C + RX_CONFIG = 0x0D + RSSI_CONFIG = 0x0E + PREAMBLE_DETECT = 0x1F + OSC = 0x24 + SYNC_CONFIG = 0x27 + SYNC_VALUE_1 = 0x28 + SYNC_VALUE_2 = 0x29 + SYNC_VALUE_3 = 0x2A + SYNC_VALUE_4 = 0x2B + SYNC_VALUE_5 = 0x2C + SYNC_VALUE_6 = 0x2D + SYNC_VALUE_7 = 0x2E + SYNC_VALUE_8 = 0x2F + PACKET_CONFIG_1 = 0x30 + FIFO_THRESH = 0x35 + IMAGE_CAL = 0x3B + DIO_MAPPING_1 = 0x40 + DIO_MAPPING_2 = 0x41 diff --git a/reset.py b/reset.py new file mode 100755 index 0000000..60ca390 --- /dev/null +++ b/reset.py @@ -0,0 +1,10 @@ +#!/usr/bin/env python +import RPi.GPIO as GPIO +import time + +GPIO.setmode(GPIO.BCM) +GPIO.setup(17, GPIO.OUT) +GPIO.output(17, GPIO.HIGH) +time.sleep(.100) +GPIO.output(17, GPIO.LOW) +GPIO.cleanup() diff --git a/rx_ttn.py b/rx_ttn.py new file mode 100755 index 0000000..9a0d60b --- /dev/null +++ b/rx_ttn.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 +from time import sleep +from SX127x.LoRa import * +from SX127x.LoRaArgumentParser import LoRaArgumentParser +from SX127x.board_config import BOARD +import LoRaWAN + +BOARD.setup() +parser = LoRaArgumentParser("LoRaWAN receiver") + +class LoRaWANrcv(LoRa): + def __init__(self, nwkey = [], appkey = [], verbose = False): + super(LoRaWANrcv, self).__init__(verbose) + self.set_mode(MODE.SLEEP) + self.set_dio_mapping([0] * 6) + self.nwkey = nwkey + self.appkey = appkey + + def on_rx_done(self): + print("RxDone") + + self.clear_irq_flags(RxDone=1) + payload = self.read_payload(nocheck=True) + print("".join(format(x, '02x') for x in bytes(payload))) + + lorawan = LoRaWAN.new(self.nwkey, self.appkey) + lorawan.read(payload) + print(lorawan.get_mhdr().get_mversion()) + print(lorawan.get_mhdr().get_mtype()) + print(lorawan.get_mic()) + print(lorawan.compute_mic()) + print(lorawan.valid_mic()) + print("".join(list(map(chr, lorawan.get_payload())))) + + self.set_mode(MODE.SLEEP) + self.reset_ptr_rx() + self.set_mode(MODE.RXCONT) + + def on_tx_done(self): + print("TxDone") + print(self.get_irq_flags()) + + def on_cad_done(self): + print("on_CadDone") + print(self.get_irq_flags()) + + def on_rx_timeout(self): + print("on_RxTimeout") + print(self.get_irq_flags()) + + def on_valid_header(self): + print("on_ValidHeader") + print(self.get_irq_flags()) + + def on_payload_crc_error(self): + print("on_PayloadCrcError") + print(self.get_irq_flags()) + + def on_fhss_change_channel(self): + print("on_FhssChangeChannel") + print(self.get_irq_flags()) + + def start(self): + self.reset_ptr_rx() + self.set_mode(MODE.RXCONT) + while True: + sleep(.5) + + +# Init +nwkey = [0xC3, 0x24, 0x64, 0x98, 0xDE, 0x56, 0x5D, 0x8C, 0x55, 0x88, 0x7C, 0x05, 0x86, 0xF9, 0x82, 0x26] +appkey = [0x15, 0xF6, 0xF4, 0xD4, 0x2A, 0x95, 0xB0, 0x97, 0x53, 0x27, 0xB7, 0xC1, 0x45, 0x6E, 0xC5, 0x45] +lora = LoRaWANrcv(nwkey, appkey, False) + +# Setup +lora.set_freq(868.1) +lora.set_pa_config(pa_select=1) +lora.set_spreading_factor(7) +lora.set_sync_word(0x34) +lora.set_rx_crc(True) + +print(lora) +assert(lora.get_agc_auto_on() == 1) + +try: + print("Waiting for incoming LoRaWAN messages\n") + lora.start() +except KeyboardInterrupt: + sys.stdout.flush() + print("\nKeyboardInterrupt") +finally: + sys.stdout.flush() + print("") + lora.set_mode(MODE.SLEEP) + print(lora) + BOARD.teardown() diff --git a/tx_ttn.py b/tx_ttn.py new file mode 100755 index 0000000..0053099 --- /dev/null +++ b/tx_ttn.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python3 +import sys +from time import sleep +from SX127x.LoRa import * +from SX127x.LoRaArgumentParser import LoRaArgumentParser +from SX127x.board_config import BOARD +import LoRaWAN +from LoRaWAN.MHDR import MHDR + +BOARD.setup() +parser = LoRaArgumentParser("LoRaWAN sender") + +class LoRaWANsend(LoRa): + def __init__(self, devaddr = [], nwkey = [], appkey = [], verbose = False): + super(LoRaWANsend, self).__init__(verbose) + self.set_mode(MODE.SLEEP) + self.set_dio_mapping([1,0,0,0,0,0]) + self.devaddr = devaddr + self.nwkey = nwkey + self.appkey = appkey + + def on_rx_done(self): + print("RxDone") + print(self.get_irq_flags()) + + def on_tx_done(self): + self.set_mode(MODE.STDBY) + self.clear_irq_flags(TxDone=1) + print("TxDone") + sys.exit(0) + + def on_cad_done(self): + print("on_CadDone") + print(self.get_irq_flags()) + + def on_rx_timeout(self): + print("on_RxTimeout") + print(self.get_irq_flags()) + + def on_valid_header(self): + print("on_ValidHeader") + print(self.get_irq_flags()) + + def on_payload_crc_error(self): + print("on_PayloadCrcError") + print(self.get_irq_flags()) + + def on_fhss_change_channel(self): + print("on_FhssChangeChannel") + print(self.get_irq_flags()) + + def start(self): + self.tx_counter = 1 + + lorawan = LoRaWAN.new(self.nwkey, self.appkey) + lorawan.create(MHDR.UNCONF_DATA_UP, {'devaddr': self.devaddr, 'fcnt': self.tx_counter, 'data': list(map(ord, 'Python rules!')) }) + + self.write_payload(lorawan.to_raw()) + self.set_mode(MODE.TX) + while True: + sleep(1) + + +# Init +devaddr = [0x26, 0x01, 0x11, 0x5F] +nwkey = [0xC3, 0x24, 0x64, 0x98, 0xDE, 0x56, 0x5D, 0x8C, 0x55, 0x88, 0x7C, 0x05, 0x86, 0xF9, 0x82, 0x26] +appkey = [0x15, 0xF6, 0xF4, 0xD4, 0x2A, 0x95, 0xB0, 0x97, 0x53, 0x27, 0xB7, 0xC1, 0x45, 0x6E, 0xC5, 0x45] +lora = LoRaWANsend(devaddr, nwkey, appkey, False) + +# Setup +lora.set_freq(868.1) +lora.set_pa_config(pa_select=1) +lora.set_spreading_factor(7) +lora.set_pa_config(max_power=0x0F, output_power=0x0E) +lora.set_sync_word(0x34) +lora.set_rx_crc(True) + +print(lora) +assert(lora.get_agc_auto_on() == 1) + +try: + print("Sending LoRaWAN message\n") + lora.start() +except KeyboardInterrupt: + sys.stdout.flush() + print("\nKeyboardInterrupt") +finally: + sys.stdout.flush() + print("") + lora.set_mode(MODE.SLEEP) + print(lora) + BOARD.teardown()