Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): Add FlexStacker functionality to enable stallguard + add animation patterns to set_led. #17349

Merged
merged 8 commits into from
Jan 29, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ def reset_input_buffer(self) -> None:
"""Reset the input buffer"""
self._serial.reset_input_buffer()

def reset_output_buffer(self) -> None:
"""Reset the output buffer"""
self._serial.reset_output_buffer()

@contextlib.asynccontextmanager
async def timeout_override(
self, timeout_property: TimeoutProperties, timeout: Optional[float]
Expand Down
7 changes: 7 additions & 0 deletions api/src/opentrons/drivers/asyncio/communication/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

class ErrorCodes(Enum):
UNHANDLED_GCODE = "ERR003"
MOTOR_STALL = "ERR403"


class SerialException(Exception):
Expand Down Expand Up @@ -43,3 +44,9 @@ class UnhandledGcode(ErrorResponse):
def __init__(self, port: str, response: str, command: str) -> None:
self.command = command
super().__init__(port, response)


class MotorStall(ErrorResponse):
def __init__(self, port: str, response: str, command: str) -> None:
self.command = command
super().__init__(port, response)
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@

from opentrons.drivers.command_builder import CommandBuilder

from .errors import NoResponse, AlarmResponse, ErrorResponse, UnhandledGcode, ErrorCodes
from .errors import (
MotorStall,
NoResponse,
AlarmResponse,
ErrorResponse,
UnhandledGcode,
ErrorCodes,
)
from .async_serial import AsyncSerial

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -254,6 +261,9 @@ def raise_on_error(self, response: str, request: str) -> None:
raise UnhandledGcode(
port=self._port, response=response, command=request
)

elif ErrorCodes.MOTOR_STALL.value.lower() in lower:
raise MotorStall(port=self._port, response=response, command=request)
else:
raise ErrorResponse(port=self._port, response=response)

Expand Down
3 changes: 2 additions & 1 deletion api/src/opentrons/drivers/flex_stacker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .abstract import AbstractFlexStackerDriver
from .driver import FlexStackerDriver, STACKER_MOTION_CONFIG
from .driver import FlexStackerDriver, STACKER_MOTION_CONFIG, STALLGUARD_CONFIG
from .simulator import SimulatingDriver
from . import types as FlexStackerTypes

Expand All @@ -9,4 +9,5 @@
"SimulatingDriver",
"FlexStackerTypes",
"STACKER_MOTION_CONFIG",
"STALLGUARD_CONFIG",
]
40 changes: 36 additions & 4 deletions api/src/opentrons/drivers/flex_stacker/abstract.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import List, Protocol
from typing import List, Optional, Protocol

from .types import (
LEDPattern,
LimitSwitchStatus,
MoveResult,
StackerAxis,
Expand All @@ -9,6 +10,7 @@
MoveParams,
StackerInfo,
LEDColor,
StallGuardParams,
)


Expand Down Expand Up @@ -51,10 +53,30 @@ async def set_ihold_current(self, axis: StackerAxis, current: float) -> bool:
"""Set axis hold current in amps."""
...

async def set_stallguard_threshold(
self, axis: StackerAxis, enable: bool, threshold: int
) -> bool:
"""Enables and sets the stallguard threshold for the given axis motor."""
...

async def set_motor_driver_register(
self, axis: StackerAxis, reg: int, value: int
) -> bool:
"""Set the register of the given motor axis driver to the given value."""
...

async def get_motor_driver_register(self, axis: StackerAxis, reg: int) -> int:
"""Gets the register value of the given motor axis driver."""
...

async def get_motion_params(self, axis: StackerAxis) -> MoveParams:
"""Get the motion parameters used by the given axis motor."""
...

async def get_stallguard_threshold(self, axis: StackerAxis) -> StallGuardParams:
"""Get the stallguard parameters by the given axis motor."""
...

async def get_limit_switch(self, axis: StackerAxis, direction: Direction) -> bool:
"""Get limit switch status.

Expand Down Expand Up @@ -96,16 +118,26 @@ async def move_to_limit_switch(
"""Move until limit switch is triggered."""
...

async def home_axis(self, axis: StackerAxis, direction: Direction) -> bool:
async def home_axis(self, axis: StackerAxis, direction: Direction) -> MoveResult:
"""Home axis."""
...

async def set_led(
self, power: float, color: LEDColor | None = None, external: bool | None = None
self,
power: float,
color: Optional[LEDColor] = None,
external: Optional[bool] = None,
pattern: Optional[LEDPattern] = None,
duration: Optional[int] = None,
reps: Optional[int] = None,
) -> bool:
"""Set LED color of status bar."""
"""Set LED Status bar color and pattern."""
...

async def enter_programming_mode(self) -> None:
"""Reboot into programming mode"""
...

def reset_serial_buffers(self) -> None:
"""Reset the input and output serial buffers."""
...
Loading