Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 76 additions & 107 deletions pyControl4/error_handling.py
Original file line number Diff line number Diff line change
@@ -1,124 +1,93 @@
"""Handles errors recieved from the Control4 API."""

"""Handles errors returned by the Control4 Director."""
import json
import xmltodict

import logging
from xml.parsers.expat import ExpatError

class C4Exception(Exception):
"""Base error for pyControl4."""
import xmltodict

def __init__(self, message):
self.message = message
_LOGGER = logging.getLogger(__name__)


class NotFound(C4Exception):
"""Raised when a 404 response is recieved from the Control4 API.
Occurs when the requested controller, etc. could not be found."""

class C4Exception(Exception):
"""Base class for pyControl4 exceptions."""
pass

class Unauthorized(C4Exception):
"""Raised when unauthorized, but no other recognized details are provided.
Occurs when token is invalid."""


class BadCredentials(Unauthorized):
"""Raised when provided credentials are incorrect."""

"""Raised when the bearer token is invalid or expired."""
pass

class BadToken(Unauthorized):
"""Raised when director bearer token is invalid."""
class NotFound(C4Exception):
"""Raised when the Control4 Director returns a 404 Not Found error."""
pass

class BadCredentials(C4Exception):
"""Raised when the username or password for the Control4 account is invalid."""
pass

class InvalidCategory(C4Exception):
"""Raised when an invalid category is provided when calling
`pyControl4.director.C4Director.getAllItemsByCategory`."""


ERROR_CODES = {"401": Unauthorized, "404": NotFound}

ERROR_DETAILS = {
"Permission denied Bad credentials": BadCredentials,
}

DIRECTOR_ERRORS = {"Unauthorized": Unauthorized, "Invalid category": InvalidCategory}
"""Raised when a category does not exist on the Control4 system."""
pass

DIRECTOR_ERROR_DETAILS = {"Expired or invalid token": BadToken}
class C4CorruptXMLResponse(C4Exception):
"""Raised when the Control4 Director sends a malformed XML response."""
pass


async def __checkResponseFormat(response_text: str):
"""Known Control4 authentication API error message formats:
```json
{
"C4ErrorResponse": {
"code": 401,
"details": "Permission denied Bad credentials",
"message": "Permission denied",
"subCode": 0
}
}
```
```json
{
"code": 404,
"details": "Account with id:000000 not found in DB",
"message": "Account not found",
"subCode": 0
}```
```xml
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<C4ErrorResponse>
<code>401</code>
<details></details>
<message>Permission denied</message>
<subCode>0</subCode>
</C4ErrorResponse>
```
Known Control4 director error message formats:
```json
{
"error": "Unauthorized",
"details": "Expired or invalid token"
}
```
async def checkResponseForError(response_text):
"""
if response_text.startswith("<"):
return "XML"
return "JSON"


async def checkResponseForError(response_text: str):
"""Checks a string response from the Control4 API for error codes.
Parameters:
`response_text` - JSON or XML response from Control4, as a string.
Checks a response from the Control4 Director for an error message.
Returns if no error is found.
Raises Unauthorized or NotFound if an error is found.
Raises C4CorruptXMLResponse if the XML is malformed.
"""
if await __checkResponseFormat(response_text) == "JSON":
# Check for known plain-text error messages first.
if "Cannot GET" in response_text:
raise NotFound(f"Endpoint not found on Director: {response_text}")

# First, try to parse the response as JSON, as some controllers return this.
try:
dictionary = json.loads(response_text)
elif await __checkResponseFormat(response_text) == "XML":
if "status_code" in dictionary:
if dictionary["status_code"] == 404:
raise NotFound("404 Not Found from Control4 Director.")
if "error" in dictionary:
if "Invalid category" in dictionary["error"]:
raise InvalidCategory(dictionary["error"])
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In all the exception cases in this function, can you change the behavior so that it includes the entire response_text in the thrown exception? This would match the old behavior and make sure the entire response gets printed in the error logs

# If it's valid JSON but not an error, we can return.
return
except json.JSONDecodeError:
# Not a JSON response, so we'll try to parse it as XML.
pass

# If JSON parsing fails, try to parse the response as XML.
try:
dictionary = xmltodict.parse(response_text)
if "C4ErrorResponse" in dictionary:
if (
"details" in dictionary["C4ErrorResponse"]
and dictionary["C4ErrorResponse"]["details"] in ERROR_DETAILS
):
exception = ERROR_DETAILS.get(dictionary["C4ErrorResponse"]["details"])
raise exception(response_text)
else:
exception = ERROR_CODES.get(
str(dictionary["C4ErrorResponse"]["code"]), C4Exception
)
raise exception(response_text)
elif "code" in dictionary:
if "details" in dictionary and dictionary["details"] in ERROR_DETAILS:
exception = ERROR_DETAILS.get(dictionary["details"])
raise exception(response_text)
else:
exception = ERROR_CODES.get(str(dictionary["code"]), C4Exception)
raise exception(response_text)
elif "error" in dictionary:
if "details" in dictionary and dictionary["details"] in DIRECTOR_ERROR_DETAILS:
exception = DIRECTOR_ERROR_DETAILS.get(dictionary["details"])
raise exception(response_text)
else:
exception = DIRECTOR_ERRORS.get(str(dictionary["error"]), C4Exception)
raise exception(response_text)
except ExpatError as e:
_LOGGER.error(
(
"Failed to parse XML response from Director due to a mismatched tag or other corruption. "
"The raw text received from the controller was: \n%s"
),
response_text,
)
# Re-raise the original error so the integration still fails as expected
raise e
Comment on lines +65 to +73
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using _LOGGER is there a way to include this error message plus the ExpatError inside a C4CorruptXMLResponse instead? Right now it seems that C4CorruptXMLResponse is completely unused.

except Exception:
# Not a valid XML response, so it can't be a C4 error message.
return

# Check for C4 errors in the parsed XML
if "c4soap" in dictionary:
if "error" in dictionary["c4soap"]:
error_code = int(dictionary["c4soap"]["error"])
# 401 is Unauthorized
if error_code == 401:
raise Unauthorized(
"Invalid or expired bearer token. Re-authentication is required."
)
# Other error codes can be added here if necessary
else:
# Generic error for other codes
raise Exception(
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Raise C4Exception instead to match old behavior

f"Control4 Director returned an unknown error: {dictionary['c4soap']['error_string']}"
)
return
Loading