Skip to content

Commit

Permalink
0.9.6.16 Added 2 Panel Entity Attributes and changed X10 decode
Browse files Browse the repository at this point in the history
Changed the way the X10 data is decoded to hopefully fix a bug
Added 2 Panel Attributes: lastevent and lasteventtime
  • Loading branch information
davesmeghead committed Aug 6, 2024
1 parent ac325b0 commit b7f0dc0
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 186 deletions.
2 changes: 1 addition & 1 deletion custom_components/visonic/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@
# "trigger",
#]

CLIENT_VERSION = "0.9.6.14"
CLIENT_VERSION = "0.9.6.16"

MAX_CLIENT_LOG_ENTRIES = 300

Expand Down
31 changes: 28 additions & 3 deletions custom_components/visonic/examples/complete_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,8 +529,8 @@ def updateConfig(self, conf=None):
self.visonicProtocol.updateSettings(self.__getConfigData())
#print("[updateConfig] exit")

def getPanelLastEvent(self) -> str:
""" Is the siren active. """
def getPanelLastEvent(self) -> (str, str):
""" Get Last Panel Event. """
if self.visonicProtocol is not None:
return self.visonicProtocol.getPanelLastEvent()
return False
Expand Down Expand Up @@ -864,7 +864,7 @@ def help():
for device in devices:
console.print("Device " + str(device))
else:
console.print("ERROR: There must be a panel connection to perform command " + result)
console.print("ERROR: invalid command " + result)

print("Here ZZZZZZZ")

Expand All @@ -891,11 +891,36 @@ def help():
raise e

def handle_exception(loop, context):

def _createPrefix() -> str:
previous_frame = currentframe().f_back
(
filepath,
line_number,
function,
lines,
index,
) = inspect.getframeinfo(previous_frame)
filename = filepath[filepath.rfind('/')+1:]
s = f"{filename} {line_number:<5} {function:<30} "
previous_frame = currentframe()
(
filepath,
line_number,
function,
lines,
index,
) = inspect.getframeinfo(previous_frame)
filename = filepath[filepath.rfind('/')+1:]

return s + f"{filename} {line_number:<5} {function:<30} "

# context["message"] will always be there; but context["exception"] may not
msg = context.get("exception", context["message"])
if str(msg) != terminating_clean:
print(f"Caught exception: {msg}")
print(f" {context}")
#print(f" {_createPrefix()}")
asyncio.create_task(shutdown(loop))

async def shutdown(loop, signal=None):
Expand Down
2 changes: 1 addition & 1 deletion custom_components/visonic/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@
"loggers": ["visonic"],
"requirements": ["Pillow", "pyserial_asyncio"],
"single_config_entry": false,
"version": "0.9.6.13"
"version": "0.9.6.16"
}
4 changes: 2 additions & 2 deletions custom_components/visonic/pyconst.py
Original file line number Diff line number Diff line change
Expand Up @@ -510,9 +510,9 @@ def isPanelBypass(self) -> bool:
return False

@abstractmethod
def getPanelLastEvent(self) -> str:
def getPanelLastEvent(self) -> (str, str):
""" Return the panels last event string """
return ""
return ("", "")

# @abstractmethod
# def getPanelTroubleStatus(self) -> str:
Expand Down
41 changes: 21 additions & 20 deletions custom_components/visonic/pyhelper.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ def _getUTCTime() -> datetime:
0x3B : AlTroubleType.BATTERY, 0x3C : AlTroubleType.BATTERY, 0x40 : AlTroubleType.BATTERY, 0x43 : AlTroubleType.BATTERY
}

# Convert byte array to a string of hex values
def toString(array_alpha: bytearray, gap = " "):
return ("".join(("%02x"+gap) % b for b in array_alpha))[:-len(gap)] if len(gap) > 0 else ("".join("%02x" % b for b in array_alpha))

class vloggerclass:
def __init__(self, loggy, panel_id : int = -1, detail : bool = False):
self.detail = detail
Expand Down Expand Up @@ -654,11 +658,11 @@ def _validatePDU(self, packet: bytearray) -> bool:
return True

if packet[-2:-1][0] == self._calculateCRC(packet[1:-2])[0] + 1:
log.debug("[_validatePDU] Validated a Packet with a checksum that is 1 more than the actual checksum!!!! {0} and {1} alt calc is {2}".format(self._toString(packet), hex(self._calculateCRC(packet[1:-2])[0]).upper(), hex(self._calculateCRCAlt(packet[1:-2])[0]).upper()))
log.debug("[_validatePDU] Validated a Packet with a checksum that is 1 more than the actual checksum!!!! {0} and {1} alt calc is {2}".format(toString(packet), hex(self._calculateCRC(packet[1:-2])[0]).upper(), hex(self._calculateCRCAlt(packet[1:-2])[0]).upper()))
return True

if packet[-2:-1][0] == self._calculateCRC(packet[1:-2])[0] - 1:
log.debug("[_validatePDU] Validated a Packet with a checksum that is 1 less than the actual checksum!!!! {0} and {1} alt calc is {2}".format(self._toString(packet), hex(self._calculateCRC(packet[1:-2])[0]).upper(), hex(self._calculateCRCAlt(packet[1:-2])[0]).upper()))
log.debug("[_validatePDU] Validated a Packet with a checksum that is 1 less than the actual checksum!!!! {0} and {1} alt calc is {2}".format(toString(packet), hex(self._calculateCRC(packet[1:-2])[0]).upper(), hex(self._calculateCRCAlt(packet[1:-2])[0]).upper()))
return True

log.debug("[_validatePDU] Not valid packet, CRC failed, may be ongoing and not final 0A")
Expand All @@ -667,7 +671,7 @@ def _validatePDU(self, packet: bytearray) -> bool:
# alternative to calculate the checksum for sending and receiving messages
def _calculateCRCAlt(self, msg: bytearray):
""" Calculate CRC Checksum """
# log.debug("[_calculateCRC] Calculating for: %s", self._toString(msg))
# log.debug("[_calculateCRC] Calculating for: %s", toString(msg))
# Calculate the checksum
checksum = 0
for char in msg[0 : len(msg)]:
Expand All @@ -678,21 +682,21 @@ def _calculateCRCAlt(self, msg: bytearray):
checksum = 256 - (checksum % 255)
if checksum == 256:
checksum = 1
# log.debug("[_calculateCRC] Calculating for: {self._toString(msg)} calculated CRC is: {self._toString(bytearray([checksum]))}")
# log.debug("[_calculateCRC] Calculating for: {toString(msg)} calculated CRC is: {toString(bytearray([checksum]))}")
return bytearray([checksum])

# calculate the checksum for sending and receiving messages
def _calculateCRC(self, msg: bytearray):
""" Calculate CRC Checksum """
# log.debug("[_calculateCRC] Calculating for: %s", self._toString(msg))
# log.debug("[_calculateCRC] Calculating for: %s", toString(msg))
# Calculate the checksum
checksum = 0
for char in msg[0 : len(msg)]:
checksum += char
checksum = 0xFF - (checksum % 0xFF)
if checksum == 0xFF:
checksum = 0x00
# log.debug("[_calculateCRC] Calculating for: {self._toString(msg)} calculated CRC is: {self._toString(bytearray([checksum]))}")
# log.debug("[_calculateCRC] Calculating for: {toString(msg)} calculated CRC is: {toString(bytearray([checksum]))}")
return bytearray([checksum])


Expand Down Expand Up @@ -729,7 +733,8 @@ def _initVars(self):

self.PanelAlarmStatus = AlAlarmType.NONE
self.PanelTroubleStatus = AlTroubleType.NONE
self.PanelLastEvent = "Unknown"
self.PanelLastEvent = "Startup/Startup"
self.PanelLastEventTime = self._getTimeFunction().strftime("%d/%m/%Y, %H:%M:%S")
self.PanelStatusText = "Unknown"
self.LastPanelEventData = {}

Expand All @@ -755,7 +760,7 @@ def _dumpSensorsToLogFile(self, incX10 = False):
log.debug(" key {0:<2} X10 {1}".format(key, device))

log.debug(" Model {: <18} PowerMaster {: <18} LastEvent {: <18} Ready {: <13}".format(self.PanelModel,
'Yes' if self.PowerMaster else 'No', self.getPanelLastEvent(), 'Yes' if self.PanelReady else 'No'))
'Yes' if self.PowerMaster else 'No', self.getPanelLastEvent()[0], 'Yes' if self.PanelReady else 'No'))
pm = titlecase(self.PanelMode.name.replace("_"," ")) # str(AlPanelMode()[self.PanelMode]).replace("_"," ")
ts = titlecase(self.PanelTroubleStatus.name.replace("_"," ")) # str(AlTroubleType()[self.PanelTroubleStatus]).replace("_"," ")
al = titlecase(self.PanelAlarmStatus.name.replace("_"," ")) # str(AlAlarmType()[self.PanelAlarmStatus]).replace("_"," ")
Expand Down Expand Up @@ -802,8 +807,8 @@ def isPanelBypass(self) -> bool:
return self.PanelBypass
return False

def getPanelLastEvent(self) -> str:
return self.PanelLastEvent
def getPanelLastEvent(self) -> (str, str):
return (self.PanelLastEvent, self.PanelLastEventTime)

def requestPanelCommand(self, state : AlPanelCommand, code : str = "") -> AlCommandStatus:
""" Send a request to the panel to Arm/Disarm """
Expand Down Expand Up @@ -838,10 +843,6 @@ def getEventLog(self, code : str = "") -> AlCommandStatus:
""" Get Panel Event Log """
return AlCommandStatus.FAIL_ABSTRACT_CLASS_NOT_IMPLEMENTED

# Convert byte array to a string of hex values
def _toString(self, array_alpha: bytearray):
return ("".join("%02x " % b for b in array_alpha))[:-1]

# get the current date and time
def _getTimeFunction(self) -> datetime:
return datetime.now()
Expand All @@ -865,12 +866,13 @@ def setLastPanelEventData(self, count=0, type=[ ], event=[ ], zonemode=[ ], name

if count > 0:
self.PanelLastEvent = name[count-1] + "/" + zonemode[count-1]
self.PanelLastEventTime = self._getTimeFunction().strftime("%d/%m/%Y, %H:%M:%S")
for i in range(0, count):
a = {}
a["name"] = titlecase(name[i].replace("_"," ").lower())
a["event"] = titlecase(zonemode[i].replace("_"," ").lower())
log.debug(f"[PanelUpdate] {a}")
self.onPanelChangeHandler(AlCondition.PANEL_UPDATE, a)
self.sendPanelUpdate(AlCondition.PANEL_UPDATE, a)

#log.debug(f"Last event {datadict}")
return datadict
Expand All @@ -886,6 +888,8 @@ def getEventData(self) -> dict:
datadict["bypass"] = self.PanelBypass
datadict["alarm"] = titlecase(self.PanelAlarmStatus.name.replace("_"," ").lower())
datadict["trouble"] = titlecase(self.PanelTroubleStatus.name.replace("_"," ").lower())
datadict["lastevent"] = titlecase(self.PanelLastEvent.replace("_"," ").lower())
datadict["lasteventtime"] = self.PanelLastEventTime
return datadict

# Set the onDisconnect callback handlers
Expand All @@ -908,12 +912,9 @@ def onPanelLog(self, fn : Callable): # onPanelLog ( event_log_entry
def onPanelChange(self, fn : Callable): # onPanelChange ( datadictionary : dict )
self.onPanelChangeHandler = fn

def sendPanelUpdate(self, ev : AlCondition):
def sendPanelUpdate(self, ev : AlCondition, d : dict = {} ):
if self.onPanelChangeHandler is not None:
if ev == AlCondition.PANEL_UPDATE:
self.onPanelChangeHandler(AlCondition.PUSH_CHANGE, {})
else:
self.onPanelChangeHandler(ev, {})
self.onPanelChangeHandler(ev, d)

def _searchDict(self, dict, v_search):
for k, v in dict.items():
Expand Down
Loading

0 comments on commit b7f0dc0

Please sign in to comment.