Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
349 changes: 256 additions & 93 deletions ARCHITECTURE_motor_test.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@ def _get_settings_defaults(cls) -> dict[str, Union[int, bool, str, float, dict]]
"annotate_docs_into_param_files": False,
"gui_complexity": "simple", # simple or normal
# Motor test settings
"motor_test_duration": 2.5, # Default test duration in seconds
"motor_test_throttle_pct": 10, # Default throttle percentage (10%)
"motor_test": {
"duration": 2.5, # Default test duration in seconds
"throttle_pct": 10, # Default throttle percentage (10%)
},
}

@staticmethod
Expand Down Expand Up @@ -341,14 +343,14 @@ def set_setting(setting: str, value: Union[bool, str, float]) -> None:
@staticmethod
def motor_diagram_filepath(frame_class: int, frame_type: int) -> tuple[str, str]:
"""
Get the filepath for the motor diagram SVG file.
Get the filepath for the motor diagram PNG file.

Args:
frame_class: ArduPilot frame class (1=QUAD, 2=HEXA, etc.)
frame_type: ArduPilot frame type (0=PLUS, 1=X, etc.)

Returns:
str: Absolute path to the motor diagram SVG file
str: Absolute path to the motor diagram PNG file
str: Error message if multiple or no files found, empty string if no error

"""
Expand All @@ -361,12 +363,12 @@ def motor_diagram_filepath(frame_class: int, frame_type: int) -> tuple[str, str]
# Running as script
application_path = os_path.dirname(os_path.dirname(os_path.abspath(__file__)))

images_dir = os_path.join(application_path, "ardupilot_methodic_configurator", "images")
images_dir = os_path.join(application_path, "ardupilot_methodic_configurator", "images", "motor_diagrams_png")

# Generate SVG filename based on frame configuration
filename = f"m_{frame_class:02d}_{frame_type:02d}_*.svg"
# Generate PNG filename based on frame configuration
filename = f"m_{frame_class:02d}_{frame_type:02d}_*.png"

# Search for matching SVG file (since exact naming varies)
# Search for matching PNG file (since exact naming varies)
matching_files = glob.glob(os_path.join(images_dir, filename))

err_msg = (
Expand Down
138 changes: 99 additions & 39 deletions ardupilot_methodic_configurator/backend_flightcontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def close(self) -> None:
]


class FlightController: # pylint: disable=too-many-public-methods
class FlightController: # pylint: disable=too-many-public-methods,too-many-instance-attributes
"""
A class to manage the connection and parameters of a flight controller.

Expand All @@ -114,6 +114,10 @@ def __init__(self, reboot_time: int = DEFAULT_REBOOT_TIME, baudrate: int = DEFAU
self.fc_parameters: dict[str, float] = {}
self.info = BackendFlightcontrollerInfo()

# Battery status tracking
self._last_battery_message_time: float = 0.0
self._last_battery_status: Union[tuple[float, float], None] = None

def discover_connections(self) -> None:
comports = FlightController.__list_serial_ports()
netports = FlightController.__list_network_ports()
Expand Down Expand Up @@ -714,6 +718,43 @@ def set_param(self, param_name: str, param_value: float) -> None:
return
self.master.param_set_send(param_name, param_value)

def fetch_param(self, param_name: str, timeout: int = 5) -> Optional[float]:
"""
Fetch a parameter from the flight controller using MAVLink PARAM_REQUEST_READ message.

Args:
param_name (str): The name of the parameter to fetch.
timeout (int): Timeout in seconds to wait for the response. Default is 5.

Returns:
float: The value of the parameter, or None if not found or timeout occurred.

"""
if self.master is None: # FIXME for testing only pylint: disable=fixme
return None

# Send PARAM_REQUEST_READ message
self.master.mav.param_request_read_send(
self.master.target_system,
self.master.target_component,
param_name.encode("utf-8"),
-1, # param_index: -1 means use param_id instead
)

# Wait for PARAM_VALUE response
start_time = time_time()
while time_time() - start_time < timeout:
msg = self.master.recv_match(type="PARAM_VALUE", blocking=False)
if msg is not None:
# Check if this is the parameter we requested
received_param_name = msg.param_id.rstrip("\x00")
if received_param_name == param_name:
logging_debug(_("Received parameter: %s = %s"), param_name, msg.param_value)
return float(msg.param_value)
time_sleep(0.01) # Small sleep to prevent busy waiting

raise TimeoutError(_("Timeout waiting for parameter %s") % param_name)

def reset_all_parameters_to_default(self) -> tuple[bool, str]:
"""
Reset all parameters to their factory default values.
Expand Down Expand Up @@ -817,12 +858,16 @@ def __list_serial_ports() -> list[serial.tools.list_ports_common.ListPortInfo]:

# Motor Test Functionality

def test_motor(self, motor_number: int, throttle_percent: int, timeout_seconds: int) -> tuple[bool, str]:
def test_motor( # pylint: disable=too-many-arguments, too-many-positional-arguments
self, test_sequence_nr: int, motor_letters: str, motor_output_nr: int, throttle_percent: int, timeout_seconds: int
) -> tuple[bool, str]:
"""
Test a specific motor.

Args:
motor_number: Motor number (1-based, as used by ArduPilot)
test_sequence_nr: Motor test number, this is not the same as the output number!
motor_letters: Motor letters (for logging purposes only)
motor_output_nr: Motor output number (for logging purposes only)
throttle_percent: Throttle percentage (0-100)
timeout_seconds: Test duration in seconds

Expand All @@ -840,20 +885,24 @@ def test_motor(self, motor_number: int, throttle_percent: int, timeout_seconds:
# https://mavlink.io/en/messages/common.html#MAV_CMD_DO_MOTOR_TEST
success, error_msg = self._send_command_and_wait_ack(
mavutil.mavlink.MAV_CMD_DO_MOTOR_TEST,
param1=motor_number, # motor number
param1=test_sequence_nr + 1, # motor test number, this is not the same as the output number!
param2=mavutil.mavlink.MOTOR_TEST_THROTTLE_PERCENT, # throttle type
param3=throttle_percent, # throttle value
param4=timeout_seconds, # timeout
param5=motor_number, # motor count (same as motor number for single motor test)
param6=mavutil.mavlink.MOTOR_TEST_ORDER_BOARD, # test order
param5=0, # motor count (0=test just the motor specified in param1)
param6=0, # test order (0=default/board order)
param7=0, # unused
)

if success:
logging_info(
_("Motor test command confirmed: Motor %(motor)d at %(throttle)d%% for %(duration)d seconds"),
_(
"Motor test command acknowledged: Motor %(seq)s on output %(output)d at %(throttle)d%% thrust"
" for %(duration)d seconds"
),
{
"motor": motor_number,
"seq": motor_letters,
"output": motor_output_nr,
"throttle": throttle_percent,
"duration": timeout_seconds,
},
Expand All @@ -864,11 +913,12 @@ def test_motor(self, motor_number: int, throttle_percent: int, timeout_seconds:

return success, error_msg

def test_all_motors(self, throttle_percent: int, timeout_seconds: int) -> tuple[bool, str]:
def test_all_motors(self, nr_of_motors: int, throttle_percent: int, timeout_seconds: int) -> tuple[bool, str]:
"""
Test all motors simultaneously.

Args:
nr_of_motors: Number of motors to test
throttle_percent: Throttle percentage (0-100)
timeout_seconds: Test duration in seconds

Expand All @@ -882,38 +932,34 @@ def test_all_motors(self, throttle_percent: int, timeout_seconds: int) -> tuple[
logging_error(error_msg)
return False, error_msg

# MAV_CMD_DO_MOTOR_TEST command for all motors
success, error_msg = self._send_command_and_wait_ack(
mavutil.mavlink.MAV_CMD_DO_MOTOR_TEST,
param1=0, # motor count (all motors)
param2=mavutil.mavlink.MOTOR_TEST_THROTTLE_PERCENT, # throttle type
param3=throttle_percent, # throttle value
param4=timeout_seconds, # timeout
param5=0, # motor count
param6=mavutil.mavlink.MOTOR_TEST_ORDER_BOARD, # test order
param7=0, # unused
)

if success:
logging_info(
_("All motors test command confirmed at %(throttle)d%% for %(duration)d seconds"),
{
"throttle": throttle_percent,
"duration": timeout_seconds,
},
for i in range(nr_of_motors):
# MAV_CMD_DO_MOTOR_TEST command for all motors
self.master.mav.command_long_send(
self.master.target_system,
self.master.target_component,
mavutil.mavlink.MAV_CMD_DO_MOTOR_TEST,
0, # confirmation
param1=i + 1, # motor number (1-based)
param2=mavutil.mavlink.MOTOR_TEST_THROTTLE_PERCENT, # throttle type
param3=throttle_percent, # throttle value
param4=timeout_seconds, # timeout
param5=0, # motor count (0=all motors when param1=0)
param6=0, # test order (0=default/board order)
param7=0, # unused
)
else:
error_msg = _("All motors test command failed: %(error)s") % {"error": error_msg}
logging_error(error_msg)
time_sleep(0.01) # to let the FC parse each command individually

return success, error_msg
return True, ""

def test_motors_in_sequence(self, motor_number: int, throttle_percent: int, timeout_seconds: int) -> tuple[bool, str]:
def test_motors_in_sequence(
self, start_motor: int, motor_count: int, throttle_percent: int, timeout_seconds: int
) -> tuple[bool, str]:
"""
Test motors in sequence (A, B, C, D, etc.).

Args:
motor_number: The motor number to test (1-based index)
start_motor: The first motor to test (1-based index)
motor_count: Number of motors to test in sequence
throttle_percent: Throttle percentage (1-100)
timeout_seconds: Test duration per motor in seconds

Expand All @@ -930,11 +976,11 @@ def test_motors_in_sequence(self, motor_number: int, throttle_percent: int, time
# MAV_CMD_DO_MOTOR_TEST command for sequence test
success, error_msg = self._send_command_and_wait_ack(
mavutil.mavlink.MAV_CMD_DO_MOTOR_TEST,
param1=motor_number, # motor count
param1=start_motor, # starting motor number (1-based)
param2=mavutil.mavlink.MOTOR_TEST_THROTTLE_PERCENT, # throttle type
param3=throttle_percent, # throttle value
param4=timeout_seconds, # timeout per motor
param5=motor_number, # motor count
param5=motor_count, # number of motors to test in sequence
param6=mavutil.mavlink.MOTOR_TEST_ORDER_SEQUENCE, # test order (sequence)
param7=0, # unused
)
Expand Down Expand Up @@ -974,8 +1020,8 @@ def stop_all_motors(self) -> tuple[bool, str]:
param2=mavutil.mavlink.MOTOR_TEST_THROTTLE_PERCENT, # throttle type
param3=0, # throttle value (0% = stop)
param4=0, # timeout (0 = immediate stop)
param5=0, # motor count
param6=mavutil.mavlink.MOTOR_TEST_ORDER_BOARD, # test order
param5=0, # motor count (0 = all motors when param1=0)
param6=0, # test order (0 = default/board order)
param7=0, # unused
)

Expand Down Expand Up @@ -1015,6 +1061,7 @@ def request_periodic_battery_status(self, interval_microseconds: int = 1000000)
param5=0, # unused
param6=0, # unused
param7=0, # unused
timeout=0.8, # shorter timeout for battery status requests
)

if success:
Expand Down Expand Up @@ -1049,15 +1096,21 @@ def get_battery_status(self) -> tuple[Union[tuple[float, float], None], str]:

try:
# Try to get real telemetry data
battery_status = self.master.recv_match(type="BATTERY_STATUS", blocking=True, timeout=2)
battery_status = self.master.recv_match(type="BATTERY_STATUS", blocking=False, timeout=0.3)
if battery_status:
# Convert from millivolts to volts, and centiamps to amps
voltage = battery_status.voltages[0] / 1000.0 if battery_status.voltages[0] != -1 else 0.0
current = battery_status.current_battery / 100.0 if battery_status.current_battery != -1 else 0.0
self._last_battery_status = (voltage, current)
self._last_battery_message_time = time_time()
return (voltage, current), ""
except Exception as e: # pylint: disable=broad-exception-caught
logging_debug(_("Failed to get battery status from telemetry: %(error)s"), {"error": str(e)})

if self._last_battery_message_time and (time_time() - self._last_battery_message_time) < 3:
# If we received a battery message recently, don't log an error
return self._last_battery_status, ""
self._last_battery_status = None
error_msg = _("Battery status not available from telemetry")
return None, error_msg

Expand Down Expand Up @@ -1367,3 +1420,10 @@ def add_argparse_arguments(parser: ArgumentParser) -> ArgumentParser:
help=_("Flight controller reboot time. Default is %(default)s"),
)
return parser

@property
def comport_device(self) -> str:
"""Get the current self.comport.device string."""
if self.comport is not None:
return str(getattr(self.comport, "device", ""))
return ""
Loading
Loading