diff --git a/motorlib/grain.py b/motorlib/grain.py index b4255f6..45c9a82 100644 --- a/motorlib/grain.py +++ b/motorlib/grain.py @@ -8,14 +8,14 @@ import numpy as np import skfmm - -from scipy.signal import savgol_filter from scipy import interpolate +from scipy.signal import savgol_filter import mathlib + from . import geometry +from .properties import EnumProperty, FloatProperty, PropertyCollection from .simResult import SimAlert, SimAlertLevel, SimAlertType -from .properties import FloatProperty, EnumProperty, PropertyCollection from .constants import maximumRefDiameter, maximumRefLength diff --git a/motorlib/motor.py b/motorlib/motor.py index e0688d1..0231fad 100644 --- a/motorlib/motor.py +++ b/motorlib/motor.py @@ -1,43 +1,66 @@ """Contains the motor class and a supporting configuration property collection.""" -from .grains import grainTypes + +from typing import Dict, Union, List +import numpy as np +from scipy.optimize import newton + +from . import geometry +from .constants import atmosphericPressure, gasConstant +from .grains import EndBurningGrain, grainTypes from .nozzle import Nozzle from .propellant import Propellant -from . import geometry -from .simResult import SimulationResult, SimAlert, SimAlertLevel, SimAlertType -from .grains import EndBurningGrain -from .properties import PropertyCollection, FloatProperty, IntProperty -from .constants import gasConstant, atmosphericPressure -from scipy.optimize import newton -import numpy as np +from .properties import FloatProperty, IntProperty, PropertyCollection +from .simResult import SimAlert, SimAlertLevel, SimAlertType, SimulationResult +from .grain import Grain + class MotorConfig(PropertyCollection): """Contains the settings required for simulation, including environmental conditions and details about how to run the simulation.""" - def __init__(self): + + def __init__(self) -> None: super().__init__() # Limits - self.props['maxPressure'] = FloatProperty('Maximum Allowed Pressure', 'Pa', 0, 7e7) - self.props['maxMassFlux'] = FloatProperty('Maximum Allowed Mass Flux', 'kg/(m^2*s)', 0, 1e4) - self.props['maxMachNumber'] = FloatProperty('Maximum Allowed Core Mach Number', '', 0.00, 1e2) - self.props['minPortThroat'] = FloatProperty('Minimum Allowed Port/Throat Ratio', '', 1, 4) - self.props['flowSeparationWarnPercent'] = FloatProperty('Flow Separation Warning Threshold', '', 0.00, 1) + self.props["maxPressure"] = FloatProperty( + "Maximum Allowed Pressure", "Pa", 0, 7e7 + ) + self.props["maxMassFlux"] = FloatProperty( + "Maximum Allowed Mass Flux", "kg/(m^2*s)", 0, 1e4 + ) + self.props["maxMachNumber"] = FloatProperty( + "Maximum Allowed Core Mach Number", "", 0.00, 1e2 + ) + self.props["minPortThroat"] = FloatProperty( + "Minimum Allowed Port/Throat Ratio", "", 1, 4 + ) + self.props["flowSeparationWarnPercent"] = FloatProperty( + "Flow Separation Warning Threshold", "", 0.00, 1 + ) # Simulation - self.props['burnoutWebThres'] = FloatProperty('Web Burnout Threshold', 'm', 2.54e-5, 3.175e-3) - self.props['burnoutThrustThres'] = FloatProperty('Thrust Burnout Threshold', '%', 0.01, 10) - self.props['timestep'] = FloatProperty('Simulation Timestep', 's', 0.0001, 0.1) - self.props['ambPressure'] = FloatProperty('Ambient Pressure', 'Pa', 0.0001, 102000) - self.props['mapDim'] = IntProperty('Grain Map Dimension', '', 250, 2000) - self.props['sepPressureRatio'] = FloatProperty('Separation Pressure Ratio', '', 0.001, 1) - - - -class Motor(): + self.props["burnoutWebThres"] = FloatProperty( + "Web Burnout Threshold", "m", 2.54e-5, 3.175e-3 + ) + self.props["burnoutThrustThres"] = FloatProperty( + "Thrust Burnout Threshold", "%", 0.01, 10 + ) + self.props["timestep"] = FloatProperty("Simulation Timestep", "s", 0.0001, 0.1) + self.props["ambPressure"] = FloatProperty( + "Ambient Pressure", "Pa", 0.0001, 102000 + ) + self.props["mapDim"] = IntProperty("Grain Map Dimension", "", 250, 2000) + self.props["sepPressureRatio"] = FloatProperty( + "Separation Pressure Ratio", "", 0.001, 1 + ) + + +class Motor: """The motor class stores a number of grains, a nozzle instance, a propellant, and a configuration that it uses to run simulations. Simulations return a simRes object that includes any warnings or errors associated with the simulation and the data. The propellant field may be None if the motor has no propellant set, and the grains list is allowed to be empty. The nozzle field and config must be filled, even if they are empty property collections.""" - def __init__(self, propDict=None): - self.grains = [] + + def __init__(self, propDict: Union[Dict, None] = None) -> None: + self.grains: List[Grain] = [] self.propellant = None self.nozzle = Nozzle() self.config = MotorConfig() @@ -45,39 +68,45 @@ def __init__(self, propDict=None): if propDict is not None: self.applyDict(propDict) - def getDict(self): + def getDict(self) -> Dict: """Returns a serializable representation of the motor. The dictionary has keys 'nozzle', 'propellant', 'grains', and 'config', which hold to the properties of their corresponding fields. Grains is a list of dicts, each containing a type and properties. Propellant may be None if the motor has no propellant set.""" motorData = {} - motorData['nozzle'] = self.nozzle.getProperties() + motorData["nozzle"] = self.nozzle.getProperties() if self.propellant is not None: - motorData['propellant'] = self.propellant.getProperties() + motorData["propellant"] = self.propellant.getProperties() else: - motorData['propellant'] = None - motorData['grains'] = [{'type': grain.geomName, 'properties': grain.getProperties()} for grain in self.grains] - motorData['config'] = self.config.getProperties() + motorData["propellant"] = None + motorData["grains"] = [ + {"type": grain.geomName, "properties": grain.getProperties()} + for grain in self.grains + ] + motorData["config"] = self.config.getProperties() return motorData def applyDict(self, dictionary): """Makes the motor copy properties from the dictionary that is passed in, which must be formatted like the result passed out by 'getDict'""" - self.nozzle.setProperties(dictionary['nozzle']) - if dictionary['propellant'] is not None: - self.propellant = Propellant(dictionary['propellant']) + self.nozzle.setProperties(dictionary["nozzle"]) + if dictionary["propellant"] is not None: + self.propellant = Propellant(dictionary["propellant"]) else: self.propellant = None self.grains = [] - for entry in dictionary['grains']: - self.grains.append(grainTypes[entry['type']]()) - self.grains[-1].setProperties(entry['properties']) - self.config.setProperties(dictionary['config']) + for entry in dictionary["grains"]: + self.grains.append(grainTypes[entry["type"]]()) + self.grains[-1].setProperties(entry["properties"]) + self.config.setProperties(dictionary["config"]) def calcBurningSurfaceArea(self, regDepth): - burnoutThres = self.config.getProperty('burnoutWebThres') + burnoutThres = self.config.getProperty("burnoutWebThres") gWithReg = zip(self.grains, regDepth) - perGrain = [gr.getSurfaceAreaAtRegression(reg) * int(gr.isWebLeft(reg, burnoutThres)) for gr, reg in gWithReg] + perGrain = [ + gr.getSurfaceAreaAtRegression(reg) * int(gr.isWebLeft(reg, burnoutThres)) + for gr, reg in gWithReg + ] return sum(perGrain) def calcKN(self, regDepth, dThroat): @@ -98,14 +127,18 @@ def calcForce(self, chamberPres, dThroat, exitPres=None): """Calculates the force of the motor at a given regression depth per grain. Calculates exit pressure by default, but can also use a value passed in.""" _, _, gamma, _, _ = self.propellant.getCombustionProperties(chamberPres) - ambPressure = self.config.getProperty('ambPressure') - thrustCoeff = self.nozzle.getAdjustedThrustCoeff(chamberPres, ambPressure, gamma, dThroat, exitPres) + ambPressure = self.config.getProperty("ambPressure") + thrustCoeff = self.nozzle.getAdjustedThrustCoeff( + chamberPres, ambPressure, gamma, dThroat, exitPres + ) thrust = thrustCoeff * self.nozzle.getThroatArea(dThroat) * chamberPres return max(thrust, 0) def calcFreeVolume(self, regDepth): """Calculates the volume inside of the motor not occupied by proppellant for a set of regression depths.""" - return sum([grain.getFreeVolume(reg) for grain, reg in zip(self.grains, regDepth)]) + return sum( + [grain.getFreeVolume(reg) for grain, reg in zip(self.grains, regDepth)] + ) def calcTotalVolume(self): """Calculates the bounding-cylinder volume of the combustion chamber.""" @@ -115,33 +148,45 @@ def calcMachNumber(self, chamberPres, massFlux): """Calculates the mach number in the core of a grain for a given chamber pressure and mass flux.""" _, _, gamma, T, molarMass = self.propellant.getCombustionProperties(chamberPres) - if chamberPres <= atmosphericPressure: # Mach calculation gets weird at low chamber pressures + if ( + chamberPres <= atmosphericPressure + ): # Mach calculation gets weird at low chamber pressures return 0 def machFunc(M, chamberPres, massFlux, gamma, T, molarMass, gasConstant): A = chamberPres * (gamma * molarMass / (gasConstant * T)) ** 0.5 B = 1.0 + ((gamma - 1.0) / 2.0) * M**2 C = -(gamma + 1.0) / (2.0 * (gamma - 1.0)) - return A * M * (B ** C) - massFlux + return A * M * (B**C) - massFlux - def machFuncDerivative(M, chamberPres, massFlux, gamma, T, molarMass, gasConstant): + def machFuncDerivative( + M, chamberPres, massFlux, gamma, T, molarMass, gasConstant + ): A = chamberPres * (gamma * molarMass / gasConstant / T) ** 0.5 B = 1.0 + ((gamma - 1.0) / 2.0) * M**2 C = -(gamma + 1.0) / (2.0 * (gamma - 1.0)) dB_dM = (gamma - 1.0) * M - return A * (B**C + M * C * (B**(C - 1.0)) * dB_dM) + return A * (B**C + M * C * (B ** (C - 1.0)) * dB_dM) - maxMassFlux = machFunc(1.0, chamberPres, massFlux, gamma, T, molarMass, gasConstant) + massFlux + maxMassFlux = ( + machFunc(1.0, chamberPres, massFlux, gamma, T, molarMass, gasConstant) + + massFlux + ) - if massFlux >= maxMassFlux: # Boom + if massFlux >= maxMassFlux: # Boom return 1.0 x0 = np.arcsin(massFlux / maxMassFlux) * 2 / np.pi - M = newton(machFunc, fprime=machFuncDerivative, x0=x0, args=(chamberPres, massFlux, gamma, T, molarMass, gasConstant)) + M = newton( + machFunc, + fprime=machFuncDerivative, + x0=x0, + args=(chamberPres, massFlux, gamma, T, molarMass, gasConstant), + ) return max(M, 0) - def runSimulation(self, callback=None): + def runSimulation(self, callback=None) -> SimulationResult: """Runs a simulation of the motor and returns a simRes instance with the results. Constraints are checked, including the number of grains, if the motor has a propellant set, and if the grains have geometry errors. If all of these tests are passed, the motor's operation is simulated by calculating Kn, using this value to get @@ -149,29 +194,45 @@ def runSimulation(self, callback=None): using the pressure to determine how the motor will regress in the given timestep at the current pressure. This process is repeated and regression tracked until all grains have burned out, when the results and any warnings are returned.""" - burnoutWebThres = self.config.getProperty('burnoutWebThres') - burnoutThrustThres = self.config.getProperty('burnoutThrustThres') - dTime = self.config.getProperty('timestep') + burnoutWebThres = self.config.getProperty("burnoutWebThres") + burnoutThrustThres = self.config.getProperty("burnoutThrustThres") + dTime = self.config.getProperty("timestep") simRes = SimulationResult(self) # Check for geometry errors if len(self.grains) == 0: - aText = 'Motor must have at least one propellant grain' - simRes.addAlert(SimAlert(SimAlertLevel.ERROR, SimAlertType.CONSTRAINT, aText, 'Motor')) + aText = "Motor must have at least one propellant grain" + simRes.addAlert( + SimAlert(SimAlertLevel.ERROR, SimAlertType.CONSTRAINT, aText, "Motor") + ) for gid, grain in enumerate(self.grains): - if isinstance(grain, EndBurningGrain) and gid != 0: # Endburners have to be at the foward end - aText = 'End burning grains must be the forward-most grain in the motor' - simRes.addAlert(SimAlert(SimAlertLevel.ERROR, SimAlertType.CONSTRAINT, aText, 'Grain {}'.format(gid + 1))) + if ( + isinstance(grain, EndBurningGrain) and gid != 0 + ): # Endburners have to be at the foward end + aText = "End burning grains must be the forward-most grain in the motor" + simRes.addAlert( + SimAlert( + SimAlertLevel.ERROR, + SimAlertType.CONSTRAINT, + aText, + "Grain {}".format(gid + 1), + ) + ) for alert in grain.getGeometryErrors(): - alert.location = 'Grain {}'.format(gid + 1) + alert.location = "Grain {}".format(gid + 1) simRes.addAlert(alert) for alert in self.nozzle.getGeometryErrors(): simRes.addAlert(alert) # Make sure the motor has a propellant set if self.propellant is None: - alert = SimAlert(SimAlertLevel.ERROR, SimAlertType.CONSTRAINT, 'Motor must have a propellant set', 'Motor') + alert = SimAlert( + SimAlertLevel.ERROR, + SimAlertType.CONSTRAINT, + "Motor must have a propellant set", + "Motor", + ) simRes.addAlert(alert) else: for alert in self.propellant.getErrors(): @@ -182,7 +243,7 @@ def runSimulation(self, callback=None): return simRes # Pull the required numbers from the propellant - density = self.propellant.getProperty('density') + density = self.propellant.getProperty("density") # Precalculate these are they don't change motorVolume = self.calcTotalVolume() @@ -195,28 +256,47 @@ def runSimulation(self, callback=None): perGrainReg = [0 for grain in self.grains] # At t = 0, the motor has ignited - simRes.channels['time'].addData(0) - simRes.channels['kn'].addData(self.calcKN(perGrainReg, 0)) - simRes.channels['pressure'].addData(self.calcIdealPressure(perGrainReg, 0, None)) - simRes.channels['force'].addData(0) - simRes.channels['mass'].addData([grain.getVolumeAtRegression(0) * density for grain in self.grains]) - simRes.channels['volumeLoading'].addData(100 * (1 - (self.calcFreeVolume(perGrainReg) / motorVolume))) - simRes.channels['massFlow'].addData([0 for grain in self.grains]) - simRes.channels['massFlux'].addData([0 for grain in self.grains]) - simRes.channels['regression'].addData([0 for grains in self.grains]) - simRes.channels['web'].addData([grain.getWebLeft(0) for grain in self.grains]) - simRes.channels['exitPressure'].addData(0) - simRes.channels['dThroat'].addData(0) - simRes.channels['machNumber'].addData([0 for grain in self.grains]) + simRes.channels["time"].addData(0) + simRes.channels["kn"].addData(self.calcKN(perGrainReg, 0)) + simRes.channels["pressure"].addData( + self.calcIdealPressure(perGrainReg, 0, None) + ) + simRes.channels["force"].addData(0) + simRes.channels["mass"].addData( + [grain.getVolumeAtRegression(0) * density for grain in self.grains] + ) + simRes.channels["volumeLoading"].addData( + 100 * (1 - (self.calcFreeVolume(perGrainReg) / motorVolume)) + ) + simRes.channels["massFlow"].addData([0 for grain in self.grains]) + simRes.channels["massFlux"].addData([0 for grain in self.grains]) + simRes.channels["regression"].addData([0 for grains in self.grains]) + simRes.channels["web"].addData([grain.getWebLeft(0) for grain in self.grains]) + simRes.channels["exitPressure"].addData(0) + simRes.channels["dThroat"].addData(0) + simRes.channels["machNumber"].addData([0 for grain in self.grains]) # Check port/throat ratio and add a warning if it is not large enough aftPort = self.grains[-1].getPortArea(0) if aftPort is not None: - minAllowed = self.config.getProperty('minPortThroat') - ratio = aftPort / geometry.circleArea(self.nozzle.props['throat'].getValue()) + minAllowed = self.config.getProperty("minPortThroat") + ratio = aftPort / geometry.circleArea( + self.nozzle.props["throat"].getValue() + ) if ratio < minAllowed: - description = 'Initial port/throat ratio of {:.3f} was less than {:.3f}'.format(ratio, minAllowed) - simRes.addAlert(SimAlert(SimAlertLevel.WARNING, SimAlertType.CONSTRAINT, description, 'N/A')) + description = ( + "Initial port/throat ratio of {:.3f} was less than {:.3f}".format( + ratio, minAllowed + ) + ) + simRes.addAlert( + SimAlert( + SimAlertLevel.WARNING, + SimAlertType.CONSTRAINT, + description, + "N/A", + ) + ) # Perform timesteps while simRes.shouldContinueSim(burnoutThrustThres): @@ -229,100 +309,133 @@ def runSimulation(self, callback=None): for gid, grain in enumerate(self.grains): if grain.getWebLeft(perGrainReg[gid]) > burnoutWebThres: # Calculate regression at the current pressure - reg = dTime * self.propellant.getBurnRate(simRes.channels['pressure'].getLast()) + reg = dTime * self.propellant.getBurnRate( + simRes.channels["pressure"].getLast() + ) # Find the mass flux through the grain based on the mass flow fed into from grains above it - perGrainMassFlux[gid] = grain.getPeakMassFlux(massFlow, dTime, perGrainReg[gid], reg, density) + perGrainMassFlux[gid] = grain.getPeakMassFlux( + massFlow, dTime, perGrainReg[gid], reg, density + ) # Find the mass of the grain after regression - perGrainMass[gid] = grain.getVolumeAtRegression(perGrainReg[gid]) * density + perGrainMass[gid] = ( + grain.getVolumeAtRegression(perGrainReg[gid]) * density + ) # Add the change in grain mass to the mass flow - massFlow += (simRes.channels['mass'].getLast()[gid] - perGrainMass[gid]) / dTime + massFlow += ( + simRes.channels["mass"].getLast()[gid] - perGrainMass[gid] + ) / dTime # Apply the regression perGrainReg[gid] += reg perGrainWeb[gid] = grain.getWebLeft(perGrainReg[gid]) perGrainMassFlow[gid] = massFlow - simRes.channels['regression'].addData(perGrainReg[:]) - simRes.channels['web'].addData(perGrainWeb) + simRes.channels["regression"].addData(perGrainReg[:]) + simRes.channels["web"].addData(perGrainWeb) - simRes.channels['volumeLoading'].addData(100 * (1 - (self.calcFreeVolume(perGrainReg) / motorVolume))) - simRes.channels['mass'].addData(perGrainMass) - simRes.channels['massFlow'].addData(perGrainMassFlow) - simRes.channels['massFlux'].addData(perGrainMassFlux) + simRes.channels["volumeLoading"].addData( + 100 * (1 - (self.calcFreeVolume(perGrainReg) / motorVolume)) + ) + simRes.channels["mass"].addData(perGrainMass) + simRes.channels["massFlow"].addData(perGrainMassFlow) + simRes.channels["massFlux"].addData(perGrainMassFlux) # Calculate KN - dThroat = simRes.channels['dThroat'].getLast() - simRes.channels['kn'].addData(self.calcKN(perGrainReg, dThroat)) + dThroat = simRes.channels["dThroat"].getLast() + simRes.channels["kn"].addData(self.calcKN(perGrainReg, dThroat)) # Calculate Pressure - lastKn = simRes.channels['kn'].getLast() + lastKn = simRes.channels["kn"].getLast() pressure = self.calcIdealPressure(perGrainReg, dThroat, lastKn) - simRes.channels['pressure'].addData(pressure) + simRes.channels["pressure"].addData(pressure) # Calculate Mach Number perGrainMachNumber = [0 for grain in self.grains] for gid, grain in enumerate(self.grains): - perGrainMachNumber[gid] = self.calcMachNumber(pressure, perGrainMassFlux[gid]) - simRes.channels['machNumber'].addData(perGrainMachNumber) + perGrainMachNumber[gid] = self.calcMachNumber( + pressure, perGrainMassFlux[gid] + ) + simRes.channels["machNumber"].addData(perGrainMachNumber) # Calculate Exit Pressure _, _, gamma, _, _ = self.propellant.getCombustionProperties(pressure) exitPressure = self.nozzle.getExitPressure(gamma, pressure) - simRes.channels['exitPressure'].addData(exitPressure) + simRes.channels["exitPressure"].addData(exitPressure) # Calculate force - force = self.calcForce(simRes.channels['pressure'].getLast(), dThroat, exitPressure) - simRes.channels['force'].addData(force) + force = self.calcForce( + simRes.channels["pressure"].getLast(), dThroat, exitPressure + ) + simRes.channels["force"].addData(force) - simRes.channels['time'].addData(simRes.channels['time'].getLast() + dTime) + simRes.channels["time"].addData(simRes.channels["time"].getLast() + dTime) # Calculate any slag deposition or erosion of the throat if pressure == 0: slagRate = 0 else: - slagRate = (1 / pressure) * self.nozzle.getProperty('slagCoeff') - erosionRate = pressure * self.nozzle.getProperty('erosionCoeff') + slagRate = (1 / pressure) * self.nozzle.getProperty("slagCoeff") + erosionRate = pressure * self.nozzle.getProperty("erosionCoeff") change = dTime * ((-2 * slagRate) + (2 * erosionRate)) - simRes.channels['dThroat'].addData(dThroat + change) + simRes.channels["dThroat"].addData(dThroat + change) if callback is not None: # Uses the grain with the largest percentage of its web left - progress = max([g.getWebLeft(r) / g.getWebLeft(0) for g, r in zip(self.grains, perGrainReg)]) - if callback(1 - progress): # If the callback returns true, it is time to cancel + progress = max( + [ + g.getWebLeft(r) / g.getWebLeft(0) + for g, r in zip(self.grains, perGrainReg) + ] + ) + if callback( + 1 - progress + ): # If the callback returns true, it is time to cancel return simRes simRes.success = True - if simRes.getPeakMassFlux() > self.config.getProperty('maxMassFlux'): - desc = 'Peak mass flux exceeded configured limit' - alert = SimAlert(SimAlertLevel.WARNING, SimAlertType.CONSTRAINT, desc, 'Motor') + if simRes.getPeakMassFlux() > self.config.getProperty("maxMassFlux"): + desc = "Peak mass flux exceeded configured limit" + alert = SimAlert( + SimAlertLevel.WARNING, SimAlertType.CONSTRAINT, desc, "Motor" + ) simRes.addAlert(alert) - if simRes.getMaxPressure() > self.config.getProperty('maxPressure'): - desc = 'Max pressure exceeded configured limit' - alert = SimAlert(SimAlertLevel.WARNING, SimAlertType.CONSTRAINT, desc, 'Motor') + if simRes.getMaxPressure() > self.config.getProperty("maxPressure"): + desc = "Max pressure exceeded configured limit" + alert = SimAlert( + SimAlertLevel.WARNING, SimAlertType.CONSTRAINT, desc, "Motor" + ) simRes.addAlert(alert) if simRes.getPeakMachNumber() >= 1.0: - desc = 'Max core Mach number exceeded allowable subsonic limit (M>1.0)' - alert = SimAlert(SimAlertLevel.WARNING, SimAlertType.CONSTRAINT, desc, 'Motor') + desc = "Max core Mach number exceeded allowable subsonic limit (M>1.0)" + alert = SimAlert( + SimAlertLevel.WARNING, SimAlertType.CONSTRAINT, desc, "Motor" + ) simRes.addAlert(alert) - elif simRes.getPeakMachNumber() > self.config.getProperty('maxMachNumber'): - desc = 'Max core Mach number exceeded configured limit' - alert = SimAlert(SimAlertLevel.WARNING, SimAlertType.CONSTRAINT, desc, 'Motor') + elif simRes.getPeakMachNumber() > self.config.getProperty("maxMachNumber"): + desc = "Max core Mach number exceeded configured limit" + alert = SimAlert( + SimAlertLevel.WARNING, SimAlertType.CONSTRAINT, desc, "Motor" + ) simRes.addAlert(alert) - if (simRes.getPercentBelowThreshold('exitPressure', self.config.getProperty('ambPressure') * self.config.getProperty('sepPressureRatio')) > self.config.getProperty('flowSeparationWarnPercent')): - desc = 'Low exit pressure, nozzle flow may separate' - alert = SimAlert(SimAlertLevel.WARNING, SimAlertType.VALUE, desc, 'Nozzle') + if simRes.getPercentBelowThreshold( + "exitPressure", + self.config.getProperty("ambPressure") + * self.config.getProperty("sepPressureRatio"), + ) > self.config.getProperty("flowSeparationWarnPercent"): + desc = "Low exit pressure, nozzle flow may separate" + alert = SimAlert(SimAlertLevel.WARNING, SimAlertType.VALUE, desc, "Nozzle") simRes.addAlert(alert) if simRes.getAverageForce() < burnoutThrustThres: - desc = 'Motor did not generate thrust. Check chamber pressure and expansion ratio.' - alert = SimAlert(SimAlertLevel.ERROR, SimAlertType.VALUE, desc, 'Motor') + desc = "Motor did not generate thrust. Check chamber pressure and expansion ratio." + alert = SimAlert(SimAlertLevel.ERROR, SimAlertType.VALUE, desc, "Motor") simRes.addAlert(alert) # Note that this only adds all errors found on the first datapoint where there were errors to avoid repeating # errors. It should be revisited if getPressureErrors ever returns multiple types of errors - for pressure in simRes.channels['pressure'].getData(): + for pressure in simRes.channels["pressure"].getData(): if pressure > 0: err = self.propellant.getPressureErrors(pressure) if len(err) > 0: @@ -333,16 +446,20 @@ def runSimulation(self, callback=None): def getQuickResults(self): results = { - 'volumeLoading': 0, - 'initialKn': 0, - 'propellantMass': 0, - 'portRatio': 0, - 'length': 0 + "volumeLoading": 0, + "initialKn": 0, + "propellantMass": 0, + "portRatio": 0, + "length": 0, } simRes = SimulationResult(self) - density = self.propellant.getProperty('density') if self.propellant is not None else None + density = ( + self.propellant.getProperty("density") + if self.propellant is not None + else None + ) throatArea = self.nozzle.getThroatArea() motorVolume = self.calcTotalVolume() @@ -359,12 +476,16 @@ def getQuickResults(self): perGrainReg = [0 for grain in self.grains] - results['volumeLoading'] = 100 * (1 - (self.calcFreeVolume(perGrainReg) / motorVolume)) + results["volumeLoading"] = 100 * ( + 1 - (self.calcFreeVolume(perGrainReg) / motorVolume) + ) if throatArea != 0: - results['initialKn'] = self.calcKN(perGrainReg, 0) - results['portRatio'] = simRes.getPortRatio() + results["initialKn"] = self.calcKN(perGrainReg, 0) + results["portRatio"] = simRes.getPortRatio() if density is not None: - results['propellantMass'] = sum([grain.getVolumeAtRegression(0) * density for grain in self.grains]) - results['length'] = simRes.getPropellantLength() + results["propellantMass"] = sum( + [grain.getVolumeAtRegression(0) * density for grain in self.grains] + ) + results["length"] = simRes.getPropellantLength() return results diff --git a/motorlib/properties.py b/motorlib/properties.py index de50960..d5e8946 100644 --- a/motorlib/properties.py +++ b/motorlib/properties.py @@ -4,9 +4,11 @@ from . import units -class Property(): + +class Property: """The base class that properties inherit from. It associates a human-readable display name with the data, as well as a unit and value type that it casts all inputs to.""" + def __init__(self, dispName, unit, valueType): self.dispName = dispName self.unit = unit @@ -23,11 +25,12 @@ def getValue(self): def dispFormat(self, unit): """Returns a human-readable version of the property's current value, including the unit.""" - return '{} {}'.format(self.value, unit) + return "{} {}".format(self.value, unit) class FloatProperty(Property): """A property that handles floats. It forces the value to be in a certain range.""" + def __init__(self, dispName, unit, minValue, maxValue): super().__init__(dispName, unit, float) self.min = minValue @@ -39,14 +42,15 @@ def setValue(self, value): super().setValue(value) def dispFormat(self, unit): - return '{:.6g} {}'.format(units.convert(self.value, self.unit, unit), unit) + return "{:.6g} {}".format(units.convert(self.value, self.unit, unit), unit) class EnumProperty(Property): """This property operates on strings, but only allows values from a list that is set when the property is defined""" + def __init__(self, dispName, values): - super().__init__(dispName, '', object) + super().__init__(dispName, "", object) self.values = values self.value = self.values[0] @@ -61,6 +65,7 @@ def setValue(self, value): class IntProperty(Property): """A property with an integer as the value that is clamped to a certain range.""" + def __init__(self, dispName, unit, minValue, maxValue): super().__init__(dispName, unit, int) self.min = minValue @@ -74,27 +79,31 @@ def setValue(self, value): class StringProperty(Property): """A property that works on the set of all strings""" + def __init__(self, dispName): - super().__init__(dispName, '', str) + super().__init__(dispName, "", str) class BooleanProperty(Property): """A property with a single boolean as the value""" + def __init__(self, dispName): - super().__init__(dispName, '', bool) + super().__init__(dispName, "", bool) class PolygonProperty(Property): """A property that contains a list of polygons, each a list of points""" + def __init__(self, dispName): - super().__init__(dispName, '', list) + super().__init__(dispName, "", list) self.value = [] class TabularProperty(Property): """A property that is composed of a number of 'tabs', each of which is a property collection of its own.""" + def __init__(self, dispName, collection): - super().__init__(dispName, '', list) + super().__init__(dispName, "", list) self.collection = collection self.tabs = [] @@ -109,8 +118,9 @@ def setValue(self, value): self.tabs = [self.collection(data) for data in value] -class PropertyCollection(): +class PropertyCollection: """Holds a set of properties and allows batch operations on them through dictionaries""" + def __init__(self): self.props = {} @@ -118,7 +128,9 @@ def setProperties(self, props): """Sets the value(s) of one of more properties at a time by passing in a dictionary of property names and values""" for prop in props.keys(): - if prop in self.props: # This allows loading settings when the name of a field has changed + if ( + prop in self.props + ): # This allows loading settings when the name of a field has changed self.props[prop].setValue(props[prop]) def getProperties(self, props=None): @@ -126,7 +138,7 @@ def getProperties(self, props=None): being requested. It defaults to None, which returns all properties.""" if props is None: props = self.props.keys() - return {k:self.props[k].getValue() for k in props} + return {k: self.props[k].getValue() for k in props} def getProperty(self, prop): """Returns the value of a specific property.""" diff --git a/motorlib/simResult.py b/motorlib/simResult.py index 1237fec..325cf44 100644 --- a/motorlib/simResult.py +++ b/motorlib/simResult.py @@ -1,6 +1,7 @@ """This module contains the classes that are returned from a simulation, including the main results class and the channels and components that it is comprised of.""" +from typing import List import math from enum import Enum @@ -8,36 +9,43 @@ from . import units from . import constants + class SimAlertLevel(Enum): """Levels of severity for sim alerts""" + ERROR = 1 WARNING = 2 MESSAGE = 3 + class SimAlertType(Enum): """Types of sim alerts""" + GEOMETRY = 1 CONSTRAINT = 2 VALUE = 3 + alertLevelNames = { - SimAlertLevel.ERROR: 'Error', - SimAlertLevel.WARNING: 'Warning', - SimAlertLevel.MESSAGE: 'Message' + SimAlertLevel.ERROR: "Error", + SimAlertLevel.WARNING: "Warning", + SimAlertLevel.MESSAGE: "Message", } alertTypeNames = { - SimAlertType.GEOMETRY: 'Geometry', - SimAlertType.CONSTRAINT: 'Constraint', - SimAlertType.VALUE: 'Value' + SimAlertType.GEOMETRY: "Geometry", + SimAlertType.CONSTRAINT: "Constraint", + SimAlertType.VALUE: "Value", } -class SimAlert(): + +class SimAlert: """A sim alert signifies a possible problem with a motor. It has levels of severity including 'error' (simulation should not continue or has failed), 'warning' (values entered appear incorrect but can be simulated), and 'message' (other information). The type describes the variety of issue the alert is associated with, and the description is a human-readable version string with more details about the problem. The location can either be None or a string to help the user find the problem.""" + def __init__(self, level, alertType, description, location=None): self.level = level self.type = alertType @@ -45,13 +53,14 @@ def __init__(self, level, alertType, description, location=None): self.location = location -class LogChannel(): +class LogChannel: """A log channel accepts data from a single source throughout a simulation. It has a human-readable name such as 'Pressure' to help the user interpret the result, a value type that data passed in will be cast to, and a unit to aid in conversion and display. The data type can either be a scalar (float or int) or a list (list or tuple).""" + def __init__(self, name, valueType, unit): if valueType not in (int, float, list, tuple): - raise TypeError('Value type not in allowed set') + raise TypeError("Value type not in allowed set") self.name = name self.unit = unit self.valueType = valueType @@ -59,7 +68,7 @@ def __init__(self, name, valueType, unit): def getData(self, unit=None): """Return all of the data in the channel, converting it if a type is specified.""" - if unit is None: # No conversion needed + if unit is None: # No conversion needed return self.data if self.valueType in (list, tuple): @@ -82,7 +91,7 @@ def addData(self, data): def getAverage(self): """Returns the average of the datapoints.""" if self.valueType in (list, tuple): - raise NotImplementedError('Average not supported for list types') + raise NotImplementedError("Average not supported for list types") return sum(self.data) / len(self.data) def getMax(self): @@ -91,7 +100,7 @@ def getMax(self): if self.valueType in (list, tuple): return max([max(l) for l in self.data]) return max(self.data) - + def getMin(self): """Returns the minimum value of all datapoints. For list datatypes, this operation finds the smallest single value in any list.""" @@ -99,33 +108,44 @@ def getMin(self): return min([min(l) for l in self.data]) return min(self.data) -singleValueChannels = ['time', 'kn', 'pressure', 'force', 'volumeLoading', 'exitPressure', 'dThroat'] -multiValueChannels = ['mass', 'massFlow', 'massFlux', 'regression', 'web', 'machNumber'] -class SimulationResult(): +singleValueChannels = [ + "time", + "kn", + "pressure", + "force", + "volumeLoading", + "exitPressure", + "dThroat", +] +multiValueChannels = ["mass", "massFlow", "massFlux", "regression", "web", "machNumber"] + + +class SimulationResult: """A SimulationResult instance contains all results from a single simulation. It has a number of LogChannels, each capturing a single stream of outputs from the simulation. It also includes a flag of whether the simulation was considered a sucess, along with a list of alerts that the simulation produced while it was running.""" + def __init__(self, motor): self.motor = motor - self.alerts = [] + self.alerts: List[SimAlert] = [] self.success = False self.channels = { - 'time': LogChannel('Time', float, 's'), - 'kn': LogChannel('Kn', float, ''), - 'pressure': LogChannel('Chamber Pressure', float, 'Pa'), - 'force': LogChannel('Thrust', float, 'N'), - 'mass': LogChannel('Propellant Mass', tuple, 'kg'), - 'volumeLoading': LogChannel('Volume Loading', float, '%'), - 'massFlow': LogChannel('Mass Flow', tuple, 'kg/s'), - 'massFlux': LogChannel('Mass Flux', tuple, 'kg/(m^2*s)'), - 'regression': LogChannel('Regression Depth', tuple, 'm'), - 'web': LogChannel('Web', tuple, 'm'), - 'exitPressure': LogChannel('Nozzle Exit Pressure', float, 'Pa'), - 'dThroat': LogChannel('Change in Throat Diameter', float, 'm'), - 'machNumber': LogChannel('Core Mach Number', tuple, ''), + "time": LogChannel("Time", float, "s"), + "kn": LogChannel("Kn", float, ""), + "pressure": LogChannel("Chamber Pressure", float, "Pa"), + "force": LogChannel("Thrust", float, "N"), + "mass": LogChannel("Propellant Mass", tuple, "kg"), + "volumeLoading": LogChannel("Volume Loading", float, "%"), + "massFlow": LogChannel("Mass Flow", tuple, "kg/s"), + "massFlux": LogChannel("Mass Flux", tuple, "kg/(m^2*s)"), + "regression": LogChannel("Regression Depth", tuple, "m"), + "web": LogChannel("Web", tuple, "m"), + "exitPressure": LogChannel("Nozzle Exit Pressure", float, "Pa"), + "dThroat": LogChannel("Change in Throat Diameter", float, "m"), + "machNumber": LogChannel("Core Mach Number", tuple, ""), } def addAlert(self, alert): @@ -135,29 +155,29 @@ def addAlert(self, alert): def getBurnTime(self): """Returns the burntime of the simulated motor, which is the time from the start when it was last producing thrust above the user's defined threshold.""" - return self.channels['time'].getLast() + return self.channels["time"].getLast() def getInitialKN(self): """Returns the motor's Kn before it started firing.""" - return self.channels['kn'].getPoint(0) + return self.channels["kn"].getPoint(0) def getPeakKN(self): """Returns the highest Kn that was observed during the motor's burn.""" - return self.channels['kn'].getMax() + return self.channels["kn"].getMax() def getAveragePressure(self): """Returns the average chamber pressure observed during the simulation.""" - return self.channels['pressure'].getAverage() + return self.channels["pressure"].getAverage() def getMaxPressure(self): """Returns the highest chamber pressure that was observed during the motor's burn.""" - return self.channels['pressure'].getMax() - + return self.channels["pressure"].getMax() + def getMinExitPressure(self): """Returns the lowest exit pressure that was observed during the motor's burn, ignoring startup and shutdown transients""" - exit_pressures = self.channels['exitPressure'].getData() + exit_pressures = self.channels["exitPressure"].getData() return min(exit_pressures) - + def getPercentBelowThreshold(self, channel, threshold): """Returns the total number of seconds spent below a given threshold value""" count = 0 @@ -165,38 +185,48 @@ def getPercentBelowThreshold(self, channel, threshold): for point in data: if point < threshold: count += 1 - return count/len(data) + return count / len(data) def getImpulse(self, stop=None): """Returns the impulse the simulated motor produced. If 'stop' is set to a value other than None, only the impulse to that point in the data is returned.""" impulse = 0 lastTime = 0 - for time, force in zip(self.channels['time'].data[:stop], self.channels['force'].data[:stop]): + for time, force in zip( + self.channels["time"].data[:stop], self.channels["force"].data[:stop] + ): impulse += force * (time - lastTime) lastTime = time return impulse def getAverageForce(self): """Returns the average force the motor produced during its burn.""" - return self.channels['force'].getAverage() + return self.channels["force"].getAverage() def getDesignation(self): """Returns the standard amateur rocketry designation (H128, M1297) for the motor.""" imp = self.getImpulse() - if imp < 1.25: # This is to avoid a domain error finding log(0) - return 'N/A' - letters = '' - order = int(math.log(imp / 1.25, 2)) + 1 # The number of powers of two in the impulse - for place in range(0, int(math.log(order, 26)) + 1): # Loop over the number of letters in the designation + if imp < 1.25: # This is to avoid a domain error finding log(0) + return "N/A" + letters = "" + order = ( + int(math.log(imp / 1.25, 2)) + 1 + ) # The number of powers of two in the impulse + for place in range( + 0, int(math.log(order, 26)) + 1 + ): # Loop over the number of letters in the designation remainder = order % 26 - letters = chr(remainder + 64) + letters # 64 + 1 will produce "A", 64 + 2 "B", and so on - order = int((order - remainder) / 26) # Move up a place by subtracting this one and dividing by the base (26) - return '{}{:.0f}'.format(letters, self.getAverageForce()) + letters = ( + chr(remainder + 64) + letters + ) # 64 + 1 will produce "A", 64 + 2 "B", and so on + order = int( + (order - remainder) / 26 + ) # Move up a place by subtracting this one and dividing by the base (26) + return "{}{:.0f}".format(letters, self.getAverageForce()) def getFullDesignation(self): """Returns the full motor designation, which also includes the total impulse prepended on""" - return '{:.0f}{}'.format(self.getImpulse(), self.getDesignation()) + return "{:.0f}{}".format(self.getImpulse(), self.getDesignation()) def getImpulseClassPercentage(self): """Returns the percentage of the way between the minimum and maximum impulse for the impulse class that the @@ -209,26 +239,26 @@ def getImpulseClassPercentage(self): def getPeakMassFlux(self): """Returns the maximum mass flux observed at any grain end.""" - return self.channels['massFlux'].getMax() + return self.channels["massFlux"].getMax() def getPeakMassFluxLocation(self): """Returns the grain number at which the peak mass flux was observed.""" value = self.getPeakMassFlux() # Find the value to get the location - for frame in self.channels['massFlux'].getData(): + for frame in self.channels["massFlux"].getData(): if value in frame: return frame.index(value) return None def getPeakMachNumber(self): """Returns the maximum core mach number observed at any grain end.""" - return self.channels['machNumber'].getMax() + return self.channels["machNumber"].getMax() def getPeakMachNumberLocation(self): """Returns the grain number at which the peak core mach number was observed.""" value = self.getPeakMachNumber() # Find the value to get the location - for frame in self.channels['machNumber'].getData(): + for frame in self.channels["machNumber"].getData(): if value in frame: return frame.index(value) return None @@ -247,29 +277,31 @@ def getPortRatio(self): """Returns the port/throat ratio of the motor, or None if it doesn't have a port.""" aftPort = self.motor.grains[-1].getPortArea(0) if aftPort is not None: - return aftPort / geometry.circleArea(self.motor.nozzle.props['throat'].getValue()) + return aftPort / geometry.circleArea( + self.motor.nozzle.props["throat"].getValue() + ) return None def getPropellantLength(self): """Returns the total length of all propellant before the simulated burn.""" - return sum([g.props['length'].getValue() for g in self.motor.grains]) + return sum([g.props["length"].getValue() for g in self.motor.grains]) def getPropellantMass(self, index=0): """Returns the total mass of all propellant before the simulated burn. Optionally accepts a index that the mass will be sampled at.""" - return sum(self.channels['mass'].getPoint(index)) + return sum(self.channels["mass"].getPoint(index)) def getVolumeLoading(self, index=0): """Returns the percentage of the motor's volume occupied by propellant. Optionally accepts a index that the value will be sampled at.""" - return self.channels['volumeLoading'].getPoint(index) + return self.channels["volumeLoading"].getPoint(index) def getIdealThrustCoefficient(self): """Returns the motor's thrust coefficient for the average pressure during the burn and no throat diameter changes or performance losses.""" chamberPres = self.getAveragePressure() _, _, gamma, _, _ = self.motor.propellant.getCombustionProperties(chamberPres) - ambPressure = self.motor.config.getProperty('ambPressure') + ambPressure = self.motor.config.getProperty("ambPressure") return self.motor.nozzle.getIdealThrustCoeff(chamberPres, ambPressure, gamma, 0) def getAdjustedThrustCoefficient(self): @@ -277,8 +309,10 @@ def getAdjustedThrustCoefficient(self): changes, but including performance losses.""" chamberPres = self.getAveragePressure() _, _, gamma, _, _ = self.motor.propellant.getCombustionProperties(chamberPres) - ambPressure = self.motor.config.getProperty('ambPressure') - return self.motor.nozzle.getAdjustedThrustCoeff(chamberPres, ambPressure, gamma, 0) + ambPressure = self.motor.config.getProperty("ambPressure") + return self.motor.nozzle.getAdjustedThrustCoeff( + chamberPres, ambPressure, gamma, 0 + ) def getAlertsByLevel(self, level): """Returns all simulation alerts of the specified level.""" @@ -291,16 +325,19 @@ def getAlertsByLevel(self, level): def shouldContinueSim(self, thrustThres): """Returns if the simulation should continue based on the thrust from the last timestep.""" # With only one data point, there is nothing to compare - if len(self.channels['time'].getData()) == 1: + if len(self.channels["time"].getData()) == 1: return True # Otherwise perform the comparison. 0.01 converts the threshold to a % - return self.channels['force'].getLast() > thrustThres * 0.01 * self.channels['force'].getMax() + return ( + self.channels["force"].getLast() + > thrustThres * 0.01 * self.channels["force"].getMax() + ) def getCSV(self, pref=None, exclude=[], excludeGrains=[]): """Returns a string that contains a CSV of the simulated data. Preferences can be passed in to set units that the values will be converted to. All log channels are included unless their names are in the include - argument. """ - out = '' + argument.""" + out = "" outUnits = {} for chan in self.channels: if chan in exclude: @@ -313,40 +350,51 @@ def getCSV(self, pref=None, exclude=[], excludeGrains=[]): # Add title for column if self.channels[chan].valueType in (float, int): out += self.channels[chan].name - if outUnits[chan] != '': - out += '({})'.format(outUnits[chan]) - out += ',' + if outUnits[chan] != "": + out += "({})".format(outUnits[chan]) + out += "," elif self.channels[chan].valueType in (list, tuple): for grain in range(1, len(self.channels[chan].getLast()) + 1): if grain - 1 not in excludeGrains: - out += self.channels[chan].name + '(' - out += 'G{}'.format(grain) - if outUnits[chan] != '': - out += ';{}'.format(outUnits[chan]) - out += '),' + out += self.channels[chan].name + "(" + out += "G{}".format(grain) + if outUnits[chan] != "": + out += ";{}".format(outUnits[chan]) + out += ")," - out = out[:-1] # Remove the last comma - out += '\n' + out = out[:-1] # Remove the last comma + out += "\n" places = 5 - for ind, time in enumerate(self.channels['time'].getData()): - out += str(round(time, places)) + ',' + for ind, time in enumerate(self.channels["time"].getData()): + out += str(round(time, places)) + "," for chan in self.channels: if chan in exclude: continue - if chan != 'time': + if chan != "time": if self.channels[chan].valueType in (float, int): orig = self.channels[chan].getPoint(ind) - conv = units.convert(orig, self.channels[chan].unit, outUnits[chan]) + conv = units.convert( + orig, self.channels[chan].unit, outUnits[chan] + ) rounded = round(conv, places) - out += str(rounded) + ',' + out += str(rounded) + "," elif self.channels[chan].valueType in (list, tuple): - for gid, grainVal in enumerate(self.channels[chan].getPoint(ind)): + for gid, grainVal in enumerate( + self.channels[chan].getPoint(ind) + ): if gid not in excludeGrains: - conv = round(units.convert(grainVal, self.channels[chan].unit, outUnits[chan]), places) - out += str(conv) + ',' - - out = out[:-1] # Remove the last comma - out += '\n' + conv = round( + units.convert( + grainVal, + self.channels[chan].unit, + outUnits[chan], + ), + places, + ) + out += str(conv) + "," + + out = out[:-1] # Remove the last comma + out += "\n" return out