From 5634ff2b94b221c539db0b55ed7c51964d670f85 Mon Sep 17 00:00:00 2001 From: Luke Date: Tue, 2 Sep 2025 22:35:26 -0400 Subject: [PATCH 01/11] fix: handle auth expiring --- roborock/cloud_api.py | 19 +++++++++++++------ roborock/exceptions.py | 5 +++++ roborock/roborock_future.py | 6 +++--- 3 files changed, 21 insertions(+), 9 deletions(-) diff --git a/roborock/cloud_api.py b/roborock/cloud_api.py index 0a7f2aa3..d796024b 100644 --- a/roborock/cloud_api.py +++ b/roborock/cloud_api.py @@ -8,10 +8,11 @@ from typing import Any import paho.mqtt.client as mqtt +from paho.mqtt.reasoncodes import ReasonCodes from .api import KEEPALIVE, RoborockClient from .containers import DeviceData, UserData -from .exceptions import RoborockException, VacuumError +from .exceptions import RoborockException, RoborockInvalidUserData, VacuumError from .protocol import ( Decoder, Encoder, @@ -78,13 +79,19 @@ def __init__(self, user_data: UserData, device_info: DeviceData) -> None: self._encoder: Encoder = create_mqtt_encoder(device_info.device.local_key) def _mqtt_on_connect(self, *args, **kwargs): - _, __, ___, rc, ____ = args + rc: ReasonCodes = args[3] connection_queue = self._waiting_queue.get(CONNECT_REQUEST_ID) - if rc != mqtt.MQTT_ERR_SUCCESS: - message = f"Failed to connect ({mqtt.error_string(rc)})" + if rc != 0: + message = f"Failed to connect ({str(rc)})" self._logger.error(message) if connection_queue: - connection_queue.set_exception(VacuumError(message)) + # These are the ReasonCodes relating to authorization issues. + if rc.value in {24, 25, 133, 134, 135, 144}: + connection_queue.set_exception( + RoborockInvalidUserData("Failed to connect to mqtt. Invalid user data. Re-auth is needed.") + ) + else: + connection_queue.set_exception(VacuumError(message)) else: self._logger.debug("Failed to notify connect future, not in queue") return @@ -92,7 +99,7 @@ def _mqtt_on_connect(self, *args, **kwargs): topic = f"rr/m/o/{self._mqtt_user}/{self._hashed_user}/{self.device_info.device.duid}" (result, mid) = self._mqtt_client.subscribe(topic) if result != 0: - message = f"Failed to subscribe ({mqtt.error_string(rc)})" + message = f"Failed to subscribe ({str(rc)})" self._logger.error(message) if connection_queue: connection_queue.set_exception(VacuumError(message)) diff --git a/roborock/exceptions.py b/roborock/exceptions.py index bb297dad..a3b7030d 100644 --- a/roborock/exceptions.py +++ b/roborock/exceptions.py @@ -1,4 +1,5 @@ """Roborock exceptions.""" + from __future__ import annotations @@ -76,3 +77,7 @@ class RoborockTooManyRequest(RoborockException): class RoborockRateLimit(RoborockException): """Class for our rate limits exceptions.""" + + +class RoborockInvalidUserData(RoborockException): + """Class to state the user data is invalid (expired or manipulated).""" diff --git a/roborock/roborock_future.py b/roborock/roborock_future.py index 9563f785..e9830d1f 100644 --- a/roborock/roborock_future.py +++ b/roborock/roborock_future.py @@ -5,7 +5,7 @@ import async_timeout -from .exceptions import VacuumError +from .exceptions import RoborockInvalidUserData, VacuumError class RoborockFuture: @@ -21,11 +21,11 @@ def _set_result(self, item: Any) -> None: def set_result(self, item: Any) -> None: self.loop.call_soon_threadsafe(self._set_result, item) - def _set_exception(self, exc: VacuumError) -> None: + def _set_exception(self, exc: VacuumError | RoborockInvalidUserData) -> None: if not self.fut.cancelled(): self.fut.set_exception(exc) - def set_exception(self, exc: VacuumError) -> None: + def set_exception(self, exc: VacuumError | RoborockInvalidUserData) -> None: self.loop.call_soon_threadsafe(self._set_exception, exc) async def async_get(self, timeout: float | int) -> tuple[Any, VacuumError | None]: From 3fceb6bdedce74945d2b32ccf52dbc1ad89ebd22 Mon Sep 17 00:00:00 2001 From: Luke Date: Wed, 3 Sep 2025 21:41:28 -0400 Subject: [PATCH 02/11] fix: str some other rcs --- roborock/cloud_api.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/roborock/cloud_api.py b/roborock/cloud_api.py index d796024b..874f3521 100644 --- a/roborock/cloud_api.py +++ b/roborock/cloud_api.py @@ -119,7 +119,7 @@ def _mqtt_on_message(self, *args, **kwargs): def _mqtt_on_disconnect(self, *args, **kwargs): _, __, rc, ___ = args try: - exc = RoborockException(mqtt.error_string(rc)) if rc != mqtt.MQTT_ERR_SUCCESS else None + exc = RoborockException(str(rc)) if rc != mqtt.MQTT_ERR_SUCCESS else None super().on_connection_lost(exc) connection_queue = self._waiting_queue.get(DISCONNECT_REQUEST_ID) if connection_queue: @@ -145,7 +145,7 @@ def _sync_disconnect(self) -> Any: if rc != mqtt.MQTT_ERR_SUCCESS: disconnected_future.cancel() - raise RoborockException(f"Failed to disconnect ({mqtt.error_string(rc)})") + raise RoborockException(f"Failed to disconnect ({str(rc)})") return disconnected_future @@ -184,4 +184,4 @@ def _send_msg_raw(self, msg: bytes) -> None: f"rr/m/i/{self._mqtt_user}/{self._hashed_user}/{self.device_info.device.duid}", msg ) if info.rc != mqtt.MQTT_ERR_SUCCESS: - raise RoborockException(f"Failed to publish ({mqtt.error_string(info.rc)})") + raise RoborockException(f"Failed to publish ({str(info.rc)})") From 987cc0b87b2f6d9cf317fbd577177bd30a945cfb Mon Sep 17 00:00:00 2001 From: Luke Date: Sun, 7 Sep 2025 14:14:13 -0400 Subject: [PATCH 03/11] fix: address some comments --- roborock/cloud_api.py | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/roborock/cloud_api.py b/roborock/cloud_api.py index 874f3521..49160da9 100644 --- a/roborock/cloud_api.py +++ b/roborock/cloud_api.py @@ -8,7 +8,9 @@ from typing import Any import paho.mqtt.client as mqtt -from paho.mqtt.reasoncodes import ReasonCodes + +# Mypy is not seeing this for some reason. It wants me to use the depreciated ReasonCodes +from paho.mqtt.reasoncodes import ReasonCode # type: ignore from .api import KEEPALIVE, RoborockClient from .containers import DeviceData, UserData @@ -68,7 +70,8 @@ def __init__(self, user_data: UserData, device_info: DeviceData) -> None: self._mqtt_client = _Mqtt() self._mqtt_client.on_connect = self._mqtt_on_connect self._mqtt_client.on_message = self._mqtt_on_message - self._mqtt_client.on_disconnect = self._mqtt_on_disconnect + # Due to the incorrect ReasonCode, it is confused by typing + self._mqtt_client.on_disconnect = self._mqtt_on_disconnect # type: ignore if mqtt_params.tls: self._mqtt_client.tls_set() @@ -78,11 +81,17 @@ def __init__(self, user_data: UserData, device_info: DeviceData) -> None: self._decoder: Decoder = create_mqtt_decoder(device_info.device.local_key) self._encoder: Encoder = create_mqtt_encoder(device_info.device.local_key) - def _mqtt_on_connect(self, *args, **kwargs): - rc: ReasonCodes = args[3] + def _mqtt_on_connect( + self, + client: mqtt.Client, + data: object, + flags: dict[str, int], + rc: ReasonCode, + properties: mqtt.Properties | None = None, + ): connection_queue = self._waiting_queue.get(CONNECT_REQUEST_ID) - if rc != 0: - message = f"Failed to connect ({str(rc)})" + if rc.is_failure: + message = f"Failed to connect ({rc})" self._logger.error(message) if connection_queue: # These are the ReasonCodes relating to authorization issues. @@ -116,10 +125,16 @@ def _mqtt_on_message(self, *args, **kwargs): except Exception as ex: self._logger.exception(ex) - def _mqtt_on_disconnect(self, *args, **kwargs): - _, __, rc, ___ = args + def _mqtt_on_disconnect( + self, + client: mqtt.Client, + data: object, + flags: dict[str, int], + rc: ReasonCode, + properties: mqtt.Properties | None = None, + ): try: - exc = RoborockException(str(rc)) if rc != mqtt.MQTT_ERR_SUCCESS else None + exc = RoborockException(str(rc)) if rc.is_failure else None super().on_connection_lost(exc) connection_queue = self._waiting_queue.get(DISCONNECT_REQUEST_ID) if connection_queue: From c64e999f38c89dc5d7f41f64341de1d6615b24ed Mon Sep 17 00:00:00 2001 From: Luke Date: Sun, 7 Sep 2025 14:43:57 -0400 Subject: [PATCH 04/11] fix: test --- roborock/cloud_api.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/roborock/cloud_api.py b/roborock/cloud_api.py index 49160da9..087e0220 100644 --- a/roborock/cloud_api.py +++ b/roborock/cloud_api.py @@ -130,11 +130,11 @@ def _mqtt_on_disconnect( client: mqtt.Client, data: object, flags: dict[str, int], - rc: ReasonCode, + rc: ReasonCode | None, properties: mqtt.Properties | None = None, ): try: - exc = RoborockException(str(rc)) if rc.is_failure else None + exc = RoborockException(str(rc)) if rc is not None and rc.is_failure else None super().on_connection_lost(exc) connection_queue = self._waiting_queue.get(DISCONNECT_REQUEST_ID) if connection_queue: From 26a906204f00a65577e0a8432ccc0a82dead9a44 Mon Sep 17 00:00:00 2001 From: Luke Date: Thu, 11 Sep 2025 18:06:35 -0400 Subject: [PATCH 05/11] feat: add seperate validate connection for the cloud api and bump keepalive --- roborock/api.py | 2 +- roborock/cloud_api.py | 40 ++++++++++++++++++++++++++++++++++++---- 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/roborock/api.py b/roborock/api.py index ee22d491..f49ab0b1 100644 --- a/roborock/api.py +++ b/roborock/api.py @@ -23,7 +23,7 @@ from .util import get_next_int _LOGGER = logging.getLogger(__name__) -KEEPALIVE = 60 +KEEPALIVE = 70 class RoborockClient(ABC): diff --git a/roborock/cloud_api.py b/roborock/cloud_api.py index 087e0220..19e55c11 100644 --- a/roborock/cloud_api.py +++ b/roborock/cloud_api.py @@ -80,6 +80,8 @@ def __init__(self, user_data: UserData, device_info: DeviceData) -> None: self._mutex = Lock() self._decoder: Decoder = create_mqtt_decoder(device_info.device.local_key) self._encoder: Encoder = create_mqtt_encoder(device_info.device.local_key) + self.previous_attempt_was_subscribe = False + self._topic = f"rr/m/o/{self._mqtt_user}/{self._hashed_user}/{self.device_info.device.duid}" def _mqtt_on_connect( self, @@ -105,19 +107,19 @@ def _mqtt_on_connect( self._logger.debug("Failed to notify connect future, not in queue") return self._logger.info(f"Connected to mqtt {self._mqtt_host}:{self._mqtt_port}") - topic = f"rr/m/o/{self._mqtt_user}/{self._hashed_user}/{self.device_info.device.duid}" - (result, mid) = self._mqtt_client.subscribe(topic) + (result, mid) = self._mqtt_client.subscribe(self._topic) if result != 0: message = f"Failed to subscribe ({str(rc)})" self._logger.error(message) if connection_queue: connection_queue.set_exception(VacuumError(message)) return - self._logger.info(f"Subscribed to topic {topic}") + self._logger.info(f"Subscribed to topic {self._topic}") if connection_queue: connection_queue.set_result(True) def _mqtt_on_message(self, *args, **kwargs): + self.previous_attempt_was_subscribe = False client, __, msg = args try: messages = self._decoder(msg.payload) @@ -199,4 +201,34 @@ def _send_msg_raw(self, msg: bytes) -> None: f"rr/m/i/{self._mqtt_user}/{self._hashed_user}/{self.device_info.device.duid}", msg ) if info.rc != mqtt.MQTT_ERR_SUCCESS: - raise RoborockException(f"Failed to publish ({str(info.rc)})") + raise RoborockException(f"Failed to publish ({mqtt.error_string(info.rc)})") + + async def validate_connection(self) -> None: + """Override the default validate connection to try to re-subscribe rather than disconnect.""" + if self.previous_attempt_was_subscribe: + # If we have already tried to unsub and resub, and we are still in this state, + # we should just do the normal validate connection. + return await super().validate_connection() + try: + if not self.should_keepalive(): + self.previous_attempt_was_subscribe = True + loop = asyncio.get_running_loop() + + self._logger.info("Resetting Roborock connection due to keepalive timeout") + (result, mid) = await loop.run_in_executor(None, self._mqtt_client.unsubscribe, self._topic) + + if result != 0: + message = f"Failed to unsubscribe ({mqtt.error_string(result)})" + self._logger.error(message) + return await super().validate_connection() + (result, mid) = await loop.run_in_executor(None, self._mqtt_client.subscribe, self._topic) + + if result != 0: + message = f"Failed to subscribe ({mqtt.error_string(result)})" + self._logger.error(message) + return await super().validate_connection() + + self._logger.info(f"Subscribed to topic {self._topic}") + except Exception: # noqa + return await super().validate_connection() + await self.async_connect() From dc8e8a94c8c88cf5fd306de570976bdc2283eab4 Mon Sep 17 00:00:00 2001 From: Luke Date: Tue, 16 Sep 2025 22:38:28 -0400 Subject: [PATCH 06/11] chore: remove extra exception --- roborock/cloud_api.py | 11 ++--------- roborock/exceptions.py | 4 ---- roborock/roborock_future.py | 6 +++--- 3 files changed, 5 insertions(+), 16 deletions(-) diff --git a/roborock/cloud_api.py b/roborock/cloud_api.py index 19e55c11..296390f9 100644 --- a/roborock/cloud_api.py +++ b/roborock/cloud_api.py @@ -14,7 +14,7 @@ from .api import KEEPALIVE, RoborockClient from .containers import DeviceData, UserData -from .exceptions import RoborockException, RoborockInvalidUserData, VacuumError +from .exceptions import RoborockException, VacuumError from .protocol import ( Decoder, Encoder, @@ -96,14 +96,7 @@ def _mqtt_on_connect( message = f"Failed to connect ({rc})" self._logger.error(message) if connection_queue: - # These are the ReasonCodes relating to authorization issues. - if rc.value in {24, 25, 133, 134, 135, 144}: - connection_queue.set_exception( - RoborockInvalidUserData("Failed to connect to mqtt. Invalid user data. Re-auth is needed.") - ) - else: - connection_queue.set_exception(VacuumError(message)) - else: + connection_queue.set_exception(VacuumError(message)) self._logger.debug("Failed to notify connect future, not in queue") return self._logger.info(f"Connected to mqtt {self._mqtt_host}:{self._mqtt_port}") diff --git a/roborock/exceptions.py b/roborock/exceptions.py index a3b7030d..b3e4bd41 100644 --- a/roborock/exceptions.py +++ b/roborock/exceptions.py @@ -77,7 +77,3 @@ class RoborockTooManyRequest(RoborockException): class RoborockRateLimit(RoborockException): """Class for our rate limits exceptions.""" - - -class RoborockInvalidUserData(RoborockException): - """Class to state the user data is invalid (expired or manipulated).""" diff --git a/roborock/roborock_future.py b/roborock/roborock_future.py index e9830d1f..9563f785 100644 --- a/roborock/roborock_future.py +++ b/roborock/roborock_future.py @@ -5,7 +5,7 @@ import async_timeout -from .exceptions import RoborockInvalidUserData, VacuumError +from .exceptions import VacuumError class RoborockFuture: @@ -21,11 +21,11 @@ def _set_result(self, item: Any) -> None: def set_result(self, item: Any) -> None: self.loop.call_soon_threadsafe(self._set_result, item) - def _set_exception(self, exc: VacuumError | RoborockInvalidUserData) -> None: + def _set_exception(self, exc: VacuumError) -> None: if not self.fut.cancelled(): self.fut.set_exception(exc) - def set_exception(self, exc: VacuumError | RoborockInvalidUserData) -> None: + def set_exception(self, exc: VacuumError) -> None: self.loop.call_soon_threadsafe(self._set_exception, exc) async def async_get(self, timeout: float | int) -> tuple[Any, VacuumError | None]: From 0f696d4181d2587d65e19780f95f15032554cfc3 Mon Sep 17 00:00:00 2001 From: Luke Date: Tue, 16 Sep 2025 22:39:31 -0400 Subject: [PATCH 07/11] chore: add else back --- roborock/cloud_api.py | 1 + 1 file changed, 1 insertion(+) diff --git a/roborock/cloud_api.py b/roborock/cloud_api.py index 296390f9..968d0b90 100644 --- a/roborock/cloud_api.py +++ b/roborock/cloud_api.py @@ -97,6 +97,7 @@ def _mqtt_on_connect( self._logger.error(message) if connection_queue: connection_queue.set_exception(VacuumError(message)) + else: self._logger.debug("Failed to notify connect future, not in queue") return self._logger.info(f"Connected to mqtt {self._mqtt_host}:{self._mqtt_port}") From bcab9cb9a3884d9ed9cb8c1b174d0b2ee4c7e915 Mon Sep 17 00:00:00 2001 From: Luke Lashley Date: Thu, 18 Sep 2025 10:42:49 -0400 Subject: [PATCH 08/11] Update roborock/cloud_api.py Co-authored-by: Allen Porter --- roborock/cloud_api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/roborock/cloud_api.py b/roborock/cloud_api.py index 968d0b90..37ca8c36 100644 --- a/roborock/cloud_api.py +++ b/roborock/cloud_api.py @@ -86,7 +86,7 @@ def __init__(self, user_data: UserData, device_info: DeviceData) -> None: def _mqtt_on_connect( self, client: mqtt.Client, - data: object, + userdata: object, flags: dict[str, int], rc: ReasonCode, properties: mqtt.Properties | None = None, From 3539f70023939f749f31099e7c79ddb4185258d9 Mon Sep 17 00:00:00 2001 From: Luke Date: Thu, 18 Sep 2025 10:58:27 -0400 Subject: [PATCH 09/11] chore: clean up --- roborock/api.py | 6 -- roborock/cloud_api.py | 77 +++++++++++++------ .../roborock_local_client_v1.py | 6 ++ 3 files changed, 59 insertions(+), 30 deletions(-) diff --git a/roborock/api.py b/roborock/api.py index f49ab0b1..ed7cb5c6 100644 --- a/roborock/api.py +++ b/roborock/api.py @@ -78,12 +78,6 @@ def should_keepalive(self) -> bool: return False return True - async def validate_connection(self) -> None: - if not self.should_keepalive(): - self._logger.info("Resetting Roborock connection due to keepalive timeout") - await self.async_disconnect() - await self.async_connect() - async def _wait_response(self, request_id: int, queue: RoborockFuture) -> Any: try: response = await queue.async_get(self.queue_timeout) diff --git a/roborock/cloud_api.py b/roborock/cloud_api.py index 37ca8c36..5fbc9e33 100644 --- a/roborock/cloud_api.py +++ b/roborock/cloud_api.py @@ -197,32 +197,61 @@ def _send_msg_raw(self, msg: bytes) -> None: if info.rc != mqtt.MQTT_ERR_SUCCESS: raise RoborockException(f"Failed to publish ({mqtt.error_string(info.rc)})") - async def validate_connection(self) -> None: - """Override the default validate connection to try to re-subscribe rather than disconnect.""" - if self.previous_attempt_was_subscribe: - # If we have already tried to unsub and resub, and we are still in this state, - # we should just do the normal validate connection. - return await super().validate_connection() - try: - if not self.should_keepalive(): - self.previous_attempt_was_subscribe = True - loop = asyncio.get_running_loop() + async def unsubscribe(self): + """Unsubscribe from the topic.""" + loop = asyncio.get_running_loop() + (result, mid) = await loop.run_in_executor(None, self._mqtt_client.unsubscribe, self._topic) - self._logger.info("Resetting Roborock connection due to keepalive timeout") - (result, mid) = await loop.run_in_executor(None, self._mqtt_client.unsubscribe, self._topic) + if result != 0: + message = f"Failed to unsubscribe ({mqtt.error_string(result)})" + self._logger.error(message) + else: + self._logger.info(f"Unsubscribed from topic {self._topic}") + return result - if result != 0: - message = f"Failed to unsubscribe ({mqtt.error_string(result)})" - self._logger.error(message) - return await super().validate_connection() - (result, mid) = await loop.run_in_executor(None, self._mqtt_client.subscribe, self._topic) + async def subscribe(self): + """Subscribe to the topic.""" + loop = asyncio.get_running_loop() + (result, mid) = await loop.run_in_executor(None, self._mqtt_client.subscribe, self._topic) - if result != 0: - message = f"Failed to subscribe ({mqtt.error_string(result)})" - self._logger.error(message) - return await super().validate_connection() + if result != 0: + message = f"Failed to subscribe ({mqtt.error_string(result)})" + self._logger.error(message) + else: + self._logger.info(f"Subscribed to topic {self._topic}") + return result + + async def reconnect(self) -> None: + """Reconnect to the MQTT broker.""" + await self.async_disconnect() + await self.async_connect() - self._logger.info(f"Subscribed to topic {self._topic}") - except Exception: # noqa - return await super().validate_connection() + async def validate_connection(self) -> None: + """Override the default validate connection to try to re-subscribe rather than disconnect. + When something seems to be wrong with our connection, we should follow the following steps: + 1. Try to unsubscribe and resubscribe from the topic. + 2. If we don't end up getting a message, we should completely disconnect and reconnect to the MQTT broker. + 3. We will continue to try to disconnect and reconnect until we get a message. + 4. If we get a message, the next time connection is lost, We will go back to step 1. + """ + if not self.should_keepalive(): + self._logger.info("Resetting Roborock connection due to keepalive timeout") + if self.previous_attempt_was_subscribe: + # If we have already tried to unsub and resub, and we are still in this state, + # we should try to reconnect. + return await self.reconnect() + try: + # Mark that we have tried to unsubscribe and resubscribe + self.previous_attempt_was_subscribe = True + if await self.unsubscribe() == 0: + # If we fail to unsubscribe, reconnect to the broker + return await self.reconnect() + if await self.subscribe() == 0: + # If we fail to subscribe, reconnected to the broker. + return await self.reconnect() + + except Exception: # noqa + # If we get any errors at all, we should just reconnect. + return await self.reconnect() + # Call connect to make sure everything is still in a good state. await self.async_connect() diff --git a/roborock/version_1_apis/roborock_local_client_v1.py b/roborock/version_1_apis/roborock_local_client_v1.py index e2c68118..9cf1bab1 100644 --- a/roborock/version_1_apis/roborock_local_client_v1.py +++ b/roborock/version_1_apis/roborock_local_client_v1.py @@ -138,6 +138,12 @@ async def ping(self) -> None: response_protocol=RoborockMessageProtocol.PING_RESPONSE, ) + async def validate_connection(self) -> None: + if not self.should_keepalive(): + self._logger.info("Resetting Roborock connection due to keepalive timeout") + await self.async_disconnect() + await self.async_connect() + def _send_msg_raw(self, data: bytes): try: if not self.transport: From 30b6059e810dd1b884347eea112e0d118b01c310 Mon Sep 17 00:00:00 2001 From: Luke Date: Thu, 18 Sep 2025 11:17:41 -0400 Subject: [PATCH 10/11] fix: changes --- roborock/cloud_api.py | 30 ++++++++++--------- .../roborock_local_client_v1.py | 4 +-- .../version_1_apis/roborock_mqtt_client_v1.py | 2 +- .../roborock_mqtt_client_a01.py | 2 +- 4 files changed, 20 insertions(+), 18 deletions(-) diff --git a/roborock/cloud_api.py b/roborock/cloud_api.py index 5fbc9e33..543a3163 100644 --- a/roborock/cloud_api.py +++ b/roborock/cloud_api.py @@ -8,6 +8,7 @@ from typing import Any import paho.mqtt.client as mqtt +from paho.mqtt.enums import MQTTErrorCode # Mypy is not seeing this for some reason. It wants me to use the depreciated ReasonCodes from paho.mqtt.reasoncodes import ReasonCode # type: ignore @@ -80,7 +81,7 @@ def __init__(self, user_data: UserData, device_info: DeviceData) -> None: self._mutex = Lock() self._decoder: Decoder = create_mqtt_decoder(device_info.device.local_key) self._encoder: Encoder = create_mqtt_encoder(device_info.device.local_key) - self.previous_attempt_was_subscribe = False + self.received_message_since_last_disconnect = False self._topic = f"rr/m/o/{self._mqtt_user}/{self._hashed_user}/{self.device_info.device.duid}" def _mqtt_on_connect( @@ -113,7 +114,7 @@ def _mqtt_on_connect( connection_queue.set_result(True) def _mqtt_on_message(self, *args, **kwargs): - self.previous_attempt_was_subscribe = False + self.received_message_since_last_disconnect = False client, __, msg = args try: messages = self._decoder(msg.payload) @@ -197,7 +198,7 @@ def _send_msg_raw(self, msg: bytes) -> None: if info.rc != mqtt.MQTT_ERR_SUCCESS: raise RoborockException(f"Failed to publish ({mqtt.error_string(info.rc)})") - async def unsubscribe(self): + async def _unsubscribe(self) -> MQTTErrorCode: """Unsubscribe from the topic.""" loop = asyncio.get_running_loop() (result, mid) = await loop.run_in_executor(None, self._mqtt_client.unsubscribe, self._topic) @@ -209,7 +210,7 @@ async def unsubscribe(self): self._logger.info(f"Unsubscribed from topic {self._topic}") return result - async def subscribe(self): + async def _subscribe(self) -> MQTTErrorCode: """Subscribe to the topic.""" loop = asyncio.get_running_loop() (result, mid) = await loop.run_in_executor(None, self._mqtt_client.subscribe, self._topic) @@ -221,12 +222,12 @@ async def subscribe(self): self._logger.info(f"Subscribed to topic {self._topic}") return result - async def reconnect(self) -> None: + async def _reconnect(self) -> None: """Reconnect to the MQTT broker.""" await self.async_disconnect() await self.async_connect() - async def validate_connection(self) -> None: + async def _validate_connection(self) -> None: """Override the default validate connection to try to re-subscribe rather than disconnect. When something seems to be wrong with our connection, we should follow the following steps: 1. Try to unsubscribe and resubscribe from the topic. @@ -234,24 +235,25 @@ async def validate_connection(self) -> None: 3. We will continue to try to disconnect and reconnect until we get a message. 4. If we get a message, the next time connection is lost, We will go back to step 1. """ + # If we should no longer keep the current connection alive... if not self.should_keepalive(): self._logger.info("Resetting Roborock connection due to keepalive timeout") - if self.previous_attempt_was_subscribe: + if self.received_message_since_last_disconnect: # If we have already tried to unsub and resub, and we are still in this state, # we should try to reconnect. - return await self.reconnect() + return await self._reconnect() try: # Mark that we have tried to unsubscribe and resubscribe - self.previous_attempt_was_subscribe = True - if await self.unsubscribe() == 0: + self.received_message_since_last_disconnect = True + if await self._unsubscribe() != 0: # If we fail to unsubscribe, reconnect to the broker - return await self.reconnect() - if await self.subscribe() == 0: + return await self._reconnect() + if await self._subscribe() != 0: # If we fail to subscribe, reconnected to the broker. - return await self.reconnect() + return await self._reconnect() except Exception: # noqa # If we get any errors at all, we should just reconnect. - return await self.reconnect() + return await self._reconnect() # Call connect to make sure everything is still in a good state. await self.async_connect() diff --git a/roborock/version_1_apis/roborock_local_client_v1.py b/roborock/version_1_apis/roborock_local_client_v1.py index 9cf1bab1..b886c911 100644 --- a/roborock/version_1_apis/roborock_local_client_v1.py +++ b/roborock/version_1_apis/roborock_local_client_v1.py @@ -138,7 +138,7 @@ async def ping(self) -> None: response_protocol=RoborockMessageProtocol.PING_RESPONSE, ) - async def validate_connection(self) -> None: + async def _validate_connection(self) -> None: if not self.should_keepalive(): self._logger.info("Resetting Roborock connection due to keepalive timeout") await self.async_disconnect() @@ -178,7 +178,7 @@ async def _send_message( method: str | None = None, params: list | dict | int | None = None, ) -> RoborockMessage: - await self.validate_connection() + await self._validate_connection() msg = self._encoder(roborock_message) if method: self._logger.debug(f"id={request_id} Requesting method {method} with {params}") diff --git a/roborock/version_1_apis/roborock_mqtt_client_v1.py b/roborock/version_1_apis/roborock_mqtt_client_v1.py index 236631bc..96a9c592 100644 --- a/roborock/version_1_apis/roborock_mqtt_client_v1.py +++ b/roborock/version_1_apis/roborock_mqtt_client_v1.py @@ -51,7 +51,7 @@ async def _send_command( ) self._logger.debug("Building message id %s for method %s", request_message.request_id, method) - await self.validate_connection() + await self._validate_connection() request_id = request_message.request_id response_protocol = ( RoborockMessageProtocol.MAP_RESPONSE if method in COMMANDS_SECURED else RoborockMessageProtocol.RPC_RESPONSE diff --git a/roborock/version_a01_apis/roborock_mqtt_client_a01.py b/roborock/version_a01_apis/roborock_mqtt_client_a01.py index 9d81ddb3..b6ae3bef 100644 --- a/roborock/version_a01_apis/roborock_mqtt_client_a01.py +++ b/roborock/version_a01_apis/roborock_mqtt_client_a01.py @@ -40,7 +40,7 @@ def __init__( self._logger = RoborockLoggerAdapter(device_info.device.name, _LOGGER) async def _send_message(self, roborock_message: RoborockMessage): - await self.validate_connection() + await self._validate_connection() response_protocol = RoborockMessageProtocol.RPC_RESPONSE m = self._encoder(roborock_message) From 8360b5cfa7e8fe2517c89ea259a050130afcb7c1 Mon Sep 17 00:00:00 2001 From: Luke Date: Sun, 21 Sep 2025 10:58:35 -0400 Subject: [PATCH 11/11] chore: inverse boolean logic to match variable naming --- roborock/cloud_api.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/roborock/cloud_api.py b/roborock/cloud_api.py index 543a3163..f26dd349 100644 --- a/roborock/cloud_api.py +++ b/roborock/cloud_api.py @@ -114,7 +114,7 @@ def _mqtt_on_connect( connection_queue.set_result(True) def _mqtt_on_message(self, *args, **kwargs): - self.received_message_since_last_disconnect = False + self.received_message_since_last_disconnect = True client, __, msg = args try: messages = self._decoder(msg.payload) @@ -238,13 +238,13 @@ async def _validate_connection(self) -> None: # If we should no longer keep the current connection alive... if not self.should_keepalive(): self._logger.info("Resetting Roborock connection due to keepalive timeout") - if self.received_message_since_last_disconnect: + if not self.received_message_since_last_disconnect: # If we have already tried to unsub and resub, and we are still in this state, # we should try to reconnect. return await self._reconnect() try: # Mark that we have tried to unsubscribe and resubscribe - self.received_message_since_last_disconnect = True + self.received_message_since_last_disconnect = False if await self._unsubscribe() != 0: # If we fail to unsubscribe, reconnect to the broker return await self._reconnect()