Skip to content

Commit

Permalink
feat(api): add tests for liquid probe movements (#15896)
Browse files Browse the repository at this point in the history
  • Loading branch information
caila-marashaj authored Aug 7, 2024
1 parent fe6252c commit a4811c1
Showing 1 changed file with 179 additions and 2 deletions.
181 changes: 179 additions & 2 deletions api/tests/opentrons/hardware_control/test_ot3_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
TypedDict,
)
from typing_extensions import Literal
from math import copysign
from math import copysign, isclose
import pytest
import types
from decoy import Decoy
from mock import AsyncMock, patch, Mock, PropertyMock, MagicMock
from mock import AsyncMock, patch, Mock, PropertyMock, MagicMock, call
from hypothesis import given, strategies, settings, HealthCheck, assume, example

from opentrons.calibration_storage.types import CalibrationStatus, SourceType
Expand Down Expand Up @@ -856,6 +856,183 @@ async def test_liquid_probe(
) # should raise no exceptions


@pytest.mark.parametrize(
"mount, head_node, pipette_node",
[
(OT3Mount.LEFT, NodeId.head_l, NodeId.pipette_left),
(OT3Mount.RIGHT, NodeId.head_r, NodeId.pipette_right),
],
)
async def test_liquid_probe_plunger_moves(
mock_move_to: AsyncMock,
ot3_hardware: ThreadManager[OT3API],
hardware_backend: OT3Simulator,
head_node: NodeId,
pipette_node: Axis,
mount: OT3Mount,
fake_liquid_settings: LiquidProbeSettings,
mock_current_position_ot3: AsyncMock,
mock_move_to_plunger_bottom: AsyncMock,
mock_gantry_position: AsyncMock,
) -> None:
"""Verify the plunger moves in liquid_probe."""
# This test verifies that both:
# - the plunger movements in each liquid probe pass are what we expect
# - liquid probe successfully chooses the correct distance to move
# when approaching its max z distance
instr_data = AttachedPipette(
config=load_pipette_data.load_definition(
PipetteModelType("p1000"), PipetteChannelType(1), PipetteVersionType(3, 4)
),
id="fakepip",
)
await ot3_hardware.cache_pipette(mount, instr_data, None)
pipette = ot3_hardware.hardware_pipettes[mount.to_mount()]

assert pipette
await ot3_hardware.add_tip(mount, 100)
await ot3_hardware.home()
mock_move_to.return_value = None

with patch.object(
hardware_backend, "liquid_probe", AsyncMock(spec=hardware_backend.liquid_probe)
) as mock_liquid_probe:

mock_liquid_probe.side_effect = [
PipetteLiquidNotFoundError,
PipetteLiquidNotFoundError,
PipetteLiquidNotFoundError,
PipetteLiquidNotFoundError,
None,
]

fake_max_z_dist = 75.0
config = ot3_hardware.config.liquid_sense
mount_speed = config.mount_speed
non_responsive_z_mm = ot3_hardware.liquid_probe_non_responsive_z_distance(
mount_speed
)

probe_pass_overlap = 0.1
probe_pass_z_offset_mm = non_responsive_z_mm + probe_pass_overlap
probe_safe_reset_mm = max(2.0, probe_pass_z_offset_mm)

# simulate multiple passes of liquid probe
mock_gantry_position.side_effect = [
Point(x=0, y=0, z=100),
Point(x=0, y=0, z=100),
Point(x=0, y=0, z=100),
Point(x=0, y=0, z=82.15),
Point(x=0, y=0, z=64.3),
Point(x=0, y=0, z=46.45),
Point(x=0, y=0, z=28.6),
Point(x=0, y=0, z=25),
]
probe_start_pos = await ot3_hardware.gantry_position(mount)
safe_plunger_pos = Point(
probe_start_pos.x,
probe_start_pos.y,
probe_start_pos.z + probe_safe_reset_mm,
)

p_impulse_mm = config.plunger_impulse_time * config.plunger_speed
p_total_mm = pipette.plunger_positions.bottom - pipette.plunger_positions.top
p_working_mm = p_total_mm - (pipette.backlash_distance + p_impulse_mm)

max_z_time = (
fake_max_z_dist - (probe_start_pos.z - safe_plunger_pos.z)
) / config.mount_speed
p_travel_required_for_z = max_z_time * config.plunger_speed
await ot3_hardware.liquid_probe(mount, fake_max_z_dist)

max_z_distance = fake_max_z_dist
# simulate multiple passes of liquid_probe plunger moves
for _pass in mock_liquid_probe.call_args_list:
plunger_move = _pass[0][1]
expected_plunger_move = (
min(p_travel_required_for_z, p_working_mm) + p_impulse_mm
)
assert isclose(plunger_move, expected_plunger_move)

mount_travel_time = plunger_move / config.plunger_speed
mount_travel_distance = mount_speed * mount_travel_time
max_z_distance -= mount_travel_distance

move_mount_z_time = (max_z_distance + probe_safe_reset_mm) / mount_speed
p_travel_required_for_z = move_mount_z_time * config.plunger_speed


@pytest.mark.parametrize(
"mount, head_node, pipette_node",
[
(OT3Mount.LEFT, NodeId.head_l, NodeId.pipette_left),
(OT3Mount.RIGHT, NodeId.head_r, NodeId.pipette_right),
],
)
async def test_liquid_probe_mount_moves(
mock_move_to: AsyncMock,
ot3_hardware: ThreadManager[OT3API],
hardware_backend: OT3Simulator,
head_node: NodeId,
pipette_node: Axis,
mount: OT3Mount,
fake_liquid_settings: LiquidProbeSettings,
mock_current_position_ot3: AsyncMock,
mock_move_to_plunger_bottom: AsyncMock,
mock_gantry_position: AsyncMock,
) -> None:
"""Verify move targets for one singular liquid pass probe."""
instr_data = AttachedPipette(
config=load_pipette_data.load_definition(
PipetteModelType("p1000"), PipetteChannelType(1), PipetteVersionType(3, 4)
),
id="fakepip",
)
await ot3_hardware.cache_pipette(mount, instr_data, None)
pipette = ot3_hardware.hardware_pipettes[mount.to_mount()]

assert pipette
await ot3_hardware.add_tip(mount, 100)
await ot3_hardware.home()
mock_move_to.return_value = None

with patch.object(
hardware_backend, "liquid_probe", AsyncMock(spec=hardware_backend.liquid_probe)
):

fake_max_z_dist = 10.0
config = ot3_hardware.config.liquid_sense
mount_speed = config.mount_speed
non_responsive_z_mm = ot3_hardware.liquid_probe_non_responsive_z_distance(
mount_speed
)

probe_pass_overlap = 0.1
probe_pass_z_offset_mm = non_responsive_z_mm + probe_pass_overlap
probe_safe_reset_mm = max(2.0, probe_pass_z_offset_mm)

mock_gantry_position.return_value = Point(x=0, y=0, z=100)
probe_start_pos = await ot3_hardware.gantry_position(mount)
safe_plunger_pos = Point(
probe_start_pos.x,
probe_start_pos.y,
probe_start_pos.z + probe_safe_reset_mm,
)
pass_start_pos = Point(
probe_start_pos.x,
probe_start_pos.y,
probe_start_pos.z + probe_pass_z_offset_mm,
)
await ot3_hardware.liquid_probe(mount, fake_max_z_dist)
expected_moves = [
call(mount, safe_plunger_pos),
call(mount, pass_start_pos),
call(mount, Point(z=probe_start_pos.z + 2)),
call(mount, probe_start_pos),
]
assert mock_move_to.call_args_list == expected_moves


async def test_multi_liquid_probe(
mock_move_to: AsyncMock,
ot3_hardware: ThreadManager[OT3API],
Expand Down

0 comments on commit a4811c1

Please sign in to comment.