diff --git a/luxtronik/datatypes.py b/luxtronik/datatypes.py index 5598c67..a56666f 100755 --- a/luxtronik/datatypes.py +++ b/luxtronik/datatypes.py @@ -32,6 +32,7 @@ def __init__(self, names, writeable=False): assert len(self._names) > 0, "At least one name is required" assert all(isinstance(name, str) for name in self._names), "Names must be strings" self.writeable = writeable + self.write_pending = False @classmethod def to_heatpump(cls, value): @@ -69,6 +70,7 @@ def value(self): def value(self, value): """Converts the value into heatpump units and store it.""" self._raw = self.to_heatpump(value) + self.write_pending = True @property def raw(self): @@ -79,6 +81,7 @@ def raw(self): def raw(self, raw): """Store the raw data.""" self._raw = raw + self.write_pending = False def __repr__(self): """Returns a printable representation of the datatype object""" @@ -90,6 +93,7 @@ def __repr__(self): f"writeable: {self.writeable}, " f"value: {self.value}, " f"raw: {self._raw}, " + f"write_pending: {self.write_pending}, " f"class: {self.datatype_class}, " f"unit: {self.datatype_unit}" f")" diff --git a/luxtronik/definitions/inputs.py b/luxtronik/definitions/inputs.py index acbe309..c983955 100644 --- a/luxtronik/definitions/inputs.py +++ b/luxtronik/definitions/inputs.py @@ -350,7 +350,7 @@ { "index": 163, "count": 1, - "names": ["mc3_heat_max"], + "names": ["mc3_max"], "type": Celsius, "writeable": False, "since": "3.90.1", diff --git a/luxtronik/shi/README.md b/luxtronik/shi/README.md new file mode 100644 index 0000000..c34d289 --- /dev/null +++ b/luxtronik/shi/README.md @@ -0,0 +1,443 @@ + +# Smart-Home-Interface + +## Introduction + +© Guzz-T, 2025. The smart home interface is licensed under the terms specified in the repository's license file. Please refer to the LICENSE for details. + +This source code provides an interface for the temporary control of a Luxtronik heat pump controller. All data accessed or modified through this interface is volatile and will be reset upon system restart — unlike configuration values, which are persistent. The interface is designed to interact with the controller in a lightweight and flexible way, making it suitable for short-term adjustments or testing scenarios. + +Not every function can be explained in detail or even listed in the readme file. Please refer to the function documentation for further information. The examples are intended to illustrate the typical use of the interface. + +## Table of Content + +1. [Common Usage](#common-usage) + 1. [Available Data](#available-data) + 2. [Creation](#creation) + 3. [Read](#read) + 4. [Write](#write) + 5. [Collect](#collect) +2. [Using aliases](#using-aliases) +3. [Alternative use cases](#alternative-use-cases) + 1. [Latest or specific version](#latest-or-specific-version) + 2. [Trial-and-error mode](#trial-and-error-mode) +4. [Customization](#customization) +5. [Implementation Details](#implementation-details) + +## Common Usage + +The functionality available depends on the firmware version of the luxtronik controller. Since different versions support different features, it is important to know which version is in use. In standard usage, the interface automatically detects the firmware version, streamlining setup and ensure compatibility. + +Because of this version-dependent behavior, data objects should always be created through the provided interface methods. While it is technically possible to instantiate these objects directly via their constructors, doing so is error-prone and cumbersome. The interface handles version-specific quirks and ensures that the objects are initialized correctly, making it the safer and more convenient approach. + +Alternatively, users may choose to manually specify a firmware version or operate the interface in a trial-and-error mode. This can be useful for exploring undocumented features or working with custom setups. For more details, refer to the section titled **"Alternative use cases"**. + +### Available Data + +The interface provides access to two distinct types of data: *holdings* and *inputs*. Holdings are writable values that allow configuration settings to be modified or overridden, and in some cases, enable additional features. These values can also be read back, enabling verification of changes or inspection of current settings. If a holding has not yet been explicitly written, it will contain a default value as defined by the controller. + +Inputs, in contrast, represent read-only values that reflect the current operational state of the system. They provide insight into live metrics such as temperatures, operating modes, and system status, but cannot be altered through the interface. + +In both *holdings* and *inputs*, a single *holding* or *input* are referred to as data fields. Each field represents a logical unit of information, and in some cases, multiple raw values are grouped together to form a single, higher-level field — for example, when a value spans multiple registers or requires interpretation. Alongside the interface itself, a comprehensive list of all data fields ever discovered is maintained. This catalog serves as a reference for developers and users alike, offering insight into the full range of known data and their structure. + +It’s worth noting that while certain functions may be supported by the firmware, they can be disabled by the active configuration. In such cases, the interface will not return any data for the affected fields. Instead, it will yield `None`, indicating that the feature is currently inactive or unavailable. + +### Creation + +To begin using the interface, it must first be instantiated. During this step, the communication method is selected — currently, only Modbus-TCP is supported. Once initialized, the interface automatically reads the firmware version from the Luxtronik controller and configures itself accordingly. This ensures that all subsequent operations are aligned with the capabilities and structure of the detected version, providing a reliable foundation for interacting with the system. + +```python +from luxtronik.shi import create_modbus_tcp + +# Use the default values for all arguments except the IP address +shi = create_modbus_tcp('your.lux.ip.addr') +``` + +### Read + +Data fields can be accessed individually or grouped together as data vectors for more efficient reading. This allows users to retrieve multiple related values in a single operation, improving performance and simplifying code. In addition to individual vectors, the interface also provides a data vector collection — a container that holds an instance of all available data vectors. This collection can be passed around and reused, making it easier to manage and access structured data sets. + +The examples provided in this chapter focus on reading *inputs*, which represent live system values. However, the same principles apply to *holdings* as well, allowing for consistent usage patterns across both read-only and writable data types. + +**Read all fields together:** + +Recommendation if only one data vector is to be used: + +```python +# First create the data vector (once) that contains all data fields +inputs = shi.create_inputs() +# ... and afterwards read the data into those fields +shi.read_inputs(inputs) +print(inputs) +``` + +Recommendation if all data vectors are to be used: + +```python +# First create the data vector collection (once) that contains +# all supported data vectors which contains all data fields +data = shi.create_data() +# ... and afterwards read the data into those fields +shi.read(data) +print(data.inputs) +``` + +Create a new data vector for each read operation: + +```python +# Create the data vector and read the fields +inputs = shi.read_inputs() +print(inputs) +``` + +Create a new data vector collection for each read operation: + +```python +# Create the data vector collection and read the fields +data = shi.read() +# or ... +data = shi.read_data() +print(data.inputs) +``` + +**Read a single field:** + +For single queries, the (newly created) field is returned. + +```python +# Read field by name +op_mode = shi.read_input('operation_mode') +# ... or register index (index 2 is 'operation_mode') +op_mode = shi.read_input(2) +# ... or first create the field (once) +op_mode = shi.create_input('operation_mode') +shi.read_input(op_mode) +# ... or use the definition of the field +op_mode_def = shi.get_input('operation_mode') +op_mode = shi.read_input(op_mode_def) +print(op_mode) +``` + +**Read a subset:** + +It is also possible to read only a selected subset of data fields together. To do this, a new, empty data vector must first be created. The desired fields can then be added to this vector individually. Once all relevant fields have been included, the vector can be read in a single operation, allowing for efficient and targeted data access tailored to specific use cases. + +```python +# Create (once) an empty data vector +inputs = shi.create_empty_inputs() +# ..., add the desired data fields +op_mode = inputs.add('operation_mode') +# ... +# Afterwards read the data into those fields +shi.read_inputs(inputs) +print(op_mode) +# (index 2 is 'operation_mode') +print(inputs.get(2)) +print(inputs['operation_mode']) +``` + +### Write + +Just like reading, writing to data fields can be done individually or in grouped form using data vectors. This allows multiple configuration values to be updated in a single operation. Only fields for which the user has updated the value are written. + +**Write all (updated by the user) fields together:** + +Recommendation if only one data vector is to be used: + +```python +# First create the data vector (once) that contains all data fields +holdings = shi.create_holdings() +# ..., set the user data +holdings['heating_mode'] = 'Setpoint' +# ... and then write the data from these fields +success = shi.write_holdings(holdings) +``` + +Recommendation if all data vectors are to be used: + +```python +# First create the data vector collection (once) that contains +# all supported data vectors which contains all data fields +data = shi.create_data() +# ..., set the user data +holdings.set('heating_mode', 'Setpoint') +# ... and then write the data from these fields +success = shi.write(data) +# or ... +success = shi.write_data(data) +``` + +On writing, an empty data vector can also be created and filled, but this generates the same data transfer as above. + +```python +# Create (once) an empty data vector +holdings = shi.create_empty_holdings() +# ..., add the desired data fields (index 0 is 'heating_mode') +heating_mode = holdings.add(0) +# ..., set the user data +heating_mode.value = 'Setpoint' +# ... and then write the data from these fields +success = shi.write_holdings(holdings) +``` + +### Collect + +In addition to standard read and write operations, the interface also offers *collect* variants of these functions. A collect operation behaves like a read or write, but without triggering any communication with the controller. Instead, it prepares the data structures internally by collecting all valid fields. + +This is particularly useful for inspecting or staging data before transmission, validating field selections, or building reusable data blocks in advance. All read and write capabilities are mirrored in their collect counterparts, ensuring consistent behavior and structure across both modes of operation. + +Each valid collect operation schedules at least one transmission to the controller. While the collect itself does not initiate communication, it prepares the necessary data for a future read or write. When the actual transmission is triggered, all scheduled operations are executed together within the same connection session. This bundling ensures efficient communication and minimizes overhead, especially when working with multiple data vectors or fields. + +```python +# Collect a write to activate the hot water lock +shi.collect_holding_for_write('lock_hot_water', True) +# Collect all data fields within "holdings_for_heating" +# to write first and then read back afterwards +shi.collect_holdings(holdings_for_heating) +# Collect all data fields within "inputs_for_heating" to read them +shi.collect_inputs(inputs_for_heating) +# Trigger communication +success = shi.send() +``` + +Attention: + +It is technically possible to collect the same instance of a data field multiple times. However, this practice is discouraged, as it can lead to ambiguous behavior. During a write operation, the value is not extracted from the field until the actual transmission occurs. Similarly, during a read, the results are written into the field’s single memory location multiple times. If the same field is collected multiple times, the last value assigned before transmission will take precedence and overwrite any previous ones. To ensure clarity and avoid unintended side effects, each field should be collected only once per transmission, or using multiple instances of the same data field. This ensures that each collected field maintains its own memory space and avoids conflicts during read or write operations. By using distinct field objects, values can be managed independently, and the outcome of each transmission remains predictable and clearly scoped. + +## Using aliases + +Instead of the predefined names, any (hashable) values can also be used. +However, these must be registered beforehand. This also makes it possible +to "overwrite" existing names or register indices. + +There are two ways to register: + +**global:** + +The aliases are registered in the `LuxtronikDefinitionsList`. They are then available in every newly created data vector. + +```python +from luxtronik.shi import create_modbus_tcp + +shi = create_modbus_tcp('your.lux.ip.addr') +shi.inputs.register_alias(input_definition_to_alias, any_hashable_alias) + +data = shi.read() +print(data.inputs[any_hashable_alias].value) +``` + +**local:** + +The aliases can also only be registered in a specific data vector. + +```python +from luxtronik.shi import create_modbus_tcp + +shi = create_modbus_tcp('your.lux.ip.addr') + +data = shi.read() +data.holdings.register_alias(holding_definition_to_alias, any_hashable_alias) + +print(data.holdings[any_hashable_alias].value) +``` + +## Alternative use cases + +### Latest or specific version + +It is possible to create the data vector/data object yourself, +but there is no guarantee that the fields they contains +will match the current firmware version of the controller. + +Use a specific versions: + +```python +from luxtronik.shi import create_modbus_tcp +from luxtronik.shi.holdings import Holdings + +shi = create_modbus_tcp('your.lux.ip.addr', version="3.92.0") +holdings = Holdings("3.92.0") +shi.read_holdings(holdings) + +# inputs is created with version "3.92.0" +inputs = shi.read_inputs() +``` + +The special tag "latest" is used to generate the interface +using the latest supported version: + +```python +from shi import create_modbus_tcp + +shi = create_modbus_tcp('your.lux.ip.addr', version="latest") +``` + +### Trial-and-error mode + +If you pass `None’ as the version, you set the interface to trial-and-error mode. +This means that no attempt is made to bundle read or write accesses, +but that all available fields/definitions are read or written individually (possibly twice). +Errors will occur, but as many operations as possible will be attempted. + +```python +from luxtronik.shi import create_modbus_tcp +from luxtronik.shi.interface import LuxtronikSmartHomeData + +shi = create_modbus_tcp('your.lux.ip.addr', version=None) + +data = LuxtronikSmartHomeData(version=None) +shi.read(data) + +holdings = shi.create_holdings() +holdings[1].value = 22.0 +holdings[2].value = 5.0 +success = shi.write_holdings(holdings) +``` + +## Customization + +**Safe / non-safe:** + +Only data fields with a known and verified function are marked as writable by default. This precaution helps prevent unintended changes and ensures that write operations target safe and well-understood parameters. When performing a write or preparing a data vector for writing, a `safe` flag can be used to control how strictly this classification is enforced. If the flag is set, the interface will only write to fields that are both marked as writable and considered safe. Fields that are not writable will be skipped. If the flag is not set, the interface will attempt to write to all fields that carries user data in the vector, regardless of their classification — this includes fields that are not marked as writable and those whose safety is uncertain. This flexibility is particularly useful for testing or exploring undocumented functionality. + +```python +shi.write_holding('heating_mode', 'Setpoint', safe=False) +``` + +**Custom definitions:** + +If you discover a new field or want to experiment with one that isn’t yet part of the standard definitions, you can add it manually to your local configuration. If you're confident in its behavior and purpose, please consider reporting it so it can be reviewed and potentially included in future versions of the interface. Since this is not the norm, there is currently no convenient function that allows for easier definition. + +```python +shi.inputs.add({ + "index": 5, + "count": 2, + "names": "foo" +}) +``` + +## Implementation Details + +### Definition vs. field + +- **Definition**: + A definition describes a data field. This includes, among other things, + the register index where to find the related raw data, + the number of required registers, the usable names + and meta-information about the appropriate controller version. + +- **Field**: + A field contains the data that has been read or is to be written + and makes the raw data available in a user-friendly format. + +### Register vs. fields vs. data-blocks + +- **Register**: + A single 16‑bit word addressable by an index. + Registers are the atomic unit of transfer. + +- **Field**: + Logically related registers. A field can comprise + one register or several consecutive registers. + +- **Data Block**: + A contiguous address range containing one or more fields. + Data blocks are used to perform bulk read or write operations in a + single sequential transfer to minimize communication overhead. + +```json +Index + +------------+ +------------+ +------------+ +0x00 | Register 0 | | Field 0 | | Data block | + +------------+ +------------+ + + +0x01 | Register 1 | | Field 1 | | | + +------------+ + + + + +0x02 | Register 2 | | | | | + +------------+ +------------+ +------------+ +0x03 Register 3 do not exist + +0x04 Register 4 do not exist + +------------+ +------------+ +------------+ +0x05 | Register 5 | | Field 5 | | Data block | + +------------+ + + + + +0x06 | Register 6 | | | | | + +------------+ +------------+ +------------+ +0x07 Register 7 do not exist + +0x08 Register 8 do not exist + +------------+ +------------+ +------------+ +0x09 | Register 9 | | Field 9 | | Data block | + +------------+ +------------+ +------------+ +... +``` + +### Available definition vs. version-dependent definition + +- **Available definitions**: + All definitions contained in the `LuxtronikDefinitionsList` are designated + with the term "available definitions". This includes all definitions ever used. + +- **Version-dependent definitions**: + The definitions themselves may contain version information specifying + in which version the described field is included. This is used to + determine a subset that matches a specific firmware version of the controller. + Fields without version information are always included. + Note: If the desired version is "None", + all "available" are considered as "version-dependent". + +```json +Available: +- {index: 0, since: 1.0, until: 2.9} +- {index: 1, since: 2.2 } +- {index: 4, until: 1.5} +- {index: 5, since: 2.4, until: 3.0} +- {index: 6, since: 1.3 } +- {index: 8, } +- {index: 9, until: 2.0} + +Version-dependent on v0.3: +- {index: 4, until: 1.5} +- {index: 8, } +- {index: 9, until: 2.0} + +Version-dependent on v1.1: +- {index: 0, since: 1.0, until: 2.9} +- {index: 4, until: 1.5} +- {index: 8, } +- {index: 9, until: 2.0} + +Version-dependent on v2.6: +- {index: 0, since: 1.0, until: 2.9} +- {index: 1, since: 2.2 } +- {index: 5, since: 2.4, until: 3.0} +- {index: 6, since: 1.3 } +- {index: 8, } + +Version-dependent on v3.2: +- {index: 1, since: 2.2 } +- {index: 6, since: 1.3 } +- {index: 8, } + +Version-dependent on None: +- {index: 0, since: 1.0, until: 2.9} +- {index: 1, since: 2.2 } +- {index: 4, until: 1.5} +- {index: 5, since: 2.4, until: 3.0} +- {index: 6, since: 1.3 } +- {index: 8, } +- {index: 9, until: 2.0} +``` + +### Data-blocks vs. telegrams + +- **Data-blocks**: + A data block bundles all fields that can be read or written together. + However, when writing, only fields for which the user has set data are bundled. + The resulting address space can be read with a single telegram, + or the contiguous raw data can be written with a single telegram. + +- **Telegrams**: + A telegram defines a read or write operation to be performed. + Several telegrams can be handled in one transmission. + + \ No newline at end of file diff --git a/luxtronik/shi/__init__.py b/luxtronik/shi/__init__.py new file mode 100644 index 0000000..d37ae08 --- /dev/null +++ b/luxtronik/shi/__init__.py @@ -0,0 +1,142 @@ +""" +Python module for controlling a Luxtronik heat pump controller +via the smart home interface. Powered by Guzz-T. +""" + +from luxtronik.datatypes import FullVersion, MajorMinorVersion +from luxtronik.shi.constants import ( + LUXTRONIK_DEFAULT_MODBUS_PORT, + LUXTRONIK_DEFAULT_MODBUS_TIMEOUT, + LUXTRONIK_LATEST_SHI_VERSION, +) +from luxtronik.shi.common import LOGGER, parse_version +from luxtronik.shi.inputs import INPUTS_DEFINITIONS +from luxtronik.shi.modbus import LuxtronikModbusTcpInterface +from luxtronik.shi.interface import LuxtronikSmartHomeInterface + +VERSION_DETECT = "detect" +VERSION_LATEST = "latest" + + +############################################################################### +# Helper methods +############################################################################### + +def get_version_definitions(definitions): + """ + Retrieve all definitions that represent version fields. + + Args: + definitions (LuxtronikDefinitionsList): List of definitions + + Returns: + list[LuxtronikDefinition]: List of definitions whose data_type + is either FullVersion or MajorMinorVersion. + """ + version_definitions = [] + for d in definitions: + if d.data_type in (FullVersion, MajorMinorVersion): + version_definitions.append(d) + return version_definitions + +def determine_version(interface): + """ + Determine the version of the luxtronik controller. + + This is a little bit ugly! The controller version is required + to locate the version field. As workaround, probe each known + version field until one yields a valid read and a parsable version. + This approach works as long as the version-field has not changed. + + Args: + interface (LuxtronikModbusTcpInterface): + Simple read/write interface to read out the version. + + Returns: + tuple[int] | None: The version of the controller on success, + or None if no version could be determined. + """ + definitions = get_version_definitions(INPUTS_DEFINITIONS) + for definition in definitions: + data = interface.read_inputs(definition.addr, definition.count) + if data is not None: + field = definition.create_field() + field.raw = data + parsed = parse_version(field.value) + if parsed is not None: + return parsed + LOGGER.warning("It was not possible to determine the controller version. " \ + + "Switch to trial-and-error mode.") + return None + +def resolve_version(interface, version=VERSION_DETECT): + """ + Resolve the version input. + + Args: + interface (LuxtronikModbusTcpInterface): + Simple read/write interface to read out the version. + version (tuple[int] | str | None): Version used to initialize the interface. + If VERSION_DETECT is passed, the function will attempt to determine the version. + If a str is passed, the string will be parsed into a version tuple. + If None is passed, trial-and-error mode is activated. + (default: VERSION_DETECT) + + Returns: + tuple[int] | None: The version of the controller on success, + or None if no version could be determined. + """ + resolved_version = version + if resolved_version == VERSION_DETECT: + # return None in case of an error -> trial-and-error mode + resolved_version = determine_version(interface) + elif isinstance(resolved_version, str): + if resolved_version.lower() == VERSION_LATEST: + resolved_version = LUXTRONIK_LATEST_SHI_VERSION + else: + # return None in case of an error -> trial-and-error mode + resolved_version = parse_version(resolved_version) + else: + resolved_version = parse_version(resolved_version) + return resolved_version + + +############################################################################### +# Factory methods +############################################################################### + +def create_modbus_tcp( + host, + port=LUXTRONIK_DEFAULT_MODBUS_PORT, + timeout=LUXTRONIK_DEFAULT_MODBUS_TIMEOUT, + version=VERSION_DETECT +): + """ + Create a LuxtronikSmartHomeInterface using a Modbus TCP connection. + + The function constructs a Modbus TCP low-level interface and resolves the + controller version according to the supplied `version` argument: + - If `version` equals VERSION_DETECT, attempt to determine the version. + - If `version` equals VERSION_LATEST, use LUXTRONIK_LATEST_SHI_VERSION as version. + - If `version` is a string, parse it into a version tuple. + - If `version` is None, the interface is initialized in trial-and-error mode. + - Otherwise assume `version` is already a parsed version tuple. + + Args: + host (str): Hostname or IP address of the Luxtronik controller. + port (int): TCP port for the Modbus connection. + timeout (float): Timeout in seconds for the Modbus connection. + version (tuple[int] | str | None): Version used to initialize the interface. + If VERSION_DETECT is passed, the function will attempt to determine the version. + If a str is passed, the string will be parsed into a version tuple. + If None is passed, trial-and-error mode is activated. + + Returns: + LuxtronikSmartHomeInterface: + Initialized interface instance bound to the Modbus TCP connection. + """ + modbus_interface = LuxtronikModbusTcpInterface(host, port, timeout) + resolved_version = resolve_version(modbus_interface, version) + LOGGER.info(f"Create smart-home-interface via modbus-TCP on {host}:{port}" + + f" for version {resolved_version}") + return LuxtronikSmartHomeInterface(modbus_interface, resolved_version) \ No newline at end of file diff --git a/luxtronik/shi/contiguous.py b/luxtronik/shi/contiguous.py new file mode 100644 index 0000000..be641a8 --- /dev/null +++ b/luxtronik/shi/contiguous.py @@ -0,0 +1,354 @@ +""" +Methods and classes to group contiguous fields of same type into single blocks to minimize +the number of read/write operations. They are necessary, because an invalid address +or a non-existent register within a read/write operation will result in a transmission error. +""" + +from luxtronik.shi.common import LOGGER +from luxtronik.shi.definitions import get_data_arr, integrate_data + +############################################################################### +# ContiguousDataPart +############################################################################### + +class ContiguousDataPart: + """ + Represents a single element of a contiguous data block. + Each part references a `field` and its associated `definition`. + """ + + def __init__(self, definition, field): + """ + Initialize a contiguous data part. + + Args: + field (Base): The field object to read or write. + definition (LuxtronikDefinition): The definition for this field. + """ + self.field = field + self.definition = definition + + def __repr__(self): + return f"({self.index}, {self.count})" + + @property + def index(self): + return self.definition.index + + @property + def addr(self): + return self.definition.addr + + @property + def count(self): + return self.definition.count + + def get_data_arr(self): + """ + Normalize the field's data to a list of the correct size. + + Returns: + list[int] | None: List of length `definition.count`, or None if insufficient. + """ + return get_data_arr(self.definition, self.field) + + def integrate_data(self, raw_data, data_offset=-1): + """ + Integrate the related parts of the `raw_data` into the field + + Args: + raw_data (list): Source array of bytes/words. + data_offset (int): Optional offset. Defaults to `definition.index`. + """ + integrate_data(self.definition, self.field, raw_data, data_offset) + + +############################################################################### +# ContiguousDataBlock +############################################################################### + +class ContiguousDataBlock: + """ + Represents a contiguous block of fields for efficient read/write access. + Contiguous fields of same type are grouped into a single block to minimize + the number of read/write operations. + + Note: + - An invalid address or a non-existent register within a block + will result in a transmission error. + - Parts must be added in non-decreasing index order. + """ + + def __init__(self): + self._parts = [] + self._last_idx = -1 + + @classmethod + def create_and_add(cls, definition, field): + """ + Create a new block and add a single part. + + Args: + definition (LuxtronikDefinition): Definition to add. + field (Base): Associated field object. + + Returns: + ContiguousDataBlock: New block with the part added. + """ + obj = cls() + obj.add(definition, field) + return obj + + def clear(self): + """Remove all parts from the block.""" + self._parts = [] + self._last_idx = -1 + + def __iter__(self): + return iter(self._parts) + + def __getitem__(self, index): + return self._parts[index] + + def __len__(self): + return len(self._parts) + + def __repr__(self): + parts_str = ", ".join(repr(part) for part in self._parts) + return f"(index={self.first_index}, count={self.overall_count}, " \ + + f"parts=[{parts_str}])" + + def can_add(self, definition): + """ + Check whether a part with the given definition + can be appended without creating gaps. + We assume that the (valid) parts are added in order. + Therefore, some special cases can be disregarded. + + Args: + definition (LuxtronikDefinition): Definition to add. + + Returns: + bool: True if the part can be added to this block, otherwise False. + """ + if self._last_idx == -1: + return True + start_idx = definition.index + return start_idx >= self.first_index and start_idx <= self._last_idx + 1 + + def add(self, definition, field): + """ + Add a subsequent part to this contiguous data block. + We assume that the (valid) parts are added in order. + Therefore, some special cases can be disregarded. + Call `can_add` before `add` to ensure validity. + + Args: + definition (LuxtronikDefinition): Definition to add. + field (Base): Associated field object. + """ + self._parts.append(ContiguousDataPart(definition, field)) + self._last_idx = max(self._last_idx, definition.index + definition.count - 1) + + @property + def first_index(self): + """ + Return the first index of the block, or 0 if empty. + This should be sufficient since the (valid) parts are added in index-sorted order. + + Returns: + int: index of the first part or 0 if empty. + """ + return self._parts[0].index if self._parts else 0 + + @property + def first_addr(self): + """ + Return the first addr of the block, or 0 if empty. + This should be sufficient since the (valid) parts are added in index-sorted order. + + Returns: + int: addr of the first part or 0 if empty. + """ + return self._parts[0].addr if self._parts else 0 + + @property + def overall_count(self): + """ + Total contiguous register count covered by this block. + + Returns: + int: number of registers or 0 if block is empty. + """ + return self._last_idx - self.first_index + 1 if self._parts else 0 + + def integrate_data(self, data_arr): + """ + Integrate an array of registers (e.g. the read data) + into the raw values of the corresponding fields. + + Args: + data_arr (list[int] | None): A list of register values. + + Returns: + bool: True if data length matches `overall_count` + and integration succeeded, False otherwise. + """ + valid = data_arr is not None and isinstance(data_arr, list) + data_len = len(data_arr) if valid else 0 + valid &= data_len == self.overall_count + + if not valid: + LOGGER.debug( + f"Data to integrate not valid! Expected length {self.overall_count} " \ + + f"but got {data_len}: data = {data_arr}, block = {self}" + ) + return False + + first = self.first_index + for part in self._parts: + data_offset = part.index - first + part.integrate_data(data_arr, data_offset) + + return True + + def get_data_arr(self): + """ + Build a data array to write from parts' fields. + + Returns: + list[int] | None: List of register values when valid, otherwise None. + Returns None if overlapping writes occur or if some elements are missing. + """ + if not self._parts: + return None + + total = self.overall_count + data_arr = [None] * total + first = self.first_index + valid = True + for part in self._parts: + data_offset = part.index - first + data = part.get_data_arr() + + if data is None: + valid = False + LOGGER.error(f"No data provided for part {part}") + continue + + end = data_offset + part.count + if end > total: + valid = False + LOGGER.error(f"Part {part} would overflow block (end={end}, total={total})") + continue + + # Integrate data only if not already done (first data wins) + for i, value in enumerate(data): + slot = data_offset + i + if data_arr[slot] is None: + data_arr[slot] = value + else: + valid = False + LOGGER.debug( + f"Overlapping write detected for slot {slot}: " \ + + f"existing={data_arr[slot]}, new={value}, part={part}" + ) + + if not valid: + return None + + if any(value is None for value in data_arr): + LOGGER.error(f"Missing data after assembly: {data_arr}, block = {self}") + return None + + return data_arr + + +############################################################################### +# ContiguousDataBlockList +############################################################################### + +class ContiguousDataBlockList: + """ + Maintain ordered contiguous data blocks for a single register type. + + Notes: + - Parts (definitions) are expected to be presented in non-decreasing index order. + - This container groups parts into contiguous blocks to minimize read/write operations. + """ + + def __init__(self, type_name, read_not_write): + """ + Initialize a new container. + + Args: + type_name (str): descriptive name for this block list (e.g., "holding", "input"). + read_not_write (bool): True when these blocks are for reads, False for writes. + """ + self._blocks = [] + self._type_name = type_name + self._read_not_write = read_not_write + self._can_add = True + + def clear(self): + """Remove all blocks.""" + self._blocks = [] + + def __iter__(self): + return iter(self._blocks) + + def __getitem__(self, index): + return self._blocks[index] + + def __len__(self): + return len(self._blocks) + + def __repr__(self): + blocks_str = ", ".join(repr(block) for block in self._blocks) + return f"(type_name={self._type_name}, read_not_write={self._read_not_write}, " \ + + f"blocks=[{blocks_str}])" + + @property + def type_name(self): + return self._type_name + + @property + def read_not_write(self): + return self._read_not_write + + def collect(self, definition, field): + """ + Add a part into the appropriate contiguous block. + Assumes parts arrive in sorted order by index. (see LuxtronikDefinitionsList). + + Args: + definition (LuxtronikDefinition): Definition to add. + field (Base): Associated field object. + """ + # Start a new block if none exists or the last block cannot accept this definition + if not self._blocks or not self._can_add or not self._blocks[-1].can_add(definition): + self._blocks.append(ContiguousDataBlock()) + self._can_add = True + + # Append the (new) part to the last block + self._blocks[-1].add(definition, field) + + def append(self, block): + """ + Append an existing ContiguousDataBlock to the list. + + Args: + block (ContiguousDataBlock): Block to append. + """ + self._blocks.append(block) + + def append_single(self, definition, field): + """ + Create a new block with a single part and append it. + + Args: + definition (LuxtronikDefinition): Definition to add. + field (Base): Associated field object. + """ + self._blocks.append(ContiguousDataBlock.create_and_add(definition, field)) + self._can_add = False \ No newline at end of file diff --git a/luxtronik/shi/definitions.py b/luxtronik/shi/definitions.py index 1d79572..5164455 100644 --- a/luxtronik/shi/definitions.py +++ b/luxtronik/shi/definitions.py @@ -14,7 +14,8 @@ ) from luxtronik.shi.common import ( LOGGER, - parse_version + parse_version, + version_in_range ) @@ -284,7 +285,7 @@ def get(self, name_or_idx, default=None): """ d = self._get(name_or_idx) if d is None: - LOGGER.warning(f"Definition for '{name_or_idx}' not found", ) + LOGGER.debug(f"Definition for '{name_or_idx}' not found", ) return d if d is not None else default def _get(self, name_or_idx): @@ -384,6 +385,15 @@ class LuxtronikDefinitionsList: (locally = only valid for that dictionary). """ + def _init_instance(self, name, offset, version): + """Re-usable method to initialize all instance variables.""" + self._name = name + self._offset = offset + self._version = version + # sorted list of all definitions + self._definitions = [] + self._lookup = LuxtronikDefinitionsDictionary() + def __init__(self, definitions_list, name, offset=LUXTRONIK_DEFAULT_DEFINITION_OFFSET): """ Initialize the (by index sorted) definitions list. @@ -402,11 +412,7 @@ def __init__(self, definitions_list, name, offset=LUXTRONIK_DEFAULT_DEFINITION_O - The value of count must always be greater than or equal to 1 - All names should be unique """ - self._name = name - self._offset = offset - # sorted list of all definitions - self._definitions = [] - self._lookup = LuxtronikDefinitionsDictionary() + self._init_instance(name, offset, None) # Add definition objects only for valid items. # The correct sorting has already been ensured by the pytest @@ -415,6 +421,27 @@ def __init__(self, definitions_list, name, offset=LUXTRONIK_DEFAULT_DEFINITION_O if d.valid: self._add(d) + @classmethod + def filtered(cls, definitions, version): + """ + Filter an existing definitions list by the given version + and return the new (by index sorted) definitions list. + + Args: + definitions (LuxtronikDefinitionsList): List of definitions to filter. + version (tuple[int] | None): + Only definitions that match this version are added to the list. + If None is passed, all available fields are added. + """ + obj = cls.__new__(cls) # this don't call __init__() + obj._init_instance(definitions.name, definitions.offset, version) + + for d in definitions: + if d.valid and version_in_range(obj._version, d.since, d.until): + obj._add(d) + + return obj + def __getitem__(self, name_or_idx): return self.get(name_or_idx) @@ -531,12 +558,12 @@ def get_data_arr(definition, field): field (Base): Field object that contains data to get. Returns: - list[int] | None: List of length `definition.count`, or None if insufficient. + list[int] | None: List of length `definition.count`, + or None if the data size does not match. """ data = field.raw if not isinstance(data, list): data = [data] - data = data[:definition.count] return data if len(data) == definition.count else None def check_data(definition, field): diff --git a/luxtronik/shi/interface.py b/luxtronik/shi/interface.py new file mode 100644 index 0000000..c68eae0 --- /dev/null +++ b/luxtronik/shi/interface.py @@ -0,0 +1,1124 @@ +"""Main components of the Luxtronik smart home interface.""" + +from luxtronik.datatypes import Base +from luxtronik.shi.constants import LUXTRONIK_LATEST_SHI_VERSION +from luxtronik.shi.common import ( + LOGGER, + version_in_range, + LuxtronikSmartHomeReadHoldingsTelegram, + LuxtronikSmartHomeReadInputsTelegram, + LuxtronikSmartHomeWriteHoldingsTelegram, +) +from luxtronik.shi.definitions import ( + LuxtronikDefinition, + LuxtronikDefinitionsList, + check_data +) +from luxtronik.shi.vector import DataVectorSmartHome +from luxtronik.shi.holdings import Holdings, HOLDINGS_DEFINITIONS +from luxtronik.shi.inputs import Inputs, INPUTS_DEFINITIONS +from luxtronik.shi.contiguous import ContiguousDataBlockList + +READ = True +WRITE = False +SAFE = True + +############################################################################### +# Smart home interface data +############################################################################### + +class LuxtronikSmartHomeData: + """ + Data-vector collection for all smart home interface data vectors. + + Holds both the `holdings` and `inputs` data structures that represent + the smart-home data exposed by the Luxtronik controller. + """ + + def __init__( + self, + holdings=None, + inputs=None, + version=LUXTRONIK_LATEST_SHI_VERSION, + safe=SAFE + ): + """ + Initialize a LuxtronikSmartHomeData instance. + + Args: + holdings (Holdings): Optional holdings data vector. If not provided, + a new `Holdings` instance is created. + inputs (Inputs): Optional inputs data vector. If not provided, + a new `Inputs` instance is created. + version (tuple[int] | None): Version to be used for creating the data vectors. + This ensures that the data vectors only contain valid fields. + If None is passed, all available fields are added. + (default: LUXTRONIK_LATEST_SHI_VERSION) + safe (bool): If true, prevent holding fields marked as + not secure from being written to. + """ + self.holdings = holdings if holdings is not None else Holdings(version, safe) + self.inputs = inputs if inputs is not None else Inputs(version) + + @classmethod + def empty( + cls, + version=LUXTRONIK_LATEST_SHI_VERSION, + safe=SAFE + ): + """ + Initialize an empty LuxtronikSmartHomeData instance + (= no fields are added to the data-vectors). + + Args: + version (tuple[int] | None): The version is added to the data vectors + so some checks can be performed later. + (default: LUXTRONIK_LATEST_SHI_VERSION) + safe (bool): If true, prevent holding fields marked as + not secure from being written to. + """ + obj = cls.__new__(cls) + obj.holdings = Holdings.empty(version, safe) + obj.inputs = Inputs.empty(version) + return obj + +############################################################################### +# Smart home interface +############################################################################### + +class classproperty: + def __init__(self, fget): + self.fget = fget + def __get__(self, instance, owner): + return self.fget(owner) + +class LuxtronikSmartHomeInterface: + """ + Read/write interface for Luxtronik smart home fields. + + This class builds on the simple addr/count/data interface and + provides indexing and name resolution for easier access. + + This interface contains an internal list of operations to be executed. + Operations can be added to the list using the "collect" methods. + "Send" will process the entire list and then clear it. "Read" or "write" methods + (except "raw") also adds an operation and then processes the entire list, + which is cleared afterwards. + """ + + def __init__(self, interface, version=LUXTRONIK_LATEST_SHI_VERSION): + """ + Initialize the smart home interface. + + Args: + interface: The underlying read/write interface. + version (tuple[int] | None): Version to be used for creating fields or data vectors. + This ensures that the data vectors only contain valid fields. + If None is passed, all available fields are added. + Additionally, the version is used to performed some consistency checks. + (default: LUXTRONIK_LATEST_SHI_VERSION) + """ + self._interface = interface + self._version = version + self._blocks_list = [] + self._filtered_holdings = LuxtronikDefinitionsList.filtered(HOLDINGS_DEFINITIONS, version) + self._filtered_inputs = LuxtronikDefinitionsList.filtered(INPUTS_DEFINITIONS, version) + + @property + def version(self): + return self._version + +# Helper methods ############################################################## + + def _get_definition(self, def_name_or_idx, definitions): + """ + Retrieve a definition by name or index that is supported by the controller. + + Args: + def_name_or_idx (LuxtronikDefinition | str | int): + Name, register index or the definition of the field. + + Returns: + LuxtronikDefinition | None: The matching definition, or None if not found. + """ + if isinstance(def_name_or_idx, LuxtronikDefinition): + definition = def_name_or_idx + else: + definition = definitions.get(def_name_or_idx) + if definition is None: + return None + if not version_in_range(self._version, definition.since, definition.until): + LOGGER.debug(f"Field {definition.name} not valid for {self._version}") + return None + return definition + + def _get_def_field_pair(self, def_field_name_or_idx, definitions): + """ + Retrieve a definition by name or index that is supported by the controller. + On success, additionally returns either the passed field or a newly created field. + + Args: + def_name_or_idx (LuxtronikDefinition | Base| str | int): + Name, register index or the definition of the field, or the field itself. + + Returns: + tuple[LuxtronikDefinition, Base]: The matching definition-field pair. + """ + if isinstance(def_field_name_or_idx, Base): + definition = self._get_definition(def_field_name_or_idx.name, definitions) + field = def_field_name_or_idx if definition is not None else None + else: + definition = self._get_definition(def_field_name_or_idx, definitions) + field = definition.create_field() if definition is not None else None + return definition, field + + def _get_index_from_name(self, name): + """ + Extract the index from an 'unknown' identifier (e.g. 'Unknown_Input_105'). + + Args: + name (str): The identifier string. + + Returns: + int | None: The extracted index, or None if it cannot be determined. + """ + parts = name.split("_") + if len(parts) == 3 and parts[2].isdigit(): + return int(parts[2]) + return None + + def _try_create_definition(self, def_name_or_idx, definitions): + """ + Retrieve the definition for the given name or index. If no definition + is found out of all available, attempt to generate a temporary one. + If this also fails, return None. + + Args: + def_name_or_idx (LuxtronikDefinition | str | int): + The field name, register index or a definition object. + definitions (LuxtronikDefinitionsList): + Field definition list to look-up the desired definition + + Returns: + LuxtronikDefinition | None: A valid definition, or None if not found. + """ + if isinstance(def_name_or_idx, LuxtronikDefinition): + definition = def_name_or_idx + else: + definition = definitions.get(def_name_or_idx) + if definition is not None: + return definition + + LOGGER.debug( + f"Definition for {def_name_or_idx} not found. Attempting to create a temporary one." + ) + + # Handle unknown names like 'Unknown_Input_105' + if isinstance(def_name_or_idx, str) and def_name_or_idx.lower().startswith("unknown_"): + index = self._get_index_from_name(def_name_or_idx) + if index is None: + LOGGER.debug( + "Cannot determine index from name '{def_name_or_idx}'. " \ + + "Use format 'Unknown_Input_INDEX'." + ) + return None + return definitions.create_unknown_definition(index) + + if isinstance(def_name_or_idx, str) and def_name_or_idx.isdigit(): + index = int(def_name_or_idx) + return definitions.create_unknown_definition(index) + + # Handle integer indices + if isinstance(def_name_or_idx, int): + return definitions.create_unknown_definition(def_name_or_idx) + + LOGGER.debug(f"Could not find or generate a definition for {def_name_or_idx}.") + return None + + +# Telegram methods ############################################################ + + def _create_read_telegram(self, block, telegram_type): + """ + Create a read-telegram of type `telegram_type` out of this `ContiguousDataBlock`. + + Args: + telegram_type (class of LuxtronikSmartHomeReadTelegram): + Type of the telegram to create. + + Returns: + LuxtronikSmartHomeReadTelegram: + The created telegram. + """ + return telegram_type(block.first_addr, block.overall_count) + + def _create_write_telegram(self, block, telegram_type): + """ + Create a write-telegram of type `telegram_type` out of this `ContiguousDataBlock`. + + Args: + telegram_type (class of LuxtronikSmartHomeWriteTelegram): + Type of the telegram to create. + + Returns: + LuxtronikSmartHomeWriteTelegram | None: + The created telegram or None in case of an error. + """ + data_arr = block.get_data_arr() + if data_arr is None: + LOGGER.error(f"Failed to create a {telegram_type} telegram! " \ + + "The provided data is not valid.") + return None + return telegram_type(block.first_addr, data_arr) + + def _create_telegram(self, block, type_name, read_not_write): + """ + Create a read or write-telegram out of this `ContiguousDataBlock`. + + Returns: + LuxtronikSmartHomeReadTelegram | LuxtronikSmartHomeWriteTelegram | None: + The created telegram or None in case of an error. + """ + if type_name == self.holdings.name and (read_not_write == READ): + return self._create_read_telegram(block, LuxtronikSmartHomeReadHoldingsTelegram) + if type_name == self.inputs.name and (read_not_write == READ): + return self._create_read_telegram(block, LuxtronikSmartHomeReadInputsTelegram) + if type_name == self.holdings.name and (read_not_write == WRITE): + return self._create_write_telegram(block, LuxtronikSmartHomeWriteHoldingsTelegram) + LOGGER.error(f"Could not create a telegram for {block}. Skip this operation.") + return None + + def _create_telegrams(self, blocks_list): + """ + Create read and/or write-telegrams out of the blocks list. + + Args: + blocks_list (list[ContiguousDataBlockList]): + List of contiguous block lists. + + Returns: + list[tuple(ContiguousDataBlock, LuxtronikSmartHomeReadTelegram + | LuxtronikSmartHomeWriteTelegram, bool)]: + Data-tuple for `_send_and_integrate` method. + """ + telegrams_data = [] + for blocks in blocks_list: + for block in blocks: + telegram = self._create_telegram(block, blocks.type_name, blocks.read_not_write) + if telegram is not None: + telegrams_data.append((block, telegram, blocks.read_not_write)) + return telegrams_data + + def _integrate_data(self, telegrams_data): + """ + Integrate the read data from telegrams back into the corresponding blocks. + '_create_telegrams' must be called up beforehand. + + Returns: + bool: True if all data could be integrated. + """ + success = True + for block, telegram, read_not_write in telegrams_data: + if (read_not_write == READ): + valid = block.integrate_data(telegram.data) + if not valid: + LOGGER.debug(f"Failed to integrate read data into {block}") + success &= valid + else: + # Reset write_pending flag + for part in block: + part.field.write_pending = False + return success + + +# Main methods ################################################################ + + def _prepare_read_field(self, definition, field): + """ + Check whether the field to be read is supported by the controller. + + Args: + definition (LuxtronikDefinition): + Definition object related to the field to be read. + field (Base): + Field to be read. + + Returns: + bool: True if all checks have been passed, False otherwise. + """ + # Skip non-supported fields + if not version_in_range(self._version, definition.since, definition.until): + field.raw = None + return False + + return True + + def _prepare_write_field(self, definition, field, safe, data): + """ + Check whether the field to be written + - is supported by the controller + - got data (either within the field or via `data`) + - is writeable (if `safe` is True) + and additionally integrates transferred data + + Args: + definition (LuxtronikDefinition): + Definition object related to the field to be read. + field (Base): + Field to be read. + + Returns: + bool: True if all checks have been passed, False otherwise. + """ + # Skip non-supported fields + if not version_in_range(self._version, definition.since, definition.until): + return False + + # Skip fields that do not carry user-data and not data is provided + if not field.write_pending and data is None: + return False + + # Abort if field is not writeable + if safe and not (definition.writeable and field.writeable): + LOGGER.warning("Field marked as non-writeable: " \ + + f"name={definition.name}, data={field.raw}") + return False + + # Override the field's data with the provided data + if data is not None: + field.value = data + + # Abort if insufficient data is provided + if not check_data(definition, field): + LOGGER.warning("Data error / insufficient data provided: " \ + + f"name={definition.name}, data={field.raw}") + return False + + return True + + def _collect_field(self, blocks_list, def_field_name_or_idx, definitions, \ + read_not_write, safe, data): + """ + Add a single field to the blocks list. + + The field may correspond to multiple registers and can be specified by + name (str), register index (int), or directly as a field or definition object. + Only if the controller supports the field will it be collected. + + Args: + blocks_list (list[ContiguousDataBlockList]): + List of contiguous block lists. + def_field_name_or_idx (LuxtronikDefinition | Base | str | int): + Field name, register index, field object or definition object. + definitions (LuxtronikDefinitionsList): + List of definitions that contains the desired field definition. + read_not_write (bool): If True, the field is collected for read, otherwise for write. + safe (bool): If True, do not collect the field for write when it is marked as non-writeable. + data (list[int] | None): Optional raw data to override the field's data. + + Returns: + Base | None: The field object with integrated data, or None in case of an error. + """ + if def_field_name_or_idx is None: + return None + + definition, field = self._get_def_field_pair(def_field_name_or_idx, definitions) + + # create temporary definition for trial-and-error-mode + if self._version is None and definition is None and isinstance(def_field_name_or_idx, (int, str)): + definition = self._try_create_definition(def_field_name_or_idx, definitions) + if definition is None: + return None + field = definition.create_field() + + if definition is None: + # We have to clear non-supported fields here, see comment below + if (read_not_write == READ) and isinstance(def_field_name_or_idx, Base): + def_field_name_or_idx.raw = None + return None + + # _get_def_field_pair ensures that the field is supported, no need to call _prepare_read_field + #if (read_not_write == READ) and not self._prepare_read_field(definition, field): + # return None + if (read_not_write == WRITE) and not self._prepare_write_field(definition, field, safe, data): + return None + + blocks = ContiguousDataBlockList(definitions.name, read_not_write) + blocks.append_single(definition, field) + blocks_list.append(blocks) + return field + + def _collect_fields(self, blocks_list, data_vector, definitions, read_not_write): + """ + Add all fields to the blocks list. + Only by the controller supported fields will be collected. + + Args: + blocks_list (list[ContiguousDataBlockList]): + List of contiguous block lists. + data_vector (DataVectorSmartHome): The data vector class providing fields. + definitions (LuxtronikDefinitionsList): + List of definitions that contains the desired field definitions. + read_not_write (bool): If True, the fields are collected for read, otherwise for write. + """ + if not isinstance(data_vector, DataVectorSmartHome): + return + + if self._version is None: + # Trial-and-error mode: Add a block for every field + blocks = ContiguousDataBlockList(definitions.name, read_not_write) + if (read_not_write == READ): + for definition, field in data_vector.items(): + # _prepare_read_field will never fail, no need to call it + #if self._prepare_read_field(definition, field): + blocks.append_single(definition, field) + else: + for definition, field in data_vector.items(): + if self._prepare_write_field(definition, field, data_vector.safe, None): + blocks.append_single(definition, field) + if len(blocks) > 0: + blocks_list.append(blocks) + else: + if (read_not_write == READ): + # We can directly use the prepared read-blocks + data_vector.update_read_blocks() + if len(data_vector._read_blocks) > 0: + blocks_list.append(data_vector._read_blocks) + else: + blocks = ContiguousDataBlockList(definitions.name, read_not_write) + # Organize data into contiguous blocks + for definition, field in data_vector.items(): + if self._prepare_write_field(definition, field, data_vector.safe, None): + blocks.collect(definition, field) + if len(blocks) > 0: + blocks_list.append(blocks) + + def _send_and_integrate(self, blocks_list): + """ + Generate all necessary telegrams and then send them. + Subsequently, the retrieved data is integrated into the provided fields. + + Args: + blocks_list (list[ContiguousDataBlockList]): + List of contiguous block lists. + + Returns: + bool: True if no errors occurred, otherwise False. + """ + # Convert the list of contiguous blocks to telegrams + telegrams_data = self._create_telegrams(blocks_list) + # Send all telegrams. The retrieved data is returned within the telegrams + telegrams = [data[1] for data in telegrams_data] + success = self._interface.send(telegrams) + # Transfer the data from the telegrams into the fields + success &= self._integrate_data(telegrams_data) + return success + + +# Collect and send methods #################################################### + + def collect_holding_for_read(self, def_field_name_or_idx): + """ + Collect a single field to read. + Only if the controller supports the field will it be collected. + + Args: + def_field_name_or_idx (LuxtronikDefinition | Base | str | int): + Field name, register index, field object or definition object. + + Returns: + Base | None: The field object with integrated data, or None in case of an error. + """ + return self._collect_field(self._blocks_list, def_field_name_or_idx, \ + self.holdings, READ, SAFE, None) + + def collect_holding_for_write(self, def_field_name_or_idx, data=None, safe=True): + """ + Collect a single field to write. + Only if the controller supports the field will it be collected. + + Args: + def_field_name_or_idx (LuxtronikDefinition | Base | str | int): + Field name, register index, field object or definition object. + data (list[int] | None): Optional raw data to override the field's data. + safe (bool): If True, do not collect the field when it is marked as non-writeable. + + Returns: + Base | None: The field object with integrated data, or None in case of an error. + """ + return self._collect_field(self._blocks_list, def_field_name_or_idx, \ + self.holdings, WRITE, safe, data) + + def collect_holding(self, def_field_name_or_idx, data=None, safe=True): + """ + First, collect a single field for writing and, in addition, for reading. + Only if the controller supports the field will it be collected. + + Args: + def_field_name_or_idx (LuxtronikDefinition | Base | str | int): + Field name, register index, field object or definition object. + data (list[int] | None): Optional raw data to override the field's data to write. + safe (bool): If True, do not collect the field for write when it is marked as non-writeable. + + Returns: + Base | None: The field object with integrated data, or None in case of an error. + """ + field = self._collect_field(self._blocks_list, def_field_name_or_idx, \ + self.holdings, WRITE, safe, data) + if field is None: + return None + self._collect_field(self._blocks_list, field, \ + self.holdings, READ, SAFE, None) + return field + + def collect_holdings_for_read(self, holdings): + """ + Collect all fields of a holding data vector for reading + that are supported by the controller. All others are filled with None. + + Args: + holdings (Holdings): The holdings object containing field data. + If None is provided, nothing is collected. + """ + self._collect_fields(self._blocks_list, holdings, self.holdings, READ) + + def collect_holdings_for_write(self, holdings): + """ + Collect all fields of a holding data vector for writing + that are supported by the controller. + + Args: + holdings (Holdings): The holdings object containing field data. + If None is provided, nothing is collected. + """ + self._collect_fields(self._blocks_list, holdings, self.holdings, WRITE) + + def collect_holdings(self, holdings): + """ + First, collect all fields of a holding data vector for writing and, + in addition, all fields for reading that are supported by the controller. + All others are filled with None when reading. + + Args: + holdings (Holdings): The holdings object containing field data. + If None is provided, nothing is collected. + """ + self.collect_holdings_for_write(holdings) + self.collect_holdings_for_read(holdings) + + def collect_input(self, def_field_name_or_idx): + """ + Collect a single field to read. + Only if the controller supports the field will it be collected. + + Args: + def_field_name_or_idx (LuxtronikDefinition | Base | str | int): + Field name, register index, field object or definition object. + + Returns: + Base | None: The field object with integrated data, or None in case of an error. + """ + return self._collect_field(self._blocks_list, def_field_name_or_idx, \ + self.inputs, READ, SAFE, None) + + def collect_inputs(self, inputs): + """ + Collect all fields of a inputs data vector for reading + that are supported by the controller. All others are filled with None. + + Args: + inputs (Inputs): The inputs object containing field data. + If None is provided, nothing is collected. + """ + self._collect_fields(self._blocks_list, inputs, self.inputs, READ) + + def collect_data_for_read(self, data): + """ + Collect all fields of all data vectors within the collection for reading + that are supported by the controller. All others are filled with None. + + Args: + data (LuxtronikSmartHomeData): The data vector collection containing field data. + If None is provided, nothing is collected. + """ + if not isinstance(data, LuxtronikSmartHomeData): + return + self.collect_holdings_for_read(data.holdings) + self.collect_inputs(data.inputs) + + def collect_data_for_write(self, data): + """ + Collect all fields of all data vectors within the collection for writing + that are supported by the controller. + + Args: + data (LuxtronikSmartHomeData): The data vector collection containing field data. + If None is provided, nothing is collected. + """ + if not isinstance(data, LuxtronikSmartHomeData): + return + self.collect_holdings_for_write(data.holdings) + + def collect_data(self, data): + """ + First, collect all fields of all data vectors within the collection for writing and, + in addition, all fields for reading that are supported by the controller. + All others are filled with None when reading. + + Args: + data (LuxtronikSmartHomeData): The data vector collection containing field data. + If None is provided, nothing is collected. + """ + if not isinstance(data, LuxtronikSmartHomeData): + return + self.collect_data_for_write(data) + self.collect_data_for_read(data) + + def send(self): + """ + Send all collected operations via the "collect" methods. + Afterwards clears the internal list of operations. + + Returns: + bool: True if no errors occurred, otherwise False. + """ + success = self._send_and_integrate(self._blocks_list) + self._blocks_list = [] + return success + + +# Holding methods ############################################################# + + @classproperty + def holdings(cls): + """Returns the holdings dictionary containing all available holding definitions.""" + return HOLDINGS_DEFINITIONS + + def get_holdings(self, full_not_filtered=False): + """ + Returns either the holdings dictionary containing all available holding definitions + or a version-dependent variant of it, depending on the parameter `full_not_filtered`. + + Args: + full_not_filtered (LuxtronikDefinition | str | int): + Parameter for selecting the returned definitions list. + + Returns: + LuxtronikDefinitionsList: List of definitions. + """ + if full_not_filtered: + return self.holdings + else: + return self._filtered_holdings + + def create_holding(self, def_name_or_idx): + """ + Create a holding field if the related definition matches the stored version. + + Args: + def_name_or_idx (LuxtronikDefinition | str | int): + Name, register index or definition of the holding. + + Returns: + Base | None: On success the created field, otherwise None. + + """ + _, field = self._get_def_field_pair(def_name_or_idx, HOLDINGS_DEFINITIONS) + return field + + def create_holdings(self, safe=SAFE): + """ + Create a holdings data-vector only with fields that match the stored version. + + Args: + safe (bool): If true, prevent holding fields marked as + not secure from being written to. + + Returns: + DataVectorSmartHome: The created data-vector. + """ + return Holdings(self._version, safe) + + def create_empty_holdings(self, safe=SAFE): + """ + Create an empty holdings data-vector for the stored version. + + Args: + safe (bool): If true, prevent holding fields marked as + not secure from being written to. + + Returns: + DataVectorSmartHome: The created data-vector. + """ + return Holdings.empty(self._version, safe) + + def read_holding(self, def_field_name_or_idx): + """ + Read the data of a single field. + + The field may correspond to multiple registers and can be specified by + name (str), register index (int), or directly as a field or definition object. + Only if the controller supports the field will it be read. + The required offset is added automatically. + + Args: + def_field_name_or_idx (LuxtronikDefinition | Base | str | int): + Field name, register index, field object or definition object. + + Returns: + Base | None: The field object containing the read data, + or None if the read failed. + """ + field = self.collect_holding_for_read(def_field_name_or_idx) + success = self.send() + return field if success else None + + def read_holdings(self, holdings=None): + """ + Read the data of all fields within the holdings data vector + that are supported by the controller. All others are filled with None. + + Args: + holdings (Holdings | None): Optional existing holdings object. + If None is provided, a new instance is created. + + Returns: + Holdings: The passed / created holdings data vector. + """ + if not isinstance(holdings, Holdings): + holdings = self.create_holdings(SAFE) + + self.collect_holdings_for_read(holdings) + self.send() + return holdings + + def write_holding(self, def_field_name_or_idx, data=None, safe=True): + """ + Write all provided data or the field's own data to a field. + + The field may correspond to multiple registers and can be specified by + name (str), register index (int), or directly as a field or definition object. + Only if the controller supports the field will it be written. + The required offset is added automatically. + + Args: + def_field_name_or_idx (LuxtronikDefinition | Base | str | int): + Field name, register index, field object or definition object. + data (list[int] | None): Optional raw data to override the field's data. + safe (bool): If True, aborts when the field is marked as non-writeable. + + Returns: + Base | None: The written field object, or None if the write failed. + """ + field = self.collect_holding_for_write(def_field_name_or_idx, data, safe) + success = self.send() + return field if success else None + + def write_holdings(self, holdings): + """ + Write the data of all fields within the holdings data vector + that are supported by the controller. + + Args: + holdings (Holdings): The holdings object containing field data. + If None is provided, the write is aborted. + + Returns: + bool: True if no errors occurred, otherwise False. + """ + if not isinstance(holdings, Holdings): + LOGGER.warning("Abort write! No data to write provided.") + return False + + self.collect_holdings_for_write(holdings) + return self.send() + + def write_and_read_holdings(self, holdings): + """ + Write and then read the data of all fields within the holdings data vector + that are supported by the controller. All others are filled with None. + + Args: + holdings (Holdings): The holdings object containing field data. + If None is provided, the write and read is aborted. + + Returns: + bool: True if no errors occurred, otherwise False. + """ + if not isinstance(holdings, Holdings): + LOGGER.warning("Abort write and read! No data to write provided.") + return False + + self.collect_holdings(holdings) + return self.send() + + +# Input methods ############################################################### + + @classproperty + def inputs(cls): + """Returns the inputs dictionary containing all available input definitions.""" + return INPUTS_DEFINITIONS + + def get_inputs(self, full_not_filtered=False): + """ + Returns either the inputs dictionary containing all available input definitions + or a version-dependent variant of it, depending on the parameter `full_not_filtered`. + + Args: + full_not_filtered (LuxtronikDefinition | str | int): + Parameter for selecting the returned definitions list. + + Returns: + LuxtronikDefinitionsList: List of definitions. + """ + if full_not_filtered: + return self.inputs + else: + return self._filtered_inputs + + def create_input(self, def_name_or_idx): + """ + Create an input field if the related definition matches the stored version. + + Args: + def_name_or_idx (LuxtronikDefinition | str | int): + Name, register index or definition of the holding. + + Returns: + Base | None: On success the created field, otherwise None. + + """ + _, field = self._get_def_field_pair(def_name_or_idx, INPUTS_DEFINITIONS) + return field + + def create_inputs(self): + """ + Create an inputs data-vector only with fields that match the stored version. + + Returns: + DataVectorSmartHome: The created data-vector. + """ + return Inputs(self._version, SAFE) + + def create_empty_inputs(self): + """ + Create an empty inputs data-vector for the stored version. + + Returns: + DataVectorSmartHome: The created data-vector. + """ + return Inputs.empty(self._version, SAFE) + + def read_input(self, def_field_name_or_idx): + """ + Read the data of a single field. + + The field may correspond to multiple registers and can be specified by + name (str), register index (int), or directly as a field or definition object. + Only if the controller supports the field will it be read. + The required offset is added automatically. + + Args: + def_field_name_or_idx (LuxtronikDefinition | Base | str | int): + Field name, register index, field object or definition object. + + Returns: + Base | None: The field object containing the read data, + or None if the read failed. + """ + field = self.collect_input(def_field_name_or_idx) + success = self.send() + return field if success else None + + def read_inputs(self, inputs=None): + """ + Read the data of all fields within the inputs data vector + that are supported by the controller. All others are filled with None. + + Args: + inputs (Inputs | None): Optional existing inputs object. + If None is provided, a new instance is created. + + Returns: + Inputs: The passed / created inputs data vector. + """ + if not isinstance(inputs, Inputs): + inputs = self.create_inputs() + + self.collect_inputs(inputs) + self.send() + return inputs + + +# Data methods ################################################################ + + def create_data(self, safe=SAFE): + """ + Create a data vector collection only with fields that match the stored version. + + Args: + safe (bool): If true, prevent holding fields marked as + not secure from being written to. + + Returns: + LuxtronikSmartHomeData: The created data-collection. + """ + return LuxtronikSmartHomeData(None, None, self._version, safe) + + def create_empty_data(self, safe=SAFE): + """ + Create an empty data-collection for the stored version. + + Args: + safe (bool): If true, prevent holding fields marked as + not secure from being written to. + + Returns: + LuxtronikSmartHomeData: The created data-collection. + """ + return LuxtronikSmartHomeData.empty(self._version, safe) + + def read_data(self, data=None): + """ + Read the data of all fields within the data vector collection + that are supported by the controller. All others are filled with None. + + Args: + data (LuxtronikSmartHomeData | None): Optional existing data vector collection. + If None is provided, a new instance is created. + + Returns: + LuxtronikSmartHomeData: The passed / created data vector collection. + """ + if not isinstance(data, LuxtronikSmartHomeData): + data = self.create_data(SAFE) + + self.collect_data_for_read(data) + self.send() + return data + + def write_data(self, data): + """ + Write the data of all fields within the data vector collection + that are supported by the controller. + + Args: + data (LuxtronikSmartHomeData): The data vector collection containing field data. + If None is provided, the write is aborted. + + Returns: + bool: True if no errors occurred, otherwise False. + """ + if not isinstance(data, LuxtronikSmartHomeData): + LOGGER.warning("Abort write! No data to write provided.") + return False + + self.collect_data_for_write(data) + return self.send() + + def write_and_read_data(self, data): + """ + Write and then read the data of all fields within the data vector collection + that are supported by the controller. All others are filled with None. + + Args: + data (LuxtronikSmartHomeData): The data vector collection containing field data. + If None is provided, the write and read is aborted. + + Returns: + bool: True if no errors occurred, otherwise False. + """ + if not isinstance(data, LuxtronikSmartHomeData): + LOGGER.warning("Abort write and read! No data to write provided.") + return False + + self.collect_data(data) + return self.send() + + +# Debug methods ############################################################### + + def read_holding_raw(self, index, count=1): + """ + Read a specified number of registers starting at the given index, + without performing version consistency checks. + Mainly for debugging purposes. + + The required offset is added automatically. + + Args: + index (int): The starting register index. + count (int): Number of registers to read (Defaults to 1). + + Returns: + list[int] | None: On success the list of read register values, + otherwise None. + """ + telegram = LuxtronikSmartHomeReadHoldingsTelegram(index + HOLDINGS_DEFINITIONS.offset, count) + success = self._interface.send(telegram) + return telegram.data if success else None + + def write_holding_raw(self, index, data_arr): + """ + Write all provided data to registers at the specified index, + without performing version consistency checks. + Mainly for debugging purposes. + + The required offset is added automatically. + + Args: + index (int): Starting register index. + data_arr (list[int]): Values to be written to the registers. + + Returns: + bool: True if no errors occurred, otherwise False. + """ + telegram = LuxtronikSmartHomeWriteHoldingsTelegram(index + HOLDINGS_DEFINITIONS.offset, data_arr) + return self._interface.send(telegram) + + def read_input_raw(self, index, count=1): + """ + Read a specified number of registers starting at the given index, + without performing version consistency checks. + Mainly for debugging purposes. + + The required offset is added automatically. + + Args: + index (int): The starting register index. + count (int): Number of registers to read (Defaults to 1). + + Returns: + list[int] | None: On success the list of read register values, + otherwise None. + """ + telegram = LuxtronikSmartHomeReadInputsTelegram(index + self.inputs.offset, count) + success = self._interface.send(telegram) + return telegram.data if success else None + + +# Standard methods ############################################################ +# Be careful with method names! +# Identical named methods could be overridden in a derived class. + + def read(self, data=None): + """ + Calls `read_data()`. Please check its documentation. + Exists mainly to standardize the various interfaces. + """ + return self.read_data(data) + + def write(self, data): + """ + Calls `write_data()`. Please check its documentation. + Exists mainly to standardize the various interfaces. + """ + return self.write_data(data) + + def write_and_read(self, data): + """ + Calls `write_and_read_data()`. Please check its documentation. + Exists mainly to standardize the various interfaces. + """ + return self.write_and_read_data(data) \ No newline at end of file diff --git a/luxtronik/shi/modbus.py b/luxtronik/shi/modbus.py index 3876ae1..bfd214a 100644 --- a/luxtronik/shi/modbus.py +++ b/luxtronik/shi/modbus.py @@ -288,7 +288,7 @@ def send(self, telegrams): if t.count > 0: total_count += t.count else: - LOGGER.warning(f"No data requested/provided: addr={t.addr}, count={t.count}") + LOGGER.debug(f"No data requested/provided: addr={t.addr}, count={t.count}") # Exit the function if no operation is necessary if total_count <= 0: diff --git a/luxtronik/shi/vector.py b/luxtronik/shi/vector.py index 0f23246..6ea85b0 100644 --- a/luxtronik/shi/vector.py +++ b/luxtronik/shi/vector.py @@ -8,6 +8,7 @@ LuxtronikDefinition, LuxtronikDefinitionsDictionary, ) +from luxtronik.shi.contiguous import ContiguousDataBlockList ############################################################################### # Smart home interface data-vector @@ -96,6 +97,11 @@ def _init_instance(self, version, safe): # field-definition pairs to keep the index-sorted order when adding new entries self._items = [] # list of tuples, 0: definition, 1: field + # Instead of re-create the block-list on every read, we just update it + # on first time used or on next time used if some fields are added. + self._read_blocks_up_to_date = False + self._read_blocks = ContiguousDataBlockList(self.name, True) + def __init__(self, version=LUXTRONIK_LATEST_SHI_VERSION, safe=True): """ Initialize the data-vector instance. @@ -110,7 +116,7 @@ def __init__(self, version=LUXTRONIK_LATEST_SHI_VERSION, safe=True): This ensures that the data vector only contain valid fields. If None is passed, all available fields are added. (default: LUXTRONIK_LATEST_SHI_VERSION) - safe (bool): If false, prevent fields marked as + safe (bool): If true, prevent fields marked as not secure from being written to. """ self._init_instance(version, safe) @@ -133,7 +139,7 @@ def empty(cls, version=LUXTRONIK_LATEST_SHI_VERSION, safe=True): This ensures that the data vector only contain valid fields. If None is passed, all available fields can be added. (default: LUXTRONIK_LATEST_SHI_VERSION) - safe (bool): If false, prevent fields marked as + safe (bool): If true, prevent fields marked as not secure from being written to. """ obj = cls.__new__(cls) # this don't call __init__() @@ -272,6 +278,7 @@ def add(self, def_field_name_or_idx, alias=None): if version_in_range(self._version, definition.since, definition.until): if field is None: field = definition.create_field() + self._read_blocks_up_to_date = False self._add(definition, field, alias) # sort _items by definition.index # _items is a list of tuples, 0: definition, 1: field @@ -305,6 +312,22 @@ def register_alias(self, def_field_name_or_idx, alias): return self._field_lookup.get(definition, None) +# Data-blocks methods ######################################################### + + def update_read_blocks(self): + """ + (Re-)Create the data block list (`ContiguousDataBlockList`) for read-operations. + + Since the data blocks do not change as long as no new fields are added, + it is sufficient to regenerate them only when a change occurs. + """ + if not self._read_blocks_up_to_date: + self._read_blocks.clear() + for definition, field in self._items: + self._read_blocks.collect(definition, field) + self._read_blocks_up_to_date = True + + # Data and access methods ##################################################### def parse(self, raw_data): diff --git a/tests/shi/test_contiguous.py b/tests/shi/test_contiguous.py new file mode 100644 index 0000000..3ca4e2d --- /dev/null +++ b/tests/shi/test_contiguous.py @@ -0,0 +1,366 @@ + +from luxtronik.datatypes import Base +from luxtronik.shi.constants import LUXTRONIK_VALUE_FUNCTION_NOT_AVAILABLE +from luxtronik.shi.definitions import LuxtronikDefinition +from luxtronik.shi.contiguous import ( + ContiguousDataPart, + ContiguousDataBlock, + ContiguousDataBlockList, +) + + +def_a1 = LuxtronikDefinition({ + 'index': 1, + 'count': 1, +}, 'test', 100) +def_a = LuxtronikDefinition({ + 'index': 1, + 'count': 2, +}, 'test', 100) +def_b = LuxtronikDefinition({ + 'index': 3, + 'count': 1, +}, 'test', 100) +def_c = LuxtronikDefinition({ + 'index': 4, + 'count': 3, +}, 'test', 100) +def_c1 = LuxtronikDefinition({ + 'index': 4, + 'count': 1, +}, 'test', 100) +def_c2 = LuxtronikDefinition({ + 'index': 5, + 'count': 1, +}, 'test', 100) +defs = [] + +field_a1 = Base('field_a') +field_a = Base('field_a') +field_b = Base('field_b') +field_c = Base('field_c') +field_c1 = Base('field_c1') +field_c2 = Base('field_c2') + + +class TestContiguousDataPart: + + def test_init(self): + part = ContiguousDataPart(def_a, field_a) + assert part.index == 1 + assert part.addr == 101 + assert part.count == 2 + assert part.field == field_a + assert part.definition == def_a + + def test_repr(self): + part = ContiguousDataPart(def_a, None) + assert repr(part) == "(1, 2)" + + def test_get_data(self): + part = ContiguousDataPart(def_a, field_a) + field_a.raw = [4, 2] + assert part.get_data_arr() == [4, 2] + + field_a.raw = [1, 3, 5] + assert part.get_data_arr() is None + + field_a.raw = [9] + assert part.get_data_arr() is None + + part = ContiguousDataPart(def_a1, field_a1) + + field_a1.raw = [8] + assert part.get_data_arr() == [8] + + field_a1.raw = 7 + assert part.get_data_arr() == [7] + + def test_integrate_data(self): + part = ContiguousDataPart(def_a, field_a) + + part.integrate_data([1, 5, 7, 9], 0) + assert part.field.raw == [1, 5] + + part.integrate_data([1, 5, 7, 9]) + assert part.field.raw == [5, 7] + + part.integrate_data([1, 5, 7, 9], 2) + assert part.field.raw == [7, 9] + + part.integrate_data([1, 5, 7, 9], 3) + assert part.field.raw is None + + part.integrate_data([1, 5, LUXTRONIK_VALUE_FUNCTION_NOT_AVAILABLE, 9], 1) + assert part.field.raw is None + + part = ContiguousDataPart(def_c1, field_c1) + + part.integrate_data([2, 4, 6], 1) + assert part.field.raw == 4 + + part.integrate_data([2, 4, LUXTRONIK_VALUE_FUNCTION_NOT_AVAILABLE], 2) + assert part.field.raw is None + + part.integrate_data([2, 4, 6], 5) + assert part.field.raw is None + + +class TestContiguousDataBlock: + + def test_clear(self): + block = ContiguousDataBlock() + block.add(def_a, None) + block.add(def_b, None) + block.add(def_c, None) + block.clear() + assert len(block) == 0 + assert block._last_idx == -1 + + def test_iter(self): + block = ContiguousDataBlock.create_and_add(def_a, None) + block.add(def_b, None) + block.add(def_c, None) + for index, part in enumerate(block): + if index == 0: + assert part.index == 1 + assert part.count == 2 + if index == 1: + assert part.index == 3 + assert part.count == 1 + if index == 2: + assert part.index == 4 + assert part.count == 3 + + def test_add(self): + block = ContiguousDataBlock() + + can_add = block.can_add(def_b) + assert can_add + block.add(def_b, None) + assert len(block) == 1 + + can_add = block.can_add(def_c2) + assert not can_add + block.add(def_c2, None) + assert len(block) == 2 + + can_add = block.can_add(def_a) + assert not can_add + block.add(def_a, None) + assert len(block) == 3 + + def test_first_index(self): + block = ContiguousDataBlock() + assert block.first_index == 0 + + block.add(def_b,field_b) + block.add(def_c, field_c) + + assert block.first_index == 3 + assert block.first_addr == 103 + assert block.overall_count == 4 + + def test_overall_count(self): + block = ContiguousDataBlock() + assert block.overall_count == 0 + + # Several parts for one register + block.add(def_a1, field_a1) + block.add(def_a, field_a) + block.add(def_b, field_b) + block.add(def_c, field_c) + block.add(def_c1, field_c1) + block.add(def_c2, field_c2) + + assert block.first_index == 1 + assert block.first_addr == 101 + assert block.overall_count == 6 + + def test_integrate_data(self): + block = ContiguousDataBlock() + + block.add(def_a1, field_a1) + block.add(def_a, field_a) + block.add(def_b, field_b) + + valid = block.integrate_data(None) + assert not valid + + valid = block.integrate_data([11, 12, 13]) + assert valid + assert block[0].field.raw == 11 + assert block[1].field.raw == [11, 12] + assert block[2].field.raw == 13 + + block.add(def_c, field_c) + + block.integrate_data([7, 6, 5, 4, 3, 2]) + assert valid + assert block[0].field.raw == 7 + assert block[1].field.raw == [7, 6] + assert block[2].field.raw == 5 + assert block[3].field.raw == [4, 3, 2] + + block.add(def_c1, field_c1) + block.add(def_c2, field_c2) + + block.integrate_data([21, 22, 23, 24, 25, 26]) + assert valid + assert block[0].field.raw == 21 + assert block[1].field.raw == [21, 22] + assert block[2].field.raw == 23 + assert block[3].field.raw == [24, 25, 26] + assert block[4].field.raw == 24 + assert block[5].field.raw == 25 + + valid = block.integrate_data([5, 4, 3]) + assert not valid + + def test_get_data(self): + block = ContiguousDataBlock() + + data_arr = block.get_data_arr() + assert data_arr is None + + # Multiple data for a single register #1 + field_a1.raw = 35 + field_a.raw = [56, 57] + block.add(def_a1, field_a1) + block.add(def_a, field_a) + + data_arr = block.get_data_arr() + assert data_arr is None + + block = ContiguousDataBlock() + field_b.raw = 11 + field_c.raw = [21, 22, 23] + block.add(def_b, field_b) + block.add(def_c, field_c) + + data_arr = block.get_data_arr() + assert data_arr == [11, 21, 22, 23] + + # provided data greater than overall count + orig_last = block._last_idx + block._last_idx = orig_last - 1 + data_arr = block.get_data_arr() + assert data_arr is None + block._last_idx = orig_last + + # To less data for one register + field_c.raw = [21, 22] + data_arr = block.get_data_arr() + assert data_arr is None + + # Multiple data for a single register #2 + field_b.raw = 11 + field_c.raw = [21, 22, 23] + field_c1.raw = 6 + field_c2.raw = 7 + block.add(def_c1, field_c1) + block.add(def_c2, field_c2) + + data_arr = block.get_data_arr() + assert data_arr is None + + def test_repr(self): + block = ContiguousDataBlock() + text_empty = repr(block) + assert text_empty + + block.add(def_a1, field_a1) + block.add(def_a, field_a) + text = repr(block) + assert text + assert text > text_empty + + +class TestContiguousDataBlockList: + + def test_init(self): + blocks = ContiguousDataBlockList('foo', True) + assert len(blocks) == 0 + assert blocks.type_name == 'foo' + assert blocks.read_not_write + + def test_iter(self): + blocks = ContiguousDataBlockList('foo', True) + blocks.append_single(def_a, None) + blocks.append_single(def_b, None) + blocks.collect(def_c1, None) + blocks.collect(def_c2, None) + assert len(blocks) == 3 + for index, block in enumerate(blocks): + if index == 0: + assert len(block) == 1 + assert block.first_index == 1 + assert block.overall_count == 2 + if index == 1: + assert len(block) == 1 + assert block.first_index == 3 + assert block.overall_count == 1 + if index == 2: + assert len(block) == 2 + assert block.first_index == 4 + assert block.overall_count == 2 + + blocks.clear() + assert len(blocks) == 0 + + def test_collect(self): + blocks = ContiguousDataBlockList('foo', True) + # First block + blocks.collect(def_c1, None) + blocks.collect(def_c2, None) + # Second block + blocks.collect(def_b, None) + blocks.collect(def_c, None) + # Third block + blocks.collect(def_a1, None) + blocks.collect(def_a, None) + # Fourth block + blocks.append_single(def_b, None) + # Fifth block + blocks.collect(def_c, None) + + assert len(blocks) == 5 + assert blocks[0].first_index == 4 + assert blocks[0].overall_count == 2 + assert blocks[1].first_index == 3 + assert blocks[1].overall_count == 4 + assert blocks[2].first_index == 1 + assert blocks[2].overall_count == 2 + assert blocks[3].first_index == 3 + assert blocks[3].overall_count == 1 + assert blocks[4].first_index == 4 + assert blocks[4].overall_count == 3 + + def test_append(self): + blocks = ContiguousDataBlockList('foo', True) + + block = ContiguousDataBlock() + block.add(def_a, None) + block.add(def_b, None) + block.add(def_c, None) + + blocks.append(block) + assert len(blocks) == 1 + assert blocks[0].first_index == 1 + assert blocks[0].overall_count == 6 + + def test_repr(self): + blocks = ContiguousDataBlockList('foo', True) + text_empty = repr(blocks) + assert text_empty + + block = ContiguousDataBlock() + block.add(def_a, None) + block.add(def_b, None) + block.add(def_c, None) + + blocks.append(block) + + text = repr(blocks) + assert text + assert len(text) > len(text_empty) diff --git a/tests/shi/test_definitions.py b/tests/shi/test_definitions.py index 7c11b8b..1077379 100644 --- a/tests/shi/test_definitions.py +++ b/tests/shi/test_definitions.py @@ -108,6 +108,11 @@ def test_create_field(self): field = definition.create_field() assert field is None + def test_repr(self): + definition = LuxtronikDefinition.unknown(2, 'Foo', 30) + text = repr(definition) + assert text + class TestDefinitionsDict: @@ -331,6 +336,7 @@ def test_init(self): assert len(definitions) == 4 assert definitions.name == 'foo' assert definitions.offset == 100 + assert definitions._version is None assert 5 in definitions assert 'field_9a' in definitions assert definitions[7] in definitions @@ -341,6 +347,27 @@ def test_init(self): assert definitions.get(9).addr == 109 assert definitions.get(9).name == "field_9" + def test_filtered(self): + definitions = LuxtronikDefinitionsList(self.def_list, 'foo', 100) + + filtered1 = LuxtronikDefinitionsList.filtered(definitions, (1, 1, 0, 0)) + assert filtered1.name == 'foo' + assert filtered1.offset == 100 + assert filtered1._version == (1, 1, 0, 0) + assert 'field_5' in filtered1 # 1.1 - 1.2 + assert 'field_7' not in filtered1 # 3.1 - + assert 'field_9a' in filtered1 # - 1.3 + assert 'field_9' in filtered1 # - 3.3 + assert 'field_invalid' not in filtered1 # invalid + + filtered1 = LuxtronikDefinitionsList.filtered(definitions, (3, 2, 0, 0)) + assert filtered1._version == (3, 2, 0, 0) + assert 'field_5' not in filtered1 # 1.1 - 1.2 + assert 'field_7' in filtered1 # 3.1 - + assert 'field_9a' not in filtered1 # - 1.3 + assert 'field_9' in filtered1 # - 3.3 + assert 'field_invalid' not in filtered1 # invalid + def test_iter(self): definitions = LuxtronikDefinitionsList(self.def_list, 'foo', 100) @@ -427,6 +454,11 @@ def test_add(self): }) assert added_4 is None + def test_repr(self): + definitions = LuxtronikDefinitionsList(self.def_list, 'foo', 100) + text = repr(definitions) + assert text + class TestDefinitionFieldPair: @@ -452,8 +484,8 @@ def test_data_arr(self): definition._count = 2 field.raw = [4, 8, 1] arr = get_data_arr(definition, field) - assert arr == [4, 8] - assert check_data(definition, field) + assert arr is None + assert not check_data(definition, field) # insufficient data definition._count = 2 diff --git a/tests/shi/test_interface.py b/tests/shi/test_interface.py new file mode 100644 index 0000000..ee9172e --- /dev/null +++ b/tests/shi/test_interface.py @@ -0,0 +1,1323 @@ +import pytest +from unittest.mock import patch + +from luxtronik.datatypes import Base, Unknown + +from luxtronik.shi.constants import ( + LUXTRONIK_LATEST_SHI_VERSION, + LUXTRONIK_FIRST_VERSION_WITH_SHI, + LUXTRONIK_VALUE_FUNCTION_NOT_AVAILABLE, +) +from luxtronik.shi.common import ( + LuxtronikSmartHomeReadTelegram, + LuxtronikSmartHomeReadHoldingsTelegram, + LuxtronikSmartHomeReadInputsTelegram, + LuxtronikSmartHomeWriteHoldingsTelegram, +) +from luxtronik.shi.definitions import LuxtronikDefinition +from luxtronik.shi.holdings import HOLDINGS_DEFINITIONS, Holdings +from luxtronik.shi.inputs import INPUTS_DEFINITIONS +from luxtronik.shi.contiguous import ( + ContiguousDataBlock, + ContiguousDataBlockList, +) +from luxtronik.shi.interface import ( + LuxtronikSmartHomeData, + LuxtronikSmartHomeInterface, +) +from luxtronik.shi import create_modbus_tcp + +############################################################################### +# Fake modbus client +############################################################################### + +class FakeInterface: + telegram_list = [] + result = True + + def __init__(self, host="", port="", timeout=0): + pass + + def _get_data(self, addr, count): + return [addr - 10000 + i for i in range(count)] + + def read_inputs(self, addr, count): + return self._get_data(addr, count) if self.result else None + + def send(self, telegrams): + if not isinstance(telegrams, list): + telegrams = [telegrams] + FakeInterface.telegram_list = telegrams + + for t in telegrams: + if isinstance(t, LuxtronikSmartHomeReadTelegram): + t.data = self._get_data(t.addr, t.count) + return self.result + + +############################################################################### +# Tests +############################################################################### + +class TestLuxtronikSmartHomeData: + + def test_init(self): + data1 = LuxtronikSmartHomeData() + assert data1.holdings is not None + assert data1.holdings.version == LUXTRONIK_LATEST_SHI_VERSION + assert data1.holdings.safe + assert data1.inputs is not None + assert data1.inputs.version == LUXTRONIK_LATEST_SHI_VERSION + assert data1.inputs.safe + + data2 = LuxtronikSmartHomeData(data1.holdings) + assert data2.holdings is data1.holdings + assert data2.inputs is not data1.inputs + + def test_empty(self): + data = LuxtronikSmartHomeData.empty((1, 2, 0, 0), False) + assert data.holdings is not None + assert not data.holdings.safe + assert data.holdings.version == (1, 2, 0, 0) + assert data.inputs is not None + assert data.inputs.safe + assert data.inputs.version == (1, 2, 0, 0) + + +@patch("luxtronik.shi.LuxtronikModbusTcpInterface", FakeInterface) +class TestLuxtronikSmartHomeInterface: + + @classmethod + def setup_class(cls): + cls.interface = LuxtronikSmartHomeInterface(FakeInterface(), LUXTRONIK_FIRST_VERSION_WITH_SHI) + + def test_init(self): + assert isinstance(self.interface._interface, FakeInterface) + assert self.interface._blocks_list == [] + assert self.interface.version == LUXTRONIK_FIRST_VERSION_WITH_SHI + assert len(self.interface._filtered_holdings) > 0 + assert len(self.interface._filtered_inputs) > 0 + + def test_get(self): + assert self.interface.holdings is HOLDINGS_DEFINITIONS + assert self.interface.holdings is self.interface.get_holdings(True) + assert self.interface.holdings is not self.interface.get_holdings(False) + assert self.interface.inputs is INPUTS_DEFINITIONS + assert self.interface.inputs is self.interface.get_inputs(True) + assert self.interface.inputs is not self.interface.get_inputs(False) + + # via index + definition = self.interface._get_definition(2, HOLDINGS_DEFINITIONS) + assert definition is HOLDINGS_DEFINITIONS[2] + + # via name + definition, field = self.interface._get_def_field_pair("hot_water_mode", HOLDINGS_DEFINITIONS) + assert definition is HOLDINGS_DEFINITIONS[5] + assert isinstance(field, Base) + assert field.name == definition.name + + # via definition + definition = self.interface._get_definition(HOLDINGS_DEFINITIONS[6], HOLDINGS_DEFINITIONS) + assert definition is HOLDINGS_DEFINITIONS[6] + + # via field + field_52 = Base("lock_cooling", False) + definition, field = self.interface._get_def_field_pair(field_52, HOLDINGS_DEFINITIONS) + assert definition is HOLDINGS_DEFINITIONS[52] + assert field is field_52 + assert field.name == definition.name + + # unsupported via index + definition, field = self.interface._get_def_field_pair(109, INPUTS_DEFINITIONS) + assert definition is None + assert field is None + + definition = self.interface.inputs.get(109) + assert definition is INPUTS_DEFINITIONS[109] + + # unsupported via name + definition = self.interface._get_definition("mc1_heat_level", HOLDINGS_DEFINITIONS) + assert definition is None + + definition = self.interface.holdings.get("mc1_heat_level") + assert definition is HOLDINGS_DEFINITIONS["mc1_heat_level"] + + # unsupported via definition + definition, field = self.interface._get_def_field_pair(HOLDINGS_DEFINITIONS[23], HOLDINGS_DEFINITIONS) + assert definition is None + assert field is None + + # unsupported via field + field_500 = Base("unknown_input_500", False) + definition, field = self.interface._get_def_field_pair(field_500, INPUTS_DEFINITIONS) + assert definition is None + assert field is None + + @pytest.mark.parametrize( + "name, index", + [ + ("UNKNOWNINPUT", None), + ("UNKNOWN_INPUT", None), + ("UNKNOWN_INPUT_4", 4), + ("UNKNOWN_INPUT_4_5", None), + ("UNKNOWN_INPUT_four", None), + ("unknown_input_8", 8), + ("foo_bar_2", 2), + ] + ) + def test_index_from_name(self, name, index): + idx = self.interface._get_index_from_name(name) + assert idx == index + + def test_try_create(self): + # get unsupported via name + definition = self.interface._try_create_definition("heating_level", HOLDINGS_DEFINITIONS) + assert definition is HOLDINGS_DEFINITIONS["heating_level"] + + # get unsupported via definition + definition = self.interface._try_create_definition(HOLDINGS_DEFINITIONS[8], HOLDINGS_DEFINITIONS) + assert definition is HOLDINGS_DEFINITIONS[8] + + # create by name + definition = self.interface._try_create_definition("unKnOWn_foo_4", HOLDINGS_DEFINITIONS) + assert definition not in HOLDINGS_DEFINITIONS + assert definition.name == "unknown_holding_4" + assert definition.index == 4 + assert definition.count == 1 + assert not definition.writeable + assert definition.data_type is Unknown + + # fail by name + definition = self.interface._try_create_definition("unKnOWn_foo4", HOLDINGS_DEFINITIONS) + assert definition is None + + definition = self.interface._try_create_definition("nKnOWn_foo_4", HOLDINGS_DEFINITIONS) + assert definition is None + + # create by index + definition = self.interface._try_create_definition(9, HOLDINGS_DEFINITIONS) + assert definition not in HOLDINGS_DEFINITIONS + assert definition.name == "unknown_holding_9" + assert definition.index == 9 + assert definition.count == 1 + assert not definition.writeable + assert definition.data_type is Unknown + + # fail by else + definition = self.interface._try_create_definition(Base, HOLDINGS_DEFINITIONS) + assert definition is None + + # create by index as name + definition = self.interface._try_create_definition("14", HOLDINGS_DEFINITIONS) + assert definition not in HOLDINGS_DEFINITIONS + assert definition.name == "unknown_holding_14" + assert definition.index == 14 + assert definition.count == 1 + assert not definition.writeable + assert definition.data_type is Unknown + + def test_create_telegram(self): + block = ContiguousDataBlock() + field_1 = HOLDINGS_DEFINITIONS[10].create_field() + block.add(HOLDINGS_DEFINITIONS[10], field_1) + field_2 = HOLDINGS_DEFINITIONS[11].create_field() + block.add(HOLDINGS_DEFINITIONS[11], field_2) + addr = HOLDINGS_DEFINITIONS[10].addr + count = HOLDINGS_DEFINITIONS[10].count + HOLDINGS_DEFINITIONS[11].count + + # read holdings + telegram = self.interface._create_read_telegram(block, LuxtronikSmartHomeReadHoldingsTelegram) + assert type(telegram) is LuxtronikSmartHomeReadHoldingsTelegram + assert telegram.addr == addr + assert telegram.count == count + assert telegram.data == [] + + telegram = self.interface._create_telegram(block, "holding", True) + assert type(telegram) is LuxtronikSmartHomeReadHoldingsTelegram + assert telegram.addr == addr + assert telegram.count == count + assert telegram.data == [] + + # read inputs + telegram = self.interface._create_read_telegram(block, LuxtronikSmartHomeReadInputsTelegram) + assert type(telegram) is LuxtronikSmartHomeReadInputsTelegram + assert telegram.addr == addr + assert telegram.count == count + assert telegram.data == [] + + telegram = self.interface._create_telegram(block, "input", True) + assert type(telegram) is LuxtronikSmartHomeReadInputsTelegram + assert telegram.addr == addr + assert telegram.count == count + assert telegram.data == [] + + # fail on create + field_1.raw = 21 + telegram = self.interface._create_write_telegram(block, LuxtronikSmartHomeWriteHoldingsTelegram) + assert telegram is None + + # write holdings + field_2.raw = 5 + telegram = self.interface._create_write_telegram(block, LuxtronikSmartHomeWriteHoldingsTelegram) + assert type(telegram) is LuxtronikSmartHomeWriteHoldingsTelegram + assert telegram.addr == addr + assert telegram.count == count + assert telegram.data == [21, 5] + + telegram = self.interface._create_telegram(block, "holding", False) + assert type(telegram) is LuxtronikSmartHomeWriteHoldingsTelegram + assert telegram.addr == addr + assert telegram.count == count + assert telegram.data == [21, 5] + + # fail on create + telegram = self.interface._create_telegram(block, "input", False) + assert telegram is None + + def test_create_telegrams(self): + blocks_list = [] + blocks = ContiguousDataBlockList("holding", True) + + # block 1 + block = ContiguousDataBlock() + block.add(HOLDINGS_DEFINITIONS[10], HOLDINGS_DEFINITIONS[10].create_field()) + block.add(HOLDINGS_DEFINITIONS[11], HOLDINGS_DEFINITIONS[11].create_field()) + blocks.append(block) + + # block 2 + blocks.append_single(HOLDINGS_DEFINITIONS[13], HOLDINGS_DEFINITIONS[13].create_field()) + + # block 3 + blocks.append_single(HOLDINGS_DEFINITIONS[10], HOLDINGS_DEFINITIONS[10].create_field()) + + blocks_list.append(blocks) + + blocks = ContiguousDataBlockList("holding", False) + + # invalid block + blocks.append_single(HOLDINGS_DEFINITIONS[12], HOLDINGS_DEFINITIONS[12].create_field()) + # block 4 + field3 = HOLDINGS_DEFINITIONS[13].create_field() + blocks.append_single(HOLDINGS_DEFINITIONS[13], field3) + + blocks_list.append(blocks) + + field3.raw = 17 + + telegram_data = self.interface._create_telegrams(blocks_list) + assert len(telegram_data) == 4 + # blocks + assert len(telegram_data[0][0]) == 2 + assert telegram_data[0][0].first_index == 10 + assert telegram_data[0][0].overall_count == 2 + assert len(telegram_data[1][0]) == 1 + assert telegram_data[1][0].first_index == 13 + assert telegram_data[1][0].overall_count == 1 + assert len(telegram_data[2][0]) == 1 + assert telegram_data[2][0].first_index == 10 + assert telegram_data[2][0].overall_count == 1 + assert len(telegram_data[3][0]) == 1 + assert telegram_data[3][0].first_index == 13 + assert telegram_data[3][0].overall_count == 1 + # telegrams + assert telegram_data[0][1].count == 2 + assert telegram_data[1][1].count == 1 + assert telegram_data[2][1].count == 1 + assert telegram_data[3][1].count == 1 + # read not write + assert telegram_data[0][2] + assert telegram_data[1][2] + assert telegram_data[2][2] + assert not telegram_data[3][2] + + # integrate + telegram_data[0][1].data = [18, 4] + telegram_data[1][1].data = [9] + telegram_data[2][1].data = [27] + telegram_data[3][0][0].field.write_pending = True + valid = self.interface._integrate_data(telegram_data) + assert valid + # [index data, index for blocks, index for part] + assert telegram_data[0][0][0].field.raw == 18 + assert telegram_data[0][0][1].field.raw == 4 + assert telegram_data[1][0][0].field.raw == 9 + assert telegram_data[2][0][0].field.raw == 27 + assert not telegram_data[3][0][0].field.write_pending + assert telegram_data[3][0][0].field.raw == 17 # no update + + # integrate not available / None -> no error + telegram_data[0][1].data = [18, 4] + telegram_data[1][1].data = [LUXTRONIK_VALUE_FUNCTION_NOT_AVAILABLE] + telegram_data[2][1].data = [None] + valid = self.interface._integrate_data(telegram_data) + assert valid + # [index data, index for blocks, index for part] + assert telegram_data[0][0][0].field.raw == 18 + assert telegram_data[0][0][1].field.raw == 4 + assert telegram_data[1][0][0].field.raw is None + assert telegram_data[2][0][0].field.raw is None + assert telegram_data[3][0][0].field.raw == 17 # no update + + # integrate too less -> error + telegram_data[0][1].data = [18] + telegram_data[1][1].data = [1] + telegram_data[2][1].data = [None] + valid = self.interface._integrate_data(telegram_data) + assert not valid + # [index data, index for blocks, index for part] + assert telegram_data[0][0][0].field.raw == 18 + assert telegram_data[0][0][1].field.raw == 4 # no update + assert telegram_data[1][0][0].field.raw == 1 + assert telegram_data[2][0][0].field.raw is None + assert telegram_data[3][0][0].field.raw == 17 # no update + + def test_prepare(self): + definition = HOLDINGS_DEFINITIONS[2] + field = definition.create_field() + field.raw = 2 + + # supported valid read + valid = self.interface._prepare_read_field(definition, field) + assert valid + assert field.raw == 2 + + # supported valid write + field.raw = 5 + field.write_pending = True + valid = self.interface._prepare_write_field(definition, field, False, None) + assert valid + assert field.raw == 5 + + # supported invalid write via safe + field.raw = 6 + field.write_pending = True + field.writeable = False + valid = self.interface._prepare_write_field(definition, field, True, None) + assert not valid + assert field.raw == 6 + field.writeable = True + + # supported invalid write via write_pending + field.raw = 7 + field.write_pending = False + valid = self.interface._prepare_write_field(definition, field, False, None) + assert not valid + assert field.raw == 7 + + # supported valid write with data + field.raw = 8 + field.write_pending = False + valid = self.interface._prepare_write_field(definition, field, False, 9) + assert valid + assert field.value == 9 + + # supported invalid write via data + field.raw = [] + field.write_pending = True + valid = self.interface._prepare_write_field(definition, field, False, None) + assert not valid + assert field.raw == [] + + definition = HOLDINGS_DEFINITIONS[3] + field = definition.create_field() + field.raw = 1 + + # not supported read + valid = self.interface._prepare_read_field(definition, field) + assert not valid + assert field.raw is None + + # not supported write + field.raw = 1 + valid = self.interface._prepare_write_field(definition, field, False, None) + assert not valid + assert field.raw == 1 + + def test_collect_field(self): + blocks_list = [] + + # could not collect via None + field = self.interface._collect_field(blocks_list, None, HOLDINGS_DEFINITIONS, True, True, None) + assert field is None + assert len(blocks_list) == 0 + + # could not collect via string + field = self.interface._collect_field(blocks_list, 'foo', HOLDINGS_DEFINITIONS, True, True, None) + assert field is None + assert len(blocks_list) == 0 + + # could not collect, not supported read + field = self.interface._collect_field(blocks_list, 3, HOLDINGS_DEFINITIONS, True, True, None) + assert field is None + assert len(blocks_list) == 0 + + # could not collect, safe + field = self.interface._collect_field(blocks_list, 0, INPUTS_DEFINITIONS, False, True, None) + assert field is None + assert len(blocks_list) == 0 + + # collected read + field = self.interface._collect_field(blocks_list, 2, HOLDINGS_DEFINITIONS, True, True, None) + assert field is not None + assert field.name == "heating_offset" + assert len(blocks_list) == 1 + # blocks + assert blocks_list[0].type_name == "holding" + assert blocks_list[0].read_not_write + assert len(blocks_list[0]) == 1 + # block + assert blocks_list[0][0].first_index == 2 + assert blocks_list[0][0].overall_count == 1 + assert len(blocks_list[0][0]) == 1 + # part + assert blocks_list[0][0][0].definition.index == 2 + assert blocks_list[0][0][0].field is field + + # could not collect, not supported write + field = self.interface._collect_field(blocks_list, 3, HOLDINGS_DEFINITIONS, False, True, None) + assert field is None + assert len(blocks_list) == 1 + + # collected write + field = self.interface._collect_field(blocks_list, 1, HOLDINGS_DEFINITIONS, False, True, 20) + assert field is not None + assert field.name == "heating_setpoint" + assert field.value == 20 + assert len(blocks_list) == 2 + # blocks + assert blocks_list[1].type_name == "holding" + assert not blocks_list[1].read_not_write + assert len(blocks_list[1]) == 1 + # block + assert blocks_list[1][0].first_index == 1 + assert blocks_list[1][0].overall_count == 1 + assert len(blocks_list[1][0]) == 1 + # part + assert blocks_list[1][0][0].definition.index == 1 + assert blocks_list[1][0][0].field is field + + valid = self.interface._send_and_integrate(blocks_list) + assert valid + assert len(FakeInterface.telegram_list) == 2 + assert type(FakeInterface.telegram_list[0]) is LuxtronikSmartHomeReadHoldingsTelegram + assert FakeInterface.telegram_list[0].addr == 10000 + 2 + assert FakeInterface.telegram_list[0].count == 1 + assert type(FakeInterface.telegram_list[1]) is LuxtronikSmartHomeWriteHoldingsTelegram + assert FakeInterface.telegram_list[1].addr == 10000 + 1 + assert FakeInterface.telegram_list[1].count == 1 + + def test_collect_fields(self): + blocks_list = [] + + # could not collect via None + self.interface._collect_fields(blocks_list, None, HOLDINGS_DEFINITIONS, True) + assert len(blocks_list) == 0 + + data_vector = Holdings.empty(LUXTRONIK_FIRST_VERSION_WITH_SHI) + data_vector.add(0) + data_vector.add(2) + data_vector.add(3) # not supported + data_vector.add(4) # does not exist + data_vector.add(5) + data_vector.add(6) + data_vector.add(7) + data_vector.update_read_blocks() + + # collect read + self.interface._collect_fields(blocks_list, data_vector, HOLDINGS_DEFINITIONS, True) + assert len(blocks_list) == 1 + # blocks + assert blocks_list[0].type_name == "holding" + assert blocks_list[0].read_not_write + assert len(blocks_list[0]) == 3 + # block + assert blocks_list[0][0].first_index == 0 + assert blocks_list[0][0].overall_count == 1 + assert blocks_list[0][1].first_index == 2 + assert blocks_list[0][1].overall_count == 1 + assert blocks_list[0][2].first_index == 5 + assert blocks_list[0][2].overall_count == 3 + assert len(blocks_list[0][2]) == 3 + # part + assert blocks_list[0][2][0].definition.index == 5 + assert blocks_list[0][2][1].definition.index == 6 + assert blocks_list[0][2][2].definition.index == 7 + + data_vector[0].value = 'Setpoint' + data_vector[1] = 20 # not added + data_vector.set(5, 'Setpoint') + data_vector[6] = 40 + + # collect write + self.interface._collect_fields(blocks_list, data_vector, HOLDINGS_DEFINITIONS, False) + assert len(blocks_list) == 2 + # blocks + assert blocks_list[1].type_name == "holding" + assert not blocks_list[1].read_not_write + assert len(blocks_list[1]) == 2 + # block + assert blocks_list[1][0].first_index == 0 + assert blocks_list[1][0].overall_count == 1 + assert blocks_list[1][1].first_index == 5 + assert blocks_list[1][1].overall_count == 2 + assert len(blocks_list[1][1]) == 2 + # part + assert blocks_list[1][1][0].field.value == 'Setpoint' + assert blocks_list[1][1][1].field.value == 40 + + self.interface._send_and_integrate(blocks_list) + assert len(FakeInterface.telegram_list) == 5 + assert type(FakeInterface.telegram_list[0]) is LuxtronikSmartHomeReadHoldingsTelegram + assert FakeInterface.telegram_list[0].addr == 10000 + 0 + assert FakeInterface.telegram_list[0].count == 1 + assert type(FakeInterface.telegram_list[1]) is LuxtronikSmartHomeReadHoldingsTelegram + assert FakeInterface.telegram_list[1].addr == 10000 + 2 + assert FakeInterface.telegram_list[1].count == 1 + assert type(FakeInterface.telegram_list[2]) is LuxtronikSmartHomeReadHoldingsTelegram + assert FakeInterface.telegram_list[2].addr == 10000 + 5 + assert FakeInterface.telegram_list[2].count == 3 + assert type(FakeInterface.telegram_list[3]) is LuxtronikSmartHomeWriteHoldingsTelegram + assert FakeInterface.telegram_list[3].addr == 10000 + 0 + assert FakeInterface.telegram_list[3].count == 1 + assert type(FakeInterface.telegram_list[4]) is LuxtronikSmartHomeWriteHoldingsTelegram + assert FakeInterface.telegram_list[4].addr == 10000 + 5 + assert FakeInterface.telegram_list[4].count == 2 + + + def test_collect_field2(self): + self.interface._blocks_list = [] + + h2 = self.interface.holdings.get(2).create_field() + h3 = self.interface.holdings.get(3).create_field() + i105 = self.interface.inputs.get(105).create_field() + i109 = self.interface.inputs.get(109).create_field() + + # collect supported + h2.raw = 2 + field = self.interface.collect_holding_for_read(h2) + assert len(self.interface._blocks_list) == 1 + assert self.interface._blocks_list[0].type_name == "holding" + assert self.interface._blocks_list[0].read_not_write + assert field == h2 + assert field.raw == 2 + + h2.raw = 2 + h2.write_pending = True + field = self.interface.collect_holding_for_write(h2) + assert len(self.interface._blocks_list) == 2 + assert self.interface._blocks_list[1].type_name == "holding" + assert not self.interface._blocks_list[1].read_not_write + assert field == h2 + assert field.raw == 2 + assert field.write_pending + + h2.raw = 2 + h2.write_pending = True + field = self.interface.collect_holding(h2) + assert len(self.interface._blocks_list) == 4 + assert self.interface._blocks_list[2].type_name == "holding" + assert not self.interface._blocks_list[2].read_not_write + assert self.interface._blocks_list[3].type_name == "holding" + assert self.interface._blocks_list[3].read_not_write + assert field == h2 + assert field.raw == 2 + assert field.write_pending + + i105.raw = 105 + field = self.interface.collect_input(i105) + assert len(self.interface._blocks_list) == 5 + assert self.interface._blocks_list[4].type_name == "input" + assert self.interface._blocks_list[4].read_not_write + assert field == i105 + assert field.raw == 105 + + # not collect not supported + h3.raw = 3 + field = self.interface.collect_holding_for_read(h3) + assert len(self.interface._blocks_list) == 5 + assert field is None + assert h3.raw is None + + h3.raw = 3 + h3.write_pending = True + field = self.interface.collect_holding_for_write(h3) + assert len(self.interface._blocks_list) == 5 + assert field is None + assert h3.raw == 3 + assert h3.write_pending + + h3.raw = 3 + h3.write_pending = True + field = self.interface.collect_holding(h3) + assert len(self.interface._blocks_list) == 5 + assert field is None + assert h3.raw == 3 + assert h3.write_pending + + i109.raw = 109 + field = self.interface.collect_input(i109) + assert len(self.interface._blocks_list) == 5 + assert field is None + assert i109.raw is None + + # not collect not existing + field = self.interface.collect_holding_for_read(4) + assert len(self.interface._blocks_list) == 5 + assert field is None + + field = self.interface.collect_holding_for_write(4) + assert len(self.interface._blocks_list) == 5 + assert field is None + + field = self.interface.collect_holding(4) + assert len(self.interface._blocks_list) == 5 + assert field is None + + field = self.interface.collect_input(115) + assert len(self.interface._blocks_list) == 5 + assert field is None + + self.interface.send() + assert len(self.interface._blocks_list) == 0 + + def test_collect_fields2(self): + self.interface._blocks_list = [] + + h = self.interface.create_holdings() + self.interface.collect_holdings_for_read(h) + assert len(self.interface._blocks_list) == 1 + assert self.interface._blocks_list[0].type_name == "holding" + assert self.interface._blocks_list[0].read_not_write + + self.interface.collect_holdings_for_read(h[0]) + assert len(self.interface._blocks_list) == 1 + + # nothing to write + self.interface.collect_holdings_for_write(self.interface.create_holdings()) + assert len(self.interface._blocks_list) == 1 + + self.interface.collect_holdings_for_write(h[0]) + assert len(self.interface._blocks_list) == 1 + + h[0] = 'Setpoint' + self.interface.collect_holdings(h) + assert len(self.interface._blocks_list) == 3 + assert self.interface._blocks_list[1].type_name == "holding" + assert not self.interface._blocks_list[1].read_not_write + assert self.interface._blocks_list[2].type_name == "holding" + assert self.interface._blocks_list[2].read_not_write + + self.interface.collect_holdings(h[0]) + assert len(self.interface._blocks_list) == 3 + + i = self.interface.create_inputs() + self.interface.collect_inputs(i) + assert len(self.interface._blocks_list) == 4 + assert self.interface._blocks_list[3].type_name == "input" + assert self.interface._blocks_list[3].read_not_write + + self.interface.collect_inputs(i[0]) + assert len(self.interface._blocks_list) == 4 + + d = self.interface.create_data() + self.interface.collect_data_for_read(d) + assert len(self.interface._blocks_list) == 6 + assert self.interface._blocks_list[4].type_name == "holding" + assert self.interface._blocks_list[4].read_not_write + assert self.interface._blocks_list[5].type_name == "input" + assert self.interface._blocks_list[5].read_not_write + + self.interface.collect_data_for_read(i) + assert len(self.interface._blocks_list) == 6 + + d.holdings[0] = 'Setpoint' + self.interface.collect_data_for_write(d) + assert len(self.interface._blocks_list) == 7 + assert self.interface._blocks_list[6].type_name == "holding" + assert not self.interface._blocks_list[6].read_not_write + + self.interface.collect_data_for_write(h) + assert len(self.interface._blocks_list) == 7 + + self.interface.collect_data(d) + assert len(self.interface._blocks_list) == 10 + assert self.interface._blocks_list[7].type_name == "holding" + assert not self.interface._blocks_list[7].read_not_write + assert self.interface._blocks_list[8].type_name == "holding" + assert self.interface._blocks_list[8].read_not_write + assert self.interface._blocks_list[9].type_name == "input" + assert self.interface._blocks_list[9].read_not_write + + self.interface.collect_data(None) + assert len(self.interface._blocks_list) == 10 + + self.interface.send() + assert len(self.interface._blocks_list) == 0 + + def test_create_holding(self): + + # supported + def_2 = self.interface.holdings.get(2) + field_2 = self.interface.create_holding(2) + assert isinstance(def_2, LuxtronikDefinition) + assert isinstance(field_2, Base) + assert def_2.name == field_2.name + + # not supported + def_3 = self.interface.holdings.get(3) + field_3 = self.interface.create_holding(3) + assert isinstance(def_3, LuxtronikDefinition) + assert field_3 is None + + # not existing + def_4 = self.interface.holdings.get(4) + field_4 = self.interface.create_holding(4) + assert def_4 is None + assert field_4 is None + + vector = self.interface.create_holdings(False) + assert not vector.safe + # supported + field = vector[2] + assert isinstance(field, Base) + assert field_2.name == field.name + # not supported + field = vector[3] + assert field is None + + vector = self.interface.create_holdings(True) + assert vector.safe + # not existing + field = vector[4] + assert field is None + + vector = self.interface.create_empty_holdings(False) + assert not vector.safe + assert len(vector) == 0 + # supported but not added + field = vector[2] + assert field is None + + def test_read_holding(self): + FakeInterface.result = False + + # read field with error + field = self.interface.read_holding(2) + assert field is None + + # read vector with error + vector = self.interface.read_holdings() + assert vector.safe + # provided data will be integrated + assert vector[2].raw == 2 + + # read empty vector + vector = self.interface.create_empty_holdings(False) + self.interface.read_holdings(vector) + assert not vector.safe + assert vector[2] is None + + FakeInterface.result = True + + # read field + field = self.interface.read_holding(2) + assert field is not None + assert field.raw == 2 + + # read vector + vector = self.interface.read_holdings() + assert vector.safe + assert vector[2].raw == 2 + + # read empty vector + vector = self.interface.create_empty_holdings(False) + self.interface.read_holdings(vector) + assert not vector.safe + assert vector[2] is None + + def test_write_holding(self): + + # prepare + vector = self.interface.create_empty_holdings(True) + field_2 = vector.add(2) + assert len(vector) == 1 + + FakeInterface.result = False + + # write field with error + field = self.interface.write_holding(2, 19) + assert field is None + + # write vector with error + field_2.value = 20 + success = self.interface.write_holdings(vector) + assert not success + + # write None + success = self.interface.write_holdings(2) + assert not success + + # write and read vector with error + field_2.value = 20 + success = self.interface.write_and_read_holdings(vector) + assert not success + + # write and read None + success = self.interface.write_and_read_holdings(4) + assert not success + + FakeInterface.result = True + + # write field + field = self.interface.write_holding(2, 19) + assert isinstance(field, Base) + assert field.value == 19 + + # write vector + field_2.value = 20 + success = self.interface.write_holdings(vector) + assert success + + # write and read vector + field_2.value = 20 + success = self.interface.write_and_read_holdings(vector) + assert success + + # write none + success = self.interface.write_holdings(None) + assert not success + success = self.interface.write_holdings(7) + assert not success + + # write and read none + success = self.interface.write_and_read_holdings(None) + assert not success + success = self.interface.write_and_read_holdings(18) + assert not success + + def test_create_input(self): + + # supported + def_105 = self.interface.inputs.get(105) + field_105 = self.interface.create_input(105) + assert isinstance(def_105, LuxtronikDefinition) + assert isinstance(field_105, Base) + assert def_105.name == field_105.name + + # not supported + def_109 = self.interface.inputs.get(109) + field_109 = self.interface.create_input(109) + assert isinstance(def_109, LuxtronikDefinition) + assert field_109 is None + + # not existing + def_115 = self.interface.inputs.get(115) + field_115 = self.interface.create_input(115) + assert def_115 is None + assert field_115 is None + + vector = self.interface.create_inputs() + assert vector.safe + # supported + field = vector[105] + assert isinstance(field, Base) + assert field_105.name == field.name + # not supported + field = vector[109] + assert field is None + # not existing + field = vector[115] + assert field is None + + vector = self.interface.create_empty_holdings() + assert vector.safe + assert len(vector) == 0 + # supported but not added + field = vector[105] + assert field is None + + def test_read_input(self): + FakeInterface.result = False + + # read field with error + field = self.interface.read_input(105) + assert field is None + + # read vector with error + vector = self.interface.read_inputs() + assert vector.safe + # provided data will be integrated + assert vector[105].raw == 105 + + # read empty vector + vector = self.interface.create_empty_inputs() + self.interface.read_inputs(vector) + assert vector.safe + assert vector[105] is None + + FakeInterface.result = True + + # read field + field = self.interface.read_input(105) + assert field is not None + assert field.raw == 105 + + # read vector + vector = self.interface.read_inputs() + assert vector.safe + assert vector[105].raw == 105 + + # read empty vector + vector = self.interface.create_empty_inputs() + self.interface.read_inputs(vector) + assert vector.safe + assert vector[105] is None + + def test_create_data(self): + + data = self.interface.create_data(False) + assert not data.holdings.safe + # supported + field = data.holdings[2] + assert isinstance(field, Base) + # not supported + field = data.holdings[3] + assert field is None + + data = self.interface.create_data(True) + assert data.holdings.safe + # not existing + field = data.holdings[4] + assert field is None + + data = self.interface.create_empty_data(False) + assert not data.holdings.safe + assert len(data.holdings) == 0 + assert len(data.inputs) == 0 + # supported but not added + field = data.holdings[2] + assert field is None + + def test_read_data(self): + FakeInterface.result = False + + # read data with error + data = self.interface.read_data() + assert data.holdings.safe + # provided data will be integrated + assert data.holdings[2].raw == 2 + + data = self.interface.read() + assert data.holdings.safe + # provided data will be integrated + assert data.holdings[2].raw == 2 + + # read empty data + data = self.interface.create_empty_data(False) + self.interface.read_data(data) + assert not data.holdings.safe + assert data.holdings[2] is None + + FakeInterface.result = True + + # read data + data = self.interface.read_data() + assert data.holdings.safe + assert data.holdings[2].raw == 2 + + data = self.interface.read() + assert data.holdings.safe + assert data.holdings[2].raw == 2 + + # read empty data + data = self.interface.create_empty_data(False) + self.interface.read_data(data) + assert not data.holdings.safe + assert data.holdings[2] is None + + def test_write_data(self): + + # prepare + data = self.interface.create_empty_data(True) + field_2 = data.holdings.add(2) + assert len(data.holdings) == 1 + + FakeInterface.result = False + + # write data with error + field_2.value = 20 + success = self.interface.write_data(data) + assert not success + + field_2.value = 20 + success = self.interface.write(data) + assert not success + + # write None + success = self.interface.write(None) + assert not success + + # write and read data with error + field_2.value = 20 + success = self.interface.write_and_read_data(data) + assert not success + + field_2.value = 20 + success = self.interface.write_and_read(data) + assert not success + + # write and read None + success = self.interface.write_and_read(None) + assert not success + + FakeInterface.result = True + + # write vector + field_2.value = 20 + success = self.interface.write_data(data) + assert success + + field_2.value = 20 + success = self.interface.write(data) + assert success + + # write and read data + field_2.value = 20 + success = self.interface.write_and_read_data(data) + assert success + + field_2.value = 20 + success = self.interface.write_and_read(data) + assert success + + # write none + success = self.interface.write_data(None) + assert not success + + success = self.interface.write(None) + assert not success + + # write and read none + success = self.interface.write_and_read_data(None) + assert not success + + success = self.interface.write_and_read(None) + assert not success + + def test_raw(self): + FakeInterface.result = False + + data = self.interface.read_holding_raw(1, 3) + assert data is None + + success = self.interface.write_holding_raw(1, [7, 6, 3]) + assert not success + + data = self.interface.read_input_raw(2, 5) + assert data is None + + FakeInterface.result = True + + data = self.interface.read_holding_raw(1, 3) + assert data == [1, 2, 3] + + success = self.interface.write_holding_raw(1, [7, 6, 3]) + assert success + + data = self.interface.read_input_raw(2, 5) + assert data == [2, 3, 4, 5, 6] + + def test_read_then_write(self): + field = self.interface.create_holding(2) + + field.value = 20 + self.interface.collect_holding_for_read(field) + assert field.value == 20 + + self.interface.collect_holding_for_write(field, 32) + assert field.value == 32 + + self.interface.read_input(0) + assert len(FakeInterface.telegram_list) == 3 + assert FakeInterface.telegram_list[0].data == [2] + assert FakeInterface.telegram_list[1].data == [32 * 10] + assert FakeInterface.telegram_list[2].data == [0] + + assert field.raw == 2 + + def test_write_then_read(self): + field = self.interface.create_holding(2) + + self.interface.collect_holding_for_write(field, 32) + assert field.value == 32 + + self.interface.collect_holding_for_read(field) + assert field.value == 32 + + field.value = 42 + + self.interface.read_input(0) + assert len(FakeInterface.telegram_list) == 3 + assert FakeInterface.telegram_list[0].data == [42 * 10] + assert FakeInterface.telegram_list[1].data == [2] + assert FakeInterface.telegram_list[2].data == [0] + + assert field.raw == 2 + + def test_trial_and_error_mode(self): + + # prepare + interface = LuxtronikSmartHomeInterface(FakeInterface(), None) + + holdings = Holdings.empty(None) + h0 = holdings.add(0) # 3.90.1 + h1 = holdings.add(1) # 3.90.1 + h2 = holdings.add(2) # 3.90.1 + h3 = holdings.add(3) # 3.92.0 + h4 = holdings.add(4) + assert h4 is None + + # add vector for read + interface.collect_holdings_for_read(holdings) + assert len(interface._blocks_list) == 1 + assert len(interface._blocks_list[0]) == 4 + assert len(interface._blocks_list[0][0]) == 1 + assert interface._blocks_list[0][0][0].field == h0 + assert len(interface._blocks_list[0][1]) == 1 + assert interface._blocks_list[0][1][0].field == h1 + assert len(interface._blocks_list[0][2]) == 1 + assert interface._blocks_list[0][2][0].field == h2 + assert len(interface._blocks_list[0][3]) == 1 + assert interface._blocks_list[0][3][0].field == h3 + + # add vector for write + h1.raw = 10 + h1.write_pending = True + h3.raw = 1 + h3.write_pending = True + interface.collect_holdings_for_write(holdings) + assert len(interface._blocks_list) == 2 + assert len(interface._blocks_list[1]) == 2 + assert len(interface._blocks_list[1][0]) == 1 + assert interface._blocks_list[1][0][0].field == h1 + assert len(interface._blocks_list[1][1]) == 1 + assert interface._blocks_list[1][1][0].field == h3 + + # add not existing read (success) + field = interface.collect_holding_for_read('unknown_foo_4') + assert type(field) is Unknown + assert len(interface._blocks_list) == 3 + assert len(interface._blocks_list[2]) == 1 + assert len(interface._blocks_list[2][0]) == 1 + assert interface._blocks_list[2][0][0].definition.name == 'unknown_holding_4' + assert interface._blocks_list[2][0][0].definition.index == 4 + assert interface._blocks_list[2][0][0].definition.count == 1 + assert interface._blocks_list[2][0][0].field.name == 'unknown_holding_4' + assert not interface._blocks_list[2][0][0].field.writeable + + # add not existing read (fail) + field = interface.collect_holding_for_read('bar_foo_4') + assert field is None + assert len(interface._blocks_list) == 3 + + # add not existing write (success) + field = interface.collect_holding_for_write('unknown_bar_4', 16, False) + assert type(field) is Unknown + assert len(interface._blocks_list) == 4 + assert len(interface._blocks_list[3]) == 1 + assert len(interface._blocks_list[3][0]) == 1 + assert interface._blocks_list[3][0][0].field.name == 'unknown_holding_4' + assert not interface._blocks_list[3][0][0].field.writeable + + # add not existing write (success) + field = interface.collect_holding_for_write('unknown_bar_4', 17, True) + assert field is None + assert len(interface._blocks_list) == 4 + + interface.send() + offset = interface.holdings.offset + assert len(FakeInterface.telegram_list) == 8 + assert type(FakeInterface.telegram_list[0]) is LuxtronikSmartHomeReadHoldingsTelegram + assert FakeInterface.telegram_list[0].addr == offset + 0 + assert FakeInterface.telegram_list[0].count == 1 + assert type(FakeInterface.telegram_list[1]) is LuxtronikSmartHomeReadHoldingsTelegram + assert FakeInterface.telegram_list[1].addr == offset + 1 + assert FakeInterface.telegram_list[1].count == 1 + assert type(FakeInterface.telegram_list[2]) is LuxtronikSmartHomeReadHoldingsTelegram + assert FakeInterface.telegram_list[2].addr == offset + 2 + assert FakeInterface.telegram_list[2].count == 1 + assert type(FakeInterface.telegram_list[3]) is LuxtronikSmartHomeReadHoldingsTelegram + assert FakeInterface.telegram_list[3].addr == offset + 3 + assert FakeInterface.telegram_list[3].count == 1 + assert type(FakeInterface.telegram_list[4]) is LuxtronikSmartHomeWriteHoldingsTelegram + assert FakeInterface.telegram_list[4].addr == offset + 1 + assert FakeInterface.telegram_list[4].count == 1 + assert FakeInterface.telegram_list[4].data == [10] + assert type(FakeInterface.telegram_list[5]) is LuxtronikSmartHomeWriteHoldingsTelegram + assert FakeInterface.telegram_list[5].addr == offset + 3 + assert FakeInterface.telegram_list[5].count == 1 + assert FakeInterface.telegram_list[5].data == [1] + assert type(FakeInterface.telegram_list[6]) is LuxtronikSmartHomeReadHoldingsTelegram + assert FakeInterface.telegram_list[6].addr == offset + 4 + assert FakeInterface.telegram_list[6].count == 1 + assert type(FakeInterface.telegram_list[7]) is LuxtronikSmartHomeWriteHoldingsTelegram + assert FakeInterface.telegram_list[7].addr == offset + 4 + assert FakeInterface.telegram_list[7].count == 1 + assert FakeInterface.telegram_list[7].data == [16] + + + def check_definitions(self, interface): + definitions = interface.get_holdings(False) + vector = interface.create_holdings() + assert definitions._version == vector.version + assert len(definitions) <= len(interface.holdings) + for d in definitions: + assert d.name in vector + assert d in interface.holdings + for f in vector: + assert f.name in definitions + assert f.name in interface.holdings + + definitions = interface.get_inputs(False) + vector = interface.create_inputs() + assert definitions._version == vector.version + assert len(definitions) <= len(interface.inputs) + for d in definitions: + assert d.name in vector + assert d in interface.inputs + for f in vector: + assert f.name in definitions + assert f.name in interface.inputs + + def test_create_modbus(self): + interface = create_modbus_tcp('host', version=None) + assert interface.version is None + self.check_definitions(interface) + + interface = create_modbus_tcp('host', version=1) + assert interface.version is None + self.check_definitions(interface) + + interface = create_modbus_tcp('host', version="1.2.3") + assert interface.version == (1, 2, 3, 0) + self.check_definitions(interface) + + interface = create_modbus_tcp('host', version="latest") + assert interface.version == LUXTRONIK_LATEST_SHI_VERSION + self.check_definitions(interface) + + interface = create_modbus_tcp('host', version=LUXTRONIK_FIRST_VERSION_WITH_SHI) + assert interface.version == LUXTRONIK_FIRST_VERSION_WITH_SHI + self.check_definitions(interface) + + interface = create_modbus_tcp('host') + assert interface.version == (400, 401, 402, 0) + self.check_definitions(interface) + + FakeInterface.result = False + + interface = create_modbus_tcp('host') + assert interface.version is None + self.check_definitions(interface) + + FakeInterface.result = True diff --git a/tests/shi/test_modbus.py b/tests/shi/test_modbus.py index f2bd3e5..445e22a 100644 --- a/tests/shi/test_modbus.py +++ b/tests/shi/test_modbus.py @@ -1,4 +1,6 @@ import pytest +from unittest.mock import patch + from pyModbusTCP.client import ModbusClient from luxtronik.shi.common import ( @@ -7,6 +9,7 @@ LuxtronikSmartHomeReadInputsTelegram, LuxtronikSmartHomeWriteTelegram, LuxtronikSmartHomeWriteHoldingsTelegram, + LuxtronikSmartHomeTelegrams, ) from luxtronik.shi.modbus import LuxtronikModbusTcpInterface @@ -94,6 +97,12 @@ def write_multiple_registers(self, addr, data): # Tests ############################################################################### +class DummyTelegram(LuxtronikSmartHomeReadTelegram): + pass + + +@patch("luxtronik.shi.modbus.LUXTRONIK_WAIT_TIME_AFTER_HOLDING_WRITE", 0) +@patch("luxtronik.shi.modbus.LuxtronikSmartHomeTelegrams", LuxtronikSmartHomeTelegrams | {DummyTelegram}) class TestModbusInterface: host = "local_host" port = 9876 @@ -356,3 +365,12 @@ def test_list(self): assert list[0].data == [2, 3] assert list[1].data == [11, 21] assert list[2].data == [] + + def test_not_defined(self): + telegram = DummyTelegram(0, 1) + + try: + self.modbus_interface.send(telegram) + assert False + except Exception: + pass diff --git a/tests/shi/test_vector.py b/tests/shi/test_vector.py index 2327ca3..62a0ab3 100644 --- a/tests/shi/test_vector.py +++ b/tests/shi/test_vector.py @@ -85,6 +85,8 @@ def test_create(self): data_vector = DataVectorTest(parse_version("1.2")) assert data_vector.version == (1, 2, 0, 0) assert len(data_vector) == 3 + assert not data_vector._read_blocks_up_to_date + assert len(data_vector._read_blocks) == 0 # create version-dependent field field = data_vector.create_field(5) @@ -116,6 +118,8 @@ def test_create(self): data_vector = DataVectorTest(parse_version("3.0")) assert data_vector.version == (3, 0, 0, 0) assert len(data_vector) == 1 + assert not data_vector._read_blocks_up_to_date + assert len(data_vector._read_blocks) == 0 # create not available field (invalid version) field = data_vector.create_field(5) @@ -143,6 +147,8 @@ def test_create(self): data_vector = DataVectorTest.empty(parse_version("3.0")) assert data_vector.version == (3, 0, 0, 0) assert len(data_vector) == 0 + assert not data_vector._read_blocks_up_to_date + assert len(data_vector._read_blocks) == 0 # create not available field (not available) field = data_vector.create_field(9) @@ -151,12 +157,14 @@ def test_create(self): def test_add(self): data_vector = DataVectorTest.empty(parse_version("1.1.2")) assert len(data_vector) == 0 + data_vector._read_blocks_up_to_date # Add available index field = data_vector.add(5) assert len(data_vector) == 1 assert 5 in data_vector assert field.name == 'field_5' + assert not data_vector._read_blocks_up_to_date # Add not available index (not existing) field = data_vector.add(6) @@ -273,18 +281,24 @@ def test_set(self): # set via property (writeable) data_vector['field_9'] = [2, 8] assert field_5.value is None + assert not field_5.write_pending assert field_9.value == [2, 8] + assert field_9.write_pending # set via method (non-writeable) data_vector.set(5, 1) assert field_5.value == 1 + assert field_5.write_pending assert field_9.value == [2, 8] + assert field_9.write_pending # set via field field_5.value = [4, 3] field_9.value = 6 assert field_5.value == [4, 3] + assert field_5.write_pending assert field_9.value == 6 + assert field_9.write_pending def test_parse(self): data_vector = DataVectorTest(parse_version("1.1.2")) @@ -363,6 +377,39 @@ def test_global_alias(self): assert field_9a is not None assert field_9a.name == 'field_9a' + def test_read_blocks(self): + data_vector = DataVectorTest(parse_version("1.1.2")) + assert not data_vector._read_blocks_up_to_date + assert len(data_vector._read_blocks) == 0 + + # update blocks + data_vector.update_read_blocks() + assert data_vector._read_blocks_up_to_date + assert len(data_vector._read_blocks) == 2 + assert data_vector._read_blocks[0].first_index == 5 + assert data_vector._read_blocks[0].overall_count == 1 + assert data_vector._read_blocks[1].first_index == 9 + assert data_vector._read_blocks[1].overall_count == 2 + + # reset updated + def_10 = TEST_DEFINITIONS.add({ + "index": 10, + "count": 3, + "names": "foo" + }) + data_vector.add(def_10) + assert not data_vector._read_blocks_up_to_date + assert len(data_vector._read_blocks) == 2 + + # update blocks again + data_vector.update_read_blocks() + assert data_vector._read_blocks_up_to_date + assert len(data_vector._read_blocks) == 2 + assert data_vector._read_blocks[0].first_index == 5 + assert data_vector._read_blocks[0].overall_count == 1 + assert data_vector._read_blocks[1].first_index == 9 + assert data_vector._read_blocks[1].overall_count == 4 + def test_version_none(self): data_vector = DataVectorTest.empty(None) assert len(data_vector) == 0 diff --git a/tests/test_datatypes.py b/tests/test_datatypes.py index 73ee90c..d9ab9e7 100644 --- a/tests/test_datatypes.py +++ b/tests/test_datatypes.py @@ -163,7 +163,7 @@ def test_repr(self): a.value = 123 a.raw = 123 - assert a.__repr__() == "Base (name: base, writeable: False, value: 123, raw: 123, class: None, unit: None)" + assert a.__repr__() == "Base (name: base, writeable: False, value: 123, raw: 123, write_pending: False, class: None, unit: None)" def test_str(self): """Test cases for __str__ function"""