Skip to content

Commit d8a46fc

Browse files
committed
Add the shi module __init__ file
1 parent ead39ec commit d8a46fc

File tree

2 files changed

+160
-1
lines changed

2 files changed

+160
-1
lines changed

luxtronik/shi/__init__.py

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
"""
2+
Python module for controlling a Luxtronik heat pump controller
3+
via the smart home interface. Powered by Guzz-T.
4+
"""
5+
6+
from luxtronik.datatypes import FullVersion, MajorMinorVersion
7+
from luxtronik.shi.constants import (
8+
LUXTRONIK_DEFAULT_MODBUS_PORT,
9+
LUXTRONIK_DEFAULT_MODBUS_TIMEOUT,
10+
LUXTRONIK_LATEST_SHI_VERSION,
11+
)
12+
from luxtronik.shi.common import LOGGER, parse_version
13+
from luxtronik.shi.inputs import INPUTS_DEFINITIONS
14+
from luxtronik.shi.modbus import LuxtronikModbusTcpInterface
15+
from luxtronik.shi.interface import LuxtronikSmartHomeInterface
16+
17+
VERSION_DETECT = "detect"
18+
VERSION_LATEST = "latest"
19+
20+
21+
###############################################################################
22+
# Helper methods
23+
###############################################################################
24+
25+
def get_version_definitions(definitions):
26+
"""
27+
Retrieve all definitions that represent version fields.
28+
29+
Args:
30+
definitions (LuxtronikDefinitionsList): List of definitions
31+
32+
Returns:
33+
list[LuxtronikDefinition]: List of definitions whose data_type
34+
is either FullVersion or MajorMinorVersion.
35+
"""
36+
version_definitions = []
37+
for d in definitions:
38+
if d.data_type in (FullVersion, MajorMinorVersion):
39+
version_definitions.append(d)
40+
return version_definitions
41+
42+
def determine_version(interface):
43+
"""
44+
Determine the version of the luxtronik controller.
45+
46+
This is a little bit ugly! The controller version is required
47+
to locate the version field. As workaround, probe each known
48+
version field until one yields a valid read and a parsable version.
49+
This approach works as long as the version-field has not changed.
50+
51+
Args:
52+
interface (LuxtronikModbusTcpInterface):
53+
Simple read/write interface to read out the version.
54+
55+
Returns:
56+
tuple[int] | None: The version of the controller on success,
57+
or None if no version could be determined.
58+
"""
59+
definitions = get_version_definitions(INPUTS_DEFINITIONS)
60+
for definition in definitions:
61+
data = interface.read_inputs(definition.addr, definition.count)
62+
if data is not None:
63+
field = definition.create_field()
64+
field.raw = data
65+
parsed = parse_version(field.value)
66+
if parsed is not None:
67+
return parsed
68+
LOGGER.warning("It was not possible to determine the controller version. " \
69+
+ "Switch to trial-and-error mode.")
70+
return None
71+
72+
73+
###############################################################################
74+
# Factory methods
75+
###############################################################################
76+
77+
def create_modbus_tcp(
78+
host,
79+
port=LUXTRONIK_DEFAULT_MODBUS_PORT,
80+
timeout=LUXTRONIK_DEFAULT_MODBUS_TIMEOUT,
81+
version=VERSION_DETECT
82+
):
83+
"""
84+
Create a LuxtronikSmartHomeInterface using a Modbus TCP connection.
85+
86+
The function constructs a Modbus TCP low-level interface and resolves the
87+
controller version according to the supplied `version` argument:
88+
- If `version` equals VERSION_DETECT, attempt to determine the version.
89+
- If `version` equals VERSION_LATEST, use LUXTRONIK_LATEST_SHI_VERSION as version.
90+
- If `version` is a string, parse it into a version tuple.
91+
- If `version` is None, the interface is initialized in trial-and-error mode.
92+
- Otherwise assume `version` is already a parsed version tuple.
93+
94+
Args:
95+
host (str): Hostname or IP address of the Luxtronik controller.
96+
port (int): TCP port for the Modbus connection.
97+
timeout (float): Timeout in seconds for the Modbus connection.
98+
version (tuple[int] | str | None): Version used to initialize the interface.
99+
If VERSION_DETECT is passed, the function will attempt to determine the version.
100+
If a str is passed, the string will be parsed into a version tuple.
101+
If None is passed, trial-and-error mode is activated.
102+
103+
Returns:
104+
LuxtronikSmartHomeInterface:
105+
Initialized interface instance bound to the Modbus TCP connection.
106+
"""
107+
modbus_interface = LuxtronikModbusTcpInterface(host, port, timeout)
108+
109+
resolved_version = version
110+
if resolved_version == VERSION_DETECT:
111+
# return None in case of an error -> trial-and-error mode
112+
resolved_version = determine_version(modbus_interface)
113+
elif isinstance(resolved_version, str):
114+
if resolved_version.lower() == VERSION_LATEST:
115+
resolved_version = LUXTRONIK_LATEST_SHI_VERSION
116+
else:
117+
# return None in case of an error -> trial-and-error mode
118+
resolved_version = parse_version(resolved_version)
119+
else:
120+
resolved_version = parse_version(resolved_version)
121+
122+
LOGGER.info(f"Create smart-home-interface via modbus-TCP on {host}:{port}"
123+
+ f" for version {resolved_version}")
124+
return LuxtronikSmartHomeInterface(modbus_interface, resolved_version)

tests/shi/test_interface.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import pytest
2+
from unittest.mock import patch
23

34
from luxtronik.datatypes import Base, Unknown
45

@@ -24,6 +25,7 @@
2425
LuxtronikSmartHomeData,
2526
LuxtronikSmartHomeInterface,
2627
)
28+
from luxtronik.shi import create_modbus_tcp
2729

2830
###############################################################################
2931
# Fake modbus client
@@ -33,14 +35,23 @@ class FakeInterface:
3335
telegram_list = []
3436
result = True
3537

38+
def __init__(self, host="", port="", timeout=0):
39+
pass
40+
41+
def _get_data(self, addr, count):
42+
return [addr - 10000 + i for i in range(count)]
43+
44+
def read_inputs(self, addr, count):
45+
return self._get_data(addr, count) if self.result else None
46+
3647
def send(self, telegrams):
3748
if not isinstance(telegrams, list):
3849
telegrams = [telegrams]
3950
FakeInterface.telegram_list = telegrams
4051

4152
for t in telegrams:
4253
if isinstance(t, LuxtronikSmartHomeReadTelegram):
43-
t.data = [t.addr - 10000 + i for i in range(t.count)]
54+
t.data = self._get_data(t.addr, t.count)
4455
return self.result
4556

4657

@@ -73,6 +84,7 @@ def test_empty(self):
7384
assert data.inputs.version == (1, 2, 0, 0)
7485

7586

87+
@patch("luxtronik.shi.LuxtronikModbusTcpInterface", FakeInterface)
7688
class TestLuxtronikSmartHomeInterface:
7789

7890
@classmethod
@@ -1246,3 +1258,26 @@ def test_trial_and_error_mode(self):
12461258
assert FakeInterface.telegram_list[7].addr == offset + 4
12471259
assert FakeInterface.telegram_list[7].count == 1
12481260
assert FakeInterface.telegram_list[7].data == [16]
1261+
1262+
def test_create_modbus(self):
1263+
interface = create_modbus_tcp('host', version=None)
1264+
assert interface.version is None
1265+
1266+
interface = create_modbus_tcp('host', version=1)
1267+
assert interface.version is None
1268+
1269+
interface = create_modbus_tcp('host', version="1.2.3")
1270+
assert interface.version == (1, 2, 3, 0)
1271+
1272+
interface = create_modbus_tcp('host', version="latest")
1273+
assert interface.version == LUXTRONIK_LATEST_SHI_VERSION
1274+
1275+
interface = create_modbus_tcp('host')
1276+
assert interface.version == (400, 401, 402, 0)
1277+
1278+
FakeInterface.result = False
1279+
1280+
interface = create_modbus_tcp('host')
1281+
assert interface.version is None
1282+
1283+
FakeInterface.result = True

0 commit comments

Comments
 (0)