diff --git a/.github/workflows/code_quality.yaml b/.github/workflows/code_quality.yaml new file mode 100644 index 0000000..0151a12 --- /dev/null +++ b/.github/workflows/code_quality.yaml @@ -0,0 +1,42 @@ +name: Code Quality + +on: + pull_request: + branches: + - master + push: + +jobs: + code_quality: + name: ${{ matrix.name }} + runs-on: ubuntu-latest + strategy: + matrix: + include: + - id: black + name: Check code with black + - id: isort + name: Check code with isort + - id: pylint + name: Check code with pylint + - id: mypy + name: Check code with mypy + steps: + - name: Checkout the repository + uses: actions/checkout@v4 + + - name: Set up Python 3 + uses: actions/setup-python@v5 + id: python + with: + python-version: "3.11" + + - name: Install workflow dependencies + run: | + pip install -r .github/workflows/requirements.txt + + - name: Install Python dependencies + run: poetry install --no-interaction + + - name: Run ${{ matrix.id }} checks + run: poetry run ${{ matrix.id }} src \ No newline at end of file diff --git a/.github/workflows/codeql.yaml b/.github/workflows/codeql.yaml new file mode 100644 index 0000000..e1d329e --- /dev/null +++ b/.github/workflows/codeql.yaml @@ -0,0 +1,36 @@ +name: "CodeQL" + +on: + push: + branches: [master] + pull_request: + branches: [master] + +jobs: + analyze: + name: Analyze + runs-on: ubuntu-latest + permissions: + actions: read + contents: read + security-events: write + + strategy: + fail-fast: false + matrix: + language: ["python"] + + steps: + - name: Checkout repository + uses: actions/checkout@v5 + + - name: Initialize CodeQL + uses: github/codeql-action/init@v4 + with: + languages: ${{ matrix.language }} + + - name: Autobuild + uses: github/codeql-action/autobuild@v4 + + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v4 \ No newline at end of file diff --git a/.github/workflows/pylint.yml b/.github/workflows/pylint.yml deleted file mode 100644 index c73e032..0000000 --- a/.github/workflows/pylint.yml +++ /dev/null @@ -1,23 +0,0 @@ -name: Pylint - -on: [push] - -jobs: - build: - runs-on: ubuntu-latest - strategy: - matrix: - python-version: ["3.8", "3.9", "3.10"] - steps: - - uses: actions/checkout@v4 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v3 - with: - python-version: ${{ matrix.python-version }} - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install pylint - - name: Analysing the code with pylint - run: | - pylint $(git ls-files '*.py') diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml new file mode 100644 index 0000000..f60e63c --- /dev/null +++ b/.github/workflows/release.yaml @@ -0,0 +1,43 @@ +name: Publish release + +on: + release: + types: [published] + +jobs: + build-and-publish-pypi: + name: Builds and publishes release to PyPI + runs-on: ubuntu-latest + outputs: + version: ${{ steps.vars.outputs.tag }} + steps: + - uses: actions/checkout@v5 + + - name: Set up Python 3.11 + uses: actions/setup-python@v6 + with: + python-version: "3.11" + + - name: Install workflow dependencies + run: | + pip install -r .github/workflows/requirements.txt + + - name: Install dependencies + run: poetry install --no-interaction + + - name: Set package version + run: | + version="${{ github.event.release.tag_name }}" + version="${version,,}" + version="${version#v}" + poetry version --no-interaction "${version}" + + - name: Build package + run: poetry build --no-interaction + + - name: Publish to PyPi + env: + PYPI_TOKEN: ${{ secrets.PYPI_TOKEN }} + run: | + poetry config pypi-token.pypi "${PYPI_TOKEN}" + poetry publish --no-interaction \ No newline at end of file diff --git a/.github/workflows/requirements.txt b/.github/workflows/requirements.txt new file mode 100644 index 0000000..f2b9c88 --- /dev/null +++ b/.github/workflows/requirements.txt @@ -0,0 +1,2 @@ +pip>=23.3 +poetry==1.5.1 \ No newline at end of file diff --git a/README.md b/README.md index 8d90d0e..c574d6f 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,15 @@ +[![GitHub Latest Release][releases_shield]][latest_release] +[![PyPI][pypi_releases_shield]][pypi_latest_release] +[![PyPI - Downloads][pypi_downloads_shield]][pypi_downloads] + +[latest_release]: https://github.com/maksp86/Python-package-vacuum-map-parser-ijai/releases/latest +[releases_shield]: https://img.shields.io/github/v/release/maksp86/Python-package-vacuum-map-parser-ijai + +[pypi_latest_release]: https://pypi.org/project/vacuum-map-parser-ijai/ +[pypi_releases_shield]: https://img.shields.io/pypi/v/vacuum-map-parser-ijai + +[pypi_downloads]: https://pepy.tech/project/vacuum-map-parser-ijai +[pypi_downloads_shield]: https://static.pepy.tech/badge/vacuum-map-parser-ijai # Vacuum map parser - Ijai @@ -36,6 +48,13 @@ unpacked_map = parser.unpack_map(raw_map, device_mac='**:**:**:**:**:**') parsed_map = parser.parse(unpacked_map) ``` + +## Supported vacuums: +- ijai.vacuum.* (at least v1, v2, v3, v10, v13, v18, v19) +- xiaomi.vacuum.c103 +- xiaomi.vacuum.c104 +- xiaomi.vacuum.b106eu +*If you got another vacuum to work, please tell us* ## Special thanks diff --git a/pyproject.toml b/pyproject.toml index e8252c8..cefc17e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ version = "0.0.0" license = "Apache-2.0" description = "Functionalities for Ijai vacuum map parsing" readme = "README.md" -authors = ["Alexander Vassilyevsky "] +authors = ["maksp86 ", "Alexander Vassilyevsky "] classifiers = [ "Development Status :: 4 - Beta", "Intended Audience :: Developers", @@ -13,22 +13,22 @@ classifiers = [ "Programming Language :: Python :: 3.11", "Topic :: Home Automation", ] -packages = [ - { include = "vacuum_map_parser_ijai", from = "src" }, -] +packages = [{ include = "vacuum_map_parser_ijai", from = "src" }] [tool.poetry.urls] -"Homepage" = "https://github.com/Tarh-76/Python-package-vacuum-map-parser-ijai" -"Repository" = "https://github.com/Tarh-76/Python-package-vacuum-map-parser-ijai" -"Bug Tracker" = "https://github.com/Tarh-76/Python-package-vacuum-map-parser-ijai/issues" -"Changelog" = "https://github.com/Tarh-76/Python-package-vacuum-map-parser-ijai/releases" +"Homepage" = "https://github.com/maksp86/Python-package-vacuum-map-parser-ijai" +"Repository" = "https://github.com/maksp86/Python-package-vacuum-map-parser-ijai" +"Bug Tracker" = "https://github.com/maksp86/Python-package-vacuum-map-parser-ijai/issues" +"Changelog" = "https://github.com/maksp86/Python-package-vacuum-map-parser-ijai/releases" [tool.poetry.dependencies] python = "^3.11" Pillow = "*" -vacuum-map-parser-base = "0.1.2" +pycryptodome = "*" + +vacuum-map-parser-base = ">=0.1.5" -[tool.poetry.dev-dependencies] +[tool.poetry.group.dev.dependencies] black = "*" mypy = "*" ruff = "*" @@ -45,6 +45,12 @@ line_length = 120 [tool.mypy] platform = "linux" +exclude = '''(?x) + ( ^.*RobotMap_pb2\.py$ + | ^.*RobotMap_pb2\.pyi$ + | ^.*beautify_min\.py$ + ) +''' check_untyped_defs = true disallow_any_generics = true @@ -64,7 +70,18 @@ warn_unused_configs = true warn_unused_ignores = true [tool.pylint] -disable = ["C0103", "C0116", "R0902", "R0903", "R0912", "R0913", "R0914", "R0915", "W0640"] +disable = [ + "C0103", + "C0116", + "R0902", + "R0903", + "R0912", + "R0913", + "R0914", + "R0915", + "W0640", + "R0917", +] max-line-length = 120 [build-system] diff --git a/src/vacuum_map_parser_ijai/RobotMap_pb2.py b/src/vacuum_map_parser_ijai/RobotMap_pb2.py index 44a4b22..e216713 100644 --- a/src/vacuum_map_parser_ijai/RobotMap_pb2.py +++ b/src/vacuum_map_parser_ijai/RobotMap_pb2.py @@ -1,3 +1,4 @@ +# pylint: skip-file # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: RobotMap.proto @@ -7,6 +8,7 @@ from google.protobuf import descriptor_pool as _descriptor_pool from google.protobuf import symbol_database as _symbol_database from google.protobuf.internal import builder as _builder + # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() diff --git a/src/vacuum_map_parser_ijai/aes_decryptor.py b/src/vacuum_map_parser_ijai/aes_decryptor.py index 3e468de..72c1db3 100644 --- a/src/vacuum_map_parser_ijai/aes_decryptor.py +++ b/src/vacuum_map_parser_ijai/aes_decryptor.py @@ -1,12 +1,16 @@ +"""Module that provides functions for decrypting a map.""" + +import base64 +import binascii + from Crypto.Cipher import AES from Crypto.Hash import MD5 from Crypto.Util.Padding import pad, unpad -import base64 +from .status_mapping import is_EncryptKeyTypeHex_model -isEncryptKeyTypeHex = True -def aes_encrypt(data, key: str): +def aes_encrypt(data: str, key: str) -> str: cipher = AES.new(key.encode("utf-8"), AES.MODE_ECB) encryptedData = cipher.encrypt( @@ -15,7 +19,8 @@ def aes_encrypt(data, key: str): return encryptedBase64Str -def aes_decrypt(data, key: str): + +def aes_decrypt(data: bytes, key: str, isEncryptKeyTypeHex: bool) -> bytes: parsedKey = key.encode("utf-8") if isEncryptKeyTypeHex: parsedKey = bytes.fromhex(key) @@ -29,7 +34,7 @@ def aes_decrypt(data, key: str): return bytes.fromhex(decryptedData.decode("utf-8")) -def md5key(string: str, model: str, device_mac: str): +def md5key(string: str, model: str, device_mac: str) -> str: pjstr = "".join(device_mac.lower().split(":")) tempModel = model.split('.')[-1] @@ -45,20 +50,29 @@ def md5key(string: str, model: str, device_mac: str): aeskey = aes_encrypt(string, tempKey) temp = MD5.new(aeskey.encode('utf-8')).hexdigest() - if isEncryptKeyTypeHex: + if is_EncryptKeyTypeHex_model(model): return temp return temp[8:-8].upper() -def gen_md5_key(wifi_info_sn: str, owner_id: str, device_id: str, model: str, device_mac: str): +def gen_md5_key(wifi_info_sn: str, owner_id: str, + device_id: str, model: str, + device_mac: str) -> str: arr = [wifi_info_sn, owner_id, device_id] tempString = '+'.join(arr) return md5key(tempString, model, device_mac) -def decrypt(data: bytes, wifi_info_sn: str, owner_id: str, device_id: str, model: str, device_mac: str) -> bytes: +def decrypt(data: bytes, wifi_info_sn: str, + owner_id: str, device_id: str, + model: str, device_mac: str) -> bytes: try: data = base64.b64decode(data, validate=True) - except: + except binascii.Error: pass - return aes_decrypt(data, gen_md5_key(wifi_info_sn, owner_id, device_id, model, device_mac)) + key_type_hex = is_EncryptKeyTypeHex_model(model) + return aes_decrypt(data, + gen_md5_key(wifi_info_sn, owner_id, + device_id, model, + device_mac), + key_type_hex) diff --git a/src/vacuum_map_parser_ijai/beautify_min.py b/src/vacuum_map_parser_ijai/beautify_min.py index 6ec9d97..fb4f329 100644 --- a/src/vacuum_map_parser_ijai/beautify_min.py +++ b/src/vacuum_map_parser_ijai/beautify_min.py @@ -1,6 +1,9 @@ +# pylint: skip-file +import collections + from vacuum_map_parser_base.map_data import Point + from vacuum_map_parser_ijai.RobotMap_pb2 import RobotMap -import collections class BeautifyMap: @@ -33,7 +36,7 @@ def setMap(self, mapData: RobotMap.MapDataInfo): self.map = tempArray - def normalizeMap(self): + def normalizeMap(self) -> None: # normalizing all data to bytes and values suitable for map_data_parser for i in range(len(self.map)): if self.map[i] < 0: @@ -45,10 +48,10 @@ def normalizeMap(self): elif self.map[i] == 40: self.map[i] = 255 - def getMap(self): + def getMap(self) -> list[int]: return self.map - def transform(self): + def transform(self) -> None: non_boundary_noise = [] self.findRoiMap() self.expandBlackRect(4, 4, self.map[0]) @@ -927,7 +930,7 @@ def searchLineForNewSeed(self, dst: list[int], x_left, x_right, line_row, raw_va return scan_line_seed - def fillInternalObstacles(self): + def fillInternalObstacles(self) -> None: if (self.tRect["width"] == 0 and self.tRect["height"] == 0): self.findRoiMap() diff --git a/src/vacuum_map_parser_ijai/ijai_coordinate_transforms.py b/src/vacuum_map_parser_ijai/ijai_coordinate_transforms.py index 9cc726d..815e236 100644 --- a/src/vacuum_map_parser_ijai/ijai_coordinate_transforms.py +++ b/src/vacuum_map_parser_ijai/ijai_coordinate_transforms.py @@ -1,17 +1,23 @@ -import vacuum_map_parser_ijai.RobotMap_pb2 as RobotMap +"""Module for transforming coordinates.""" from vacuum_map_parser_base.map_data import Point -class Transformer: - def __init__(self, map:RobotMap): - self.map_head = map.mapHead - self.to_image_multiplier = Point(self.map_head.sizeX/(self.map_head.maxX - self.map_head.minX), \ - self.map_head.sizeY/(self.map_head.maxY - self.map_head.minY)) - def map_to_image(self, pt:Point): - return Point((pt.x - self.map_head.minX) * self.to_image_multiplier.x, \ +import vacuum_map_parser_ijai.RobotMap_pb2 as RobotMap + + +class Transformer: # pylint: disable=E1101 + """Class for transforming coordinates.""" + + def __init__(self, robotmap: RobotMap.RobotMap): + self.map_head = robotmap.mapHead + self.to_image_multiplier = Point(self.map_head.sizeX/(self.map_head.maxX - self.map_head.minX), + self.map_head.sizeY/(self.map_head.maxY - self.map_head.minY)) + + def map_to_image(self, pt: Point) -> Point: + return Point((pt.x - self.map_head.minX) * self.to_image_multiplier.x, (pt.y - self.map_head.minY) * self.to_image_multiplier.y) - def image_to_map_x (self, x:int): + def image_to_map_x(self, x: int) -> float: return x/self.to_image_multiplier.x + self.map_head.minX - - def image_to_map_y (self, y:int): + + def image_to_map_y(self, y: int) -> float: return y/self.to_image_multiplier.y + self.map_head.minY diff --git a/src/vacuum_map_parser_ijai/image_parser.py b/src/vacuum_map_parser_ijai/image_parser.py index 17a7363..0126fe4 100644 --- a/src/vacuum_map_parser_ijai/image_parser.py +++ b/src/vacuum_map_parser_ijai/image_parser.py @@ -5,7 +5,6 @@ from PIL import Image from PIL.Image import Image as ImageType from PIL.Image import Resampling - from vacuum_map_parser_base.config.color import ColorsPalette, SupportedColor from vacuum_map_parser_base.config.drawable import Drawable from vacuum_map_parser_base.config.image_config import ImageConfig @@ -15,7 +14,7 @@ class IjaiImageParser: - + """Ijai map image parser.""" MAP_OUTSIDE = 0x00 @@ -32,16 +31,17 @@ def __init__(self, palette: ColorsPalette, image_config: ImageConfig, drawables: self._image_config = image_config self._drawables = drawables self.color_map = { - IjaiImageParser.MAP_OUTSIDE: palette.get_color(SupportedColor.MAP_OUTSIDE), - IjaiImageParser.MAP_WALL:palette.get_color(SupportedColor.MAP_WALL_V2), - IjaiImageParser.MAP_SCAN: palette.get_color(SupportedColor.SCAN), - IjaiImageParser.MAP_NEW_DISCOVERED_AREA: palette.get_color(SupportedColor.NEW_DISCOVERED_AREA)} + IjaiImageParser.MAP_OUTSIDE: palette.get_color(SupportedColor.MAP_OUTSIDE), + IjaiImageParser.MAP_WALL: palette.get_color(SupportedColor.MAP_WALL_V2), + IjaiImageParser.MAP_SCAN: palette.get_color(SupportedColor.SCAN), + IjaiImageParser.MAP_NEW_DISCOVERED_AREA: palette.get_color(SupportedColor.NEW_DISCOVERED_AREA)} + def parse( self, map_data: bytes, width: int, height: int - ) -> tuple[ImageType | None, dict[int, tuple[int, int, int, int]], set[int], ImageType | None]: - rooms = {} + ) -> tuple[ImageType | None, dict[int, tuple[int, int, int, int]], set[int]]: + rooms: dict[int, tuple[int, int, int, int]] = {} cleaned_areas = set() - _LOGGER.debug(f"ijai parser: image_config = {self._image_config}") + _LOGGER.debug("ijai parser: image_config = %s", self._image_config) scale = self._image_config.scale trim_left = int(self._image_config.trim.left * width / 100) trim_right = int(self._image_config.trim.right * width / 100) @@ -50,60 +50,56 @@ def parse( trimmed_height = height - trim_top - trim_bottom trimmed_width = width - trim_left - trim_right if trimmed_width == 0 or trimmed_height == 0: - return None, {}, set(), None + return None, {}, set() image = Image.new('RGBA', (trimmed_width, trimmed_height)) pixels = image.load() - cleaned_areas_layer = None - cleaned_areas_pixels = None - draw_cleaned_area = Drawable.CLEANED_AREA in self._drawables - if draw_cleaned_area: - cleaned_areas_layer = Image.new("RGBA", (trimmed_width, trimmed_height)) - cleaned_areas_pixels = cleaned_areas_layer.load() - _LOGGER.debug(f"trim_bottom = {trim_bottom}, trim_top = {trim_top}, trim_left = {trim_left}, trim_right = {trim_right}") + _LOGGER.debug("trim_bottom = %s, trim_top = %s, trim_left = %s, trim_right = %s", + trim_bottom, trim_top, trim_left, trim_right) unknown_pixels = set() for img_y in range(trimmed_height): y = trimmed_height - 1 - img_y for img_x in range(trimmed_width): x = img_x - pixel_type = map_data[(img_y + trim_bottom)*width + x + trim_left] - if pixel_type in self.color_map.keys(): + pixel_type = map_data[(img_y + trim_bottom) + * width + x + trim_left] + if pixel_type in self.color_map: pixels[x, y] = self.color_map[pixel_type] elif IjaiImageParser.MAP_ROOM_MIN <= pixel_type <= IjaiImageParser.MAP_SELECTED_ROOM_MAX: room_x = img_x + trim_left room_y = img_y + trim_bottom room_number = pixel_type if pixel_type >= IjaiImageParser.MAP_SELECTED_ROOM_MIN: - room_number = pixel_type - IjaiImageParser.MAP_SELECTED_ROOM_MIN + IjaiImageParser.MAP_ROOM_MIN + room_number = pixel_type - IjaiImageParser.MAP_SELECTED_ROOM_MIN + \ + IjaiImageParser.MAP_ROOM_MIN cleaned_areas.add(room_number) - if draw_cleaned_area: - cleaned_areas_pixels[x, y] = IjaiImageParser.get_color(SupportedColor.CLEANED_AREA) rooms[room_number] = (room_x, room_y, room_x, room_y) \ - if room_number not in rooms \ - else (min(rooms[room_number][0], room_x), - min(rooms[room_number][1], room_y), - max(rooms[room_number][2], room_x), - max(rooms[room_number][3], room_y)) + if room_number not in rooms \ + else (min(rooms[room_number][0], room_x), + min(rooms[room_number][1], room_y), + max(rooms[room_number][2], room_x), + max(rooms[room_number][3], room_y)) pixels[x, y] = self._palette.get_room_color(room_number) else: - pixels[x, y] = IjaiImageParser.get_color(SupportedColor.UNKNOWN) + pixels[x, y] = self._palette.get_color( + SupportedColor.UNKNOWN) unknown_pixels.add(pixel_type) - _LOGGER.debug(f"unknown pixel [{x},{y}] = {pixel_type}") + _LOGGER.debug( + "unknown pixel [%s,%s] = %s", x, y, pixel_type) if self._image_config.scale != 1 and trimmed_width != 0 and trimmed_height != 0: - image = image.resize((int(trimmed_width * scale), int(trimmed_height * scale)), resample=Resampling.NEAREST) - if draw_cleaned_area: - cleaned_areas_layer = cleaned_areas_layer.resize( - (int(trimmed_width * scale), int(trimmed_height * scale)), resample=Image.NEAREST) + image = image.resize( + (int(trimmed_width * scale), int(trimmed_height * scale)), resample=Resampling.NEAREST) if len(unknown_pixels) > 0: _LOGGER.warning('unknown pixel_types: %s', unknown_pixels) - return image, rooms, cleaned_areas, cleaned_areas_layer + return image, rooms, cleaned_areas @staticmethod def get_current_vacuum_room(map_data: bytes, vacuum_position_on_image: Point, image_width: int) -> int | None: - _LOGGER.debug(f"pos on image: {vacuum_position_on_image}") - pixel_type = map_data[int(vacuum_position_on_image.y) * image_width + int(vacuum_position_on_image.x)] + _LOGGER.debug("pos on image: %s", vacuum_position_on_image) + pixel_type = map_data[int(vacuum_position_on_image.y) + * image_width + int(vacuum_position_on_image.x)] if IjaiImageParser.MAP_ROOM_MIN <= pixel_type <= IjaiImageParser.MAP_ROOM_MAX: return pixel_type if IjaiImageParser.MAP_SELECTED_ROOM_MIN <= pixel_type <= IjaiImageParser.MAP_SELECTED_ROOM_MAX: return pixel_type - IjaiImageParser.MAP_SELECTED_ROOM_MIN + IjaiImageParser.MAP_ROOM_MIN - return None \ No newline at end of file + return None diff --git a/src/vacuum_map_parser_ijai/map_data_parser.py b/src/vacuum_map_parser_ijai/map_data_parser.py index 1707918..a9690fe 100644 --- a/src/vacuum_map_parser_ijai/map_data_parser.py +++ b/src/vacuum_map_parser_ijai/map_data_parser.py @@ -12,12 +12,13 @@ from vacuum_map_parser_base.config.text import Text from vacuum_map_parser_base.map_data import Area, ImageData, MapData, Path, Point, Room, Wall, Zone from vacuum_map_parser_base.map_data_parser import MapDataParser -from .ijai_coordinate_transforms import Transformer -import vacuum_map_parser_ijai.RobotMap_pb2 as RobotMap + import vacuum_map_parser_ijai.beautify_min as Beautify +import vacuum_map_parser_ijai.RobotMap_pb2 as RobotMap -from .image_parser import IjaiImageParser from .aes_decryptor import decrypt +from .ijai_coordinate_transforms import Transformer +from .image_parser import IjaiImageParser _LOGGER = logging.getLogger(__name__) @@ -30,6 +31,7 @@ class IjaiMapDataParser(MapDataParser): VIRTUALWALL_TYPE_NO_MOP = 6 VIRTUALWALL_TYPE_NO_GO = 3 + # pylint: disable=E1101 robot_map = RobotMap.RobotMap() def __init__( @@ -46,17 +48,18 @@ def __init__( def unpack_map(self, raw_encoded: bytes, *args: Any, **kwargs: Any) -> bytes: return zlib.decompress( decrypt( - raw_encoded, - kwargs['wifi_sn'], - kwargs['owner_id'], - kwargs['device_id'], - kwargs['model'], + raw_encoded, + kwargs['wifi_sn'], + kwargs['owner_id'], + kwargs['device_id'], + kwargs['model'], kwargs['device_mac'])) def parse(self, raw: bytes, *args: Any, **kwargs: Any) -> MapData: map_data = MapData(0, 1) - IjaiMapDataParser.robot_map.ParseFromString(raw) + self.robot_map.ParseFromString(raw) + # pylint: disable=W0201 self.coord_transformer = Transformer(self.robot_map) if hasattr(self.robot_map, "mapData"): @@ -67,28 +70,48 @@ def parse(self, raw: bytes, *args: Any, **kwargs: Any) -> MapData: if hasattr(self.robot_map, "chargeStation"): pos_info = self.robot_map.chargeStation - map_data.charger = Point(x = pos_info.x, y = pos_info.y, a = pos_info.phi * 180 / math.pi) + map_data.charger = Point( + x=pos_info.x, y=pos_info.y, a=pos_info.phi * 180 / math.pi) _LOGGER.debug("pos: %s", map_data.charger) if hasattr(self.robot_map, "currentPose"): pos_info = self.robot_map.currentPose - map_data.vacuum_position = Point(x = pos_info.x, y = pos_info.y, a = pos_info.phi * 180 / math.pi) + map_data.vacuum_position = Point( + x=pos_info.x, y=pos_info.y, a=pos_info.phi * 180 / math.pi) _LOGGER.debug("pos: %s", map_data.vacuum_position) - if hasattr(self.robot_map, "mapInfo") and hasattr(self.robot_map, "roomDataInfo") and map_data.rooms is not None: + if ( + hasattr(self.robot_map, "mapInfo") + and hasattr(self.robot_map, "roomDataInfo") + and map_data.rooms is not None): IjaiMapDataParser._parse_rooms(map_data.rooms) if hasattr(self.robot_map, "virtualWalls"): - map_data.walls, map_data.no_go_areas, map_data.no_mopping_areas = IjaiMapDataParser._parse_restricted_areas() + (map_data.walls, + map_data.no_go_areas, + map_data.no_mopping_areas) = IjaiMapDataParser._parse_restricted_areas() + + if hasattr(self.robot_map, "areasInfo"): + map_data.zones = IjaiMapDataParser._parse_cleaning_zones() + + if hasattr(self.robot_map, "navigationPoints"): + map_data.goto = IjaiMapDataParser._parse_goto_point() if map_data.rooms is not None: - _LOGGER.debug("rooms: %s", [str(room) for number, room in map_data.rooms.items()]) + _LOGGER.debug("rooms: %s", [str(room) + for number, room in map_data.rooms.items()]) if map_data.rooms is not None and len(map_data.rooms) > 0 and map_data.vacuum_position is not None: - vacuum_position_on_image = self.coord_transformer.map_to_image(map_data.vacuum_position) - map_data.vacuum_room = IjaiImageParser.get_current_vacuum_room(self.robot_map.mapData.mapData, vacuum_position_on_image, IjaiMapDataParser.robot_map.mapHead.sizeX) + vacuum_position_on_image = self.coord_transformer.map_to_image( + map_data.vacuum_position) + map_data.vacuum_room = IjaiImageParser.get_current_vacuum_room( + self.robot_map.mapData.mapData, vacuum_position_on_image, IjaiMapDataParser.robot_map.mapHead.sizeX) if map_data.vacuum_room is not None: map_data.vacuum_room_name = map_data.rooms[map_data.vacuum_room].name _LOGGER.debug("current vacuum room: %s", map_data.vacuum_room) + + if map_data.image is not None and not map_data.image.is_empty: + self._image_generator.draw_map(map_data) + return map_data def _parse_image(self) -> tuple[ImageData, dict[int, Room], set[int]]: @@ -100,7 +123,11 @@ def _parse_image(self) -> tuple[ImageData, dict[int, Room], set[int]]: _LOGGER.debug("width: %d, height: %d", image_width, image_height) # Non painted map tranformation - if (len(set(self.robot_map.mapData.mapData).symmetric_difference([0, 128, 127])) == 0 and len(self.robot_map.roomChain) > 0 and self.robot_map.mapType == 0): + if ( + len(set(self.robot_map.mapData.mapData).symmetric_difference( + [0, 128, 127])) == 0 + and len(self.robot_map.roomChain) > 0 + and self.robot_map.mapType == 0): buautify_obj = Beautify.BeautifyMap(self.robot_map.mapHead) buautify_obj.setMap(self.robot_map.mapData) buautify_obj.transform() @@ -109,10 +136,12 @@ def _parse_image(self) -> tuple[ImageData, dict[int, Room], set[int]]: buautify_obj.normalizeMap() self.robot_map.mapData.mapData = bytes(buautify_obj.getMap()) - image, rooms_raw, cleaned_areas, cleaned_areas_layer = self._image_parser.parse(self.robot_map.mapData.mapData, image_width, image_height) + image, rooms_raw, cleaned_areas = self._image_parser.parse( + self.robot_map.mapData.mapData, image_width, image_height) if image is None: image = self._image_generator.create_empty_map_image() - _LOGGER.debug("img: number of rooms: %d, numbers: %s", len(rooms_raw), rooms_raw.keys()) + _LOGGER.debug("img: number of rooms: %d, numbers: %s", + len(rooms_raw), rooms_raw.keys()) rooms = {} for number, room in rooms_raw.items(): rooms[number] = Room( @@ -132,7 +161,6 @@ def _parse_image(self) -> tuple[ImageData, dict[int, Room], set[int]]: self._image_config, image, self.coord_transformer.map_to_image, - additional_layers={Drawable.CLEANED_AREA: cleaned_areas_layer}, ), rooms, cleaned_areas, @@ -143,7 +171,7 @@ def _parse_history() -> Path: path_points = [] for pt in IjaiMapDataParser.robot_map.historyPose.points: # 0: taxi, 1: working - path_points.append(Point(x = pt.x, y = pt.y)) + path_points.append(Point(x=pt.x, y=pt.y)) return Path(len(path_points), 1, 0, [path_points]) @staticmethod @@ -165,11 +193,34 @@ def _parse_restricted_areas() -> tuple[list[Wall], list[Area], list[Area]]: Area(p1.x, p1.y, p2.x, p2.y, p3.x, p3.y, p4.x, p4.y)) return walls, no_go_areas, no_mop_areas + @staticmethod + def _parse_cleaning_zones() -> list[Zone]: + zones = [] + for areaInfo in IjaiMapDataParser.robot_map.areasInfo: + zones.append(Zone(areaInfo.points[0].x, + areaInfo.points[0].y, + areaInfo.points[2].x, + areaInfo.points[2].y)) + return zones + + @staticmethod + def _parse_goto_point() -> Point | None: + for navigationPoint in IjaiMapDataParser.robot_map.navigationPoints: + if ( + navigationPoint.status == 0 + and navigationPoint.pointType == 1 + and navigationPoint.x != 1100.0 # outside map + and navigationPoint.y != 1100.0): + return Point(navigationPoint.x, + navigationPoint.y, + navigationPoint.phi * 180 / math.pi) + return None + @staticmethod def _parse_rooms(map_data_rooms: dict[int, Room]) -> None: map_id = IjaiMapDataParser.robot_map.mapHead.mapHeadId for map_data in IjaiMapDataParser.robot_map.mapInfo: - if (map_data.mapHeadId == map_id): + if map_data.mapHeadId == map_id: current_map = map_data break map_name = current_map.mapName @@ -181,4 +232,5 @@ def _parse_rooms(map_data_rooms: dict[int, Room]) -> None: map_data_rooms[r.roomId].pos_y = r.roomNamePost.y room_text_pos = Point(r.roomNamePost.x, r.roomNamePost.y) - _LOGGER.debug("room#%d: %s %s", r.roomId, r.roomName, room_text_pos) \ No newline at end of file + _LOGGER.debug("room#%d: %s %s", r.roomId, + r.roomName, room_text_pos) diff --git a/src/vacuum_map_parser_ijai/status_mapping.py b/src/vacuum_map_parser_ijai/status_mapping.py new file mode 100644 index 0000000..ac72ff1 --- /dev/null +++ b/src/vacuum_map_parser_ijai/status_mapping.py @@ -0,0 +1,103 @@ +"""Module that provides mapping for status property""" +from dataclasses import dataclass + + +@dataclass +class IjaiVacuumStatusMapping: + """Dataclass containing mapping for status property""" + # vacuum service id + siid: int = 2 + + # status property id in vacuum service + piid: int = 1 + + # idle_at is status property values from https://home.miot-spec.com/spec/model + # 0,1,2,4,8,10 are common idle states for most ijai/xiaomi miot robot-vacuums + idle_at: tuple[int, ...] = (0, 1, 2, 4, 8, 10) + + +_NON_STANDARD_STATUS_PROP = [ + ( + [ + "xiaomi.vacuum.c107", + "xiaomi.vacuum.d101", + "xiaomi.vacuum.d102gl", + "xiaomi.vacuum.d102ev", + "xiaomi.vacuum.d109gl", + ], + IjaiVacuumStatusMapping(idle_at=(1, 2, 5, 9, 11, 12, 13, 14, 15, 18)) + ), + + ( + [ + "xiaomi.vacuum.c108" + ], + IjaiVacuumStatusMapping(idle_at=(1, 3, 4, 5, 7)) + ), + + ( + [ + "xiaomi.vacuum.b108gl" + ], + IjaiVacuumStatusMapping(idle_at=(1, 2, 5, 8, 10)) + ), + + ( + [ + "xiaomi.vacuum.c102gl", + "xiaomi.vacuum.c102cn", + "xiaomi.vacuum.d103cn", + "xiaomi.vacuum.d110ch", + ], + IjaiVacuumStatusMapping(piid=2, + idle_at=(2, 3, 4, 6, 8, 9, 13, 19, 21, 22, 30)) + ) +] + +_IsEncryptKeyTypeHex_Models = [ + "v1", + "v2", + "v3", + "v13", + "v15", + "v17", + "v18", + "v19", + "sz02", # unknown model + "c101", + "c103", + "c104", + "c101eu", + "b106eu", + "b106tr", +] + +_K3_Models = [ + "v13", + "v14", + "v15", + "v17", + "v18", + "v19", + "v21", + "sz02", # unknown model + "c101", + "c103", + "c104", + "b106eu", + "b106tr", +] + + +def is_K3_model(model: str) -> bool: + return model.split(".")[-1].lower() in _K3_Models + + +def is_EncryptKeyTypeHex_model(model: str) -> bool: + return model.split(".")[-1].lower() in _IsEncryptKeyTypeHex_Models + + +def get_status_mapping(model: str) -> IjaiVacuumStatusMapping: + return next((mapping for models, + mapping in _NON_STANDARD_STATUS_PROP if model in models), + IjaiVacuumStatusMapping())