Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pytest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on: # yamllint disable-line rule:truthy
push:
branches:
- main
pull_request:
pull_request_target:
jobs:
pytest:
runs-on: ubuntu-latest
Expand Down
83 changes: 52 additions & 31 deletions luxtronik/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,23 @@ def __init__(self, host, port=LUXTRONIK_DEFAULT_PORT, safe=True):
self._port = port
self._safe = safe
self._socket = None
self.read()
self._connect()

def __del__(self):
self._disconnect()

def _connect(self):
"""Connect the socket if not already done."""
is_none = self._socket is None
if is_none:
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if is_none or is_socket_closed(self._socket):
self._socket.connect((self._host, self._port))
LOGGER.info(
"Connected to Luxtronik heat pump %s:%s", self._host, self._port
)

def _disconnect(self):
if self._socket is not None:
if not is_socket_closed(self._socket):
self._socket.close()
Expand All @@ -74,43 +88,50 @@ def __del__(self):
"Disconnected from Luxtronik heatpump %s:%s", self._host, self._port
)

def read(self):
"""Read data from heatpump."""
return self._read_after_write(parameters=None)

def write(self, parameters):
"""Write parameter to heatpump."""
return self._read_after_write(parameters=parameters)

def _read_after_write(self, parameters):
def _with_lock_and_connect(self, func, *args, **kwargs):
"""
Read and/or write value from and/or to heatpump.
This method is essentially a wrapper for the _read() and _write()
methods.
Decorator around various read/write functions to connect first.

This method is essentially a wrapper for the _read() and _write() methods.
Locking is being used to ensure that only a single socket operation is
performed at any point in time. This helps to avoid issues with the
Luxtronik controller, which seems unstable otherwise.
If write is true, all parameters will be written to the heat pump
prior to reading back in all data from the heat pump. If write is
false, no data will be written, but all available data will be read
from the heat pump.
"""
with self._lock:
self._connect()
ret_val = func(*args, **kwargs)
# self._disconnect()
return ret_val

def read(self):
"""
Read data from heat pump.
All available data will be read from the heat pump.
"""
return self._with_lock_and_connect(self._read)

def read_parameters(self):
"""Read parameters from heat pump."""
return self._with_lock_and_connect(self._read_parameters)

def read_calculations(self):
"""Read calculations from heat pump."""
return self._with_lock_and_connect(self._read_calculations)

def read_visibilities(self):
"""Read visibilities from heat pump."""
return self._with_lock_and_connect(self._read_visibilities)

def write(self, parameters):
"""
Write parameter to heat pump.
All parameters will be written to the heat pump
prior to reading back in all data from the heat pump.
:param Parameters() parameters Parameter dictionary to be written
to the heatpump before reading all available data
from the heatpump. At 'None' it is read only.
from the heat pump.
"""

with self._lock:
is_none = self._socket is None
if is_none:
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if is_none or is_socket_closed(self._socket):
self._socket.connect((self._host, self._port))
LOGGER.info(
"Connected to Luxtronik heatpump %s:%s", self._host, self._port
)
if parameters is not None:
return self._write(parameters)
return self._read()
return self._with_lock_and_connect(self._write, parameters)

def _read(self):
parameters = self._read_parameters()
Expand Down
57 changes: 16 additions & 41 deletions luxtronik/calculations.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""Parse luxtronik calculations."""
import logging

from luxtronik.data_vector import DataVector

from luxtronik.datatypes import (
BivalenceLevel,
Bool,
Expand Down Expand Up @@ -33,14 +35,16 @@
Voltage,
)

LOGGER = logging.getLogger("Luxtronik.Calculations")


class Calculations:
class Calculations(DataVector):
"""Class that holds all calculations."""

logger = logging.getLogger("Luxtronik.Calculations")
name = "Calculation"

def __init__(self):
self._calculations = {
super().__init__()
self._data = {
0: Unknown("Unknown_Calculation_0"),
1: Unknown("Unknown_Calculation_1"),
2: Unknown("Unknown_Calculation_2"),
Expand Down Expand Up @@ -301,41 +305,12 @@ def __init__(self):
257: Power("Heat_Output"),
258: MajorMinorVersion("RBE_Version"),
259: Unknown("Unknown_Calculation_259"),
260: Unknown("Unknown_Calculation_260"),
261: Unknown("Unknown_Calculation_261"),
262: Unknown("Unknown_Calculation_262"),
263: Unknown("Unknown_Calculation_263"),
264: Unknown("Unknown_Calculation_264"),
265: Unknown("Unknown_Calculation_265"),
266: Unknown("Unknown_Calculation_266"),
267: Unknown("Unknown_Calculation_267"),
}

def __iter__(self):
return iter(self._calculations.items())

def parse(self, raw_data):
"""Parse raw calculations data."""
for index, data in enumerate(raw_data):
calculation = self._calculations.get(index, False)
if calculation is not False:
calculation.raw = data
else:
# LOGGER.warning("Calculation '%d' not in list of calculations", index)
calculation = Unknown(f"Unknown_Calculation_{index}")
calculation.raw = data
self._calculations[index] = calculation

def _lookup(self, target):
"""Lookup calculation by either id or name."""
# Get calculation by id
if isinstance(target, int):
return self._calculations.get(target, None)
# Get calculation by name
if isinstance(target, str):
try:
target = int(target)
return self._calculations.get(target, None)
except ValueError:
for _, calculation in self._calculations.items():
if calculation.name == target:
return calculation
LOGGER.warning("Calculation '%s' not found", target)
return None

def get(self, target):
"""Get calculation by id or name."""
calculation = self._lookup(target)
return calculation
67 changes: 67 additions & 0 deletions luxtronik/data_vector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""Provides a base class for parameters, calculations, visibilities."""
import logging

from luxtronik.datatypes import Unknown


class DataVector:
"""Class that holds a vector of data entries."""

logger = logging.getLogger("Luxtronik.DataVector")
name = "DataVector"

def __init__(self):
"""Initialize DataVector class."""
self._data = {}

def __iter__(self):
"""Iterator for the data entries."""
return iter(self._data.items())

def parse(self, raw_data):
"""Parse raw data."""
for index, data in enumerate(raw_data):
entry = self._data.get(index, None)
if entry is not None:
entry.raw = data
else:
# self.logger.warning(f"Entry '%d' not in list of {self.name}", index)
entry = Unknown(f"Unknown_{self.name}_{index}")
entry.raw = data
self._data[index] = entry

def _lookup(self, target, with_index=False):
"""
Lookup an entry

"target" could either be its id or its name.

In case "with_index" is set, also the index is returned.
"""
if isinstance(target, str):
try:
# Try to get entry by id
target_index = int(target)
except ValueError:
# Get entry by name
target_index = None
for index, entry in self._data.items():
if entry.name == target:
target_index = index
elif isinstance(target, int):
# Get entry by id
target_index = target
else:
target_index = None

target_entry = self._data.get(target_index, None)
if target_entry is None:
self.logger.warning("entry '%s' not found", target)
if with_index:
return target_index, target_entry
return target_entry

def get(self, target):
"""Get entry by id or name."""
entry = self._lookup(target)
return entry
Loading