Skip to content

Commit

Permalink
refactor(api): liquid probe refactor (#15903)
Browse files Browse the repository at this point in the history
  • Loading branch information
caila-marashaj authored Aug 7, 2024
1 parent 76525cd commit 88b6a17
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 42 deletions.
99 changes: 58 additions & 41 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2673,22 +2673,10 @@ async def liquid_probe(
self._pipette_handler.ready_for_tip_action(
instrument, HardwareAction.LIQUID_PROBE, checked_mount
)

if not probe_settings:
probe_settings = self.config.liquid_sense

probe_start_pos = await self.gantry_position(checked_mount, refresh=True)

# plunger travel distance is from TOP->BOTTOM (minus the backlash distance + impulse)
# FIXME: logic for how plunger moves is divided between here and tool_sensors.py
p_impulse_mm = (
probe_settings.plunger_impulse_time * probe_settings.plunger_speed
)
p_total_mm = (
instrument.plunger_positions.bottom - instrument.plunger_positions.top
)

# We need to significatly slow down the 96 channel liquid probe
# We need to significantly slow down the 96 channel liquid probe
if self.gantry_load == GantryLoad.HIGH_THROUGHPUT:
max_plunger_speed = self.config.motion_settings.max_speed_discontinuity[
GantryLoad.HIGH_THROUGHPUT
Expand All @@ -2697,66 +2685,95 @@ async def liquid_probe(
max_plunger_speed, probe_settings.plunger_speed
)

p_working_mm = p_total_mm - (instrument.backlash_distance + p_impulse_mm)
starting_position = await self.gantry_position(checked_mount, refresh=True)

sensor_baseline_plunger_move_mm = (
probe_settings.plunger_impulse_time * probe_settings.plunger_speed
)
total_plunger_axis_mm = (
instrument.plunger_positions.bottom - instrument.plunger_positions.top
)
max_allowed_plunger_distance_mm = total_plunger_axis_mm - (
instrument.backlash_distance + sensor_baseline_plunger_move_mm
)
# height where probe action will begin
# TODO: (sigler) add this to pipette's liquid def (per tip)
probe_pass_overlap_mm = 0.1
non_responsive_z_mm = OT3API.liquid_probe_non_responsive_z_distance(
z_overlap_between_passes_mm = 0.1
sensor_baseline_z_move_mm = OT3API.liquid_probe_non_responsive_z_distance(
probe_settings.mount_speed
)
probe_pass_z_offset_mm = non_responsive_z_mm + probe_pass_overlap_mm
z_offset_per_pass = sensor_baseline_z_move_mm + z_overlap_between_passes_mm

# height that is considered safe to reset the plunger without disturbing liquid
# this usually needs to at least 1-2mm from liquid, to avoid splashes from air
# TODO: (sigler) add this to pipette's liquid def (per tip)
probe_safe_reset_mm = max(2.0, probe_pass_z_offset_mm)
z_offset_for_plunger_prep = max(2.0, z_offset_per_pass)

error: Optional[PipetteLiquidNotFoundError] = None
pos = await self.gantry_position(checked_mount, refresh=True)
# probe_start_pos.z + z_distance of pass - pos.z should be < max_z_dist
# due to rounding errors this can get caught in an infinite loop when the distance is almost equal
# so we check to see if they're within 0.01 which is 1/5th the minimum movement distance from move_utils.py
while (probe_start_pos.z - pos.z) < (max_z_dist + 0.01):
async def prep_plunger_for_probe_move(
position: top_types.Point, aspirate_while_sensing: bool
) -> None:
# safe distance so we don't accidentally aspirate liquid if we're already close to liquid
safe_plunger_pos = top_types.Point(
pos.x, pos.y, pos.z + probe_safe_reset_mm
mount_pos_for_plunger_prep = top_types.Point(
position.x,
position.y,
position.z + z_offset_for_plunger_prep,
)
# overlap amount we want to use between passes
pass_start_pos = top_types.Point(
pos.x, pos.y, pos.z + probe_pass_z_offset_mm
)
max_z_time = (
max_z_dist - probe_start_pos.z + pass_start_pos.z
) / probe_settings.mount_speed
p_travel_required_for_z = max_z_time * probe_settings.plunger_speed
p_pass_travel = min(p_travel_required_for_z, p_working_mm)
# Prep the plunger
await self.move_to(checked_mount, safe_plunger_pos)
if probe_settings.aspirate_while_sensing:
await self.move_to(checked_mount, mount_pos_for_plunger_prep)
if aspirate_while_sensing:
# TODO(cm, 7/8/24): remove p_prep_speed from the rate at some point
await self._move_to_plunger_bottom(checked_mount, rate=1)
else:
await self._move_to_plunger_top_for_liquid_probe(checked_mount, rate=1)

error: Optional[PipetteLiquidNotFoundError] = None
current_position = await self.gantry_position(checked_mount, refresh=True)
# starting_position.z + z_distance of pass - pos.z should be < max_z_dist
# due to rounding errors this can get caught in an infinite loop when the distance is almost equal
# so we check to see if they're within 0.01 which is 1/5th the minimum movement distance from move_utils.py
while (starting_position.z - current_position.z) < (max_z_dist - 0.01):
await prep_plunger_for_probe_move(
position=current_position,
aspirate_while_sensing=probe_settings.aspirate_while_sensing,
)

# overlap amount we want to use between passes
pass_start_pos = top_types.Point(
current_position.x,
current_position.y,
current_position.z + z_offset_per_pass,
)

total_remaining_z_dist = pass_start_pos.z - (
starting_position.z - max_z_dist
)
finish_probe_move_duration = (
total_remaining_z_dist / probe_settings.mount_speed
)
finish_probe_plunger_distance_mm = (
finish_probe_move_duration * probe_settings.plunger_speed
)
plunger_travel_mm = min(
finish_probe_plunger_distance_mm, max_allowed_plunger_distance_mm
)
try:
# move to where we want to start a pass and run a pass
await self.move_to(checked_mount, pass_start_pos)
height = await self._liquid_probe_pass(
checked_mount,
probe_settings,
probe if probe else InstrumentProbeType.PRIMARY,
p_pass_travel + p_impulse_mm,
plunger_travel_mm + sensor_baseline_plunger_move_mm,
)
# if we made it here without an error we found the liquid
error = None
break
except PipetteLiquidNotFoundError as lnfe:
error = lnfe
pos = await self.gantry_position(checked_mount, refresh=True)
await self.move_to(checked_mount, probe_start_pos + top_types.Point(z=2))
current_position = await self.gantry_position(checked_mount, refresh=True)
await self.move_to(checked_mount, starting_position + top_types.Point(z=2))
await self.prepare_for_aspirate(checked_mount)
await self.move_to(checked_mount, probe_start_pos)
await self.move_to(checked_mount, starting_position)
if error is not None:
# if we never found liquid raise an error
raise error
Expand Down
2 changes: 1 addition & 1 deletion api/tests/opentrons/hardware_control/test_ot3_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,7 @@ async def _fake_pos_update_and_raise(
OT3Mount.LEFT, fake_max_z_dist, fake_settings_aspirate
)
# assert that it went through 4 passes and then prepared to aspirate
assert mock_move_to_plunger_bottom.call_count == 4
assert mock_move_to_plunger_bottom.call_count == 5


@pytest.mark.parametrize(
Expand Down

0 comments on commit 88b6a17

Please sign in to comment.