|
1 | 1 | import asyncio
|
2 | 2 |
|
3 | 3 | import pytest
|
4 |
| -from zigpy import types as t |
5 | 4 | import zigpy.exceptions
|
6 |
| -from zigpy.zdo.types import ZDOCmd |
| 5 | +import zigpy.state |
| 6 | +import zigpy.types as t |
| 7 | +import zigpy.zdo.types as zdo_t |
7 | 8 |
|
8 | 9 | from zigpy_xbee.api import ModemStatus, XBee
|
9 | 10 | import zigpy_xbee.config as config
|
|
21 | 22 | }
|
22 | 23 |
|
23 | 24 |
|
| 25 | +@pytest.fixture |
| 26 | +def node_info(): |
| 27 | + return zigpy.state.NodeInfo( |
| 28 | + nwk=t.NWK(0x0000), |
| 29 | + ieee=t.EUI64.convert("00:12:4b:00:1c:a1:b8:46"), |
| 30 | + logical_type=zdo_t.LogicalType.Coordinator, |
| 31 | + ) |
| 32 | + |
| 33 | + |
| 34 | +@pytest.fixture |
| 35 | +def network_info(node_info): |
| 36 | + return zigpy.state.NetworkInfo( |
| 37 | + extended_pan_id=t.ExtendedPanId.convert("bd:27:0b:38:37:95:dc:87"), |
| 38 | + pan_id=t.PanId(0x9BB0), |
| 39 | + nwk_update_id=18, |
| 40 | + nwk_manager_id=t.NWK(0x0000), |
| 41 | + channel=t.uint8_t(15), |
| 42 | + channel_mask=t.Channels.ALL_CHANNELS, |
| 43 | + security_level=t.uint8_t(5), |
| 44 | + network_key=zigpy.state.Key( |
| 45 | + key=t.KeyData.convert("2ccade06b3090c310315b3d574d3c85a"), |
| 46 | + seq=108, |
| 47 | + tx_counter=118785, |
| 48 | + ), |
| 49 | + tc_link_key=zigpy.state.Key( |
| 50 | + key=t.KeyData(b"ZigBeeAlliance09"), |
| 51 | + partner_ieee=node_info.ieee, |
| 52 | + tx_counter=8712428, |
| 53 | + ), |
| 54 | + key_table=[], |
| 55 | + children=[], |
| 56 | + nwk_addresses={}, |
| 57 | + |
| 58 | + ) |
| 59 | + |
| 60 | + |
24 | 61 | @pytest.fixture
|
25 | 62 | def app(monkeypatch):
|
26 | 63 | monkeypatch.setattr(application, "TIMEOUT_TX_STATUS", 0.1)
|
@@ -242,59 +279,22 @@ async def test_get_association_state(app):
|
242 | 279 | assert ai is mock.sentinel.ai
|
243 | 280 |
|
244 | 281 |
|
245 |
| -async def test_form_network(app): |
246 |
| - legacy_module = False |
| 282 | +async def test_write_network_info(app, node_info, network_info): |
| 283 | + app._api._queued_at = mock.AsyncMock(spec=XBee._queued_at) |
| 284 | + app._api._at_command = mock.AsyncMock(spec=XBee._at_command) |
| 285 | + app._api._running = mock.AsyncMock(spec=app._api._running) |
247 | 286 |
|
248 |
| - async def mock_at_command(cmd, *args): |
249 |
| - if cmd == "MY": |
250 |
| - return 0x0000 |
251 |
| - if cmd == "OI": |
252 |
| - return 0x1234 |
253 |
| - elif cmd == "ID": |
254 |
| - return 0x1234567812345678 |
255 |
| - elif cmd == "SL": |
256 |
| - return 0x11223344 |
257 |
| - elif cmd == "SH": |
258 |
| - return 0x55667788 |
259 |
| - elif cmd == "WR": |
260 |
| - app._api.coordinator_started_event.set() |
261 |
| - elif cmd == "CE" and legacy_module: |
262 |
| - raise RuntimeError |
263 |
| - return None |
264 |
| - |
265 |
| - app._api._at_command = mock.MagicMock( |
266 |
| - spec=XBee._at_command, side_effect=mock_at_command |
267 |
| - ) |
268 |
| - app._api._queued_at = mock.MagicMock( |
269 |
| - spec=XBee._at_command, side_effect=mock_at_command |
270 |
| - ) |
271 | 287 | app._get_association_state = mock.AsyncMock(
|
272 | 288 | spec=application.ControllerApplication._get_association_state,
|
273 | 289 | return_value=0x00,
|
274 | 290 | )
|
275 | 291 |
|
276 |
| - app.write_network_info = mock.MagicMock(wraps=app.write_network_info) |
277 |
| - |
278 |
| - await app.form_network() |
279 |
| - assert app._api._at_command.call_count >= 1 |
280 |
| - assert app._api._queued_at.call_count >= 7 |
281 |
| - |
282 |
| - network_info = app.write_network_info.mock_calls[0][2]["network_info"] |
283 |
| - |
284 |
| - app._api._queued_at.assert_any_call("SC", 1 << (network_info.channel - 11)) |
285 |
| - app._api._queued_at.assert_any_call("KY", b"ZigBeeAlliance09") |
286 |
| - |
287 |
| - app._api._at_command.reset_mock() |
288 |
| - app._api._queued_at.reset_mock() |
289 |
| - legacy_module = True |
290 |
| - await app.form_network() |
291 |
| - assert app._api._at_command.call_count >= 1 |
292 |
| - assert app._api._queued_at.call_count >= 7 |
293 |
| - |
294 |
| - network_info = app.write_network_info.mock_calls[0][2]["network_info"] |
| 292 | + await app.write_network_info(network_info=network_info, node_info=node_info) |
295 | 293 |
|
296 | 294 | app._api._queued_at.assert_any_call("SC", 1 << (network_info.channel - 11))
|
297 | 295 | app._api._queued_at.assert_any_call("KY", b"ZigBeeAlliance09")
|
| 296 | + app._api._queued_at.assert_any_call("NK", network_info.network_key.key.serialize()) |
| 297 | + app._api._queued_at.assert_any_call("ID", 0xBD270B383795DC87) |
298 | 298 |
|
299 | 299 |
|
300 | 300 | async def _test_start_network(
|
@@ -435,7 +435,7 @@ def _mock_command(
|
435 | 435 | seq,
|
436 | 436 | b"\xaa\x55\xbe\xef",
|
437 | 437 | expect_reply=expect_reply,
|
438 |
| - **kwargs |
| 438 | + **kwargs, |
439 | 439 | )
|
440 | 440 |
|
441 | 441 |
|
@@ -509,7 +509,7 @@ def nwk():
|
509 | 509 |
|
510 | 510 | def test_rx_device_annce(app, ieee, nwk):
|
511 | 511 | dst_ep = 0
|
512 |
| - cluster_id = ZDOCmd.Device_annce |
| 512 | + cluster_id = zdo_t.ZDOCmd.Device_annce |
513 | 513 | device = mock.MagicMock()
|
514 | 514 | device.status = device.Status.NEW
|
515 | 515 | app.get_device = mock.MagicMock(return_value=device)
|
@@ -587,3 +587,11 @@ async def mock_at_command(cmd, *args):
|
587 | 587 | await app.reset_network_info()
|
588 | 588 |
|
589 | 589 | app._api._at_command.assert_called_once_with("NR", 0)
|
| 590 | + |
| 591 | + |
| 592 | +async def test_move_network_to_channel(app): |
| 593 | + app._api._queued_at = mock.AsyncMock(spec=XBee._at_command) |
| 594 | + await app._move_network_to_channel(26, new_nwk_update_id=1) |
| 595 | + |
| 596 | + assert len(app._api._queued_at.mock_calls) == 1 |
| 597 | + app._api._queued_at.assert_any_call("SC", 1 << (26 - 11)) |
0 commit comments