|
7 | 7 | from freezegun import freeze_time
|
8 | 8 |
|
9 | 9 | from roborock.containers import RoborockBase, UserData
|
| 10 | +from roborock.exceptions import RoborockException |
| 11 | +from roborock.protocol import Utils |
10 | 12 | from roborock.protocols.v1_protocol import (
|
11 | 13 | SecurityData,
|
| 14 | + create_map_response_decoder, |
12 | 15 | create_mqtt_payload_encoder,
|
13 | 16 | decode_rpc_response,
|
14 | 17 | encode_local_payload,
|
|
20 | 23 |
|
21 | 24 | USER_DATA = UserData.from_dict(mock_data.USER_DATA)
|
22 | 25 | TEST_REQUEST_ID = 44444
|
23 |
| -SECURITY_DATA = SecurityData(endpoint="3PBTIjvc", nonce=b"fake-nonce") |
| 26 | +TEST_ENDPOINT = "87ItGWdb" |
| 27 | +TEST_ENDPOINT_BYTES = TEST_ENDPOINT.encode() |
| 28 | +SECURITY_DATA = SecurityData( |
| 29 | + endpoint=TEST_ENDPOINT, |
| 30 | + nonce=b"\x91\xbe\x10\xc9b+\x9d\x8a\xcdH*\x19\xf6\xfe\x81h", |
| 31 | +) |
24 | 32 |
|
25 | 33 |
|
26 | 34 | @pytest.fixture(autouse=True)
|
@@ -62,7 +70,7 @@ def test_encode_local_payload(command, params, expected):
|
62 | 70 | (
|
63 | 71 | RoborockCommand.GET_STATUS,
|
64 | 72 | None,
|
65 |
| - b'{"dps":{"101":"{\\"id\\":44444,\\"method\\":\\"get_status\\",\\"params\\":[],\\"security\\":{\\"endpoint\\":\\"3PBTIjvc\\",\\"nonce\\":\\"66616b652d6e6f6e6365\\"}}"},"t":1737374400}', |
| 73 | + b'{"dps":{"101":"{\\"id\\":44444,\\"method\\":\\"get_status\\",\\"params\\":[],\\"security\\":{\\"endpoint\\":\\"87ItGWdb\\",\\"nonce\\":\\"91be10c9622b9d8acd482a19f6fe8168\\"}}"},"t":1737374400}', |
66 | 74 | )
|
67 | 75 | ],
|
68 | 76 | )
|
@@ -122,3 +130,70 @@ def test_decode_rpc_response(payload: bytes, expected: RoborockBase) -> None:
|
122 | 130 | )
|
123 | 131 | decoded_message = decode_rpc_response(message)
|
124 | 132 | assert decoded_message == expected
|
| 133 | + |
| 134 | + |
| 135 | +def test_create_map_response_decoder(): |
| 136 | + """Test creating and using a map response decoder.""" |
| 137 | + test_data = b"some map\n" |
| 138 | + compressed_data = ( |
| 139 | + b"\x1f\x8b\x08\x08\xf9\x13\x99h\x00\x03foo\x00+\xce\xcfMU\xc8M,\xe0\x02\x00@\xdb\xc6\x1a\t\x00\x00\x00" |
| 140 | + ) |
| 141 | + |
| 142 | + # Create header: endpoint(8) + padding(8) + request_id(2) + padding(6) |
| 143 | + # request_id = 44508 (0xaddc in little endian) |
| 144 | + header = TEST_ENDPOINT_BYTES + b"\x00" * 8 + b"\xdc\xad" + b"\x00" * 6 |
| 145 | + encrypted_data = Utils.encrypt_cbc(compressed_data, SECURITY_DATA.nonce) |
| 146 | + payload = header + encrypted_data |
| 147 | + |
| 148 | + message = RoborockMessage( |
| 149 | + protocol=RoborockMessageProtocol.MAP_RESPONSE, |
| 150 | + payload=payload, |
| 151 | + seq=12750, |
| 152 | + version=b"1.0", |
| 153 | + random=97431, |
| 154 | + timestamp=1652547161, |
| 155 | + ) |
| 156 | + |
| 157 | + decoder = create_map_response_decoder(SECURITY_DATA) |
| 158 | + result = decoder(message) |
| 159 | + |
| 160 | + assert result.request_id == 44508 |
| 161 | + assert result.data == test_data |
| 162 | + |
| 163 | + |
| 164 | +def test_create_map_response_decoder_invalid_endpoint(): |
| 165 | + """Test map response decoder with invalid endpoint.""" |
| 166 | + # Create header with wrong endpoint |
| 167 | + header = b"wrongend" + b"\x00" * 8 + b"\xdc\xad" + b"\x00" * 6 |
| 168 | + payload = header + b"encrypted_data" |
| 169 | + |
| 170 | + message = RoborockMessage( |
| 171 | + protocol=RoborockMessageProtocol.MAP_RESPONSE, |
| 172 | + payload=payload, |
| 173 | + seq=12750, |
| 174 | + version=b"1.0", |
| 175 | + random=97431, |
| 176 | + timestamp=1652547161, |
| 177 | + ) |
| 178 | + |
| 179 | + decoder = create_map_response_decoder(SECURITY_DATA) |
| 180 | + |
| 181 | + with pytest.raises(RoborockException, match="Invalid V1 map response endpoint"): |
| 182 | + decoder(message) |
| 183 | + |
| 184 | + |
| 185 | +def test_create_map_response_decoder_invalid_payload(): |
| 186 | + """Test map response decoder with invalid payload.""" |
| 187 | + message = RoborockMessage( |
| 188 | + protocol=RoborockMessageProtocol.MAP_RESPONSE, |
| 189 | + payload=b"short", # Too short payload |
| 190 | + seq=12750, |
| 191 | + version=b"1.0", |
| 192 | + random=97431, |
| 193 | + timestamp=1652547161, |
| 194 | + ) |
| 195 | + |
| 196 | + decoder = create_map_response_decoder(SECURITY_DATA) |
| 197 | + |
| 198 | + with pytest.raises(RoborockException, match="Invalid V1 map response format: missing payload"): |
| 199 | + decoder(message) |
0 commit comments