Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions pyrpod/rpod/PlumeMeshUtils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""Mesh and visualization utilities for plume study plotting.

Centralizes common operations used by PlumeStrikeEstimationStudy so the
main class can stay focused on simulation logic.
"""
from typing import Dict, Optional, Sequence
import os
import numpy as np
from stl import mesh


def build_thruster_link(vv) -> Dict[str, object]:
"""Create mapping from indexed JFH thruster numbering to thruster names/data.

Returns the same shape as existing code: map of str(i) -> vv.thruster_data[thruster]['name']
"""
link = {}
i = 1
for thruster in vv.thruster_data:
link[str(i)] = vv.thruster_data[thruster]['name']
i += 1
return link


def transform_plume_for_thruster(plume_mesh: mesh.Mesh, thruster_id: str, vv, vv_orientation: np.ndarray, vv_pos) -> mesh.Mesh:
"""Apply the standard sequence of rotations and translations to a plume mesh.

Sequence mirrors legacy code:
1. rotate by thruster DCM (transpose)
2. rotate by VV orientation (transpose)
3. translate by VV position
4. if using clusters: translate by cluster exit for the thruster's cluster
5. translate by thruster exit

Returns the transformed mesh (mutates the provided mesh as the stl.mesh API does).
"""
# First: thruster DCM in thruster coord frame
thruster_orientation = np.array(vv.thruster_data[thruster_id]['dcm'])
plume_mesh.rotate_using_matrix(thruster_orientation.transpose())

# Second: according to DCM of VV in JFH
plume_mesh.rotate_using_matrix(vv_orientation.transpose())

# Third: according to position vector of the VV in JFH
plume_mesh.translate(vv_pos)

# Fourth: cluster offset if present
if getattr(vv, 'use_clusters', False):
cluster_key = thruster_id[0] + thruster_id[1] if len(thruster_id) >= 2 else thruster_id
if cluster_key in vv.cluster_data:
plume_mesh.translate(vv.cluster_data[cluster_key]['exit'][0])

# Fifth: thruster exit vector
plume_mesh.translate(vv.thruster_data[thruster_id]['exit'][0])

return plume_mesh


def compose_meshes(meshes: Sequence[Optional[mesh.Mesh]]) -> Optional[mesh.Mesh]:
"""Concatenate non-None meshes into a single mesh.Mesh or return None.

Avoids repeated concatenation logic in callers.
"""
valid = [m.data for m in meshes if m is not None]
if not valid:
return None
if len(valid) == 1:
return meshes[0]
# concatenate arrays of shape (n,3,3)
combined = mesh.Mesh(np.concatenate(valid))
return combined
133 changes: 46 additions & 87 deletions pyrpod/rpod/PlumeStrikeEstimationStudy.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
)
from pyrpod.rpod.io import ensure_results_dirs, write_jfh
from pyrpod.rpod.PlumeStudyExport import PlumeStudyExport
from pyrpod.rpod.PlumeMeshUtils import (
build_thruster_link,
transform_plume_for_thruster,
compose_meshes,
)
from pyrpod.plume.PlumeStrikeCalculator import compute_plume_strikes

logger = get_logger("pyrpod.rpod.PlumeStrikeEstimationStudy")
Expand Down Expand Up @@ -182,12 +187,8 @@ def graph_jfh_thruster_check(self):
Does the method need to return a status message? or pass similar data?
"""

# Link JFH numbering of thrusters to thruster names.
link = {}
i = 1
for thruster in self.vv.thruster_data:
link[str(i)] = self.vv.thruster_data[thruster]['name']
i = i + 1
# Link JFH numbering of thrusters to thruster names.
link = build_thruster_link(self.vv)

# Loop through each firing in the JFH.
for firing in range(len(self.jfh.JFH)):
Expand Down Expand Up @@ -225,11 +226,13 @@ def graph_jfh_thruster_check(self):
scale_factor=0.05
)

# Transform plume according to thruster and VV configuration.
plumeMesh = transform_mesh(
# Transform plume according to thruster and VV configuration using helper
plumeMesh = transform_plume_for_thruster(
plumeMesh,
rotation_matrix=np.array(self.vv.thruster_data[thruster_id]['dcm']).T,
translation_vector=self.vv.thruster_data[thruster_id]['exit'][0]
thruster_id,
self.vv,
np.array(self.vv.thruster_data[thruster_id]['dcm']).T,
self.vv.thruster_data[thruster_id]['exit'][0],
)

logger.debug("Thruster %s DCM: %s", thruster_id, self.vv.thruster_data[thruster_id]['dcm'])
Expand Down Expand Up @@ -257,7 +260,8 @@ def graph_jfh_thruster_check(self):
elif firing < 100:
index = '0' + str(firing)
else:
index = str(i)
# fallback to firing number if original index counter unavailable
index = str(firing)
# delegate figure saving to export helper
self.viz.save_figure(figure, os.path.join(self.environment.case_dir, 'img', 'frame' + str(index) + '.png'))

Expand Down Expand Up @@ -329,12 +333,8 @@ def graph_jfh(self, trade_study = False):
Method doesn't currently return anything. Simply produces data as needed.
Does the method need to return a status message? or pass similar data?
"""
# Link JFH numbering of thrusters to thruster names.
link = {}
i = 1
for thruster in self.vv.thruster_data:
link[str(i)] = self.vv.thruster_data[thruster]['name']
i = i + 1
# Link JFH numbering of thrusters to thruster names.
link = build_thruster_link(self.vv)

# Create results directory if it doesn't already exist.
results_dir = self.environment.case_dir + 'results'
Expand Down Expand Up @@ -397,49 +397,32 @@ def graph_jfh(self, trade_study = False):

# Transform plume

# First, according to DCM of current thruster id in TCF
thruster_orientation = np.array(
self.vv.thruster_data[thruster_id]['dcm']
# Transform plumeMesh for the thruster and the current JFH step
plumeMesh = transform_plume_for_thruster(
plumeMesh,
thruster_id,
self.vv,
vv_orientation,
self.jfh.JFH[firing]['xyz'],
)
plumeMesh.rotate_using_matrix(thruster_orientation.transpose())

# Second, according to DCM of VV in JFH
plumeMesh.rotate_using_matrix(vv_orientation.transpose())

# Third, according to position vector of the VV in JFH
plumeMesh.translate(self.jfh.JFH[firing]['xyz'])

# Fourth, according to position of current cluster in CCF
if self.vv.use_clusters == True:
# thruster_id[0] = "P" and thruster_id[1] = "#", adding these gives the cluster identifier
plumeMesh.translate(self.vv.cluster_data[thruster_id[0] + thruster_id[1]]['exit'][0])

# Fifth, according to exit vector of current thruster id in TCD
plumeMesh.translate(self.vv.thruster_data[thruster_id]['exit'][0])

# Takeaway: Do rotations before translating away from the rotation axes!


if active_cones == None:
# accumulate active cones efficiently via helper
if active_cones is None:
active_cones = plumeMesh
else:
active_cones = mesh.Mesh(
np.concatenate([active_cones.data, plumeMesh.data])
)
active_cones = compose_meshes([active_cones, plumeMesh])

# print('DCM: ', self.vv.thruster_data[thruster_id]['dcm'])
# print('DCM: ', thruster_orientation[0], thruster_orientation[1], thruster_orientation[2])

if self.vv.use_clusters != True:
if not active_cones == None:
VVmesh = mesh.Mesh(
np.concatenate([VVmesh.data, active_cones.data])
)
if self.vv.use_clusters == True:
if not active_cones == None:
VVmesh = mesh.Mesh(
np.concatenate([VVmesh.data, active_cones.data, active_clusters.data])
)
# combine VVmesh with active cones and clusters if present
extras = []
if active_cones is not None:
extras.append(active_cones)
if self.vv.use_clusters and 'active_clusters' in locals() and active_clusters is not None:
extras.append(active_clusters)
if extras:
VVmesh = compose_meshes([VVmesh] + extras)

# print(self.vv.mesh)

Expand Down Expand Up @@ -473,12 +456,8 @@ def visualize_sweep(self, config_iter):
Method doesn't currently return anything. Simply produces data as needed.
Does the method need to return a status message? or pass similar data?
"""
# Link JFH numbering of thrusters to thruster names.
link = {}
i = 1
for thruster in self.vv.thruster_data:
link[str(i)] = self.vv.thruster_data[thruster]['name']
i = i + 1
# Link JFH numbering of thrusters to thruster names.
link = build_thruster_link(self.vv)
# print('link is', link)

# Create results directory if it doesn't already exist.
Expand Down Expand Up @@ -525,40 +504,20 @@ def visualize_sweep(self, config_iter):
# Could naming/code be more clear?
# print('thruster num', thruster, 'thruster id', link[str(thruster)][0])

# Load plume STL in initial configuration.
# Load plume STL and apply standard transform for this thruster
plumeMesh = mesh.Mesh.from_file(self.environment.case_dir + 'stl/' + self.environment.config['vv']['stl_thruster'])

# Transform plume

# First, according to DCM of current thruster id in TCF
thruster_orientation = np.array(
self.vv.thruster_data[thruster_id]['dcm']
plumeMesh = transform_plume_for_thruster(
plumeMesh,
thruster_id,
self.vv,
vv_orientation,
self.jfh.JFH[firing]['xyz'],
)
plumeMesh.rotate_using_matrix(thruster_orientation.transpose())

# Second, according to DCM of VV in JFH
plumeMesh.rotate_using_matrix(vv_orientation.transpose())

# Third, according to position vector of the VV in JFH
plumeMesh.translate(self.jfh.JFH[firing]['xyz'])

# Fourth, according to position of current cluster in CCF
if self.vv.use_clusters == True:
# thruster_id[0] = "P" and thruster_id[1] = "#", adding these gives the cluster identifier
plumeMesh.translate(self.vv.cluster_data[thruster_id[0] + thruster_id[1]]['exit'][0])

# Fifth, according to exit vector of current thruster id in TCD
plumeMesh.translate(self.vv.thruster_data[thruster_id]['exit'][0])

# Takeaway: Do rotations before translating away from the rotation axes!


if active_cones == None:
if active_cones is None:
active_cones = plumeMesh
else:
active_cones = mesh.Mesh(
np.concatenate([active_cones.data, plumeMesh.data])
)
active_cones = compose_meshes([active_cones, plumeMesh])

# print('DCM: ', self.vv.thruster_data[thruster_id]['dcm'])
# print('DCM: ', thruster_orientation[0], thruster_orientation[1], thruster_orientation[2])
Expand Down