From 2755daed498d7f2b4ef414a8829a535d401cd268 Mon Sep 17 00:00:00 2001 From: Marcel Kwaschny <9392778+marcelkwaschny@users.noreply.github.com> Date: Tue, 5 Mar 2024 20:22:33 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=89=20Intial=20commit?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ✨ Added library for tmf8805 ✅ Added tests for the library 🤡 Added mocked machine and utime modules for tests 📄 Added MIT license file 👷 Added github ci with ruff, black and pytest --- .github/workflows/pipeline.yml | 40 +++ .gitignore | 9 + .pre-commit-config.yaml | 14 + LICENSE | 21 ++ README.md | 35 +++ dev.requirements.txt | 6 + main.py | 18 ++ tests/__init__.py | 0 tests/conftest.py | 30 ++ tests/machine_mock.py | 133 +++++++++ tests/test_tmf8805.py | 120 ++++++++ tests/utime_mock.py | 23 ++ tmf8805/__init__.py | 0 tmf8805/tmf8805.py | 496 +++++++++++++++++++++++++++++++++ 14 files changed, 945 insertions(+) create mode 100644 .github/workflows/pipeline.yml create mode 100644 .gitignore create mode 100644 .pre-commit-config.yaml create mode 100644 LICENSE create mode 100644 README.md create mode 100644 dev.requirements.txt create mode 100644 main.py create mode 100644 tests/__init__.py create mode 100644 tests/conftest.py create mode 100644 tests/machine_mock.py create mode 100644 tests/test_tmf8805.py create mode 100644 tests/utime_mock.py create mode 100644 tmf8805/__init__.py create mode 100644 tmf8805/tmf8805.py diff --git a/.github/workflows/pipeline.yml b/.github/workflows/pipeline.yml new file mode 100644 index 0000000..97c48a7 --- /dev/null +++ b/.github/workflows/pipeline.yml @@ -0,0 +1,40 @@ +name: Quality Assurance + +on: + push: + branches: + - main + pull_request: + branches: + - main + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - name: Checkout repository + uses: actions/checkout@v2 + + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: 3.11 + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install ruff black pytest pytest-cov + + - name: Run Ruff + run: | + ruff check + + - name: Run Black + run: | + black --check . + + - name: Run Pytest + run: | + pytest --cov=tmf8805 --cov-fail-under=80 tests diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..de11347 --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +.micropico +.vscode/ +.venv/ +venv/ +.ruff_cache/ +__pycache__/ +.pytest_cache/ +.coverage +coverage.xml \ No newline at end of file diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..045e1b5 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,14 @@ +repos: + - repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.1.3 + hooks: + - id: ruff + - repo: https://github.com/psf/black + rev: 22.10.0 + hooks: + - id: black + - repo: https://github.com/pycqa/isort + rev: 5.12.0 + hooks: + - id: isort + args: ["--profile", "black", "--filter-files"] \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..0a56ce0 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2024 Marcel Kwaschny + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..22b8075 --- /dev/null +++ b/README.md @@ -0,0 +1,35 @@ +# RP2040 with TMF8805 using MicroPython + +This code origins from the course [Embedded Systems](https://www.fh-muenster.de/eti/personen/professoren/gloesekoetter/embedded-systems.php) at [FH Münster – University of Applied Sciences](https://www.fh-muenster.de/). There a RP2040 with a TMF8805 programmed with MicroPython was used to measure the water level in a pipe with a swimmer. + +## Development + +To setup your local development environment install [Visual Studio Code](https://code.visualstudio.com/) and the [MicroPico](https://marketplace.visualstudio.com/items?itemName=paulober.pico-w-go) extension. After that connect the Raspberry Pi Pico using a USB cabel and run your code using MicroPico. To enforce a global style please install the [development requirements](./dev.requirements.txt) and after that the pre-commit hook using `pre-commit install`. + +## Setup & upload project to Pico + +- Download the latest firmware from https://micropython.org/download/RPI_PICO/ and flash the RP2040 via UF2 bootloader +- Follow the steps above to setup a local development environment +- Open the command pallete (```CTRL + SHIFT + P```) +- Search for ```MicroPico: Configure project``` and execute it +- Search for ```MicroPico: Upload project to Pico``` and execute it +- Switch to the ```main.py``` and run it! + +## TMF8805 + +To create an instance of the `TMF8805` class you can use the following code snippet: + +```python +tmf8805: TMF8805 = TMF8805( + enable=7, + sda=4, + scl=5, + i2c_frequency=100000, + debug=True +) +tmf8805.initialize() +distance: float = tmf8805.get_measurement() +print(f"Measured distance: {distance}mm") +``` + +or copy the contents from `main.py` diff --git a/dev.requirements.txt b/dev.requirements.txt new file mode 100644 index 0000000..79d0fb6 --- /dev/null +++ b/dev.requirements.txt @@ -0,0 +1,6 @@ +pre-commit +ruff +isort +black +pytest +pytest-cov \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..c67cb34 --- /dev/null +++ b/main.py @@ -0,0 +1,18 @@ +import machine + +from tmf8805.tmf8805 import TMF8805 + +if __name__ == "__main__": + tmf8805: TMF8805 = TMF8805( + enable=7, + sda=4, + scl=5, + i2c_frequency=100000, + debug=True, + wdt=machine.WDT(timeout=8000), + ) + + while True: + tmf8805.initialize() + distance: int = tmf8805.get_measurement() + print(f"Distance: {distance}mm") diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..b02a06a --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,30 @@ +"""This module contains general test configuration""" + +import sys + +import pytest + +from tests import machine_mock, utime_mock + +# Mocks the micropython machine module +sys.modules["machine"] = machine_mock + +# Mocks the utime module +sys.modules["utime"] = utime_mock + +from tmf8805.tmf8805 import TMF8805 # noqa: E402 + + +@pytest.fixture +def tmf8805_instance() -> TMF8805: + """Fixture for a tmf8805 instance + + Returns: + TMF8805: The object of a tmf8805 instance + """ + + tmf8805: TMF8805 = TMF8805( + enable=7, sda=4, scl=5, i2c_frequency=100000, debug=True, wdt=machine_mock.WDT() + ) + + return tmf8805 diff --git a/tests/machine_mock.py b/tests/machine_mock.py new file mode 100644 index 0000000..3734325 --- /dev/null +++ b/tests/machine_mock.py @@ -0,0 +1,133 @@ +"""This module mocks the machine library from micropython""" + +import sys +from enum import Enum +from typing import List + + +class TMF8805MockConfiguration(Enum): + """Holds the different mock configurations""" + + CORRECT_MEASUREMENT = 0 + PERFORM_CALIBRATION = 1 + NOT_CONNECTED = 2 + CPU_READY_TIMEOUT = 3 + APPLICATION_READY_TIMEOUT = 4 + UNKNOWN_STATUS = 5 + DATA_AVAILABLE_TIMEOUT = 6 + + +TMF8805_MOCK_CONFIGURATION: TMF8805MockConfiguration = ( + TMF8805MockConfiguration.CORRECT_MEASUREMENT +) + + +class I2C: + def __init__(self, *args, **kwargs) -> None: + """Mocks the constructor of the I2C class""" + + pass + + def scan(self) -> List: + if TMF8805_MOCK_CONFIGURATION == TMF8805MockConfiguration.NOT_CONNECTED: + return [] + + return [65] + + def readfrom_mem(self, addr, memaddr, nbytes, *, addrsize=8) -> bytes: + if memaddr == 0: # REGISTER_APPID + if ( + TMF8805_MOCK_CONFIGURATION + == TMF8805MockConfiguration.APPLICATION_READY_TIMEOUT + ): + return int(0).to_bytes(2, sys.byteorder) + + return int(192).to_bytes(2, sys.byteorder) + + if memaddr == 14: # REGISTER_ENABLE_REG + return int(1).to_bytes(2, sys.byteorder) + + if memaddr == 29: # REGISTER_STATUS + if TMF8805_MOCK_CONFIGURATION == TMF8805MockConfiguration.UNKNOWN_STATUS: + return int(9999).to_bytes( + 2, sys.byteorder + ) # Status that is not implemented + + return int(39).to_bytes(2, sys.byteorder) # MISSING_FACTORY_CALIBRATION + + if memaddr == 30: # REGISTER_REGISTER_CONTENTS + if ( + TMF8805_MOCK_CONFIGURATION + == TMF8805MockConfiguration.PERFORM_CALIBRATION + ): + return int(10).to_bytes(2, sys.byteorder) + + if ( + TMF8805_MOCK_CONFIGURATION + == TMF8805MockConfiguration.DATA_AVAILABLE_TIMEOUT + ): + return int(0).to_bytes(2, sys.byteorder) + + return int(85).to_bytes(2, sys.byteorder) + + if memaddr == 32: # REGISTER_FACTORY_CALIB_0 + return ("A" * nbytes).encode() + + if memaddr == 33: # REGISTER_RESULT_INFO + return int(63).to_bytes(2, sys.byteorder) # Reliability = 63, Status = 0 + + if memaddr == 34: # REGISTER_DISTANCE_PEAK_0 + return int(226).to_bytes(2, sys.byteorder) + + if memaddr == 35: # REGISTER_DISTANCE_PEAK_1 + return int(4).to_bytes(2, sys.byteorder) + + if memaddr == 224: # REGISTER_ENABLE_REG + if TMF8805_MOCK_CONFIGURATION == TMF8805MockConfiguration.CPU_READY_TIMEOUT: + return int(0).to_bytes(2, sys.byteorder) + + return int(64).to_bytes(2, sys.byteorder) + + if memaddr == 225: # REGISTER_INT_STATUS + return int(1).to_bytes(2, sys.byteorder) + + pass + + def writeto_mem(self, addr, memaddr, buf, *, addrsize=8) -> None: + + pass + + +class Pin: + """Mocks the micropython Pin class""" + + OUT: int = 1 + + def __init__(self, *args, **kwargs) -> None: + """Mocks the constructor of the Pin class""" + + pass + + def high(self) -> None: + """Mocks the high function of the Pin class""" + + pass + + def low(self) -> None: + """Mocks the low function of the Pin class""" + + pass + + +class WDT: + """Mocks the micropython watchdog class""" + + def __init__(self, *args, **kwargs) -> None: + """Mocks the constructor of the watchdog class""" + + pass + + def feed(self, *args, **kwargs) -> None: + """Mocks the feed function of the watchdog""" + + pass diff --git a/tests/test_tmf8805.py b/tests/test_tmf8805.py new file mode 100644 index 0000000..0ea79db --- /dev/null +++ b/tests/test_tmf8805.py @@ -0,0 +1,120 @@ +"""Module to test the implementation for the tmf8805 sensor""" + +from tests import machine_mock +from tmf8805.tmf8805 import TMF8805 + + +def test_initialization_of_tmf8805(tmf8805_instance: TMF8805): + """Test if the initialization of the instance is successful""" + + assert tmf8805_instance is not None + + +def test_connected_check(tmf8805_instance: TMF8805): + """Test if the is_connected method is working as expected""" + + assert tmf8805_instance.is_connected() + + tmf8805_instance.address = "0x42" + assert not tmf8805_instance.is_connected() + + +def test_execution_of_measurement(tmf8805_instance: TMF8805): + """Test if the performing of a measurement is working""" + + assert tmf8805_instance.initialize() + distance: int = tmf8805_instance.get_measurement() + assert distance == 1250 + + +def test_performing_of_calibration(tmf8805_instance: TMF8805): + """Test if the methods for get, set and perform a calibration are working""" + + machine_mock.TMF8805_MOCK_CONFIGURATION = ( + machine_mock.TMF8805MockConfiguration.PERFORM_CALIBRATION + ) + + current_calibration = tmf8805_instance.get_current_calibration() + assert current_calibration.decode() == "AAAAAAAAAAAAAA" + + new_calibration = tmf8805_instance.perform_calibration() + assert new_calibration.decode() == "AAAAAAAAAAAAAA" + tmf8805_instance.set_calibration(new_calibration) + + +def test_get_status(tmf8805_instance: TMF8805): + """Test if the get status method returns a status""" + + error, description = tmf8805_instance.get_status() + assert error == "ErrMissingFactCal" + assert ( + description + == "There is no (or no valid) factory calibration on the device. Using default values instead." + ) + + +def test_not_implemented_status(tmf8805_instance: TMF8805): + """Test return of the get status method when a status is not implemented""" + + machine_mock.TMF8805_MOCK_CONFIGURATION = ( + machine_mock.TMF8805MockConfiguration.UNKNOWN_STATUS + ) + + error, description = tmf8805_instance.get_status() + assert error == "9999" + assert description == "N/A" + + +def test_clearing_of_the_interrupt_flag(tmf8805_instance: TMF8805): + """Test if the clearing of the interrupt flag returns no error""" + + tmf8805_instance.clear_interrupt_flag() + + +def test_power_up_and_power_down(tmf8805_instance: TMF8805): + """Test if the power up/down methods don't raise an error""" + + tmf8805_instance.power_down() + tmf8805_instance.power_up() + + +def test_sensor_not_connected(tmf8805_instance: TMF8805): + """Test return from initialization if sensor is not connected""" + + machine_mock.TMF8805_MOCK_CONFIGURATION = ( + machine_mock.TMF8805MockConfiguration.NOT_CONNECTED + ) + + assert not tmf8805_instance.initialize() + + +def test_cpu_ready_timed_out(tmf8805_instance: TMF8805): + """Test return from initialization if wait for the cpu to be ready timed out""" + + machine_mock.TMF8805_MOCK_CONFIGURATION = ( + machine_mock.TMF8805MockConfiguration.CPU_READY_TIMEOUT + ) + + assert not tmf8805_instance.initialize() + + +def test_load_application_timed_out(tmf8805_instance: TMF8805): + """Test if the timeout of the load application is handled correctly""" + + machine_mock.TMF8805_MOCK_CONFIGURATION = ( + machine_mock.TMF8805MockConfiguration.APPLICATION_READY_TIMEOUT + ) + + assert tmf8805_instance.initialize() + assert not tmf8805_instance.get_measurement() + + +def test_data_available_timeout(tmf8805_instance: TMF8805): + """Test if timeout when waiting for data to be available is handled correctly""" + + machine_mock.TMF8805_MOCK_CONFIGURATION = ( + machine_mock.TMF8805MockConfiguration.DATA_AVAILABLE_TIMEOUT + ) + + assert tmf8805_instance.initialize() + assert not tmf8805_instance.get_measurement() diff --git a/tests/utime_mock.py b/tests/utime_mock.py new file mode 100644 index 0000000..8a8c922 --- /dev/null +++ b/tests/utime_mock.py @@ -0,0 +1,23 @@ +"""Mocked module for the utime library of micropython""" + +import time as python_time + + +def sleep_ms(ms: int) -> None: + """Mocked sleep_ms method + + Args: + ms (int): Sleep time in milliseconds + """ + + pass + + +def time() -> int: + """Returns the current epoch timestamp + + Returns: + int: Seconds since epoch + """ + + return int(python_time.time()) diff --git a/tmf8805/__init__.py b/tmf8805/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tmf8805/tmf8805.py b/tmf8805/tmf8805.py new file mode 100644 index 0000000..34ff2db --- /dev/null +++ b/tmf8805/tmf8805.py @@ -0,0 +1,496 @@ +"""Library for the tmf8805 sensor""" + +import sys + +import utime +from machine import I2C, WDT, Pin + +# Constants definitions +DEFAULT_I2C_ADDR = "0x41" +CPU_READY_TIMEOUT = 200 +APPLICATION_READY_TIMEOUT = 500 +DATA_AVAILABLE_TIMEOUT = 500 +CHIP_ID_NUMBER = "0x07" +APPLICATION = "0xc0" +BOOTLOADER = "0x80" +COMMAND_CALIBRATION = "0x0b" +COMMAND_FACTORY_CALIBRATION = "0x0a" +COMMAND_MEASURE = "0x02" +COMMAND_RESULT = "0x55" +COMMAND_SERIAL = "0x47" +COMMAND_STOP = "0xff" +INTERRUPT_MASK = "0x01" +CONTENT_CALIBRATION = "0x0a" + +# Values below were taken from AN000597, pp 22 +ALGO_STATE = [ + "0xB1", + "0xA9", + "0x02", + "0x00", + "0x00", + "0x00", + "0x00", + "0x00", + "0x00", + "0x00", + "0x00", +] + +# Error constants +ERROR_NONE = "0x00" +ERROR_I2C_COMM_ERROR = "0x01" +ERROR_CPU_RESET_TIMEOUT = "0x02" +ERROR_WRONG_CHIP_ID = "0x03" +ERROR_CPU_LOAD_APPLICATION_ERROR = "0x04" +ERROR_FACTORY_CALIBRATION_ERROR = "0x05" + +# GPIO mode +MODE_INPUT = "0x00" +MODE_LOW_INPUT = "0x01" +MODE_HIGH_INPUT = "0x02" +MODE_VCSEL = "0x03" +MODE_LOW_OUTPUT = "0x04" +MODE_HIGH_OUTPUT = "0x05" + +# COMMAND constants +CMD_DATA_7 = "0x00" +CMD_DATA_6 = "0x01" +CMD_DATA_5 = "0x02" +CMD_DATA_4 = "0x03" +CMD_DATA_3 = "0x04" +CMD_DATA_2 = "0x05" +CMD_DATA_1 = "0x06" +CMD_DATA_0 = "0x07" + +# CPU status +CPU_RESET = "0x07" +CPU_READY = "0x06" + +# Registers definitions +REGISTER_APPID = "0x00" +REGISTER_APPREQID = "0x02" +REGISTER_APPREV_MAJOR = "0x01" +REGISTER_APPREV_MINOR = "0x12" +REGISTER_APPREV_PATCH = "0x13" +REGISTER_CMD_DATA9 = "0x06" +REGISTER_CMD_DATA8 = "0x07" +REGISTER_CMD_DATA7 = "0x08" +REGISTER_CMD_DATA6 = "0x09" +REGISTER_CMD_DATA5 = "0x0A" +REGISTER_CMD_DATA4 = "0x0B" +REGISTER_CMD_DATA3 = "0x0C" +REGISTER_CMD_DATA2 = "0x0D" +REGISTER_CMD_DATA1 = "0x0E" +REGISTER_CMD_DATA0 = "0x0F" +REGISTER_COMMAND = "0x10" +REGISTER_PREVIOUS = "0x11" +REGISTER_STATUS = "0x1D" +REGISTER_REGISTER_CONTENTS = "0x1E" +REGISTER_TID = "0x1F" +REGISTER_RESULT_NUMBER = "0x20" +REGISTER_RESULT_INFO = "0x21" +REGISTER_DISTANCE_PEAK_0 = "0x22" +REGISTER_DISTANCE_PEAK_1 = "0x23" +REGISTER_SYS_CLOCK_0 = "0x24" +REGISTER_SYS_CLOCK_1 = "0x25" +REGISTER_SYS_CLOCK_2 = "0x26" +REGISTER_SYS_CLOCK_3 = "0x27" +REGISTER_STATE_DATA_0 = "0x28" +REGISTER_STATE_DATA_1 = "0x29" +REGISTER_STATE_DATA_2 = "0x2A" +REGISTER_STATE_DATA_3 = "0x2B" +REGISTER_STATE_DATA_4 = "0x2C" +REGISTER_STATE_DATA_5 = "0x2D" +REGISTER_STATE_DATA_6 = "0x2E" +REGISTER_STATE_DATA_7 = "0x2F" +REGISTER_STATE_DATA_8_XTALK_MSB = "0x30" +REGISTER_STATE_DATA_9_XTALK_LSB = "0x31" +REGISTER_STATE_DATA_10_TJ = "0x32" +REGISTER_REFERENCE_HITS_0 = "0x33" +REGISTER_REFERENCE_HITS_1 = "0x34" +REGISTER_REFERENCE_HITS_2 = "0x35" +REGISTER_REFERENCE_HITS_3 = "0x36" +REGISTER_OBJECT_HITS_0 = "0x37" +REGISTER_OBJECT_HITS_1 = "0x38" +REGISTER_OBJECT_HITS_2 = "0x39" +REGISTER_OBJECT_HITS_3 = "0x3A" +REGISTER_FACTORY_CALIB_0 = "0x20" +REGISTER_STATE_DATA_WR_0 = "0x2E" +REGISTER_ENABLE_REG = "0xE0" +REGISTER_INT_STATUS = "0xE1" +REGISTER_INT_ENAB = "0xE2" +REGISTER_ID = "0xE3" +REGISTER_REVID = "0xE4" + +# Calibration data +CALIBRATION_DATA_LENGTH = 14 + +# Status constants +MISSING_FACTORY_CALIBRATION = 39 + + +class TMF8805: + """Implementation for the tmf8805 sensor""" + + enable: int + sda: int + scl: int + address: str + debug: bool + i2c: I2C + + def __init__( + self, + enable: int, + sda: int, + scl: int, + wdt: WDT, + address: str = "0x41", + i2c_id: int = 0, + i2c_frequency: int = 100000, + debug: bool = False, + ): + self.enable = enable + self.sda = sda + self.scl = scl + self.address = address + self.debug = debug + self.wdt = wdt + + self.power_up() + self.i2c = I2C(i2c_id, sda=Pin(sda), scl=Pin(scl), freq=i2c_frequency) + + def hex_to_dec(self, hex_value: str) -> int: + """Helper method to convert a hex string to a decimal value + + Args: + hex_value (str): Hex string that should be converted + + Returns: + int: Converted decimal as an integer + """ + + return int(hex_value, 0) + + def is_connected(self) -> bool: + """Scans the I2C bus and checks if the given address is found + + Returns: + bool: True if the address could be found, False otherwise + """ + + devices = self.i2c.scan() + + return self.hex_to_dec(self.address) in devices + + def read_single_byte(self, register: str) -> int: + """Reads a single bytes from a given register + + Args: + register (str): Register that should be read + + Returns: + int: Read value as a decimal + """ + + value = self.i2c.readfrom_mem( + self.hex_to_dec(self.address), self.hex_to_dec(register), 1 + ) + + return int.from_bytes(value, sys.byteorder) + + def read_bytes(self, register: str, byte_count: int) -> bytes: + """Reads the given count of bytes from a given register + + Args: + register (str): Register that should be read + byte_count (int): Count of bytes + + Returns: + bytes: Read content as bytes + """ + + value: bytes = self.i2c.readfrom_mem( + self.hex_to_dec(self.address), self.hex_to_dec(register), byte_count + ) + + return value + + def write_bytes(self, register: str, value: str): + """Writes the given value into a given register + + Args: + register (str): Register in which should be written + value (str): Value as hex string + """ + + if value.startswith("0x"): + value = value[2:] + + if len(value) == 1: + value = "".join(["0", value]) + + self.i2c.writeto_mem( + self.hex_to_dec(self.address), + self.hex_to_dec(register), + bytes.fromhex(value), + ) + + def set_register_bit(self, register: str): + """Sets a 1 in a given register + + Args: + register (str): Register that should be set to 1 + """ + + self.i2c.writeto_mem( + self.hex_to_dec(self.address), self.hex_to_dec(register), b"1" + ) + + def is_bit_set(self, register: str, position: str) -> bool: + """Checks if a bit is set in a given register and position + + Args: + register (str): Register that should be checked + position (str): Position within the register + + Returns: + bool: True if the bit is set, False otherwise + """ + + value = int(self.read_single_byte(register)) + mask = 1 << self.hex_to_dec(position) + + return bool(value & mask) + + def reset_cpu(self): + """Sets the bit in a register so that the cpu reset is triggered""" + + self.set_register_bit(REGISTER_ENABLE_REG) + + def is_cpu_ready(self): + """Checks if the register that indicates if the cpu is ready for commands is set""" + + counter: int = 0 + ready: bool = False + + self.wdt.feed() + while counter < CPU_READY_TIMEOUT and not ready: + ready = self.is_bit_set(REGISTER_ENABLE_REG, CPU_READY) + self.wdt.feed() + + if not ready: + counter += 1 + utime.sleep_ms(100) + self.wdt.feed() + + return ready + + def get_status(self) -> tuple[str, str]: + """Returns the current status of the sensor + + Returns: + tuple[str, str]: Tuple with error and description + """ + + status = self.read_single_byte(REGISTER_STATUS) + + if status == MISSING_FACTORY_CALIBRATION: + return ( + "ErrMissingFactCal", + "There is no (or no valid) factory calibration on the device. Using default values instead.", + ) + + return f"{status}", "N/A" + + def get_current_calibration(self) -> bytes: + calibration: bytes = self.read_bytes( + REGISTER_FACTORY_CALIB_0, CALIBRATION_DATA_LENGTH + ) + + return calibration + + def perform_calibration(self) -> bytes: + calibration_data: bytes = b"" + + self.wdt.feed() + self.enable_interrupt() + self.write_bytes(REGISTER_COMMAND, COMMAND_FACTORY_CALIBRATION) + + start_of_calibration = utime.time() + + while (utime.time() - start_of_calibration) < 30: + self.wdt.feed() + utime.sleep_ms(50) + value = self.read_single_byte(REGISTER_REGISTER_CONTENTS) + print(f"REGISTER_REGISTER_CONTENTS: {value}") + + if value == self.hex_to_dec(CONTENT_CALIBRATION): + self.wdt.feed() + utime.sleep_ms(10) + calibration_data = self.read_bytes( + REGISTER_FACTORY_CALIB_0, CALIBRATION_DATA_LENGTH + ) + break + + return calibration_data + + def set_calibration(self, calibration_data: bytes): + self.write_bytes(REGISTER_COMMAND, COMMAND_CALIBRATION) + self.i2c.writeto_mem( + self.hex_to_dec(self.address), + self.hex_to_dec(REGISTER_FACTORY_CALIB_0), + calibration_data, + ) + self.write_bytes( + REGISTER_STATE_DATA_WR_0, "".join(entry[2:] for entry in ALGO_STATE) + ) + + def load_measurement_application(self): + """Sets the values in the register that is responsible for the current loaded application + to the application to start a measurement + """ + + self.write_bytes(REGISTER_APPREQID, APPLICATION) + + def is_application_ready(self) -> bool: + """Checks if the current loaded application is ready for execution + + Returns: + bool: True if the application is ready, False otherwise + """ + + counter: int = 0 + ready: bool = False + + while counter < APPLICATION_READY_TIMEOUT and not ready: + ready = self.read_single_byte(REGISTER_APPID) == self.hex_to_dec( + APPLICATION + ) + self.wdt.feed() + + if not ready: + counter += 1 + utime.sleep_ms(100) + self.wdt.feed() + + return ready + + def start_measurement_application(self): + """Sets the register to start the measurement application""" + + self.write_bytes(REGISTER_COMMAND, COMMAND_MEASURE) + + def is_data_available(self) -> bool: + """Checks if the data register contains values that could be read + + Returns: + bool: True if data is available, False otherwise + """ + + counter: int = 0 + data_available: bool = False + + while counter < DATA_AVAILABLE_TIMEOUT and not data_available: + self.wdt.feed() + result = self.read_single_byte(REGISTER_REGISTER_CONTENTS) + data_available = result == int(COMMAND_RESULT, 0) + + if not data_available: + counter += 1 + utime.sleep_ms(10) + self.wdt.feed() + + return data_available + + def clear_interrupt_flag(self): + """Clears the register that is responsible for the interrupt""" + + value = self.read_single_byte(REGISTER_INT_STATUS) + value = value | int(INTERRUPT_MASK, 0) + self.write_bytes(REGISTER_INT_STATUS, str(value)) + + def enable_interrupt(self): + """Enables the interrupt of the sensor""" + + value = self.read_single_byte(REGISTER_INT_STATUS) + value = value | int(INTERRUPT_MASK, 0) + self.write_bytes(REGISTER_INT_ENAB, str(value)) + + def power_down(self): + """Powers down the sensor""" + + enable_pin = Pin(self.enable, mode=Pin.OUT) + enable_pin.low() + + def power_up(self): + """Powers up the sensor""" + + enable_pin = Pin(self.enable, mode=Pin.OUT) + enable_pin.high() + self.wdt.feed() + + def initialize(self) -> bool: + """Initialization of the sensor + + Returns: + bool: True if initialization was successful, False otherwise + """ + + if not self.is_connected(): + print("TMF8805 is not connected. Please check the parameters") + return False + + self.reset_cpu() + + if not self.is_cpu_ready(): + print("Waiting for CPU to be ready timed out") + return False + + return True + + def get_measurement(self) -> int: + """Loads the application to start a measurement. Wait till the application is ready. Executes the + measurement application. Waits till data is available. Returns the read data. + + Returns: + int: Measured distance in mm + """ + + self.load_measurement_application() + + if not self.is_application_ready(): + print("Waiting for measurement application to be ready timed out") + return False + + self.start_measurement_application() + self.enable_interrupt() + + if not self.is_data_available(): + print("[TMF8805.Error] Waiting for data to be available timed out") + return False + + self.wdt.feed() + result_info = bin(self.read_single_byte(REGISTER_RESULT_INFO)) + result_info = result_info[2:] + result_info = "".join(reversed(result_info)) + result_info = (lambda s, n, c: s + c * (n - len(s)))(result_info, 8, "0") + reliabilty = int(result_info[:6], 2) + status = int(result_info[6:8], 2) + self.wdt.feed() + + peak_1 = self.read_single_byte(REGISTER_DISTANCE_PEAK_1) + peak_0 = self.read_single_byte(REGISTER_DISTANCE_PEAK_0) + + distance = peak_1 + distance = distance << 8 + distance += peak_0 + self.wdt.feed() + + if self.debug: + print( + f"[TMF8805] Measurement: {distance}mm (Reliabilty: {reliabilty}; Status: {status}; Peak1: {peak_1}; Peak0: {peak_0})" + ) + + return distance