From 88ec0b47e6d1d3771f9a2e5844a2668707106243 Mon Sep 17 00:00:00 2001 From: Cybra Date: Mon, 11 Mar 2024 15:29:21 +0800 Subject: [PATCH 1/2] serial forwarding with multiple ports --- tools/dronecan_serial_forwarding.py | 497 ++++++++++++++++++++++++++++ 1 file changed, 497 insertions(+) create mode 100644 tools/dronecan_serial_forwarding.py diff --git a/tools/dronecan_serial_forwarding.py b/tools/dronecan_serial_forwarding.py new file mode 100644 index 0000000..2ee06f9 --- /dev/null +++ b/tools/dronecan_serial_forwarding.py @@ -0,0 +1,497 @@ +import dronecan +import time +from functools import partial +import socket +import errno +import logging +import struct +from argparse import ArgumentParser +import threading + +parser = ArgumentParser(description='Serial forwarder') +parser.add_argument("--bitrate", default=1000000, type=int, help="CAN bit rate") +parser.add_argument("--baudrate", default=115200, type=int, help="Serial baud rate") +parser.add_argument("--device", "-d", default=None, type=str, help="Serial device") +parser.add_argument("--port", default=2001, type=int, help="Listen Port") +parser.add_argument("--node-id", default=None, nargs="+", type=int, help="Node ID") +global args +args = parser.parse_args() + +if args.node_id is None: + print("No node ID specified") + exit(1) + +if args.device is None: + print("No device specified") + exit(1) + +# protocol constants +PREAMBLE1 = 0xb5 +PREAMBLE2 = 0x62 + +CLASS_ACK = 0x05 +CLASS_CFG = 0x06 + +MSG_CFG_PRT = 0x00 + +MSG_ACK_NACK = 0x00 +MSG_ACK_ACK = 0x01 + +# class UBloxError(Exception): +# '''Ublox error class''' +# def __init__(self, msg): +# Exception.__init__(self, msg) +# self.message = msg + +# class UBloxDescriptor: +# '''class used to describe the layout of a UBlox message''' +# def __init__(self, name, msg_format, fields=[], count_field=None, format2=None, fields2=None): +# ''' +# (CLASS_CFG, MSG_CFG_PRT) : UBloxDescriptor('CFG_PRT', +# ' len(buf): +# raise UBloxError("%s INVALID_SIZE1: %u>%u fmt='%s'" % (self.name, size1, len(buf), fmt)) +# f1 = list(struct.unpack(fmt, buf[:size1])) # unpack the message +# i = 0 +# while i < len(f1): +# field = fields.pop(0) +# (fieldname, alen) = self.ArrayParse(field) +# if alen == -1: +# msg._fields[fieldname] = f1[i] +# if self.count_field == fieldname: +# count = int(f1[i]) +# i += 1 +# else: +# msg._fields[fieldname] = [0]*alen +# for a in range(alen): +# msg._fields[fieldname][a] = f1[i] +# i += 1 +# buf = buf[size1:] +# if len(buf) == 0: +# break + +# if self.count_field == '_remaining': +# count = len(buf) / struct.calcsize(self.format2) + +# if count == 0: +# msg._unpacked = True +# #if len(buf) != 0: +# # raise UBloxError("EXTRA_BYTES=%u" % len(buf)) +# return + +# size2 = struct.calcsize(self.format2) +# for _ in range(count): +# r = UBloxAttrDict() +# if size2 > len(buf): +# raise UBloxError("INVALID_SIZE=%u, " % len(buf)) +# f2 = list(struct.unpack(self.format2, buf[:size2])) +# for i in range(len(self.fields2)): +# r[self.fields2[i]] = f2[i] +# buf = buf[size2:] +# msg._recs.append(r) +# if len(buf) != 0: +# raise UBloxError("EXTRA_BYTES=%u" % len(buf)) +# msg._unpacked = True + +# def pack(self, msg, msg_class=None, msg_id=None): +# '''pack a UBloxMessage from the .fields and ._recs attributes in msg''' +# f1 = [] +# if msg_class is None: +# msg_class = msg.msg_class() +# if msg_id is None: +# msg_id = msg.msg_id() +# msg._buf = '' + +# fields = self.fields[:] +# for f in fields: +# (fieldname, alen) = self.ArrayParse(f) +# if not fieldname in msg._fields: +# break +# if alen == -1: +# f1.append(msg._fields[fieldname]) +# else: +# for a in range(alen): +# f1.append(msg._fields[fieldname][a]) +# try: +# # try full length message +# if self.msg_format.find(',') != -1: +# fmt = self.msg_format.replace(',', '') +# msg._buf = struct.pack(fmt, *tuple(f1)) +# else: +# raise Exception +# except Exception as e: +# # try without optional part +# fmt = self.msg_format.split(',')[0] +# msg._buf = struct.pack(fmt, *tuple(f1)) + +# length = len(msg._buf) +# if msg._recs: +# length += len(msg._recs) * struct.calcsize(self.format2) +# header = struct.pack('= level: +# print(msg) + +# def name(self): +# '''return the short string name for a message''' +# return "UBX(0x%02x,0x%02x,len=%u)" % (self.msg_class(), self.msg_id(), self.msg_length()) + +# def msg_class(self): +# '''return the message class''' +# return self._buf[2] + +# def msg_id(self): +# '''return the message id within the class''' +# return self._buf[3] + +# def msg_type(self): +# '''return the message type tuple (class, id)''' +# return (self.msg_class(), self.msg_id()) + +# def msg_length(self): +# '''return the payload length''' +# (payload_length,) = struct.unpack(' 0 and self._buf[0] != PREAMBLE1: +# return False +# if len(self._buf) > 1 and self._buf[1] != PREAMBLE2: +# return False +# needed = self.needed_bytes() +# if needed > 1000: +# return False +# if needed == 0 and not self.valid(): +# return False +# return True + +# def add(self, buf): +# '''add some bytes to a message''' +# self._buf += buf +# while not self.valid_so_far() and len(self._buf) > 0: +# '''handle corrupted streams''' +# self._buf = self._buf[1:] +# if self.needed_bytes() < 0: +# self._buf = bytearray() + +# def checksum(self, data=None): +# '''return a checksum tuple for a message''' +# if data is None: +# data = self._buf[2:-2] +# cs = 0 +# ck_a = 0 +# ck_b = 0 +# for i in data: +# ck_a = (ck_a + i) & 0xFF +# ck_b = (ck_b + ck_a) & 0xFF +# return (ck_a, ck_b) + +# def valid_checksum(self): +# '''check if the checksum is OK''' +# (ck_a, ck_b) = self.checksum() +# d = self._buf[2:-2] +# (ck_a2, ck_b2) = struct.unpack('= 8 and self.needed_bytes() == 0 and self.valid_checksum() + +# def raw(self): +# '''return the raw bytes''' +# return self._buf + +# def pack_message(self, msg_class, msg_id, payload): +# self._buf = struct.pack(' Date: Mon, 11 Mar 2024 16:41:25 +0800 Subject: [PATCH 2/2] Delete commented part --- tools/dronecan_serial_forwarding.py | 316 +--------------------------- 1 file changed, 2 insertions(+), 314 deletions(-) diff --git a/tools/dronecan_serial_forwarding.py b/tools/dronecan_serial_forwarding.py index 2ee06f9..555b9e2 100644 --- a/tools/dronecan_serial_forwarding.py +++ b/tools/dronecan_serial_forwarding.py @@ -24,319 +24,7 @@ if args.device is None: print("No device specified") exit(1) - -# protocol constants -PREAMBLE1 = 0xb5 -PREAMBLE2 = 0x62 - -CLASS_ACK = 0x05 -CLASS_CFG = 0x06 - -MSG_CFG_PRT = 0x00 - -MSG_ACK_NACK = 0x00 -MSG_ACK_ACK = 0x01 - -# class UBloxError(Exception): -# '''Ublox error class''' -# def __init__(self, msg): -# Exception.__init__(self, msg) -# self.message = msg - -# class UBloxDescriptor: -# '''class used to describe the layout of a UBlox message''' -# def __init__(self, name, msg_format, fields=[], count_field=None, format2=None, fields2=None): -# ''' -# (CLASS_CFG, MSG_CFG_PRT) : UBloxDescriptor('CFG_PRT', -# ' len(buf): -# raise UBloxError("%s INVALID_SIZE1: %u>%u fmt='%s'" % (self.name, size1, len(buf), fmt)) -# f1 = list(struct.unpack(fmt, buf[:size1])) # unpack the message -# i = 0 -# while i < len(f1): -# field = fields.pop(0) -# (fieldname, alen) = self.ArrayParse(field) -# if alen == -1: -# msg._fields[fieldname] = f1[i] -# if self.count_field == fieldname: -# count = int(f1[i]) -# i += 1 -# else: -# msg._fields[fieldname] = [0]*alen -# for a in range(alen): -# msg._fields[fieldname][a] = f1[i] -# i += 1 -# buf = buf[size1:] -# if len(buf) == 0: -# break - -# if self.count_field == '_remaining': -# count = len(buf) / struct.calcsize(self.format2) - -# if count == 0: -# msg._unpacked = True -# #if len(buf) != 0: -# # raise UBloxError("EXTRA_BYTES=%u" % len(buf)) -# return - -# size2 = struct.calcsize(self.format2) -# for _ in range(count): -# r = UBloxAttrDict() -# if size2 > len(buf): -# raise UBloxError("INVALID_SIZE=%u, " % len(buf)) -# f2 = list(struct.unpack(self.format2, buf[:size2])) -# for i in range(len(self.fields2)): -# r[self.fields2[i]] = f2[i] -# buf = buf[size2:] -# msg._recs.append(r) -# if len(buf) != 0: -# raise UBloxError("EXTRA_BYTES=%u" % len(buf)) -# msg._unpacked = True - -# def pack(self, msg, msg_class=None, msg_id=None): -# '''pack a UBloxMessage from the .fields and ._recs attributes in msg''' -# f1 = [] -# if msg_class is None: -# msg_class = msg.msg_class() -# if msg_id is None: -# msg_id = msg.msg_id() -# msg._buf = '' - -# fields = self.fields[:] -# for f in fields: -# (fieldname, alen) = self.ArrayParse(f) -# if not fieldname in msg._fields: -# break -# if alen == -1: -# f1.append(msg._fields[fieldname]) -# else: -# for a in range(alen): -# f1.append(msg._fields[fieldname][a]) -# try: -# # try full length message -# if self.msg_format.find(',') != -1: -# fmt = self.msg_format.replace(',', '') -# msg._buf = struct.pack(fmt, *tuple(f1)) -# else: -# raise Exception -# except Exception as e: -# # try without optional part -# fmt = self.msg_format.split(',')[0] -# msg._buf = struct.pack(fmt, *tuple(f1)) - -# length = len(msg._buf) -# if msg._recs: -# length += len(msg._recs) * struct.calcsize(self.format2) -# header = struct.pack('= level: -# print(msg) - -# def name(self): -# '''return the short string name for a message''' -# return "UBX(0x%02x,0x%02x,len=%u)" % (self.msg_class(), self.msg_id(), self.msg_length()) - -# def msg_class(self): -# '''return the message class''' -# return self._buf[2] - -# def msg_id(self): -# '''return the message id within the class''' -# return self._buf[3] - -# def msg_type(self): -# '''return the message type tuple (class, id)''' -# return (self.msg_class(), self.msg_id()) - -# def msg_length(self): -# '''return the payload length''' -# (payload_length,) = struct.unpack(' 0 and self._buf[0] != PREAMBLE1: -# return False -# if len(self._buf) > 1 and self._buf[1] != PREAMBLE2: -# return False -# needed = self.needed_bytes() -# if needed > 1000: -# return False -# if needed == 0 and not self.valid(): -# return False -# return True - -# def add(self, buf): -# '''add some bytes to a message''' -# self._buf += buf -# while not self.valid_so_far() and len(self._buf) > 0: -# '''handle corrupted streams''' -# self._buf = self._buf[1:] -# if self.needed_bytes() < 0: -# self._buf = bytearray() - -# def checksum(self, data=None): -# '''return a checksum tuple for a message''' -# if data is None: -# data = self._buf[2:-2] -# cs = 0 -# ck_a = 0 -# ck_b = 0 -# for i in data: -# ck_a = (ck_a + i) & 0xFF -# ck_b = (ck_b + ck_a) & 0xFF -# return (ck_a, ck_b) - -# def valid_checksum(self): -# '''check if the checksum is OK''' -# (ck_a, ck_b) = self.checksum() -# d = self._buf[2:-2] -# (ck_a2, ck_b2) = struct.unpack('= 8 and self.needed_bytes() == 0 and self.valid_checksum() - -# def raw(self): -# '''return the raw bytes''' -# return self._buf - -# def pack_message(self, msg_class, msg_id, payload): -# self._buf = struct.pack('