diff --git a/doc/configuration.rst b/doc/configuration.rst index c85cc7a0c..34e2c8532 100644 --- a/doc/configuration.rst +++ b/doc/configuration.rst @@ -356,6 +356,31 @@ NetworkUSBPowerPort A :any:`NetworkUSBPowerPort` describes a `USBPowerPort`_ resource available on a remote computer. +LinkPiSmartHUBPowerPort ++++++++++++++++++++++++ +A :any:`LinkPiSmartHUBPowerPort` describes a LinkPi SmartHUB port with power switching. + +.. code-block:: yaml + + LinkPiSmartHUBPowerPort: + match: + ID_SERIAL_SHORT: 'ABCD1234' + index: 5 + +The example describes port '1' on the SmartHUB with the ID_SERIAL_SHORT ``ABCD1234``. + +Arguments: + - index (str or int): port name ('1' to '12') or internal port index (0 to 11) + - match (dict): key and value pairs for a udev match, see `udev Matching`_ + +Used by: + - `LinkPiSmartHUBPowerDriver`_ + +NetworkLinkPiSmartHUBPowerPort +++++++++++++++++++++++++++++++ +A :any:`NetworkLinkPiSmartHUBPowerPort` describes a `LinkPiSmartHUBPowerPort`_ +resource available on a remote computer. + SiSPMPowerPort ++++++++++++++ A :any:`SiSPMPowerPort` describes a *GEMBIRD SiS-PM* as supported by @@ -2275,6 +2300,28 @@ Implements: Arguments: - delay (float, default=2.0): delay in seconds between off and on +LinkPiSmartHUBPowerDriver +~~~~~~~~~~~~~~~~~~~~~~~~~ +A :any:`LinkPiSmartHUBPowerDriver` controls a `LinkPiSmartHUBPowerPort`_, +allowing control of the target power state without user interaction. + +Binds to: + port: + - `LinkPiSmartHUBPowerPort`_ + - `NetworkLinkPiSmartHUBPowerPort`_ + +Implements: + - :any:`PowerProtocol` + - :any:`ResetProtocol` + +.. code-block:: yaml + + LinkPiSmartHUBPowerDriver: + delay: 5.0 + +Arguments: + - delay (float, default=2.0): delay in seconds between off and on + SiSPMPowerDriver ~~~~~~~~~~~~~~~~ A :any:`SiSPMPowerDriver` controls a `SiSPMPowerPort`_, allowing control of the diff --git a/labgrid/driver/__init__.py b/labgrid/driver/__init__.py index edf1ad2b1..4b18d1501 100644 --- a/labgrid/driver/__init__.py +++ b/labgrid/driver/__init__.py @@ -14,8 +14,8 @@ from .onewiredriver import OneWirePIODriver from .powerdriver import ManualPowerDriver, ExternalPowerDriver, \ DigitalOutputPowerDriver, YKUSHPowerDriver, \ - USBPowerDriver, SiSPMPowerDriver, NetworkPowerDriver, \ - PDUDaemonDriver + USBPowerDriver, LinkPiSmartHUBPowerDriver, SiSPMPowerDriver, \ + NetworkPowerDriver, PDUDaemonDriver from .usbloader import MXSUSBDriver, IMXUSBDriver, BDIMXUSBDriver, RKUSBDriver, UUUDriver from .usbsdmuxdriver import USBSDMuxDriver from .usbsdwiredriver import USBSDWireDriver diff --git a/labgrid/driver/powerdriver.py b/labgrid/driver/powerdriver.py index 80c8377fb..88b4b9347 100644 --- a/labgrid/driver/powerdriver.py +++ b/labgrid/driver/powerdriver.py @@ -8,7 +8,9 @@ from ..factory import target_factory from ..protocol import PowerProtocol, DigitalOutputProtocol, ResetProtocol from ..resource import NetworkPowerPort +from ..resource.remote import NetworkLinkPiSmartHUBPowerPort from ..step import step +from ..util.agentwrapper import AgentWrapper from ..util.proxy import proxymanager from ..util.helper import processwrapper from .common import Driver @@ -57,6 +59,53 @@ def cycle(self): ) +@target_factory.reg_driver +@attr.s(eq=False) +class LinkPiSmartHUBPowerDriver(Driver, PowerResetMixin, PowerProtocol): + bindings = {"port": {"LinkPiSmartHUBPowerPort", NetworkLinkPiSmartHUBPowerPort}, } + delay = attr.ib(default=2.0, validator=attr.validators.instance_of(float)) + + def __attrs_post_init__(self): + super().__attrs_post_init__() + self.wrapper = None + + def on_activate(self): + if isinstance(self.port, NetworkLinkPiSmartHUBPowerPort): + host = self.port.host + else: + host = None + self.wrapper = AgentWrapper(host) + self.proxy = self.wrapper.load('linkpismarthub') + + def on_deactivate(self): + self.proxy = None + if self.wrapper: + self.wrapper.close() + self.wrapper = None + + @Driver.check_active + @step() + def on(self): + self.proxy.set(self.port.path, self.port.index, 1) + + @Driver.check_active + @step() + def off(self): + self.proxy.set(self.port.path, self.port.index, 0) + + @Driver.check_active + @step() + def cycle(self): + self.off() + time.sleep(self.delay) + self.on() + + @Driver.check_active + @step() + def get(self): + return self.proxy.get(self.port.path, self.port.index) + + @target_factory.reg_driver @attr.s(eq=False) class SiSPMPowerDriver(Driver, PowerResetMixin, PowerProtocol): diff --git a/labgrid/remote/client.py b/labgrid/remote/client.py index 27108a7b4..1c3e1e37f 100755 --- a/labgrid/remote/client.py +++ b/labgrid/remote/client.py @@ -883,7 +883,12 @@ def power(self): name = self.args.name target = self._get_target(place) from ..resource.power import NetworkPowerPort, PDUDaemonPort - from ..resource.remote import NetworkUSBPowerPort, NetworkSiSPMPowerPort, NetworkSysfsGPIO + from ..resource.remote import ( + NetworkUSBPowerPort, + NetworkLinkPiSmartHUBPowerPort, + NetworkSiSPMPowerPort, + NetworkSysfsGPIO, + ) from ..resource import TasmotaPowerPort, NetworkYKUSHPowerPort drv = None @@ -897,6 +902,8 @@ def power(self): drv = self._get_driver_or_new(target, "NetworkPowerDriver", name=name) elif isinstance(resource, NetworkUSBPowerPort): drv = self._get_driver_or_new(target, "USBPowerDriver", name=name) + elif isinstance(resource, NetworkLinkPiSmartHUBPowerPort): + drv = self._get_driver_or_new(target, "LinkPiSmartHUBPowerDriver", name=name) elif isinstance(resource, NetworkSiSPMPowerPort): drv = self._get_driver_or_new(target, "SiSPMPowerDriver", name=name) elif isinstance(resource, PDUDaemonPort): diff --git a/labgrid/remote/exporter.py b/labgrid/remote/exporter.py index d3b406503..e0ccb24de 100755 --- a/labgrid/remote/exporter.py +++ b/labgrid/remote/exporter.py @@ -446,6 +446,26 @@ def _get_params(self): } +@attr.s(eq=False) +class LinkPiSmartHUBPowerPortExport(USBGenericExport): + """ResourceExport for ports on power switchable LinkPi SmartHUBs""" + + def __attrs_post_init__(self): + super().__attrs_post_init__() + + def _get_params(self): + """Helper function to return parameters""" + return { + "host": self.host, + "busnum": self.local.busnum, + "devnum": self.local.devnum, + "path": self.local.path, + "vendor_id": self.local.vendor_id, + "model_id": self.local.model_id, + "index": self.local.index, + } + + @attr.s(eq=False) class SiSPMPowerPortExport(USBGenericExport): """ResourceExport for ports on GEMBRID switches""" @@ -563,6 +583,7 @@ def __attrs_post_init__(self): exports["USBVideo"] = USBGenericExport exports["USBAudioInput"] = USBAudioInputExport exports["USBTMC"] = USBGenericExport +exports["LinkPiSmartHUBPowerPort"] = LinkPiSmartHUBPowerPortExport exports["SiSPMPowerPort"] = SiSPMPowerPortExport exports["USBPowerPort"] = USBPowerPortExport exports["DeditecRelais8"] = USBDeditecRelaisExport diff --git a/labgrid/resource/__init__.py b/labgrid/resource/__init__.py index 6ec9d5db8..a4f781e3c 100644 --- a/labgrid/resource/__init__.py +++ b/labgrid/resource/__init__.py @@ -14,6 +14,7 @@ DeditecRelais8, HIDRelay, IMXUSBLoader, + LinkPiSmartHUBPowerPort, LXAUSBMux, MatchedSysfsGPIO, MXSUSBLoader, diff --git a/labgrid/resource/remote.py b/labgrid/resource/remote.py index a29e58ee8..b42cb38f2 100644 --- a/labgrid/resource/remote.py +++ b/labgrid/resource/remote.py @@ -250,6 +250,17 @@ def __attrs_post_init__(self): self.timeout = 10.0 super().__attrs_post_init__() + +@target_factory.reg_resource +@attr.s(eq=False) +class NetworkLinkPiSmartHUBPowerPort(RemoteUSBResource): + """The NetworkLinkPiSmartHUBPowerPort describes a remotely accessible LinkPi SmartHUB port with power switching""" + index = attr.ib(default=None, validator=attr.validators.instance_of((str, int))) + def __attrs_post_init__(self): + self.timeout = 10.0 + super().__attrs_post_init__() + + @target_factory.reg_resource @attr.s(eq=False) class NetworkSiSPMPowerPort(RemoteUSBResource): diff --git a/labgrid/resource/suggest.py b/labgrid/resource/suggest.py index 707779bf8..0d90ac3a5 100644 --- a/labgrid/resource/suggest.py +++ b/labgrid/resource/suggest.py @@ -17,6 +17,7 @@ AlteraUSBBlaster, RKUSBLoader, USBNetworkInterface, + LinkPiSmartHUBPowerPort, SiSPMPowerPort, USBAudioInput, LXAUSBMux, @@ -51,6 +52,7 @@ def __init__(self, args): self.resources.append(AlteraUSBBlaster(**args)) self.resources.append(RKUSBLoader(**args)) self.resources.append(USBNetworkInterface(**args)) + self.resources.append(LinkPiSmartHUBPowerPort(**args)) self.resources.append(SiSPMPowerPort(**args)) self.resources.append(USBAudioInput(**args)) self.resources.append(LXAUSBMux(**args)) diff --git a/labgrid/resource/udev.py b/labgrid/resource/udev.py index 1200f458f..0f10b25c2 100644 --- a/labgrid/resource/udev.py +++ b/labgrid/resource/udev.py @@ -604,6 +604,30 @@ def __attrs_post_init__(self): self.match['DRIVER'] = 'hub' super().__attrs_post_init__() +@target_factory.reg_resource +@attr.s(eq=False) +class LinkPiSmartHUBPowerPort(USBResource): + """The LinkPiSmartHUBPowerPort describes a LinkPi SmartHUB port with power switching. + + Args: + index (str or int): port name ('1' to '12') or internal port index (0 to 11) + """ + index = attr.ib(default=None, validator=attr.validators.instance_of((str, int))) + + def __attrs_post_init__(self): + self.match['SUBSYSTEM'] = 'tty' + self.match['@SUBSYSTEM'] = 'usb' + self.match.setdefault('ID_VENDOR_ID', '0403') + self.match.setdefault('ID_MODEL_ID', '6001') + super().__attrs_post_init__() + + @property + def path(self): + if self.device is not None: + return self.device.device_node + + return None + @target_factory.reg_resource @attr.s(eq=False) class SiSPMPowerPort(USBResource): diff --git a/labgrid/util/agents/linkpismarthub.py b/labgrid/util/agents/linkpismarthub.py new file mode 100644 index 000000000..836bc4e5d --- /dev/null +++ b/labgrid/util/agents/linkpismarthub.py @@ -0,0 +1,84 @@ +""" +This module implements the communication protocol to power on/off ports on a +LinkPi SmartHUB, a 12-port USB3.0 HUB utilizing four RTS5411 USB3.0 4-port HUB +controllers, a FT232R USB UART IC and a STM32F103RB MCU for port power control. + +The protocol is a simple line-based protocol over a serial port. + +Known commands: +- onoff <1|0> - switch port power on/off +- state - get current power state of all ports +- SetOWP <1|0> <1|0> ... - set the power-on state of all ports +- GetOWP - get the power-on state of all ports + +Responses are in JSON format, e.g.: + +.. code-block:: text + + > onoff 5 1 + < {"Cmd":"OnOffResp","SeqNum":1,"ret":0} + > state + < {"Cmd":"StateResp","SeqNum":2,"state":[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0]} + > SetOWP 0 0 0 0 0 0 0 0 0 0 0 1 + < {"Cmd":"SetOWPResp","SeqNum":3,"ret":0} + > GetOWP + < {"Cmd":"GetOWPResp","SeqNum":4,"owp":[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]} + +A version announcement is continuously sent every second: + +.. code-block:: text + + < {"Cmd":"VerResp","ver":"SmartHUB_","uid":""} +""" +import json +import serial + + +PORT_INDEX_MAP = { + "1": 5, "2": 4, "3": 3, "4": 2, "5": 1, "6": 0, + "7": 11, "8": 10, "9": 9, "10": 8, "11": 7, "12": 6, +} + + +# Port names printed on the device do not match the internal port index +def name_to_index(name_or_index): + return PORT_INDEX_MAP.get(name_or_index, name_or_index) + + +class LinkPiSmartHUB: + def __init__(self, path): + if not path: + raise ValueError("Device not found") + self.path = path + + def _command(self, command): + with serial.Serial(self.path, 115200, timeout=2) as s: + # wait on next version announcement + s.readline() + # send the command + s.write(f"{command}\r\n".encode()) + # read and return the response + return s.readline().decode().strip() + + def onoff(self, index, state): + return self._command(f"onoff {index} {state}") + + def state(self): + return self._command("state") + + +def handle_set(path, name_or_index, state): + smarthub = LinkPiSmartHUB(path) + smarthub.onoff(name_to_index(name_or_index), state) + + +def handle_get(path, name_or_index): + smarthub = LinkPiSmartHUB(path) + state = smarthub.state() + return bool(json.loads(state).get("state")[name_to_index(name_or_index)]) + + +methods = { + "set": handle_set, + "get": handle_get, +} diff --git a/tests/test_agent.py b/tests/test_agent.py index f5ac21653..27641caf7 100644 --- a/tests/test_agent.py +++ b/tests/test_agent.py @@ -102,6 +102,10 @@ def test_all_modules(): methods = aw.list() assert 'deditec_relais8.set' in methods assert 'deditec_relais8.get' in methods + aw.load('linkpismarthub') + methods = aw.list() + assert 'linkpismarthub.set' in methods + assert 'linkpismarthub.get' in methods aw.load('sysfsgpio') methods = aw.list() assert 'sysfsgpio.set' in methods @@ -114,4 +118,4 @@ def test_all_modules(): def test_import_modules(): import labgrid.util.agents import labgrid.util.agents.dummy - from labgrid.util.agents import deditec_relais8, sysfsgpio + from labgrid.util.agents import deditec_relais8, linkpismarthub, sysfsgpio diff --git a/tests/test_linkpismarthub.py b/tests/test_linkpismarthub.py new file mode 100644 index 000000000..7218d8cfd --- /dev/null +++ b/tests/test_linkpismarthub.py @@ -0,0 +1,143 @@ +import pytest + +from labgrid.driver import LinkPiSmartHUBPowerDriver +from labgrid.protocol import PowerProtocol, ResetProtocol +from labgrid.resource import LinkPiSmartHUBPowerPort +from labgrid.resource.remote import NetworkLinkPiSmartHUBPowerPort +from labgrid.util.agents.linkpismarthub import methods, name_to_index +from labgrid.util.agentwrapper import AgentWrapper + + +@pytest.fixture(scope='function') +def local_smarthub_port(target): + port = LinkPiSmartHUBPowerPort(target, name=None, index=1) + port.avail = True + yield port + + +@pytest.fixture(scope='function') +def remote_smarthub_port(target): + yield NetworkLinkPiSmartHUBPowerPort(target, + name=None, + host="localhost", + busnum=0, + devnum=1, + path='/dev/ttyUSB0', + vendor_id=0x0, + model_id=0x0, + index=1, + ) + + +@pytest.fixture(scope='function') +def smarthub_driver(target, mocker): + load_mock = mocker.patch.object(AgentWrapper, 'load') + driver = LinkPiSmartHUBPowerDriver(target, name=None) + target.activate(driver) + yield driver, load_mock.return_value + target.deactivate(driver) + load_mock.reset_mock() + + +class TestLinkPiSmartHUBPowerDriver: + def test_instanziation_local(self, target, local_smarthub_port): + driver = LinkPiSmartHUBPowerDriver(target, name=None) + assert (isinstance(driver, LinkPiSmartHUBPowerDriver)) + assert (isinstance(driver, PowerProtocol)) + assert (isinstance(driver, ResetProtocol)) + + def test_instanziation_remote(self, target, remote_smarthub_port): + driver = LinkPiSmartHUBPowerDriver(target, name=None) + assert (isinstance(driver, LinkPiSmartHUBPowerDriver)) + assert (isinstance(driver, PowerProtocol)) + assert (isinstance(driver, ResetProtocol)) + + def test_get(self, local_smarthub_port, smarthub_driver): + driver, proxy_mock = smarthub_driver + proxy_mock.get.return_value = True + state = driver.get() + assert state is True + proxy_mock.get.assert_called_once_with(local_smarthub_port.path, local_smarthub_port.index) + + def test_on(self, local_smarthub_port, smarthub_driver): + driver, proxy_mock = smarthub_driver + driver.on() + proxy_mock.set.assert_called_once_with(local_smarthub_port.path, local_smarthub_port.index, 1) + + def test_off(self, local_smarthub_port, smarthub_driver): + driver, proxy_mock = smarthub_driver + driver.off() + proxy_mock.set.assert_called_once_with(local_smarthub_port.path, local_smarthub_port.index, 0) + + def test_cycle(self, mocker, local_smarthub_port, smarthub_driver): + sleep_mock = mocker.patch("time.sleep") + driver, proxy_mock = smarthub_driver + driver.cycle() + proxy_mock.set.assert_has_calls([ + mocker.call(local_smarthub_port.path, local_smarthub_port.index, 0), + mocker.call(local_smarthub_port.path, local_smarthub_port.index, 1), + ]) + sleep_mock.assert_called_once_with(driver.delay) + + +def test_linkpismarthub_name_to_index(): + for i in range(1, 7): + assert name_to_index(str(i)) == 6 - i + assert name_to_index(i) == i + for i in range(7, 13): + assert name_to_index(str(i)) == 12 + 6 - i + assert name_to_index(i) == i + + +@pytest.mark.parametrize("path", ['', None]) +def test_linkpismarthub_fail_missing_path(path): + with pytest.raises(ValueError): + methods["set"](path, 0, 1) + with pytest.raises(ValueError): + methods["get"](path, 0) + + +@pytest.mark.parametrize("path", ["/dev/ttyUSB0", "/dev/ttyUSB1"]) +def test_linkpismarthub_use_correct_path(mocker, path): + serial_mock = mocker.patch("serial.Serial") + methods["set"](path, 0, 1) + serial_mock.assert_called_once_with(path, 115200, timeout=mocker.ANY) + serial_mock.reset_mock() + s = serial_mock.return_value.__enter__() + s.readline.return_value = b'{"Cmd":"StateResp","SeqNum":1,"state":[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]}\r\n' + methods["get"](path, 0) + serial_mock.assert_called_once_with(path, 115200, timeout=mocker.ANY) + + +def test_linkpismarthub_set(mocker): + serial_mock = mocker.patch("serial.Serial") + s = serial_mock.return_value.__enter__() + methods["set"]("/dev/ttyUSB0", '1', 0) + s.write.assert_called_once_with(b'onoff 5 0\r\n') + serial_mock.reset_mock() + methods["set"]("/dev/ttyUSB0", 5, 1) + s.write.assert_called_once_with(b'onoff 5 1\r\n') + + +def test_linkpismarthub_get(mocker): + serial_mock = mocker.patch("serial.Serial") + s = serial_mock.return_value.__enter__() + s.readline.side_effect = [ + b'{"Cmd":"VerResp","ver":"SmartHUB_1.0","uid":"1234"}\r\n', + b'{"Cmd":"StateResp","SeqNum":1,"state":[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0]}\r\n', + b'{"Cmd":"VerResp","ver":"SmartHUB_1.0","uid":"1234"}\r\n', + b'{"Cmd":"StateResp","SeqNum":2,"state":[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0]}\r\n', + b'{"Cmd":"VerResp","ver":"SmartHUB_1.0","uid":"1234"}\r\n', + b'{"Cmd":"StateResp","SeqNum":3,"state":[0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]}\r\n', + ] + state = methods["get"]("/dev/ttyUSB0", '1') + assert state == True + s.write.assert_called_once_with(b'state\r\n') + serial_mock.reset_mock() + state = methods["get"]("/dev/ttyUSB0", 5) + assert state == True + s.write.assert_called_once_with(b'state\r\n') + serial_mock.reset_mock() + state = methods["get"]("/dev/ttyUSB0", 0) + assert state == False + s.write.assert_called_once_with(b'state\r\n')