Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 27 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,27 +53,38 @@ The following example reads in data from the heat pump:
from luxtronik import Luxtronik

l = Luxtronik('192.168.1.23', 8889)
calculations, parameters, visibilities = l.read()
heating_limit = l.parameters.get("ID_Einst_Heizgrenze_Temp")

t_forerun = calculations.get("ID_WEB_Temperatur_TVL")
# Do something else here...

# alternatively get also works with numerical ID values
# Read the values again
l.read()
t_forerun = l.calculations.get("ID_WEB_Temperatur_TVL")
t_outside = l.calculations.get("ID_WEB_Temperatur_TA")

t_forerun = calculations.get(10)
# alternatively get also works with numerical ID values
t_forerun = l.calculations.get(10)

print(t_forerun) # this returns the temperature value of the forerun, 22.7 for example
print(t_forerun.unit) # gives you the unit of the value if known, °C for example

# calculations holds measurement values
# check https://github.com/Bouni/luxtronik/blob/master/luxtronik/calculations.py for values you might need
# check https://github.com/Bouni/python-luxtronik/blob/master/luxtronik/calculations.py for values you might need

# parameters holds parameter values
# check https://github.com/Bouni/luxtronik/blob/master/luxtronik/parameters.py for values you might need
# check https://github.com/Bouni/python-luxtronik/blob/master/luxtronik/parameters.py for values you might need

# visibilitys holds visibility values, the function of visibilities is not clear at this point
# check https://github.com/Bouni/luxtronik/blob/master/luxtronik/visibilities.py for values you might need
# check https://github.com/Bouni/python-luxtronik/blob/master/luxtronik/visibilities.py for values you might need
```

The method `read()` reads the calculations, parameters and
visibilities from the heat pump.
Alternatively `read_parameters()`, `read_calculations()` or `read_visibilities()`
can be used.

Note that an initial read operation is carried out in the constructor.

### SCRIPTS AND COMMAND LINE INTERFACE (CLI)

Once installed, the luxtronik package provides several scripts that can be used
Expand Down Expand Up @@ -188,8 +199,16 @@ from luxtronik import Luxtronik, Parameters

l = Luxtronik('192.168.1.23', 8889)

# Queue a parameter change
# In this example, the domestic hot water temperature is set to 45 degrees.
l.parameters.set("ID_Soll_BWS_akt", 45.0)

# Write all queued changes to the heat pump
l.write()

# Another possibility to write parameters
parameters = Parameters()
heating_mode = parameters.set("ID_Ba_Hz_akt", "Party")
parameters.set("ID_Ba_Hz_akt", "Party")
l.write(parameters)

# If you're not sure what values to write, you can get all available options:
Expand Down
162 changes: 114 additions & 48 deletions luxtronik/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,31 +38,38 @@ def is_socket_closed(sock: socket.socket) -> bool:
"""Check is socket closed."""
try:
# this will try to read bytes without blocking and also without removing them from buffer
data = sock.recv(
LUXTRONIK_SOCKET_READ_SIZE_PEEK, socket.MSG_DONTWAIT | socket.MSG_PEEK
)
data = sock.recv(LUXTRONIK_SOCKET_READ_SIZE_PEEK, socket.MSG_DONTWAIT | socket.MSG_PEEK)
if len(data) == 0:
return True
except BlockingIOError:
return False # socket is open and reading from it would block
except ConnectionResetError: # pylint: disable=broad-except
return True # socket was closed for some other reason
except Exception as err: # pylint: disable=broad-except
LOGGER.exception(
"Unexpected exception when checking if socket is closed", exc_info=err
)
LOGGER.exception("Unexpected exception when checking if socket is closed", exc_info=err)
return False
return False


class Luxtronik:
"""Main luxtronik class."""
class LuxtronikData:
"""
Collection of parameters, calculations and visiblities.
Also provide some high level access functions to their data values.
"""

def __init__(self, host, port=LUXTRONIK_DEFAULT_PORT, safe=True):
def __init__(self, parameters=None, calculations=None, visibilities=None, safe=True):
self.parameters = Parameters(safe) if parameters is None else parameters
self.calculations = Calculations() if calculations is None else calculations
self.visibilities = Visibilities() if visibilities is None else visibilities


class LuxtronikSocketInterface:
"""Luxtronik read/write interface via socket."""

def __init__(self, host, port=LUXTRONIK_DEFAULT_PORT):
self._lock = threading.Lock()
self._host = host
self._port = port
self._safe = safe
self._socket = None
self._connect()

Expand All @@ -76,18 +83,15 @@ def _connect(self):
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if is_none or is_socket_closed(self._socket):
self._socket.connect((self._host, self._port))
LOGGER.info(
"Connected to Luxtronik heat pump %s:%s", self._host, self._port
)
LOGGER.info("Connected to Luxtronik heat pump %s:%s", self._host, self._port)

def _disconnect(self):
"""Disconnect the socket if not already done."""
if self._socket is not None:
if not is_socket_closed(self._socket):
self._socket.close()
self._socket = None
LOGGER.info(
"Disconnected from Luxtronik heatpump %s:%s", self._host, self._port
)
LOGGER.info("Disconnected from Luxtronik heatpump %s:%s", self._host, self._port)

def _with_lock_and_connect(self, func, *args, **kwargs):
"""
Expand All @@ -103,41 +107,73 @@ def _with_lock_and_connect(self, func, *args, **kwargs):
ret_val = func(*args, **kwargs)
return ret_val

def read(self):
def read(self, data=None):
"""
Read data from heat pump.
All available data will be read from the heat pump.
All available data will be read from the heat pump
and integrated to the passed data object.
This data object is returned afterwards, mainly for access to a newly created.
"""
return self._with_lock_and_connect(self._read)
if data is None:
data = LuxtronikData()
return self._with_lock_and_connect(self._read, data)

def read_parameters(self):
"""Read parameters from heat pump."""
return self._with_lock_and_connect(self._read_parameters)
def read_parameters(self, parameters=None):
"""
Read parameters from heat pump and integrate them to the passed dictionary.
This dictionary is returned afterwards, mainly for access to a newly created.
"""
if parameters is None:
parameters = Parameters()
return self._with_lock_and_connect(self._read_parameters, parameters)

def read_calculations(self):
"""Read calculations from heat pump."""
return self._with_lock_and_connect(self._read_calculations)
def read_calculations(self, calculations=None):
"""
Read calculations from heat pump and integrate them to the passed dictionary.
This dictionary is returned afterwards, mainly for access to a newly created.
"""
if calculations is None:
calculations = Calculations()
return self._with_lock_and_connect(self._read_calculations, calculations)

def read_visibilities(self):
"""Read visibilities from heat pump."""
return self._with_lock_and_connect(self._read_visibilities)
def read_visibilities(self, visibilities=None):
"""
Read visibilities from heat pump and integrate them to the passed dictionary.
This dictionary is returned afterwards, mainly for access to a newly created.
"""
if visibilities is None:
visibilities = Visibilities()
return self._with_lock_and_connect(self._read_visibilities, visibilities)

def write(self, parameters):
"""
Write parameter to heat pump.
All parameters will be written to the heat pump
prior to reading back in all data from the heat pump.
Write all set parameters to the heat pump.
:param Parameters() parameters Parameter dictionary to be written
to the heatpump before reading all available data
from the heat pump.
"""
return self._with_lock_and_connect(self._write, parameters)
self._with_lock_and_connect(self._write, parameters)

def write_and_read(self, parameters, data=None):
"""
Write all set parameter to the heat pump (see write())
prior to reading back in all data from the heat pump (see read())
after a short wait time
"""
if data is None:
data = LuxtronikData()
return self._with_lock_and_connect(self._write_and_read, parameters, data)

def _read(self, data):
self._read_parameters(data.parameters)
self._read_calculations(data.calculations)
self._read_visibilities(data.visibilities)
return data

def _read(self):
parameters = self._read_parameters()
calculations = self._read_calculations()
visibilities = self._read_visibilities()
return calculations, parameters, visibilities
def _write_and_read(self, parameters, data):
self._write(parameters)
# Give the heatpump a short time to handle the value changes/calculations:
time.sleep(WAIT_TIME_AFTER_PARAMETER_WRITE)
return self._read(data)

def _write(self, parameters):
for index, value in parameters.queue.items():
Expand All @@ -157,12 +193,8 @@ def _write(self, parameters):
LOGGER.debug("%s: Value %s", self._host, val)
# Flush queue after writing all values
parameters.queue = {}
# Give the heatpump a short time to handle the value changes/calculations:
time.sleep(WAIT_TIME_AFTER_PARAMETER_WRITE)
# Read the new values based on our parameter changes:
return self._read()

def _read_parameters(self):
def _read_parameters(self, parameters):
data = []
self._send_ints(LUXTRONIK_PARAMETERS_READ, 0)
cmd = self._read_int()
Expand All @@ -176,11 +208,10 @@ def _read_parameters(self):
# not logging this as error as it would be logged on every read cycle
LOGGER.debug("%s: %s", self._host, err)
LOGGER.info("%s: Read %d parameters", self._host, length)
parameters = Parameters(safe=self._safe)
parameters.parse(data)
return parameters

def _read_calculations(self):
def _read_calculations(self, calculations):
data = []
self._send_ints(LUXTRONIK_CALCULATIONS_READ, 0)
cmd = self._read_int()
Expand All @@ -196,11 +227,10 @@ def _read_calculations(self):
# not logging this as error as it would be logged on every read cycle
LOGGER.debug("%s: %s", self._host, err)
LOGGER.info("%s: Read %d calculations", self._host, length)
calculations = Calculations()
calculations.parse(data)
return calculations

def _read_visibilities(self):
def _read_visibilities(self, visibilities):
data = []
self._send_ints(LUXTRONIK_VISIBILITIES_READ, 0)
cmd = self._read_int()
Expand All @@ -214,7 +244,6 @@ def _read_visibilities(self):
# not logging this as error as it would be logged on every read cycle
LOGGER.debug("%s: %s", self._host, err)
LOGGER.info("%s: Read %d visibilities", self._host, length)
visibilities = Visibilities()
visibilities.parse(data)
return visibilities

Expand All @@ -233,3 +262,40 @@ def _read_char(self):
"Low-level helper to receive a signed int"
reading = self._socket.recv(LUXTRONIK_SOCKET_READ_SIZE_CHAR)
return struct.unpack(">b", reading)[0]


class Luxtronik(LuxtronikData):
"""
Wrapper around the data and the read/write interface.
Mainly to ensure backwards compatibility
of the read/write interface to other projects.
"""

def __init__(self, host, port=LUXTRONIK_DEFAULT_PORT, safe=True):
super().__init__(safe=safe)
self.interface = LuxtronikSocketInterface(host, port)
self.read()

def read(self):
return self.interface.read(self)

def read_parameters(self):
return self.interface.read_parameters(self.parameters)

def read_calculations(self):
return self.interface.read_calculations(self.calculations)

def read_visibilities(self):
return self.interface.read_visibilities(self.visibilites)

def write(self, parameters=None):
if parameters is None:
self.interface.write(self.parameters)
else:
self.interface.write(parameters)

def write_and_read(self, parameters=None):
if parameters is None:
return self.interface.write_and_read(self.parameters, self)
else:
return self.interface.write_and_read(parameters, self)
Loading