diff --git a/pyproject.toml b/pyproject.toml index 4f25a7f5..1c723b91 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,6 +23,8 @@ dependencies = [ "PyQt6", "darkdetect", "typer", + "bleak", + "numpy", ] classifiers = [ "Operating System :: POSIX :: Linux", diff --git a/src/labelle/cli/cli.py b/src/labelle/cli/cli.py index 4a4ffb27..17efbfd1 100755 --- a/src/labelle/cli/cli.py +++ b/src/labelle/cli/cli.py @@ -100,7 +100,10 @@ def list_devices() -> NoReturn: table = Table(*headers, show_header=True) for device in device_manager.devices: table.add_row( - device.manufacturer, device.product, device.serial_number, device.usb_id + device.manufacturer, + device.product, + device.serial_number, + device.connection_id, ) console.print(table) raise typer.Exit() diff --git a/src/labelle/gui/q_device_selector.py b/src/labelle/gui/q_device_selector.py index 0465cb7e..f0d6e831 100644 --- a/src/labelle/gui/q_device_selector.py +++ b/src/labelle/gui/q_device_selector.py @@ -9,15 +9,15 @@ QToolBar, ) +from labelle.lib.devices.device import Device from labelle.lib.devices.online_device_manager import OnlineDeviceManager -from labelle.lib.devices.usb_device import UsbDevice LOG = logging.getLogger(__name__) class QDeviceSelector(QToolBar): _device_manager: OnlineDeviceManager - _selected_device: UsbDevice | None + _selected_device: Device | None selectedDeviceChangedSignal = QtCore.pyqtSignal(name="selectedDeviceChangedSignal") @@ -89,7 +89,7 @@ def _init_layout(self) -> None: self._action_error_label = self.addWidget(self._error_label) @property - def selected_device(self) -> UsbDevice | None: + def selected_device(self) -> Device | None: device = None if self._devices.currentIndex() >= 0: device = self.device_manager.devices[self._devices.currentIndex()] diff --git a/src/labelle/lib/constants.py b/src/labelle/lib/constants.py index dc17da49..9c11b147 100755 --- a/src/labelle/lib/constants.py +++ b/src/labelle/lib/constants.py @@ -47,6 +47,10 @@ 0x1008: f"LabelManager Wireless PnP (mode switch) {UNCONFIRMED_MESSAGE}", 0x1009: f"MobileLabeler {UNCONFIRMED_MESSAGE}", } + +SUPPORTED_BLE_PRODUCTS = ["Letratag"] +SUPPORTED_BLE_MODELS = ["LT200B"] + DEV_VENDOR = 0x0922 PRINTER_INTERFACE_CLASS = 0x07 diff --git a/src/labelle/lib/devices/ble_device.py b/src/labelle/lib/devices/ble_device.py new file mode 100644 index 00000000..86aa5d4c --- /dev/null +++ b/src/labelle/lib/devices/ble_device.py @@ -0,0 +1,250 @@ +from __future__ import annotations + +# TODO: Check sources for protocol +# https://github.com/monti-tls/Dymo-LT-Application/blob/main/dymo_lt_ble_interface.cpp +# https://github.com/brz/letra200bsharp/blob/main/letra200bsharp/LetraHelper.cs +# https://github.com/alexhorn/lt200b +import asyncio +import logging +import platform +from sys import version_info +from typing import Final, NoReturn + +if version_info.micro < 12: + from itertools import islice + + # From python docs, backport + def batched(iterable, n, *, strict=False): + # batched('ABCDEFG', 3) → ABC DEF G + if n < 1: + raise ValueError("n must be at least one") + iterator = iter(iterable) + while batch := tuple(islice(iterator, n)): + if strict and len(batch) != n: + raise ValueError("batched(): incomplete batch") + yield batch +else: + from itertools import batched # type: ignore + +# TODO: Check if Qt6.bluetooth would be a better option +# https://doc.qt.io/qtforpython-6/PySide6/QtBluetooth/index.html +# says, that BLE scanning is not supported +from bleak import BleakClient, BleakScanner +from bleak.backends.device import BLEDevice + +from labelle.lib.constants import ( + SUPPORTED_BLE_MODELS, + SUPPORTED_BLE_PRODUCTS, +) +from labelle.lib.devices.device import Device + +LOG = logging.getLogger(__name__) +GITHUB_ISSUE_MAC = "" +GITHUB_ISSUE_UDEV = "" + + +class BleDeviceError(RuntimeError): + pass + + +class BleDevice(Device): + END_BYTES: Final[list[int]] = [0x12, 0x34] + START_BYTES: Final[list[int]] = [0xFF, 0xF0, *END_BYTES] + + _dev: BLEDevice + _manufacturer: str + _serial_number: str + _model: str = "" + _device_name: str = "" + _loop: asyncio.AbstractEventLoop + _client: BleakClient + _devices: list[BLEDevice] | None = None + + def __init__(self, dev: BLEDevice) -> None: + self._dev = dev + self._loop = asyncio.new_event_loop() + + @property + def hash(self) -> str: + return self.connection_id + + @property + def manufacturer(self) -> str | None: + return self._manufacturer + + @property + def product(self) -> str | None: + return f"{self._device_name} {self._model}" + + @property + def serial_number(self) -> str | None: + return self._serial_number + + @property + def ble_id(self) -> str: + return self._dev.address + + @property + def connection_id(self) -> str: + return self.ble_id + + @staticmethod + def _is_supported_vendor(dev: BLEDevice) -> bool: + return dev.name is not None and dev.name.split()[0] in SUPPORTED_BLE_PRODUCTS + + @property + def is_supported(self) -> bool: + return ( + self._is_supported_vendor(self._dev) and self._model in SUPPORTED_BLE_MODELS + ) + + @classmethod + async def _scan(cls): + cls._devices = await BleakScanner.discover(2.0) + possible_devices: list[BLEDevice] = list( + filter(cls._is_supported_vendor, cls._devices) # type: ignore + ) + for device in possible_devices: + print(f"{device.name}") + + @classmethod + def supported_devices(cls) -> set[Device]: + devices = cls._devices + if not devices: + return set() + return { + BleDevice(dev) for dev in filter(BleDevice._is_supported_vendor, devices) + } + + @property + def device_info(self) -> str: + try: + _ = self.manufacturer + except ValueError: + self._instruct_on_access_denied() + res = "" + res += f"{self._dev!r}\n" + res += f" manufacturer: {self.manufacturer}\n" + res += f" product: {self.product}\n" + res += f" serial: {self.serial_number}\n" + return res + + # TODO: rewrite + def _instruct_on_access_denied(self) -> NoReturn: + system = platform.system() + if system == "Linux": + self._instruct_on_access_denied_linux() + elif system == "Windows": + raise BleDeviceError( + "Couldn't access the device. Please make sure that the " + "device driver is set to WinUSB. This can be accomplished " + "with Zadig ." + ) + elif system == "Darwin": + raise BleDeviceError( + f"Could not access {self._dev}. Thanks for bravely trying this on a " + f"Mac. You are in uncharted territory. It would be appreciated if you " + f"share the results of your experimentation at {GITHUB_ISSUE_MAC}." + ) + else: + raise BleDeviceError(f"Unknown platform {system}") + + # TODO: rewrite + def _instruct_on_access_denied_linux(self) -> NoReturn: + raise BleDeviceError("TODO") + + def setup(self) -> None: + try: + self._loop.run_until_complete(self._setup()) + except Exception as e: + raise BleDeviceError(f"Failed setup BLE device: {e}") from e + + async def _setup(self): + self._client = BleakClient(self._dev) + await self._client.connect() + if self._client._backend.__class__.__name__ == "BleakClientBlueZDBus": # type: ignore + await self._client._backend._acquire_mtu() # type: ignore + data = await self._client.read_gatt_char("00002a29-0000-1000-8000-00805f9b34fb") + self._manufacturer = "".join(map(chr, data)) + data = await self._client.read_gatt_char("00002a24-0000-1000-8000-00805f9b34fb") + self._model = "".join(map(chr, data)) + data = await self._client.read_gatt_char("00002a25-0000-1000-8000-00805f9b34fb") + self._serial_number = "".join(map(chr, data)) + data = await self._client.read_gatt_char("00002a00-0000-1000-8000-00805f9b34fb") + self._device_name = "".join(map(chr, data)) + + def dispose(self) -> None: + self._loop.run_until_complete(self._client.disconnect()) + + def is_match(self, patterns: list[str] | None) -> bool: + if patterns is None: + return True + match = True + for pattern in patterns: + pattern = pattern.lower() + match &= ( + pattern in (self.manufacturer or "").lower() + or pattern in (self.product or "").lower() + or pattern in (self.serial_number or "").lower() + ) + return match + + @staticmethod + def checksum(data): + return sum(data) & 0xFF + + @staticmethod + def chunkify(body, chunk_size=498): + chunks = [list(x) for x in batched(body, chunk_size)] + chunks[-1].extend(BleDevice.END_BYTES) + return chunks + + @staticmethod + def get_header(body): + body_size = len(body).to_bytes(4, byteorder="little") + result = [ + *BleDevice.START_BYTES, + *body_size, + ] + result.append(BleDevice.checksum(result)) + return result + + def execute_command( + self, cmd: list[int], synwait: int | None = None, response: bool = False + ) -> list[int] | None: + return self._loop.run_until_complete(self._execute_command(cmd, response)) + + async def _execute_command( + self, cmd: list[int], response=False + ) -> list[int] | None: + if not self._client.is_connected: + print("Connection lost, reconnecting…") + await self._client.connect() + if self._client._backend.__class__.__name__ == "BleakClientBlueZDBus": # type: ignore + self._client._backend._acquire_mtu() # type: ignore + print("Connceted") + head = BleDevice.get_header(cmd) + print("sending header:") + chunks = BleDevice.chunkify(cmd, self._client.mtu_size - 2) + print("sending chunkks") + await self._client.write_gatt_char( + "be3dd651-2b3d-42f1-99c1-f0f749dd0678", bytearray(head), response=False + ) + for chunk in chunks: + await self._client.write_gatt_char( + "be3dd651-2b3d-42f1-99c1-f0f749dd0678", + bytearray(chunk), + response=response, + ) + if response: + # TODO: Not sure which UUID is correct, could also be + # be3dd653-2b3d-42f1-99c1-f0f749dd0678 + data = await self._client.read_gatt_char( + "be3dd652-2b3d-42f1-99c1-f0f749dd0678" + ) + return list(data) + return None + + +# TODO: Make this toggelable +asyncio.run(BleDevice._scan()) diff --git a/src/labelle/lib/devices/device.py b/src/labelle/lib/devices/device.py new file mode 100644 index 00000000..cde4e727 --- /dev/null +++ b/src/labelle/lib/devices/device.py @@ -0,0 +1,97 @@ +from __future__ import annotations + +import logging +from abc import ABC, abstractmethod +from typing import NoReturn + +LOG = logging.getLogger(__name__) +GITHUB_ISSUE_MAC = "" +GITHUB_ISSUE_UDEV = "" + + +class DeviceError(RuntimeError): + pass + + +class Device(ABC): + @property + @abstractmethod + def hash(self) -> str: + pass + + @property + @abstractmethod + def manufacturer(self) -> str | None: + pass + + @property + @abstractmethod + def product(self) -> str | None: + pass + + @property + @abstractmethod + def serial_number(self) -> str | None: + pass + + @property + @abstractmethod + def connection_id(self) -> str: + pass + + @staticmethod + @abstractmethod + def _is_supported_vendor(dev) -> bool: + return False + + @property + @abstractmethod + def is_supported(self) -> bool: + pass + + @classmethod + def supported_devices(cls) -> set[Device]: + results = set() + for subclass in Device.__subclasses__(): + results |= subclass.supported_devices() + return results + + @property + @abstractmethod + def device_info(self) -> str: + pass + + @abstractmethod + def _instruct_on_access_denied(self) -> NoReturn: + exit(-1) + + @abstractmethod + def _instruct_on_access_denied_linux(self) -> NoReturn: + exit(-2) + + @abstractmethod + def setup(self) -> None: + pass + + @abstractmethod + def dispose(self) -> None: + pass + + def is_match(self, patterns: list[str] | None) -> bool: + if patterns is None: + return True + match = True + for pattern in patterns: + pattern = pattern.lower() + match &= ( + pattern in (self.manufacturer or "").lower() + or pattern in (self.product or "").lower() + or pattern in (self.serial_number or "").lower() + ) + return match + + @abstractmethod + def execute_command( + self, cmd: list[int], synwait: int | None, response: bool + ) -> list[int] | None: + pass diff --git a/src/labelle/lib/devices/device_manager.py b/src/labelle/lib/devices/device_manager.py index 0cd9b508..48bac3e2 100644 --- a/src/labelle/lib/devices/device_manager.py +++ b/src/labelle/lib/devices/device_manager.py @@ -8,10 +8,11 @@ SUPPORTED_PRODUCTS, UNCONFIRMED_MESSAGE, ) +from labelle.lib.devices.device import Device from labelle.lib.devices.usb_device import UsbDevice LOG = logging.getLogger(__name__) -POSSIBLE_USB_ERRORS = (NoBackendError, USBError) +POSSIBLE_DEVICE_ERRORS = (NoBackendError, USBError) class DeviceManagerError(RuntimeError): @@ -23,7 +24,7 @@ class DeviceManagerNoDevices(DeviceManagerError): class DeviceManager: - _devices: dict[str, UsbDevice] + _devices: dict[str, Device] def __init__(self) -> None: self._devices = {} @@ -31,8 +32,8 @@ def __init__(self) -> None: def scan(self) -> bool: prev = self._devices try: - cur = {dev.hash: dev for dev in UsbDevice.supported_devices() if dev.hash} - except POSSIBLE_USB_ERRORS as e: + cur = {dev.hash: dev for dev in Device.supported_devices() if dev.hash} + except POSSIBLE_DEVICE_ERRORS as e: self._devices.clear() raise DeviceManagerError(f"Failed scanning devices: {e}") from e if len(cur) == 0: @@ -51,22 +52,22 @@ def scan(self) -> bool: return changed @property - def devices(self) -> list[UsbDevice]: + def devices(self) -> list[Device]: try: return sorted(self._devices.values(), key=lambda dev: dev.hash) - except POSSIBLE_USB_ERRORS: + except POSSIBLE_DEVICE_ERRORS: return [] - def matching_devices(self, patterns: list[str] | None) -> list[UsbDevice]: + def matching_devices(self, patterns: list[str] | None) -> list[Device]: try: matching = filter( lambda dev: dev.is_match(patterns), self._devices.values() ) return sorted(matching, key=lambda dev: dev.hash) - except POSSIBLE_USB_ERRORS: + except POSSIBLE_DEVICE_ERRORS: return [] - def find_and_select_device(self, patterns: list[str] | None = None) -> UsbDevice: + def find_and_select_device(self, patterns: list[str] | None = None) -> Device: devices = [ device for device in self.matching_devices(patterns) if device.is_supported ] @@ -79,9 +80,13 @@ def find_and_select_device(self, patterns: list[str] | None = None) -> UsbDevice for dev in devices: LOG.debug(dev.device_info) dev = devices[0] - if dev.is_supported: + if dev.is_supported and type(dev) is UsbDevice: msg = f"Recognized device as {SUPPORTED_PRODUCTS[dev.id_product]}" - else: + elif dev.is_supported: + msg = f"Recognized device as {dev.product}" + elif type(dev) is UsbDevice: msg = f"Unrecognized device: {hex(dev.id_product)}. {UNCONFIRMED_MESSAGE}" + else: + msg = f"Unrecognized device: {dev.product}" LOG.debug(msg) return dev diff --git a/src/labelle/lib/devices/dymo_labeler.py b/src/labelle/lib/devices/dymo_labeler.py index 6629598a..c0e5c02f 100755 --- a/src/labelle/lib/devices/dymo_labeler.py +++ b/src/labelle/lib/devices/dymo_labeler.py @@ -11,16 +11,19 @@ import logging import math -import usb +import numpy as np from PIL import Image from usb.core import NoBackendError, USBError from labelle.lib.constants import ESC, SYN +from labelle.lib.devices.ble_device import BleDevice, BleDeviceError +from labelle.lib.devices.device import Device, DeviceError from labelle.lib.devices.usb_device import UsbDevice, UsbDeviceError from labelle.lib.utils import mm_to_px LOG = logging.getLogger(__name__) POSSIBLE_USB_ERRORS = (UsbDeviceError, NoBackendError, USBError) +POSSIBLE_BLE_ERRORS = BleDeviceError class DymoLabelerDetectError(Exception): @@ -58,13 +61,11 @@ class DymoLabelerFunctions: # sensible timeout can also be calculated dynamically. _synwait: int | None _bytesPerLine: int | None - _devout: usb.core.Endpoint - _devin: usb.core.Endpoint + _device: Device def __init__( self, - devout: usb.core.Endpoint, - devin: usb.core.Endpoint, + device: Device, synwait: int | None = None, ): """Initialize the LabelManager object (HLF).""" @@ -73,8 +74,7 @@ def __init__( self._bytesPerLine = None self._dotTab = 0 self._maxLines = 200 - self._devout = devout - self._devin = devin + self._device = device self._synwait = synwait @classmethod @@ -86,52 +86,7 @@ def height_px(cls, tape_size_mm: int): return cls._max_bytes_per_line(tape_size_mm) * 8 def _send_command(self): - """Send the already built command to the LabelManager (MLF).""" - if len(self._cmd) == 0: - return None - - while len(self._cmd) > 0: - if self._synwait is None: - cmd_to_send = self._cmd - cmd_rest = [] - else: - # Send a status request - cmdBin = array.array("B", [ESC, ord("A")]) - cmdBin.tofile(self._devout) - rspBin = self._devin.read(512) - _ = array.array("B", rspBin).tolist() - # Ok, we got a response. Now we can send a chunk of data - - # Compute a chunk with at most synwait SYN characters - synCount = 0 # Number of SYN characters encountered in iteration - pos = -1 # Index of last SYN character encountered in iteration - while synCount < self._synwait: - try: - # Increment pos to the index of the next SYN character - pos += self._cmd[pos + 1 :].index(SYN) + 1 - synCount += 1 - except ValueError: - # No more SYN characters in cmd - pos = len(self._cmd) - break - cmd_to_send = self._cmd[:pos] - cmd_rest = self._cmd[pos:] - LOG.debug(f"Sending chunk of {len(cmd_to_send)} bytes") - - # Remove the computed chunk from the command to be processed - self._cmd = cmd_rest - - # Send the chunk - cmdBin = array.array("B", cmd_to_send) - cmdBin.tofile(self._devout) - - self._cmd = [] # This looks redundant. - if not self._response: - return None - self._response = False - responseBin = self._devin.read(512) - response = array.array("B", responseBin).tolist() - return response + return self._device.execute_command(self._cmd, self._synwait, self._response) def _reset_command(self) -> None: """Remove a partially built command (MLF).""" @@ -183,6 +138,27 @@ def _line(self, value) -> None: cmd = [SYN, *value] self._build_command(cmd) + def _ble_print_data(self, lines: list[int], width: int, height: int) -> None: + assert height * width == len(lines) * 8 + cmd = [ + ESC, + ord("D"), + 1, + 2, # Seems to be ignored? + *width.to_bytes(4, byteorder="little"), + *height.to_bytes(4, byteorder="little"), + *lines, + ] + self._build_command(cmd) + + def _start(self) -> None: + cmd = [ESC, ord("s"), 0x9A, 2, 0, 0] + self._build_command(cmd) + + def _end(self) -> None: + cmd = [ESC, ord("Q")] + self._build_command(cmd) + def _chain_mark(self, tape_size_mm: int) -> None: """Set Chain Mark (MLF).""" self._dot_tab(0, tape_size_mm) @@ -223,6 +199,15 @@ def print_label(self, lines: list[list[int]]): del lines[0 : self._maxLines] self._raw_print_label(lines) + def _ble_print(self, lines: list[int], width: int, height: int) -> None: + self._start() + self._ble_print_data(lines, width, height) + # self._cut() + self._status_request() + self._end() + status = self._send_command() + LOG.debug(f"Post-send response: {status}") + def _raw_print_label(self, lines: list[list[int]]): """Print the label described by lines (HLF).""" # Here used to be a matrix optimization code that caused problems in issue #87 @@ -235,7 +220,7 @@ def _raw_print_label(self, lines: list[list[int]]): class DymoLabeler: - _device: UsbDevice | None + _device: Device | None tape_size_mm: int LABELER_DISTANCE_BETWEEN_PRINT_HEAD_AND_CUTTER_MM = 8.1 @@ -246,7 +231,7 @@ class DymoLabeler: def __init__( self, tape_size_mm: int | None = None, - device: UsbDevice | None = None, + device: Device | None = None, ): if tape_size_mm is None: tape_size_mm = self.DEFAULT_TAPE_SIZE_MM @@ -266,8 +251,7 @@ def height_px(self): def _functions(self) -> DymoLabelerFunctions: assert self._device is not None return DymoLabelerFunctions( - devout=self._device.devout, - devin=self._device.devin, + device=self._device, synwait=64, ) @@ -287,15 +271,15 @@ def labeler_margin_px(self) -> tuple[float, float]: ) @property - def device(self) -> UsbDevice | None: + def device(self) -> Device | None: return self._device @device.setter - def device(self, device: UsbDevice | None): + def device(self, device: Device | None): try: if device: device.setup() - except UsbDeviceError as e: + except DeviceError as e: device = None LOG.error(e) self._device = device @@ -313,37 +297,60 @@ def print( The label bitmap is a PIL image in 1-bit format (mode=1), and pixels with value equal to 1 are burned. """ - # Convert the image to the proper matrix for the dymo labeler object so that - # rows span the width of the label, and the first row corresponds to the left - # edge of the label. - rotated_bitmap = bitmap.transpose(Image.Transpose.ROTATE_270) - - # Convert the image to raw bytes. Pixels along rows are chunked into groups of - # 8 pixels, and subsequent rows are concatenated. - stream: bytes = rotated_bitmap.tobytes() - - # Regather the bytes into rows - stream_row_length = math.ceil(bitmap.height / 8) - if len(stream) // stream_row_length != bitmap.width: - raise RuntimeError( - "An internal problem was encountered while processing the label bitmap!" - ) - label_rows: list[bytes] = [ - stream[i : i + stream_row_length] - for i in range(0, len(stream), stream_row_length) - ] - - # Convert bytes into ints - label_matrix: list[list[int]] = [ - array.array("B", label_row).tolist() for label_row in label_rows - ] - - try: - LOG.debug("Printing label..") - self._functions.print_label(label_matrix) - LOG.debug("Done printing.") - if self._device is not None: - self._device.dispose() - LOG.debug("Cleaned up.") - except POSSIBLE_USB_ERRORS as e: - raise DymoLabelerPrintError(str(e)) from e + if type(self.device) is UsbDevice: + # Convert the image to the proper matrix for the dymo labeler object so that + # rows span the width of the label, and the first row corresponds to the + # left edge of the label. + + rotated_bitmap = bitmap.transpose(Image.Transpose.ROTATE_270) + + # Convert the image to raw bytes. Pixels along rows are chunked into groups + # of 8 pixels, and subsequent rows are concatenated. + stream: bytes = rotated_bitmap.tobytes() + + # Regather the bytes into rows + stream_row_length = math.ceil(bitmap.height / 8) + if len(stream) // stream_row_length != bitmap.width: + raise RuntimeError( + "An internal problem was encountered while processing" + + "the label bitmap!" + ) + label_rows: list[bytes] = [ + stream[i : i + stream_row_length] + for i in range(0, len(stream), stream_row_length) + ] + + # Convert bytes into ints + label_matrix: list[list[int]] = [ + array.array("B", label_row).tolist() for label_row in label_rows + ] + + try: + LOG.debug("Printing label..") + self._functions.print_label(label_matrix) + LOG.debug("Done printing.") + if self._device is not None: + self._device.dispose() + LOG.debug("Cleaned up.") + except POSSIBLE_USB_ERRORS as e: + raise DymoLabelerPrintError(str(e)) from e + elif type(self.device) is BleDevice: + # bitmap = bitmap.convert("1", dither=Image.Dither.NONE) + # rotated_bitmap = bitmap.rotate(-90, expand=1) + rotated_bitmap = bitmap.transpose(Image.Transpose.ROTATE_270) + height = 32 # TODO: Make this part of the config + width = int(64 * (rotated_bitmap.height / rotated_bitmap.width)) + rotated_bitmap = rotated_bitmap.resize((height, width)) + data = np.packbits( + np.array(rotated_bitmap.getdata()), + bitorder="little", + ).tolist() + try: + LOG.debug("Printing label..") + self._functions._ble_print(data, width, height) + LOG.debug("Done printing.") + if self._device is not None: + self._device.dispose() + LOG.debug("Cleaned up.") + except POSSIBLE_BLE_ERRORS as e: + raise DymoLabelerPrintError(str(e)) from e diff --git a/src/labelle/lib/devices/online_device_manager.py b/src/labelle/lib/devices/online_device_manager.py index a8e0394c..4459cef0 100644 --- a/src/labelle/lib/devices/online_device_manager.py +++ b/src/labelle/lib/devices/online_device_manager.py @@ -7,8 +7,8 @@ from PyQt6.QtWidgets import QWidget from usb.core import NoBackendError, USBError +from labelle.lib.devices.device import Device from labelle.lib.devices.device_manager import DeviceManager, DeviceManagerError -from labelle.lib.devices.usb_device import UsbDevice LOG = logging.getLogger(__name__) POSSIBLE_USB_ERRORS = (NoBackendError, USBError) @@ -54,5 +54,5 @@ def last_scan_error(self) -> DeviceManagerError | None: return self._last_scan_error @property - def devices(self) -> list[UsbDevice]: + def devices(self) -> list[Device]: return self._device_manager.devices diff --git a/src/labelle/lib/devices/usb_device.py b/src/labelle/lib/devices/usb_device.py index c17a988b..0cb4761b 100644 --- a/src/labelle/lib/devices/usb_device.py +++ b/src/labelle/lib/devices/usb_device.py @@ -1,5 +1,6 @@ from __future__ import annotations +import array import logging import platform from textwrap import dedent @@ -9,10 +10,13 @@ from labelle.lib.constants import ( DEV_VENDOR, + ESC, HID_INTERFACE_CLASS, PRINTER_INTERFACE_CLASS, SUPPORTED_PRODUCTS, + SYN, ) +from labelle.lib.devices.device import Device LOG = logging.getLogger(__name__) GITHUB_ISSUE_MAC = "" @@ -23,7 +27,7 @@ class UsbDeviceError(RuntimeError): pass -class UsbDevice: +class UsbDevice(Device): _dev: usb.core.Device _intf: usb.core.Interface | None _devin: usb.core.Endpoint | None @@ -81,6 +85,10 @@ def usb_id(self) -> str: address = self._get_dev_attribute("address") return f"Bus {bus:03} Device {address:03}: ID {self.vendor_product_id}" + @property + def connection_id(self) -> str: + return self.usb_id + @staticmethod def _is_supported_vendor(dev: usb.core.Device) -> bool: return dev.idVendor == DEV_VENDOR @@ -92,14 +100,14 @@ def is_supported(self) -> bool: and self.id_product in SUPPORTED_PRODUCTS ) - @staticmethod - def supported_devices() -> set[UsbDevice]: - return { - UsbDevice(dev) - for dev in usb.core.find( - find_all=True, custom_match=UsbDevice._is_supported_vendor - ) - } + @classmethod + def supported_devices(cls) -> set[Device]: + usb_devices = usb.core.find( + find_all=True, custom_match=UsbDevice._is_supported_vendor + ) + if not usb_devices: + usb_devices = [] + return {UsbDevice(dev) for dev in usb_devices if type(dev) is usb.core.Device} @property def device_info(self) -> str: @@ -288,9 +296,58 @@ def is_match(self, patterns: list[str] | None) -> bool: return match @property - def devin(self): + def devin(self) -> usb.Endpoint: return self._devin @property - def devout(self): + def devout(self) -> usb.Endpoint: return self._devout + + def execute_command( + self, cmd: list[int], synwait: int | None = None, response: bool = False + ) -> list[int] | None: + """Send the already built command to the LabelManager (MLF).""" + if len(cmd) == 0: + return None + + while len(cmd) > 0: + if synwait is None: + cmd_to_send: list[int] = self._cmd + cmd_rest = [] + else: + # Send a status request + cmdBin = array.array("B", [ESC, ord("A")]) + cmdBin.tofile(self.devout) + rspBin = self.devin.read(512) + _ = array.array("B", rspBin).tolist() + # Ok, we got a response. Now we can send a chunk of data + + # Compute a chunk with at most synwait SYN characters + synCount = 0 # Number of SYN characters encountered in iteration + pos = -1 # Index of last SYN character encountered in iteration + while synCount < synwait: + try: + # Increment pos to the index of the next SYN character + pos += cmd[pos + 1 :].index(SYN) + 1 + synCount += 1 + except ValueError: + # No more SYN characters in cmd + pos = len(cmd) + break + cmd_to_send = cmd[:pos] + cmd_rest = cmd[pos:] + LOG.debug(f"Sending chunk of {len(cmd_to_send)} bytes") + + # Remove the computed chunk from the command to be processed + cmd = cmd_rest + + # Send the chunk + cmdBin = array.array("B", cmd_to_send) + cmdBin.tofile(self.devout) + + self._cmd: list[int] = [] # This looks redundant. + if not response: + return None + responseBin = self.devin.read(512) + responseList = array.array("B", responseBin).tolist() + return responseList