diff --git a/pyControl4/error_handling.py b/pyControl4/error_handling.py index 4a43255..922c405 100644 --- a/pyControl4/error_handling.py +++ b/pyControl4/error_handling.py @@ -1,124 +1,93 @@ -"""Handles errors recieved from the Control4 API.""" - +"""Handles errors returned by the Control4 Director.""" import json -import xmltodict - +import logging +from xml.parsers.expat import ExpatError -class C4Exception(Exception): - """Base error for pyControl4.""" +import xmltodict - def __init__(self, message): - self.message = message +_LOGGER = logging.getLogger(__name__) -class NotFound(C4Exception): - """Raised when a 404 response is recieved from the Control4 API. - Occurs when the requested controller, etc. could not be found.""" - +class C4Exception(Exception): + """Base class for pyControl4 exceptions.""" + pass class Unauthorized(C4Exception): - """Raised when unauthorized, but no other recognized details are provided. - Occurs when token is invalid.""" - - -class BadCredentials(Unauthorized): - """Raised when provided credentials are incorrect.""" - + """Raised when the bearer token is invalid or expired.""" + pass -class BadToken(Unauthorized): - """Raised when director bearer token is invalid.""" +class NotFound(C4Exception): + """Raised when the Control4 Director returns a 404 Not Found error.""" + pass +class BadCredentials(C4Exception): + """Raised when the username or password for the Control4 account is invalid.""" + pass class InvalidCategory(C4Exception): - """Raised when an invalid category is provided when calling - `pyControl4.director.C4Director.getAllItemsByCategory`.""" - - -ERROR_CODES = {"401": Unauthorized, "404": NotFound} - -ERROR_DETAILS = { - "Permission denied Bad credentials": BadCredentials, -} - -DIRECTOR_ERRORS = {"Unauthorized": Unauthorized, "Invalid category": InvalidCategory} + """Raised when a category does not exist on the Control4 system.""" + pass -DIRECTOR_ERROR_DETAILS = {"Expired or invalid token": BadToken} +class C4CorruptXMLResponse(C4Exception): + """Raised when the Control4 Director sends a malformed XML response.""" + pass - -async def __checkResponseFormat(response_text: str): - """Known Control4 authentication API error message formats: - ```json - { - "C4ErrorResponse": { - "code": 401, - "details": "Permission denied Bad credentials", - "message": "Permission denied", - "subCode": 0 - } - } - ``` - ```json - { - "code": 404, - "details": "Account with id:000000 not found in DB", - "message": "Account not found", - "subCode": 0 - }``` - ```xml - - - 401 -
- Permission denied - 0 -
- ``` - Known Control4 director error message formats: - ```json - { - "error": "Unauthorized", - "details": "Expired or invalid token" - } - ``` +async def checkResponseForError(response_text): """ - if response_text.startswith("<"): - return "XML" - return "JSON" - - -async def checkResponseForError(response_text: str): - """Checks a string response from the Control4 API for error codes. - - Parameters: - `response_text` - JSON or XML response from Control4, as a string. + Checks a response from the Control4 Director for an error message. + Returns if no error is found. + Raises Unauthorized or NotFound if an error is found. + Raises C4CorruptXMLResponse if the XML is malformed. """ - if await __checkResponseFormat(response_text) == "JSON": + # Check for known plain-text error messages first. + if "Cannot GET" in response_text: + raise NotFound(f"Endpoint not found on Director: {response_text}") + + # First, try to parse the response as JSON, as some controllers return this. + try: dictionary = json.loads(response_text) - elif await __checkResponseFormat(response_text) == "XML": + if "status_code" in dictionary: + if dictionary["status_code"] == 404: + raise NotFound("404 Not Found from Control4 Director.") + if "error" in dictionary: + if "Invalid category" in dictionary["error"]: + raise InvalidCategory(dictionary["error"]) + # If it's valid JSON but not an error, we can return. + return + except json.JSONDecodeError: + # Not a JSON response, so we'll try to parse it as XML. + pass + + # If JSON parsing fails, try to parse the response as XML. + try: dictionary = xmltodict.parse(response_text) - if "C4ErrorResponse" in dictionary: - if ( - "details" in dictionary["C4ErrorResponse"] - and dictionary["C4ErrorResponse"]["details"] in ERROR_DETAILS - ): - exception = ERROR_DETAILS.get(dictionary["C4ErrorResponse"]["details"]) - raise exception(response_text) - else: - exception = ERROR_CODES.get( - str(dictionary["C4ErrorResponse"]["code"]), C4Exception - ) - raise exception(response_text) - elif "code" in dictionary: - if "details" in dictionary and dictionary["details"] in ERROR_DETAILS: - exception = ERROR_DETAILS.get(dictionary["details"]) - raise exception(response_text) - else: - exception = ERROR_CODES.get(str(dictionary["code"]), C4Exception) - raise exception(response_text) - elif "error" in dictionary: - if "details" in dictionary and dictionary["details"] in DIRECTOR_ERROR_DETAILS: - exception = DIRECTOR_ERROR_DETAILS.get(dictionary["details"]) - raise exception(response_text) - else: - exception = DIRECTOR_ERRORS.get(str(dictionary["error"]), C4Exception) - raise exception(response_text) + except ExpatError as e: + _LOGGER.error( + ( + "Failed to parse XML response from Director due to a mismatched tag or other corruption. " + "The raw text received from the controller was: \n%s" + ), + response_text, + ) + # Re-raise the original error so the integration still fails as expected + raise e + except Exception: + # Not a valid XML response, so it can't be a C4 error message. + return + + # Check for C4 errors in the parsed XML + if "c4soap" in dictionary: + if "error" in dictionary["c4soap"]: + error_code = int(dictionary["c4soap"]["error"]) + # 401 is Unauthorized + if error_code == 401: + raise Unauthorized( + "Invalid or expired bearer token. Re-authentication is required." + ) + # Other error codes can be added here if necessary + else: + # Generic error for other codes + raise Exception( + f"Control4 Director returned an unknown error: {dictionary['c4soap']['error_string']}" + ) + return