3
3
import asyncio
4
4
import logging
5
5
import socket
6
+ from asyncio import Lock
6
7
from typing import Callable , Coroutine
7
8
8
9
import async_timeout
9
10
10
- from roborock .api import RoborockClient
11
+ from roborock .api import RoborockClient , SPECIAL_COMMANDS
11
12
from roborock .exceptions import RoborockTimeout , CommandVacuumError
12
13
from roborock .typing import RoborockCommand
13
14
from roborock .util import get_running_loop_or_create_one
14
15
15
16
secured_prefix = 199
17
+ get_prefix = 119
18
+ app_prefix = 135
19
+ set_prefix = 151
20
+
16
21
_LOGGER = logging .getLogger (__name__ )
17
22
18
23
@@ -34,9 +39,10 @@ async def async_connect(self):
34
39
async def send_command (
35
40
self , device_id : str , method : RoborockCommand , params : list = None
36
41
):
37
- request_id , timestamp , payload = super ()._get_payload (method , params )
42
+ secured = True if method in SPECIAL_COMMANDS else False
43
+ request_id , timestamp , payload = self ._get_payload (method , params , secured )
38
44
_LOGGER .debug (f"id={ request_id } Requesting method { method } with { params } " )
39
- prefix = secured_prefix
45
+ prefix = secured_prefix if method in SPECIAL_COMMANDS else get_prefix
40
46
protocol = 4
41
47
msg = self ._encode_msg (device_id , protocol , timestamp , payload , prefix )
42
48
_LOGGER .debug (f"Requesting with prefix { prefix } and payload { payload } " )
@@ -49,26 +55,38 @@ async def send_command(
49
55
_LOGGER .debug (f"id={ request_id } Response from { method } : { response } " )
50
56
return response
51
57
58
+ class RoborockSocket (socket .socket ):
59
+ _closed = None
60
+
61
+ def __init__ (self , * args , ** kwargs ):
62
+ super ().__init__ (* args , ** kwargs )
63
+
64
+ @property
65
+ def is_closed (self ):
66
+ return self ._closed
52
67
53
68
class RoborockSocketListener :
54
69
roborock_port = 58867
55
70
56
- def __init__ (self , ip : str , device_id : str , on_message : Callable [[str , bytes ], Coroutine | None ],
71
+ def __init__ (self , ip : str , device_id : str , on_message : Callable [[str , bytes ], Coroutine [ bool ] | bool ],
57
72
timeout : float | int = 4 ):
58
73
self .ip = ip
59
74
self .device_id = device_id
60
- self .socket = socket . socket (socket .AF_INET , socket .SOCK_STREAM )
75
+ self .socket = RoborockSocket (socket .AF_INET , socket .SOCK_STREAM )
61
76
self .socket .setblocking (False )
62
77
self .loop = get_running_loop_or_create_one ()
63
78
self .on_message = on_message
64
79
self .timeout = timeout
65
80
self .is_connected = False
81
+ self ._lock = Lock ()
66
82
67
83
async def _main_coro (self ):
68
- while self .is_connected :
84
+ while not self .socket . is_closed :
69
85
try :
70
86
message = await self .loop .sock_recv (self .socket , 4096 )
71
- await self .on_message (self .device_id , message )
87
+ accepted = await self .on_message (self .device_id , message )
88
+ if accepted :
89
+ self ._lock .release () if self ._lock .locked () else None
72
90
except Exception as e :
73
91
_LOGGER .exception (e )
74
92
self .is_connected = False
@@ -78,14 +96,16 @@ async def connect(self):
78
96
async with async_timeout .timeout (self .timeout ):
79
97
await self .loop .sock_connect (self .socket , (self .ip , 58867 ))
80
98
self .is_connected = True
81
- asyncio .create_task (self ._main_coro ())
99
+ self . loop .create_task (self ._main_coro ())
82
100
83
101
async def send_message (self , data : bytes , local_key : str ):
84
102
response = {}
85
103
try :
86
104
async with async_timeout .timeout (self .timeout ):
105
+ await self ._lock .acquire ()
87
106
await self .loop .sock_sendall (self .socket , data )
88
107
except (asyncio .TimeoutError , asyncio .CancelledError ):
108
+ self ._lock .release () if self ._lock .locked () else None
89
109
raise RoborockTimeout (
90
110
f"Timeout after { self .timeout } seconds waiting for response"
91
111
) from None
0 commit comments