From 62083d3bf49d3a764355ab3c96c3664ef2a585de Mon Sep 17 00:00:00 2001 From: vincthnngyn Date: Wed, 20 Aug 2025 17:02:39 -0400 Subject: [PATCH 1/3] Add YAML input file support for RMG - Add YAMLInputReader class to parse YAML input files - Add auto-detection of input file format based on extension - Support all existing RMG features in YAML format - Add example YAML input files This allows users to write RMG input files in YAML format as an alternative to Python files, providing better structure and validation. --- examples/rmg/butane_yaml_example/input.yaml | 399 ++++++++++ rmgpy/rmg/input.py | 45 ++ rmgpy/rmg/yaml_input_reader.py | 803 ++++++++++++++++++++ 3 files changed, 1247 insertions(+) create mode 100644 examples/rmg/butane_yaml_example/input.yaml create mode 100644 rmgpy/rmg/yaml_input_reader.py diff --git a/examples/rmg/butane_yaml_example/input.yaml b/examples/rmg/butane_yaml_example/input.yaml new file mode 100644 index 0000000000..7fd742e603 --- /dev/null +++ b/examples/rmg/butane_yaml_example/input.yaml @@ -0,0 +1,399 @@ +# sample test yaml input file, info taken from commented input file in examples + +# Data sources +database: + # overrides RMG thermo calculation of RMG with these values. + # libraries found at http://rmg.mit.edu/database/thermo/libraries/ + # if species exist in multiple libraries, the earlier libraries overwrite the + # previous values + thermoLibraries: + - BurkeH2O2 + - primaryThermoLibrary + - DFT_QCI_thermo + - CBS_QB3_1dHR + + # overrides RMG transport calculations with these values. + # if species exist in multiple libraries, the earlier libraries overwrite the previous values + transportLibraries: + - PrimaryTransportLibrary + + # overrides RMG kinetics estimation if needed in the core of RMG. + # list of libraries found at http://rmg.mit.edu/database/kinetics/libraries/ + # libraries can be input as either a string or tuple of form ('library_name',True/False) + # where a `True` indicates that all unused reactions will be automatically added + # to the chemkin file at the end of the simulation. Placing just string values + # defaults the tuple to `False`. The string input is sufficient in almost + # all situations + reactionLibraries: + - name: C3 + seed: false + + # seed mechanisms are reactionLibraries that are forced into the initial mechanism + # in addition to species listed in this input file. + # This is helpful for reducing run time for species you know will appear in + # the mechanism. + seedMechanisms: + - primaryH2O2 + - ERC-FoundationFuelv0.9 + + # lists specific families used to generate the model. 'default' uses a list of + # families from RMG-Database/input/kinetics/families/recommended.py + # a visual list of families is available in PDF form at RMG-database/families + kineticsFamilies: default + + # this is normally not changed in general RMG runs. Usually used for testing with + # outside kinetics databases + kineticsDepositories: default + + # specifies how RMG calculates rates. currently, the only option is 'rate rules' + kineticsEstimator: rate rules + +# List of species +# list initial and expected species below to automatically put them into the core mechanism. 
+# 'structure' can utilize method of SMILES("put_SMILES_here"), +# adjacencyList("""put_adj_list_here"""), or InChI("put_InChI_here") +# for molecular oxygen, use the smiles string [O][O] so the triplet form is used +species: + - label: butane + reactive: true # this parameter is optional if true + structure: + SMILES: CCCC + + - label: O2 + structure: + SMILES: "[O][O]" + + - label: N2 + reactive: false # necessary if false + structure: + adjacencyList: | + 1 N u0 p1 c0 {2,T} + 2 N u0 p1 c0 {1,T} + # for adjacencyLists and adjacencyListGroups, use '|' for a multi-line string like how + # we used """ in python + + # You can list species not initially in reactor to make sure RMG includes them in the mechanism + - label: QOOH + reactive: true + structure: + SMILES: OOCC[CH]C + + - label: CO2 + reactive: true + structure: + SMILES: O=C=O + +# You can also list forbidden structures to forbid a certain molecule or set of molecules from your model +# To prevent a single molecule from your model, use SMILES or adjacencyList to define the structure +# For example, if you do not want cyclopropyne in your model, you can forbid it using SMILES +# forbidden: +# - label: cyclopropyne +# structure: +# SMILES: C1#CC1 +# +# or (optional: explicit) adjacencyList +# - label: cyclopropyne +# structure: | +# 1 C u0 p0 c0 {2,S} {3,S} {4,S} {5,S} +# 2 C u0 p0 c0 {1,S} {3,T} +# 3 C u0 p0 c0 {1,S} {2,T} +# 4 H u0 p0 c0 {1,S} +# 5 H u0 p0 c0 {1,S} +# +# If you want to exclude not just cyclopropyne but all three member rings, use `adjacencyListGroup` +# to define the forbidden group structure +# - label: Three-member Ring +# structure: +# adjacencyListGroup: | +# 1 R ux {2,[S,D,T]} {3,[S,D,T]} +# 2 R ux {1,[S,D,T]} {3,[S,D,T]} +# 3 R ux {1,[S,D,T]} {2,[S,D,T]} + +# Reaction systems +# currently RMG models only constant temperature and pressure as homogeneous batch reactors. +# two options are: simpleReactor for gas phase or liquidReactor for liquid phase +# use can use multiple reactors in an input file for each condition you want to test. +simpleReactor: + # specifies reaction temperature with units + temperature: + value: 700 + units: K + + # specifies reaction pressure with units + pressure: + value: 10.0 + units: bar + + # list initial mole fractions of compounds using the label from the 'species' label. + # RMG will normalize if sum/=1 + initialMoleFractions: + N2: 4 + O2: 1 + butane: 0.1538461538 # 1/6.5 + + # number of simulations used to explore variable temperature and pressure reactors + nSims: 6 + + # the following two values specify when to determine the final output model + # only one must be specified + # the first condition to be satisfied will terminate the process + terminationConversion: + butane: 0.99 + + terminationTime: + value: 40 + units: s + + # the next two optional values specify how RMG computes sensitivities of + # rate coefficients with respect to species concentrations. + # sensitivity contains a list of species' labels to conduct sensitivity analysis on. 
+ # sensitivityThreshold is the required sensitivity to be recorded in the csv output file + # sensitivity: + # - CH4 + # sensitivityThreshold: 0.0001 + +# liquidReactor: +# temperature: +# value: 500 +# units: K +# initialConcentrations: +# N2: 4 +# O2: 1 +# CO: 1 +# terminationConversion: null +# terminationTime: +# value: 3600 +# units: s +# sensitivity: null +# sensitivityThreshold: 0.001 + +# liquid reactors also have solvents, you can specify one solvent +# list of solvents available at : http://rmg.mit.edu/database/solvation/libraries/solvent/ +# solvation: +# solvent: water + +# determines absolute and relative tolerances for ODE solver and sensitivities. +# normally this doesn't cause many issues and is modified after other issues are +# ruled out +simulator: + atol: 1e-16 + rtol: 1e-8 + # sensAtol: 1e-6 + # sensRtol: 1e-4 + +# used to add species to the model and to reduce memory usage by removing unimportant additional species. +# all relative values are normalized by a characteristic flux at that time point +model: + # when running a new model, it is recommended to start with higher values and then decrease to converge on the model + # determines the relative flux to put a species into the core. + # A smaller value will result in a larger, more complex model + toleranceMoveToCore: 0.1 + + # comment out the next three terms to disable pruning + # determines the relative flux needed to not remove species from the model. + # Lower values will keep more species and utilize more memory + toleranceKeepInEdge: 0.01 + + # determines when to stop a ODE run to add a species. + # Lower values will improve speed. + # if it is too low, may never get to the end simulation to prune species. + toleranceInterruptSimulation: 1 + + # number of edge species needed to accumulate before pruning occurs + # larger values require more memory and will prune less often + maximumEdgeSpecies: 100000 + + # minimum number of core species needed before pruning occurs. + # this prevents pruning when kinetic model is far away from completeness + minCoreSizeForPrune: 50 + + # make sure that the pruned edge species have existed for a set number of RMG iterations. + # the user can specify to increase it from the default value of 2 + minSpeciesExistIterationsForPrune: 2 + + # filter the reactions during the enlarge step to omit species from reacting if their + # concentration are deemed to be too low + filterReactions: false + + # for bimolecular reactions, will only allow them to react if + # filterThreshold*C_A*C_B > toleranceMoveToCore*characteristic_rate + # and if filterReactions=True + filterThreshold: 1e8 + +options: + # provides a name for the seed mechanism produced at the end of an rmg run default is 'Seed' + name: SeedName + + # if True (default) every iteration it saves the current model as libraries/seeds + # (and deletes the old one) + # Unlike HTML this is inexpensive time-wise + # note a seed mechanism will be generated at the end of a completed run and some incomplete + # runs even if this is set as False + generateSeedEachIteration: true + + # If True the mechanism will also be saved directly as kinetics and thermo libraries in the database + saveSeedToDatabase: false + + # only option is 'si' + units: si + + # Draws images of species and reactions and saves the model output to HTML. + # May consume extra memory when running large models. + generateOutputHTML: true + + # generates plots of the RMG's performance statistics. Not helpful if you just want a model. 
+ generatePlots: false + + # saves mole fraction of species in 'solver/' to help you create plots + saveSimulationProfiles: false + + # gets RMG to output comments on where kinetics were obtained in the chemkin file. + # useful for debugging kinetics but increases memory usage of the chemkin output file + verboseComments: false + + # gets RMG to generate edge species chemkin files. Uses lots of memory in output. + # Helpful for seeing why some reaction are not appearing in core model. + saveEdgeSpecies: false + + # Sets a time limit in the form DD:HH:MM:SS after which the RMG job will stop. Useful for profiling on jobs that + # do not converge. + # wallTime: "00:00:00" + + # Forces RMG to import library reactions as reversible (default). Otherwise, if set to True, RMG will import library + # reactions while keeping the reversibility as is. + keepIrreversible: false + + # Allows families with three products to react in the diverse direction (default). + trimolecularProductReversible: true + + # Allows a seed to be saved every n iterations. + # The default of -1 causes the iteration to only be saved at the end of the RMG job + saveSeedModulus: -1 + +# optional module allows for correction to unimolecular reaction rates at low pressures and/or temperatures. +pressureDependence: + # two methods available: 'modified strong collision' is faster and less accurate than 'reservoir state' + method: modified strong collision + + # these two categories determine how fine energy is discretized. + # more grains increases accuracy but takes longer + maximumGrainSize: + value: 0.5 + units: kcal/mol + minimumNumberOfGrains: 250 + + # the conditions for the rate to be output over + # parameter order is: low_value, high_value, units, internal points + temperatures: + min: 300 + max: 2200 + units: K + count: 2 + + pressures: + min: 0.01 + max: 100 + units: bar + count: 3 + + # The two options for interpolation are 'PDepArrhenius' (no extra arguments) and + # 'Chebyshev' which is followed by the number of basis sets in + # Temperature and Pressure. These values must be less than the number of + # internal points specified above + interpolation: + - Chebyshev + - 6 # Temperature basis sets + - 4 # Pressure basis sets + + # turns off pressure dependence for molecules with number of atoms greater than the number specified below + # this is due to faster internal rate of energy transfer for larger molecules + maximumAtoms: 15 + +# optional block adds constraints on what RMG can output. +# This is helpful for improving the efficiency of RMG, but wrong inputs can lead to many errors. 
+generatedSpeciesConstraints: + # allows exceptions to the following restrictions + allowed: + - input species + - seed mechanisms + - reaction libraries + + # maximum number of each atom in a molecule + maximumCarbonAtoms: 4 + maximumOxygenAtoms: 7 + maximumNitrogenAtoms: 0 + maximumSiliconAtoms: 0 + maximumSulfurAtoms: 0 + maximumSurfaceSites: 2 # maximum number of surface sites (for heterogeneous catalysis) + maximumSurfaceBondOrder: 2 # maximum bond order of each surface sites (for heterogeneous catalysis) + + # max number of non-hydrogen atoms + # maximumHeavyAtoms: 20 + + # maximum radicals on a molecule + maximumRadicalElectrons: 1 + + # maximum number of singlet carbenes (lone pair on a carbon atom) in a molecule + maximumSingletCarbenes: 1 + + # maximum number of radicals on a molecule with a singlet carbene + # should be lower than maximumRadicalElectrons in order to have an effect + maximumCarbeneRadicals: 0 + + # If this is false or missing, RMG will throw an error if the more less-stable form of O2 is entered + # which doesn't react in the RMG system. normally input O2 as triplet with SMILES [O][O] + # allowSingletO2: false + + # maximum allowed number of non-normal isotope atoms: + # maximumIsotopicAtoms: 2 + +# optional block allows thermo to be estimated through quantum calculations +# quantumMechanics: +# # the software package for calculations...can use 'mopac' or 'gaussian' if installed +# software: mopac +# # methods available for calculations. 'pm2' 'pm3' or 'pm7' (last for mopac only) +# method: pm3 +# # where to store calculations +# fileStore: QMfiles +# # where to store temporary run files +# scratchDirectory: null +# # onlyCyclics allows linear molecules to be calculated using bensen group additivity....need to verify +# onlyCyclics: true +# # how many radicals should be utilized in the calculation. +# # If the amount of radicals is more than this, RMG will use hydrogen bond incrementation method +# maxRadicalNumber: 0 + +# optional block allows thermo to be estimated through ML estimator +# mlEstimator: +# thermo: true +# # Name of folder containing ML architecture and parameters in database +# name: main +# # Limits on atom numbers +# minHeavyAtoms: 1 +# maxHeavyAtoms: null +# minCarbonAtoms: 0 +# maxCarbonAtoms: null +# minOxygenAtoms: 0 +# maxOxygenAtoms: null +# minNitrogenAtoms: 0 +# maxNitrogenAtoms: null +# # Limits on cycles +# onlyCyclics: false +# onlyHeterocyclics: false # If onlyHeterocyclics is True, the machine learning estimator is restricted to only +# # heterocyclics species regardless of onlyCyclics setting. +# # But onlyCyclics should also be True if onlyHeterocyclics is True. +# minCycleOverlap: 0 # specifies the minimum number of atoms that must be shared between any two cycles. +# # If minCycleOverlap is greater than zero, the machine learning estimator is restricted to +# # only cyclic species with the specified minimum cyclic overlap regardless of onlyCyclics +# # setting. 
+# # If the estimated uncertainty of the thermo prediction is greater than +# # any of these values, then don't use the ML estimate +# H298UncertaintyCutoff: +# value: 3.0 +# units: kcal/mol +# S298UncertaintyCutoff: +# value: 2.0 +# units: cal/(mol*K) +# CpUncertaintyCutoff: +# value: 2.0 +# units: cal/(mol*K) \ No newline at end of file diff --git a/rmgpy/rmg/input.py b/rmgpy/rmg/input.py index 8c34e815d4..f8ff75faf3 100644 --- a/rmgpy/rmg/input.py +++ b/rmgpy/rmg/input.py @@ -1873,3 +1873,48 @@ def get_input(name): raise Exception('Unrecognized keyword: {}'.format(name)) raise Exception('Could not get variable with name: {}'.format(name)) + +################################################################################ +# YAML Input Support +################################################################################ + +def read_input_file_auto(path, rmg0): + """ + read an RMG input file (either Python or YAML format) and process it + + this function automatically detects the file format based on the extension + and calls the appropriate reader + + :param path: Path to the input file (.py or .yaml/.yml) + :param rmg0: RMG object to populate + """ + from pathlib import Path + + # Get the file extension + file_path = Path(path) + extension = file_path.suffix.lower() + + # Check if file exists + if not file_path.exists(): + raise IOError(f'The input file "{path}" could not be found.') + + # Route to appropriate reader based on extension + if extension == '.py': + # Use the original Python input file reader + logging.info(f'Detected Python input file format (.py)') + read_input_file(path, rmg0) + elif extension in ['.yaml', '.yml']: + # Use the YAML input file reader + try: + from rmgpy.rmg.yaml_input_reader import read_yaml_input_file + logging.info(f'Detected YAML input file format ({extension})') + read_yaml_input_file(path, rmg0) + except ImportError: + raise ImportError( + "YAML support requires PyYAML. Install it with: pip install pyyaml" + ) + else: + raise ValueError( + f'Unsupported input file format "{extension}". ' + f'RMG supports .py and .yaml/.yml input files.' 
)
diff --git a/rmgpy/rmg/yaml_input_reader.py b/rmgpy/rmg/yaml_input_reader.py
new file mode 100644
index 0000000000..6179c40e8f
--- /dev/null
+++ b/rmgpy/rmg/yaml_input_reader.py
@@ -0,0 +1,803 @@
+"""
+YAML input reader for RMG.
+This module reads YAML-format RMG input files
+and calls the existing functions in input.py.
+
+Legacy Python input files remain fully supported.
+"""
+
+import yaml
+import logging
+import os
+from pathlib import Path
+
+# import ALL the existing functions from original input.py
+from rmgpy.rmg.input import (
+    database, catalyst_properties, species, forbidden,
+    simple_reactor, constant_V_ideal_gas_reactor, constant_TP_ideal_gas_reactor,
+    liquid_cat_reactor, constant_T_V_liquid_reactor, liquid_reactor,
+    surface_reactor, mb_sampled_reactor, simulator, solvation,
+    model, quantum_mechanics, ml_estimator, pressure_dependence,
+    options, generated_species_constraints, thermo_central_database,
+    uncertainty, restart_from_seed, liquid_volumetric_mass_transfer_coefficient_power_law,
+    smiles, inchi, adjacency_list, adjacency_list_group, smarts,
+    fragment_adj, fragment_smiles, react
+)
+
+class YAMLInputReader:
+    def __init__(self, path):
+        """
+        initialize the YAML input reader with a file path
+
+        :param path: path to the YAML input file
+        """
+        self.path = Path(path)
+        self.data = None
+        self.species_dict = {}
+
+    def read(self):
+        """
+        read and parse YAML input file
+        """
+        logging.info(f'Reading YAML input file "{self.path}"...')
+        # let the user know in the terminal that the file is being read
+
+        with open(self.path, 'r') as file:
+            # store the content so it can be logged to the terminal
+            content = file.read()
+
+        # parse the YAML from the string just read; parsing the already-consumed
+        # file handle would silently return None
+        self.data = yaml.safe_load(content)
+
+        # check that the input file contains a top-level dictionary
+        if not isinstance(self.data, dict):
+            raise ValueError("YAML input file must contain a dictionary at the top level")
+
+        # log contents of file into terminal
+        logging.info(content)
+
+    def process(self):
+        """
+        processes the info from the YAML file and calls the preexisting functions in input.py
+        """
+        if not self.data:
+            raise RuntimeError("No input data loaded; call read() before process()")
+
+        # now process the data in sections (same order as for .py inputs);
+        # each optional block gets its own check because any subset of these
+        # sections may appear in an input file
+        # ⬇ libraries and species information ⬇
+        if 'database' in self.data:
+            self._process_database(self.data['database'])
+        if 'catalystProperties' in self.data:
+            self._process_catalyst_properties(self.data['catalystProperties'])
+        if 'species' in self.data:
+            self._process_species(self.data['species'])
+        if 'forbidden' in self.data:
+            self._process_forbidden(self.data['forbidden'])
+        if 'react' in self.data:
+            self._process_react(self.data['react'])
+        # process each reactor type individually;
+        # each reactor block can appear at the top level
+        # ⬇ reactor information ⬇
+        if 'simpleReactor' in self.data:
+            self._process_simple_reactor(self.data['simpleReactor'])
+        if 'constantVIdealGasReactor' in self.data:
+            self._process_constant_v_reactor(self.data['constantVIdealGasReactor'])
+        if 'constantTPIdealGasReactor' in self.data:
+            self._process_constant_tp_reactor(self.data['constantTPIdealGasReactor'])
+        if 'liquidCatReactor' in self.data:
+            self._process_liquid_cat_reactor(self.data['liquidCatReactor'])
+        if 'constantTVLiquidReactor' in self.data:
+            self._process_constant_tv_liquid_reactor(self.data['constantTVLiquidReactor'])
+        if 'liquidReactor' in self.data:
+            self._process_liquid_reactor(self.data['liquidReactor'])
+        if 'surfaceReactor' in self.data:
+            self._process_surface_reactor(self.data['surfaceReactor'])
+        if 'mbSampledReactor' in self.data:
+            self._process_mb_sampled_reactor(self.data['mbSampledReactor'])
+        # ⬇ other info/options ⬇
+        if 'solvation' in self.data:
+            self._process_solvation(self.data['solvation'])
+        if 'liquidVolumetricMassTransferCoefficientPowerLaw' in self.data:
+            self._process_liquid_mass_transfer(self.data['liquidVolumetricMassTransferCoefficientPowerLaw'])
+        if 'simulator' in self.data:
+            self._process_simulator(self.data['simulator'])
+        if 'model' in self.data:
+            self._process_model(self.data['model'])
+        if 'quantumMechanics' in self.data:
+            self._process_quantum_mechanics(self.data['quantumMechanics'])
+        if 'mlEstimator' in self.data:
+            self._process_ml_estimator(self.data['mlEstimator'])
+        if 'pressureDependence' in self.data:
+            self._process_pressure_dependence(self.data['pressureDependence'])
+        if 'generatedSpeciesConstraints' in self.data:
+            self._process_species_constraints(self.data['generatedSpeciesConstraints'])
+        if 'thermoCentralDatabase' in self.data:
+            self._process_thermo_central_database(self.data['thermoCentralDatabase'])
+        if 'uncertainty' in self.data:
+            self._process_uncertainty(self.data['uncertainty'])
+        if 'restartFromSeed' in self.data:
+            self._process_restart_from_seed(self.data['restartFromSeed'])
+        if 'options' in self.data:
+            self._process_options(self.data['options'])
+
+
+    # Processor methods: each one unpacks its YAML block and passes the values
+    # to the corresponding function in input.py.
+    def _process_database(self, db_data):
+        """
+        process database input
+        """
+        reaction_libraries = []
+        if 'reactionLibraries' in db_data:
+            for lib in db_data['reactionLibraries']:
+                if isinstance(lib, str):
+                    reaction_libraries.append(lib)
+                elif isinstance(lib, dict):
+                    # convert dict format to the (name, seed) tuple format
+                    name = lib.get('name')
+                    seed = lib.get('seed', False)
+                    # if no seed bool is set, default to False
+                    reaction_libraries.append((name, seed))
+        database(
+            thermoLibraries = db_data.get('thermoLibraries'),
+            transportLibraries = db_data.get('transportLibraries'),
+            # pass the normalized list built above, not the raw YAML entries
+            reactionLibraries = reaction_libraries if reaction_libraries else None,
+            frequenciesLibraries = db_data.get('frequenciesLibraries'),
+            seedMechanisms = db_data.get('seedMechanisms'),
+            kineticsFamilies = db_data.get('kineticsFamilies', 'default'),
+            kineticsDepositories = db_data.get('kineticsDepositories', 'default'),
+            kineticsEstimator = db_data.get('kineticsEstimator', 'rate rules'),
+            adsorptionGroups = db_data.get('adsorptionGroups', 'adsorptionPt111')
+        )
+    # species-related processors call the existing input.py functions with the data from the YAML file
+    def _process_catalyst_properties(self, cat_data):
+        """
+        process catalyst input
+        """
+        catalyst_properties(
+            bindingEnergies = cat_data.get('bindingEnergies'),
+            surfaceSiteDensity = cat_data.get('surfaceSiteDensity'),
+            metal = cat_data.get('metal'),
+            coverageDependence = cat_data.get('coverageDependence', False)
+            # ^ if no coverage dependence bool is set, default to False like in input.py
+        )
+    def _process_species(self, spec_list):
+        """
+        process species definitions
+        """
+        for spec in spec_list:
+            # handle structure based on nested format or explicit type
+            if 'structure' in spec:
+                struc_data = spec['structure']
+
+                if isinstance(struc_data, str):
+                    # a bare string is assumed to be an adjacency list
+                    structure = adjacency_list(struc_data)
+                elif isinstance(struc_data, dict):
+                    # otherwise expect a dict keyed by the structure type,
+                    # e.g. SMILES: CCCC
+                    if 'SMILES' in struc_data:
+                        structure = smiles(struc_data['SMILES'])
+                    elif 'InChI' in struc_data:
+                        structure = inchi(struc_data['InChI'])
+                    elif 'adjacencyList' in struc_data:
+                        structure = adjacency_list(struc_data['adjacencyList'])
+                    elif 'fragmentAdjacencyList' in struc_data:
+                        structure = fragment_adj(struc_data['fragmentAdjacencyList'])
+                    elif 'fragmentSMILES' in struc_data:
+                        structure = fragment_smiles(struc_data['fragmentSMILES'])
+                    else:
+                        raise ValueError(f"Unknown structure format in species {spec.get('label', 'unknown')}")
+                else:
+                    raise ValueError(f"Invalid structure format for species {spec.get('label', 'unknown')}")
+            else:
+                raise ValueError(f"No structure provided for species {spec.get('label', 'unknown')}")
+
+            species(
+                label = spec['label'],
+                structure = structure,
+                reactive = spec.get('reactive', True),
+                cut = spec.get('cut', False),
+                size_threshold = spec.get('sizeThreshold')
+            )
+    def _process_forbidden(self, forb_list):
+        """
+        process forbidden structures
+        handled the same way as species
+        """
+        for forb in forb_list:
+            if 'structure' in forb:
+                struc_data = forb['structure']
+
+                if isinstance(struc_data, str):
+                    # a bare string is assumed to be an adjacency list group
+                    structure = adjacency_list_group(struc_data)
+                elif isinstance(struc_data, dict):
+                    if 'SMILES' in struc_data:
+                        structure = smiles(struc_data['SMILES'])
+                    elif 'SMARTS' in struc_data:
+                        structure = smarts(struc_data['SMARTS'])
+                    elif 'adjacencyList' in struc_data:
+                        structure = adjacency_list(struc_data['adjacencyList'])
+                    elif 'adjacencyListGroup' in struc_data:
+                        structure = adjacency_list_group(struc_data['adjacencyListGroup'])
+                    else:
+                        raise ValueError(f"Unknown structure format in forbidden {forb.get('label', 'unknown')}")
+                else:
+                    raise ValueError(f"Invalid structure format for forbidden {forb.get('label', 'unknown')}")
+            else:
+                raise ValueError(f"No structure provided for forbidden structure {forb.get('label', 'unknown')}")
+            forbidden(
+                label=forb['label'],
+                structure=structure
+            )
+
+
+
+    # reactor processors follow the same pattern
+
+    def _process_react(self, react_data):
+        """
+        process react specifications
+        """
+        react(react_data)
+
+    def _process_simple_reactor(self, reactor_data):
+        """
+        process simple reactor configuration
+        """
+        # handle both a single reactor and a list of reactors
+        reactors = reactor_data if isinstance(reactor_data, list) else [reactor_data]
+
+        for reactor in reactors:
+            # sensitivity may be None, a single species label, or a list of labels
+            sensitivity = reactor.get('sensitivity')
+            if isinstance(sensitivity, str):
+                sensitivity = [sensitivity]
+
+            simple_reactor(
+                temperature=self._convert_quantity(reactor['temperature']),
+                pressure=self._convert_quantity(reactor['pressure']),
+                initialMoleFractions=reactor['initialMoleFractions'],
+                nSims=reactor.get('nSims', 6),
+                terminationConversion=reactor.get('terminationConversion'),
+                terminationTime=self._convert_quantity(reactor.get('terminationTime')),
+                terminationRateRatio=reactor.get('terminationRateRatio'),
+                balanceSpecies=reactor.get('balanceSpecies'),
+                sensitivity=sensitivity,
+                sensitivityThreshold=reactor.get('sensitivityThreshold', 1e-3),
+                sensitivityTemperature=self._convert_quantity(reactor.get('sensitivityTemperature')),
+                sensitivityPressure=self._convert_quantity(reactor.get('sensitivityPressure')),
+                sensitivityMoleFractions=reactor.get('sensitivityMoleFractions'),
+                constantSpecies=reactor.get('constantSpecies')
+            )
+
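+    # Illustrative example of the YAML block consumed by _process_simple_reactor
+    # above (values taken from examples/rmg/butane_yaml_example/input.yaml in this
+    # patch). Nested {value, units} mappings are converted by _convert_quantity()
+    # into the (value, units) tuples that the existing input.py functions expect:
+    #
+    #   simpleReactor:
+    #     temperature: {value: 700, units: K}
+    #     pressure: {value: 10.0, units: bar}
+    #     initialMoleFractions:
+    #       butane: 0.1538461538
+    #       O2: 1.0
+    #       N2: 4.0
+    #     terminationConversion:
+    #       butane: 0.99
+    #     terminationTime: {value: 40, units: s}
+
+    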
def _process_constant_v_reactor(self, reactor_data): + """ + process constant V ideal gas reactor configuration + """ + reactors = reactor_data if isinstance(reactor_data, list) else [reactor_data] + + for reactor in reactors: + constant_V_ideal_gas_reactor( + temperature=self._convert_quantity(reactor['temperature']), + pressure=self._convert_quantity(reactor['pressure']), + initialMoleFractions=reactor['initialMoleFractions'], + terminationConversion=reactor.get('terminationConversion'), + terminationTime=self._convert_quantity(reactor.get('terminationTime')), + terminationRateRatio=reactor.get('terminationRateRatio'), + balanceSpecies=reactor.get('balanceSpecies') + ) + + def _process_constant_tp_reactor(self, reactor_data): + """ + process constant T,P ideal gas reactor configuration + """ + reactors = reactor_data if isinstance(reactor_data, list) else [reactor_data] + + for reactor in reactors: + constant_TP_ideal_gas_reactor( + temperature=self._convert_quantity(reactor['temperature']), + pressure=self._convert_quantity(reactor['pressure']), + initialMoleFractions=reactor['initialMoleFractions'], + terminationConversion=reactor.get('terminationConversion'), + terminationTime=self._convert_quantity(reactor.get('terminationTime')), + terminationRateRatio=reactor.get('terminationRateRatio'), + balanceSpecies=reactor.get('balanceSpecies') + ) + + def _process_liquid_cat_reactor(self, reactor_data): + """ + process liquid catalyst reactor configuration + """ + reactors = reactor_data if isinstance(reactor_data, list) else [reactor_data] + + for reactor in reactors: + liquid_cat_reactor( + temperature=self._convert_quantity(reactor['temperature']), + initialConcentrations=self._convert_concentration_dict(reactor['initialConcentrations']), + initialSurfaceCoverages=reactor['initialSurfaceCoverages'], + surfaceVolumeRatio=self._convert_quantity(reactor['surfaceVolumeRatio']), + distance=self._convert_quantity(reactor.get('distance')), + viscosity=self._convert_quantity(reactor.get('viscosity')), + surfPotential=self._convert_quantity(reactor.get('surfPotential')), + liqPotential=self._convert_quantity(reactor.get('liqPotential')), + terminationConversion=reactor.get('terminationConversion'), + terminationTime=self._convert_quantity(reactor.get('terminationTime')), + terminationRateRatio=reactor.get('terminationRateRatio'), + constantSpecies=reactor.get('constantSpecies', []) + ) + + def _process_constant_tv_liquid_reactor(self, reactor_data): + """ + process constant T,V liquid reactor configuration + """ + reactors = reactor_data if isinstance(reactor_data, list) else [reactor_data] + + for reactor in reactors: + constant_T_V_liquid_reactor( + temperature=self._convert_quantity(reactor['temperature']), + initialConcentrations=self._convert_concentration_dict(reactor['initialConcentrations']), + liquidVolume=self._convert_quantity(reactor.get('liquidVolume')), + residenceTime=self._convert_quantity(reactor.get('residenceTime')), + inletVolumetricFlowRate=self._convert_quantity(reactor.get('inletVolumetricFlowRate')), + outletVolumetricFlowRate=self._convert_quantity(reactor.get('outletVolumetricFlowRate')), + inletConcentrations=self._convert_concentration_dict(reactor.get('inletConcentrations', {})), + vaporPressure=self._convert_quantity(reactor.get('vaporPressure')), + vaporMoleFractions=reactor.get('vaporMoleFractions'), + terminationConversion=reactor.get('terminationConversion'), + terminationTime=self._convert_quantity(reactor.get('terminationTime')), + 
terminationRateRatio=reactor.get('terminationRateRatio'), + constantSpecies=reactor.get('constantSpecies', []) + ) + + def _process_liquid_reactor(self, reactor_data): + """ + process liquid reactor configuration + """ + reactors = reactor_data if isinstance(reactor_data, list) else [reactor_data] + + for reactor in reactors: + # Handle sensitivity which could be None, a string, or a list + sensitivity = reactor.get('sensitivity') + if sensitivity is None: + sensitivity = None + elif isinstance(sensitivity, str): + sensitivity = [sensitivity] + else: + sensitivity = sensitivity + + liquid_reactor( + temperature=self._convert_quantity(reactor['temperature']), + initialConcentrations=self._convert_concentration_dict(reactor['initialConcentrations']), + terminationConversion=reactor.get('terminationConversion'), + nSims=reactor.get('nSims', 4), + terminationTime=self._convert_quantity(reactor.get('terminationTime')), + terminationRateRatio=reactor.get('terminationRateRatio'), + sensitivity=sensitivity, + sensitivityThreshold=reactor.get('sensitivityThreshold', 1e-3), + sensitivityTemperature=self._convert_quantity(reactor.get('sensitivityTemperature')), + sensitivityConcentrations=self._convert_concentration_dict(reactor.get('sensitivityConcentrations', {})), + constantSpecies=reactor.get('constantSpecies') + ) + + def _process_surface_reactor(self, reactor_data): + """ + process surface reactor configuration + """ + reactors = reactor_data if isinstance(reactor_data, list) else [reactor_data] + + for reactor in reactors: + # Handle sensitivity which could be None, a string, or a list + sensitivity = reactor.get('sensitivity') + if sensitivity is None: + sensitivity = None + elif isinstance(sensitivity, str): + sensitivity = [sensitivity] + else: + sensitivity = sensitivity + + surface_reactor( + temperature=self._convert_quantity(reactor['temperature']), + initialPressure=self._convert_quantity(reactor['initialPressure']), + initialGasMoleFractions=reactor['initialGasMoleFractions'], + initialSurfaceCoverages=reactor['initialSurfaceCoverages'], + surfaceVolumeRatio=self._convert_quantity(reactor['surfaceVolumeRatio']), + nSims=reactor.get('nSims', 4), + terminationConversion=reactor.get('terminationConversion'), + terminationTime=self._convert_quantity(reactor.get('terminationTime')), + terminationRateRatio=reactor.get('terminationRateRatio'), + sensitivity=sensitivity, + sensitivityThreshold=reactor.get('sensitivityThreshold', 1e-3) + ) + + def _process_mb_sampled_reactor(self, reactor_data): + """ + process MB sampled reactor configuration + """ + reactors = reactor_data if isinstance(reactor_data, list) else [reactor_data] + + for reactor in reactors: + # Handle sensitivity which could be None, a string, or a list + sensitivity = reactor.get('sensitivity') + if sensitivity is None: + sensitivity = None + elif isinstance(sensitivity, str): + sensitivity = [sensitivity] + else: + sensitivity = sensitivity + + mb_sampled_reactor( + temperature=self._convert_quantity(reactor['temperature']), + pressure=self._convert_quantity(reactor['pressure']), + initialMoleFractions=reactor['initialMoleFractions'], + mbsamplingRate=self._convert_quantity(reactor['mbsamplingRate']), + terminationConversion=reactor.get('terminationConversion'), + terminationTime=self._convert_quantity(reactor.get('terminationTime')), + sensitivity=sensitivity, + sensitivityThreshold=reactor.get('sensitivityThreshold', 1e-3), + constantSpecies=reactor.get('constantSpecies') + ) + + def _process_solvation(self, solv_data): + 
""" + process solvation settings + """ + # Handle SolventData if provided + + # (FINISH THIS IDK HOW TO DO IT RN) + + def _process_liquid_mass_transfer(self, lmt_data): + """ + process liquid volumetric mass transfer coefficient + """ + liquid_volumetric_mass_transfer_coefficient_power_law( + prefactor=self._convert_quantity(lmt_data.get('prefactor', (0, "1/s"))), + diffusionCoefficientPower=lmt_data.get('diffusionCoefficientPower', 0), + solventViscosityPower=lmt_data.get('solventViscosityPower', 0), + solventDensityPower=lmt_data.get('solventDensityPower', 0) + ) + + def _process_simulator(self, sim_data): + """ + process simulator settings + """ + simulator( + atol=sim_data.get('atol', 1e-16), + rtol=sim_data.get('rtol', 1e-8), + sens_atol=sim_data.get('sens_atol', 1e-6), + sens_rtol=sim_data.get('sens_rtol', 1e-4) + ) + + def _process_model(self, model_data): + """ + process model settings + """ + model( + toleranceMoveToCore=model_data.get('toleranceMoveToCore'), + toleranceRadMoveToCore=model_data.get('toleranceRadMoveToCore', float('inf')), + toleranceMoveEdgeReactionToCore=model_data.get('toleranceMoveEdgeReactionToCore', float('inf')), + toleranceKeepInEdge=model_data.get('toleranceKeepInEdge', 0.0), + toleranceInterruptSimulation=model_data.get('toleranceInterruptSimulation', 1.0), + toleranceMoveEdgeReactionToSurface=model_data.get('toleranceMoveEdgeReactionToSurface', float('inf')), + toleranceMoveSurfaceSpeciesToCore=model_data.get('toleranceMoveSurfaceSpeciesToCore', float('inf')), + toleranceMoveSurfaceReactionToCore=model_data.get('toleranceMoveSurfaceReactionToCore', float('inf')), + toleranceMoveEdgeReactionToSurfaceInterrupt=model_data.get('toleranceMoveEdgeReactionToSurfaceInterrupt'), + toleranceMoveEdgeReactionToCoreInterrupt=model_data.get('toleranceMoveEdgeReactionToCoreInterrupt'), + maximumEdgeSpecies=model_data.get('maximumEdgeSpecies', 1000000), + minCoreSizeForPrune=model_data.get('minCoreSizeForPrune', 50), + minSpeciesExistIterationsForPrune=model_data.get('minSpeciesExistIterationsForPrune', 2), + filterReactions=model_data.get('filterReactions', False), + filterThreshold=model_data.get('filterThreshold', 1e8), + ignoreOverallFluxCriterion=model_data.get('ignoreOverallFluxCriterion', False), + maxNumSpecies=model_data.get('maxNumSpecies'), + maxNumObjsPerIter=model_data.get('maxNumObjsPerIter', 1), + terminateAtMaxObjects=model_data.get('terminateAtMaxObjects', False), + toleranceThermoKeepSpeciesInEdge=model_data.get('toleranceThermoKeepSpeciesInEdge', float('inf')), + dynamicsTimeScale=self._convert_quantity(model_data.get('dynamicsTimeScale', (0.0, 'sec'))), + toleranceBranchReactionToCore=model_data.get('toleranceBranchReactionToCore', 0.0), + branchingIndex=model_data.get('branchingIndex', 0.5), + branchingRatioMax=model_data.get('branchingRatioMax', 1.0), + toleranceTransitoryDict=model_data.get('toleranceTransitoryDict', {}), + transitoryStepPeriod=model_data.get('transitoryStepPeriod', 20), + toleranceReactionToCoreDeadendRadical=model_data.get('toleranceReactionToCoreDeadendRadical', 0.0) + ) + + def _process_quantum_mechanics(self, qm_data): + """ + process quantum mechanics settings + """ + quantum_mechanics( + software=qm_data['software'], + method=qm_data['method'], + fileStore=qm_data.get('fileStore'), + scratchDirectory=qm_data.get('scratchDirectory'), + onlyCyclics=qm_data.get('onlyCyclics', False), + maxRadicalNumber=qm_data.get('maxRadicalNumber', 0) + ) + + def _process_ml_estimator(self, ml_data): + """ + process ML estimator settings + 
""" + ml_estimator( + thermo=ml_data.get('thermo', True), + name=ml_data.get('name', 'main'), + minHeavyAtoms=ml_data.get('minHeavyAtoms', 1), + maxHeavyAtoms=ml_data.get('maxHeavyAtoms'), + minCarbonAtoms=ml_data.get('minCarbonAtoms', 0), + maxCarbonAtoms=ml_data.get('maxCarbonAtoms'), + minOxygenAtoms=ml_data.get('minOxygenAtoms', 0), + maxOxygenAtoms=ml_data.get('maxOxygenAtoms'), + minNitrogenAtoms=ml_data.get('minNitrogenAtoms', 0), + maxNitrogenAtoms=ml_data.get('maxNitrogenAtoms'), + onlyCyclics=ml_data.get('onlyCyclics', False), + onlyHeterocyclics=ml_data.get('onlyHeterocyclics', False), + minCycleOverlap=ml_data.get('minCycleOverlap', 0), + H298UncertaintyCutoff=self._convert_quantity(ml_data.get('H298UncertaintyCutoff', (3.0, 'kcal/mol'))), + S298UncertaintyCutoff=self._convert_quantity(ml_data.get('S298UncertaintyCutoff', (2.0, 'cal/(mol*K)'))), + CpUncertaintyCutoff=self._convert_quantity(ml_data.get('CpUncertaintyCutoff', (2.0, 'cal/(mol*K)'))) + ) + + def _process_pressure_dependence(self, pd_data): + """ + process pressure dependence settings + """ + # Process temperatures - can be dict with min/max/count/units or list + temps = pd_data['temperatures'] + if isinstance(temps, dict): + temperatures = [temps['min'], temps['max'], temps['units'], temps['count']] + else: + temperatures = temps + + # Process pressures - can be dict with min/max/count/units or list + press = pd_data['pressures'] + if isinstance(press, dict): + pressures = [press['min'], press['max'], press['units'], press['count']] + else: + pressures = press + + # Process interpolation - can be list or tuple + interp = pd_data.get('interpolation') + if isinstance(interp, list) and len(interp) > 1: + # Convert list format [method, param1, param2] to tuple + interpolation = tuple(interp) + else: + interpolation = interp + + pressure_dependence( + method=pd_data['method'], + temperatures=temperatures, + pressures=pressures, + maximumGrainSize=self._convert_quantity(pd_data.get('maximumGrainSize', 0.0)), + minimumNumberOfGrains=pd_data.get('minimumNumberOfGrains', 0), + interpolation=interpolation, + maximumAtoms=pd_data.get('maximumAtoms') + ) + + def _process_species_constraints(self, constraints): + """ + process generated species constraints + """ + # Create a copy to avoid modifying the original + constraints_copy = constraints.copy() + + # Handle the special 'allowed' field + if 'allowed' in constraints_copy: + allowed_list = constraints_copy['allowed'] + # Convert special string values to their expected format + processed_allowed = [] + for item in allowed_list: + if item == 'input species': + processed_allowed.append('input species') + elif item == 'seed mechanisms': + processed_allowed.append('seed mechanisms') + elif item == 'reaction libraries': + processed_allowed.append('reaction libraries') + else: + processed_allowed.append(item) + constraints_copy['allowed'] = processed_allowed + + generated_species_constraints(**constraints_copy) + + def _process_thermo_central_database(self, tcd_data): + """ + process thermo central database settings + """ + thermo_central_database( + host=tcd_data['host'], + port=tcd_data['port'], + username=tcd_data['username'], + password=tcd_data['password'], + application=tcd_data['application'] + ) + + def _process_uncertainty(self, unc_data): + """ + process uncertainty settings + """ + uncertainty( + localAnalysis=unc_data.get('localAnalysis', False), + globalAnalysis=unc_data.get('globalAnalysis', False), + uncorrelated=unc_data.get('uncorrelated', True), + 
correlated=unc_data.get('correlated', True), + localNumber=unc_data.get('localNumber', 10), + globalNumber=unc_data.get('globalNumber', 5), + terminationTime=self._convert_quantity(unc_data.get('terminationTime')), + pceRunTime=unc_data.get('pceRunTime', 1800), + pceErrorTol=unc_data.get('pceErrorTol'), + pceMaxEvals=unc_data.get('pceMaxEvals'), + logx=unc_data.get('logx', True) + ) + + def _process_restart_from_seed(self, restart_data): + """ + process restart from seed settings + """ + restart_from_seed( + path=restart_data.get('path'), + coreSeed=restart_data.get('coreSeed'), + edgeSeed=restart_data.get('edgeSeed'), + filters=restart_data.get('filters'), + speciesMap=restart_data.get('speciesMap') + ) + + def _process_options(self, opt_data): + """ + process general options + """ + options( + name=opt_data.get('name', 'Seed'), + generateSeedEachIteration=opt_data.get('generateSeedEachIteration', True), + saveSeedToDatabase=opt_data.get('saveSeedToDatabase', False), + units=opt_data.get('units', 'si'), + saveRestartPeriod=opt_data.get('saveRestartPeriod'), + generateOutputHTML=opt_data.get('generateOutputHTML', False), + generatePlots=opt_data.get('generatePlots', False), + saveSimulationProfiles=opt_data.get('saveSimulationProfiles', False), + verboseComments=opt_data.get('verboseComments', False), + saveEdgeSpecies=opt_data.get('saveEdgeSpecies', False), + keepIrreversible=opt_data.get('keepIrreversible', False), + trimolecularProductReversible=opt_data.get('trimolecularProductReversible', True), + wallTime=opt_data.get('wallTime', '00:00:00:00'), # This is a string, not a quantity + saveSeedModulus=opt_data.get('saveSeedModulus', -1) + ) + + def _convert_quantity(self, value): + """ + convert YAML quantity representation to tuple format expected by functions + + :param value: Either a dict with 'value' and 'units', a list/tuple, + a single number/string, or None + :return: Tuple (value, units), the original value, or None + """ + if value is None: + return None + + if isinstance(value, dict): + if 'value' in value and 'units' in value: + return (value['value'], value['units']) + # Also handle the case where the quantity is directly the dict value + elif len(value) == 1: + # e.g., {0.5: 'kcal/mol'} format + val, unit = next(iter(value.items())) + return (val, unit) + else: + # Return the dict as-is if it doesn't match expected formats + return value + elif isinstance(value, (list, tuple)): + if len(value) == 2: + # Standard (value, units) format + return tuple(value) + elif len(value) == 4: + # For temperature/pressure ranges in pressure dependence + return value + else: + # Other list formats + return value + elif isinstance(value, str): + # For cases like wallTime which is just a string + return value + else: + # For single numeric values, return as-is + # The function being called will handle unit defaults if needed + return value + + def _convert_concentration_dict(self, conc_dict): + """ + convert concentration dictionary with quantity values + + :param conc_dict: Dictionary with species as keys and quantities as values + :return: Dictionary with converted quantities + """ + if not conc_dict: + return {} + + result = {} + for species, conc in conc_dict.items(): + result[species] = self._convert_quantity(conc) + return result + +# Actual reader function itself now +def read_yaml_input_file(path, rmg0): + """ + read an RMG YAML input file and process it using the existing input.py functions. 
+ + :param path: Path to the YAML input file + :param rmg0: RMG object to populate + """ + # Import necessary modules for processing + from rmgpy.rmg.input import set_global_rmg + from rmgpy.rmg.model import CoreEdgeReactionModel + + # Set up the global RMG object + set_global_rmg(rmg0) + rmg0.reaction_model = CoreEdgeReactionModel() + rmg0.initial_species = [] + rmg0.reaction_systems = [] + + # Clear the global species_dict + from rmgpy.rmg import input as rmg_input + rmg_input.species_dict = {} + rmg_input.mol_to_frag = {} + + # Set species constraints default + rmg0.species_constraints = {'explicitlyAllowedMolecules': []} + + # Process YAML file + reader = YAMLInputReader(path) + reader.read() + reader.process() + + # Post-processing (similar to original read_input_file) + for reaction_system in rmg0.reaction_systems: + if hasattr(reaction_system, 'convert_initial_keys_to_species_objects'): + reaction_system.convert_initial_keys_to_species_objects(rmg_input.species_dict) + + if rmg0.quantum_mechanics: + rmg0.quantum_mechanics.set_default_output_directory(rmg0.output_directory) + rmg0.quantum_mechanics.initialize() + + logging.info('') + +def read_input_file_wrapper(path, rmg0): + """ + read an RMG input file (either Python or YAML format) and process it. + + this function automatically detects the file format based on the extension + and calls the appropriate reader. + + :param path: Path to the input file (.py or .yaml/.yml) + :param rmg0: RMG object to populate + """ + import os + from pathlib import Path + + # Get the file extension + file_path = Path(path) + extension = file_path.suffix.lower() + + # Check if file exists + if not file_path.exists(): + raise IOError(f'The input file "{path}" could not be found.') + + # Route to appropriate reader based on extension + if extension == '.py': + # Use the original Python input file reader + from rmgpy.rmg.input import read_input_file as read_python_input_file + logging.info(f'Detected Python input file format (.py)') + read_python_input_file(path, rmg0) + elif extension in ['.yaml', '.yml']: + # Use the YAML input file reader + logging.info(f'Detected YAML input file format ({extension})') + read_yaml_input_file(path, rmg0) + else: + raise ValueError( + f'Unsupported input file format "{extension}". ' + f'RMG supports .py and .yaml/.yml input files.' + ) + \ No newline at end of file From 85ada0cfb6bf1ee347e8657d0a3732ca34167693 Mon Sep 17 00:00:00 2001 From: vincthnngyn Date: Wed, 20 Aug 2025 17:08:27 -0400 Subject: [PATCH 2/3] Forgot to commit changes to environment and main --- environment.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/environment.yml b/environment.yml index 591acb91a5..19565080a9 100644 --- a/environment.yml +++ b/environment.yml @@ -84,6 +84,8 @@ dependencies: # https://github.com/ReactionMechanismGenerator/RMG-Py/pull/2694#issuecomment-2489286263 - conda-forge::quantities !=0.16.0,!=0.16.1 - conda-forge::ringdecomposerlib-python + - pyyaml + # packages we maintain - rmg::pydas >=1.0.3 From c8dd63c1ebea97e0f3a8a5e0117419c94df9d8ce Mon Sep 17 00:00:00 2001 From: vincthnngyn Date: Wed, 24 Sep 2025 16:02:39 -0400 Subject: [PATCH 3/3] Redid the yaml_input_reader class as the previous version had INSANELY MESSY SPAGHETTI CODE. 
Changes to the class: - started using field mapping to clean up the functions, eliminates the super long and messy if/else statements seen before - implemented a common processor function for structures and data in order to keep code clean (abstracted it basically) - added PyNum doc strings for accessibility - added more line by line comments so code is more clearly explained Changes to example input: - made it compliant with RMG Changes made to main.py and input.py: - main.py: ensured that read_file_auto is called - input.py: added PyNum docstring --- examples/rmg/butane_yaml_example/input.yaml | 28 +- rmgpy/rmg/input.py | 39 +- rmgpy/rmg/main.py | 7 +- rmgpy/rmg/yaml_input_reader.py | 1389 ++++++++++++------- 4 files changed, 929 insertions(+), 534 deletions(-) diff --git a/examples/rmg/butane_yaml_example/input.yaml b/examples/rmg/butane_yaml_example/input.yaml index 7fd742e603..c2cc9c7b82 100644 --- a/examples/rmg/butane_yaml_example/input.yaml +++ b/examples/rmg/butane_yaml_example/input.yaml @@ -127,8 +127,8 @@ simpleReactor: # list initial mole fractions of compounds using the label from the 'species' label. # RMG will normalize if sum/=1 initialMoleFractions: - N2: 4 - O2: 1 + N2: 4.0 + O2: 1.0 butane: 0.1538461538 # 1/6.5 # number of simulations used to explore variable temperature and pressure reactors @@ -138,10 +138,10 @@ simpleReactor: # only one must be specified # the first condition to be satisfied will terminate the process terminationConversion: - butane: 0.99 + butane: 0.9 # Changed from 0.99 to 0.9 for faster convergence terminationTime: - value: 40 + value: 100 # Increased from 40 to 100 seconds as safety fallback units: s # the next two optional values specify how RMG computes sensitivities of @@ -176,10 +176,10 @@ simpleReactor: # normally this doesn't cause many issues and is modified after other issues are # ruled out simulator: - atol: 1e-16 - rtol: 1e-8 - # sensAtol: 1e-6 - # sensRtol: 1e-4 + atol: 1.0e-16 + rtol: 1.0e-8 + sens_atol: 1.0e-6 + sens_rtol: 1.0e-4 # used to add species to the model and to reduce memory usage by removing unimportant additional species. # all relative values are normalized by a characteristic flux at that time point @@ -197,7 +197,7 @@ model: # determines when to stop a ODE run to add a species. # Lower values will improve speed. # if it is too low, may never get to the end simulation to prune species. 
- toleranceInterruptSimulation: 1 + toleranceInterruptSimulation: 1.0 # number of edge species needed to accumulate before pruning occurs # larger values require more memory and will prune less often @@ -218,11 +218,11 @@ model: # for bimolecular reactions, will only allow them to react if # filterThreshold*C_A*C_B > toleranceMoveToCore*characteristic_rate # and if filterReactions=True - filterThreshold: 1e8 + filterThreshold: 1.0e8 options: # provides a name for the seed mechanism produced at the end of an rmg run default is 'Seed' - name: SeedName + name: butane_oxidation # Changed from generic 'SeedName' to be more descriptive # if True (default) every iteration it saves the current model as libraries/seeds # (and deletes the old one) @@ -288,13 +288,13 @@ pressureDependence: min: 300 max: 2200 units: K - count: 2 + count: 8 # Increased from 2 for better coverage pressures: min: 0.01 max: 100 units: bar - count: 3 + count: 5 # Increased from 3 for better coverage # The two options for interpolation are 'PDepArrhenius' (no extra arguments) and # 'Chebyshev' which is followed by the number of basis sets in @@ -331,7 +331,7 @@ generatedSpeciesConstraints: # maximumHeavyAtoms: 20 # maximum radicals on a molecule - maximumRadicalElectrons: 1 + maximumRadicalElectrons: 2 # Increased from 1 to allow peroxy radicals (ROO•) # maximum number of singlet carbenes (lone pair on a carbon atom) in a molecule maximumSingletCarbenes: 1 diff --git a/rmgpy/rmg/input.py b/rmgpy/rmg/input.py index f8ff75faf3..0c3306f448 100644 --- a/rmgpy/rmg/input.py +++ b/rmgpy/rmg/input.py @@ -1885,28 +1885,51 @@ def read_input_file_auto(path, rmg0): this function automatically detects the file format based on the extension and calls the appropriate reader - :param path: Path to the input file (.py or .yaml/.yml) - :param rmg0: RMG object to populate + Parameters + ---------- + path : Union[str, Path] + Path to the input file (.py or .yaml/.yml) + rmg0 : RMG + RMG object to populate with input data + + Raises + ------ + IOError + If the input file cannot be found + ValueError + If the file format is unsupported + ImportError + If the file is in YAML format but PyYAML is not installed in the + current Python environment. 
""" from pathlib import Path - # Get the file extension + # get the file extension file_path = Path(path) extension = file_path.suffix.lower() - # Check if file exists + # check if file exists if not file_path.exists(): raise IOError(f'The input file "{path}" could not be found.') - # Route to appropriate reader based on extension + # route to appropriate reader based on extension if extension == '.py': - # Use the original Python input file reader + # use the original Python input file reader logging.info(f'Detected Python input file format (.py)') read_input_file(path, rmg0) elif extension in ['.yaml', '.yml']: - # Use the YAML input file reader + # use the YAML input file reader try: - from rmgpy.rmg.yaml_input_reader import read_yaml_input_file + #iImport the YAML reader functions directly + import sys + import os + + # add the directory containing yaml_input_reader.py to the Python path + yaml_reader_dir = os.path.dirname(__file__) + if yaml_reader_dir not in sys.path: + sys.path.insert(0, yaml_reader_dir) + + from yaml_input_reader import read_yaml_input_file logging.info(f'Detected YAML input file format ({extension})') read_yaml_input_file(path, rmg0) except ImportError: diff --git a/rmgpy/rmg/main.py b/rmgpy/rmg/main.py index 88ea75ff4c..9d5fb7c190 100644 --- a/rmgpy/rmg/main.py +++ b/rmgpy/rmg/main.py @@ -260,11 +260,14 @@ def load_input(self, path=None): Load an RMG job from the input file located at `input_file`, or from the `input_file` attribute if not given as a parameter. """ - from rmgpy.rmg.input import read_input_file + from rmgpy.rmg.input import read_input_file, read_input_file_auto, read_thermo_input_file, save_input_file + + self.input_file = path + read_input_file_auto(path, self) # Changed to use auto-detection if path is None: path = self.input_file - read_input_file(path, self) + read_input_file_auto(path, self) self.reaction_model.kinetics_estimator = self.kinetics_estimator # If the output directory is not yet set, then set it to the same # directory as the input file by default diff --git a/rmgpy/rmg/yaml_input_reader.py b/rmgpy/rmg/yaml_input_reader.py index 6179c40e8f..f6f0f5cd77 100644 --- a/rmgpy/rmg/yaml_input_reader.py +++ b/rmgpy/rmg/yaml_input_reader.py @@ -1,17 +1,17 @@ """ -YAML input reader for RMG -this module reads YAML format RMG input files -and calls the existing input.py functions - -preserves ability to still use legacy python input files +Optimized YAML input reader for RMG +Reads YAML format RMG input files and calls existing input.py functions +Preserves compatibility with legacy Python input files """ import yaml import logging import os from pathlib import Path +from functools import lru_cache +from typing import Dict, Any, List, Union, Optional, Tuple -# import ALL the existing functions from original input.py +# import all existing functions from input.py from rmgpy.rmg.input import ( database, catalyst_properties, species, forbidden, simple_reactor, constant_V_ideal_gas_reactor, constant_TP_ideal_gas_reactor, @@ -25,436 +25,669 @@ ) class YAMLInputReader: - def __init__(self, path): + """ + Optimized YAML input reader with improved structure and performance + (using mapping from dicts now) + """ + + # mapping of YAML keys to internal processor methods + # avoids long if/else chains and allows for scalable additions into input file and + # corresponding processor methods here + PROCESSORS = { + 'database': '_process_database', + 'catalystProperties': '_process_catalyst_properties', + 'species': '_process_species', + 'forbidden': 
'_process_forbidden', + 'react': '_process_react', + 'simpleReactor': '_process_simple_reactor', + 'constantVIdealGasReactor': '_process_constant_v_reactor', + 'constantTPIdealGasReactor': '_process_constant_tp_reactor', + 'liquidCatReactor': '_process_liquid_cat_reactor', + 'constantTVLiquidReactor': '_process_constant_tv_liquid_reactor', + 'liquidReactor': '_process_liquid_reactor', + 'surfaceReactor': '_process_surface_reactor', + 'mbSampledReactor': '_process_mb_sampled_reactor', + 'solvation': '_process_solvation', + 'liquidVolumetricMassTransferCoefficientPowerLaw': '_process_volumetric_mass_transfer', + 'simulator': '_process_simulator', + 'model': '_process_model', + 'quantumMechanics': '_process_quantum_mechanics', + 'mlEstimator': '_process_ml_estimator', + 'pressureDependence': '_process_pressure_dependence', + 'generatedSpeciesConstraints': '_process_species_constraints', + 'thermoCentralDatabase': '_process_thermo_central_database', + 'uncertainty': '_process_uncertainty', + 'restartFromSeed': '_process_restart_from_seed', + 'options': '_process_options' + } + + # mapping structure types to their corresponding conversion functions + # already existing in input.py (the ones we imported up above) + # allows flexible specification of molecular structures in different formats + STRUCTURE_TYPES = { + 'SMILES': smiles, # SMILES string notation + 'InChI': inchi, # InChI string notation + 'adjacencyList': adjacency_list, # RMG adjacency list format + 'adjacencyListGroup': adjacency_list_group, # RMG adjacency list for reaction groups + 'SMARTS': smarts, # SMARTS pattern notation + 'fragmentAdjacencyList': fragment_adj, # Fragment adjacency list + 'fragmentSMILES': fragment_smiles # Fragment SMILES notation + } + + def __init__(self, path: Union[str, Path]): + """ + Initialize YAML input reader with a file path + + Parameters + ---------- + path : Union[str, Path] + Path to the YAML input file """ - initialize the YAML input reader with a file path + self.path = Path(path) # ensures path is a pathlib.Path obj for consistent path handling + self.data = None # will store the parsed YAML data from input file + self.species_dict = {} # store the species dictionary for reference lookup - :param path: path to the YAML input file + def read(self) -> None: """ - self.path = Path(path) - self.data = None - self.species_dict = {} + Read and parse YAML input file - def read(self): + Returns + ------- + None + + Raises + ------ + ValueError + If YAML file has invalid syntax or structure + IOError + If file cannot be read """ - read and parse YAML input file + logging.info(f'Reading YAML input file "{self.path}"...') # log file being processed + + try: + with open(self.path, 'r') as file: + content = file.read() + + self.data = yaml.safe_load(content) # parse YAML content safely (prevents code execution) + + if not isinstance(self.data, dict): # validate that top-level structure is a dictionary + raise ValueError("YAML file must contain a dictionary at the top level") + + logging.info(content) # log raw yaml text for debugging + + except yaml.YAMLError as e: + # check for any YAML syntax errors + raise ValueError(f"Invalid YAML syntax: {e}") + except IOError as e: + # check to see if can access file or if file exists + raise IOError(f"Failed to read file: {e}") + + def process(self) -> None: """ - logging.info(f'Reading YAML input file "{self.path}"...') - # let user know in termal that file is being read + Process the loaded YAML data and call appropriate RMG input functions based + on content of 
YAML input file - with open(self.path, 'r') as file: - #store content to log onto terminal - content = file.read() + Returns + ------- + None - self.data = yaml.safe_load(file) - - # check if input file is empty - if not isinstance(self.data, dict): - raise ValueError("Yo ur missing a dictionary bro YAML file needs a dictionary") + Raises + ------ + RuntimeError + If no data has been loaded + ValueError + If error occurs while processing any section + """ + if not self.data: # ensure data loaded before processing + raise RuntimeError("No data loaded. Run read() first") - # log contents of file into terminal - logging.info(content) + # loop through all possible keys in order + # replaced the previous method of using a bazillion if statements + for key, processor_name in self.PROCESSORS.items(): # iterate through all processor mappings + if key in self.data: # if the section exists in input file + processor = getattr(self, processor_name) # call on corresponding processor func (dynamic method lookup) + try: + processor(self.data[key]) # call on the processor function with the section data + except Exception as e: + raise ValueError(f"Error processing {key}: {e}") # report processing errors with context + + def _process_structure(self, structure_data: Union[str, Dict], + entity_type: str = "entity") -> Any: + """ + Generic structure processor to reduce code duplication + + Parameters + ---------- + structure_data : Union[str, Dict] + Structure data (string for direct adjacency list or dict with type/value pairs) + entity_type : str + Type of entity for error messages ("species" or "forbidden") - def process(self): + Returns + ------- + Any + Processed structure object (e.g., Molecule, Group) + + Raises + ------ + ValueError + If structure format is unknown or invalid """ - processes the info from YAML file and calls on the preexisting functions in input.py + # default to adjacency list for species, adjacency list group for forbidden + if isinstance(structure_data, str): # handle simple string format (most common case) + # if just string, default to adjacency list by calling the imported conversion funcs + if entity_type == "forbidden": + return adjacency_list_group(structure_data) # forbidden structures use group format + else: + return adjacency_list(structure_data) # reg species use molecule format + + elif isinstance(structure_data, dict): # handle dictionary format + # if is a dict, find the structure type and process it + for key, func in self.STRUCTURE_TYPES.items(): # check each structure type + if key in structure_data: + return func(structure_data[key]) # call on processor func for the found struc type + + raise ValueError(f"Unknown structure format in {entity_type}") # no recognized format found + else: + raise ValueError(f"Invalid structure format for {entity_type}") # not string or dict + + ############################################################## + # the general technique for these functions is just taking + # the data from the YAML file and plugging them into the + # parameters of the imported pre-existing functions from the + # original input handler + ############################################################### + + def _process_database(self, db_data: Dict[str, Any]) -> None: """ - if not self.data: - raise RuntimeError("Yo no data loaded run read() first") - - # now process the data in sections (same order as for .py inputs) - # lowkey a really monkey way of doing this w a ton of if statements but I cant really do - # switch cases since i need to check if EVERY PIECE 
OF INFO IS VALID - # ⬇ libraries and species information ⬇ - if 'database' in self.data: - self._process_database(self.data['database']) - if 'catalystProperties' in self.data: - self._process_catalyst_properties(self.data['catalystProperties']) - if 'species' in self.data: - self._process_species(self.data['species']) - if 'forbidden' in self.data: - self._process_forbidden(self.data['forbidden']) - if 'react' in self.data: - self._process_react(self.data['react']) - # process each reactor type individually - # each reactor can appear at the top level - # ⬇ reactor information ⬇ - if 'simpleReactor' in self.data: - self._process_simple_reactor(self.data['simpleReactor']) - if 'constantVIdealGasReactor' in self.data: - self._process_constant_v_reactor(self.data['constantVIdealGasReactor']) - if 'constantTPIdealGasReactor' in self.data: - self._process_constant_tp_reactor(self.data['constantTPIdealGasReactor']) - if 'liquidCatReactor' in self.data: - self._process_liquid_cat_reactor(self.data['liquidCatReactor']) - if 'constantTVLiquidReactor' in self.data: - self._process_constant_tv_liquid_reactor(self.data['constantTVLiquidReactor']) - if 'liquidReactor' in self.data: - self._process_liquid_reactor(self.data['liquidReactor']) - if 'surfaceReactor' in self.data: - self._process_surface_reactor(self.data['surfaceReactor']) - if 'mbSampledReactor' in self.data: - self._process_mb_sampled_reactor(self.data['mbSampledReactor']) - # ⬇ other info/options ⬇ - if 'solvation' in self.data: - self._process_solvation(self.data['solvation']) - if 'liquidVolumetricMassTransferCoefficientPowerLaw' in self.data: - self._process_volumetric_mass_transfer(self.data['liquidVolumetricMassTransferCoefficientPowerLaw']) - if 'simulator' in self.data: - self._process_simulator(self.data['simulator']) - if 'model' in self.data: - self._process_simulator(self.data['model']) - if 'quantumMechanics' in self.data: - self._process_quantum_mechanics(self.data['quantumMechanics']) - if 'mlEstimator' in self.data: - self._process_ml_estimator(self.data['mlEstimator']) - if 'pressureDependence' in self.data: - self._process_pressure_dependence(self.data['pressureDependence']) - if 'generatedSpeciesConstraints' in self.data: - self._process_species_constraints(self.data['generatedSpeciesConstraints']) - if 'thermoCentralDatabase' in self.data: - self._process_thermo_central_database(self.data['thermoCentralDatabase']) - if 'uncertainty' in self.data: - self._process_uncertainty(self.data['uncertainty']) - if 'restartFromSeed' in self.data: - self._process_restart_from_seed(self.data['restartFromSeed']) - if 'options' in self.data: - self._process_options(self.data['options']) - + Process database input configuration - # FOR PROCESSOR FUNCTIONS DO THIS: - # FOR DATABASE PROCESSOR, JUST PASS THE INFO INTO THE OG DB FUNC - def _process_database(self, db_data): - """ - process database input - """ - reaction_libraries = [] - if 'reactionLibraries' in db_data: - for lib in db_data['reactionLibraries']: - if isinstance(lib, str): - reaction_libraries.append(lib) - elif isinstance(lib, dict): - # convert dict format to tuple format - name = lib.get('name') - seed = lib.get('seed', False) - # if no seed bool set, default to False - reaction_libraries.append((name, seed)) + Parameters + ---------- + db_data : Dict[str, Any] + Database configuration dictionary + + Returns + ------- + None + """ + # convert reaction libraries + # handle reaction libraries which can be simple strings or dicts with seed flags + reaction_libraries = [ 
+ (lib.get('name'), lib.get('seed', False)) if isinstance(lib, dict) else lib # convert dict to tuple + for lib in db_data.get('reactionLibraries', []) # auto default to empty list if not specified + ] + + # call original database function database( - thermoLibraries = db_data.get('thermoLibraries'), - transportLibraries = db_data.get('transportLibraries'), - reactionLibraries = db_data.get('reactionLibraries'), - frequenciesLibraries = db_data.get('frequenciesLibraries'), - seedMechanisms = db_data.get('seedMechanisms'), - kineticsFamilies = db_data.get('kineticsFamilies', 'default'), - kineticsDepositories = db_data.get('kineticsDepositories', 'default'), - kineticsEstimator = db_data.get('kineticsEstimator', 'rate rules'), - adsorptionGroups = db_data.get('adsorptionGroups', 'adsorptionPt111') + thermoLibraries=db_data.get('thermoLibraries'), + transportLibraries=db_data.get('transportLibraries'), + reactionLibraries=reaction_libraries, + frequenciesLibraries=db_data.get('frequenciesLibraries'), + seedMechanisms=db_data.get('seedMechanisms'), + kineticsFamilies=db_data.get('kineticsFamilies', 'default'), + kineticsDepositories=db_data.get('kineticsDepositories', 'default'), + kineticsEstimator=db_data.get('kineticsEstimator', 'rate rules'), + adsorptionGroups=db_data.get('adsorptionGroups', 'adsorptionPt111') ) - # FOR SPECIES PROCESSORS, CALL ON THE EXISTING FUNCS WITH DATA FROM YAML FILE - def _process_catalyst_properties(self, cat_data): + + def _process_catalyst_properties(self, cat_data: Dict[str, Any]) -> None: """ - process catalyst input + Process catalyst properties configuration + + Parameters + ---------- + cat_data : Dict[str, Any] + Catalyst properties configuration dictionary + + Returns + ------- + None """ + # call original catalyst_properties function catalyst_properties( - bindingEnergies = cat_data.get('bindingEnergies'), - surfaceSiteDensity = cat_data.get('surfaceSiteDensity'), - metal = cat_data.get('metal'), - coverageDependence = cat_data.get('coverageDependence', False) - # ^ if no coverage dependence bool is set, default to False like in input.py + bindingEnergies=cat_data.get('bindingEnergies'), + surfaceSiteDensity=cat_data.get('surfaceSiteDensity'), + metal=cat_data.get('metal'), + coverageDependence=cat_data.get('coverageDependence', False) ) - def _process_species(self, spec_list): - """ - process species definition - """ - for spec in spec_list: - # handle structure based on nested format or explicit type - if 'structure' in spec: - struc_data = spec['structure'] - - if isinstance(struc_data, str): - structure = adjacency_list(struc_data) - # assumes explicit adjacency list if is only string - elif isinstance(struc_data, dict): - # if not, check if its a dict w a key | [name]: [value] - if 'SMILES' in struc_data: - structure = smiles(struc_data['SMILES']) - elif 'InChI' in struc_data: - structure = inchi(struc_data['InChI']) - elif 'adjacencyList' in struc_data: - structure = adjacency_list(struc_data['adjacencyList']) - elif 'fragmentAdjacencyList' in struc_data: - structure = fragment_adj(struc_data['fragmentAdjacencyList']) - elif 'fragmentSMILES' in struc_data: - structure = fragment_smiles(struc_data['fragmentSMILES']) - else: - raise ValueError(f"Unknown structure format in species {spec.get('label', 'unknown')}") - else: - raise ValueError(f"Invalid structure format for forbidden {spec.get('label', 'unknown')}") - else: - raise ValueError(f"No structure provided for species {spec.get('label', 'unknown')}") + + def _process_species(self, spec_list: 
List[Dict[str, Any]]) -> None: + """ + Process species definitions + + Parameters + ---------- + spec_list : List[Dict[str, Any]] + List of species configuration dictionaries + + Returns + ------- + None + Raises + ------ + ValueError + If species structure is missing or invalid + """ + for spec in spec_list: # process each species in input file + if 'structure' not in spec: # check if structure provided + raise ValueError(f"No structure provided for species {spec.get('label', 'unknown')}") + + structure = self._process_structure(spec['structure'], "species") # convert structure to RMG format + + # call original species function species( - label = spec['label'], - structure = structure, - reactive = spec.get('reactive', True), - cut = spec.get('cut', False), - size_threshold = spec.get('sizeThreshold') + label=spec['label'], + structure=structure, + reactive=spec.get('reactive', True), + cut=spec.get('cut', False), + size_threshold=spec.get('sizeThreshold') ) - def _process_forbidden(self, forb_list): - """ - process forbidden structures - similar method as did species - """ - for forb in forb_list: - if 'structure' in forb: - struc_data = forb['structure'] - - if isinstance(struc_data, str): - # assume adjacency list group - structure = adjacency_list_group(struc_data) - elif isinstance(struc_data, dict): - if 'SMILES' in struc_data: - structure = smiles(struc_data['SMILES']) - elif 'SMARTS' in struc_data: - structure = smarts(struc_data['SMARTS']) - elif 'adjacencyList' in struc_data: - structure = adjacency_list(struc_data['adjacencyList']) - elif 'adjacencyListGroup' in struc_data: - structure = adjacency_list_group(struc_data['adjacencyListGroup']) - else: - raise ValueError(f"Unknown structure format in forbidden {forb.get('label', 'unknown')}") - else: - raise ValueError(f"Invalid structure format for forbidden {forb.get('label', 'unknown')}") - else: - raise ValueError(f"No structure provided for species {forb.get('label', 'unknown')}") + + def _process_forbidden(self, forb_list: List[Dict[str, Any]]) -> None: + """ + Process forbidden structures + + Parameters + ---------- + forb_list : List[Dict[str, Any]] + List of forbidden structure configuration dictionaries + + Returns + ------- + None + + Raises + ------ + ValueError + If forbidden structure is missing or invalid + """ + for forb in forb_list: # process each forb structure + if 'structure' not in forb: # check if structure provided + raise ValueError(f"No structure provided for forbidden {forb.get('label', 'unknown')}") + + structure = self._process_structure(forb['structure'], "forbidden") # convert to group format + + # call original forb function forbidden( label=forb['label'], structure=structure ) + + def _process_react(self, react_data: Any) -> None: + """ + Process react specifications + Parameters + ---------- + react_data : Any + React configuration data + Returns + ------- + None + """ + # pass to original react function + react(react_data) + + def _process_reactor_common(self, reactor_func, reactor_data: Union[Dict, List], + field_mapping: Dict[str, str]) -> None: + """ + Common reactor processor to reduce code duplication - # FOR REACTOR PROCESSOR, ALSO DO THE SAME + Parameters + ---------- + reactor_func : callable + The reactor function to call + reactor_data : Union[Dict, List] + Reactor configuration data (single dict or list of dicts) + field_mapping : Dict[str, str] + Mapping of YAML field names to function parameter names - def _process_react(self, react_data): + Returns + ------- + None """ - process 
react specifications - """ - react(react_data) + reactors = reactor_data if isinstance(reactor_data, list) else [reactor_data] # normalize to list format - def _process_simple_reactor(self, reactor_data): + for reactor in reactors: # process reactor(s) in input file + kwargs = {} # keyword args dictionary for function call (basically parameter names of func) + + for yaml_key, param_name in field_mapping.items(): # map YAML keys to provided function parameters + if yaml_key in reactor: # only process keys in input file + value = reactor[yaml_key] + + # special handling for sensitivity + if yaml_key == 'sensitivity' and value is not None: # sensitivity can be string or list + value = [value] if isinstance(value, str) else value # normalize to list format + + # convert quantities + elif any(keyword in yaml_key.lower() for keyword in # check if field requires unit conversion + ['temperature', 'pressure', 'time', 'volume', 'rate', + 'coefficient', 'viscosity', 'potential', 'distance']): + value = self._convert_quantity(value) # convert to (value, units) tuple format + + # convert concentration dictionaries + elif 'concentration' in yaml_key.lower() and isinstance(value, dict): + value = self._convert_concentration_dict(value) # process concentration mappings + + kwargs[param_name] = value # store converted value with function parameter name + + reactor_func(**kwargs) # unpack the kwargs into params so func can handle + # call on the func itself with the kwargs as params + + def _process_simple_reactor(self, reactor_data: Union[Dict, List]) -> None: """ - process simple reactor configuration + Process simple reactor configuration + + Parameters + ---------- + reactor_data : Union[Dict, List] + Simple reactor configuration data + + Returns + ------- + None """ - # Handle both single reactor and list of reactors - reactors = reactor_data if isinstance(reactor_data, list) else [reactor_data] - - for reactor in reactors: - # Handle sensitivity which could be None, a string, or a list - sensitivity = reactor.get('sensitivity') - if sensitivity is None: - sensitivity = None - elif isinstance(sensitivity, str): - sensitivity = [sensitivity] - else: - sensitivity = sensitivity - - simple_reactor( - temperature=self._convert_quantity(reactor['temperature']), - pressure=self._convert_quantity(reactor['pressure']), - initialMoleFractions=reactor['initialMoleFractions'], - nSims=reactor.get('nSims', 6), - terminationConversion=reactor.get('terminationConversion'), - terminationTime=self._convert_quantity(reactor.get('terminationTime')), - terminationRateRatio=reactor.get('terminationRateRatio'), - balanceSpecies=reactor.get('balanceSpecies'), - sensitivity=sensitivity, - sensitivityThreshold=reactor.get('sensitivityThreshold', 1e-3), - sensitivityTemperature=self._convert_quantity(reactor.get('sensitivityTemperature')), - sensitivityPressure=self._convert_quantity(reactor.get('sensitivityPressure')), - sensitivityMoleFractions=reactor.get('sensitivityMoleFractions'), - constantSpecies=reactor.get('constantSpecies') - ) + # define mapping from YAML keys to function param for simple reactor + # this basically what those kwargs above are + field_mapping = { + 'temperature': 'temperature', + 'pressure': 'pressure', + 'initialMoleFractions': 'initialMoleFractions', + 'nSims': 'nSims', + 'terminationConversion': 'terminationConversion', + 'terminationTime': 'terminationTime', + 'terminationRateRatio': 'terminationRateRatio', + 'balanceSpecies': 'balanceSpecies', + 'sensitivity': 'sensitivity', + 
'sensitivityThreshold': 'sensitivityThreshold', + 'sensitivityTemperature': 'sensitivityTemperature', + 'sensitivityPressure': 'sensitivityPressure', + 'sensitivityMoleFractions': 'sensitivityMoleFractions', + 'constantSpecies': 'constantSpecies' + } + # use common processing + self._process_reactor_common(simple_reactor, reactor_data, field_mapping) - def _process_constant_v_reactor(self, reactor_data): + def _process_constant_v_reactor(self, reactor_data: Union[Dict, List]) -> None: """ - process constant V ideal gas reactor configuration + Process constant V ideal gas reactor configuration + + Parameters + ---------- + reactor_data : Union[Dict, List] + Constant V reactor configuration data + + Returns + ------- + None """ - reactors = reactor_data if isinstance(reactor_data, list) else [reactor_data] - - for reactor in reactors: - constant_V_ideal_gas_reactor( - temperature=self._convert_quantity(reactor['temperature']), - pressure=self._convert_quantity(reactor['pressure']), - initialMoleFractions=reactor['initialMoleFractions'], - terminationConversion=reactor.get('terminationConversion'), - terminationTime=self._convert_quantity(reactor.get('terminationTime')), - terminationRateRatio=reactor.get('terminationRateRatio'), - balanceSpecies=reactor.get('balanceSpecies') - ) + # mapping for constant V reactor parameters + field_mapping = { + 'temperature': 'temperature', + 'pressure': 'pressure', + 'initialMoleFractions': 'initialMoleFractions', + 'terminationConversion': 'terminationConversion', + 'terminationTime': 'terminationTime', + 'terminationRateRatio': 'terminationRateRatio', + 'balanceSpecies': 'balanceSpecies' + } + self._process_reactor_common(constant_V_ideal_gas_reactor, reactor_data, field_mapping) - def _process_constant_tp_reactor(self, reactor_data): + def _process_constant_tp_reactor(self, reactor_data: Union[Dict, List]) -> None: """ - process constant T,P ideal gas reactor configuration + Process constant T,P ideal gas reactor configuration + + Parameters + ---------- + reactor_data : Union[Dict, List] + Constant T,P reactor configuration data + + Returns + ------- + None """ - reactors = reactor_data if isinstance(reactor_data, list) else [reactor_data] - - for reactor in reactors: - constant_TP_ideal_gas_reactor( - temperature=self._convert_quantity(reactor['temperature']), - pressure=self._convert_quantity(reactor['pressure']), - initialMoleFractions=reactor['initialMoleFractions'], - terminationConversion=reactor.get('terminationConversion'), - terminationTime=self._convert_quantity(reactor.get('terminationTime')), - terminationRateRatio=reactor.get('terminationRateRatio'), - balanceSpecies=reactor.get('balanceSpecies') - ) + # mapping for constant T and P reactor + field_mapping = { + 'temperature': 'temperature', + 'pressure': 'pressure', + 'initialMoleFractions': 'initialMoleFractions', + 'terminationConversion': 'terminationConversion', + 'terminationTime': 'terminationTime', + 'terminationRateRatio': 'terminationRateRatio', + 'balanceSpecies': 'balanceSpecies' + } + self._process_reactor_common(constant_TP_ideal_gas_reactor, reactor_data, field_mapping) - def _process_liquid_cat_reactor(self, reactor_data): + def _process_liquid_cat_reactor(self, reactor_data: Union[Dict, List]) -> None: """ - process liquid catalyst reactor configuration + Process liquid catalyst reactor configuration + + Parameters + ---------- + reactor_data : Union[Dict, List] + Liquid catalyst reactor configuration data + + Returns + ------- + None """ - reactors = reactor_data if 
isinstance(reactor_data, list) else [reactor_data] - - for reactor in reactors: - liquid_cat_reactor( - temperature=self._convert_quantity(reactor['temperature']), - initialConcentrations=self._convert_concentration_dict(reactor['initialConcentrations']), - initialSurfaceCoverages=reactor['initialSurfaceCoverages'], - surfaceVolumeRatio=self._convert_quantity(reactor['surfaceVolumeRatio']), - distance=self._convert_quantity(reactor.get('distance')), - viscosity=self._convert_quantity(reactor.get('viscosity')), - surfPotential=self._convert_quantity(reactor.get('surfPotential')), - liqPotential=self._convert_quantity(reactor.get('liqPotential')), - terminationConversion=reactor.get('terminationConversion'), - terminationTime=self._convert_quantity(reactor.get('terminationTime')), - terminationRateRatio=reactor.get('terminationRateRatio'), - constantSpecies=reactor.get('constantSpecies', []) - ) + # mapping for liquid-phase catalytic reactor parameters + field_mapping = { + 'temperature': 'temperature', + 'initialConcentrations': 'initialConcentrations', + 'initialSurfaceCoverages': 'initialSurfaceCoverages', + 'surfaceVolumeRatio': 'surfaceVolumeRatio', + 'distance': 'distance', + 'viscosity': 'viscosity', + 'surfPotential': 'surfPotential', + 'liqPotential': 'liqPotential', + 'terminationConversion': 'terminationConversion', + 'terminationTime': 'terminationTime', + 'terminationRateRatio': 'terminationRateRatio', + 'constantSpecies': 'constantSpecies' + } + self._process_reactor_common(liquid_cat_reactor, reactor_data, field_mapping) - def _process_constant_tv_liquid_reactor(self, reactor_data): + def _process_constant_tv_liquid_reactor(self, reactor_data: Union[Dict, List]) -> None: """ - process constant T,V liquid reactor configuration + Process constant T,V liquid reactor configuration + + Parameters + ---------- + reactor_data : Union[Dict, List] + Constant T,V liquid reactor configuration data + + Returns + ------- + None """ - reactors = reactor_data if isinstance(reactor_data, list) else [reactor_data] - - for reactor in reactors: - constant_T_V_liquid_reactor( - temperature=self._convert_quantity(reactor['temperature']), - initialConcentrations=self._convert_concentration_dict(reactor['initialConcentrations']), - liquidVolume=self._convert_quantity(reactor.get('liquidVolume')), - residenceTime=self._convert_quantity(reactor.get('residenceTime')), - inletVolumetricFlowRate=self._convert_quantity(reactor.get('inletVolumetricFlowRate')), - outletVolumetricFlowRate=self._convert_quantity(reactor.get('outletVolumetricFlowRate')), - inletConcentrations=self._convert_concentration_dict(reactor.get('inletConcentrations', {})), - vaporPressure=self._convert_quantity(reactor.get('vaporPressure')), - vaporMoleFractions=reactor.get('vaporMoleFractions'), - terminationConversion=reactor.get('terminationConversion'), - terminationTime=self._convert_quantity(reactor.get('terminationTime')), - terminationRateRatio=reactor.get('terminationRateRatio'), - constantSpecies=reactor.get('constantSpecies', []) - ) + # mapping for constant T and V liquid reactor + field_mapping = { + 'temperature': 'temperature', + 'initialConcentrations': 'initialConcentrations', + 'liquidVolume': 'liquidVolume', + 'residenceTime': 'residenceTime', + 'inletVolumetricFlowRate': 'inletVolumetricFlowRate', + 'outletVolumetricFlowRate': 'outletVolumetricFlowRate', + 'inletConcentrations': 'inletConcentrations', + 'vaporPressure': 'vaporPressure', + 'vaporMoleFractions': 'vaporMoleFractions', + 'terminationConversion': 
'terminationConversion', + 'terminationTime': 'terminationTime', + 'terminationRateRatio': 'terminationRateRatio', + 'constantSpecies': 'constantSpecies' + } + self._process_reactor_common(constant_T_V_liquid_reactor, reactor_data, field_mapping) - def _process_liquid_reactor(self, reactor_data): + def _process_liquid_reactor(self, reactor_data: Union[Dict, List]) -> None: """ - process liquid reactor configuration + Process liquid reactor configuration + + Parameters + ---------- + reactor_data : Union[Dict, List] + Liquid reactor configuration data + + Returns + ------- + None """ - reactors = reactor_data if isinstance(reactor_data, list) else [reactor_data] - - for reactor in reactors: - # Handle sensitivity which could be None, a string, or a list - sensitivity = reactor.get('sensitivity') - if sensitivity is None: - sensitivity = None - elif isinstance(sensitivity, str): - sensitivity = [sensitivity] - else: - sensitivity = sensitivity - - liquid_reactor( - temperature=self._convert_quantity(reactor['temperature']), - initialConcentrations=self._convert_concentration_dict(reactor['initialConcentrations']), - terminationConversion=reactor.get('terminationConversion'), - nSims=reactor.get('nSims', 4), - terminationTime=self._convert_quantity(reactor.get('terminationTime')), - terminationRateRatio=reactor.get('terminationRateRatio'), - sensitivity=sensitivity, - sensitivityThreshold=reactor.get('sensitivityThreshold', 1e-3), - sensitivityTemperature=self._convert_quantity(reactor.get('sensitivityTemperature')), - sensitivityConcentrations=self._convert_concentration_dict(reactor.get('sensitivityConcentrations', {})), - constantSpecies=reactor.get('constantSpecies') - ) + # mapping for general liquid reactor parameters + field_mapping = { + 'temperature': 'temperature', + 'initialConcentrations': 'initialConcentrations', + 'terminationConversion': 'terminationConversion', + 'nSims': 'nSims', + 'terminationTime': 'terminationTime', + 'terminationRateRatio': 'terminationRateRatio', + 'sensitivity': 'sensitivity', + 'sensitivityThreshold': 'sensitivityThreshold', + 'sensitivityTemperature': 'sensitivityTemperature', + 'sensitivityConcentrations': 'sensitivityConcentrations', + 'constantSpecies': 'constantSpecies' + } + self._process_reactor_common(liquid_reactor, reactor_data, field_mapping) - def _process_surface_reactor(self, reactor_data): + def _process_surface_reactor(self, reactor_data: Union[Dict, List]) -> None: """ - process surface reactor configuration + Process surface reactor configuration + + Parameters + ---------- + reactor_data : Union[Dict, List] + Surface reactor configuration data + + Returns + ------- + None """ - reactors = reactor_data if isinstance(reactor_data, list) else [reactor_data] - - for reactor in reactors: - # Handle sensitivity which could be None, a string, or a list - sensitivity = reactor.get('sensitivity') - if sensitivity is None: - sensitivity = None - elif isinstance(sensitivity, str): - sensitivity = [sensitivity] - else: - sensitivity = sensitivity - - surface_reactor( - temperature=self._convert_quantity(reactor['temperature']), - initialPressure=self._convert_quantity(reactor['initialPressure']), - initialGasMoleFractions=reactor['initialGasMoleFractions'], - initialSurfaceCoverages=reactor['initialSurfaceCoverages'], - surfaceVolumeRatio=self._convert_quantity(reactor['surfaceVolumeRatio']), - nSims=reactor.get('nSims', 4), - terminationConversion=reactor.get('terminationConversion'), - 
terminationTime=self._convert_quantity(reactor.get('terminationTime')), - terminationRateRatio=reactor.get('terminationRateRatio'), - sensitivity=sensitivity, - sensitivityThreshold=reactor.get('sensitivityThreshold', 1e-3) - ) + # mapping for surface catalysis reactor parameters + field_mapping = { + 'temperature': 'temperature', + 'initialPressure': 'initialPressure', + 'initialGasMoleFractions': 'initialGasMoleFractions', + 'initialSurfaceCoverages': 'initialSurfaceCoverages', + 'surfaceVolumeRatio': 'surfaceVolumeRatio', + 'nSims': 'nSims', + 'terminationConversion': 'terminationConversion', + 'terminationTime': 'terminationTime', + 'terminationRateRatio': 'terminationRateRatio', + 'sensitivity': 'sensitivity', + 'sensitivityThreshold': 'sensitivityThreshold' + } + self._process_reactor_common(surface_reactor, reactor_data, field_mapping) - def _process_mb_sampled_reactor(self, reactor_data): + def _process_mb_sampled_reactor(self, reactor_data: Union[Dict, List]) -> None: """ - process MB sampled reactor configuration + Process MB sampled reactor configuration + + Parameters + ---------- + reactor_data : Union[Dict, List] + MB sampled reactor configuration data + + Returns + ------- + None """ - reactors = reactor_data if isinstance(reactor_data, list) else [reactor_data] - - for reactor in reactors: - # Handle sensitivity which could be None, a string, or a list - sensitivity = reactor.get('sensitivity') - if sensitivity is None: - sensitivity = None - elif isinstance(sensitivity, str): - sensitivity = [sensitivity] - else: - sensitivity = sensitivity - - mb_sampled_reactor( - temperature=self._convert_quantity(reactor['temperature']), - pressure=self._convert_quantity(reactor['pressure']), - initialMoleFractions=reactor['initialMoleFractions'], - mbsamplingRate=self._convert_quantity(reactor['mbsamplingRate']), - terminationConversion=reactor.get('terminationConversion'), - terminationTime=self._convert_quantity(reactor.get('terminationTime')), - sensitivity=sensitivity, - sensitivityThreshold=reactor.get('sensitivityThreshold', 1e-3), - constantSpecies=reactor.get('constantSpecies') - ) + # mapping for Maxwell-Boltzmann sampled reactor (for T fluctuations) + field_mapping = { + 'temperature': 'temperature', + 'pressure': 'pressure', + 'initialMoleFractions': 'initialMoleFractions', + 'mbsamplingRate': 'mbsamplingRate', + 'terminationConversion': 'terminationConversion', + 'terminationTime': 'terminationTime', + 'sensitivity': 'sensitivity', + 'sensitivityThreshold': 'sensitivityThreshold', + 'constantSpecies': 'constantSpecies' + } + self._process_reactor_common(mb_sampled_reactor, reactor_data, field_mapping) - def _process_solvation(self, solv_data): + def _process_solvation(self, solv_data: Dict[str, Any]) -> None: """ - process solvation settings + Process solvation settings + + Parameters + ---------- + solv_data : Dict[str, Any] + Solvation configuration dictionary + + Returns + ------- + None + + Raises + ------ + ImportError + If SolventData cannot be imported when needed """ - # Handle SolventData if provided - - # (FINISH THIS IDK HOW TO DO IT RN) - - def _process_liquid_mass_transfer(self, lmt_data): + # handle SolventData if provided + solvent_data = None # initialize as None + if 'solventData' in solv_data: # check if custom solvent data is provided in input file + try: + from rmgpy.data.solvation import SolventData # import solvent data class + except ImportError: + raise ImportError( + "SolventData could not be imported. 
Make sure RMG's solvation module is installed." + ) + + sd = solv_data['solventData'] # extract solvent data dictionary from input file + # if custom solvent then specify Abraham-Mintz values + solvent_data = SolventData( # create SolventData object with all params for custom sovlents + # Abraham values + s_g=sd.get('s_g'), + b_g=sd.get('b_g'), + e_g=sd.get('e_g'), + l_g=sd.get('l_g'), + a_g=sd.get('a_g'), + c_g=sd.get('c_g'), + + # solvent descriptors for enthalpy effects + s_h=sd.get('s_h'), + b_h=sd.get('b_h'), + e_h=sd.get('e_h'), + l_h=sd.get('l_h'), + a_h=sd.get('a_h'), + c_h=sd.get('c_h'), + # viscosity correlation coefficients + A=sd.get('A'), + B=sd.get('B'), + C=sd.get('C'), + D=sd.get('D'), + E=sd.get('E'), + # additional solvent properties + alpha=sd.get('alpha'), + beta=sd.get('beta'), + # dielectric constant + eps=sd.get('eps'), + # name lol + name=sd.get('name') + ) + + # call original solvation function + solvation( + solvent=solv_data['solvent'], + solventData=solvent_data + ) + + def _process_volumetric_mass_transfer(self, lmt_data: Dict[str, Any]) -> None: """ - process liquid volumetric mass transfer coefficient + Process liquid volumetric mass transfer coefficient settings + + Parameters + ---------- + lmt_data : Dict[str, Any] + Mass transfer coefficient configuration dictionary + + Returns + ------- + None """ + # call original function liquid_volumetric_mass_transfer_coefficient_power_law( prefactor=self._convert_quantity(lmt_data.get('prefactor', (0, "1/s"))), diffusionCoefficientPower=lmt_data.get('diffusionCoefficientPower', 0), @@ -462,21 +695,41 @@ def _process_liquid_mass_transfer(self, lmt_data): solventDensityPower=lmt_data.get('solventDensityPower', 0) ) - def _process_simulator(self, sim_data): + def _process_simulator(self, sim_data: Dict[str, Any]) -> None: """ - process simulator settings + Process simulator settings + + Parameters + ---------- + sim_data : Dict[str, Any] + Simulator configuration dictionary + + Returns + ------- + None """ + # call original simulator function simulator( - atol=sim_data.get('atol', 1e-16), - rtol=sim_data.get('rtol', 1e-8), - sens_atol=sim_data.get('sens_atol', 1e-6), - sens_rtol=sim_data.get('sens_rtol', 1e-4) + atol=float(sim_data.get('atol', 1e-16)), + rtol=float(sim_data.get('rtol', 1e-8)), + sens_atol=float(sim_data.get('sens_atol', 1e-6)), + sens_rtol=float(sim_data.get('sens_rtol', 1e-4)) ) - def _process_model(self, model_data): + def _process_model(self, model_data: Dict[str, Any]) -> None: """ - process model settings + Process model settings + + Parameters + ---------- + model_data : Dict[str, Any] + Model configuration dictionary + + Returns + ------- + None """ + # call og model func model( toleranceMoveToCore=model_data.get('toleranceMoveToCore'), toleranceRadMoveToCore=model_data.get('toleranceRadMoveToCore', float('inf')), @@ -507,10 +760,20 @@ def _process_model(self, model_data): toleranceReactionToCoreDeadendRadical=model_data.get('toleranceReactionToCoreDeadendRadical', 0.0) ) - def _process_quantum_mechanics(self, qm_data): + def _process_quantum_mechanics(self, qm_data: Dict[str, Any]) -> None: """ - process quantum mechanics settings + Process quantum mechanics settings + + Parameters + ---------- + qm_data : Dict[str, Any] + Quantum mechanics configuration dictionary + + Returns + ------- + None """ + # call og qm func quantum_mechanics( software=qm_data['software'], method=qm_data['method'], @@ -520,10 +783,20 @@ def _process_quantum_mechanics(self, qm_data): 
maxRadicalNumber=qm_data.get('maxRadicalNumber', 0)
         )
 
-    def _process_ml_estimator(self, ml_data):
+    def _process_ml_estimator(self, ml_data: Dict[str, Any]) -> None:
         """
-        process ML estimator settings
+        Process ML estimator settings
+
+        Parameters
+        ----------
+        ml_data : Dict[str, Any]
+            ML estimator configuration dictionary
+
+        Returns
+        -------
+        None
         """
+        # call the original ml_estimator function
         ml_estimator(
             thermo=ml_data.get('thermo', True),
             name=ml_data.get('name', 'main'),
@@ -543,32 +816,42 @@
             CpUncertaintyCutoff=self._convert_quantity(ml_data.get('CpUncertaintyCutoff', (2.0, 'cal/(mol*K)')))
         )
 
-    def _process_pressure_dependence(self, pd_data):
+    def _process_pressure_dependence(self, pd_data: Dict[str, Any]) -> None:
         """
-        process pressure dependence settings
+        Process pressure dependence settings
+
+        Parameters
+        ----------
+        pd_data : Dict[str, Any]
+            Pressure dependence configuration dictionary
+
+        Returns
+        -------
+        None
         """
-        # Process temperatures - can be dict with min/max/count/units or list
-        temps = pd_data['temperatures']
-        if isinstance(temps, dict):
-            temperatures = [temps['min'], temps['max'], temps['units'], temps['count']]
+        # process T, can be dict with min/max/count/units or list
+        temps = pd_data['temperatures'] # get T specification
+        if isinstance(temps, dict): # handle range specification
+            temperatures = [temps['min'], temps['max'], temps['units'], temps['count']] # convert to list
         else:
-            temperatures = temps
+            temperatures = temps # use list as is
 
-        # Process pressures - can be dict with min/max/count/units or list
-        press = pd_data['pressures']
-        if isinstance(press, dict):
-            pressures = [press['min'], press['max'], press['units'], press['count']]
+        # process P, can be dict with min/max/count/units or list, like T
+        press = pd_data['pressures'] # get P specification
+        if isinstance(press, dict): # handle range specification
+            pressures = [press['min'], press['max'], press['units'], press['count']] # convert to list
         else:
-            pressures = press
+            pressures = press # use list as is
 
-        # Process interpolation - can be list or tuple
-        interp = pd_data.get('interpolation')
-        if isinstance(interp, list) and len(interp) > 1:
-            # Convert list format [method, param1, param2] to tuple
-            interpolation = tuple(interp)
+        # process interpolation, can be list or tuple
+        interp = pd_data.get('interpolation') # get the interpolation method
+        if isinstance(interp, list) and len(interp) > 1: # handle list format
+            # convert list format [method, param1, param2] to tuple
+            interpolation = tuple(interp) # convert to tuple for function
         else:
-            interpolation = interp
+            interpolation = interp # use as is
 
+        # call the original pressure_dependence function
         pressure_dependence(
             method=pd_data['method'],
             temperatures=temperatures,
@@ -579,35 +862,55 @@
             maximumAtoms=pd_data.get('maximumAtoms')
         )
 
-    def _process_species_constraints(self, constraints):
+    def _process_species_constraints(self, constraints: Dict[str, Any]) -> None:
         """
-        process generated species constraints
+        Process generated species constraints
+
+        Parameters
+        ----------
+        constraints : Dict[str, Any]
+            Species constraints configuration dictionary
+
+        Returns
+        -------
+        None
         """
-        # Create a copy to avoid modifying the original
-        constraints_copy = constraints.copy()
+        # create copy to avoid modifying original
+        constraints_copy = constraints.copy() # copy so the caller's dict is not modified
 
-        # Handle the special 'allowed' field
-        if 'allowed' in 
constraints_copy: + # handle the special 'allowed' field + if 'allowed' in constraints_copy: # process allowed specs list allowed_list = constraints_copy['allowed'] - # Convert special string values to their expected format - processed_allowed = [] - for item in allowed_list: + # convert special string values to expected format + processed_allowed = [] + for item in allowed_list: # check each allowed item if item == 'input species': - processed_allowed.append('input species') + processed_allowed.append('input species') # keep as string elif item == 'seed mechanisms': - processed_allowed.append('seed mechanisms') + processed_allowed.append('seed mechanisms') elif item == 'reaction libraries': processed_allowed.append('reaction libraries') else: - processed_allowed.append(item) - constraints_copy['allowed'] = processed_allowed + processed_allowed.append(item) # keep others as is + constraints_copy['allowed'] = processed_allowed # update w processed list - generated_species_constraints(**constraints_copy) + # call og func + generated_species_constraints(**constraints_copy) # unpack constraints like w kwargs above - def _process_thermo_central_database(self, tcd_data): + def _process_thermo_central_database(self, tcd_data: Dict[str, Any]) -> None: """ - process thermo central database settings + Process thermo central database settings + + Parameters + ---------- + tcd_data : Dict[str, Any] + Thermo central database configuration dictionary + + Returns + ------- + None """ + # call og thermo database func thermo_central_database( host=tcd_data['host'], port=tcd_data['port'], @@ -616,10 +919,20 @@ def _process_thermo_central_database(self, tcd_data): application=tcd_data['application'] ) - def _process_uncertainty(self, unc_data): + def _process_uncertainty(self, unc_data: Dict[str, Any]) -> None: """ - process uncertainty settings + Process uncertainty analysis settings + + Parameters + ---------- + unc_data : Dict[str, Any] + Uncertainty analysis configuration dictionary + + Returns + ------- + None """ + # call og uncertainty func uncertainty( localAnalysis=unc_data.get('localAnalysis', False), globalAnalysis=unc_data.get('globalAnalysis', False), @@ -634,10 +947,20 @@ def _process_uncertainty(self, unc_data): logx=unc_data.get('logx', True) ) - def _process_restart_from_seed(self, restart_data): + def _process_restart_from_seed(self, restart_data: Dict[str, Any]) -> None: """ - process restart from seed settings + Process restart from seed settings + + Parameters + ---------- + restart_data : Dict[str, Any] + Restart from seed configuration dictionary + + Returns + ------- + None """ + # call og restart func restart_from_seed( path=restart_data.get('path'), coreSeed=restart_data.get('coreSeed'), @@ -646,10 +969,20 @@ def _process_restart_from_seed(self, restart_data): speciesMap=restart_data.get('speciesMap') ) - def _process_options(self, opt_data): + def _process_options(self, opt_data: Dict[str, Any]) -> None: """ - process general options + Process general RMG options + + Parameters + ---------- + opt_data : Dict[str, Any] + General options configuration dictionary + + Returns + ------- + None """ + # call og options func options( name=opt_data.get('name', 'Seed'), generateSeedEachIteration=opt_data.get('generateSeedEachIteration', True), @@ -663,141 +996,177 @@ def _process_options(self, opt_data): saveEdgeSpecies=opt_data.get('saveEdgeSpecies', False), keepIrreversible=opt_data.get('keepIrreversible', False), trimolecularProductReversible=opt_data.get('trimolecularProductReversible', 
True), - wallTime=opt_data.get('wallTime', '00:00:00:00'), # This is a string, not a quantity + wallTime=opt_data.get('wallTime', '00:00:00:00'), saveSeedModulus=opt_data.get('saveSeedModulus', -1) ) - def _convert_quantity(self, value): + def _convert_quantity(self, value: Union[Dict, List, Tuple, str, float, None]) -> Union[Tuple, Any]: """ - convert YAML quantity representation to tuple format expected by functions - - :param value: Either a dict with 'value' and 'units', a list/tuple, - a single number/string, or None - :return: Tuple (value, units), the original value, or None + Convert YAML quantity representation to tuple format expected by RMG functions + + Parameters + ---------- + value : Union[Dict, List, Tuple, str, float, None] + Quantity value in various formats + + Returns + ------- + Union[Tuple, Any] + Converted quantity as (value, units) tuple or original value + """ if value is None: return None - if isinstance(value, dict): - if 'value' in value and 'units' in value: - return (value['value'], value['units']) - # Also handle the case where the quantity is directly the dict value - elif len(value) == 1: + if isinstance(value, dict): # handle dict + if 'value' in value and 'units' in value: # standard {value: X, units: Y} format + return (value['value'], value['units']) # convert to tuple + # also handle case where quantity IS the dict value + elif len(value) == 1: # single key value pair format # e.g., {0.5: 'kcal/mol'} format - val, unit = next(iter(value.items())) - return (val, unit) + val, unit = next(iter(value.items())) # extract key-value pair + return (val, unit) # return as tuple else: - # Return the dict as-is if it doesn't match expected formats - return value - elif isinstance(value, (list, tuple)): - if len(value) == 2: - # Standard (value, units) format - return tuple(value) - elif len(value) == 4: - # For temperature/pressure ranges in pressure dependence - return value + # return the dict as is if it dont match expected formats + return value # no conversion possible + elif isinstance(value, (list, tuple)): # handle list/tuple + if len(value) == 2: # standard (value, units) format + return tuple(value) # make tuple format + elif len(value) == 4: # for T/P ranges in P dependence + return value # keep as is (for range specs) else: - # Other list formats - return value - elif isinstance(value, str): - # For cases like wallTime which is just a string - return value + # other list formats + return value # no conversion + elif isinstance(value, str): # handle string + # for cases like wallTime which is just a string + return value # return string as is else: - # For single numeric values, return as-is - # The function being called will handle unit defaults if needed - return value + # for single numeric values, return as-is + # func being called will handle unit defaults if needed + return value # no conversion - def _convert_concentration_dict(self, conc_dict): + def _convert_concentration_dict(self, conc_dict: Dict[str, Any]) -> Dict[str, Tuple]: """ - convert concentration dictionary with quantity values - - :param conc_dict: Dictionary with species as keys and quantities as values - :return: Dictionary with converted quantities + Convert concentration dictionary with quantity values + + Parameters + ---------- + conc_dict : Dict[str, Any] + Dictionary with species names as keys and quantities as values + + Returns + ------- + Dict[str, Tuple] + Dictionary with converted quantity tuples """ if not conc_dict: return {} result = {} - for species, conc in 
conc_dict.items(): - result[species] = self._convert_quantity(conc) - return result + for species, conc in conc_dict.items(): # process each spec concentration + result[species] = self._convert_quantity(conc) # conv concentration to tuple + return result # return processed dict -# Actual reader function itself now -def read_yaml_input_file(path, rmg0): + +################################### +# the actual reader function itself +################################### + +def read_yaml_input_file(path: Union[str, Path], rmg0) -> None: """ - read an RMG YAML input file and process it using the existing input.py functions. - - :param path: Path to the YAML input file - :param rmg0: RMG object to populate + Read an RMG YAML input file and process it using existing input.py functions + + Parameters + ---------- + path : Union[str, Path] + Path to the YAML input file + rmg0 : RMG + RMG object to populate with input data + + Returns + ------- + None """ - # Import necessary modules for processing + # import necessary modules for processing from rmgpy.rmg.input import set_global_rmg from rmgpy.rmg.model import CoreEdgeReactionModel - # Set up the global RMG object + # set up global RMG object set_global_rmg(rmg0) rmg0.reaction_model = CoreEdgeReactionModel() rmg0.initial_species = [] rmg0.reaction_systems = [] - # Clear the global species_dict + # clear the global species_dict from rmgpy.rmg import input as rmg_input - rmg_input.species_dict = {} - rmg_input.mol_to_frag = {} + rmg_input.species_dict = {} # clear global spec dict + rmg_input.mol_to_frag = {} # clear molecular fragment - # Set species constraints default - rmg0.species_constraints = {'explicitlyAllowedMolecules': []} + # set spec constraints default + rmg0.species_constraints = {'explicitlyAllowedMolecules': []} # initialize with empty allowed list - # Process YAML file + # process YAML input file reader = YAMLInputReader(path) - reader.read() - reader.process() + reader.read() # read and parse YAML input file + reader.process() # process parsed data and call RMG functions - # Post-processing (similar to original read_input_file) - for reaction_system in rmg0.reaction_systems: - if hasattr(reaction_system, 'convert_initial_keys_to_species_objects'): - reaction_system.convert_initial_keys_to_species_objects(rmg_input.species_dict) + # post-processing (similar to original read_input_file) + for reaction_system in rmg0.reaction_systems: # process each reactor system + if hasattr(reaction_system, 'convert_initial_keys_to_species_objects'): # check for conversion method + reaction_system.convert_initial_keys_to_species_objects(rmg_input.species_dict) # convert spec keys to objects - if rmg0.quantum_mechanics: - rmg0.quantum_mechanics.set_default_output_directory(rmg0.output_directory) - rmg0.quantum_mechanics.initialize() + if rmg0.quantum_mechanics: # if quantum mechanics is enabled + rmg0.quantum_mechanics.set_default_output_directory(rmg0.output_directory) # set qm output directory + rmg0.quantum_mechanics.initialize() # initialize qm calculations - logging.info('') + logging.info('') # log empty line for spacing + -def read_input_file_wrapper(path, rmg0): +def read_input_file_wrapper(path: Union[str, Path], rmg0) -> None: """ - read an RMG input file (either Python or YAML format) and process it. - - this function automatically detects the file format based on the extension - and calls the appropriate reader. 
- - :param path: Path to the input file (.py or .yaml/.yml) - :param rmg0: RMG object to populate + Read an RMG input file (either Python or YAML format) and process it + + Parameters + ---------- + path : Union[str, Path] + Path to the input file (.py or .yaml/.yml) + rmg0 : RMG + RMG object to populate with input data + + Returns + ------- + None + + Raises + ------ + IOError + If the input file cannot be found + ValueError + If the file format is unsupported """ import os from pathlib import Path - # Get the file extension + # get the file extension file_path = Path(path) - extension = file_path.suffix.lower() + extension = file_path.suffix.lower() # extract extension - # Check if file exists - if not file_path.exists(): + # check if file exists + if not file_path.exists(): # validate file existence raise IOError(f'The input file "{path}" could not be found.') - # Route to appropriate reader based on extension + # route to appropriate reader based on extension + # if .py (og input file) then run og input reader func if extension == '.py': - # Use the original Python input file reader from rmgpy.rmg.input import read_input_file as read_python_input_file - logging.info(f'Detected Python input file format (.py)') + logging.info(f'Detected Python input file format (.py)') # log file type read_python_input_file(path, rmg0) + # if .yaml or .yml, use the new func above to read elif extension in ['.yaml', '.yml']: - # Use the YAML input file reader - logging.info(f'Detected YAML input file format ({extension})') + logging.info(f'Detected YAML input file format ({extension})') # log file type read_yaml_input_file(path, rmg0) else: raise ValueError( f'Unsupported input file format "{extension}". ' f'RMG supports .py and .yaml/.yml input files.' - ) - \ No newline at end of file + ) \ No newline at end of file
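
For reviewers, a minimal sketch of how the new entry point is expected to be exercised once this patch is applied. It assumes this branch is installed, that RMG() can be constructed without arguments, and that "input.yaml" is a file shaped like the butane example above; only read_input_file_auto and the RMG class come from this patch, the rest is illustrative.

# Usage sketch (illustrative): exercise the extension-based dispatch end to end.
from rmgpy.rmg.main import RMG
from rmgpy.rmg.input import read_input_file_auto

rmg = RMG()                                # bare RMG job object to be populated
rmg.output_directory = "."                 # only used by optional sections such as quantumMechanics
read_input_file_auto("input.yaml", rmg)    # routes to read_yaml_input_file based on the extension
print(f"{len(rmg.initial_species)} initial species, "
      f"{len(rmg.reaction_systems)} reaction system(s) loaded")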
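
The quantity convention used throughout the reader is easier to see in isolation. The snippet below, a sketch rather than a test, feeds the three YAML spellings accepted by _convert_quantity through the helper; the key names are made up for the example and the path given to the constructor is never opened, since __init__ only stores it.

# Quantity conversion sketch (illustrative): the three accepted YAML spellings.
import yaml
from rmgpy.rmg.yaml_input_reader import YAMLInputReader

reader = YAMLInputReader("input.yaml")      # file is not read; we only use the helper
for snippet in (
    "temperature: {value: 700, units: K}",  # explicit value/units mapping
    "terminationTime: {40: s}",             # single key/value pair shorthand
    "pressure: [10.0, bar]",                # two-element list
):
    key, raw = next(iter(yaml.safe_load(snippet).items()))
    print(key, "->", reader._convert_quantity(raw))
# each line prints a (value, units) tuple, e.g.:  temperature -> (700, 'K')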