diff --git a/src/figaroh/calibration/base_calibration.py b/src/figaroh/calibration/base_calibration.py index 571a752..757acba 100644 --- a/src/figaroh/calibration/base_calibration.py +++ b/src/figaroh/calibration/base_calibration.py @@ -54,8 +54,9 @@ handle_calibration_errors ) -# Setup logger +# Setup logger for this module logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) class BaseCalibration(ABC): @@ -123,7 +124,7 @@ class BaseCalibration(ABC): - calc_updated_fkm: Forward kinematics computation function - apply_measurement_weighting: Unit-aware weighting utility """ - + @handle_calibration_errors def __init__(self, robot, config_file: str, del_list: List[int] = None): """Initialize robot calibration framework. @@ -161,15 +162,15 @@ def __init__(self, robot, config_file: str, del_list: List[int] = None): """ if del_list is None: del_list = [] - + # Validate inputs if not hasattr(robot, 'model') or not hasattr(robot, 'data'): raise CalibrationError( "Robot must have 'model' and 'data' attributes") - - self._robot = robot - self.model = self._robot.model - self.data = self._robot.data + + self.robot = robot + self.model = self.robot.model + self.data = self.robot.data self.del_list_ = del_list self.calib_config = None self.load_param(config_file) @@ -241,21 +242,21 @@ def solve(self, method="lm", max_iterations=3, outlier_threshold=3.0, max_iterations=max_iterations, outlier_threshold=outlier_threshold, enable_logging=enable_logging) - + # Evaluate solution evaluation = self._evaluate_solution(result, outlier_indices) - + # Log final results if enable_logging: logger.info("="*30) logger.info("FINAL CALIBRATION RESULTS") logger.info("="*30) self._log_iteration_results("FINAL", result, evaluation) - + if len(outlier_indices) > 0: logger.info(f"Outlier samples: {outlier_indices}") logger.info("Calibration completed successfully!") - + # Store results self._store_optimization_results(result, evaluation, outlier_indices) @@ -265,7 +266,7 @@ def solve(self, method="lm", max_iterations=3, outlier_threshold=3.0, if save_results: self.save_results() return result - + def plot_results(self): """Generate comprehensive visualization plots for calibration results. @@ -299,11 +300,11 @@ def plot_results(self): # Plot using unified manager with self.result data self.results_manager.plot_calibration_results() return - + except Exception as e: - print(f"Error plotting with ResultsManager: {e}") - print("Falling back to basic plotting...") - + logger.error(f"Error plotting with ResultsManager: {e}") + logger.info("Falling back to basic plotting...") + # Fallback to basic plotting if ResultsManager not available try: self.plot_errors_distribution() @@ -311,7 +312,7 @@ def plot_results(self): # self.plot_joint_configurations() plt.show() except Exception as e: - print(f"Warning: Plotting failed: {e}") + logger.warning(f"Plotting failed: {e}") def load_param(self, config_file: str, setting_type: str = "calibration"): """Load calibration parameters from YAML configuration file. @@ -325,37 +326,35 @@ def load_param(self, config_file: str, setting_type: str = "calibration"): setting_type (str): Configuration section to load """ try: - print(f"Loading config from {config_file}") - + logger.info(f"Loading config from {config_file}") + # Check if this is a unified configuration format if is_unified_config(config_file): - print(f"Detected unified configuration format") + logger.info("Detected unified configuration format") # Use unified parser parser = UnifiedConfigParser(config_file) unified_config = parser.parse() unified_calib_config = create_task_config( - self._robot, unified_config, setting_type + self.robot, unified_config, setting_type ) # Convert unified format to legacy calib_config format self.calib_config = unified_to_legacy_config( - self._robot, unified_calib_config + self.robot, unified_calib_config ) else: - print("Detected legacy configuration format") + logger.info("Detected legacy configuration format") # Use legacy format parsing with open(config_file, "r") as f: config = yaml.load(f, Loader=SafeLoader) - + if setting_type not in config: raise KeyError( f"Setting type '{setting_type}' not found in config" ) - + calib_data = config[setting_type] - self.calib_config = get_param_from_yaml( - self._robot, calib_data - ) - + self.calib_config = get_param_from_yaml(self.robot, calib_data) + except FileNotFoundError: raise CalibrationError( f"Configuration file not found: {config_file}" @@ -411,7 +410,7 @@ def create_param_list(self, q: Optional[np.ndarray] = None): q_ = [] else: q_ = q - + try: ( Rrand_b, @@ -422,14 +421,14 @@ def create_param_list(self, q: Optional[np.ndarray] = None): ) = calculate_base_kinematics_regressor( q_, self.model, self.data, self.calib_config, tol_qr=1e-6 ) - + if self.calib_config["known_baseframe"] is False: add_base_name(self.calib_config) if self.calib_config["known_tipframe"] is False: add_pee_name(self.calib_config) - + return True - + except Exception as e: raise CalibrationError(f"Parameter list creation failed: {e}") @@ -536,7 +535,7 @@ def cost_function(self, var: np.ndarray) -> np.ndarray: ... return weighted_residuals """ import warnings - + # Issue warning about using default implementation warnings.warn( f"Using default cost function for {self.__class__.__name__}. " @@ -546,12 +545,12 @@ def cost_function(self, var: np.ndarray) -> np.ndarray: UserWarning, stacklevel=2 ) - + # Default implementation: basic residual calculation PEEe = calc_updated_fkm(self.model, self.data, var, self.q_measured, self.calib_config) raw_residuals = self.PEE_measured - PEEe - + # Apply basic measurement weighting if configuration is available try: weighted_residuals = self.apply_measurement_weighting( @@ -591,15 +590,15 @@ def apply_measurement_weighting(self, residuals: np.ndarray, pos_std = self.calib_config.get("measurement_std", {}).get( "position", 0.001) pos_weight = 1.0 / pos_std - + if orient_weight is None: orient_std = self.calib_config.get("measurement_std", {}).get( "orientation", 0.01) orient_weight = 1.0 / orient_std - + weighted_residuals = [] residual_idx = 0 - + # Process each sample for each marker for marker in range(self.calib_config["NbMarkers"]): for dof, is_measured in enumerate(self.calib_config["measurability"]): @@ -619,23 +618,23 @@ def _setup_logging(self): # Create logger logger = logging.getLogger('calibration') logger.setLevel(logging.INFO) - + # Clear existing handlers to avoid duplicates logger.handlers.clear() - + # Create console handler console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) - + # Create formatter formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) console_handler.setFormatter(formatter) - + # Add handler to logger logger.addHandler(console_handler) - + return logger def _optimize_with_outlier_removal(self, var_init: np.ndarray, @@ -656,10 +655,10 @@ def _optimize_with_outlier_removal(self, var_init: np.ndarray, logger = logging.getLogger('calibration') current_var = var_init.copy() outlier_indices = [] - + for iteration in range(max_iterations): logger.info(f"Outlier removal iteration {iteration + 1}") - + # Run optimization result = least_squares( self.cost_function, @@ -667,30 +666,30 @@ def _optimize_with_outlier_removal(self, var_init: np.ndarray, method=method, max_nfev=1000 ) - + if not result.success: logger.warning( f"Optimization failed at iteration {iteration + 1}") break - + # Calculate residuals and detect outliers PEE_est = self.get_pose_from_measure(result.x) residuals = PEE_est - self.PEE_measured new_outliers = self._detect_outliers(residuals, outlier_threshold) - + if len(new_outliers) == 0: logger.info("No outliers detected, optimization converged") break - + outlier_indices.extend(new_outliers) outlier_indices = list(set(outlier_indices)) # Remove duplicates - + logger.info(f"Detected {len(new_outliers)} new outliers, " f"total outliers: {len(outlier_indices)}") - + # Update for next iteration current_var = result.x - + return result, outlier_indices, residuals def _detect_outliers(self, residuals: np.ndarray, threshold: float) -> List[int]: @@ -706,20 +705,20 @@ def _detect_outliers(self, residuals: np.ndarray, threshold: float) -> List[int] # Reshape residuals to per-sample format n_dofs = self.calib_config["calibration_index"] n_samples = self.calib_config["NbSample"] - + if len(residuals) != n_dofs * n_samples: return [] - + residuals_2d = residuals.reshape((n_dofs, n_samples)) - + # Calculate RMS error per sample rms_errors = np.sqrt(np.mean(residuals_2d**2, axis=0)) - + # Detect outliers mean_error = np.mean(rms_errors) std_error = np.std(rms_errors) threshold_value = mean_error + threshold * std_error - + outliers = np.where(rms_errors > threshold_value)[0].tolist() return outliers @@ -737,12 +736,12 @@ def _evaluate_solution(self, result, outlier_indices: List[int]) -> Dict[str, An residuals = PEE_est - self.PEE_measured n_dofs = self.calib_config["calibration_index"] n_samples = self.calib_config["NbSample"] - + # Calculate metrics rmse = np.sqrt(np.mean(residuals**2)) mae = np.mean(np.abs(residuals)) max_error = np.max(np.abs(residuals)) - + # Per-sample metrics if len(residuals) == n_dofs * n_samples: residuals_2d = residuals.reshape((n_dofs, n_samples)) @@ -752,10 +751,10 @@ def _evaluate_solution(self, result, outlier_indices: List[int]) -> Dict[str, An else: mean_sample_rms = rmse std_sample_rms = 0.0 - + # Calculate standard deviation of estimated parameters self.calc_stddev(result) - + return { 'rmse': rmse, 'mae': mae, @@ -798,7 +797,7 @@ def _log_iteration_results(self, iteration, result, evaluation: Dict[str, Any]): evaluation (dict): Solution evaluation metrics """ logger = logging.getLogger('calibration') - + logger.info(f"Iteration {iteration} Results:") logger.info(f" Success: {evaluation['optimization_success']}") logger.info(f" RMSE: {evaluation['rmse']:.6f}") @@ -822,18 +821,18 @@ def _store_optimization_results(self, result, evaluation: Dict[str, Any], self.LM_result = result self.var_ = result.x self.uncalib_values = np.zeros_like(result.x) # Store initial guess - + # Store evaluation metrics self.evaluation_metrics = evaluation self.outlier_indices = outlier_indices - + # Calculate per-sample error distribution for plotting PEE_est = self.get_pose_from_measure(result.x) residuals = PEE_est - self.PEE_measured n_dofs = self.calib_config["calibration_index"] n_samples = self.calib_config["NbSample"] n_markers = self.calib_config["NbMarkers"] - + # if len(residuals) == n_dofs * n_samples * n_markers: # residuals_3d = residuals.reshape((n_markers, n_dofs, n_samples)) # self._PEE_dist = np.sqrt(np.mean(residuals_3d**2, axis=1)) @@ -874,7 +873,7 @@ def _store_optimization_results(self, result, evaluation: Dict[str, Any], # Initialize ResultsManager for calibration task try: from figaroh.utils.results_manager import ResultsManager - + # Get robot name from class or model robot_name = getattr( self, 'robot_name', @@ -884,9 +883,9 @@ def _store_optimization_results(self, result, evaluation: Dict[str, Any], 'calibration', ''))) # Initialize results manager for calibration task self.results_manager = ResultsManager('calibration', robot_name, self.results_data) - + except ImportError as e: - print(f"Warning: ResultsManager not available: {e}") + logger.warning(f"ResultsManager not available: {e}") self.results_manager = None # Update status @@ -928,7 +927,7 @@ def solve_optimisation(self, var_init: Optional[np.ndarray] = None, raise CalibrationError("Call load_data_set() first") if not hasattr(self, 'q_measured'): raise CalibrationError("Call load_data_set() first") - + # Setup logging if enable_logging: logger = self._setup_logging() @@ -938,20 +937,20 @@ def solve_optimisation(self, var_init: Optional[np.ndarray] = None, logger.info(f"Markers: {self.calib_config['NbMarkers']}") logger.info(f"Samples: {self.calib_config['NbSample']}") logger.info(f"DOFs: {self.calib_config['calibration_index']}") - + # Initialize parameters if var_init is None: var_init, _ = initialize_variables(self.calib_config, mode=0) - + try: # Run optimization with outlier removal result, outlier_indices, final_residuals = \ self._optimize_with_outlier_removal( var_init, method, max_iterations, outlier_threshold ) - + return result, outlier_indices - + except Exception as e: if enable_logging: logger = logging.getLogger('calibration') @@ -1167,9 +1166,9 @@ def plot_joint_configurations(self): def save_results(self, output_dir="results"): """Save calibration results using unified results manager.""" if not hasattr(self, 'result') or self.results_data is None: - print("No calibration results to save. Run solve() first.") + logger.warning("No calibration results to save. Run solve() first.") return - + # Use pre-initialized results manager if available if hasattr(self, 'results_manager') and \ self.results_manager is not None: @@ -1180,12 +1179,12 @@ def save_results(self, output_dir="results"): save_formats=['yaml', 'csv', 'npz'] ) - print("Calibration results saved using ResultsManager") + logger.info("Calibration results saved using ResultsManager") for fmt, path in saved_files.items(): - print(f" {fmt}: {path}") - + logger.info(f" {fmt}: {path}") + return saved_files - + except Exception as e: - print(f"Error saving with ResultsManager: {e}") - print("Falling back to basic saving...") \ No newline at end of file + logger.error(f"Error saving with ResultsManager: {e}") + logger.info("Falling back to basic saving...") diff --git a/src/figaroh/calibration/calibration_tools.py b/src/figaroh/calibration/calibration_tools.py index 29b16cc..7a2f6ab 100644 --- a/src/figaroh/calibration/calibration_tools.py +++ b/src/figaroh/calibration/calibration_tools.py @@ -23,9 +23,14 @@ - Data loading and processing utilities """ +import logging import numpy as np import pinocchio as pin +# Setup logger for this module +logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) + from ..tools.regressor import eliminate_non_dynaffect from ..tools.qrdecomposition import ( get_baseParams, @@ -361,7 +366,7 @@ def update_forward_kinematics(model, data, var, q, calib_config, verbose=0): for axis_id, axis in enumerate(axis_tpl): if axis in key: if verbose == 1: - print( + logger.debug( "Updating [{}] joint placement at axis {} with [{}]".format( j_name, axis, key ) @@ -381,7 +386,7 @@ def update_forward_kinematics(model, data, var, q, calib_config, verbose=0): for axis_pee_id, axis_pee in enumerate(EE_TPL): if axis_pee in key: if verbose == 1: - print( + logger.debug( "Updating [{}_{}] joint placement at axis {} with [{}]".format( ee_name, str(marker_idx), axis_pee, key ) @@ -568,7 +573,7 @@ def calc_updated_fkm(model, data, var, q, calib_config, verbose=0): for axis_pee_id, axis_pee in enumerate(EE_TPL): if axis_pee in key: if verbose == 1: - print( + logger.debug( "Updating [{}_{}] joint placement at axis {} with [{}]".format( ee_name, str(marker_idx), axis_pee, key ) @@ -579,7 +584,7 @@ def calc_updated_fkm(model, data, var, q, calib_config, verbose=0): eeMf = cartesian_to_SE3(pee) else: if calib_config["NbMarkers"] > 1: - print("Multiple markers are not supported.") + logger.warning("Multiple markers are not supported.") else: eeMf = pin.SE3.Identity() @@ -598,7 +603,7 @@ def calc_updated_fkm(model, data, var, q, calib_config, verbose=0): for axis_id, axis in enumerate(axis_tpl): if axis in key: if verbose == 1: - print( + logger.debug( "Updating [{}] joint placement at axis {} with [{}]".format( j_name, axis, key ) diff --git a/src/figaroh/calibration/config.py b/src/figaroh/calibration/config.py index 945bb9e..6717981 100644 --- a/src/figaroh/calibration/config.py +++ b/src/figaroh/calibration/config.py @@ -23,6 +23,12 @@ - Frame and joint configuration management """ +import logging + +# Setup logger for this module +logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) + def get_sup_joints(model, start_frame, end_frame): """Get list of supporting joints between two frames in kinematic chain. @@ -50,8 +56,8 @@ def get_sup_joints(model, start_frame, end_frame): """ start_frameId = model.getFrameId(start_frame) end_frameId = model.getFrameId(end_frame) - start_par = model.frames[start_frameId].parent - end_par = model.frames[end_frameId].parent + start_par = model.frames[start_frameId].parentJoint + end_par = model.frames[end_frameId].parentJoint branch_s = model.supports[start_par].tolist() branch_e = model.supports[end_par].tolist() # remove 'universe' joint from branches @@ -165,7 +171,7 @@ def get_param_from_yaml(robot, calib_data) -> dict: except KeyError: base_to_ref_frame = None ref_frame = None - print("base_to_ref_frame and ref_frame are not defined.") + logger.warning("base_to_ref_frame and ref_frame are not defined.") # Validate base-to-ref frame if provided if base_to_ref_frame is not None: @@ -189,7 +195,7 @@ def get_param_from_yaml(robot, calib_data) -> dict: except KeyError: base_pose = None tip_pose = None - print("base_pose and tip_pose are not defined.") + logger.warning("base_pose and tip_pose are not defined.") calib_config["base_pose"] = base_pose calib_config["tip_pose"] = tip_pose @@ -203,7 +209,7 @@ def get_param_from_yaml(robot, calib_data) -> dict: calib_config["IDX_TOOL"] = IDX_TOOL # tool_joint: ID of the joint right before the tool's frame (parent) - tool_joint = robot.model.frames[IDX_TOOL].parent + tool_joint = robot.model.frames[IDX_TOOL].parentJoint calib_config["tool_joint"] = tool_joint # indices of active joints: from base to tool_joint @@ -462,7 +468,7 @@ def _extract_poses(calib_config, measurements): calib_config["tip_pose"] = tip_pose if not base_pose and not tip_pose: - print("Warning: base_pose and tip_pose are not defined.") + logger.warning("base_pose and tip_pose are not defined.") def _extract_calibration_params(calib_config, robot, parameters): diff --git a/src/figaroh/calibration/data_loader.py b/src/figaroh/calibration/data_loader.py index 11f1fac..f8b34b0 100644 --- a/src/figaroh/calibration/data_loader.py +++ b/src/figaroh/calibration/data_loader.py @@ -24,9 +24,14 @@ - Configuration vector management """ +import logging import numpy as np import pandas as pd +# Setup logger for this module +logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) + # Export public API __all__ = [ @@ -131,7 +136,7 @@ def load_data(path_to_file, model, calib_config, del_list=[]): csv_headers = list(df.columns) for header in PEE_headers + joint_headers: if header not in csv_headers: - print("%s does not exist in the file." % header) + logger.warning("%s does not exist in the file.", header) break # Extract marker position/location diff --git a/src/figaroh/identification/base_identification.py b/src/figaroh/identification/base_identification.py index bcc1e70..c178030 100644 --- a/src/figaroh/identification/base_identification.py +++ b/src/figaroh/identification/base_identification.py @@ -19,10 +19,15 @@ that can be inherited by any robot type (TIAGo, UR10, MATE, etc.). """ +import logging import yaml import numpy as np from abc import ABC, abstractmethod +# Setup logger for this module +logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) + # FIGAROH imports from figaroh.identification.identification_tools import ( get_param_from_yaml as get_identification_param_from_yaml, @@ -39,6 +44,10 @@ build_regressor_reduced, ) from figaroh.identification.identification_tools import get_standard_parameters +from figaroh.identification.parameter import ( + add_standard_additional_parameters, + add_custom_parameters, +) from figaroh.tools.solver import LinearSolver @@ -67,6 +76,8 @@ def __init__(self, robot, config_file="config/robot_config.yaml"): # Initialize attributes for identification results self.dynamic_regressor = None self.standard_parameter = None + self.additional_parameters = None + self.custom_parameters = None self.params_base = None self.dynamic_regressor_base = None self.phi_base = None @@ -82,11 +93,13 @@ def __init__(self, robot, config_file="config/robot_config.yaml"): 'differentiation_method': 'gradient', 'filter_params': {} } - print(f"{self.__class__.__name__} initialized") + logger.info(f"{self.__class__.__name__} initialized") def initialize(self, truncate=None): self.process_data(truncate=truncate) self.calculate_full_regressor() + self.initialize_standard_parameters() + self.compute_reference_torque() def solve(self, decimate=True, decimation_factor=10, zero_tolerance=0.001, plotting=True, save_results=False): @@ -111,7 +124,8 @@ def solve(self, decimate=True, decimation_factor=10, zero_tolerance=0.001, ValueError: If data shapes are incompatible np.linalg.LinAlgError: If QR decomposition fails """ - print(f"Starting {self.__class__.__name__} dynamic parameter identification...") + logger.info( + f"Starting {self.__class__.__name__} dynamic parameter identification...") # Validate prerequisites self._validate_prerequisites() @@ -187,8 +201,9 @@ def solve_with_custom_solver( >>> phi = identification.solve_with_custom_solver( ... method='constrained', bounds=bounds) """ - print(f"Starting {self.__class__.__name__} identification " - f"with custom solver...") + logger.info( + f"Starting {self.__class__.__name__} identification " + f"with custom solver...") # Validate prerequisites self._validate_prerequisites() @@ -239,7 +254,7 @@ def solve_with_custom_solver( ) phi_base = solver.solve(W_base, tau_processed) base_param_dict = {param: phi_base[i] for i, param in enumerate(base_parameters)} - + # Store results self.dynamic_regressor_base = W_base self.phi_base = phi_base @@ -273,8 +288,8 @@ def solve_with_custom_solver( if save_results: self.save_results() - print(f" RMSE: {self.rms_error:.6f}") - print(f" Correlation: {self.correlation:.6f}") + logger.info(f" RMSE: {self.rms_error:.6f}") + logger.info(f" Correlation: {self.correlation:.6f}") return self.phi_base @@ -290,11 +305,11 @@ def load_param(self, config_file, setting_type="identification"): setting_type (str): Configuration section to load """ try: - print(f"Loading config from {config_file}") + logger.info(f"Loading config from {config_file}") # Check if this is a unified configuration format if is_unified_config(config_file): - print("Detected unified configuration format") + logger.info("Detected unified configuration format") # Use unified parser parser = UnifiedConfigParser(config_file) unified_config = parser.parse() @@ -306,7 +321,7 @@ def load_param(self, config_file, setting_type="identification"): self.robot, unified_identif_config ) else: - print("Detected legacy configuration format") + logger.info("Detected legacy configuration format") # Use legacy format parsing with open(config_file, "r") as f: config = yaml.load(f, Loader=yaml.SafeLoader) @@ -314,7 +329,7 @@ def load_param(self, config_file, setting_type="identification"): self.robot, config[setting_type] ) except Exception as e: - print(f"Error loading config {config_file}: {e}") + logger.error(f"Error loading config {config_file}: {e}") raise @abstractmethod @@ -365,17 +380,42 @@ def calculate_full_regressor(self): self.identif_config, ) + def initialize_standard_parameters( + self, + ): + """Initialize standard parameters for the robot.""" + # Compute standard parameters - self.standard_parameter = get_standard_parameters(self.model, self.identif_config) + self.standard_parameter = get_standard_parameters( + self.model, self.identif_config + ) # additional parameters can be added in robot-specific subclass - self.add_additional_parameters() + if ( + self.identif_config.get("has_friction", False) + or self.identif_config.get("has_actuator_inertia", False) + or self.identif_config.get("has_joint_offset", False) + ): + self.additional_parameters = add_standard_additional_parameters( + self.model, self.identif_config + ) + self.standard_parameter.update(self.additional_parameters) + + # Add custom parameters specific to the robot + if self.identif_config.get("has_custom_parameters", False): + self.custom_parameters = add_custom_parameters( + self.model, self.identif_config.get("custom_parameters", {}) + ) + self.standard_parameter.update(self.custom_parameters) # Convert all string values to floats in the standard_parameter dict for key, value in self.standard_parameter.items(): if isinstance(value, str): self.standard_parameter[key] = float(value) + def compute_reference_torque(self): + """Compute reference joint torques based on standard parameters and dynamic regressor.""" + # joint torque estimated from p,v,a with std params phi_ref = np.array(list(self.standard_parameter.values())) tau_ref = np.dot(self.dynamic_regressor, phi_ref) @@ -383,10 +423,6 @@ def calculate_full_regressor(self): # filter only active joints self.tau_ref = tau_ref[range(len(self.identif_config["act_idxv"]) * self.num_samples)] - def add_additional_parameters(self): - """Add additional parameters specific to the robot and recalculate dynamic regressor.""" - pass - def _apply_filters(self, *signals, nbutter=4, f_butter=2, med_fil=5, f_sample=100): """Apply median and lowpass filters to any number of signals. @@ -875,13 +911,13 @@ def _store_results(self, identif_results): self.results_manager = ResultsManager('identification', robot_name, self.result) except ImportError as e: - print(f"Warning: ResultsManager not available: {e}") + logger.warning(f"ResultsManager not available: {e}") self.results_manager = None def plot_results(self): """Plot identification results using unified results manager.""" if not hasattr(self, 'result') or self.result is None: - print("No identification results to plot. Run solve() first.") + logger.warning("No identification results to plot. Run solve() first.") return # Use pre-initialized results manager if available @@ -893,8 +929,8 @@ def plot_results(self): return except Exception as e: - print(f"Error plotting with ResultsManager: {e}") - print("Falling back to basic plotting...") + logger.error(f"Error plotting with ResultsManager: {e}") + logger.info("Falling back to basic plotting...") # Fallback to basic plotting if ResultsManager not available try: @@ -907,7 +943,7 @@ def plot_results(self): np.array([])) if len(tau_measured) == 0 or len(tau_identified) == 0: - print("No torque data available for plotting") + logger.warning("No torque data available for plotting") return plt.figure(figsize=(12, 8)) @@ -935,14 +971,14 @@ def plot_results(self): plt.show() except ImportError: - print("Warning: matplotlib not available for plotting") + logger.warning("matplotlib not available for plotting") except Exception as e: - print(f"Warning: Plotting failed: {e}") + logger.warning(f"Plotting failed: {e}") def save_results(self, output_dir="results"): """Save identification results using unified results manager.""" if not hasattr(self, 'result') or self.result is None: - print("No identification results to save. Run solve() first.") + logger.warning("No identification results to save. Run solve() first.") return # Use pre-initialized results manager if available @@ -955,15 +991,15 @@ def save_results(self, output_dir="results"): save_formats=['yaml', 'csv', 'npz'] ) - print("Identification results saved using ResultsManager") + logger.info("Identification results saved using ResultsManager") for fmt, path in saved_files.items(): - print(f" {fmt}: {path}") + logger.info(f" {fmt}: {path}") return saved_files except Exception as e: - print(f"Error saving with ResultsManager: {e}") - print("Falling back to basic saving...") + logger.error(f"Error saving with ResultsManager: {e}") + logger.info("Falling back to basic saving...") # Fallback to basic saving if ResultsManager not available try: @@ -1005,9 +1041,9 @@ def save_results(self, output_dir="results"): with open(os.path.join(output_dir, filename), "w") as f: yaml.dump(results_dict, f, default_flow_style=False) - print(f"Results saved to {output_dir}/{filename}") + logger.info(f"Results saved to {output_dir}/{filename}") return {filename: os.path.join(output_dir, filename)} except Exception as e: - print(f"Error in fallback saving: {e}") + logger.error(f"Error in fallback saving: {e}") return None diff --git a/src/figaroh/identification/parameter.py b/src/figaroh/identification/parameter.py index c0de2f0..6187336 100644 --- a/src/figaroh/identification/parameter.py +++ b/src/figaroh/identification/parameter.py @@ -23,8 +23,13 @@ - Parameter information queries """ +import logging import numpy as np +# Setup logger for this module +logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) + # Export public API __all__ = [ @@ -68,19 +73,19 @@ def reorder_inertial_parameters(pinocchio_params): return reordered.tolist() -def add_standard_additional_parameters(phi, params, identif_config, model): +def add_standard_additional_parameters(model, identif_config): """Add standard additional parameters (actuator inertia, friction, offsets). Args: - phi: Current parameter values list - params: Current parameter names list - identif_config: Configuration dictionary model: Robot model + identif_config (dict): Identification configuration Returns: - tuple: Updated (phi, params) lists + dict: Additional parameters with their values """ + phi = [] + params = [] # Standard additional parameters configuration additional_params = [ @@ -129,14 +134,14 @@ def add_standard_additional_parameters(phi, params, identif_config, model): value = values_list[link_idx] else: value = param_def['default'] - print( - f"Warning: Missing {param_def['description']} " + logger.warning( + f"Missing {param_def['description']} " f"for joint {jname}, using default: {value}" ) except (KeyError, IndexError, TypeError) as e: value = param_def['default'] - print( - f"Warning: Error getting {param_def['description']} " + logger.warning( + f"Error getting {param_def['description']} " f"for joint {jname}: {e}, using default: {value}" ) else: @@ -144,26 +149,28 @@ def add_standard_additional_parameters(phi, params, identif_config, model): phi.append(value) - return phi, params + return dict(zip(params, phi)) -def add_custom_parameters(phi, params, custom_params, model): +def add_custom_parameters(model, custom_params): """Add custom user-defined parameters. Args: - phi: Current parameter values list - params: Current parameter names list - custom_params: Custom parameter definitions model: Robot model + custom_params (dict): Custom parameter definitions + Format: {param_name: {values: list, per_joint: bool, + default: float}} Returns: - tuple: Updated (phi, params) lists + dict: Custom parameters with their values """ + phi = [] + params = [] for param_name, param_def in custom_params.items(): if not isinstance(param_def, dict): - print( - f"Warning: Invalid custom parameter definition for " + logger.warning( + f"Invalid custom parameter definition for " f"'{param_name}', skipping" ) continue @@ -185,15 +192,15 @@ def add_custom_parameters(phi, params, custom_params, model): value = default_value # Only warn if values were provided but insufficient if values: - print( - f"Warning: Missing value for custom " + logger.warning( + f"Missing value for custom " f"parameter '{param_name}' joint " f"{jname}, using default: {value}" ) except (IndexError, TypeError): value = default_value - print( - f"Warning: Error accessing custom parameter " + logger.warning( + f"Error accessing custom parameter " f"'{param_name}' for joint {jname}, " f"using default: {value}" ) @@ -206,72 +213,32 @@ def add_custom_parameters(phi, params, custom_params, model): value = values[0] if values else default_value except (IndexError, TypeError): value = default_value - print( - f"Warning: Error accessing global custom parameter " + logger.warning( + f"Error accessing global custom parameter " f"'{param_name}', using default: {value}" ) phi.append(value) - return phi, params + return dict(zip(params, phi)) def get_standard_parameters( - model, identif_config=None, include_additional=True, custom_params=None + model, identif_config=None ): """Get standard inertial parameters from robot model with extensible parameter support. Args: model: Robot model (Pinocchio model) - identif_config (dict, optional): Dictionary of parameter settings for - additional parameters. Expected keys: - - has_actuator_inertia (bool): Include actuator inertia parameters - - has_friction (bool): Include friction parameters - - has_joint_offset (bool): Include joint offset parameters - - Ia (list): Actuator inertia values - - fv (list): Viscous friction coefficients - - fs (list): Static friction coefficients - - off (list): Joint offset values - include_additional (bool): Whether to include additional parameters - beyond inertial - custom_params (dict, optional): Custom parameter definitions - Format: {param_name: {values: list, per_joint: bool, - default: float}} + identif_config (dict, optional): Dictionary of parameter settings Returns: dict: Parameter names mapped to their values - - Examples: - # Basic usage - only inertial parameters - params = get_standard_parameters(robot.model) - - # Include standard additional parameters - identif_config = { - 'has_actuator_inertia': True, - 'has_friction': True, - 'Ia': [0.1, 0.2, 0.3], - 'fv': [0.01, 0.02, 0.03], - 'fs': [0.001, 0.002, 0.003] - } - params = get_standard_parameters(robot.model, identif_config) - - # Add custom parameters - custom = { - 'gear_ratio': {'values': [100, 50, 25], 'per_joint': True, - 'default': 1.0}, - 'temperature': {'values': [20.0], 'per_joint': False, - 'default': 25.0} - } - params = get_standard_parameters(robot.model, identif_config, - custom_params=custom) """ if identif_config is None: identif_config = {} - if custom_params is None: - custom_params = {} - phi = [] params = [] @@ -318,15 +285,6 @@ def get_standard_parameters( params.append(f"{param_name}_{jname}") phi.extend(reordered_params) - # Add additional standard parameters if requested - if include_additional: - phi, params = add_standard_additional_parameters( - phi, params, identif_config, model - ) - - # Add custom parameters if provided - if custom_params: - phi, params = add_custom_parameters(phi, params, custom_params, model) return dict(zip(params, phi)) diff --git a/src/figaroh/optimal/base_optimal_calibration.py b/src/figaroh/optimal/base_optimal_calibration.py index 63f478e..f780d2d 100644 --- a/src/figaroh/optimal/base_optimal_calibration.py +++ b/src/figaroh/optimal/base_optimal_calibration.py @@ -19,21 +19,36 @@ generation that can be inherited by any robot type (TIAGo, UR10, MATE, etc.). """ -import os +import logging import yaml import numpy as np import pandas as pd import matplotlib.pyplot as plt -from abc import ABC, abstractmethod +from abc import ABC from yaml.loader import SafeLoader +# Setup logger for this module +logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) + # FIGAROH imports from figaroh.calibration.calibration_tools import ( load_data, get_param_from_yaml, + unified_to_legacy_config, calculate_base_kinematics_regressor, ) +from figaroh.utils.config_parser import ( + UnifiedConfigParser, + create_task_config, + is_unified_config, +) + +# Import from shared modules +from figaroh.utils.error_handling import ( + CalibrationError, +) class BaseOptimalCalibration(ABC): """Base class for robot optimal configuration generation for calibration. @@ -108,7 +123,7 @@ class BaseOptimalCalibration(ABC): TiagoOptimalCalibration: TIAGo-specific implementation UR10OptimalCalibration: UR10-specific implementation """ - + def __init__(self, robot, config_file="config/robot_config.yaml"): """Initialize optimal calibration with robot model and configuration. @@ -148,12 +163,12 @@ def __init__(self, robot, config_file="config/robot_config.yaml"): self.model = robot.model self.data = robot.data self.load_param(config_file) - + # Initialize attributes for optimal calibration self.optimal_configurations = None self.optimal_weights = None self._sampleConfigs_file = self.calib_config.get("sample_configs_file") - + # Calculate minimum number of configurations needed if self.calib_config["calib_model"] == "full_params": self.minNbChosen = ( @@ -169,9 +184,9 @@ def __init__(self, robot, config_file="config/robot_config.yaml"): int(len(self.calib_config["actJoint_idx"]) / self.calib_config["calibration_index"]) + 1 ) - - print(f"{self.__class__.__name__} initialized") - + + logger.info(f"{self.__class__.__name__} initialized") + def initialize(self): """Initialize the optimization process by preparing all required data. @@ -206,7 +221,7 @@ def initialize(self): self.load_candidate_configurations() self.calculate_regressor() self.calculate_detroot_whole() - + def solve(self, save_file=False): """Solve the optimal configuration selection problem. @@ -256,43 +271,60 @@ def solve(self, save_file=False): if save_file: try: self.save_results() - print("Optimal configurations written to file successfully") + logger.info("Optimal configurations written to file successfully") except Exception as e: - print(f"Warning: Could not write to file: {e}") + logger.warning(f"Could not write to file: {e}") self.plot() - - def load_param(self, config_file, setting_type="calibration"): - """Load optimization parameters from YAML configuration file. - - Reads and parses calibration configuration from YAML file, extracting - robot-specific parameters needed for optimal configuration generation. - The configuration supports multiple setting types within the same file. - + + def load_param(self, config_file: str, setting_type: str = "calibration"): + """Load calibration parameters from YAML configuration file. + + This method supports both legacy YAML format and the new unified + configuration format. It automatically detects the format type + and applies the appropriate parser. + Args: - config_file (str): Path to YAML configuration file containing - optimization and calibration parameters - setting_type (str): Configuration section to load. Options include - "calibration", "identification", or custom - section names. Default "calibration". - - Side Effects: - - Updates self.calib_config with loaded configuration dictionary - - Overwrites any existing parameter settings - - Raises: - FileNotFoundError: If config_file does not exist - yaml.YAMLError: If YAML parsing fails - KeyError: If setting_type section not found in config - - Example: - >>> opt_calib.load_param("config/tiago_optimal.yaml") - >>> print(opt_calib.calib_config["calib_model"]) # "full_params" - >>> print(opt_calib.calib_config["NbSample"]) # 1000 + config_file (str): Path to configuration file (legacy or unified) + setting_type (str): Configuration section to load """ - with open(config_file, "r") as f: - config = yaml.load(f, Loader=SafeLoader) - calib_data = config[setting_type] - self.calib_config = get_param_from_yaml(self.robot, calib_data) + try: + logger.info(f"Loading config from {config_file}") + + # Check if this is a unified configuration format + if is_unified_config(config_file): + logger.info("Detected unified configuration format") + # Use unified parser + parser = UnifiedConfigParser(config_file) + unified_config = parser.parse() + unified_calib_config = create_task_config( + self.robot, unified_config, setting_type + ) + # Convert unified format to legacy calib_config format + self.calib_config = unified_to_legacy_config( + self.robot, unified_calib_config + ) + else: + logger.info("Detected legacy configuration format") + # Use legacy format parsing + with open(config_file, "r") as f: + config = yaml.load(f, Loader=SafeLoader) + + if setting_type not in config: + raise KeyError( + f"Setting type '{setting_type}' not found in config" + ) + + calib_data = config[setting_type] + self.calib_config = get_param_from_yaml( + self.robot, calib_data + ) + + except FileNotFoundError: + raise CalibrationError( + f"Configuration file not found: {config_file}" + ) + except Exception as e: + raise CalibrationError(f"Failed to load configuration: {e}") def load_candidate_configurations(self): """Load candidate joint configurations from external data files. @@ -327,11 +359,11 @@ def load_candidate_configurations(self): >>> print(opt_calib.q_measured.shape) # (1000, 7) for TIAGo """ from figaroh.calibration.calibration_tools import get_idxq_from_jname - + if self._sampleConfigs_file is None: raise ValueError("sample_configs_file not specified in " "configuration") - + if "csv" in self._sampleConfigs_file: _, self.q_measured = load_data( self._data_path, self.model, self.calib_config, [] @@ -358,7 +390,7 @@ def load_candidate_configurations(self): self.calib_config["NbSample"] = self.q_measured.shape[0] else: raise ValueError("Data file format not supported. Use CSV or YAML format.") - + def calculate_regressor(self): """Calculate kinematic regressors and information matrices. @@ -413,7 +445,7 @@ def calculate_regressor(self): self._subX_dict = subX_dict self._subX_list = subX_list return True - + def calculate_detroot_whole(self): """Calculate determinant root of complete information matrix. @@ -449,7 +481,7 @@ def calculate_detroot_whole(self): assert self.calculate_regressor(), "Calculate regressor first." M_whole = np.matmul(self.R_rearr.T, self.R_rearr) self.detroot_whole = pc.DetRootN(M_whole) / np.sqrt(M_whole.shape[0]) - print("detrootn of whole matrix:", self.detroot_whole) + logger.info(f"detrootn of whole matrix: {self.detroot_whole}") def rearrange_rb(self, R_b, calib_config): """rearrange the kinematic regressor by sample numbered order""" @@ -505,7 +537,7 @@ def sub_info_matrix(self, R, calib_config): ) ) return subX_list, subX_dict - + def calculate_optimal_configurations(self): """Solve SOCP optimization to find optimal configuration subset. @@ -558,7 +590,7 @@ def calculate_optimal_configurations(self): SOCP_algo = SOCPOptimizer(self._subX_dict, self.calib_config) self.w_list, self.w_dict_sort = SOCP_algo.solve() solve_time = time.time() - prev_time - print("solve time of socp: ", solve_time) + logger.info(f"solve time of socp: {solve_time}") # Select optimal config based on values of weight self.eps_opt = 1e-5 @@ -571,7 +603,7 @@ def calculate_optimal_configurations(self): len(chosen_config) >= self.minNbChosen ), "Infeasible design, try to increase NbSample." - print(len(chosen_config), "configs are chosen: ", chosen_config) + logger.info(f"{len(chosen_config)} configs are chosen: {chosen_config}") self.nb_chosen = len(chosen_config) # Store optimal configurations and weights @@ -676,36 +708,36 @@ def plot(self): plt.show() return True - + def plot_results(self): """Plot optimal calibration results using unified results manager.""" if not hasattr(self, 'optimal_configurations') or self.optimal_configurations is None: - print("No optimal configuration results to plot. Run solve() first.") + logger.warning("No optimal configuration results to plot. Run solve() first.") return - + try: from .results_manager import ResultsManager - + # Initialize results manager robot_name = self.calib_config.get("robot_name", self.model.name) results_manager = ResultsManager('optimal_calibration', robot_name) - + # Prepare data for plotting weights = np.array(list(self.w_dict_sort.values())) if hasattr(self, 'w_dict_sort') else np.array([]) - + # Plot using unified manager results_manager.plot_optimal_calibration_results( configurations=self.optimal_configurations, weights=weights, title="Optimal Calibration Configuration Results" ) - + except ImportError: # Fallback to existing plotting import matplotlib.pyplot as plt - + fig, ax = plt.subplots(1, 2, figsize=(14, 6)) - + ax[0].bar( list(self.w_dict_sort.keys()), list(self.w_dict_sort.values()), @@ -716,7 +748,7 @@ def plot_results(self): ax[0].spines["top"].set_visible(False) ax[0].spines["right"].set_visible(False) ax[0].grid(True, linestyle="--") - + ax[1].bar( list(self.w_dict_sort.keys()), list(self.w_dict_sort.values()), @@ -726,20 +758,20 @@ def plot_results(self): ax[1].spines["right"].set_visible(False) ax[1].grid(True, linestyle="--") plt.show() - + def save_results(self, output_dir="results"): """Save optimal configuration results using unified results manager.""" if not hasattr(self, 'optimal_configurations') or self.optimal_configurations is None: - print("No optimal configuration results to save. Run solve() first.") + logger.warning("No optimal configuration results to save. Run solve() first.") return - + try: from .results_manager import ResultsManager - + # Initialize results manager robot_name = self.calib_config.get("robot_name", self.model.name) results_manager = ResultsManager('optimal_calibration', robot_name) - + # Prepare results dictionary results_dict = { 'optimal_configurations': self.optimal_configurations, @@ -748,27 +780,27 @@ def save_results(self, output_dir="results"): 'configuration_count': len(self.optimal_configurations), 'calibration_config': self.calib_config } - + # Add condition number if available if hasattr(self, 'detroot_whole'): results_dict['condition_number'] = float(self.detroot_whole) - + # Save using unified manager saved_files = results_manager.save_results( results_dict, output_dir, save_formats=['yaml', 'csv'] ) - + return saved_files - + except ImportError: # Fallback to existing saving import os import yaml - + os.makedirs(output_dir, exist_ok=True) - + robot_name = self.calib_config.get("robot_name", self.model.name) filename = f"{robot_name}_optimal_configurations.yaml" @@ -781,9 +813,9 @@ def save_results(self, output_dir="results"): default_flow_style=True, ) except yaml.YAMLError as exc: - print(exc) - print(f"Results saved to {output_dir}/{filename}") - + logger.error(exc) + logger.info(f"Results saved to {output_dir}/{filename}") + return { 'yaml': os.path.join(output_dir, filename) } @@ -851,7 +883,7 @@ def solve(self): w_list = [] for i in range(self.w.dim): w_list.append(float(self.w.value[i])) - print("sum of all element in vector solution: ", sum(w_list)) + logger.info(f"sum of all element in vector solution: {sum(w_list)}") # to dict w_dict = dict(zip(np.arange(self.calib_config["NbSample"]), w_list)) @@ -1089,4 +1121,4 @@ def main_algo(self): fin_set = set(cur_set) self.opt_critD.append(opt_critD) - return self.opt_critD \ No newline at end of file + return self.opt_critD diff --git a/src/figaroh/optimal/base_optimal_trajectory.py b/src/figaroh/optimal/base_optimal_trajectory.py index 8d8bba5..a5c2807 100644 --- a/src/figaroh/optimal/base_optimal_trajectory.py +++ b/src/figaroh/optimal/base_optimal_trajectory.py @@ -21,245 +21,32 @@ IPOPT-based optimization. This framework can be extended for different robots. """ -import yaml import logging +from abc import abstractmethod import numpy as np from matplotlib import pyplot as plt -from yaml.loader import SafeLoader from typing import Dict, List, Tuple, Any -from figaroh.identification.identification_tools import get_standard_parameters +# Setup logger for this module +logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) + from figaroh.tools.regressor import ( build_regressor_basic, build_regressor_reduced, - get_index_eliminate, ) -from figaroh.tools.qrdecomposition import get_baseIndex, build_baseRegressor -from figaroh.tools.robotcollisions import CollisionWrapper -from figaroh.identification.identification_tools import get_param_from_yaml +from figaroh.tools.qrdecomposition import build_baseRegressor from figaroh.utils.cubic_spline import ( - CubicSpline, WaypointsGeneration, calc_torque + CubicSpline, + WaypointsGeneration, + calc_torque, ) from figaroh.tools.robotipopt import ( IPOPTConfig, BaseOptimizationProblem, RobotIPOPTSolver ) - - -class ConfigurationManager: - """Manages configuration loading and validation.""" - - @staticmethod - def load_from_yaml(config_file: str) -> Tuple[Dict[str, Any], Any]: - """Load trajectory parameters from YAML file.""" - try: - with open(config_file, "r") as f: - config = yaml.load(f, Loader=SafeLoader) - - identif_data = config["identification"] - traj_params = identif_data.get("trajectory_params", [{}])[0] - - # Set default values if not present in config - trajectory_config = { - 'n_wps': traj_params.get("n_wps", 5), - 'freq': traj_params.get("freq", 100), - 't_s': traj_params.get("t_s", 2.0), - 'soft_lim': traj_params.get("soft_lim", 0.05), - 'max_attempts': traj_params.get("max_attempts", 1000) - } - - return trajectory_config, identif_data - - except FileNotFoundError: - raise FileNotFoundError( - f"Configuration file {config_file} not found" - ) - except KeyError as e: - raise ValueError(f"Missing required configuration key: {e}") - except Exception as e: - raise ValueError(f"Error loading configuration: {e}") - - -class BaseParameterComputer: - """Handles base parameter computation and indexing.""" - - def __init__(self, robot, identif_config, active_joints, soft_lim_pool): - self.robot = robot - self.identif_config = identif_config - self.active_joints = active_joints - self.soft_lim_pool = soft_lim_pool - - def compute_base_indices(self) -> Tuple[np.ndarray, np.ndarray]: - """Compute base parameter indices from random trajectory.""" - logging.info( - "Computing base parameter indices from random trajectory..." - ) - - try: - # Generate random trajectory for base parameter computation - n_wps_r = 100 - freq_r = 100 - CB_r = CubicSpline(self.robot, n_wps_r, self.active_joints) - WP_r = WaypointsGeneration(self.robot, n_wps_r, self.active_joints) - WP_r.gen_rand_pool(self.soft_lim_pool) - - # Generate waypoints and trajectory - wps_r, vel_wps_r, acc_wps_r = WP_r.gen_rand_wp() - tps_r = np.matrix([0.5 * i for i in range(n_wps_r)]).transpose() - t_r, p_r, v_r, a_r = CB_r.get_full_config( - freq_r, tps_r, wps_r, vel_wps_r, acc_wps_r - ) - - # Compute base indices - idx_e, idx_b = self._get_idx_from_random(p_r, v_r, a_r) - logging.info(f"Computed {len(idx_b)} base parameters") - - return idx_e, idx_b - - except Exception as e: - logging.error(f"Error computing base indices: {e}") - raise - - def _get_idx_from_random(self, q, v, a) -> Tuple[np.ndarray, np.ndarray]: - """Get indices of eliminate and base parameters.""" - W = build_regressor_basic(self.robot, q, v, a, self.identif_config) - params_std = get_standard_parameters( - self.robot.model, self.identif_config - ) - idx_e_, par_r_ = get_index_eliminate(W, params_std, tol_e=0.001) - # Convert to numpy arrays - idx_e_ = np.array(idx_e_, dtype=int) - W_e_ = build_regressor_reduced(W, idx_e_) - idx_base_ = get_baseIndex(W_e_, par_r_) - idx_base_ = np.array(idx_base_, dtype=int) - - return idx_e_, idx_base_ - return idx_e_, idx_base_ - - -class TrajectoryConstraintManager: - """Manages trajectory constraints and bounds.""" - - def __init__(self, robot, CB, traj_params, identif_config): - self.robot = robot - self.CB = CB - self.n_wps = traj_params['n_wps'] - self.freq = traj_params['freq'] - self.identif_config = identif_config - self.collision_wrapper = CollisionWrapper(robot=robot, viz=None) - - def get_variable_bounds(self) -> Tuple[List[float], List[float]]: - """Get variable bounds for optimization.""" - lb, ub = [], [] - for i in range(1, self.n_wps): - lb.extend(self.CB.lower_q) - ub.extend(self.CB.upper_q) - return lb, ub - - def get_constraint_bounds(self, Ns: int) -> Tuple[List, List]: - """Get constraint bounds for optimization.""" - cl, cu = [], [] - - # Position constraint bounds - for i in range(1, self.n_wps): - cl.extend(self.CB.lower_q) - cu.extend(self.CB.upper_q) - - # Velocity constraint bounds - for j in range(Ns): - cl.extend(self.CB.lower_dq) - cu.extend(self.CB.upper_dq) - - # Torque constraint bounds - for j in range(Ns): - cl.extend(self.CB.lower_effort) - cu.extend(self.CB.upper_effort) - - # Collision constraint bounds - n_cols = len(self.robot.geom_model.collisionPairs) - cl.extend([0.01] * n_cols * (self.n_wps - 1)) # 1 cm margin - cu.extend([2 * 1e19] * n_cols * (self.n_wps - 1)) # no upper limit - - return cl, cu - - def evaluate_constraints(self, Ns: int, X: np.ndarray, opt_cb: Dict, - tps, vel_wps, acc_wps, wp_init) -> np.ndarray: - """Evaluate all constraints for optimization.""" - try: - # Reshape and arrange waypoints - X = np.array(X) - wps_X = np.reshape(X, (self.n_wps - 1, len(self.CB.act_idxq))) - wps = np.vstack((wp_init, wps_X)) - wps = wps.transpose() - - # Generate full trajectory configuration - t_f, p_f, v_f, a_f = self.CB.get_full_config( - self.freq, tps, wps, vel_wps, acc_wps - ) - - # Compute joint torques - tau = calc_torque( - p_f.shape[0], self.robot, p_f, v_f, a_f - ) - - # Evaluate individual constraint types - q_constraints = self._evaluate_position_constraints(p_f, tps, t_f) - v_constraints = self._evaluate_velocity_constraints(v_f) - tau_constraints = self._evaluate_torque_constraints(tau, Ns) - collision_constraints = self._evaluate_collision_constraints( - p_f, tps, t_f - ) - - # Concatenate all constraints - return np.concatenate( - (q_constraints, v_constraints, tau_constraints, - collision_constraints), - axis=None - ) - - except Exception as e: - logging.error(f"Error evaluating constraints: {e}") - raise - - def _evaluate_position_constraints(self, p_f, tps, t_f) -> np.ndarray: - """Evaluate position constraints at waypoints.""" - idx_waypoints = self._get_waypoint_indices(tps, t_f) - q_constraints = p_f[idx_waypoints, :] - return q_constraints[:, self.CB.act_idxq] - - def _evaluate_velocity_constraints(self, v_f) -> np.ndarray: - """Evaluate velocity constraints at all samples.""" - return v_f[:, self.CB.act_idxv] - - def _evaluate_torque_constraints(self, tau, Ns) -> np.ndarray: - """Evaluate torque constraints at all samples.""" - tau_constraints = np.zeros((Ns, len(self.CB.act_idxv))) - for k in range(len(self.CB.act_idxv)): - tau_constraints[:, k] = tau[ - range(self.CB.act_idxv[k] * Ns, (self.CB.act_idxv[k] + 1) * Ns) - ] - return tau_constraints - - def _evaluate_collision_constraints(self, p_f, tps, t_f) -> np.ndarray: - """Evaluate collision constraints.""" - idx_waypoints = self._get_waypoint_indices(tps, t_f) - dist_all = [] - for j in idx_waypoints: - self.collision_wrapper.computeCollisions(p_f[j, :]) - dist_all = np.append(dist_all, self.collision_wrapper.getDistances()) - return np.asarray(dist_all) - - def _get_waypoint_indices(self, tps, t_f) -> List[int]: - """Get indices corresponding to waypoint times.""" - idx_waypoints = [] - time_points = tps[range(1, self.n_wps), :] - time_points_flat = np.array(time_points).flatten() - - for i in range(t_f.shape[0]): - t_val = float(t_f[i, 0]) if hasattr(t_f[i, 0], 'item') else t_f[i, 0] - if t_val in time_points_flat: - idx_waypoints.append(i) - - return idx_waypoints +from figaroh.optimal.config import load_param +from figaroh.optimal.base_parameter import BaseParameterComputer +from figaroh.optimal.contraints import TrajectoryConstraintManager class BaseOptimalTrajectory: @@ -275,269 +62,317 @@ class BaseOptimalTrajectory: This base class can be extended for specific robots by implementing robot-specific configuration loading and constraint handling. """ - + def __init__(self, robot, active_joints: List[str], config_file: str = "config/robot_config.yaml"): """Initialize the optimal trajectory generator.""" self.robot = robot + self.model = self.robot.model self.active_joints = active_joints - - # Set up logging - logging.basicConfig(level=logging.INFO) + + # Set up logger (configuration should be done by application, not library) self.logger = logging.getLogger(__name__) - + # Load configuration - self.traj_params, identif_data = ConfigurationManager.load_from_yaml(config_file) - self.identif_config = get_param_from_yaml(robot, identif_data) - - # Initialize components - self._initialize_components() - - # Compute base parameters - self.idx_e, self.idx_b = self.base_computer.compute_base_indices() - + self.trajectory_config, self.identif_config = load_param( + self.robot, config_file + ) + + # # Initialize components + # self.initialize() + # Results storage self.results = { 'T_F': [], 'P_F': [], 'V_F': [], 'A_F': [], 'iteration_data': [], 'final_regressor_shape': None } - - self.logger.info(f"BaseOptimalTrajectory initialized with {len(self.idx_b)} base parameters") - - def _initialize_components(self): + + def initialize(self): """Initialize trajectory generation components.""" # Create soft limit pool n_active_joints = len(self.active_joints) - self.soft_lim_pool = np.full((3, n_active_joints), self.traj_params['soft_lim']) - + self.soft_lim_pool = np.full( + (3, n_active_joints), self.trajectory_config["soft_lim"] + ) + # Initialize cubic spline and waypoint generation self.CB = CubicSpline( - self.robot, self.traj_params['n_wps'], - self.active_joints, self.traj_params['soft_lim'] + self.robot, + self.trajectory_config["n_wps"], + self.active_joints, + self.trajectory_config["soft_lim"], ) self.WP = WaypointsGeneration( - self.robot, self.traj_params['n_wps'], - self.active_joints, self.traj_params['soft_lim'] + self.robot, + self.trajectory_config["n_wps"], + self.active_joints, + self.trajectory_config["soft_lim"], ) - + # Initialize specialized components self.base_computer = BaseParameterComputer( self.robot, self.identif_config, self.active_joints, self.soft_lim_pool ) self.constraint_manager = TrajectoryConstraintManager( - self.robot, self.CB, self.traj_params, self.identif_config + self.robot, self.CB, self.trajectory_config, self.identif_config ) - - def build_base_regressor(self, q, v, a, W_stack=None) -> np.ndarray: - """Build base regressor matrix.""" + + # Compute base parameters + self.idx_e, self.idx_b = self.base_computer.compute_base_indices() + + self.logger.info(f"BaseOptimalTrajectory initialized with {len(self.idx_b)} base parameters") + + def solve(self, stack_reps: int = 2) -> Dict[str, Any]: + """ + Solve the optimal trajectory generation problem. + + Args: + stack_reps: Number of trajectory segments to stack + + Returns: + Dict containing trajectories and optimization info + """ + self.logger.info( + f"Starting optimal trajectory generation with {stack_reps} segments..." + ) + try: - W = build_regressor_basic(self.robot, q, v, a, self.identif_config) - W_e_ = build_regressor_reduced(W, self.idx_e) - W_b_ = build_baseRegressor(W_e_, self.idx_b) - - if isinstance(W_stack, np.ndarray): - W_b_ = np.vstack((W_stack, W_b_)) - - return W_b_ + # Initialize + self.WP.gen_rand_pool(self.soft_lim_pool) + wp_init = np.zeros(len(self.CB.act_idxq)) + vel_wp_init = np.zeros(len(self.CB.act_idxv)) + acc_wp_init = np.zeros(len(self.CB.act_idxv)) + + # Random initial position + for idx in range(len(self.CB.act_idxq)): + wp_init[idx] = np.random.choice(self.WP.pool_q[:, idx], 1)[0] + + W_stack = None + + for s_rep in range(stack_reps): + self.logger.info( + f"Optimizing segment {s_rep + 1}/{stack_reps}" + ) + self.logger.info(f"Initial waypoint: {wp_init}") + + success = self._solve_segment( + s_rep, wp_init, vel_wp_init, acc_wp_init, W_stack + ) + + if not success: + self.logger.error(f"Failed to solve segment {s_rep + 1}") + break + + # Update for next segment + if s_rep < stack_reps - 1: # Not the last segment + wp_init, W_stack = self._prepare_next_segment() + + self.logger.info( + f"Completed! Generated {len(self.results['T_F'])} trajectory segments" + ) + self.results["final_regressor_shape"] = ( + W_stack.shape if W_stack is not None else None + ) + + return self.results + except Exception as e: - self.logger.error(f"Error building base regressor: {e}") + self.logger.error(f"Error in solve: {e}") raise - - def objective_function(self, X, opt_cb, tps, vel_wps, acc_wps, wp_init, W_stack=None): + + def objective_function( + self, X, opt_cb, tps, vel_wps, acc_wps, wp_init, W_stack=None + ): """Objective function: condition number of base regressor matrix.""" try: # Reshape and arrange waypoints X = np.array(X) - wps_X = np.reshape(X, (self.traj_params['n_wps'] - 1, len(self.active_joints))) + wps_X = np.reshape( + X, + (self.trajectory_config["n_wps"] - 1, len(self.active_joints)), + ) wps = np.vstack((wp_init, wps_X)) wps = wps.transpose() - + # Generate full trajectory configuration t_f, p_f, v_f, a_f = self.CB.get_full_config( - self.traj_params['freq'], tps, wps, vel_wps, acc_wps + self.trajectory_config["freq"], tps, wps, vel_wps, acc_wps ) - + # Store in callback dictionary opt_cb.update({"t_f": t_f, "p_f": p_f, "v_f": v_f, "a_f": a_f}) - + # Build stacked base regressor and return condition number - W_b = self.build_base_regressor(p_f, v_f, a_f, W_stack=W_stack) + W_b = self._stack_base_regressors(p_f, v_f, a_f, W_stack=W_stack) return np.linalg.cond(W_b) - + except Exception as e: self.logger.error(f"Error in objective function: {e}") return 1e10 # Return large penalty value - - def generate_feasible_initial_guess(self, wp_init, vel_wp_init, acc_wp_init): + + @abstractmethod + def create_ipopt_problem( + self, + n_joints, + n_wps, + Ns, + tps, + vel_wps, + acc_wps, + wp_init, + vel_wp_init, + acc_wp_init, + W_stack, + ): + """Create IPOPT problem instance. Should be implemented by subclasses.""" + raise NotImplementedError( + "Subclasses must implement create_ipopt_problem" + ) + + def _stack_base_regressors(self, q, v, a, W_stack=None) -> np.ndarray: + """Build base regressor matrix.""" + try: + W = build_regressor_basic(self.robot, q, v, a, self.identif_config) + W_e_ = build_regressor_reduced(W, self.idx_e) + W_b_ = build_baseRegressor(W_e_, self.idx_b) + + if isinstance(W_stack, np.ndarray): + W_b_ = np.vstack((W_stack, W_b_)) + + return W_b_ + except Exception as e: + self.logger.error(f"Error building base regressor: {e}") + raise + + def _generate_feasible_initial_guess(self, wp_init, vel_wp_init, acc_wp_init): """Generate a feasible initial guess for optimization.""" self.logger.info("Generating feasible initial trajectory...") - + count = 0 is_constr_violated = True - - while is_constr_violated and count < self.traj_params['max_attempts']: + + while ( + is_constr_violated + and count < self.trajectory_config["max_attempts"] + ): count += 1 if count % 100 == 0: self.logger.info(f"Attempt {count} to find feasible initial trajectory...") - + try: # Generate random waypoints wps, vel_wps, acc_wps = self.WP.gen_rand_wp(wp_init, vel_wp_init, acc_wp_init) - + # Generate time points - tps = np.matrix([ - self.traj_params['t_s'] * i_wp - for i_wp in range(self.traj_params['n_wps']) - ]).transpose() - + tps = np.matrix( + [ + self.trajectory_config["t_s"] * i_wp + for i_wp in range(self.trajectory_config["n_wps"]) + ] + ).transpose() + # Get full configuration t_i, p_i, v_i, a_i = self.CB.get_full_config( - self.traj_params['freq'], tps, wps, vel_wps, acc_wps + self.trajectory_config["freq"], tps, wps, vel_wps, acc_wps ) - + # Compute torques and check constraints tau_i = calc_torque( p_i.shape[0], self.robot, p_i, v_i, a_i ) tau_i = np.reshape(tau_i, (v_i.shape[1], v_i.shape[0])).transpose() is_constr_violated = self.CB.check_cfg_constraints(p_i, v_i, tau_i) - + except Exception as e: self.logger.warning(f"Error in attempt {count}: {e}") continue - - if count >= self.traj_params['max_attempts']: + + if count >= self.trajectory_config["max_attempts"]: self.logger.warning( - f"Could not find feasible initial trajectory after {self.traj_params['max_attempts']} attempts" + f"Could not find feasible initial trajectory after {self.trajectory_config['max_attempts']} attempts" ) else: self.logger.info(f"Found feasible initial trajectory after {count} attempts") - + return wps, vel_wps, acc_wps, tps, t_i, p_i, v_i, a_i - - def solve(self, stack_reps: int = 2) -> Dict[str, Any]: - """ - Solve the optimal trajectory generation problem. - - Args: - stack_reps: Number of trajectory segments to stack - - Returns: - Dict containing trajectories and optimization info - """ - self.logger.info(f"Starting optimal trajectory generation with {stack_reps} segments...") - - try: - # Initialize - self.WP.gen_rand_pool(self.soft_lim_pool) - wp_init = np.zeros(len(self.CB.act_idxq)) - vel_wp_init = np.zeros(len(self.CB.act_idxv)) - acc_wp_init = np.zeros(len(self.CB.act_idxv)) - - # Random initial position - for idx in range(len(self.CB.act_idxq)): - wp_init[idx] = np.random.choice(self.WP.pool_q[:, idx], 1)[0] - - W_stack = None - - for s_rep in range(stack_reps): - self.logger.info(f"Optimizing segment {s_rep + 1}/{stack_reps}") - self.logger.info(f"Initial waypoint: {wp_init}") - - success = self._solve_segment(s_rep, wp_init, vel_wp_init, acc_wp_init, W_stack) - - if not success: - self.logger.error(f"Failed to solve segment {s_rep + 1}") - break - - # Update for next segment - if s_rep < stack_reps - 1: # Not the last segment - wp_init, W_stack = self._prepare_next_segment() - - self.logger.info(f"Completed! Generated {len(self.results['T_F'])} trajectory segments") - self.results['final_regressor_shape'] = W_stack.shape if W_stack is not None else None - - return self.results - - except Exception as e: - self.logger.error(f"Error in solve: {e}") - raise - + def _solve_segment(self, s_rep, wp_init, vel_wp_init, acc_wp_init, W_stack) -> bool: """Solve a single trajectory segment.""" try: # Generate feasible initial guess - wps, vel_wps, acc_wps, tps, t_i, p_i, v_i, a_i = self.generate_feasible_initial_guess( + wps, vel_wps, acc_wps, tps, t_i, p_i, v_i, a_i = self._generate_feasible_initial_guess( wp_init, vel_wp_init, acc_wp_init ) - + # Adjust time points for stacking - tps = self.traj_params['t_s'] * s_rep + tps - + tps = self.trajectory_config["t_s"] * s_rep + tps + # Create and solve IPOPT problem - This should be implemented by subclasses - problem = self._create_ipopt_problem( - len(self.active_joints), self.traj_params['n_wps'], - p_i.shape[0], tps, vel_wps, acc_wps, wp_init, vel_wp_init, - acc_wp_init, W_stack + problem = self.create_ipopt_problem( + len(self.active_joints), + self.trajectory_config["n_wps"], + p_i.shape[0], + tps, + vel_wps, + acc_wps, + wp_init, + vel_wp_init, + acc_wp_init, + W_stack, ) - + success, result_data = problem.solve_with_waypoints(wps) - + if success: self.results['T_F'].append(result_data['t_f']) - self.results['P_F'].append(result_data['p_f'][:, self.CB.act_idxq]) - self.results['V_F'].append(result_data['v_f'][:, self.CB.act_idxv]) - self.results['A_F'].append(result_data['a_f'][:, self.CB.act_idxv]) + self.results['P_F'].append(result_data['p_f']) + self.results['V_F'].append(result_data['v_f']) + self.results['A_F'].append(result_data['a_f']) self.results['iteration_data'].append(result_data['iter_data']) self.logger.info(f"Segment {s_rep + 1} completed successfully!") return True else: return False - + except Exception as e: self.logger.error(f"Error solving segment {s_rep + 1}: {e}") return False - - def _create_ipopt_problem(self, n_joints, n_wps, Ns, tps, vel_wps, acc_wps, - wp_init, vel_wp_init, acc_wp_init, W_stack): - """Create IPOPT problem instance. Should be implemented by subclasses.""" - raise NotImplementedError("Subclasses must implement _create_ipopt_problem") - + def _prepare_next_segment(self) -> Tuple[np.ndarray, np.ndarray]: """Prepare initial conditions for next segment.""" # Get last result last_result = self.results['iteration_data'][-1] wp_init = last_result['final_waypoint'] - + # Stack regressor last_p_f = self.results['P_F'][-1] last_v_f = self.results['V_F'][-1] last_a_f = self.results['A_F'][-1] - + # Convert back to full configuration for regressor building # This is a simplified version - in practice you'd need to handle this more carefully - W_stack = self.build_base_regressor(last_p_f, last_v_f, last_a_f) - + W_stack = self._stack_base_regressors(last_p_f, last_v_f, last_a_f) + return wp_init, W_stack - + def plot_results(self): """Plot optimal trajectory results using unified results manager.""" if not self.results['T_F']: self.logger.warning("No trajectory data to plot") return - + try: from .results_manager import ResultsManager - + # Initialize results manager robot_name = getattr(self, 'robot_name', self.robot.model.name) results_manager = ResultsManager('optimal_trajectory', robot_name) - + # Calculate overall condition number condition_number = getattr(self, 'final_condition_number', 0.0) if condition_number == 0.0 and hasattr(self, 'results') and 'condition_numbers' in self.results: condition_number = self.results['condition_numbers'][-1] if self.results['condition_numbers'] else 0.0 - + # Plot using unified manager results_manager.plot_optimal_trajectory_results( trajectories=self.results, @@ -545,7 +380,7 @@ def plot_results(self): joint_names=[f"Joint {i+1}" for i in range(len(self.CB.act_Jid))], title="Optimal Trajectory Generation Results" ) - + except ImportError: # Fallback to existing plotting try: @@ -554,65 +389,65 @@ def plot_results(self): fig, axes = plt.subplots(n_joints, 3, sharex=True, figsize=(15, 2*n_joints)) if n_joints == 1: axes = axes.reshape(1, -1) - + fig.suptitle('Optimal Trajectory Results', fontsize=16) - + # Plot each segment colors = plt.cm.tab10(np.linspace(0, 1, len(self.results['T_F']))) - + for seg_idx, (T, P, V, A) in enumerate(zip( self.results['T_F'], self.results['P_F'], self.results['V_F'], self.results['A_F'] )): color = colors[seg_idx] label = f'Segment {seg_idx + 1}' - + for joint_idx in range(n_joints): axes[joint_idx, 0].plot(T, P[:, joint_idx], color=color, label=label) axes[joint_idx, 1].plot(T, V[:, joint_idx], color=color, label=label) axes[joint_idx, 2].plot(T, A[:, joint_idx], color=color, label=label) - + # Set labels and formatting for joint_idx in range(n_joints): axes[joint_idx, 0].set_ylabel(f'Joint {joint_idx+1}\nPosition (rad)') axes[joint_idx, 1].set_ylabel(f'Joint {joint_idx+1}\nVelocity (rad/s)') axes[joint_idx, 2].set_ylabel(f'Joint {joint_idx+1}\nAcceleration (rad/s²)') - + if joint_idx == 0: for col in range(3): axes[joint_idx, col].legend() - + for col in range(3): axes[joint_idx, col].grid(True, alpha=0.3) - + axes[-1, 0].set_xlabel('Time (s)') axes[-1, 1].set_xlabel('Time (s)') axes[-1, 2].set_xlabel('Time (s)') - + plt.tight_layout() plt.show() - + except Exception as e: self.logger.error(f"Error plotting results: {e}") - + def save_results(self, output_dir="results"): """Save optimal trajectory results using unified results manager.""" if not self.results['T_F']: self.logger.warning("No trajectory data to save") return - + try: from .results_manager import ResultsManager - + # Initialize results manager robot_name = getattr(self, 'robot_name', self.robot.model.name) results_manager = ResultsManager('optimal_trajectory', robot_name) - + # Calculate overall condition number condition_number = getattr(self, 'final_condition_number', 0.0) if condition_number == 0.0 and hasattr(self, 'results') and 'condition_numbers' in self.results: condition_number = self.results['condition_numbers'][-1] if self.results['condition_numbers'] else 0.0 - + # Prepare results dictionary results_dict = { 'trajectory_segments': len(self.results['T_F']), @@ -624,42 +459,42 @@ def save_results(self, output_dir="results"): 'velocity_segments': [v.tolist() for v in self.results['V_F']], 'acceleration_segments': [a.tolist() for a in self.results['A_F']] } - + # Add condition number history if available if 'condition_numbers' in self.results: results_dict['condition_number_history'] = [float(c) for c in self.results['condition_numbers']] - + # Save using unified manager saved_files = results_manager.save_results( results_dict, output_dir, save_formats=['yaml', 'npz'] ) - + self.logger.info(f"Trajectory results saved successfully") return saved_files - + except ImportError: # Fallback to basic saving import os import yaml - + os.makedirs(output_dir, exist_ok=True) - + # Basic results dictionary robot_name = getattr(self, 'robot_name', self.robot.model.name) filename = f"{robot_name}_optimal_trajectory.yaml" - + condition_number = getattr(self, 'final_condition_number', 0.0) results_dict = { 'trajectory_segments': len(self.results['T_F']), 'condition_number': float(condition_number), 'joint_count': len(self.CB.act_Jid) } - + with open(os.path.join(output_dir, filename), 'w') as f: yaml.dump(results_dict, f, default_flow_style=False) - + self.logger.info(f"Basic results saved to {output_dir}/{filename}") return {'yaml': os.path.join(output_dir, filename)} diff --git a/src/figaroh/optimal/base_parameter.py b/src/figaroh/optimal/base_parameter.py new file mode 100644 index 0000000..57d0967 --- /dev/null +++ b/src/figaroh/optimal/base_parameter.py @@ -0,0 +1,113 @@ +# Copyright [2021-2025] Thanh Nguyen +# Copyright [2022-2023] [CNRS, Toward SAS] + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import numpy as np +from typing import Dict, List, Tuple, Any + +# Setup logger for this module +logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) + +from figaroh.identification.identification_tools import get_standard_parameters +from figaroh.utils.cubic_spline import ( + CubicSpline, + WaypointsGeneration, +) +from figaroh.tools.qrdecomposition import get_baseIndex +from figaroh.tools.regressor import ( + build_regressor_basic, + build_regressor_reduced, + get_index_eliminate, +) +from figaroh.identification.parameter import ( + add_standard_additional_parameters, + add_custom_parameters +) + + +class BaseParameterComputer: + """Handles base parameter computation and indexing.""" + + def __init__(self, robot, identif_config, active_joints, soft_lim_pool): + self.robot = robot + self.model = self.robot.model + self.standard_parameter = None + self.identif_config = identif_config + self.active_joints = active_joints + self.soft_lim_pool = soft_lim_pool + + def compute_base_indices(self) -> Tuple[np.ndarray, np.ndarray]: + """Compute base parameter indices from random trajectory.""" + logging.info( + "Computing base parameter indices from random trajectory..." + ) + + try: + # Generate random trajectory for base parameter computation + n_wps_r = 100 + freq_r = 100 + CB_r = CubicSpline(self.robot, n_wps_r, self.active_joints) + WP_r = WaypointsGeneration(self.robot, n_wps_r, self.active_joints) + WP_r.gen_rand_pool(self.soft_lim_pool) + + # Generate waypoints and trajectory + wps_r, vel_wps_r, acc_wps_r = WP_r.gen_rand_wp() + tps_r = np.matrix([0.5 * i for i in range(n_wps_r)]).transpose() + t_r, p_r, v_r, a_r = CB_r.get_full_config( + freq_r, tps_r, wps_r, vel_wps_r, acc_wps_r + ) + + # Compute base indices + idx_e, idx_b = self._get_idx_from_random(p_r, v_r, a_r) + logging.info(f"Computed {len(idx_b)} base parameters") + + return idx_e, idx_b + + except Exception as e: + logging.error(f"Error computing base indices: {e}") + raise + + def _get_idx_from_random(self, q, v, a) -> Tuple[np.ndarray, np.ndarray]: + """Get indices of eliminate and base parameters.""" + W = build_regressor_basic(self.robot, q, v, a, self.identif_config) + self.standard_parameter = get_standard_parameters( + self.robot.model, self.identif_config + ) + # additional parameters can be added in robot-specific subclass + if ( + self.identif_config.get("has_friction", False) + or self.identif_config.get("has_actuator_inertia", False) + or self.identif_config.get("has_joint_offset", False) + ): + self.additional_parameters = add_standard_additional_parameters( + self.model, self.identif_config + ) + self.standard_parameter.update(self.additional_parameters) + + # Add custom parameters specific to the robot + if self.identif_config.get("has_custom_parameters", False): + self.custom_parameters = add_custom_parameters( + self.model, self.identif_config.get("custom_parameters", {}) + ) + self.standard_parameter.update(self.custom_parameters) + idx_e_, par_r_ = get_index_eliminate(W, self.standard_parameter, tol_e=0.001) + # Convert to numpy arrays + idx_e_ = np.array(idx_e_, dtype=int) + W_e_ = build_regressor_reduced(W, idx_e_) + idx_base_ = get_baseIndex(W_e_, par_r_) + idx_base_ = np.array(idx_base_, dtype=int) + + return idx_e_, idx_base_ diff --git a/src/figaroh/optimal/config.py b/src/figaroh/optimal/config.py new file mode 100644 index 0000000..d1c0cc3 --- /dev/null +++ b/src/figaroh/optimal/config.py @@ -0,0 +1,108 @@ +# Copyright [2021-2025] Thanh Nguyen +# Copyright [2022-2023] [CNRS, Toward SAS] + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import yaml +from typing import Dict, List, Tuple, Any +from yaml.loader import SafeLoader + +# Setup logger for this module +logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) + +from figaroh.identification.identification_tools import ( + get_param_from_yaml as get_identification_param_from_yaml, + unified_to_legacy_identif_config, +) +from figaroh.utils.config_parser import ( + UnifiedConfigParser, + create_task_config, + is_unified_config, +) + + +class ConfigurationManager: + """Manages configuration loading and validation.""" + + +@staticmethod +def load_param(robot, config_file: str) -> Tuple[Dict[str, Any], Any]: + """Load trajectory parameters from YAML file.""" + + try: + logger.info(f"Loading config from {config_file}") + + # Check if this is a unified configuration format + if is_unified_config(config_file): + logger.info("Detected unified configuration format") + # Use unified parser + parser = UnifiedConfigParser(config_file) + unified_config = parser.parse() + unified_identif_config = create_task_config( + robot, unified_config, "identification" + ) + # Convert unified format to identif_config format + identif_config = unified_to_legacy_identif_config( + robot, unified_identif_config + ) + unified_traj_config = create_task_config( + robot, unified_config, "optimal_trajectory" + ) + trajectory_config = create_config(unified_traj_config) + + else: + logger.info("Detected legacy configuration format") + # Use legacy format parsing + with open(config_file, "r") as f: + config = yaml.load(f, Loader=yaml.SafeLoader) + identif_config = get_identification_param_from_yaml( + robot, config["identification"] + ) + traj_params = config["identification"].get("trajectory_params", [{}])[0] + + # Set default values if not present in config + trajectory_config = { + "n_wps": traj_params.get("n_wps", 5), + "freq": traj_params.get("freq", 100), + "t_s": traj_params.get("t_s", 2.0), + "soft_lim": traj_params.get("soft_lim", 0.05), + "max_attempts": traj_params.get("max_attempts", 1000), + } + return trajectory_config, identif_config + + except FileNotFoundError: + raise FileNotFoundError( + f"Configuration file {config_file} not found" + ) + except KeyError as e: + raise ValueError(f"Missing required configuration key: {e}") + except Exception as e: + raise ValueError(f"Error loading configuration: {e}") + + +def create_config(unified_traj_config) -> dict: + + problem_params = unified_traj_config.get("problem", {}) + traj_params = unified_traj_config.get("trajectory", {}) + constraint_params = unified_traj_config.get("constraints", {}) + output_params = unified_traj_config.get("output", {}) + trajectory_config = { + "n_wps": traj_params.get("waypoints", 5), + "freq": traj_params.get("frequency", 100), + "t_s": traj_params.get("segment_duration", 2.0), + "soft_lim": problem_params.get("soft_lim", 0.05), + "max_attempts": problem_params.get("max_attempts", 1000), + } + return trajectory_config diff --git a/src/figaroh/optimal/contraints.py b/src/figaroh/optimal/contraints.py new file mode 100644 index 0000000..a3ea3f2 --- /dev/null +++ b/src/figaroh/optimal/contraints.py @@ -0,0 +1,165 @@ +# Copyright [2021-2025] Thanh Nguyen +# Copyright [2022-2023] [CNRS, Toward SAS] + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import numpy as np +from typing import Dict, List, Tuple, Any + +from figaroh.tools.robotcollisions import CollisionWrapper +from figaroh.utils.cubic_spline import calc_torque + +# Setup logger for this module +logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) + + +class TrajectoryConstraintManager: + """Manages trajectory constraints and bounds.""" + + def __init__(self, robot, CB, trajectory_config, identif_config): + self.robot = robot + self.CB = CB + self.n_wps = trajectory_config["n_wps"] + self.freq = trajectory_config["freq"] + self.identif_config = identif_config + self.collision_wrapper = CollisionWrapper(robot=robot, viz=None) + + def get_variable_bounds(self) -> Tuple[List[float], List[float]]: + """Get variable bounds for optimization.""" + lb, ub = [], [] + for i in range(1, self.n_wps): + lb.extend(self.CB.lower_q) + ub.extend(self.CB.upper_q) + return lb, ub + + def get_constraint_bounds(self, Ns: int) -> Tuple[List, List]: + """Get constraint bounds for optimization.""" + cl, cu = [], [] + + # Position constraint bounds + for i in range(1, self.n_wps): + cl.extend(self.CB.lower_q) + cu.extend(self.CB.upper_q) + + # Velocity constraint bounds + for j in range(Ns): + cl.extend(self.CB.lower_dq) + cu.extend(self.CB.upper_dq) + + # Torque constraint bounds + for j in range(Ns): + cl.extend(self.CB.lower_effort) + cu.extend(self.CB.upper_effort) + + # Collision constraint bounds + n_cols = len(self.robot.geom_model.collisionPairs) + cl.extend([0.01] * n_cols * (self.n_wps - 1)) # 1 cm margin + cu.extend([2 * 1e19] * n_cols * (self.n_wps - 1)) # no upper limit + + return cl, cu + + def evaluate_constraints( + self, + Ns: int, + X: np.ndarray, + opt_cb: Dict, + tps, + vel_wps, + acc_wps, + wp_init, + ) -> np.ndarray: + """Evaluate all constraints for optimization.""" + try: + # Reshape and arrange waypoints + X = np.array(X) + wps_X = np.reshape(X, (self.n_wps - 1, len(self.CB.act_idxq))) + wps = np.vstack((wp_init, wps_X)) + wps = wps.transpose() + + # Generate full trajectory configuration + t_f, p_f, v_f, a_f = self.CB.get_full_config( + self.freq, tps, wps, vel_wps, acc_wps + ) + + # Compute joint torques + tau = calc_torque(p_f.shape[0], self.robot, p_f, v_f, a_f) + + # Evaluate individual constraint types + q_constraints = self._evaluate_position_constraints(p_f, tps, t_f) + v_constraints = self._evaluate_velocity_constraints(v_f) + tau_constraints = self._evaluate_torque_constraints(tau, Ns) + collision_constraints = self._evaluate_collision_constraints( + p_f, tps, t_f + ) + + # Concatenate all constraints + return np.concatenate( + ( + q_constraints, + v_constraints, + tau_constraints, + collision_constraints, + ), + axis=None, + ) + + except Exception as e: + logging.error(f"Error evaluating constraints: {e}") + raise + + def _evaluate_position_constraints(self, p_f, tps, t_f) -> np.ndarray: + """Evaluate position constraints at waypoints.""" + idx_waypoints = self._get_waypoint_indices(tps, t_f) + q_constraints = p_f[idx_waypoints, :] + return q_constraints[:, self.CB.act_idxq] + + def _evaluate_velocity_constraints(self, v_f) -> np.ndarray: + """Evaluate velocity constraints at all samples.""" + return v_f[:, self.CB.act_idxv] + + def _evaluate_torque_constraints(self, tau, Ns) -> np.ndarray: + """Evaluate torque constraints at all samples.""" + tau_constraints = np.zeros((Ns, len(self.CB.act_idxv))) + for k in range(len(self.CB.act_idxv)): + tau_constraints[:, k] = tau[ + range(self.CB.act_idxv[k] * Ns, (self.CB.act_idxv[k] + 1) * Ns) + ] + return tau_constraints + + def _evaluate_collision_constraints(self, p_f, tps, t_f) -> np.ndarray: + """Evaluate collision constraints.""" + idx_waypoints = self._get_waypoint_indices(tps, t_f) + dist_all = [] + for j in idx_waypoints: + self.collision_wrapper.computeCollisions(p_f[j, :]) + dist_all = np.append( + dist_all, self.collision_wrapper.getDistances() + ) + return np.asarray(dist_all) + + def _get_waypoint_indices(self, tps, t_f) -> List[int]: + """Get indices corresponding to waypoint times.""" + idx_waypoints = [] + time_points = tps[range(1, self.n_wps), :] + time_points_flat = np.array(time_points).flatten() + + for i in range(t_f.shape[0]): + t_val = ( + float(t_f[i, 0]) if hasattr(t_f[i, 0], "item") else t_f[i, 0] + ) + if t_val in time_points_flat: + idx_waypoints.append(i) + + return idx_waypoints diff --git a/src/figaroh/tools/robotcollisions.py b/src/figaroh/tools/robotcollisions.py index a142a81..a2b7d7e 100644 --- a/src/figaroh/tools/robotcollisions.py +++ b/src/figaroh/tools/robotcollisions.py @@ -15,10 +15,15 @@ """Enhanced collision detection and visualization utilities.""" +import logging from typing import List, Optional, Tuple, Union import numpy as np import pinocchio as pin +# Setup logger for this module +logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) + class CollisionManager: """Enhanced collision detection with better performance and safety.""" @@ -106,11 +111,11 @@ def get_all_distances(self) -> np.ndarray: def print_collision_pairs(self) -> None: """Print all collision pair information.""" if not self.geom_model or not self.geom_data: - print("No geometry model available") + logger.warning("No geometry model available") return - print(f"Total collision pairs: {len(self.geom_model.collisionPairs)}") - print("-" * 60) + logger.info(f"Total collision pairs: {len(self.geom_model.collisionPairs)}") + logger.info("-" * 60) for k in range(len(self.geom_model.collisionPairs)): result = self.geom_data.collisionResults[k] @@ -120,7 +125,7 @@ def print_collision_pairs(self) -> None: name2 = self.geom_model.geometryObjects[pair.second].name status = "COLLISION" if result.isCollision() else "FREE" - print(f"Pair {k:3d}: {name1:20s} <-> {name2:20s} [{status}]") + logger.info(f"Pair {k:3d}: {name1:20s} <-> {name2:20s} [{status}]") def visualize_collisions(self, collision_details: Optional[List] = None) -> None: """Visualize collision contacts with enhanced display.""" diff --git a/src/figaroh/tools/robotipopt.py b/src/figaroh/tools/robotipopt.py index 67c9743..5a57578 100644 --- a/src/figaroh/tools/robotipopt.py +++ b/src/figaroh/tools/robotipopt.py @@ -166,6 +166,10 @@ def constraints(self, x): from dataclasses import dataclass, field from typing import Dict, List, Tuple, Optional, Any, Callable, Union +# Setup logger for this module +logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) + @dataclass class IPOPTConfig: diff --git a/src/figaroh/tools/solver.py b/src/figaroh/tools/solver.py index 65de43e..34ce6bf 100644 --- a/src/figaroh/tools/solver.py +++ b/src/figaroh/tools/solver.py @@ -25,11 +25,16 @@ - Iterative refinement for improved accuracy """ +import logging import numpy as np from scipy import linalg from scipy.optimize import minimize import warnings +# Setup logger for this module +logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) + class LinearSolver: """ @@ -471,28 +476,29 @@ def _compute_solution_quality(self, A, b, x): def _print_solution_info(self): """Print solution information.""" - print("\n" + "=" * 60) - print(f"Linear Solver: {self.method}") - print("=" * 60) + logger.info("") + logger.info("=" * 60) + logger.info(f"Linear Solver: {self.method}") + logger.info("=" * 60) if "condition_number" in self.solver_info: cond = self.solver_info["condition_number"] - print(f"Condition number: {cond:.2e}") + logger.info(f"Condition number: {cond:.2e}") if "rank" in self.solver_info: - print(f"Matrix rank: {self.solver_info['rank']}") + logger.info(f"Matrix rank: {self.solver_info['rank']}") - print(f"RMSE: {self.solver_info['rmse']:.6f}") - print(f"R²: {self.solver_info['r_squared']:.6f}") + logger.info(f"RMSE: {self.solver_info['rmse']:.6f}") + logger.info(f"R²: {self.solver_info['r_squared']:.6f}") if "n_iter" in self.solver_info: - print(f"Iterations: {self.solver_info['n_iter']}") + logger.info(f"Iterations: {self.solver_info['n_iter']}") if "regularization" in self.solver_info: - print(f"Regularization: {self.solver_info['regularization']}") - print(f"Alpha: {self.solver_info['alpha']:.2e}") + logger.info(f"Regularization: {self.solver_info['regularization']}") + logger.info(f"Alpha: {self.solver_info['alpha']:.2e}") - print("=" * 60 + "\n") + logger.info("=" * 60) def solve_linear_system(A, b, method="lstsq", **kwargs): diff --git a/src/figaroh/utils/config_parser.py b/src/figaroh/utils/config_parser.py index 3daf8b3..7df86ca 100644 --- a/src/figaroh/utils/config_parser.py +++ b/src/figaroh/utils/config_parser.py @@ -20,6 +20,7 @@ from .error_handling import ConfigurationError logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) @dataclass diff --git a/src/figaroh/utils/cubic_spline.py b/src/figaroh/utils/cubic_spline.py index 1515eb1..8ba1538 100644 --- a/src/figaroh/utils/cubic_spline.py +++ b/src/figaroh/utils/cubic_spline.py @@ -87,11 +87,17 @@ # from ndcurves import piecewise, ndcurves.exact_cubic, # ndcurves.curve_constraints +import logging + import ndcurves import numpy as np from matplotlib import pyplot as plt import pinocchio as pin +# Configure logger for this module +logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) + k = 1.5 # take accel limits as k times of vel limits @@ -165,7 +171,7 @@ class CubicSpline: Joint limits are automatically extracted from the robot model and can be modified with soft limits for safety margins. """ - + def __init__(self, robot, num_waypoints: int, active_joints: list, soft_lim=0): """ @@ -454,11 +460,13 @@ def check_cfg_constraints(self, q, v=None, tau=None, soft_lim=0): - self.rmodel.lowerPositionLimit[j] ) if q[i, j] > self.rmodel.upperPositionLimit[j] - delta_lim: - print("Joint q %d upper limit violated!" % j) + logger.warning("Joint q %d upper limit violated!", j) __isViolated_pos = True elif q[i, j] < self.rmodel.lowerPositionLimit[j] + delta_lim: - print("Joint position idx_q %d lower limit violated!" % j) + logger.warning( + "Joint position idx_q %d lower limit violated!", j + ) __isViolated_pos = True else: __isViolated_pos = False @@ -470,7 +478,9 @@ def check_cfg_constraints(self, q, v=None, tau=None, soft_lim=0): if abs(v[i, j]) > (1 - soft_lim) * abs( self.rmodel.velocityLimit[j] ): - print("Joint vel idx_v %d limits violated!" % j) + logger.warning( + "Joint vel idx_v %d limits violated!", j + ) __isViolated_vel = True else: __isViolated_vel = False @@ -482,18 +492,20 @@ def check_cfg_constraints(self, q, v=None, tau=None, soft_lim=0): if abs(tau[i, j]) > (1 - soft_lim) * abs( self.rmodel.effortLimit[j] ): - print("Joint effort idx_v %d limits violated!" % j) + logger.warning( + "Joint effort idx_v %d limits violated!", j + ) __isViolated_eff = True else: __isViolated_eff = False __isViolated = __isViolated or __isViolated_eff # print(__isViolated) if not __isViolated: - print( - "SUCCEEDED to generate waypoints for a feasible initial cubic spline" + logger.info( + "SUCCEEDED to generate waypoints for a feasible initial cubic spline" ) else: - print("FAILED to generate a feasible cubic spline") + logger.warning("FAILED to generate a feasible cubic spline") return __isViolated def check_self_collision(self): @@ -790,4 +802,3 @@ def calc_torque(N, robot, q, v, a): robot.model, robot.data, q[i, :], v[i, :], a[i, :] )[j] return tau - diff --git a/src/figaroh/utils/error_handling.py b/src/figaroh/utils/error_handling.py index 91f25e3..c4a9665 100644 --- a/src/figaroh/utils/error_handling.py +++ b/src/figaroh/utils/error_handling.py @@ -11,6 +11,7 @@ import numpy as np logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) # Type variable for decorators F = TypeVar('F', bound=Callable[..., Any]) diff --git a/src/figaroh/utils/results_manager.py b/src/figaroh/utils/results_manager.py index 6e5d2a8..3380496 100644 --- a/src/figaroh/utils/results_manager.py +++ b/src/figaroh/utils/results_manager.py @@ -27,7 +27,9 @@ setup_example_logging ) +# Setup logger for this module logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) class ResultsManager: @@ -37,7 +39,7 @@ class ResultsManager: This class provides standardized plotting and saving functionality with consistent styling and formats across different analysis types. """ - + # Color schemes for consistent visualization COLORS = { 'measured': '#1f77b4', # Blue @@ -47,7 +49,7 @@ class ResultsManager: 'reference': '#9467bd', # Purple 'residual': '#8c564b', # Brown } - + # Plot styles for different task types PLOT_STYLES = { 'calibration': {'figsize': (14, 10), 'dpi': 100}, @@ -55,7 +57,7 @@ class ResultsManager: 'optimal_calibration': {'figsize': (10, 8), 'dpi': 100}, 'optimal_trajectory': {'figsize': (15, 10), 'dpi': 100}, } - + def __init__(self, task_type: str, robot_name: str = "robot", results_data: Optional[Dict[str, Any]] = None): """ Initialize results manager. @@ -101,7 +103,7 @@ def plot_calibration_results( if not HAS_MATPLOTLIB: self.logger.warning("Cannot plot: matplotlib not available") return - + try: # Extract data from self.result if not provided as arguments if measured_poses is None: @@ -122,20 +124,20 @@ def plot_calibration_results( "calibrated parameters names", []) if outlier_indices is None: outlier_indices = self.result.get("outlier indices", []) - + # Validate we have the minimum required data if measured_poses.size == 0: self.logger.warning( "No measured pose data available for plotting") return - + fig = plt.figure( figsize=self.PLOT_STYLES['calibration']['figsize']) gs = GridSpec(3, 2, figure=fig, hspace=0.3, wspace=0.3) - + # Check position + orientation (6 DOF) or position only (3 DOF) n_dims = measured_poses.shape[1] - + if n_dims >= 3 and estimated_poses.size > 0: # Position comparison ax1 = fig.add_subplot(gs[0, :]) @@ -143,7 +145,7 @@ def plot_calibration_results( ax1, measured_poses[:, :3], estimated_poses[:, :3], "Position Comparison", "Position (m)", outlier_indices ) - + if n_dims >= 6 and estimated_poses.size > 0: # Orientation comparison ax2 = fig.add_subplot(gs[1, :]) @@ -163,55 +165,55 @@ def plot_calibration_results( if residuals.size > 0: ax3 = fig.add_subplot(gs[2, 0]) self._plot_residuals(ax3, residuals, outlier_indices) - + # Parameter values if parameter_values.size > 0: ax5 = fig.add_subplot(gs[2, 1]) self._plot_parameters(ax5, parameter_values, parameter_names) - + # Add statistics text num_params = self.result.get( "number of calibrated parameters", len(parameter_values)) num_samples = self.result.get("number of samples", "N/A") rmse = self.result.get("rmse", "N/A") mae = self.result.get("mae", "N/A") - + stats_text = (f"Parameters: {num_params}\n" f"Samples: {num_samples}") if rmse != "N/A": stats_text += f"\nRMSE: {rmse:.6f}" if mae != "N/A": stats_text += f"\nMAE: {mae:.6f}" - + ax5.text(0.02, 0.98, stats_text, transform=ax5.transAxes, verticalalignment='top', bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5)) - + fig.suptitle(f"{self.robot_name.upper()} {title}", fontsize=16) # plt.tight_layout() plt.show() - + except Exception as e: self.logger.error(f"Error plotting calibration results: {e}") - # Print debug information - print("Debug info:") + # Log debug information + logger.debug("Debug info:") measured_shape = (measured_poses.shape if hasattr(measured_poses, 'shape') else 'N/A') - print(f" measured_poses shape: {measured_shape}") - + logger.debug(f" measured_poses shape: {measured_shape}") + estimated_shape = (estimated_poses.shape if hasattr(estimated_poses, 'shape') else 'N/A') - print(f" estimated_poses shape: {estimated_shape}") - + logger.debug(f" estimated_poses shape: {estimated_shape}") + residuals_shape = (residuals.shape if hasattr(residuals, 'shape') else 'N/A') - print(f" residuals shape: {residuals_shape}") - + logger.debug(f" residuals shape: {residuals_shape}") + result_keys = (list(self.result.keys()) if self.result else 'None') - print(f" Available result keys: {result_keys}") - + logger.debug(f" Available result keys: {result_keys}") + def plot_identification_results( self, tau_measured: Optional[np.ndarray] = None, @@ -237,7 +239,7 @@ def plot_identification_results( if not HAS_MATPLOTLIB: self.logger.warning("Cannot plot: matplotlib not available") return - + try: # Use data from self.result if parameters not provided if tau_measured is None: @@ -251,64 +253,64 @@ def plot_identification_results( np.array([])) if parameter_names is None: parameter_names = self.result.get("base parameters names", []) - + # Validate data availability if len(tau_measured) == 0 or len(tau_identified) == 0: self.logger.error("No torque data available for plotting") return - + fig = plt.figure( figsize=self.PLOT_STYLES['identification']['figsize']) gs = GridSpec(2, 2, figure=fig, hspace=0.3, wspace=0.3) - + # Determine if we need to reshape data (1D to 2D for multi-joint) if tau_measured.ndim == 1: tau_measured = tau_measured.reshape(-1, 1) if tau_identified.ndim == 1: tau_identified = tau_identified.reshape(-1, 1) - + n_samples = tau_measured.shape[0] - + if time_vector is None: time_vector = np.arange(n_samples) - + # Torque comparison ax1 = fig.add_subplot(gs[0, :]) self._plot_torque_comparison( ax1, time_vector, tau_measured, tau_identified, joint_names ) - + # Residual analysis ax2 = fig.add_subplot(gs[1, :]) residuals = tau_measured - tau_identified self._plot_torque_residuals(ax2, time_vector, residuals, joint_names) - + # Parameter values # ax3 = fig.add_subplot(gs[1, 1]) # self._plot_parameters(ax3, parameter_values, parameter_names) - + # Add quality metrics to title if available condition_num = self.result.get("condition number", "N/A") rmse_norm = self.result.get("rmse norm (N/m)", "N/A") - + fig.suptitle(f"{self.robot_name.upper()} {title}\n" f"Condition: {condition_num:.2e} | " f"RMSE: {rmse_norm:.6f}", fontsize=16) # plt.tight_layout() plt.show() - + except Exception as e: self.logger.error(f"Error plotting identification results: {e}") import traceback traceback.print_exc() plt.tight_layout() plt.show() - + except Exception as e: self.logger.error(f"Error plotting identification results: {e}") - + def plot_optimal_calibration_results( self, configurations: Dict[str, np.ndarray], @@ -330,36 +332,36 @@ def plot_optimal_calibration_results( if not HAS_MATPLOTLIB: self.logger.warning("Cannot plot: matplotlib not available") return - + try: fig = plt.figure(figsize=self.PLOT_STYLES['optimal_calibration']['figsize']) gs = GridSpec(2, 2, figure=fig, hspace=0.3, wspace=0.3) - + # Configuration weights ax1 = fig.add_subplot(gs[0, 0]) self._plot_configuration_weights(ax1, weights) - + # Selected configurations visualization ax2 = fig.add_subplot(gs[0, 1]) self._plot_selected_configurations(ax2, configurations, weights) - + # Condition number analysis (if available) if condition_numbers is not None: ax3 = fig.add_subplot(gs[1, 0]) self._plot_condition_analysis(ax3, condition_numbers) - + # Information matrix visualization (if available) if information_matrix is not None: ax4 = fig.add_subplot(gs[1, 1]) self._plot_information_matrix(ax4, information_matrix) - + fig.suptitle(f"{self.robot_name.upper()} {title}", fontsize=16) plt.tight_layout() plt.show() - + except Exception as e: self.logger.error(f"Error plotting optimal calibration results: {e}") - + def plot_optimal_trajectory_results( self, trajectories: Dict[str, Any], @@ -379,7 +381,7 @@ def plot_optimal_trajectory_results( if not HAS_MATPLOTLIB: self.logger.warning("Cannot plot: matplotlib not available") return - + try: # Extract trajectory data if 'T_F' in trajectories: @@ -394,15 +396,15 @@ def plot_optimal_trajectory_results( positions = [trajectories['q']] velocities = [trajectories.get('dq', np.zeros_like(trajectories['q']))] accelerations = [trajectories.get('ddq', np.zeros_like(trajectories['q']))] - + n_joints = positions[0].shape[1] - + fig = plt.figure(figsize=self.PLOT_STYLES['optimal_trajectory']['figsize']) gs = GridSpec(n_joints, 3, figure=fig, hspace=0.4, wspace=0.3) - + # Plot each joint's trajectory colors = plt.cm.tab10(np.linspace(0, 1, len(times))) - + for joint_idx in range(n_joints): # Position ax_pos = fig.add_subplot(gs[joint_idx, 0]) @@ -414,7 +416,7 @@ def plot_optimal_trajectory_results( if joint_idx == 0: ax_pos.legend() ax_pos.set_title('Joint Positions') - + # Velocity ax_vel = fig.add_subplot(gs[joint_idx, 1]) for seg_idx, (t, vel) in enumerate(zip(times, velocities)): @@ -423,7 +425,7 @@ def plot_optimal_trajectory_results( ax_vel.grid(True, alpha=0.3) if joint_idx == 0: ax_vel.set_title('Joint Velocities') - + # Acceleration ax_acc = fig.add_subplot(gs[joint_idx, 2]) for seg_idx, (t, acc) in enumerate(zip(times, accelerations)): @@ -432,19 +434,19 @@ def plot_optimal_trajectory_results( ax_acc.grid(True, alpha=0.3) if joint_idx == 0: ax_acc.set_title('Joint Accelerations') - + # Set x-labels for bottom row for col in range(3): fig.add_subplot(gs[-1, col]).set_xlabel('Time (s)') - + fig.suptitle(f"{self.robot_name.upper()} {title}\nCondition Number: {condition_number:.2e}", fontsize=16) plt.tight_layout() plt.show() - + except Exception as e: self.logger.error(f"Error plotting trajectory results: {e}") - + def save_results( self, results: Optional[Dict[str, Any]] = None, @@ -468,59 +470,59 @@ def save_results( # Use self.result if no results provided if results is None: results = self.result - + if not results: self.logger.error("No results data available to save") return {} - + # Create output directory output_path = Path(output_dir) output_path.mkdir(parents=True, exist_ok=True) - + # Generate file prefix if file_prefix is None: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") file_prefix = f"{self.robot_name}_{self.task_type}_{timestamp}" - + saved_files = {} - + # Save in requested formats for fmt in save_formats: if fmt.lower() == 'yaml': file_path = output_path / f"{file_prefix}.yaml" self._save_yaml(results, file_path) saved_files['yaml'] = str(file_path) - + elif fmt.lower() == 'csv': file_path = output_path / f"{file_prefix}.csv" self._save_csv(results, file_path) saved_files['csv'] = str(file_path) - + elif fmt.lower() == 'json': file_path = output_path / f"{file_prefix}.json" self._save_json(results, file_path) saved_files['json'] = str(file_path) - + elif fmt.lower() == 'npz': file_path = output_path / f"{file_prefix}.npz" self._save_npz(results, file_path) saved_files['npz'] = str(file_path) - + # Save metadata metadata_path = output_path / f"{file_prefix}_metadata.yaml" self._save_metadata(results, metadata_path) saved_files['metadata'] = str(metadata_path) - + self.logger.info(f"Results saved to {output_dir}") for fmt, path in saved_files.items(): self.logger.info(f" {fmt.upper()}: {path}") - + return saved_files - + except Exception as e: self.logger.error(f"Error saving results: {e}") raise FigarohExampleError(f"Failed to save results: {e}") - + # Helper plotting methods def _plot_pose_comparison(self, ax, measured, estimated, title, ylabel, outliers=None): """Plot pose comparison with outlier highlighting.""" @@ -529,26 +531,26 @@ def _plot_pose_comparison(self, ax, measured, estimated, title, ylabel, outliers measured = measured.reshape(-1, 1) if estimated.ndim == 1: estimated = estimated.reshape(-1, 1) - + n_samples, n_dims = measured.shape - + for dim in range(n_dims): ax.plot(measured[:, dim], label=f'Measured {dim+1}', color=self.COLORS['measured'], alpha=0.7) ax.plot(estimated[:, dim], label=f'Estimated {dim+1}', color=self.COLORS['identified'], alpha=0.7, linestyle='--') - + if outliers is not None: for outlier_idx in outliers: ax.axvline(outlier_idx, color=self.COLORS['residual'], alpha=0.5, linestyle=':') - + ax.set_title(title) ax.set_ylabel(ylabel) ax.set_xlabel('Sample Index') ax.legend() ax.grid(True, alpha=0.3) - + def _plot_residuals(self, ax, residuals, outliers=None): """Plot residual analysis.""" rms_residuals = np.sqrt(np.mean(residuals**2, axis=1)) @@ -557,76 +559,76 @@ def _plot_residuals(self, ax, residuals, outliers=None): if outliers is not None: ax.scatter(outliers, rms_residuals[outliers], color=self.COLORS['residual'], s=50, marker='x') - + ax.set_title('RMS Residuals') ax.set_ylabel('RMS Error') ax.set_xlabel('Sample Index') ax.grid(True, alpha=0.3) - + def _plot_position_residuals(self, ax, residuals, outliers=None): """Plot position residuals for x, y, z components.""" labels = ['X', 'Y', 'Z'] for i, label in enumerate(labels): if i < residuals.shape[1]: ax.plot(residuals[:, i], label=f'{label} residual', alpha=0.7) - + if outliers is not None: for outlier_idx in outliers: ax.axvline(outlier_idx, color=self.COLORS['residual'], alpha=0.5, linestyle=':') - + ax.set_title('Position Residuals') ax.set_ylabel('Error (m)') ax.set_xlabel('Sample Index') ax.legend() ax.grid(True, alpha=0.3) - + def _plot_parameters(self, ax, values, names=None): """Plot parameter values with names.""" x_pos = np.arange(len(values)) bars = ax.bar(x_pos, values, color=self.COLORS['optimal'], alpha=0.7) - + ax.set_title('Parameter Values') ax.set_ylabel('Value') ax.set_xlabel('Parameter Index') - + if names is not None: ax.set_xticks(x_pos) ax.set_xticklabels(names, rotation=45, ha='right') - + ax.grid(True, alpha=0.3) - + def _plot_torque_comparison(self, ax, time, measured, identified, joint_names=None): """Plot torque comparison for identification.""" n_joints = measured.shape[1] - + for joint in range(n_joints): label_base = f'Joint {joint+1}' if joint_names is None else joint_names[joint] ax.plot(time, measured[:, joint], label=f'{label_base} (measured)', alpha=0.7) ax.plot(time, identified[:, joint], label=f'{label_base} (identified)', alpha=0.7, linestyle='--') - + ax.set_title('Torque Comparison') ax.set_ylabel('Torque (Nm)') ax.set_xlabel('Time (s)') ax.legend() ax.grid(True, alpha=0.3) - + def _plot_torque_residuals(self, ax, time, residuals, joint_names=None): """Plot torque residuals.""" n_joints = residuals.shape[1] - + for joint in range(n_joints): label = f'Joint {joint+1}' if joint_names is None else joint_names[joint] ax.plot(time, residuals[:, joint], label=label, alpha=0.7) - + ax.set_title('Torque Residuals') ax.set_ylabel('Torque Error (Nm)') ax.set_xlabel('Time (s)') ax.legend() ax.grid(True, alpha=0.3) - + def _plot_configuration_weights(self, ax, weights): """Plot configuration selection weights.""" significant_weights = weights[weights > 1e-6] @@ -636,7 +638,7 @@ def _plot_configuration_weights(self, ax, weights): ax.set_ylabel('Weight') ax.set_xlabel('Configuration Index') ax.grid(True, alpha=0.3) - + def _plot_selected_configurations(self, ax, configurations, weights): """Plot selected configuration visualization.""" # This is a placeholder - would need specific implementation @@ -645,7 +647,7 @@ def _plot_selected_configurations(self, ax, configurations, weights): f'from {len(weights)} candidates', ha='center', va='center', transform=ax.transAxes) ax.set_title('Selected Configurations') - + def _plot_condition_analysis(self, ax, condition_numbers): """Plot condition number analysis.""" ax.semilogy(condition_numbers, color=self.COLORS['optimal'], alpha=0.7) @@ -653,27 +655,27 @@ def _plot_condition_analysis(self, ax, condition_numbers): ax.set_ylabel('Condition Number') ax.set_xlabel('Iteration') ax.grid(True, alpha=0.3) - + def _plot_information_matrix(self, ax, info_matrix): """Plot information matrix heatmap.""" im = ax.imshow(info_matrix, cmap='viridis', aspect='auto') ax.set_title('Information Matrix') plt.colorbar(im, ax=ax) - + # Helper saving methods def _save_yaml(self, results, file_path): """Save results to YAML format.""" # Convert numpy arrays to lists for YAML serialization yaml_data = self._convert_for_serialization(results) - + with open(file_path, 'w') as f: yaml.dump(yaml_data, f, default_flow_style=False, indent=2) - + def _save_csv(self, results, file_path): """Save results to CSV format.""" # Extract tabular data for CSV df_data = {} - + for key, value in results.items(): if isinstance(value, np.ndarray): if value.ndim == 1: @@ -683,23 +685,23 @@ def _save_csv(self, results, file_path): df_data[f"{key}_{i}"] = value[:, i] elif isinstance(value, (int, float)): df_data[key] = [value] # Single value as list - + if df_data: df = pd.DataFrame(df_data) df.to_csv(file_path, index=False) - + def _save_json(self, results, file_path): """Save results to JSON format.""" import json json_data = self._convert_for_serialization(results) - + with open(file_path, 'w') as f: json.dump(json_data, f, indent=2) - + def _save_npz(self, results, file_path): """Save results to NumPy compressed format.""" np.savez_compressed(file_path, **results) - + def _save_metadata(self, results, file_path): """Save metadata about the results.""" metadata = { @@ -709,7 +711,7 @@ def _save_metadata(self, results, file_path): 'data_shapes': {}, 'summary_statistics': {} } - + # Add data shape information for key, value in results.items(): if isinstance(value, np.ndarray): @@ -717,7 +719,7 @@ def _save_metadata(self, results, file_path): 'shape': list(value.shape), 'dtype': str(value.dtype) } - + # Add basic statistics for numeric arrays if np.issubdtype(value.dtype, np.number): metadata['summary_statistics'][key] = { @@ -726,10 +728,10 @@ def _save_metadata(self, results, file_path): 'min': float(np.min(value)), 'max': float(np.max(value)) } - + with open(file_path, 'w') as f: yaml.dump(metadata, f, default_flow_style=False, indent=2) - + def _convert_for_serialization(self, data): """Convert numpy arrays and other non-serializable types for saving.""" if isinstance(data, dict): diff --git a/src/figaroh/visualisation/visualizer.py b/src/figaroh/visualisation/visualizer.py index 680b03b..2bb4f35 100644 --- a/src/figaroh/visualisation/visualizer.py +++ b/src/figaroh/visualisation/visualizer.py @@ -13,12 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import numpy as np import meshcat import pinocchio as pin from pinocchio.visualize import MeshcatVisualizer as PMV from . import colors +# Setup logger for this module +logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) + def materialFromColor(color): """Convert color to Material. @@ -85,7 +90,7 @@ def __init__( if url is not None: if url == "classical": url = "tcp://127.0.0.1:6000" - print("Wrapper tries to connect to server <%s>" % url) + logger.info("Wrapper tries to connect to server <%s>", url) server = meshcat.Visualizer(zmq_url=url) else: server = None @@ -121,10 +126,10 @@ def applyConfiguration(self, name, placement): p = placement[:3] T = np.r_[np.c_[R, p], [[0, 0, 0, 1]]] else: - print("Error, np.shape of placement is not accepted") + logger.error("Error, np.shape of placement is not accepted") return False else: - print("Error format of placement is not accepted") + logger.error("Error format of placement is not accepted") return False self.viewer[name].set_transform(T)