|
1 | 1 | """Unit test for RawValue objects."""
|
| 2 | +from unittest.mock import AsyncMock |
| 3 | + |
2 | 4 | import pytest
|
3 | 5 | from xknx import XKNX
|
4 | 6 | from xknx.devices import RawValue
|
@@ -137,6 +139,61 @@ async def test_respond_to_read(self):
|
137 | 139 | )
|
138 | 140 | assert xknx.telegrams.qsize() == 0
|
139 | 141 |
|
| 142 | + # |
| 143 | + # TEST PROCESS CALLBACK |
| 144 | + # |
| 145 | + |
| 146 | + async def test_process_callback(self): |
| 147 | + """Test process / reading telegrams from telegram queue. Test if callback is called.""" |
| 148 | + |
| 149 | + xknx = XKNX() |
| 150 | + sensor = RawValue( |
| 151 | + xknx, |
| 152 | + "TestSensor", |
| 153 | + 2, |
| 154 | + group_address="1/2/3", |
| 155 | + ) |
| 156 | + after_update_callback = AsyncMock() |
| 157 | + sensor.register_device_updated_cb(after_update_callback) |
| 158 | + |
| 159 | + telegram = Telegram( |
| 160 | + destination_address=GroupAddress("1/2/3"), |
| 161 | + payload=GroupValueWrite(DPTArray((0x01, 0x02))), |
| 162 | + ) |
| 163 | + await sensor.process(telegram) |
| 164 | + after_update_callback.assert_called_with(sensor) |
| 165 | + assert sensor.last_telegram == telegram |
| 166 | + # consecutive telegrams with same payload shall only trigger one callback |
| 167 | + after_update_callback.reset_mock() |
| 168 | + await sensor.process(telegram) |
| 169 | + after_update_callback.assert_not_called() |
| 170 | + |
| 171 | + async def test_process_callback_always(self): |
| 172 | + """Test process / reading telegrams from telegram queue. Test if callback is called.""" |
| 173 | + |
| 174 | + xknx = XKNX() |
| 175 | + sensor = RawValue( |
| 176 | + xknx, |
| 177 | + "TestSensor", |
| 178 | + 2, |
| 179 | + group_address="1/2/3", |
| 180 | + always_callback=True, |
| 181 | + ) |
| 182 | + after_update_callback = AsyncMock() |
| 183 | + sensor.register_device_updated_cb(after_update_callback) |
| 184 | + |
| 185 | + telegram = Telegram( |
| 186 | + destination_address=GroupAddress("1/2/3"), |
| 187 | + payload=GroupValueWrite(DPTArray((0x01, 0x02))), |
| 188 | + ) |
| 189 | + await sensor.process(telegram) |
| 190 | + after_update_callback.assert_called_with(sensor) |
| 191 | + assert sensor.last_telegram == telegram |
| 192 | + # every telegram shall trigger callback |
| 193 | + after_update_callback.reset_mock() |
| 194 | + await sensor.process(telegram) |
| 195 | + after_update_callback.assert_called_with(sensor) |
| 196 | + |
140 | 197 | #
|
141 | 198 | # TEST SET
|
142 | 199 | #
|
|
0 commit comments