|
| 1 | +""" |
| 2 | +This module implements the communication protocol to power on/off ports on a |
| 3 | +LinkPi SmartHUB, a 12-port USB3.0 HUB utilizing four RTS5411 USB3.0 4-port HUB |
| 4 | +controllers, a FT232R USB UART IC and a STM32F103RB MCU for port power control. |
| 5 | +
|
| 6 | +The protocol is a simple line-based protocol over a serial port. |
| 7 | +
|
| 8 | +Known commands: |
| 9 | +- onoff <port> <1|0> - switch port power on/off |
| 10 | +- state - get current power state of all ports |
| 11 | +- SetOWP <1|0> <1|0> ... - set the power-on state of all ports |
| 12 | +- GetOWP - get the power-on state of all ports |
| 13 | +
|
| 14 | +Responses are in JSON format, e.g.: |
| 15 | +
|
| 16 | +.. code-block:: text |
| 17 | +
|
| 18 | + > onoff 5 1 |
| 19 | + < {"Cmd":"OnOffResp","SeqNum":1,"ret":0} |
| 20 | + > state |
| 21 | + < {"Cmd":"StateResp","SeqNum":2,"state":[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0]} |
| 22 | + > SetOWP 0 0 0 0 0 0 0 0 0 0 0 1 |
| 23 | + < {"Cmd":"SetOWPResp","SeqNum":3,"ret":0} |
| 24 | + > GetOWP |
| 25 | + < {"Cmd":"GetOWPResp","SeqNum":4,"owp":[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]} |
| 26 | +
|
| 27 | +A version announcement is continuously sent every second: |
| 28 | +
|
| 29 | +.. code-block:: text |
| 30 | +
|
| 31 | + < {"Cmd":"VerResp","ver":"SmartHUB_<ver>","uid":"<uid>"} |
| 32 | +""" |
| 33 | +import json |
| 34 | +import serial |
| 35 | + |
| 36 | + |
| 37 | +PORT_INDEX_MAP = { |
| 38 | + "1": 5, "2": 4, "3": 3, "4": 2, "5": 1, "6": 0, |
| 39 | + "7": 11, "8": 10, "9": 9, "10": 8, "11": 7, "12": 6, |
| 40 | +} |
| 41 | + |
| 42 | + |
| 43 | +# Port names printed on the device do not match the internal port index |
| 44 | +def name_to_index(name_or_index): |
| 45 | + return PORT_INDEX_MAP.get(name_or_index, name_or_index) |
| 46 | + |
| 47 | + |
| 48 | +class LinkPiSmartHUB: |
| 49 | + def __init__(self, path): |
| 50 | + if not path: |
| 51 | + raise ValueError("Device not found") |
| 52 | + self.path = path |
| 53 | + |
| 54 | + def _command(self, command): |
| 55 | + with serial.Serial(self.path, 115200, timeout=2) as s: |
| 56 | + # wait on next version announcement |
| 57 | + s.readline() |
| 58 | + # send the command |
| 59 | + s.write(f"{command}\r\n".encode()) |
| 60 | + # read and return the response |
| 61 | + return s.readline().decode().strip() |
| 62 | + |
| 63 | + def onoff(self, index, state): |
| 64 | + return self._command(f"onoff {index} {state}") |
| 65 | + |
| 66 | + def state(self): |
| 67 | + return self._command("state") |
| 68 | + |
| 69 | + |
| 70 | +def handle_set(path, name_or_index, state): |
| 71 | + smarthub = LinkPiSmartHUB(path) |
| 72 | + smarthub.onoff(name_to_index(name_or_index), state) |
| 73 | + |
| 74 | + |
| 75 | +def handle_get(path, name_or_index): |
| 76 | + smarthub = LinkPiSmartHUB(path) |
| 77 | + state = smarthub.state() |
| 78 | + return bool(json.loads(state).get("state")[name_to_index(name_or_index)]) |
| 79 | + |
| 80 | + |
| 81 | +methods = { |
| 82 | + "set": handle_set, |
| 83 | + "get": handle_get, |
| 84 | +} |
0 commit comments