|
39 | 39 | SetPWMRequest,
|
40 | 40 | StatusRequest,
|
41 | 41 | StatusResponse,
|
| 42 | + WriteAnalogRequest, |
| 43 | + WriteAnalogResponse, |
42 | 44 | )
|
43 | 45 | from viam.resource.manager import ResourceManager
|
44 | 46 | from viam.utils import dict_to_struct, struct_to_dict
|
@@ -144,6 +146,15 @@ async def test_get_geometries(self, board: MockBoard):
|
144 | 146 | geometries = await board.get_geometries()
|
145 | 147 | assert geometries == GEOMETRIES
|
146 | 148 |
|
| 149 | + @pytest.mark.asyncio |
| 150 | + async def test_write_analog(self, board: MockBoard): |
| 151 | + value = 10 |
| 152 | + pin = "pin1" |
| 153 | + await board.write_analog(pin=pin, value=value, timeout=1.11) |
| 154 | + assert board.timeout == loose_approx(1.11) |
| 155 | + assert board.analog_write_value == value |
| 156 | + assert board.analog_write_pin == pin |
| 157 | + |
147 | 158 |
|
148 | 159 | class TestService:
|
149 | 160 | @pytest.mark.asyncio
|
@@ -320,6 +331,19 @@ async def test_set_power_mode(self, board: MockBoard, service: BoardRPCService):
|
320 | 331 | assert board.power_mode == PowerMode.POWER_MODE_OFFLINE_DEEP
|
321 | 332 | assert board.power_mode_duration == pm_duration.ToTimedelta()
|
322 | 333 |
|
| 334 | + @pytest.mark.asyncio |
| 335 | + async def test_write_analog(self, board: MockBoard, service: BoardRPCService): |
| 336 | + async with ChannelFor([service]) as channel: |
| 337 | + client = BoardServiceStub(channel) |
| 338 | + pin = "pin1" |
| 339 | + value = 10 |
| 340 | + request = WriteAnalogRequest(name=board.name, pin=pin, value=value) |
| 341 | + response: WriteAnalogResponse = await client.WriteAnalog(request, timeout=6.66) |
| 342 | + assert response == WriteAnalogResponse() |
| 343 | + assert board.timeout == loose_approx(6.66) |
| 344 | + assert board.analog_write_value == value |
| 345 | + assert board.analog_write_pin == pin |
| 346 | + |
323 | 347 |
|
324 | 348 | class TestClient:
|
325 | 349 | @pytest.mark.asyncio
|
@@ -501,3 +525,14 @@ async def test_get_pwm_freq(self, board: MockBoard, service: BoardRPCService):
|
501 | 525 | mock_pin = cast(MockGPIOPin, board.gpios["pin1"])
|
502 | 526 | assert mock_pin.extra == extra
|
503 | 527 | assert mock_pin.timeout is None
|
| 528 | + |
| 529 | + @pytest.mark.asyncio |
| 530 | + async def test_write_analog(self, board: MockBoard, service: BoardRPCService): |
| 531 | + async with ChannelFor([service]) as channel: |
| 532 | + client = BoardClient(name=board.name, channel=channel) |
| 533 | + pin = "pin1" |
| 534 | + value = 42 |
| 535 | + extra = {"foo": "bar", "baz": [1, 2, 3]} |
| 536 | + await client.write_analog(pin, value, extra=extra) |
| 537 | + assert board.analog_write_pin == "pin1" |
| 538 | + assert board.analog_write_value == 42 |
0 commit comments