diff --git a/pyrpod/plume/PlumeMeshUtils.py b/pyrpod/plume/PlumeMeshUtils.py new file mode 100644 index 0000000..08c03bd --- /dev/null +++ b/pyrpod/plume/PlumeMeshUtils.py @@ -0,0 +1,58 @@ +"""Mesh and visualization utilities for plume study plotting. + +Centralizes common operations used by PlumeStrikeEstimationStudy so the +main class can stay focused on simulation logic. +""" +from typing import Dict, Optional, Sequence +import os +import numpy as np +from stl import mesh + + +def transform_plume_for_thruster(plume_mesh: mesh.Mesh, thruster_id: str, vv, vv_orientation: np.ndarray, vv_pos) -> mesh.Mesh: + """Apply the standard sequence of rotations and translations to a plume mesh. + + Sequence mirrors legacy code: + 1. rotate by thruster DCM (transpose) + 2. rotate by VV orientation (transpose) + 3. translate by VV position + 4. if using clusters: translate by cluster exit for the thruster's cluster + 5. translate by thruster exit + + Returns the transformed mesh (mutates the provided mesh as the stl.mesh API does). + """ + # First: thruster DCM in thruster coord frame + thruster_orientation = np.array(vv.thruster_data[thruster_id]['dcm']) + plume_mesh.rotate_using_matrix(thruster_orientation.transpose()) + + # Second: according to DCM of VV in JFH + plume_mesh.rotate_using_matrix(vv_orientation.transpose()) + + # Third: according to position vector of the VV in JFH + plume_mesh.translate(vv_pos) + + # Fourth: cluster offset if present + if getattr(vv, 'use_clusters', False): + cluster_key = thruster_id[0] + thruster_id[1] if len(thruster_id) >= 2 else thruster_id + if cluster_key in vv.cluster_data: + plume_mesh.translate(vv.cluster_data[cluster_key]['exit'][0]) + + # Fifth: thruster exit vector + plume_mesh.translate(vv.thruster_data[thruster_id]['exit'][0]) + + return plume_mesh + + +def compose_meshes(meshes: Sequence[Optional[mesh.Mesh]]) -> Optional[mesh.Mesh]: + """Concatenate non-None meshes into a single mesh.Mesh or return None. + + Avoids repeated concatenation logic in callers. + """ + valid = [m.data for m in meshes if m is not None] + if not valid: + return None + if len(valid) == 1: + return meshes[0] + # concatenate arrays of shape (n,3,3) + combined = mesh.Mesh(np.concatenate(valid)) + return combined diff --git a/pyrpod/rpod/PlumeStudyExport.py b/pyrpod/plume/PlumeStudyExport.py similarity index 100% rename from pyrpod/rpod/PlumeStudyExport.py rename to pyrpod/plume/PlumeStudyExport.py diff --git a/pyrpod/rpod/PlumeStrikeEstimationStudy.py b/pyrpod/rpod/PlumeStrikeEstimationStudy.py index aa0f7ed..8e388b8 100644 --- a/pyrpod/rpod/PlumeStrikeEstimationStudy.py +++ b/pyrpod/rpod/PlumeStrikeEstimationStudy.py @@ -26,7 +26,14 @@ compute_1d_approach, ) from pyrpod.rpod.io import ensure_results_dirs, write_jfh -from pyrpod.rpod.PlumeStudyExport import PlumeStudyExport +from pyrpod.plume.PlumeStudyExport import PlumeStudyExport +from pyrpod.plume.PlumeMeshUtils import ( + transform_plume_for_thruster, + compose_meshes, +) + +from pyrpod.vehicle.VisitingVehicle import build_thruster_link + from pyrpod.plume.PlumeStrikeCalculator import compute_plume_strikes logger = get_logger("pyrpod.rpod.PlumeStrikeEstimationStudy") @@ -182,12 +189,8 @@ def graph_jfh_thruster_check(self): Does the method need to return a status message? or pass similar data? """ - # Link JFH numbering of thrusters to thruster names. - link = {} - i = 1 - for thruster in self.vv.thruster_data: - link[str(i)] = self.vv.thruster_data[thruster]['name'] - i = i + 1 + # Link JFH numbering of thrusters to thruster names. + link = build_thruster_link(self.vv) # Loop through each firing in the JFH. for firing in range(len(self.jfh.JFH)): @@ -225,11 +228,13 @@ def graph_jfh_thruster_check(self): scale_factor=0.05 ) - # Transform plume according to thruster and VV configuration. - plumeMesh = transform_mesh( + # Transform plume according to thruster and VV configuration using helper + plumeMesh = transform_plume_for_thruster( plumeMesh, - rotation_matrix=np.array(self.vv.thruster_data[thruster_id]['dcm']).T, - translation_vector=self.vv.thruster_data[thruster_id]['exit'][0] + thruster_id, + self.vv, + np.array(self.vv.thruster_data[thruster_id]['dcm']).T, + self.vv.thruster_data[thruster_id]['exit'][0], ) logger.debug("Thruster %s DCM: %s", thruster_id, self.vv.thruster_data[thruster_id]['dcm']) @@ -257,7 +262,8 @@ def graph_jfh_thruster_check(self): elif firing < 100: index = '0' + str(firing) else: - index = str(i) + # fallback to firing number if original index counter unavailable + index = str(firing) # delegate figure saving to export helper self.viz.save_figure(figure, os.path.join(self.environment.case_dir, 'img', 'frame' + str(index) + '.png')) @@ -329,12 +335,8 @@ def graph_jfh(self, trade_study = False): Method doesn't currently return anything. Simply produces data as needed. Does the method need to return a status message? or pass similar data? """ - # Link JFH numbering of thrusters to thruster names. - link = {} - i = 1 - for thruster in self.vv.thruster_data: - link[str(i)] = self.vv.thruster_data[thruster]['name'] - i = i + 1 + # Link JFH numbering of thrusters to thruster names. + link = build_thruster_link(self.vv) # Create results directory if it doesn't already exist. results_dir = self.environment.case_dir + 'results' @@ -397,49 +399,32 @@ def graph_jfh(self, trade_study = False): # Transform plume - # First, according to DCM of current thruster id in TCF - thruster_orientation = np.array( - self.vv.thruster_data[thruster_id]['dcm'] + # Transform plumeMesh for the thruster and the current JFH step + plumeMesh = transform_plume_for_thruster( + plumeMesh, + thruster_id, + self.vv, + vv_orientation, + self.jfh.JFH[firing]['xyz'], ) - plumeMesh.rotate_using_matrix(thruster_orientation.transpose()) - - # Second, according to DCM of VV in JFH - plumeMesh.rotate_using_matrix(vv_orientation.transpose()) - - # Third, according to position vector of the VV in JFH - plumeMesh.translate(self.jfh.JFH[firing]['xyz']) - - # Fourth, according to position of current cluster in CCF - if self.vv.use_clusters == True: - # thruster_id[0] = "P" and thruster_id[1] = "#", adding these gives the cluster identifier - plumeMesh.translate(self.vv.cluster_data[thruster_id[0] + thruster_id[1]]['exit'][0]) - - # Fifth, according to exit vector of current thruster id in TCD - plumeMesh.translate(self.vv.thruster_data[thruster_id]['exit'][0]) - # Takeaway: Do rotations before translating away from the rotation axes! - - - if active_cones == None: + # accumulate active cones efficiently via helper + if active_cones is None: active_cones = plumeMesh else: - active_cones = mesh.Mesh( - np.concatenate([active_cones.data, plumeMesh.data]) - ) + active_cones = compose_meshes([active_cones, plumeMesh]) # print('DCM: ', self.vv.thruster_data[thruster_id]['dcm']) # print('DCM: ', thruster_orientation[0], thruster_orientation[1], thruster_orientation[2]) - if self.vv.use_clusters != True: - if not active_cones == None: - VVmesh = mesh.Mesh( - np.concatenate([VVmesh.data, active_cones.data]) - ) - if self.vv.use_clusters == True: - if not active_cones == None: - VVmesh = mesh.Mesh( - np.concatenate([VVmesh.data, active_cones.data, active_clusters.data]) - ) + # combine VVmesh with active cones and clusters if present + extras = [] + if active_cones is not None: + extras.append(active_cones) + if self.vv.use_clusters and 'active_clusters' in locals() and active_clusters is not None: + extras.append(active_clusters) + if extras: + VVmesh = compose_meshes([VVmesh] + extras) # print(self.vv.mesh) @@ -473,12 +458,8 @@ def visualize_sweep(self, config_iter): Method doesn't currently return anything. Simply produces data as needed. Does the method need to return a status message? or pass similar data? """ - # Link JFH numbering of thrusters to thruster names. - link = {} - i = 1 - for thruster in self.vv.thruster_data: - link[str(i)] = self.vv.thruster_data[thruster]['name'] - i = i + 1 + # Link JFH numbering of thrusters to thruster names. + link = build_thruster_link(self.vv) # print('link is', link) # Create results directory if it doesn't already exist. @@ -525,40 +506,20 @@ def visualize_sweep(self, config_iter): # Could naming/code be more clear? # print('thruster num', thruster, 'thruster id', link[str(thruster)][0]) - # Load plume STL in initial configuration. + # Load plume STL and apply standard transform for this thruster plumeMesh = mesh.Mesh.from_file(self.environment.case_dir + 'stl/' + self.environment.config['vv']['stl_thruster']) - - # Transform plume - - # First, according to DCM of current thruster id in TCF - thruster_orientation = np.array( - self.vv.thruster_data[thruster_id]['dcm'] + plumeMesh = transform_plume_for_thruster( + plumeMesh, + thruster_id, + self.vv, + vv_orientation, + self.jfh.JFH[firing]['xyz'], ) - plumeMesh.rotate_using_matrix(thruster_orientation.transpose()) - - # Second, according to DCM of VV in JFH - plumeMesh.rotate_using_matrix(vv_orientation.transpose()) - - # Third, according to position vector of the VV in JFH - plumeMesh.translate(self.jfh.JFH[firing]['xyz']) - - # Fourth, according to position of current cluster in CCF - if self.vv.use_clusters == True: - # thruster_id[0] = "P" and thruster_id[1] = "#", adding these gives the cluster identifier - plumeMesh.translate(self.vv.cluster_data[thruster_id[0] + thruster_id[1]]['exit'][0]) - # Fifth, according to exit vector of current thruster id in TCD - plumeMesh.translate(self.vv.thruster_data[thruster_id]['exit'][0]) - - # Takeaway: Do rotations before translating away from the rotation axes! - - - if active_cones == None: + if active_cones is None: active_cones = plumeMesh else: - active_cones = mesh.Mesh( - np.concatenate([active_cones.data, plumeMesh.data]) - ) + active_cones = compose_meshes([active_cones, plumeMesh]) # print('DCM: ', self.vv.thruster_data[thruster_id]['dcm']) # print('DCM: ', thruster_orientation[0], thruster_orientation[1], thruster_orientation[2]) diff --git a/pyrpod/vehicle/VisitingVehicle.py b/pyrpod/vehicle/VisitingVehicle.py index fea43ad..54c4dfa 100644 --- a/pyrpod/vehicle/VisitingVehicle.py +++ b/pyrpod/vehicle/VisitingVehicle.py @@ -11,8 +11,24 @@ from pyrpod.mdao import SweepConfig from pyrpod.logging_utils import get_logger +from typing import Dict, Optional, Sequence + + logger = get_logger("pyrpod.vehicle.VisitingVehicle") +def build_thruster_link(vv) -> Dict[str, object]: + """Create mapping from indexed JFH thruster numbering to thruster names/data. + + Returns the same shape as existing code: map of str(i) -> vv.thruster_data[thruster]['name'] + """ + link = {} + i = 1 + for thruster in vv.thruster_data: + link[str(i)] = vv.thruster_data[thruster]['name'] + i += 1 + return link + + # Adapted from # https://stackoverflow.com/questions/54616049/converting-a-rotation-matrix-to-euler-angles-and-back-special-case def rot2eul(R):