diff --git a/ARCHITECTURE_motor_test.md b/ARCHITECTURE_motor_test.md index d12d006a..7e9dd47a 100644 --- a/ARCHITECTURE_motor_test.md +++ b/ARCHITECTURE_motor_test.md @@ -20,8 +20,8 @@ It allows users to test motor functionality, verify motor order and direction, a - ✅ Immediate parameter application to flight controller on selection - ✅ Dynamic motor count calculation based on frame configuration - ✅ Motor diagram display showing the currently selected frame configuration - - ✅ SVG motor diagrams loaded from local images directory (downloaded from [ArduPilot documentation](https://ardupilot.org/copter/docs/connect-escs-and-motors.html)) - - The original diagrams are the `.svg` files in the `https://ardupilot.org/copter/_images/` directory + - ✅ PNG motor diagrams loaded from local images directory (converted from ArduPilot documentation SVG files) + - The original diagrams are the `.svg` files in the `https://ardupilot.org/copter/_images/` directory, converted to PNG for better compatibility 2. **Motor Parameter Configuration** - ✅ "Set Motor Spin Arm" button with parameter dialog for MOT_SPIN_ARM @@ -59,33 +59,54 @@ It allows users to test motor functionality, verify motor order and direction, a - ✅ Visual feedback for correct/incorrect motor placement - 🟡 Guidance for correcting wiring issues +### Additional Implemented Features (Beyond Original Requirements) + +#### Enhanced User Experience + +- ✅ Status column for real-time visual feedback during motor testing +- ✅ First-time safety confirmation popup +- ✅ Keyboard shortcuts for critical functions: + - Escape: Emergency stop all motors + - Ctrl+A: Test all motors simultaneously + - Ctrl+S: Test motors in sequence +- ✅ Settings persistence for test duration and throttle percentage +- ✅ Enhanced error handling and user feedback messages +- ✅ PNG diagram display with improved compatibility (no external tksvg dependency required) + +#### Advanced Safety Features + +- ✅ Multiple safety confirmation layers +- ✅ Comprehensive parameter validation with bounds checking +- ✅ Motor direction display (CW/CCW) for each motor position +- ✅ Battery safety threshold validation with visual indicators + ### Non-Functional Requirements 1. **Safety** - - Multiple safety warnings prominently displayed - - Clear indication of active motor testing - - Emergency stop functionality always accessible - - Safe parameter defaults and validation + - ✅ Multiple safety warnings prominently displayed + - ✅ Clear indication of active motor testing + - ✅ Emergency stop functionality always accessible + - ✅ Safe parameter defaults and validation 2. **Usability** - - Intuitive frame-based layout with logical workflow progression - - Clear visual feedback for active operations - - Immediate parameter application with confirmation - - Responsive UI with real-time feedback - - Keyboard shortcuts for critical functions + - ✅ Intuitive frame-based layout with logical workflow progression + - ✅ Clear visual feedback for active operations + - ✅ Immediate parameter application with confirmation + - ✅ Responsive UI with real-time feedback + - ✅ Keyboard shortcuts for critical functions 3. **Reliability** - - Active flight controller connection required for all motor testing operations - - Robust error handling for communication failures - - Parameter validation and bounds checking - - Graceful degradation when features unavailable - - Comprehensive logging for debugging + - ✅ Active flight controller connection required for all motor testing operations + - ✅ Robust error handling for communication failures + - ✅ Parameter validation and bounds checking + - ✅ Graceful degradation when features unavailable + - ✅ Comprehensive logging for debugging 4. **Performance** - - Responsive UI updates during motor testing - - Efficient parameter reading/writing - - Minimal latency for emergency stop operations - - Low resource usage + - ✅ Responsive UI updates during motor testing + - ✅ Efficient parameter reading/writing + - ✅ Minimal latency for emergency stop operations + - ✅ Low resource usage ## System Design @@ -126,7 +147,7 @@ The motor test sub-application follows the Model-View separation pattern establi │ └─────────────────────────────────────┤ │ ┌─────────────────────────────────────┤ │ │ backend_filesystem_program_settings.py │ - │ │ - Motor diagram SVG file access │ + │ │ - Motor diagram PNG file access │ │ │ - Settings persistence (test duration)│ │ │ - Application configuration │ │ └─────────────────────────────────────┤ @@ -141,12 +162,36 @@ The motor test sub-application follows the Model-View separation pattern establi - Frame type detection and motor count calculation - Motor label generation (numbers and letters) +- **JSON-driven motor direction retrieval** from AP_Motors_test.json with schema validation - Parameter reading and writing (MOT_SPIN_ARM, MOT_SPIN_MIN) - Battery status monitoring (BATT1 voltage and current when BATT_MONITOR != 0) - Voltage threshold validation (BATT_ARM_VOLT and MOT_BAT_VOLT_MAX thresholds) - Business logic validation - Backend coordination and abstraction +#### Motor Order and Direction Logic Architecture + +The motor test sub-application implements a robust JSON-driven architecture for motor test order and direction determination: + +**JSON Data Sources:** + +- **`AP_Motors_test.json`** - Authoritative motor layout data from ArduPilot project +- **`AP_Motors_test_schema.json`** - JSON schema for data structure validation + +**Key Features:** + +- ✅ **Schema Validation**: All JSON data validated against schema on load +- ✅ **Frame-specific Lookup**: Motor test order and directions retrieved based on FRAME_CLASS and FRAME_TYPE +- ✅ **Motor Number Ordering**: Directions sorted by motor number for consistent mapping +- ✅ **Error Logging**: Comprehensive logging for debugging and troubleshooting + +**Data Flow:** + +1. JSON data loaded during model initialization with schema validation +2. Motor test order and directions retrieved by matching current frame configuration +3. Results sorted by motor number and adapted to expected motor count +4. Error if frame not found or data invalid + **Key Methods:** ```python @@ -154,16 +199,16 @@ def __init__(self, flight_controller: FlightController, filesystem: LocalFilesys def get_motor_count(self) -> int def get_motor_labels(self) -> list[str] def get_motor_numbers(self) -> list[int] -def get_motor_directions(self) -> list[str] # Returns CW/CCW direction labels for each motor +def get_motor_directions(self) -> list[str] # ✅ Returns CW/CCW direction labels from AP_Motors_test.json def get_battery_status(self) -> Optional[tuple[float, float]] # voltage, current or None if BATT_MONITOR == 0 def get_voltage_status(self) -> str # "safe", "critical" "unavailable" or "disabled" def is_battery_monitoring_enabled(self) -> bool # True if BATT_MONITOR != 0 def is_motor_test_safe(self) -> tuple[bool, str] # (is_safe, reason) def set_parameter(self, param_name: str, value: float) -> tuple[bool, str] # (success, error_message) def get_parameter(self, param_name: str) -> Optional[float] -def test_motor(self, motor_number: int, throttle_percent: int, timeout_seconds: int) -> tuple[bool, str] -def test_all_motors(self, throttle_percent: int, timeout_seconds: int) -> tuple[bool, str] -def test_motors_in_sequence(self, throttle_percent: int, timeout_seconds: int) -> tuple[bool, str] +def test_motor(self, test_sequence_nr: int, motor_letters: str, motor_output_nr: int, throttle_percent: int, timeout_seconds: int) -> tuple[bool, str] +def test_all_motors(self, nr_of_motors: int, throttle_percent: int, timeout_seconds: int) -> tuple[bool, str] +def test_motors_in_sequence(self, start_motor: int, motor_count: int, throttle_percent: int, timeout_seconds: int) -> tuple[bool, str] def stop_all_motors(self) -> tuple[bool, str] def get_motor_diagram_path(self) -> tuple[str, str] # Returns (filepath, description) def motor_diagram_exists(self) -> bool @@ -175,6 +220,8 @@ def update_frame_configuration(self, frame_class: int, frame_type: int) -> tuple def get_frame_options(self) -> dict[str, dict[int, str]] def refresh_connection_status(self) -> bool def get_voltage_thresholds(self) -> tuple[float, float] # Returns (BATT_ARM_VOLT, MOT_BAT_VOLT_MAX) +def refresh_from_flight_controller(self) -> bool # ✅ Refresh frame configuration from FC +def _load_motor_data(self) -> None # ✅ Load motor configuration from AP_Motors_test.json with schema validation ``` **Data Attributes:** @@ -182,9 +229,11 @@ def get_voltage_thresholds(self) -> tuple[float, float] # Returns (BATT_ARM_VOL - `flight_controller`: Backend flight controller interface - `filesystem`: Backend filesystem interface - `settings`: Backend program settings interface -- `frame_class`: Detected vehicle frame class -- `frame_type`: Detected vehicle frame type -- `motor_count`: Number of motors for current frame +- `_frame_class`: Detected vehicle frame class +- `_frame_type`: Detected vehicle frame type +- `_motor_count`: Number of motors for current frame +- `_motor_data_loader`: FilesystemJSONWithSchema instance for loading AP_Motors_test.json +- `_motor_data`: Loaded motor configuration data from JSON with schema validation #### Backend Layer Distribution @@ -196,24 +245,35 @@ The motor test sub-application backend logic is distributed across three special - Direct MAVLink communication with flight controller - Motor testing command execution -- Real-time battery monitoring and telemetry +- Real-time battery monitoring and telemetry with timestamp tracking (`_last_battery_message_time`, `_last_battery_status`) - Parameter read/write operations - Flight controller status monitoring **Key Motor Test Methods:** ```python -def test_motor(self, motor_number: int, throttle_percent: int, timeout_seconds: int) -> tuple[bool, str] -def test_all_motors(self, throttle_percent: int, timeout_seconds: int) -> tuple[bool, str] -def test_motors_in_sequence(self, throttle_percent: int, timeout_seconds: int) -> tuple[bool, str] +def test_motor(self, test_sequence_nr: int, motor_letters: str, motor_output_nr: int, throttle_percent: int, timeout_seconds: int) -> tuple[bool, str] +def test_all_motors(self, nr_of_motors: int, throttle_percent: int, timeout_seconds: int) -> tuple[bool, str] +def test_motors_in_sequence(self, start_motor: int, motor_count: int, throttle_percent: int, timeout_seconds: int) -> tuple[bool, str] def stop_all_motors(self) -> tuple[bool, str] def get_battery_status(self) -> tuple[Union[tuple[float, float], None], str] def get_voltage_thresholds(self) -> tuple[float, float] def is_battery_monitoring_enabled(self) -> bool def get_frame_info(self) -> tuple[int, int] def get_motor_count_from_frame(self) -> int +def fetch_param(self, param_name: str, timeout: int = 5) -> Optional[float] ``` +**Motor Test Implementation:** + +The motor test functionality uses MAVLink parameter handling: + +- **Individual Motor Testing**: Uses `MAV_CMD_DO_MOTOR_TEST` with `param1=test_sequence_nr + 1`, `param5=0` for single motor tests +- **Simultaneous Motor Testing**: Rapidly sends individual `MAV_CMD_DO_MOTOR_TEST` commands for each motor to achieve simultaneous effect +- **Sequential Motor Testing**: Uses `MAV_CMD_DO_MOTOR_TEST` with `param1=start_motor`, `param5=motor_count` for sequence testing +- **Parameter Fetching**: `fetch_param()` method for reliable parameter retrieval with timeout handling +- **Logging**: Motor test commands log test sequence letters, output numbers, and detailed execution information + ##### `backend_filesystem.py` - Safety & Parameter Support **Responsibilities:** @@ -227,20 +287,25 @@ def get_motor_count_from_frame(self) -> int **Responsibilities:** -- Motor diagram SVG file access and validation -- User settings persistence (test duration, preferences) +- Motor diagram PNG file access and validation +- User settings persistence with nested structure: `motor_test.duration` and `motor_test.throttle_pct` - Application configuration management **Key Motor Test Methods:** ```python @staticmethod -def motor_diagram_filepath(frame_class: int, frame_type: int) -> str +def motor_diagram_filepath(frame_class: int, frame_type: int) -> tuple[str, str] @staticmethod def motor_diagram_exists(frame_class: int, frame_type: int) -> bool -# Settings persistence methods (protected methods for test duration, etc.) ``` +##### `backend_filesystem_json_with_schema.py` - Motor Frames & rotation order and direction + +**Responsibilities:** + +- loading and validating AP_Motors_test.json + #### GUI Layer (`frontend_tkinter_motor_test.py`) **Primary Responsibilities:** @@ -254,38 +319,58 @@ def motor_diagram_exists(frame_class: int, frame_type: int) -> bool **Key UI Components:** - Information and safety warnings at the top -- Frame configuration section with FRAME_TYPE and FRAME_CLASS comboboxes +- Frame configuration section with FRAME_TYPE combobox - Arm and min throttle configuration with parameter buttons - Motor order/direction configuration with test buttons and direction detection - Real-time battery status display with color-coded voltage indication - Prominent red emergency stop button and test sequence controls +- Status column for real-time motor testing feedback (NEW) +- Keyboard shortcuts for critical operations (NEW) + +**Enhanced Features:** + +- ✅ PNG diagram rendering with improved compatibility and reliability +- ✅ First-time safety confirmation popups +- ✅ Real-time status updates during motor testing +- ✅ **Spinbox value management** - Automatic initialization from data model and proper change handling +- ✅ **Input validation and recovery** - Invalid inputs gracefully handled with model fallback +- ✅ Keyboard shortcuts: + - Escape: Emergency stop all motors + - Ctrl+A: Test all motors simultaneously + - Ctrl+S: Test motors in sequence +- ✅ Settings persistence for user preferences +- ✅ Enhanced error handling and logging +- ✅ **Comprehensive test coverage** - 34 BDD pytest tests ensuring reliability **Layout Structure:** ```text -┌─────────────────────────────────────────────────────────────┐ -│ Information & Safety Warnings │ -│ • Remove propellers before testing │ -│ • Ensure vehicle is secured │ -│ • Emergency stop always available │ -├─────────────────────────────────────────────────────────────┤ -│ 1. Frame configuration │ -│ Frame Type: [FRAME_TYPE ▼] Frame Class: [FRAME_CLASS ▼] │ -├─────────────────────────────────────────────────────────────┤ -│ 2. Arm and min throttle configuration │ -│ [Set Motor Spin Arm] [Set Motor Spin Min] │ -├─────────────────────────────────────────────────────────────┤ -│ 3. Motor order/direction configuration │ -│ Duration: [____] seconds Battery: 12.4V/2.1A │ -│ │ -│ [Motor A] Motor 1 CW [Detected: ▼] │ -│ [Motor B] Motor 2 CCW [Detected: ▼] │ -│ [Motor C] Motor 3 CCW [Detected: ▼] │ -│ [Motor D] Motor 4 CW [Detected: ▼] │ -│ ... │ -│ │ -│ [🛑 STOP ALL MOTORS] [Test in Sequence] │ -└─────────────────────────────────────────────────────────────┘ +┌─────────────────────────────────────────────────────────────────────┐ +│ Information & Safety Warnings │ +│ • Remove propellers before testing │ +│ • Ensure vehicle is secured │ +│ • Emergency stop always available │ +├─────────────────────────────────────────────────────────────────────┤ +│ 1. Frame configuration │ +│ Frame Type: [FRAME_TYPE ▼] │ +│ │ +│ Frame Type PNG diagram │ +│ │ +├─────────────────────────────────────────────────────────────────────┤ +│ 2. Arm and min throttle configuration │ +│ [Set Motor Spin Arm] [Set Motor Spin Min] │ +├─────────────────────────────────────────────────────────────────────┤ +│ 3. Motor order/direction configuration │ +│ Throttle: [____] % Duration: [____] seconds Battery: 12.4V/2.1A | +│ │ +│ [Motor A] Motor 1 CW [Detected: ▼] │ +│ [Motor B] Motor 2 CCW [Detected: ▼] │ +│ [Motor C] Motor 3 CCW [Detected: ▼] │ +│ [Motor D] Motor 4 CW [Detected: ▼] │ +│ ... │ +│ │ +│ [🛑 STOP ALL MOTORS] [Test in Sequence] │ +└─────────────────────────────────────────────────────────────────────┘ ``` ### Data Flow @@ -363,7 +448,7 @@ def motor_diagram_exists(frame_class: int, frame_type: int) -> bool #### Data Model Tests (`tests/test_data_model_motor_test.py`) - ✅ IMPLEMENTED -**Status:** 54 comprehensive BDD pytest tests implemented covering all functionality +**Status:** 81 comprehensive BDD pytest tests implemented covering all functionality with **100% code coverage** **Test Coverage Areas:** @@ -377,20 +462,25 @@ def motor_diagram_exists(frame_class: int, frame_type: int) -> bool - ✅ Settings persistence and configuration management - ✅ Exception handling and edge cases -#### GUI Tests (`tests/test_frontend_tkinter_motor_test.py`) - ❌ NOT IMPLEMENTED +#### GUI Tests (`tests/test_frontend_tkinter_motor_test.py`) - ✅ IMPLEMENTED -**Status:** Frontend tests not yet implemented +**Status:** 34 comprehensive BDD pytest tests implemented covering frontend functionality with **high test coverage** -**Planned Test Coverage:** +**Test Coverage Areas:** -- ❌ UI component creation and layout -- ❌ Button generation based on motor count -- ❌ Event handling for all interactive elements -- ❌ Safety confirmation dialogs -- ❌ Parameter input validation -- ❌ Integration with data model layer -- ❌ Keyboard shortcut functionality -- ❌ SVG diagram rendering +- ✅ UI component creation and layout validation +- ✅ Button generation based on motor count changes +- ✅ Event handling for all interactive elements +- ✅ Safety confirmation dialogs and user interactions +- ✅ Parameter input validation and error handling +- ✅ Integration with data model layer via dependency injection +- ✅ Keyboard shortcut functionality verification +- ✅ PNG diagram rendering and error handling +- ✅ Spinbox initialization and change handling +- ✅ Frame type selection and immediate parameter application +- ✅ Battery status monitoring and display updates +- ✅ Motor status visual feedback during testing +- ✅ Emergency stop functionality and safety mechanisms ### Integration Testing @@ -420,22 +510,102 @@ def mock_flight_controller() -> MagicMock: @pytest.fixture def motor_test_data_model(mock_flight_controller) -> MotorTestDataModel: - """Fixture providing configured motor test data model.""" + """Fixture providing configured motor test data model with dependency injection support.""" + +@pytest.fixture +def motor_test_view_setup() -> tuple[MagicMock, ...]: + """Fixture providing complete mock setup for testing MotorTestView without full window creation.""" @pytest.fixture def motor_test_window(motor_test_data_model) -> MotorTestWindow: - """Fixture providing configured motor test GUI window.""" + """Fixture providing configured motor test GUI window for integration testing.""" -def test_user_can_test_individual_motor(self, motor_test_window) -> None: +def test_user_can_test_individual_motor(self, motor_test_view_setup) -> None: """ User can test individual motors safely. - GIVEN: A configured vehicle with detected frame type + GIVEN: A configured motor test view with mock dependencies WHEN: User clicks a motor test button - THEN: The corresponding motor should activate with feedback + THEN: The corresponding motor should activate with proper validation and feedback + """ + +def test_spinbox_values_initialize_from_data_model(self, motor_test_view_setup) -> None: + """ + Spinbox values are properly initialized from the data model. + + GIVEN: A motor test view with configured data model + WHEN: The view is updated + THEN: Spinbox values should reflect the current model values + """ + +def test_spinbox_changes_update_data_model(self, motor_test_view_setup) -> None: + """ + Spinbox changes properly update the data model. + + GIVEN: A motor test view with Spinbox widgets + WHEN: User changes Spinbox values + THEN: The data model should be updated with the new values """ ``` +## Implementation Status Summary + +### ✅ COMPLETED FEATURES + +**Core Functionality:** + +- ✅ Complete data model with 30+ methods (**100% test coverage** with 81 BDD pytest tests) +- ✅ Full Tkinter GUI implementation with all required components (**34 BDD pytest tests** with comprehensive coverage) +- ✅ **Enhanced Model-View separation** with dependency injection for improved testability +- ✅ Real backend integration (FlightController, LocalFilesystem, ProgramSettings) +- ✅ Frame configuration with parameter metadata integration +- ✅ Motor testing (individual, all, sequence, emergency stop) +- ✅ Battery monitoring with safety validation +- ✅ Parameter configuration (MOT_SPIN_ARM, MOT_SPIN_MIN) +- ✅ PNG diagram rendering with improved compatibility +- ✅ **JSON-driven motor direction logic** with schema validation (AP_Motors_test.json) +- ✅ **Motor rotation direction display** (CW/CCW) from authoritative ArduPilot data +- ✅ **Motor count mismatch handling** with extension/truncation for various frame configurations +- ✅ **Spinbox value synchronization** - proper initialization from data model and change handling + +**Enhanced Features:** + +- ✅ Status column for real-time feedback +- ✅ Keyboard shortcuts for critical functions +- ✅ First-time safety confirmation +- ✅ Settings persistence with Spinbox initialization from data model +- ✅ Enhanced error handling and logging +- ✅ Motor direction display (CW/CCW) +- ✅ **FilesystemJSONWithSchema integration** for robust JSON loading and validation + +### 🟡 PARTIAL FEATURES (Implemented but not fully tested) + +- 🟡 Motor order comparison and validation logic (framework present) +- 🟡 Wiring issue guidance (basic framework implemented) + +### ❌ MISSING FEATURES + +- ❌ End-to-end integration tests with real flight controller hardware +- ❌ Advanced motor order validation algorithms with automatic correction suggestions + +### Dependencies and Requirements + +#### Backend Dependencies + +- ✅ `backend_flightcontroller.py` - All required motor test methods verified present +- ✅ `backend_filesystem.py` - Enhanced with `get_frame_options()` method +- ✅ `backend_filesystem_program_settings.py` - Settings persistence implemented +- ✅ `backend_filesystem_json_with_schema.py` - JSON loading with schema validation for motor data +- ✅ `annotate_params.py` - Parameter metadata parsing support + +#### File Dependencies + +- ✅ Motor diagram PNG files in `ardupilot_methodic_configurator/images/motor_diagrams_png/` directory +- ✅ Parameter documentation metadata files (`.pdef.xml`) +- ✅ Frame configuration JSON schemas +- ✅ **AP_Motors_test.json** - Motor layout configuration data with motor positions, rotations, and test order +- ✅ **AP_Motors_test_schema.json** - JSON schema for validating motor configuration data structure + ## Implementation Guidelines ### Code Style Requirements @@ -448,13 +618,14 @@ def test_user_can_test_individual_motor(self, motor_test_window) -> None: ### Safety Implementation -1. **Parameter Validation**: All parameters validated before sending to FC -2. **Battery Monitoring**: Real-time battery voltage monitoring with BATT_ARM_VOLT/MOT_BAT_VOLT_MAX threshold validation (when BATT_MONITOR != 0) -3. **Voltage Safety**: Safety popup when attempting motor test with voltage outside safe range, prompting user to connect battery and/or ensure charged state -4. **Timeout Mechanisms**: Automatic motor stop after configured timeout -5. **Emergency Stop**: Always accessible stop functionality -6. **User Confirmation**: Display a Safety confirmation popup the first time a Motor test button is pressed -7. **Visual Feedback**: Clear indication of active motor testing (status column) and battery status (green/red voltage display) +1. **Parameter Validation**: ✅ All parameters validated before sending to FC +2. **Battery Monitoring**: ✅ Real-time battery voltage monitoring with BATT_ARM_VOLT/MOT_BAT_VOLT_MAX threshold validation (when BATT_MONITOR != 0) +3. **Voltage Safety**: ✅ Safety popup when attempting motor test with voltage outside safe range, prompting user to connect battery and/or ensure charged state +4. **Timeout Mechanisms**: ✅ Automatic motor stop after configured timeout +5. **Emergency Stop**: ✅ Always accessible stop functionality with keyboard shortcut (Escape) +6. **User Confirmation**: ✅ Display a Safety confirmation popup the first time a Motor test button is pressed +7. **Visual Feedback**: ✅ Clear indication of active motor testing (status column) and battery status (green/red voltage display) +8. **Multi-layer Safety**: ✅ Multiple confirmation dialogs and safety checks throughout the workflow ### Performance Considerations @@ -494,7 +665,7 @@ The motor test sub-application leverages the existing backend infrastructure wit #### Program Settings Integration (`backend_filesystem_program_settings.py`) -- **Motor Diagrams**: Comprehensive SVG diagram support for all ArduPilot frame types +- **Motor Diagrams**: Comprehensive PNG diagram support for all ArduPilot frame types - **Settings Persistence**: Consistent user preference storage using established patterns - **Resource Management**: Proper handling of application resources and file paths @@ -511,14 +682,6 @@ Communication with ArduPilot flight controller via: 4. **Status Monitoring**: Real-time status and safety monitoring 5. **Frame Detection**: Automatic vehicle configuration detection -## Future Enhancements - -### Planned Features - -1. **Motor Health Monitoring**: Current draw and performance metrics -2. **Calibration Assistance**: Guided motor calibration procedures -3. **Visual Frame Display**: Graphical representation of motor layout - ## Security Considerations ### Safety Measures diff --git a/ardupilot_methodic_configurator/backend_filesystem_program_settings.py b/ardupilot_methodic_configurator/backend_filesystem_program_settings.py index 72504f34..b18b7017 100644 --- a/ardupilot_methodic_configurator/backend_filesystem_program_settings.py +++ b/ardupilot_methodic_configurator/backend_filesystem_program_settings.py @@ -68,8 +68,10 @@ def _get_settings_defaults(cls) -> dict[str, Union[int, bool, str, float, dict]] "annotate_docs_into_param_files": False, "gui_complexity": "simple", # simple or normal # Motor test settings - "motor_test_duration": 2.5, # Default test duration in seconds - "motor_test_throttle_pct": 10, # Default throttle percentage (10%) + "motor_test": { + "duration": 2.5, # Default test duration in seconds + "throttle_pct": 10, # Default throttle percentage (10%) + }, } @staticmethod @@ -341,14 +343,14 @@ def set_setting(setting: str, value: Union[bool, str, float]) -> None: @staticmethod def motor_diagram_filepath(frame_class: int, frame_type: int) -> tuple[str, str]: """ - Get the filepath for the motor diagram SVG file. + Get the filepath for the motor diagram PNG file. Args: frame_class: ArduPilot frame class (1=QUAD, 2=HEXA, etc.) frame_type: ArduPilot frame type (0=PLUS, 1=X, etc.) Returns: - str: Absolute path to the motor diagram SVG file + str: Absolute path to the motor diagram PNG file str: Error message if multiple or no files found, empty string if no error """ @@ -361,12 +363,12 @@ def motor_diagram_filepath(frame_class: int, frame_type: int) -> tuple[str, str] # Running as script application_path = os_path.dirname(os_path.dirname(os_path.abspath(__file__))) - images_dir = os_path.join(application_path, "ardupilot_methodic_configurator", "images") + images_dir = os_path.join(application_path, "ardupilot_methodic_configurator", "images", "motor_diagrams_png") - # Generate SVG filename based on frame configuration - filename = f"m_{frame_class:02d}_{frame_type:02d}_*.svg" + # Generate PNG filename based on frame configuration + filename = f"m_{frame_class:02d}_{frame_type:02d}_*.png" - # Search for matching SVG file (since exact naming varies) + # Search for matching PNG file (since exact naming varies) matching_files = glob.glob(os_path.join(images_dir, filename)) err_msg = ( diff --git a/ardupilot_methodic_configurator/backend_flightcontroller.py b/ardupilot_methodic_configurator/backend_flightcontroller.py index 6fe9968b..4e0120cb 100644 --- a/ardupilot_methodic_configurator/backend_flightcontroller.py +++ b/ardupilot_methodic_configurator/backend_flightcontroller.py @@ -88,7 +88,7 @@ def close(self) -> None: ] -class FlightController: # pylint: disable=too-many-public-methods +class FlightController: # pylint: disable=too-many-public-methods,too-many-instance-attributes """ A class to manage the connection and parameters of a flight controller. @@ -114,6 +114,10 @@ def __init__(self, reboot_time: int = DEFAULT_REBOOT_TIME, baudrate: int = DEFAU self.fc_parameters: dict[str, float] = {} self.info = BackendFlightcontrollerInfo() + # Battery status tracking + self._last_battery_message_time: float = 0.0 + self._last_battery_status: Union[tuple[float, float], None] = None + def discover_connections(self) -> None: comports = FlightController.__list_serial_ports() netports = FlightController.__list_network_ports() @@ -714,6 +718,43 @@ def set_param(self, param_name: str, param_value: float) -> None: return self.master.param_set_send(param_name, param_value) + def fetch_param(self, param_name: str, timeout: int = 5) -> Optional[float]: + """ + Fetch a parameter from the flight controller using MAVLink PARAM_REQUEST_READ message. + + Args: + param_name (str): The name of the parameter to fetch. + timeout (int): Timeout in seconds to wait for the response. Default is 5. + + Returns: + float: The value of the parameter, or None if not found or timeout occurred. + + """ + if self.master is None: # FIXME for testing only pylint: disable=fixme + return None + + # Send PARAM_REQUEST_READ message + self.master.mav.param_request_read_send( + self.master.target_system, + self.master.target_component, + param_name.encode("utf-8"), + -1, # param_index: -1 means use param_id instead + ) + + # Wait for PARAM_VALUE response + start_time = time_time() + while time_time() - start_time < timeout: + msg = self.master.recv_match(type="PARAM_VALUE", blocking=False) + if msg is not None: + # Check if this is the parameter we requested + received_param_name = msg.param_id.rstrip("\x00") + if received_param_name == param_name: + logging_debug(_("Received parameter: %s = %s"), param_name, msg.param_value) + return float(msg.param_value) + time_sleep(0.01) # Small sleep to prevent busy waiting + + raise TimeoutError(_("Timeout waiting for parameter %s") % param_name) + def reset_all_parameters_to_default(self) -> tuple[bool, str]: """ Reset all parameters to their factory default values. @@ -817,12 +858,16 @@ def __list_serial_ports() -> list[serial.tools.list_ports_common.ListPortInfo]: # Motor Test Functionality - def test_motor(self, motor_number: int, throttle_percent: int, timeout_seconds: int) -> tuple[bool, str]: + def test_motor( # pylint: disable=too-many-arguments, too-many-positional-arguments + self, test_sequence_nr: int, motor_letters: str, motor_output_nr: int, throttle_percent: int, timeout_seconds: int + ) -> tuple[bool, str]: """ Test a specific motor. Args: - motor_number: Motor number (1-based, as used by ArduPilot) + test_sequence_nr: Motor test number, this is not the same as the output number! + motor_letters: Motor letters (for logging purposes only) + motor_output_nr: Motor output number (for logging purposes only) throttle_percent: Throttle percentage (0-100) timeout_seconds: Test duration in seconds @@ -840,20 +885,24 @@ def test_motor(self, motor_number: int, throttle_percent: int, timeout_seconds: # https://mavlink.io/en/messages/common.html#MAV_CMD_DO_MOTOR_TEST success, error_msg = self._send_command_and_wait_ack( mavutil.mavlink.MAV_CMD_DO_MOTOR_TEST, - param1=motor_number, # motor number + param1=test_sequence_nr + 1, # motor test number, this is not the same as the output number! param2=mavutil.mavlink.MOTOR_TEST_THROTTLE_PERCENT, # throttle type param3=throttle_percent, # throttle value param4=timeout_seconds, # timeout - param5=motor_number, # motor count (same as motor number for single motor test) - param6=mavutil.mavlink.MOTOR_TEST_ORDER_BOARD, # test order + param5=0, # motor count (0=test just the motor specified in param1) + param6=0, # test order (0=default/board order) param7=0, # unused ) if success: logging_info( - _("Motor test command confirmed: Motor %(motor)d at %(throttle)d%% for %(duration)d seconds"), + _( + "Motor test command acknowledged: Motor %(seq)s on output %(output)d at %(throttle)d%% thrust" + " for %(duration)d seconds" + ), { - "motor": motor_number, + "seq": motor_letters, + "output": motor_output_nr, "throttle": throttle_percent, "duration": timeout_seconds, }, @@ -864,11 +913,12 @@ def test_motor(self, motor_number: int, throttle_percent: int, timeout_seconds: return success, error_msg - def test_all_motors(self, throttle_percent: int, timeout_seconds: int) -> tuple[bool, str]: + def test_all_motors(self, nr_of_motors: int, throttle_percent: int, timeout_seconds: int) -> tuple[bool, str]: """ Test all motors simultaneously. Args: + nr_of_motors: Number of motors to test throttle_percent: Throttle percentage (0-100) timeout_seconds: Test duration in seconds @@ -882,38 +932,34 @@ def test_all_motors(self, throttle_percent: int, timeout_seconds: int) -> tuple[ logging_error(error_msg) return False, error_msg - # MAV_CMD_DO_MOTOR_TEST command for all motors - success, error_msg = self._send_command_and_wait_ack( - mavutil.mavlink.MAV_CMD_DO_MOTOR_TEST, - param1=0, # motor count (all motors) - param2=mavutil.mavlink.MOTOR_TEST_THROTTLE_PERCENT, # throttle type - param3=throttle_percent, # throttle value - param4=timeout_seconds, # timeout - param5=0, # motor count - param6=mavutil.mavlink.MOTOR_TEST_ORDER_BOARD, # test order - param7=0, # unused - ) - - if success: - logging_info( - _("All motors test command confirmed at %(throttle)d%% for %(duration)d seconds"), - { - "throttle": throttle_percent, - "duration": timeout_seconds, - }, + for i in range(nr_of_motors): + # MAV_CMD_DO_MOTOR_TEST command for all motors + self.master.mav.command_long_send( + self.master.target_system, + self.master.target_component, + mavutil.mavlink.MAV_CMD_DO_MOTOR_TEST, + 0, # confirmation + param1=i + 1, # motor number (1-based) + param2=mavutil.mavlink.MOTOR_TEST_THROTTLE_PERCENT, # throttle type + param3=throttle_percent, # throttle value + param4=timeout_seconds, # timeout + param5=0, # motor count (0=all motors when param1=0) + param6=0, # test order (0=default/board order) + param7=0, # unused ) - else: - error_msg = _("All motors test command failed: %(error)s") % {"error": error_msg} - logging_error(error_msg) + time_sleep(0.01) # to let the FC parse each command individually - return success, error_msg + return True, "" - def test_motors_in_sequence(self, motor_number: int, throttle_percent: int, timeout_seconds: int) -> tuple[bool, str]: + def test_motors_in_sequence( + self, start_motor: int, motor_count: int, throttle_percent: int, timeout_seconds: int + ) -> tuple[bool, str]: """ Test motors in sequence (A, B, C, D, etc.). Args: - motor_number: The motor number to test (1-based index) + start_motor: The first motor to test (1-based index) + motor_count: Number of motors to test in sequence throttle_percent: Throttle percentage (1-100) timeout_seconds: Test duration per motor in seconds @@ -930,11 +976,11 @@ def test_motors_in_sequence(self, motor_number: int, throttle_percent: int, time # MAV_CMD_DO_MOTOR_TEST command for sequence test success, error_msg = self._send_command_and_wait_ack( mavutil.mavlink.MAV_CMD_DO_MOTOR_TEST, - param1=motor_number, # motor count + param1=start_motor, # starting motor number (1-based) param2=mavutil.mavlink.MOTOR_TEST_THROTTLE_PERCENT, # throttle type param3=throttle_percent, # throttle value param4=timeout_seconds, # timeout per motor - param5=motor_number, # motor count + param5=motor_count, # number of motors to test in sequence param6=mavutil.mavlink.MOTOR_TEST_ORDER_SEQUENCE, # test order (sequence) param7=0, # unused ) @@ -974,8 +1020,8 @@ def stop_all_motors(self) -> tuple[bool, str]: param2=mavutil.mavlink.MOTOR_TEST_THROTTLE_PERCENT, # throttle type param3=0, # throttle value (0% = stop) param4=0, # timeout (0 = immediate stop) - param5=0, # motor count - param6=mavutil.mavlink.MOTOR_TEST_ORDER_BOARD, # test order + param5=0, # motor count (0 = all motors when param1=0) + param6=0, # test order (0 = default/board order) param7=0, # unused ) @@ -1015,6 +1061,7 @@ def request_periodic_battery_status(self, interval_microseconds: int = 1000000) param5=0, # unused param6=0, # unused param7=0, # unused + timeout=0.8, # shorter timeout for battery status requests ) if success: @@ -1049,15 +1096,21 @@ def get_battery_status(self) -> tuple[Union[tuple[float, float], None], str]: try: # Try to get real telemetry data - battery_status = self.master.recv_match(type="BATTERY_STATUS", blocking=True, timeout=2) + battery_status = self.master.recv_match(type="BATTERY_STATUS", blocking=False, timeout=0.3) if battery_status: # Convert from millivolts to volts, and centiamps to amps voltage = battery_status.voltages[0] / 1000.0 if battery_status.voltages[0] != -1 else 0.0 current = battery_status.current_battery / 100.0 if battery_status.current_battery != -1 else 0.0 + self._last_battery_status = (voltage, current) + self._last_battery_message_time = time_time() return (voltage, current), "" except Exception as e: # pylint: disable=broad-exception-caught logging_debug(_("Failed to get battery status from telemetry: %(error)s"), {"error": str(e)}) + if self._last_battery_message_time and (time_time() - self._last_battery_message_time) < 3: + # If we received a battery message recently, don't log an error + return self._last_battery_status, "" + self._last_battery_status = None error_msg = _("Battery status not available from telemetry") return None, error_msg @@ -1367,3 +1420,10 @@ def add_argparse_arguments(parser: ArgumentParser) -> ArgumentParser: help=_("Flight controller reboot time. Default is %(default)s"), ) return parser + + @property + def comport_device(self) -> str: + """Get the current self.comport.device string.""" + if self.comport is not None: + return str(getattr(self.comport, "device", "")) + return "" diff --git a/ardupilot_methodic_configurator/data_model_motor_test.py b/ardupilot_methodic_configurator/data_model_motor_test.py index eb4769a4..e4b305e4 100644 --- a/ardupilot_methodic_configurator/data_model_motor_test.py +++ b/ardupilot_methodic_configurator/data_model_motor_test.py @@ -14,7 +14,7 @@ from logging import warning as logging_warning from math import nan from os import path as os_path -from typing import Any, Optional +from typing import Any, Callable, Optional, Union from ardupilot_methodic_configurator import _ from ardupilot_methodic_configurator.backend_filesystem import LocalFilesystem @@ -25,6 +25,40 @@ # pylint: disable=too-many-lines +DURATION_S_MIN = 1.0 # Minimum duration for motor tests in seconds +DURATION_S_MAX = 60.0 # Maximum duration for motor tests in seconds +THROTTLE_PCT_MIN = 1 # Minimum throttle percentage for motor tests +THROTTLE_PCT_MAX = 100 # Maximum throttle percentage for motor tests + + +class MotorTestError(Exception): + """Base exception for motor test related errors.""" + + +class FlightControllerConnectionError(MotorTestError): + """Raised when flight controller is not connected.""" + + +class MotorTestSafetyError(MotorTestError): + """Raised when motor test conditions are unsafe.""" + + +class ParameterError(MotorTestError): + """Raised when parameter operations fail.""" + + +class MotorTestExecutionError(MotorTestError): + """Raised when motor test execution fails.""" + + +class FrameConfigurationError(MotorTestError): + """Raised when frame configuration operations fail.""" + + +class ValidationError(MotorTestError): + """Raised when validation of input parameters fails.""" + + class MotorTestDataModel: # pylint: disable=too-many-public-methods, too-many-instance-attributes """ Data model for motor test functionality. @@ -60,9 +94,20 @@ def __init__( self._frame_class: int = 0 # Default to invalid self._frame_type: int = 0 # Default to invalid self._motor_count: int = 0 # Default to invalid + self._motor_labels: list[str] = [] # default to empty + self._motor_numbers: list[int] = [] # default to empty + self._test_order: list[int] = [] # default to empty + self._motor_directions: list[str] = [] # default to empty self._frame_layout: dict[str, Any] = {} # default to empty + + self._test_throttle_pct = 0.0 + self._test_duration_s = 0.0 + self._got_battery_status = False + # Cache for frame options to avoid reloading them repeatedly + self._cached_frame_options: Optional[dict[str, dict[int, str]]] = None + # Load motor configuration data self._load_motor_data() @@ -70,45 +115,7 @@ def __init__( # We must fail early if the connection is not available self._update_frame_configuration() - def _update_frame_configuration(self) -> None: - """Update frame configuration from flight controller.""" - if self.flight_controller.master is None or not self.flight_controller.fc_parameters: - msg = _("Flight controller connection required for motor testing") - logging_error(msg) - raise RuntimeError(msg) - - try: - # Get from flight controller - frame_class, frame_type = self.flight_controller.get_frame_info() - if self._frame_class == frame_class and self._frame_type == frame_type: - return - self._frame_class = frame_class - self._frame_type = frame_type - self._motor_count = 0 - if self._motor_data_loader.data and "layouts" in self._motor_data_loader.data: - # find a layout that matches the current frame class and type - for layout in self._motor_data_loader.data["layouts"]: - if layout["Class"] == self._frame_class and layout["Type"] == self._frame_type and "motors" in layout: - self._frame_layout = layout - self._motor_count = len(layout["motors"]) - break - if self._motor_count == 0: - raise RuntimeError( - _("No motor configuration found for frame class %(class)d and type %(type)d") - % {"class": self._frame_class, "type": self._frame_type} - ) - - logging_debug( - _("Frame configuration updated: Class=%(class)d, Type=%(type)d, Motors=%(motors)d"), - { - "class": self._frame_class, - "type": self._frame_type, - "motors": self._motor_count, - }, - ) - except Exception as e: - logging_error(_("Failed to update frame configuration: %(error)s"), {"error": str(e)}) - raise + self._get_test_settings_from_disk() def _load_motor_data(self) -> None: """Load motor configuration data from AP_Motors_test.json file.""" @@ -117,6 +124,9 @@ def _load_motor_data(self) -> None: current_dir = os_path.dirname(__file__) self._motor_data = self._motor_data_loader.load_json_data(current_dir) + # Clear frame options cache since motor data changed + self._cached_frame_options = None + if not self._motor_data: logging_warning(_("Failed to load motor test data from AP_Motors_test.json")) else: @@ -129,6 +139,94 @@ def _load_motor_data(self) -> None: logging_error(_("Error loading motor test data: %(error)s"), {"error": str(e)}) self._motor_data = {} + def _configure_frame_layout(self, frame_class: int, frame_type: int) -> None: + """ + Configure frame layout based on frame class and type. + + This is the pure logic function that processes frame configuration + without flight controller interaction, making it easily testable. + + Args: + frame_class: Frame class (e.g., 1 for QUAD) + frame_type: Frame type (e.g., 0 for PLUS, 1 for X) + + Raises: + RuntimeError: If no motor configuration found for the frame + + """ + # Check if frame configuration has changed + if self._frame_class == frame_class and self._frame_type == frame_type: + return + + # Update frame parameters + self._frame_class = frame_class + self._frame_type = frame_type + self._motor_count = 0 + self._frame_layout = {} + + # Find matching layout in motor data and populate motor arrays + if self._motor_data_loader.data and "layouts" in self._motor_data_loader.data: + for layout in self._motor_data_loader.data["layouts"]: + if layout["Class"] == self._frame_class and layout["Type"] == self._frame_type and "motors" in layout: + self._frame_layout = layout + self._motor_count = len(layout["motors"]) + # Generate motor labels: A-Z for first 26, then AA, AB, AC... for motors 27-32 + self._motor_labels = [] + for i in range(self._motor_count): + if i < 26: + self._motor_labels.append(chr(ord("A") + i)) + else: + # For motors 27-32: AA, AB, AC, AD, AE, AF + self._motor_labels.append("A" + chr(ord("A") + (i - 26))) + self._motor_numbers = [0] * self._motor_count + self._test_order = [0] * self._motor_count + self._motor_directions = [""] * self._motor_count + for i, motor in enumerate(self._frame_layout.get("motors", [])): + test_order = motor.get("TestOrder") + if test_order and 1 <= test_order <= self._motor_count: + self._motor_numbers[test_order - 1] = motor.get("Number") + self._motor_directions[test_order - 1] = motor.get("Rotation") + self._test_order[i] = test_order + break + + if self._motor_count == 0: + raise RuntimeError( + _("No motor configuration found for frame class %(class)d and type %(type)d") + % {"class": self._frame_class, "type": self._frame_type} + ) + + logging_debug( + _("Frame configuration updated: Class=%(class)d, Type=%(type)d, Motors=%(motors)d"), + { + "class": self._frame_class, + "type": self._frame_type, + "motors": self._motor_count, + }, + ) + + def _get_test_settings_from_disk(self) -> None: + """Load test settings from disk.""" + self._test_throttle_pct = self._get_test_throttle_pct() + self._test_duration_s = self._get_test_duration_s() + + def _update_frame_configuration(self) -> None: + """Update frame configuration from flight controller.""" + if self.flight_controller.master is None or not self.flight_controller.fc_parameters: + msg = _("Flight controller connection required for motor testing") + logging_error(msg) + raise RuntimeError(msg) + + try: + # Get frame info from flight controller + frame_class, frame_type = self.flight_controller.get_frame_info() + + # Configure frame layout using the separated logic + self._configure_frame_layout(frame_class, frame_type) + + except Exception as e: + logging_error(_("Failed to update frame configuration: %(error)s"), {"error": str(e)}) + raise + def refresh_from_flight_controller(self) -> bool: """ Refresh frame configuration from flight controller. @@ -153,61 +251,31 @@ def frame_type(self) -> int: """Get the current frame type.""" return self._frame_type - def get_motor_count(self) -> int: - """ - Get the number of motors for the current frame configuration. - - Returns: - int: Number of motors - - """ + @property + def motor_count(self) -> int: + """Get the number of motors for the current frame configuration.""" return self._motor_count - def get_motor_labels(self) -> list[str]: - """ - Generate motor labels (A, B, C, D, etc.) based on motor count. - - Returns: - list[str]: List of motor labels - - """ - return [chr(ord("A") + i) for i in range(self._motor_count)] - - def get_motor_numbers(self) -> list[int]: - """ - Get motor numbers in test order (1, 4, 3, 2, etc.). - - Returns: - list[int]: List of motor numbers (1-based) in their test order - - """ - motor_numbers: list[int] = [0] * self._motor_count - if self._frame_layout: - for motor in self._frame_layout.get("motors", []): - test_order = motor.get("TestOrder") - if test_order and 1 <= test_order <= self._motor_count: - motor_numbers[test_order - 1] = motor.get("Number") - return motor_numbers - err_msg = _("No Frame layout found, not possible to generate motor test order") - raise ValueError(err_msg) + @property + def motor_labels(self) -> list[str]: + """Get motor labels (A, B, C, D, etc.) based on motor count.""" + return self._motor_labels - def get_motor_directions(self) -> list[str]: - """ - Get expected motor rotation directions based on frame configuration. + @property + def motor_numbers(self) -> list[int]: + """Get motor numbers in test order (1, 4, 3, 2, etc.).""" + return self._motor_numbers - Returns: - list[str]: List of motor directions ("CW" or "CCW") + def test_order(self, motor_number: int) -> int: + """Get the test order for a specific motor.""" + if 1 <= motor_number <= self._motor_count: + return self._test_order[motor_number - 1] - 1 + raise ValueError(_("Invalid motor number: %(number)d") % {"number": motor_number}) - """ - motor_directions: list[str] = [""] * self._motor_count - if self._frame_layout: - for motor in self._frame_layout.get("motors", []): - test_order = motor.get("TestOrder") - if test_order and 1 <= test_order <= self._motor_count: - motor_directions[test_order - 1] = motor.get("Rotation") - return motor_directions - err_msg = _("No Frame layout found, not possible to generate motor test rotation order") - raise ValueError(err_msg) + @property + def motor_directions(self) -> list[str]: + """Get expected motor rotation directions based on frame configuration.""" + return self._motor_directions def is_battery_monitoring_enabled(self) -> bool: """ @@ -286,88 +354,127 @@ def get_voltage_status(self) -> str: return "safe" return "critical" - def is_motor_test_safe(self) -> tuple[bool, str]: + def is_motor_test_safe(self) -> None: """ Check if motor testing is currently safe. - Returns: - tuple[bool, str]: (is_safe, reason) - True if safe with empty reason, - False with explanation if unsafe + Raises: + FlightControllerConnectionError: If flight controller is not connected + MotorTestSafetyError: If motor test conditions are unsafe """ # Check if flight controller is connected if self.flight_controller.master is None: - return False, _("Flight controller not connected.") + raise FlightControllerConnectionError(_("Flight controller not connected.")) # Check battery monitoring if not self.is_battery_monitoring_enabled(): - # If battery monitoring is disabled, assume it's safe but warn user - return True, _("Battery monitoring disabled, cannot verify voltage.") + # If battery monitoring is disabled, we still warn but don't fail + logging_warning(_("Battery monitoring disabled, cannot verify voltage.")) + return # Check battery voltage if monitoring is enabled battery_status = self.get_battery_status() if battery_status is None: - return False, _("Could not read battery status.") + raise MotorTestSafetyError(_("Could not read battery status.")) voltage, _current = battery_status min_voltage, max_voltage = self.get_voltage_thresholds() if not min_voltage < voltage < max_voltage: - return False, _("Battery voltage %(voltage).1fV is outside safe range (%(min).1fV - %(max).1fV)") % { - "voltage": voltage, - "min": min_voltage, - "max": max_voltage, - } - - return True, "" + raise MotorTestSafetyError( + _("Battery voltage %(voltage).1fV is outside safe range (%(min).1fV - %(max).1fV)") + % { + "voltage": voltage, + "min": min_voltage, + "max": max_voltage, + } + ) - def set_parameter(self, param_name: str, value: float) -> tuple[bool, str]: + def set_parameter( # pylint: disable=too-many-arguments, too-many-positional-arguments + self, + param_name: str, + value: float, + reset_progress_callback: Union[None, Callable[[int, int], None]] = None, + connection_progress_callback: Union[None, Callable[[int, int], None]] = None, + extra_sleep_time: Optional[int] = 0, + ) -> None: """ Set a parameter value and upload to flight controller. Args: param_name: Parameter name (e.g., "MOT_SPIN_ARM") value: Parameter value + reset_progress_callback: Optional callback for reset progress updates + connection_progress_callback: Optional callback for connection progress updates + extra_sleep_time: Optional additional sleep time before re-connecting - Returns: - tuple[bool, str]: (success, error_message) - True on success with empty message, - False with error description on failure + Raises: + FlightControllerConnectionError: If flight controller is not connected + ValidationError: If parameter value is invalid + ParameterError: If parameter setting fails + TimeoutError: If parameter setting times out """ if self.flight_controller.master is None: - error_msg = _("No flight controller connection available") - return False, error_msg + raise FlightControllerConnectionError(_("No flight controller connection available")) try: # Validate parameter bounds if possible if param_name in ["MOT_SPIN_ARM", "MOT_SPIN_MIN"] and not 0.0 <= value <= 1.0: - error_msg = _("%(param)s value %(value).3f is outside valid range (0.0 - 1.0)") % { - "param": param_name, - "value": value, - } - return False, error_msg + raise ValidationError( + _("%(param)s value %(value).3f is outside valid range (0.0 - 1.0)") + % { + "param": param_name, + "value": value, + } + ) + + requires_reset = False + if param_name in self.filesystem.doc_dict: + min_value = self.filesystem.doc_dict[param_name].get("min", -float("inf")) + max_value = self.filesystem.doc_dict[param_name].get("max", float("inf")) + requires_reset = self.filesystem.doc_dict[param_name].get("RebootRequired", False) + if value < min_value: + raise ValidationError( + _("%(param)s value %(value).3f is smaller than %(min)f") + % {"param": param_name, "value": value, "min": min_value} + ) + if value > max_value: + raise ValidationError( + _("%(param)s value %(value).3f is greater than %(max)f") + % {"param": param_name, "value": value, "max": max_value} + ) # Set parameter and verify it was set correctly self.flight_controller.set_param(param_name, value) # Read back the parameter to verify it was set correctly - actual_value = self.get_parameter(param_name) + actual_value = self.flight_controller.fetch_param(param_name) if actual_value is not None and abs(actual_value - value) < 0.001: # Allow small floating-point tolerance logging_info(_("Parameter %(param)s set to %(value).3f"), {"param": param_name, "value": value}) - return True, "" - error_msg = _("Parameter %(param)s verification failed: expected %(expected).3f, got %(actual)s") % { - "param": param_name, - "expected": value, - "actual": actual_value if actual_value is not None else "None", - } - return False, error_msg + if requires_reset: + self.flight_controller.reset_and_reconnect( + reset_progress_callback, connection_progress_callback, extra_sleep_time + ) + return + raise ParameterError( + _("Parameter %(param)s verification failed: expected %(expected).3f, got %(actual)s") + % { + "param": param_name, + "expected": value, + "actual": actual_value if actual_value is not None else "None", + } + ) - except Exception as e: # pylint: disable=broad-exception-caught + except (ValidationError, ParameterError, TimeoutError): + raise + except Exception as e: error_msg = _("Error setting parameter %(param)s: %(error)s") % { "param": param_name, "error": str(e), } logging_error(error_msg) - return False, error_msg + raise ParameterError(error_msg) from e def get_parameter(self, param_name: str) -> Optional[float]: """ @@ -385,44 +492,87 @@ def get_parameter(self, param_name: str) -> Optional[float]: return self.flight_controller.fc_parameters.get(param_name) - def test_motor(self, motor_number: int, throttle_percent: int, timeout_seconds: int) -> tuple[bool, str]: + def test_motor(self, test_sequence_nr: int, motor_output_nr: int, throttle_percent: int, timeout_seconds: int) -> None: """ Test a specific motor. Args: - motor_number: Motor number (1-based, as used by ArduPilot) + test_sequence_nr: Test sequence number (0-based) + motor_output_nr: Motor output number (1-based, as used by ArduPilot) throttle_percent: Throttle percentage (1-100) timeout_seconds: Test duration in seconds - Returns: - tuple[bool, str]: (success, error_message) - True on success with empty message, - False with error description on failure + Raises: + FlightControllerConnectionError: If flight controller is not connected + MotorTestSafetyError: If motor test conditions are unsafe + ValidationError: If motor number or throttle percentage is invalid + MotorTestExecutionError: If motor test execution fails """ # Safety check - is_safe, safety_reason = self.is_motor_test_safe() - if not is_safe: - return False, safety_reason - - # Validate motor number - if not 1 <= motor_number <= self._motor_count: - error_msg = _("Invalid motor number %(number)d (valid range: 1-%(max)d)") % { - "number": motor_number, - "max": self._motor_count, - } - return False, error_msg + self.is_motor_test_safe() + + # Validate test sequence number + if not 0 <= test_sequence_nr <= self._motor_count - 1: + raise ValidationError( + _("Invalid test sequence number %(number)d (valid range: 0-%(max)d)") + % { + "number": test_sequence_nr, + "max": self._motor_count - 1, + } + ) + + # Validate motor output number + if not 1 <= motor_output_nr <= self._motor_count: + raise ValidationError( + _("Invalid motor output number %(number)d (valid range: 1-%(max)d)") + % { + "number": motor_output_nr, + "max": self._motor_count, + } + ) + + # Validate motor output number + if self.motor_numbers[test_sequence_nr] != motor_output_nr: + raise ValidationError( + _("Invalid motor output number %(number)d (expected: %(expected)d)") + % { + "number": motor_output_nr, + "expected": self.motor_numbers[test_sequence_nr], + } + ) # Validate throttle percentage - if not 1 <= throttle_percent <= 100: - error_msg = _("Invalid throttle percentage %(throttle)d (valid range: 1-100)") % { - "throttle": throttle_percent, - } - return False, error_msg + if not THROTTLE_PCT_MIN <= throttle_percent <= THROTTLE_PCT_MAX: + raise ValidationError( + _("Invalid throttle percentage %(throttle)d (valid range: %(min)d-%(max)d)") + % { + "throttle": throttle_percent, + "min": THROTTLE_PCT_MIN, + "max": THROTTLE_PCT_MAX, + } + ) + + # Validate test duration + if timeout_seconds < DURATION_S_MIN: + raise ValidationError( + _("Invalid test duration %(duration)d (valid range: %(min)d-%(max)d)") + % {"duration": timeout_seconds, "min": DURATION_S_MIN, "max": DURATION_S_MAX} + ) + if timeout_seconds > DURATION_S_MAX: + raise ValidationError( + _("Invalid test duration %(duration)d (valid range: %(min)d-%(max)d)") + % {"duration": timeout_seconds, "min": DURATION_S_MIN, "max": DURATION_S_MAX} + ) # Execute motor test - return self.flight_controller.test_motor(motor_number, throttle_percent, timeout_seconds) + success, message = self.flight_controller.test_motor( + test_sequence_nr, self.motor_labels[test_sequence_nr], motor_output_nr, throttle_percent, timeout_seconds + ) + if not success: + raise MotorTestExecutionError(message) - def test_all_motors(self, throttle_percent: int, timeout_seconds: int) -> tuple[bool, str]: + def test_all_motors(self, throttle_percent: int, timeout_seconds: int) -> None: """ Test all motors simultaneously. @@ -430,20 +580,21 @@ def test_all_motors(self, throttle_percent: int, timeout_seconds: int) -> tuple[ throttle_percent: Throttle percentage (1-100) timeout_seconds: Test duration in seconds - Returns: - tuple[bool, str]: (success, error_message) - True on success with empty message, - False with error description on failure + Raises: + FlightControllerConnectionError: If flight controller is not connected + MotorTestSafetyError: If motor test conditions are unsafe + MotorTestExecutionError: If motor test execution fails """ # Safety check - is_safe, safety_reason = self.is_motor_test_safe() - if not is_safe: - return False, safety_reason + self.is_motor_test_safe() # Execute all motors test - return self.flight_controller.test_all_motors(throttle_percent, timeout_seconds) + success, message = self.flight_controller.test_all_motors(self.motor_count, throttle_percent, timeout_seconds) + if not success: + raise MotorTestExecutionError(message) - def test_motors_in_sequence(self, throttle_percent: int, timeout_seconds: int) -> tuple[bool, str]: + def test_motors_in_sequence(self, throttle_percent: int, timeout_seconds: int) -> None: """ Test motors in sequence (A, B, C, D, etc.). @@ -451,29 +602,33 @@ def test_motors_in_sequence(self, throttle_percent: int, timeout_seconds: int) - throttle_percent: Throttle percentage (0-100) timeout_seconds: Test duration per motor in seconds - Returns: - tuple[bool, str]: (success, error_message) - True on success with empty message, - False with error description on failure + Raises: + FlightControllerConnectionError: If flight controller is not connected + MotorTestSafetyError: If motor test conditions are unsafe + MotorTestExecutionError: If motor test execution fails """ # Safety check - is_safe, safety_reason = self.is_motor_test_safe() - if not is_safe: - return False, safety_reason + self.is_motor_test_safe() - # Execute sequential test - return self.flight_controller.test_motors_in_sequence(self.get_motor_count(), throttle_percent, timeout_seconds) + # Execute sequential test starting from motor 1 + success, message = self.flight_controller.test_motors_in_sequence( + 1, self.motor_count, throttle_percent, timeout_seconds + ) + if not success: + raise MotorTestExecutionError(message) - def stop_all_motors(self) -> tuple[bool, str]: + def stop_all_motors(self) -> None: """ Emergency stop for all motors. - Returns: - tuple[bool, str]: (success, error_message) - True on success with empty message, - False with error description on failure + Raises: + MotorTestExecutionError: If emergency stop fails """ - return self.flight_controller.stop_all_motors() + success, message = self.flight_controller.stop_all_motors() + if not success: + raise MotorTestExecutionError(message) def get_motor_diagram_path(self) -> tuple[str, str]: """ @@ -496,9 +651,9 @@ def motor_diagram_exists(self) -> bool: """ return ProgramSettings.motor_diagram_exists(self._frame_class, self._frame_type) - def get_test_duration(self) -> float: + def _get_test_duration_s(self) -> float: """ - Get the current motor test duration setting. + Get the current motor test duration setting from disk. Returns: float: Test duration in seconds @@ -509,43 +664,50 @@ def get_test_duration(self) -> float: if duration is None: raise ReferenceError(_("Motor test duration setting not found")) duration = float(duration) - if duration < 0.1: - raise ValueError(_("Motor test duration must be at least 0.1 seconds")) - if duration > 10.0: - raise ValueError(_("Motor test duration must not exceed 10 seconds")) + if duration < DURATION_S_MIN: + raise ValueError(_("Motor test duration must be at least %(min)d second") % {"min": DURATION_S_MIN}) + if duration > DURATION_S_MAX: + raise ValueError(_("Motor test duration must not exceed %(max)d seconds") % {"max": DURATION_S_MAX}) return duration except (ReferenceError, ValueError) as exc: logging_error(_("Invalid motor test duration setting: %(error)s"), {"error": str(exc)}) raise exc - def set_test_duration(self, duration: float) -> bool: + def get_test_duration_s(self) -> float: + """ + Get the current motor test duration setting. + + Returns: + float: Test duration in seconds + + """ + return self._test_duration_s + + def set_test_duration_s(self, duration: float) -> None: """ Set the motor test duration setting. Args: duration: Test duration in seconds - Returns: - bool: True if setting was saved successfully - """ try: - if duration < 0.1: - raise ValueError(_("Motor test duration must be at least 0.1 seconds")) - if duration > 10.0: - raise ValueError(_("Motor test duration must not exceed 10 seconds")) + if duration < DURATION_S_MIN: + raise ValueError(_("Motor test duration must be at least %(min)d second") % {"min": DURATION_S_MIN}) + if duration > DURATION_S_MAX: + raise ValueError(_("Motor test duration must not exceed %(max)d seconds") % {"max": DURATION_S_MAX}) ProgramSettings.set_setting("motor_test/duration", duration) - return True + self._test_duration_s = int(duration) except ValueError as exc: logging_error(_("Invalid motor test duration setting: %(error)s"), {"error": str(exc)}) - return False - except Exception as exc: # pylint: disable=broad-exception-caught + raise exc + except Exception as exc: logging_error(_("Failed to save duration setting: %(error)s"), {"error": str(exc)}) - return False + raise exc - def get_test_throttle_pct(self) -> int: + def _get_test_throttle_pct(self) -> int: """ - Get the current motor test throttle percentage setting. + Get the current motor test throttle percentage setting from disk. Returns: int: Throttle percentage (1-100) @@ -556,41 +718,48 @@ def get_test_throttle_pct(self) -> int: if throttle_pct is None: raise ReferenceError(_("Motor test throttle percentage setting not found")) throttle_pct = int(throttle_pct) - if throttle_pct < 1: - raise ValueError(_("Motor test throttle percentage must be at least 1")) - if throttle_pct > 100: - raise ValueError(_("Motor test throttle percentage must not exceed 100")) + if throttle_pct < THROTTLE_PCT_MIN: + raise ValueError(_("Motor test throttle percentage must be at least %(min)d%%") % {"min": THROTTLE_PCT_MIN}) + if throttle_pct > THROTTLE_PCT_MAX: + raise ValueError(_("Motor test throttle percentage must not exceed %(max)d%%") % {"max": THROTTLE_PCT_MAX}) return throttle_pct except (ReferenceError, ValueError) as exc: logging_error(_("Invalid motor test throttle percentage setting: %(error)s"), {"error": str(exc)}) raise exc - def set_test_throttle_pct(self, throttle_pct: int) -> bool: + def get_test_throttle_pct(self) -> int: + """ + Get the current motor test throttle percentage setting. + + Returns: + int: Throttle percentage (1-100) + + """ + return int(self._test_throttle_pct) + + def set_test_throttle_pct(self, throttle_pct: int) -> None: """ Set the motor test throttle percentage setting. Args: throttle_pct: Throttle percentage (1-100) - Returns: - bool: True if setting was saved successfully - """ try: - if throttle_pct < 1: - raise ValueError(_("Motor test throttle percentage must be at least 1")) - if throttle_pct > 100: - raise ValueError(_("Motor test throttle percentage must not exceed 100")) + if throttle_pct < THROTTLE_PCT_MIN: + raise ValueError(_("Motor test throttle percentage must be at least %(min)d%%") % {"min": THROTTLE_PCT_MIN}) + if throttle_pct > THROTTLE_PCT_MAX: + raise ValueError(_("Motor test throttle percentage must not exceed %(max)d%%") % {"max": THROTTLE_PCT_MAX}) ProgramSettings.set_setting("motor_test/throttle_pct", throttle_pct) - return True + self._test_throttle_pct = throttle_pct except ValueError as exc: logging_error(_("Invalid motor test throttle percentage setting: %(error)s"), {"error": str(exc)}) - return False - except Exception as exc: # pylint: disable=broad-exception-caught + raise exc + except Exception as exc: logging_error(_("Failed to save throttle percentage setting: %(error)s"), {"error": str(exc)}) - return False + raise exc - def update_frame_configuration(self, frame_class: int, frame_type: int) -> tuple[bool, str]: + def update_frame_configuration(self, frame_class: int, frame_type: int) -> None: """ Update the frame configuration and upload to flight controller. @@ -598,25 +767,23 @@ def update_frame_configuration(self, frame_class: int, frame_type: int) -> tuple frame_class: New frame class frame_type: New frame type - Returns: - tuple[bool, str]: (success, error_message) - True on success with empty message, - False with error description on failure + Raises: + FlightControllerConnectionError: If flight controller is not connected + ParameterError: If parameter setting fails + FrameConfigurationError: If frame configuration update fails """ if self.flight_controller.master is None or not self.flight_controller.fc_parameters: - error_msg = _("Flight controller connection required for frame configuration update") - return False, error_msg + raise FlightControllerConnectionError(_("Flight controller connection required for frame configuration update")) try: - # Set FRAME_CLASS parameter - class_success, class_error = self.set_parameter("FRAME_CLASS", float(frame_class)) - if not class_success: - return False, class_error + # Only set FRAME_CLASS parameter if it has changed + if self._frame_class != frame_class: + self.set_parameter("FRAME_CLASS", float(frame_class)) - # Set FRAME_TYPE parameter - type_success, type_error = self.set_parameter("FRAME_TYPE", float(frame_type)) - if not type_success: - return False, type_error + # Only set FRAME_TYPE parameter if it has changed + if self._frame_type != frame_type: + self.set_parameter("FRAME_TYPE", float(frame_type)) # Update internal state self._frame_class = frame_class @@ -641,23 +808,76 @@ def update_frame_configuration(self, frame_class: int, frame_type: int) -> tuple }, ) - return True, "" - - except Exception as e: # pylint: disable=broad-exception-caught + except (FlightControllerConnectionError, ParameterError): + raise + except Exception as e: error_msg = _("Failed to update frame configuration: %(error)s") % {"error": str(e)} logging_error(error_msg) - return False, error_msg + raise FrameConfigurationError(error_msg) from e + + def get_current_frame_class_types(self) -> dict[int, str]: + """ + Get frame types available for the current frame class only. + + Returns: + dict[int, str]: Dictionary of frame type codes to names for current frame class + + """ + if self.flight_controller.master is None or not self.flight_controller.fc_parameters: + logging_warning(_("Flight controller connection required for current frame class")) + return {} + + # Get current frame class from flight controller + current_frame_class = self.flight_controller.fc_parameters.get("FRAME_CLASS") + if current_frame_class is None: + logging_warning(_("FRAME_CLASS parameter not found in flight controller")) + return {} + + frame_class_int = int(current_frame_class) + + # Get all frame options + all_frame_options = self.get_frame_options() + + # Find the class name that corresponds to the current frame class number + # Frame classes are typically 1-indexed in ArduPilot + class_names = list(all_frame_options.keys()) + if 1 <= frame_class_int <= len(class_names): + current_class_name = class_names[frame_class_int - 1] + types_for_class = all_frame_options.get(current_class_name, {}) + logging_debug( + _("Found %(count)d frame types for current frame class %(class)d (%(name)s)"), + { + "count": len(types_for_class), + "class": frame_class_int, + "name": current_class_name, + }, + ) + return types_for_class + + logging_warning( + _("Current frame class %(class)d is outside valid range (1-%(max)d)"), + {"class": frame_class_int, "max": len(class_names)}, + ) + return {} def get_frame_options(self) -> dict[str, dict[int, str]]: # pylint: disable=too-many-branches """ Get all available frame configuration options. Uses motor data loader as the primary source, falling back to doc_dict if necessary. + Results are cached to avoid repeated processing. Returns: dict[str, dict[int, str]]: A dictionary of frame options grouped by class name. """ + # Return cached result if available + if self._cached_frame_options is not None: + logging_debug( + _("Returning cached frame options with %(count)d classes"), {"count": len(self._cached_frame_options)} + ) + return self._cached_frame_options + frame_options: dict[str, dict[int, str]] = {} # Primary source: Use motor data loader - same logic as _update_frame_configuration @@ -734,6 +954,9 @@ def get_frame_options(self) -> dict[str, dict[int, str]]: # pylint: disable=too else: logging_warning(_("No frame options could be loaded from motor data or parameter metadata")) + # Cache the result for future calls + self._cached_frame_options = frame_options + return frame_options def refresh_connection_status(self) -> bool: @@ -755,117 +978,119 @@ def refresh_connection_status(self) -> bool: logging_warning(_("Failed to refresh frame configuration: %(error)s"), {"error": str(e)}) return False - def parse_frame_type_selection(self, selected_text: str) -> tuple[bool, int, int, str]: + def parse_frame_type_selection(self, selected_text: str) -> tuple[int, int]: """ Parse frame type selection text and return frame codes. Args: - selected_text: Selected text in format "Frame Class: Frame Type" + selected_text: Selected text with frame type name (frame class is current FRAME_CLASS parameter) Returns: - tuple[bool, int, int, str]: (success, frame_class_code, frame_type_code, error_message) + tuple[int, int]: (frame_class_code, frame_type_code) + + Raises: + ValidationError: If frame type selection text is invalid or frame configuration not found """ - if ":" not in selected_text: - return False, 0, 0, _("Invalid frame type format: %(text)s") % {"text": selected_text} + if self.flight_controller.master is None or not self.flight_controller.fc_parameters: + raise ValidationError(_("Flight controller connection required for frame type parsing")) + + # Get current frame class from flight controller + current_frame_class = self.flight_controller.fc_parameters.get("FRAME_CLASS") + if current_frame_class is None: + raise ValidationError(_("FRAME_CLASS parameter not found in flight controller")) try: - frame_class_name, frame_type_name = selected_text.split(":", 1) - frame_class_name = frame_class_name.strip() - frame_type_name = frame_type_name.strip() + frame_class_code = int(current_frame_class) + frame_type_name = selected_text.strip() - # Get frame options to find the numeric codes - frame_options = self.get_frame_options() + # Get frame types for current frame class + current_types = self.get_current_frame_class_types() - # Find frame class code - frame_class_code = None + # Find frame type code by name frame_type_code = None - for class_name, types in frame_options.items(): - if class_name.upper() == frame_class_name.upper(): - # Find frame type code within this class - for type_code, type_name in types.items(): - if type_name.upper() == frame_type_name.upper(): - frame_class_code = list(frame_options.keys()).index(class_name) + 1 - frame_type_code = type_code - break + for type_code, type_name in current_types.items(): + if type_name.upper() == frame_type_name.upper(): + frame_type_code = type_code break - if frame_class_code is None or frame_type_code is None: - return False, 0, 0, _("Could not find frame configuration for: %(text)s") % {"text": selected_text} + if frame_type_code is None: + raise ValidationError( + _("Could not find frame type '%(text)s' in current frame class") % {"text": selected_text} + ) - return True, frame_class_code, frame_type_code, "" + return frame_class_code, frame_type_code - except Exception as e: # pylint: disable=broad-exception-caught - return False, 0, 0, _("Error parsing frame type: %(error)s") % {"error": str(e)} + except (ValueError, TypeError) as e: + raise ValidationError(_("Error parsing frame type: %(error)s") % {"error": str(e)}) from e - def update_frame_type_from_selection(self, selected_text: str) -> tuple[bool, str]: + def update_frame_type_from_selection( + self, + selected_text: str, + reset_progress_callback: Union[None, Callable[[int, int], None]] = None, + connection_progress_callback: Union[None, Callable[[int, int], None]] = None, + extra_sleep_time: Optional[int] = None, + ) -> bool: """ Update frame configuration based on user selection. Args: selected_text: Selected text in format "Frame Class: Frame Type" + reset_progress_callback: Callback for resetting progress + connection_progress_callback: Callback for connection progress + extra_sleep_time: Additional sleep time before setting parameters Returns: - tuple[bool, str]: (success, error_message) - - """ - success, frame_class_code, frame_type_code, error_msg = self.parse_frame_type_selection(selected_text) - if not success: - return False, error_msg + bool: True if successful - # Immediately upload parameters to flight controller - success_class, msg_class = self.set_parameter("FRAME_CLASS", frame_class_code) - success_type, msg_type = self.set_parameter("FRAME_TYPE", frame_type_code) - - if not success_class or not success_type: - error_msg = _("Failed to update frame parameters:\n%(msg1)s\n%(msg2)s") % { - "msg1": msg_class, - "msg2": msg_type, - } - return False, error_msg - - logging_info( - _("Updated frame configuration: FRAME_CLASS=%(class)d, FRAME_TYPE=%(type)d"), - {"class": frame_class_code, "type": frame_type_code}, - ) + Raises: + ValidationError: If frame type selection text is invalid + ParameterError: If parameter setting fails + FrameConfigurationError: If frame configuration update fails - # Update internal state and recalculate motor count - self._frame_class = frame_class_code - self._frame_type = frame_type_code - self._motor_count = 0 - if self._motor_data_loader.data and "layouts" in self._motor_data_loader.data: - # Find a layout that matches the current frame class and type - for layout in self._motor_data_loader.data["layouts"]: - if layout.get("Class") == frame_class_code and layout.get("Type") == frame_type_code and "motors" in layout: - self._frame_layout = layout - self._motor_count = len(layout["motors"]) - break + """ + try: + frame_class_code, frame_type_code = self.parse_frame_type_selection(selected_text) - return True, "" + # Immediately upload parameters to flight controller + if self.frame_class != frame_class_code: + self.set_parameter( + "FRAME_CLASS", frame_class_code, reset_progress_callback, connection_progress_callback, extra_sleep_time + ) + if self.frame_type != frame_type_code: + self.set_parameter( + "FRAME_TYPE", frame_type_code, reset_progress_callback, connection_progress_callback, extra_sleep_time + ) - def get_svg_scaling_info( - self, canvas_width: int, canvas_height: int, svg_width: int, svg_height: int - ) -> tuple[float, int]: - """ - Calculate SVG scaling information for diagram display. + logging_info( + _("Updated frame configuration: FRAME_CLASS=%(class)d, FRAME_TYPE=%(type)d"), + {"class": frame_class_code, "type": frame_type_code}, + ) - Args: - canvas_width: Canvas width in pixels - canvas_height: Canvas height in pixels - svg_width: SVG width in pixels - svg_height: SVG height in pixels + # Update internal state and recalculate motor count + self._frame_class = frame_class_code + self._frame_type = frame_type_code + self._motor_count = 0 + if self._motor_data_loader.data and "layouts" in self._motor_data_loader.data: + # Find a layout that matches the current frame class and type + for layout in self._motor_data_loader.data["layouts"]: + if ( + layout.get("Class") == frame_class_code + and layout.get("Type") == frame_type_code + and "motors" in layout + ): + self._frame_layout = layout + self._motor_count = len(layout["motors"]) + break - Returns: - tuple[float, int]: (scale_factor, scaled_height) + return True - """ - if svg_width > 0 and svg_height > 0: - scale_x = canvas_width / svg_width - scale_y = canvas_height / svg_height - scale = min(scale_x, scale_y) * 0.9 # Leave some margin - scaled_height = int(svg_height * scale) - return scale, scaled_height - return 1.0, svg_height + except (ParameterError, ValidationError): + raise + except Exception as e: + error_msg = _("Failed to update frame configuration: %(error)s") % {"error": str(e)} + logging_error(error_msg) + raise FrameConfigurationError(error_msg) from e def get_battery_status_color(self) -> str: """ @@ -903,26 +1128,6 @@ def get_battery_display_text(self) -> tuple[str, str]: return voltage_text, current_text return _("Voltage: N/A"), _("Current: N/A") - def validate_motor_test_parameters(self, throttle_percent: int, timeout_seconds: int) -> tuple[bool, str]: - """ - Validate motor test parameters before execution. - - Args: - throttle_percent: Throttle percentage (0-100) - timeout_seconds: Test duration in seconds - - Returns: - tuple[bool, str]: (is_valid, error_message) - - """ - if not 0 <= throttle_percent <= 100: - return False, _("Throttle percentage must be between 0 and 100") - - if not 0.1 <= timeout_seconds <= 60: - return False, _("Test duration must be between 0.1 and 60 seconds") - - return True, "" - def should_show_first_test_warning(self) -> bool: """ Check if first-time safety warning should be shown. @@ -989,20 +1194,39 @@ def get_current_frame_selection_text(self) -> str: Get current frame configuration as selection text. Returns: - str: Frame selection text in format "Class: Type" + str: Frame selection text in format "Type Name" (only type, since class is fixed) """ - frame_options = self.get_frame_options() + current_types = self.get_current_frame_class_types() - # Find class name - class_names = list(frame_options.keys()) - if self._frame_class <= len(class_names): - class_name = class_names[self._frame_class - 1] + # Find type name for current frame type + return current_types.get(self._frame_type, f"Type {self._frame_type}") - # Find type name - types = frame_options.get(class_name, {}) - type_name = types.get(self._frame_type, f"Type {self._frame_type}") + def get_frame_type_pairs(self) -> list[tuple[str, str]]: + """ + Get frame type options as (code, display_text) tuples for PairTupleCombobox. - return f"{class_name}: {type_name}" + Returns: + list[tuple[str, str]]: List of (type_code, "type_code: type_name") tuples - return f"Class {self._frame_class}: Type {self._frame_type}" + """ + current_types = self.get_current_frame_class_types() + + # Convert to list of tuples with format: (code, "code: name") + pairs = [] + for type_code, type_name in current_types.items(): + code_str = str(type_code) + display_text = f"{type_code}: {type_name}" + pairs.append((code_str, display_text)) + + return pairs + + def get_current_frame_selection_key(self) -> str: + """ + Get current frame type code as string key for PairTupleCombobox selection. + + Returns: + str: Current frame type code as string + + """ + return str(self._frame_type) diff --git a/scripts/batch_convert_motor_diagrams.py b/scripts/batch_convert_motor_diagrams.py index a9ae769c..150159ef 100755 --- a/scripts/batch_convert_motor_diagrams.py +++ b/scripts/batch_convert_motor_diagrams.py @@ -76,28 +76,39 @@ def crop_whitespace(image: Image.Image, margin: int = 5) -> Image.Image: return image.crop((left, top, right, bottom)) -def get_firefox_service() -> Union[FirefoxService, None]: - """Return a FirefoxService using system or fallback geckodriver.""" +def setup_firefox_service() -> tuple[Union[FirefoxService, None], str]: + """Set up Firefox service with geckodriver fallback logic.""" + # Create Firefox driver with fallback for geckodriver try: + # Try to use system geckodriver first geckodriver_path = shutil.which("geckodriver") if geckodriver_path: - return FirefoxService(executable_path=geckodriver_path) - return FirefoxService(GeckoDriverManager().install()) - except (OSError, RuntimeError): + return FirefoxService(executable_path=geckodriver_path), "Success" + # Fallback to webdriver_manager (may hit rate limit) + return FirefoxService(GeckoDriverManager().install()), "Success" + except (OSError, RuntimeError, ValueError) as e: + # If webdriver_manager fails due to rate limit, try common system paths common_paths = [ + "geckodriver.exe", # Windows executable + "geckodriver", # Unix executable "/usr/bin/geckodriver", "/usr/local/bin/geckodriver", "/snap/bin/geckodriver", - "geckodriver", + "C:\\Program Files\\geckodriver\\geckodriver.exe", # Windows common install + "C:\\Windows\\System32\\geckodriver.exe", # Windows system path ] + for path in common_paths: - if os.path.exists(path) or (path == "geckodriver" and shutil.which("geckodriver")): + if os.path.exists(path) or shutil.which(path): try: - return FirefoxService(executable_path=path) - except (OSError, RuntimeError) as exc: + logger.info("Using geckodriver: %s", path) # Debug output + return FirefoxService(executable_path=path), "Success" + except (OSError, RuntimeError, ValueError) as exc: + # Log the exception but continue trying other paths logger.warning("Failed to create FirefoxService with %s: %s", path, exc) continue - return None + + return None, f"Geckodriver not found. Please install geckodriver. Original error: {e!s}" def process_image(png_data: bytes, resize_width: int, resize_height: int) -> Image.Image: @@ -138,20 +149,31 @@ def convert_with_firefox( # pylint: disable=too-many-arguments, too-many-positi firefox_options.add_argument("--no-sandbox") firefox_options.add_argument("--disable-dev-shm-usage") - service = get_firefox_service() - if not service: - return False, "Geckodriver not found. Please install geckodriver." + # Set up Firefox service + service, error_msg = setup_firefox_service() + if service is None: + return False, error_msg driver = webdriver.Firefox(service=service, options=firefox_options) # Set window size to large canvas for high quality rendering driver.set_window_size(canvas_width, canvas_height) - # Convert SVG path to absolute path + # Convert SVG path to absolute path and handle Windows paths svg_abs_path = os.path.abspath(svg_path) + # Convert Windows path to file URL format + if os.name == "nt": # Windows + # Replace backslashes with forward slashes and handle drive letter + svg_abs_path = svg_abs_path.replace("\\", "/") + svg_abs_path = f"file:///{svg_abs_path}" if svg_abs_path[1] == ":" else f"file://{svg_abs_path}" + else: + svg_abs_path = f"file://{svg_abs_path}" + + logger.info("Opening URL: %s", svg_abs_path) # Debug output + # Open SVG file - driver.get(f"file://{svg_abs_path}") + driver.get(svg_abs_path) # Take screenshot png_data = driver.get_screenshot_as_png() @@ -224,10 +246,16 @@ def batch_convert_and_compare( # pylint: disable=too-many-locals, too-many-argu png_path = Path(png_dir) png_path.mkdir(parents=True, exist_ok=True) + logger.info("Looking for SVG files in: %s", svg_path.absolute()) + # Find all motor diagram SVG files svg_files = list(svg_path.glob("m_*.svg")) svg_files.sort() + logger.info( + "Found %d SVG files: %s%s", len(svg_files), [f.name for f in svg_files[:5]], "..." if len(svg_files) > 5 else "" + ) + if not svg_files: logger.warning("No motor diagram SVG files found in %s", svg_dir) return @@ -318,25 +346,33 @@ def batch_convert_and_compare( # pylint: disable=too-many-locals, too-many-argu # Update logging level logger.setLevel(args.loglevel) - # Convert all motor diagrams with new processing pipeline: - # - Customizable canvas size for high quality rendering - # - Crop whitespace around content - # - Remove alpha channel - # - Anti-aliasing resize to user-specified size - # - Save as optimized PNG - # Note: Output PNGs will be resized to (resize_width, resize_height) - # Example usage: - # python batch_convert_motor_diagrams.py \ - # --svg-dir ./images \ - # --png-dir ./pngs \ - # --resize-width 300 \ - # --resize-height 300 \ - # --loglevel DEBUG - batch_convert_and_compare( - args.svg_dir, - args.png_dir, - canvas_width=args.canvas_width, - canvas_height=args.canvas_height, - resize_width=args.resize_width, - resize_height=args.resize_height, - ) + logger.info("Starting motor diagram batch conversion...") + try: + # Convert all motor diagrams with new processing pipeline: + # - Customizable canvas size for high quality rendering + # - Crop whitespace around content + # - Remove alpha channel + # - Anti-aliasing resize to user-specified size + # - Save as optimized PNG + # Note: Output PNGs will be resized to (resize_width, resize_height) + # Example usage: + # python batch_convert_motor_diagrams.py \ + # --svg-dir ./images \ + # --png-dir ./pngs \ + # --resize-width 200 \ + # --resize-height 200 \ + # --loglevel DEBUG + batch_convert_and_compare( + args.svg_dir, + args.png_dir, + canvas_width=args.canvas_width, + canvas_height=args.canvas_height, + resize_width=args.resize_width, + resize_height=args.resize_height, + ) + logger.info("Conversion completed successfully!") + except (OSError, RuntimeError, ValueError) as e: + logger.error("Error during conversion: %s", e) + import traceback + + traceback.print_exc() diff --git a/scripts/download_motor_diagrams.py b/scripts/download_motor_diagrams.py index 3c3b9399..d091b1cb 100755 --- a/scripts/download_motor_diagrams.py +++ b/scripts/download_motor_diagrams.py @@ -11,11 +11,13 @@ SPDX-License-Identifier: GPL-3.0-or-later """ +import logging import urllib.request from pathlib import Path +from urllib.error import URLError from urllib.parse import urlparse -# ruff: noqa: T201 +from batch_convert_motor_diagrams import DEFAULT_RESIZE_HEIGHT, DEFAULT_RESIZE_WIDTH, batch_convert_and_compare # List of all motor diagram SVG files from the ArduPilot documentation at # https://ardupilot.org/copter/docs/connect-escs-and-motors.html @@ -76,7 +78,7 @@ def download_motor_diagrams() -> None: - """Download all motor diagram SVG files.""" + """Download all motor diagram SVG files and convert them to PNG using batch_convert_and_compare.""" base_url = "https://ardupilot.org/copter/_images/" images_dir = Path("ardupilot_methodic_configurator/images") @@ -86,6 +88,9 @@ def download_motor_diagrams() -> None: downloaded = 0 failed = 0 + logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") + logger = logging.getLogger(__name__) + for filename in motor_diagrams: try: url = base_url + filename @@ -94,19 +99,29 @@ def download_motor_diagrams() -> None: # Validate URL scheme for security parsed_url = urlparse(url) if parsed_url.scheme not in ("http", "https"): - print(f"Invalid URL scheme for {filename}: {parsed_url.scheme}") + logger.error("Invalid URL scheme for %s: %s", filename, parsed_url.scheme) failed += 1 continue - print(f"Downloading {filename}...") + logger.info("Downloading %s...", filename) urllib.request.urlretrieve(url, output_path) # noqa: S310 downloaded += 1 - except Exception as e: # pylint: disable=broad-exception-caught - print(f"Failed to download {filename}: {e}") + except (URLError, OSError) as e: + logger.error("Failed to download %s: %s", filename, e) failed += 1 - print(f"\nDownload complete: {downloaded} succeeded, {failed} failed") + logger.info("Download complete: %d succeeded, %d failed", downloaded, failed) + + # Call batch_convert_and_compare to convert all SVGs to PNGs + batch_convert_and_compare( + svg_dir=str(images_dir), + png_dir=str(images_dir / "motor_diagrams_png"), + canvas_width=1200, + canvas_height=1200, + resize_width=DEFAULT_RESIZE_WIDTH, + resize_height=DEFAULT_RESIZE_HEIGHT, + ) if __name__ == "__main__": diff --git a/tests/test_backend_filesystem_program_settings.py b/tests/test_backend_filesystem_program_settings.py index ab981eae..f69ff774 100755 --- a/tests/test_backend_filesystem_program_settings.py +++ b/tests/test_backend_filesystem_program_settings.py @@ -364,6 +364,7 @@ def test_user_can_load_existing_settings_file(self, mock_user_config, sample_pro # Add all missing defaults that would be added by _recursive_merge_defaults expected_result["annotate_docs_into_param_files"] = False # Added by default expected_result["gui_complexity"] = "simple" # Added by default + expected_result["motor_test"] = {"duration": 2.5, "throttle_pct": 10} # Added by default # Update directory_selection with the defaults that would be merged in expected_result["directory_selection"]["new_base_dir"] = os_path.join(mock_user_config["config_dir"], "vehicles") diff --git a/tests/test_backend_flightcontroller.py b/tests/test_backend_flightcontroller.py index 29005325..73cc40fa 100755 --- a/tests/test_backend_flightcontroller.py +++ b/tests/test_backend_flightcontroller.py @@ -226,12 +226,16 @@ def test_user_can_test_individual_motor(self, flight_controller) -> None: AND: The function should return True indicating success """ # Arrange: Set up motor test parameters - motor_number = 1 + test_sequence_nr = 0 # First motor (0-based for test sequence) + motor_letters = "A" + motor_output_nr = 1 # First output (1-based) throttle_percent = 15 timeout_seconds = 3 # Act: Execute motor test - success, error_msg = flight_controller.test_motor(motor_number, throttle_percent, timeout_seconds) + success, error_msg = flight_controller.test_motor( + test_sequence_nr, motor_letters, motor_output_nr, throttle_percent, timeout_seconds + ) # Assert: Motor test command sent correctly assert success is True, f"Motor test should succeed, but got error: {error_msg}" @@ -243,12 +247,12 @@ def test_user_can_test_individual_motor(self, flight_controller) -> None: assert call_args[1] == 1 # target_component assert call_args[2] == mavutil.mavlink.MAV_CMD_DO_MOTOR_TEST # command assert call_args[3] == 0 # confirmation - assert call_args[4] == motor_number # param1: motor number + assert call_args[4] == test_sequence_nr + 1 # param1: motor test number (1-based) assert call_args[5] == mavutil.mavlink.MOTOR_TEST_THROTTLE_PERCENT # param2: throttle type assert call_args[6] == throttle_percent # param3: throttle value assert call_args[7] == timeout_seconds # param4: timeout - assert call_args[8] == motor_number # param5: motor count - assert call_args[9] == mavutil.mavlink.MOTOR_TEST_ORDER_BOARD # param6: test order + assert call_args[8] == 0 # param5: motor count (0=single motor test) + assert call_args[9] == 0 # param6: test order (0=default/board order) def test_user_can_stop_all_motors_immediately(self, flight_controller) -> None: """ @@ -279,7 +283,7 @@ def test_user_can_stop_all_motors_immediately(self, flight_controller) -> None: assert call_args[6] == 0 # param3: throttle value (0 = stop) assert call_args[7] == 0 # param4: timeout (0 = immediate) assert call_args[8] == 0 # param5: motor count (0 = all) - assert call_args[9] == mavutil.mavlink.MOTOR_TEST_ORDER_BOARD # param6: test order + assert call_args[9] == 0 # param6: test order (0 = default/board order) def test_user_can_test_motors_in_sequence(self, flight_controller) -> None: """ @@ -299,7 +303,11 @@ def test_user_can_test_motors_in_sequence(self, flight_controller) -> None: timeout_seconds = 2 # Act: Execute sequential motor test - success, error_msg = flight_controller.test_motors_in_sequence(4, throttle_percent, timeout_seconds) + start_motor = 1 # Start with first motor + motor_count = 4 # Test 4 motors + success, error_msg = flight_controller.test_motors_in_sequence( + start_motor, motor_count, throttle_percent, timeout_seconds + ) # Assert: Sequential test command sent for all motors assert success is True, f"Sequential motor test should succeed, but got error: {error_msg}" @@ -311,11 +319,11 @@ def test_user_can_test_motors_in_sequence(self, flight_controller) -> None: assert call_args[1] == 1 # target_component assert call_args[2] == mavutil.mavlink.MAV_CMD_DO_MOTOR_TEST # command assert call_args[3] == 0 # confirmation - assert call_args[4] == 4 # param1: motor count (4 for quad) + assert call_args[4] == start_motor # param1: starting motor number (1-based) assert call_args[5] == mavutil.mavlink.MOTOR_TEST_THROTTLE_PERCENT # param2: throttle type assert call_args[6] == throttle_percent # param3: throttle value assert call_args[7] == timeout_seconds # param4: timeout per motor - assert call_args[8] == 4 # param5: motor count + assert call_args[8] == motor_count # param5: number of motors to test in sequence assert call_args[9] == mavutil.mavlink.MOTOR_TEST_ORDER_SEQUENCE # param6: test order (sequence) def test_motor_test_handles_communication_failure(self, flight_controller) -> None: @@ -331,7 +339,7 @@ def test_motor_test_handles_communication_failure(self, flight_controller) -> No flight_controller.master.mav.command_long_send.side_effect = Exception("Connection lost") # Act: Attempt motor test during communication failure - success, error_msg = flight_controller.test_motor(1, 10, 2) + success, error_msg = flight_controller.test_motor(0, "A", 1, 10, 2) # Assert: Function handles error gracefully assert success is False @@ -469,7 +477,13 @@ def test_motor_commands_are_sent_to_flight_controller(self, flight_controller) - # Act & Assert: Test command sending for each case for motor_num, throttle, timeout in test_cases: - success, error_msg = flight_controller.test_motor(motor_num, throttle, timeout) + # Convert motor_num to test parameters + test_sequence_nr = motor_num - 1 # Convert to 0-based index + motor_letters = chr(ord("A") + test_sequence_nr) # A, B, C, etc. + motor_output_nr = motor_num # Keep 1-based for output number + success, error_msg = flight_controller.test_motor( + test_sequence_nr, motor_letters, motor_output_nr, throttle, timeout + ) # Assert: Command should be sent successfully assert success is True, f"Motor test command should be sent successfully for motor {motor_num}, error: {error_msg}" @@ -489,7 +503,7 @@ def test_command_sending_handles_no_connection_gracefully(self, flight_controlle flight_controller.master = None # Act: Attempt motor test without connection - success, error_msg = flight_controller.test_motor(1, 10, 2.0) + success, error_msg = flight_controller.test_motor(0, "A", 1, 10, 2.0) # Assert: Should fail gracefully assert success is False, "Motor test should fail gracefully when no connection is available" diff --git a/tests/test_data_model_motor_test.py b/tests/test_data_model_motor_test.py index 9fc7d35f..8cd84798 100755 --- a/tests/test_data_model_motor_test.py +++ b/tests/test_data_model_motor_test.py @@ -17,7 +17,14 @@ from ardupilot_methodic_configurator.backend_filesystem import LocalFilesystem from ardupilot_methodic_configurator.backend_filesystem_program_settings import ProgramSettings from ardupilot_methodic_configurator.backend_flightcontroller import FlightController -from ardupilot_methodic_configurator.data_model_motor_test import MotorTestDataModel +from ardupilot_methodic_configurator.data_model_motor_test import ( + FlightControllerConnectionError, + FrameConfigurationError, + MotorTestDataModel, + MotorTestSafetyError, + ParameterError, + ValidationError, +) # pylint: disable=too-many-lines,redefined-outer-name,protected-access,too-few-public-methods @@ -51,7 +58,19 @@ def mock_flight_controller() -> MagicMock: fc.test_all_motors.return_value = (True, "") fc.test_motors_in_sequence.return_value = (True, "") fc.stop_all_motors.return_value = (True, "") - fc.set_param.return_value = True + + # Configure set_param to update fc_parameters and return success + def set_param_side_effect(param_name: str, value: float) -> bool: + fc.fc_parameters[param_name] = value + return True + + fc.set_param.side_effect = set_param_side_effect + + # Configure fetch_param to return values from fc_parameters + def fetch_param_side_effect(param_name: str) -> float | None: + return fc.fc_parameters.get(param_name) + + fc.fetch_param.side_effect = fetch_param_side_effect return fc @@ -267,7 +286,7 @@ def test_user_can_initialize_model_with_connected_flight_controller(self, mock_f # Assert: Frame configuration loaded correctly assert model.frame_class == 1 assert model.frame_type == 1 - assert model.get_motor_count() == 4 + assert model.motor_count == 4 mock_flight_controller.get_frame_info.assert_called_once() def test_model_initialization_fails_with_disconnected_flight_controller( @@ -341,7 +360,7 @@ def test_user_can_get_motor_labels_for_current_frame(self, motor_test_model) -> # Arrange: Motor count is configured in fixture (4 motors) # Act: Get motor labels - labels = motor_test_model.get_motor_labels() + labels = motor_test_model.motor_labels # Assert: Correct labels returned assert labels == ["A", "B", "C", "D"] @@ -358,7 +377,7 @@ def test_user_can_get_motor_numbers_for_current_frame(self, motor_test_model) -> # Arrange: Motor count is configured in fixture (4 motors, QUAD X) # Act: Get motor numbers (in test order) - numbers = motor_test_model.get_motor_numbers() + numbers = motor_test_model.motor_numbers # Assert: Correct numbers returned in test order for QUAD X # Based on mock_motor_data_json: QUAD X has TestOrder [1,2,3,4] for Motors [1,2,3,4] @@ -376,14 +395,12 @@ def test_user_can_update_frame_configuration_successfully(self, motor_test_model # Arrange: Configure successful parameter setting with patch.object(motor_test_model, "set_parameter", return_value=(True, "")): # Act: Update frame configuration - success, error_msg = motor_test_model.update_frame_configuration(2, 1) # Hexa X + motor_test_model.update_frame_configuration(2, 1) # Hexa X # Assert: Update successful - assert success is True - assert error_msg == "" assert motor_test_model.frame_class == 2 assert motor_test_model.frame_type == 1 - assert motor_test_model.get_motor_count() == 6 # HEXA X has 6 motors + assert motor_test_model.motor_count == 6 # HEXA X has 6 motors def test_user_can_update_frame_configuration_and_motor_count(self, motor_test_model) -> None: """ @@ -396,14 +413,12 @@ def test_user_can_update_frame_configuration_and_motor_count(self, motor_test_mo # Arrange: Mock set_parameter to return success with patch.object(motor_test_model, "set_parameter", return_value=(True, "")): # Act: Update frame configuration - success, error = motor_test_model.update_frame_configuration(frame_class=2, frame_type=1) + motor_test_model.update_frame_configuration(frame_class=2, frame_type=1) # Assert: Configuration updated successfully - assert success is True - assert error == "" assert motor_test_model.frame_class == 2 assert motor_test_model.frame_type == 1 - assert motor_test_model.get_motor_count() == 6 # HEXA X has 6 motors + assert motor_test_model.motor_count == 6 # HEXA X has 6 motors def test_frame_configuration_update_fails_with_disconnected_controller( self, disconnected_flight_controller, mock_filesystem @@ -427,7 +442,7 @@ def test_frame_configuration_update_fails_with_disconnected_controller2( GIVEN: A motor test model with disconnected flight controller WHEN: User attempts to update frame configuration - THEN: Should return failure with clear error message + THEN: Should raise FlightControllerConnectionError with clear error message """ # Arrange: Create model with disconnected FC (will fail initialization) # We'll test this by mocking the model's flight controller directly @@ -435,12 +450,9 @@ def test_frame_configuration_update_fails_with_disconnected_controller2( model = MotorTestDataModel(flight_controller=disconnected_flight_controller, filesystem=mock_filesystem) model.flight_controller = disconnected_flight_controller - # Act: Attempt frame configuration update - success, error = model.update_frame_configuration(frame_class=2, frame_type=1) - - # Assert: Update failed with appropriate error - assert success is False - assert "Flight controller connection required" in error + # Act & Assert: Attempt frame configuration update should raise exception + with pytest.raises(FlightControllerConnectionError, match="Flight controller connection required"): + model.update_frame_configuration(frame_class=2, frame_type=1) class TestMotorTestDataModelBatteryMonitoring: @@ -575,16 +587,14 @@ def test_motor_testing_is_safe_with_good_conditions(self, motor_test_model) -> N GIVEN: Motor test model with connected flight controller and good battery WHEN: User checks if motor testing is safe - THEN: Should return True with no safety warnings + THEN: Should not raise any exceptions (test is safe) """ # Arrange: Good conditions configured in fixture - # Act: Check motor testing safety - is_safe, reason = motor_test_model.is_motor_test_safe() + # Act: Check motor testing safety - should not raise exceptions + motor_test_model.is_motor_test_safe() - # Assert: Motor testing is safe - assert is_safe is True - assert reason == "" + # Assert: If we reach here, the test is safe (no exceptions raised) def test_motor_testing_unsafe_with_disconnected_controller(self, motor_test_model) -> None: """ @@ -592,17 +602,14 @@ def test_motor_testing_unsafe_with_disconnected_controller(self, motor_test_mode GIVEN: Motor test model with disconnected flight controller WHEN: User checks if motor testing is safe - THEN: Should return False with appropriate error message + THEN: Should raise FlightControllerConnectionError with appropriate error message """ # Arrange: Disconnect flight controller motor_test_model.flight_controller.master = None - # Act: Check motor testing safety - is_safe, reason = motor_test_model.is_motor_test_safe() - - # Assert: Motor testing is unsafe - assert is_safe is False - assert "not connected" in reason.lower() + # Act & Assert: Check motor testing safety should raise exception + with pytest.raises(FlightControllerConnectionError, match="not connected"): + motor_test_model.is_motor_test_safe() def test_motor_testing_unsafe_with_no_battery_monitoring(self, motor_test_model) -> None: """ @@ -610,17 +617,14 @@ def test_motor_testing_unsafe_with_no_battery_monitoring(self, motor_test_model) GIVEN: A disconnected flight controller WHEN: User checks if motor testing is safe - THEN: Should return False with connection error message + THEN: Should raise FlightControllerConnectionError with connection error message """ # Arrange: Disconnect flight controller motor_test_model.flight_controller.master = None - # Act: Check motor test safety - is_safe, reason = motor_test_model.is_motor_test_safe() - - # Assert: Motor testing is unsafe - assert is_safe is False - assert "Flight controller not connected" in reason + # Act & Assert: Check motor test safety should raise exception + with pytest.raises(FlightControllerConnectionError, match="Flight controller not connected"): + motor_test_model.is_motor_test_safe() def test_motor_testing_safe_with_battery_monitoring_disabled(self, motor_test_model) -> None: """ @@ -628,17 +632,15 @@ def test_motor_testing_safe_with_battery_monitoring_disabled(self, motor_test_mo GIVEN: A flight controller with battery monitoring disabled WHEN: User checks if motor testing is safe - THEN: Should return True with a warning about disabled monitoring + THEN: Should not raise exceptions but log warning about disabled monitoring """ # Arrange: Disable battery monitoring motor_test_model.flight_controller.is_battery_monitoring_enabled.return_value = False - # Act: Check motor test safety - is_safe, reason = motor_test_model.is_motor_test_safe() + # Act: Check motor test safety - should not raise exceptions + motor_test_model.is_motor_test_safe() - # Assert: Motor testing is safe even without battery monitoring - assert is_safe is True - assert "Battery monitoring disabled" in reason + # Assert: If we reach here, test is safe despite disabled monitoring def test_motor_testing_unsafe_with_low_voltage(self, motor_test_model) -> None: """ @@ -646,18 +648,15 @@ def test_motor_testing_unsafe_with_low_voltage(self, motor_test_model) -> None: GIVEN: A battery with voltage below the minimum threshold WHEN: User checks if motor testing is safe - THEN: Should return False with low voltage warning + THEN: Should raise MotorTestSafetyError with low voltage warning """ # Arrange: Set voltage below minimum threshold motor_test_model.flight_controller.get_battery_status.return_value = ((10.5, 2.1), "") - min_voltage, max_voltage = motor_test_model.get_voltage_thresholds() + _min_voltage, _max_voltage = motor_test_model.get_voltage_thresholds() - # Act: Check motor test safety - is_safe, reason = motor_test_model.is_motor_test_safe() - - # Assert: Motor testing is unsafe due to low voltage - assert is_safe is False - assert f"Battery voltage 10.5V is outside safe range ({min_voltage}V - {max_voltage}V)" in reason + # Act & Assert: Check motor test safety should raise exception + with pytest.raises(MotorTestSafetyError, match=r"Battery voltage 10.5V is outside safe range"): + motor_test_model.is_motor_test_safe() def test_motor_testing_unsafe_with_high_voltage(self, motor_test_model) -> None: """ @@ -665,18 +664,15 @@ def test_motor_testing_unsafe_with_high_voltage(self, motor_test_model) -> None: GIVEN: A battery with voltage above the maximum threshold WHEN: User checks if motor testing is safe - THEN: Should return False with high voltage warning + THEN: Should raise MotorTestSafetyError with high voltage warning """ # Arrange: Set voltage above maximum threshold motor_test_model.flight_controller.get_battery_status.return_value = ((17.0, 2.1), "") - min_voltage, max_voltage = motor_test_model.get_voltage_thresholds() + _min_voltage, _max_voltage = motor_test_model.get_voltage_thresholds() - # Act: Check motor test safety - is_safe, reason = motor_test_model.is_motor_test_safe() - - # Assert: Motor testing is unsafe due to high voltage - assert is_safe is False - assert f"Battery voltage 17.0V is outside safe range ({min_voltage}V - {max_voltage}V)" in reason + # Act & Assert: Check motor test safety should raise exception + with pytest.raises(MotorTestSafetyError, match=r"Battery voltage 17.0V is outside safe range"): + motor_test_model.is_motor_test_safe() class TestMotorTestDataModelParameterManagement: @@ -688,17 +684,15 @@ def test_user_can_set_motor_parameter_successfully(self, motor_test_model) -> No GIVEN: A connected flight controller and valid parameter WHEN: User sets a motor parameter - THEN: Parameter should be set and verified successfully + THEN: Parameter should be set successfully without raising exceptions """ # Arrange: Configure parameter reading motor_test_model.flight_controller.fc_parameters["MOT_SPIN_ARM"] = 0.12 - # Act: Set parameter - success, error = motor_test_model.set_parameter("MOT_SPIN_ARM", 0.12) + # Act: Set parameter - should not raise exceptions + motor_test_model.set_parameter("MOT_SPIN_ARM", 0.12) - # Assert: Parameter set successfully - assert success is True - assert error == "" + # Assert: If we reach here, parameter was set successfully motor_test_model.flight_controller.set_param.assert_called_once_with("MOT_SPIN_ARM", 0.12) def test_parameter_setting_fails_with_disconnected_controller(self, motor_test_model) -> None: @@ -707,17 +701,14 @@ def test_parameter_setting_fails_with_disconnected_controller(self, motor_test_m GIVEN: A disconnected flight controller WHEN: User attempts to set a parameter - THEN: Should return failure with connection error + THEN: Should raise FlightControllerConnectionError with connection error """ # Arrange: Disconnect flight controller motor_test_model.flight_controller.master = None - # Act: Attempt to set parameter - success, error = motor_test_model.set_parameter("MOT_SPIN_ARM", 0.12) - - # Assert: Parameter setting failed - assert success is False - assert "No flight controller connection available" in error + # Act & Assert: Attempt to set parameter should raise exception + with pytest.raises(FlightControllerConnectionError, match="No flight controller connection available"): + motor_test_model.set_parameter("MOT_SPIN_ARM", 0.12) def test_parameter_validation_rejects_out_of_bounds_values(self, motor_test_model) -> None: """ @@ -725,16 +716,13 @@ def test_parameter_validation_rejects_out_of_bounds_values(self, motor_test_mode GIVEN: A motor parameter with defined bounds WHEN: User attempts to set value outside bounds - THEN: Should return failure with validation error + THEN: Should raise ValidationError with validation error """ # Arrange: Parameter bounds defined in code (0.0 - 1.0) - # Act: Attempt to set out-of-bounds value - success, error = motor_test_model.set_parameter("MOT_SPIN_ARM", 1.5) - - # Assert: Parameter validation failed - assert success is False - assert "outside valid range (0.0 - 1.0)" in error + # Act & Assert: Attempt to set out-of-bounds value should raise exception + with pytest.raises(ValidationError, match=r"outside valid range \(0.0 - 1.0\)"): + motor_test_model.set_parameter("MOT_SPIN_ARM", 1.5) def test_parameter_verification_detects_setting_failure(self, motor_test_model) -> None: """ @@ -742,18 +730,15 @@ def test_parameter_verification_detects_setting_failure(self, motor_test_model) GIVEN: A parameter that appears to set but reads back differently WHEN: User sets the parameter - THEN: Should return failure with verification error + THEN: Should raise ParameterError with verification error """ - # Arrange: Simulate verification failure - motor_test_model.flight_controller.fc_parameters["MOT_SPIN_ARM"] = 0.10 # Different from set value - - # Act: Set parameter with different result - success, error = motor_test_model.set_parameter("MOT_SPIN_ARM", 0.12) + # Arrange: Override fetch_param to return different value for verification failure + motor_test_model.flight_controller.fetch_param.side_effect = None # Clear side_effect + motor_test_model.flight_controller.fetch_param.return_value = 0.10 # Different from set value - # Assert: Verification failed - assert success is False - assert "verification failed" in error - assert "expected 0.120, got 0.1" in error + # Act & Assert: Set parameter with different result should raise exception + with pytest.raises(ParameterError, match="verification failed"): + motor_test_model.set_parameter("MOT_SPIN_ARM", 0.12) def test_user_can_get_parameter_value(self, motor_test_model) -> None: """ @@ -799,17 +784,15 @@ def test_user_can_test_individual_motor_successfully(self, motor_test_model) -> GIVEN: Motor test model with safe conditions WHEN: User tests a specific motor with valid parameters - THEN: Motor test should be executed successfully + THEN: Motor test should be executed successfully without raising exceptions """ # Arrange: Safe conditions configured in fixture - # Act: Test motor 1 at 10% throttle for 2 seconds - success, error_msg = motor_test_model.test_motor(1, 10, 2) + # Act: Test motor 1 (test_sequence_nr=0, motor_output_nr=1) at 10% throttle for 2 seconds + motor_test_model.test_motor(0, 1, 10, 2) - # Assert: Motor test successful - assert success is True - assert error_msg == "" - motor_test_model.flight_controller.test_motor.assert_called_once_with(1, 10, 2) + # Assert: Motor test successful (verified that flight controller was called with correct parameters) + motor_test_model.flight_controller.test_motor.assert_called_once_with(0, "A", 1, 10, 2) def test_motor_test_fails_with_invalid_motor_number(self, motor_test_model) -> None: """ @@ -817,18 +800,13 @@ def test_motor_test_fails_with_invalid_motor_number(self, motor_test_model) -> N GIVEN: Motor test model with 4-motor configuration WHEN: User attempts to test motor 5 (out of range) - THEN: Should return failure with descriptive error message + THEN: Should raise ValidationError with descriptive error message """ # Arrange: 4-motor configuration in fixture - # Act: Attempt to test invalid motor number - success, error_msg = motor_test_model.test_motor(5, 10, 2) - - # Assert: Test fails with appropriate error - assert success is False - assert "Invalid motor number" in error_msg - assert "5" in error_msg - assert "1-4" in error_msg + # Act & Assert: Attempt to test invalid test sequence number (4 is out of range for 4-motor config) + with pytest.raises(ValidationError, match="Invalid test sequence number"): + motor_test_model.test_motor(4, 5, 10, 2) def test_motor_test_fails_with_invalid_motor_number2(self, motor_test_model) -> None: """ @@ -836,16 +814,13 @@ def test_motor_test_fails_with_invalid_motor_number2(self, motor_test_model) -> GIVEN: A 4-motor vehicle configuration WHEN: User attempts to test motor number outside valid range - THEN: Should return failure with validation error + THEN: Should raise ValidationError with motor number validation message """ # Arrange: 4-motor configuration in fixture - # Act: Test invalid motor number - success, error = motor_test_model.test_motor(motor_number=5, throttle_percent=10, timeout_seconds=2) - - # Assert: Motor test failed - assert success is False - assert "Invalid motor number 5 (valid range: 1-4)" in error + # Act & Assert: Test invalid motor output number should raise ValidationError + with pytest.raises(ValidationError, match=r"Invalid motor output number 5"): + motor_test_model.test_motor(0, 5, 10, 2) # test_sequence_nr=0, motor_output_nr=5 (invalid) def test_motor_test_fails_with_invalid_throttle_percentage(self, motor_test_model) -> None: """ @@ -853,16 +828,13 @@ def test_motor_test_fails_with_invalid_throttle_percentage(self, motor_test_mode GIVEN: Safe conditions for motor testing WHEN: User attempts test with throttle outside valid range - THEN: Should return failure with validation error + THEN: Should raise ValidationError with throttle validation message """ # Arrange: Safe conditions configured in fixture - # Act: Test with invalid throttle - success, error = motor_test_model.test_motor(motor_number=1, throttle_percent=150, timeout_seconds=2) - - # Assert: Motor test failed - assert success is False - assert "Invalid throttle percentage 150 (valid range: 1-100)" in error + # Act & Assert: Test with invalid throttle should raise ValidationError + with pytest.raises(ValidationError, match=r"Invalid throttle percentage 150 \(valid range: 1-100\)"): + motor_test_model.test_motor(0, 1, 150, 2) def test_motor_test_fails_under_unsafe_conditions(self, motor_test_model) -> None: """ @@ -870,17 +842,14 @@ def test_motor_test_fails_under_unsafe_conditions(self, motor_test_model) -> Non GIVEN: Unsafe battery voltage conditions WHEN: User attempts motor test - THEN: Should return failure with safety + THEN: Should raise MotorTestSafetyError with safety message """ # Arrange: Set unsafe battery voltage motor_test_model.flight_controller.get_battery_status.return_value = ((10.0, 2.1), "") - # Act: Attempt motor test - success, error = motor_test_model.test_motor(motor_number=1, throttle_percent=10, timeout_seconds=2) - - # Assert: Motor test failed due to safety - assert success is False - assert "Battery voltage 10.0V is outside safe range" in error + # Act & Assert: Attempt motor test should raise MotorTestSafetyError + with pytest.raises(MotorTestSafetyError, match=r"Battery voltage 10\.0V is outside safe range"): + motor_test_model.test_motor(0, 1, 10, 2) def test_user_can_test_all_motors_simultaneously(self, motor_test_model) -> None: """ @@ -888,17 +857,15 @@ def test_user_can_test_all_motors_simultaneously(self, motor_test_model) -> None GIVEN: Safe conditions for motor testing WHEN: User tests all motors - THEN: All motors test should execute successfully + THEN: All motors test should execute successfully without exception """ # Arrange: Safe conditions configured in fixture - # Act: Test all motors - success, error = motor_test_model.test_all_motors(throttle_percent=10, timeout_seconds=2) + # Act: Test all motors (should not raise exception) + motor_test_model.test_all_motors(throttle_percent=10, timeout_seconds=2) - # Assert: All motors test successful - assert success is True - assert error == "" - motor_test_model.flight_controller.test_all_motors.assert_called_once_with(10, 2) + # Assert: Verify method was called correctly + motor_test_model.flight_controller.test_all_motors.assert_called_once_with(4, 10, 2) def test_user_can_test_motors_in_sequence(self, motor_test_model) -> None: """ @@ -906,17 +873,15 @@ def test_user_can_test_motors_in_sequence(self, motor_test_model) -> None: GIVEN: Safe conditions for motor testing WHEN: User tests motors in sequence - THEN: Sequential test should execute successfully + THEN: Sequential test should execute successfully without exception """ # Arrange: Safe conditions configured in fixture - # Act: Test motors in sequence - success, error = motor_test_model.test_motors_in_sequence(throttle_percent=10, timeout_seconds=2) + # Act: Test motors in sequence (should not raise exception) + motor_test_model.test_motors_in_sequence(throttle_percent=10, timeout_seconds=2) - # Assert: Sequential test successful - assert success is True - assert error == "" - motor_test_model.flight_controller.test_motors_in_sequence.assert_called_once_with(4, 10, 2) + # Assert: Verify method was called correctly + motor_test_model.flight_controller.test_motors_in_sequence.assert_called_once_with(1, 4, 10, 2) def test_user_can_stop_all_motors_emergency(self, motor_test_model) -> None: """ @@ -924,16 +889,14 @@ def test_user_can_stop_all_motors_emergency(self, motor_test_model) -> None: GIVEN: Motors potentially running WHEN: User triggers emergency stop - THEN: All motors should stop immediately + THEN: All motors should stop immediately without exception """ # Arrange: No special setup needed - # Act: Emergency stop - success, error = motor_test_model.stop_all_motors() + # Act: Emergency stop (should not raise exception) + motor_test_model.stop_all_motors() - # Assert: Emergency stop successful - assert success is True - assert error == "" + # Assert: Verify method was called correctly motor_test_model.flight_controller.stop_all_motors.assert_called_once() @@ -1053,17 +1016,14 @@ def test_model_handles_flight_controller_communication_errors_gracefully(self, m GIVEN: A flight controller that raises communication errors WHEN: User performs operations that trigger communication - THEN: Should handle errors gracefully with appropriate messages + THEN: Should raise ParameterError with appropriate message """ # Arrange: Configure communication error motor_test_model.flight_controller.set_param.side_effect = Exception("Communication timeout") - # Act: Attempt parameter setting - success, error = motor_test_model.set_parameter("MOT_SPIN_ARM", 0.12) - - # Assert: Error handled gracefully - assert success is False - assert "Communication timeout" in error + # Act & Assert: Attempt parameter setting should raise ParameterError + with pytest.raises(ParameterError, match=r"Communication timeout"): + motor_test_model.set_parameter("MOT_SPIN_ARM", 0.12) def test_model_handles_missing_parameters_gracefully(self, motor_test_model) -> None: """ @@ -1113,10 +1073,10 @@ def test_frame_configuration_update_exception_handling(self, motor_test_model) - WHEN: An exception occurs during frame configuration update THEN: The exception should be caught and re-raised with proper logging """ + # Arrange: Mock an exception during frame configuration + # Act & Assert: Exception should be caught and re-raised with ( - # Arrange: Mock an exception during frame configuration patch.object(MotorTestDataModel, "_update_frame_configuration", side_effect=Exception("Test exception")), - # Act & Assert: Exception should be caught and re-raised pytest.raises(Exception, match="Test exception"), ): MotorTestDataModel( @@ -1221,16 +1181,15 @@ def test_motor_test_with_unsafe_conditions(self, motor_test_model) -> None: GIVEN: A motor test model with unsafe conditions WHEN: Attempting to test all motors - THEN: Should fail with safety reason + THEN: Should raise MotorTestSafetyError with safety reason """ - # Arrange: Make conditions unsafe - with patch.object(motor_test_model, "is_motor_test_safe", return_value=(False, "Unsafe voltage")): - # Act: Attempt motor test - success, reason = motor_test_model.test_all_motors(50, 3) - - # Assert: Test fails due to safety - assert success is False - assert reason == "Unsafe voltage" + # Arrange: Make conditions unsafe by mocking is_motor_test_safe to raise exception + with ( + patch.object(motor_test_model, "is_motor_test_safe", side_effect=MotorTestSafetyError("Unsafe voltage")), + pytest.raises(MotorTestSafetyError, match=r"Unsafe voltage"), + ): + # Act: Attempt motor test should raise MotorTestSafetyError + motor_test_model.test_all_motors(50, 3) def test_sequential_motor_test_with_unsafe_conditions(self, motor_test_model) -> None: """ @@ -1238,16 +1197,15 @@ def test_sequential_motor_test_with_unsafe_conditions(self, motor_test_model) -> GIVEN: A motor test model with unsafe conditions WHEN: Attempting to test motors in sequence - THEN: Should fail with safety reason + THEN: Should raise MotorTestSafetyError with safety reason """ - # Arrange: Make conditions unsafe - with patch.object(motor_test_model, "is_motor_test_safe", return_value=(False, "Battery too low")): - # Act: Attempt sequential motor test - success, reason = motor_test_model.test_motors_in_sequence(throttle_percent=30, timeout_seconds=2) - - # Assert: Test fails due to safety - assert success is False - assert reason == "Battery too low" + # Arrange: Make conditions unsafe by mocking is_motor_test_safe to raise exception + with ( + patch.object(motor_test_model, "is_motor_test_safe", side_effect=MotorTestSafetyError("Battery too low")), + pytest.raises(MotorTestSafetyError, match=r"Battery too low"), + ): + # Act: Attempt sequential motor test should raise MotorTestSafetyError + motor_test_model.test_motors_in_sequence(throttle_percent=30, timeout_seconds=2) def test_set_test_duration_exception_handling(self, motor_test_model) -> None: """ @@ -1258,15 +1216,15 @@ def test_set_test_duration_exception_handling(self, motor_test_model) -> None: THEN: Should return False and log error """ # Arrange: Mock exception in ProgramSettings.set_setting - with patch( - "ardupilot_methodic_configurator.backend_filesystem_program_settings.ProgramSettings.set_setting", - side_effect=Exception("Save error"), + # Act & Assert: Attempt to set duration should raise exception + with ( + patch( + "ardupilot_methodic_configurator.backend_filesystem_program_settings.ProgramSettings.set_setting", + side_effect=Exception("Save error"), + ), + pytest.raises(Exception, match="Save error"), ): - # Act: Attempt to set duration - result = motor_test_model.set_test_duration(5) - - # Assert: Returns False on exception - assert result is False + motor_test_model.set_test_duration_s(5) def test_set_test_throttle_exception_handling(self, motor_test_model) -> None: """ @@ -1277,15 +1235,15 @@ def test_set_test_throttle_exception_handling(self, motor_test_model) -> None: THEN: Should return False and log error """ # Arrange: Mock exception in ProgramSettings.set_setting - with patch( - "ardupilot_methodic_configurator.backend_filesystem_program_settings.ProgramSettings.set_setting", - side_effect=Exception("Save error"), + # Act & Assert: Attempt to set throttle should raise exception + with ( + patch( + "ardupilot_methodic_configurator.backend_filesystem_program_settings.ProgramSettings.set_setting", + side_effect=Exception("Save error"), + ), + pytest.raises(Exception, match="Save error"), ): - # Act: Attempt to set throttle - result = motor_test_model.set_test_throttle_pct(75) - - # Assert: Returns False on exception - assert result is False + motor_test_model.set_test_throttle_pct(75) def test_update_frame_configuration_class_parameter_failure(self, motor_test_model) -> None: """ @@ -1293,21 +1251,18 @@ def test_update_frame_configuration_class_parameter_failure(self, motor_test_mod GIVEN: A motor test model WHEN: Setting FRAME_CLASS parameter fails - THEN: Should return failure with appropriate error message + THEN: Should raise ParameterError with appropriate error message """ # Arrange: Mock parameter setting to fail for FRAME_CLASS with patch.object(motor_test_model, "set_parameter") as mock_set_param: mock_set_param.side_effect = [ - (False, "Failed to set FRAME_CLASS"), # First call fails - (True, ""), # Second call would succeed but shouldn't be reached + ParameterError("Failed to set FRAME_CLASS"), # First call fails + None, # Second call would succeed but shouldn't be reached ] - # Act: Attempt frame configuration update - success, error = motor_test_model.update_frame_configuration(2, 0) - - # Assert: Returns failure for FRAME_CLASS error - assert success is False - assert error == "Failed to set FRAME_CLASS" + # Act & Assert: Attempt frame configuration update should raise ParameterError + with pytest.raises(ParameterError, match=r"Failed to set FRAME_CLASS"): + motor_test_model.update_frame_configuration(2, 0) def test_update_frame_configuration_type_parameter_failure(self, motor_test_model) -> None: """ @@ -1315,21 +1270,18 @@ def test_update_frame_configuration_type_parameter_failure(self, motor_test_mode GIVEN: A motor test model WHEN: Setting FRAME_TYPE parameter fails - THEN: Should return failure with appropriate error message + THEN: Should raise ParameterError with appropriate error message """ # Arrange: Mock parameter setting to fail for FRAME_TYPE with patch.object(motor_test_model, "set_parameter") as mock_set_param: mock_set_param.side_effect = [ - (True, ""), # FRAME_CLASS succeeds - (False, "Failed to set FRAME_TYPE"), # FRAME_TYPE fails + None, # FRAME_CLASS succeeds (void) + ParameterError("Failed to set FRAME_TYPE"), # FRAME_TYPE fails ] - # Act: Attempt frame configuration update - success, error = motor_test_model.update_frame_configuration(2, 0) - - # Assert: Returns failure for FRAME_TYPE error - assert success is False - assert error == "Failed to set FRAME_TYPE" + # Act & Assert: Attempt frame configuration update should raise ParameterError + with pytest.raises(ParameterError, match=r"Failed to set FRAME_TYPE"): + motor_test_model.update_frame_configuration(2, 0) def test_update_frame_configuration_exception_during_update(self, motor_test_model) -> None: """ @@ -1337,18 +1289,17 @@ def test_update_frame_configuration_exception_during_update(self, motor_test_mod GIVEN: A motor test model WHEN: An exception occurs during the update process - THEN: Should return failure with error message + THEN: Should raise FrameConfigurationError with error message """ # Arrange: Mock an exception during the update process with patch.object(motor_test_model, "set_parameter") as mock_set_param: mock_set_param.side_effect = Exception("Internal error during parameter setting") - # Act: Attempt frame configuration update - success, error = motor_test_model.update_frame_configuration(2, 0) - - # Assert: Returns failure with exception message - assert success is False - assert "Failed to update frame configuration: Internal error during parameter setting" in error + # Act & Assert: Attempt frame configuration update should raise FrameConfigurationError + with pytest.raises( + FrameConfigurationError, match=r"Failed to update frame configuration: Internal error during parameter setting" + ): + motor_test_model.update_frame_configuration(2, 0) def test_refresh_connection_status_exception_handling(self, motor_test_model) -> None: """ @@ -1382,7 +1333,7 @@ def test_user_can_get_motor_directions_from_valid_json_data(self, motor_test_mod model = motor_test_model_with_json_data # Act: Get motor directions - directions = model.get_motor_directions() + directions = model.motor_directions # Assert: Correct directions returned from JSON data expected_directions = ["CCW", "CW", "CW", "CCW"] # From mock JSON for QUAD X @@ -1399,20 +1350,14 @@ def test_user_can_get_motor_directions_for_quad_plus_frame(self, motor_test_mode """ # Arrange: Update model to QUAD PLUS frame (class=1, type=0) model = motor_test_model_with_json_data - model._frame_class = 1 - model._frame_type = 0 - model._motor_count = 4 - # Update the frame layout to match QUAD PLUS - for layout in model._motor_data_loader.data["layouts"]: - if layout["Class"] == 1 and layout["Type"] == 0: - model._frame_layout = layout - break + model._configure_frame_layout(frame_class=1, frame_type=0) # Act: Get motor directions - directions = model.get_motor_directions() + directions = model.motor_directions # Assert: Correct PLUS frame directions in test order - expected_directions = ["CW", "CCW", "CW", "CCW"] # Test order 1,2,3,4 from mock JSON for QUAD PLUS + # TestOrder 1: Motor 3 (CW), TestOrder 2: Motor 1 (CCW), TestOrder 3: Motor 4 (CW), TestOrder 4: Motor 2 (CCW) + expected_directions = ["CW", "CCW", "CW", "CCW"] # Ordered by TestOrder from mock JSON for QUAD PLUS assert directions == expected_directions assert len(directions) == 4 @@ -1426,80 +1371,80 @@ def test_user_can_get_motor_directions_for_hexa_frame(self, motor_test_model_wit """ # Arrange: Update model to HEXA frame (class=2, type=1) model = motor_test_model_with_json_data - model._frame_class = 2 - model._frame_type = 1 - model._motor_count = 6 - # Update the frame layout to match HEXA X - for layout in model._motor_data_loader.data["layouts"]: - if layout["Class"] == 2 and layout["Type"] == 1: - model._frame_layout = layout - break + model._configure_frame_layout(frame_class=2, frame_type=1) # Act: Get motor directions - directions = model.get_motor_directions() + directions = model.motor_directions # Assert: Correct HEXA frame directions + # From JSON: TestOrder 1-6 → Motors 1-6 → CW,CCW,CW,CCW,CW,CCW expected_directions = ["CW", "CCW", "CW", "CCW", "CW", "CCW"] # From mock JSON for HEXA X assert directions == expected_directions assert len(directions) == 6 - def test_user_gets_fallback_when_frame_not_found_in_json(self, motor_test_model_with_json_data) -> None: + def test_user_gets_empty_directions_when_frame_not_found_in_json(self, motor_test_model_with_json_data) -> None: """ - User gets appropriate error when frame configuration not found in JSON. + User gets empty motor directions when frame configuration not found in JSON. GIVEN: A motor test model with JSON data WHEN: User requests directions for unsupported frame configuration - THEN: Should raise ValueError with appropriate error message + THEN: Should return empty motor directions list """ # Arrange: Configure unsupported frame (class=99, type=99) model = motor_test_model_with_json_data model._frame_class = 99 model._frame_type = 99 - model._motor_count = 4 - model._frame_layout = {} # No frame layout found + model._motor_count = 0 + model._motor_directions = [] # No frame layout found + + # Act: Get motor directions + directions = model.motor_directions - # Act & Assert: Should raise ValueError - with pytest.raises(ValueError, match="No Frame layout found"): - model.get_motor_directions() + # Assert: Should return empty list + assert directions == [] - def test_user_gets_fallback_when_json_data_empty(self, motor_test_model_with_empty_json_data) -> None: + def test_user_gets_empty_directions_when_json_data_empty(self, motor_test_model_with_empty_json_data) -> None: """ - User gets appropriate error when JSON data is empty or invalid. + User gets empty motor directions when JSON data is empty or invalid. GIVEN: A motor test model with empty or failed JSON data loading WHEN: User requests motor directions - THEN: Should raise ValueError with appropriate error message + THEN: Should return empty motor directions list """ # Arrange: Model with empty JSON data from fixture model = motor_test_model_with_empty_json_data - model._frame_layout = {} # No frame layout available + model._motor_directions = [] # No frame layout available + + # Act: Get motor directions + directions = model.motor_directions - # Act & Assert: Should raise ValueError - with pytest.raises(ValueError, match="No Frame layout found"): - model.get_motor_directions() + # Assert: Should return empty list + assert directions == [] - def test_user_gets_fallback_when_json_data_corrupted(self, motor_test_model_with_corrupted_json_data) -> None: + def test_user_gets_empty_directions_when_json_data_corrupted(self, motor_test_model_with_corrupted_json_data) -> None: """ - User gets appropriate error when JSON data structure is invalid. + User gets empty motor directions when JSON data structure is invalid. GIVEN: A motor test model with corrupted JSON data structure WHEN: User requests motor directions - THEN: Should raise ValueError with appropriate error message + THEN: Should return empty motor directions list """ # Arrange: Model with corrupted JSON data from fixture model = motor_test_model_with_corrupted_json_data - # Act & Assert: Get motor directions should raise ValueError - with pytest.raises(ValueError, match="No Frame layout found, not possible to generate motor test rotation order"): - model.get_motor_directions() + # Act: Get motor directions + directions = model.motor_directions + + # Assert: Should return empty list + assert directions == [] - def test_motor_count_mismatch_handled_with_extension(self, motor_test_model_with_json_data) -> None: + def test_motor_directions_unchanged_when_count_manually_increased(self, motor_test_model_with_json_data) -> None: """ - System handles motor count mismatch by leaving missing positions empty. + Motor directions remain unchanged when motor count is manually increased. GIVEN: A motor test model where JSON has fewer motors than expected - WHEN: User requests directions for frame needing more motors - THEN: Should return directions with empty strings for missing motors + WHEN: User manually increases motor count after configuration + THEN: Should return original directions (implementation doesn't auto-extend) """ # Arrange: Configure frame with valid class/type then simulate motor count mismatch model = motor_test_model_with_json_data @@ -1510,38 +1455,38 @@ def test_motor_count_mismatch_handled_with_extension(self, motor_test_model_with model._motor_count = 8 # Expect 8 motors but frame layout only has 4 # Act: Get motor directions - directions = model.get_motor_directions() + directions = model.motor_directions - # Assert: Directions list has correct length with empty strings for missing motors - assert len(directions) == 8 - # First 4 from JSON, remaining 4 are empty strings - expected_directions = ["CCW", "CW", "CW", "CCW", "", "", "", ""] + # Assert: Directions list has original length (no auto-extension) + assert len(directions) == 4 + expected_directions = ["CCW", "CW", "CW", "CCW"] # From mock JSON for QUAD X assert directions == expected_directions - def test_motor_count_mismatch_handled_with_truncation(self, motor_test_model_with_json_data) -> None: + def test_motor_directions_unchanged_when_count_manually_decreased(self, motor_test_model_with_json_data) -> None: """ - System handles motor count mismatch by truncating directions appropriately. + Motor directions remain unchanged when motor count is manually decreased. GIVEN: A motor test model where JSON has more motors than expected - WHEN: User requests directions for frame needing fewer motors - THEN: Should truncate directions to match expected count + WHEN: User manually decreases motor count after configuration + THEN: Should return original directions (implementation doesn't auto-truncate) """ # Arrange: Configure frame with valid class/type then simulate motor count mismatch model = motor_test_model_with_json_data - with patch.object(model, "set_parameter", return_value=(True, None)): - model.update_frame_configuration(2, 1) # Configure HEXA frame (has 6 motors) + + # Configure HEXA frame directly without using update_frame_configuration + model._configure_frame_layout(2, 1) # Configure HEXA frame (has 6 motors) # Manually decrease motor count to simulate mismatch + original_directions = model.motor_directions.copy() # Save original directions model._motor_count = 4 # Expect only 4 motors but frame layout has 6 # Act: Get motor directions - directions = model.get_motor_directions() + directions = model.motor_directions - # Assert: Directions truncated to match motor count - assert len(directions) == 4 - # Only first 4 directions from frame layout - expected_directions = ["CW", "CCW", "CW", "CCW"] + # Assert: Directions remain original (implementation doesn't auto-truncate based on motor count) + expected_directions = ["CW", "CCW", "CW", "CCW", "CW", "CCW"] # From mock JSON for HEXA X assert directions == expected_directions + assert directions == original_directions class TestMotorTestDataModelJSONLoading: @@ -1706,14 +1651,14 @@ def test_user_can_get_test_duration_from_settings(self, motor_test_model) -> Non WHEN: The user requests the current test duration THEN: The stored duration value should be returned """ - # Arrange: Mock settings to return a specific duration - with patch.object(ProgramSettings, "get_setting", return_value="2.5") as mock_get: - # Act: Get test duration - duration = motor_test_model.get_test_duration() + # Arrange: Set internal duration value + motor_test_model._test_duration_s = 2.5 + + # Act: Get test duration + duration = motor_test_model.get_test_duration_s() - # Assert: Correct duration returned - assert duration == 2.5 - mock_get.assert_called_once_with("motor_test/duration") + # Assert: Correct duration returned + assert duration == 2.5 def test_user_can_get_test_throttle_percentage_from_settings(self, motor_test_model) -> None: """ @@ -1723,14 +1668,14 @@ def test_user_can_get_test_throttle_percentage_from_settings(self, motor_test_mo WHEN: The user requests the current throttle percentage THEN: The stored throttle value should be returned """ - # Arrange: Mock settings to return a specific throttle percentage - with patch.object(ProgramSettings, "get_setting", return_value="15") as mock_get: - # Act: Get throttle percentage - throttle = motor_test_model.get_test_throttle_pct() + # Arrange: Set internal throttle value + motor_test_model._test_throttle_pct = 15 - # Assert: Correct throttle returned - assert throttle == 15 - mock_get.assert_called_once_with("motor_test/throttle_pct") + # Act: Get throttle percentage + throttle = motor_test_model.get_test_throttle_pct() + + # Assert: Correct throttle returned + assert throttle == 15 def test_set_test_duration_handles_exception_gracefully(self, motor_test_model) -> None: """ @@ -1745,11 +1690,11 @@ def test_set_test_duration_handles_exception_gracefully(self, motor_test_model) patch.object(ProgramSettings, "set_setting", side_effect=Exception("Settings save failed")), patch("ardupilot_methodic_configurator.data_model_motor_test.logging_error") as mock_log, ): - # Act: Try to set duration - result = motor_test_model.set_test_duration(2.0) + # Act & Assert: Try to set duration should raise exception + with pytest.raises(Exception, match="Settings save failed"): + motor_test_model.set_test_duration_s(2.0) - # Assert: Failure handled gracefully - assert result is False + # Assert: Error was logged mock_log.assert_called_once() error_call = mock_log.call_args[0] assert "Failed to save duration setting" in error_call[0] @@ -1767,11 +1712,11 @@ def test_set_test_throttle_handles_exception_gracefully(self, motor_test_model) patch.object(ProgramSettings, "set_setting", side_effect=Exception("Settings save failed")), patch("ardupilot_methodic_configurator.data_model_motor_test.logging_error") as mock_log, ): - # Act: Try to set throttle - result = motor_test_model.set_test_throttle_pct(10) + # Act & Assert: Try to set throttle should raise exception + with pytest.raises(Exception, match="Settings save failed"): + motor_test_model.set_test_throttle_pct(10) - # Assert: Failure handled gracefully - assert result is False + # Assert: Error was logged mock_log.assert_called_once() error_call = mock_log.call_args[0] assert "Failed to save throttle percentage setting" in error_call[0] @@ -1786,7 +1731,7 @@ def test_motor_testing_unsafe_when_battery_status_unavailable(self, motor_test_m GIVEN: A motor test model with battery monitoring enabled WHEN: Battery status returns None (unreadable) - THEN: Motor testing should be considered unsafe + THEN: Should raise MotorTestSafetyError with appropriate message """ # Arrange: Mock flight controller as connected but battery status unavailable motor_test_model.flight_controller.master = MagicMock() @@ -1795,13 +1740,12 @@ def test_motor_testing_unsafe_when_battery_status_unavailable(self, motor_test_m } # Mock get_battery_status to return None - with patch.object(motor_test_model, "get_battery_status", return_value=None): - # Act: Check safety - is_safe, message = motor_test_model.is_motor_test_safe() - - # Assert: Testing is unsafe due to unreadable battery - assert is_safe is False - assert "Could not read battery status" in message + with ( + patch.object(motor_test_model, "get_battery_status", return_value=None), + pytest.raises(MotorTestSafetyError, match=r"Could not read battery status"), + ): + # Act: Check safety should raise MotorTestSafetyError + motor_test_model.is_motor_test_safe() class TestMotorTestDataModelFrameOptionsAdvanced: @@ -1932,10 +1876,9 @@ def test_set_test_duration_succeeds_with_valid_value(self, motor_test_model) -> # Act: Save test duration with patch("ardupilot_methodic_configurator.data_model_motor_test.ProgramSettings.set_setting") as mock_set: - result = motor_test_model.set_test_duration(duration) + motor_test_model.set_test_duration_s(duration) - # Assert: Setting saved successfully - assert result is True + # Assert: Setting saved successfully (no exception raised) mock_set.assert_called_once_with("motor_test/duration", duration) def test_set_test_throttle_pct_succeeds_with_valid_value(self, motor_test_model) -> None: @@ -1951,10 +1894,9 @@ def test_set_test_throttle_pct_succeeds_with_valid_value(self, motor_test_model) # Act: Save throttle percentage with patch("ardupilot_methodic_configurator.data_model_motor_test.ProgramSettings.set_setting") as mock_set: - result = motor_test_model.set_test_throttle_pct(throttle) + motor_test_model.set_test_throttle_pct(throttle) - # Assert: Setting saved successfully - assert result is True + # Assert: Setting saved successfully (no exception raised) mock_set.assert_called_once_with("motor_test/throttle_pct", throttle) @@ -1973,11 +1915,9 @@ def test_set_test_duration_handles_exceptions(self, motor_test_model) -> None: with patch("ardupilot_methodic_configurator.data_model_motor_test.ProgramSettings") as mock_settings: mock_settings.set_setting.side_effect = Exception("Settings error") - # Act: Try to save test duration - result = motor_test_model.set_test_duration(2.0) - - # Assert: Should return False - assert result is False + # Act & Assert: Try to save test duration should raise exception + with pytest.raises(Exception, match="Settings error"): + motor_test_model.set_test_duration_s(2.0) def test_set_test_throttle_pct_handles_exceptions(self, motor_test_model) -> None: """ @@ -1991,11 +1931,9 @@ def test_set_test_throttle_pct_handles_exceptions(self, motor_test_model) -> Non with patch("ardupilot_methodic_configurator.data_model_motor_test.ProgramSettings") as mock_settings: mock_settings.set_setting.side_effect = Exception("Settings error") - # Act: Try to save test throttle - result = motor_test_model.set_test_throttle_pct(50) - - # Assert: Should return False - assert result is False + # Act & Assert: Try to save test throttle should raise exception + with pytest.raises(Exception, match="Settings error"): + motor_test_model.set_test_throttle_pct(50) def test_refresh_from_flight_controller_handles_runtime_error(self, motor_test_model) -> None: """