diff --git a/__init__.py b/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/converter/geant4/Geant4MacroGenerator.py b/converter/geant4/Geant4MacroGenerator.py deleted file mode 100644 index f5ea483b..00000000 --- a/converter/geant4/Geant4MacroGenerator.py +++ /dev/null @@ -1,320 +0,0 @@ -import converter.geant4.utils as utils -from typing import Dict, Any, List -from converter.common import convert_beam_energy - -# skipcq: PYL-W0511 -# TODO geantino names needs better mapping or handling -GEANT4_PARTICLE_MAP = { - 1: { - "name": "neutron", - "allowed_units": ["MeV", "MeV/nucl"], - "target_unit": "MeV" - }, - 2: { - "name": "proton", - "allowed_units": ["MeV", "MeV/nucl"], - "target_unit": "MeV" - }, - 3: { - "name": "geantino", - "allowed_units": ["MeV"], - "target_unit": "MeV" - }, - 4: { - "name": "e-", - "allowed_units": ["MeV"], - "target_unit": "MeV" - }, - 5: { - "name": "e+", - "allowed_units": ["MeV"], - "target_unit": "MeV" - }, - 6: { - "name": "alpha", - "allowed_units": ["MeV", "MeV/nucl"], - "target_unit": "MeV" - }, - 7: { - "name": "mu-", - "allowed_units": ["MeV"], - "target_unit": "MeV" - }, - 8: { - "name": "mu+", - "allowed_units": ["MeV"], - "target_unit": "MeV" - }, - 9: { - "name": "pi-", - "allowed_units": ["MeV"], - "target_unit": "MeV" - }, - 10: { - "name": "pi+", - "allowed_units": ["MeV"], - "target_unit": "MeV" - }, - 11: { - "name": "geantino", - "allowed_units": ["MeV"], - "target_unit": "MeV" - }, - 25: { # equivalent of HEAVYION in other simulators - "name": "ion", - "allowed_units": ["MeV", "MeV/nucl"], - "target_unit": "MeV" - } -} - -GEANT4_QUANTITY_MAP = { - "DoseGy": "doseDeposit", - "Energy": "energyDeposit", - "Fluence": "cellFlux", - "KineticEnergySpectrum": "cellFlux", -} - - -class Geant4MacroGenerator: - """Generate Geant4 mac (beam + scoring + run).""" - - def __init__(self, data: Dict[str, Any]) -> None: - """Initialize with JSON data.""" - self.data = data - self.lines: List[str] = [] - self.probe_histograms: List[Dict[str, Any]] = [] - self.probe_counter = 0 - - def generate(self) -> str: - """Generate full Geant4 macro content.""" - self._append_initialization() - self._append_scoring() - self._append_histograms() - self._append_run() - self._append_results() - return "\n".join(self.lines) - - def _append_initialization(self) -> None: - """Append particle source and run initialization.""" - beam = self.data.get("beam", {}) - particle = beam.get("particle", {}) - particle_id = beam.get("particle", {}).get("id", 2) - pos = beam.get("position", [0, 0, 0]) - direction = beam.get("direction", [0, 0, 1]) - - a = particle.get("a", 1) - z = particle.get("z", a) - input_energy = beam["energy"] - input_energy_unit = beam.get("energyUnit", "MeV") - energy, _, energy_scale_factor = convert_beam_energy(GEANT4_PARTICLE_MAP, particle_id, a, - input_energy, input_energy_unit) - sigma = beam.get("energySpread", 0) * energy_scale_factor - energy_high = beam.get("energyHighCutoff", 1000) * energy_scale_factor - energy_min = beam.get("energyLowCutoff", 0) * energy_scale_factor - - self.lines.extend([ - "/run/initialize\n", - "##########################################", - "####### Particle Source definition #######", - "##########################################\n", - "/gps/verbose 0", - f"/gps/position {pos[0]} {pos[1]} {pos[2]} cm" - ]) - if particle_id == 25: # heavy ions - self.lines.extend([ - "/gps/particle ion", - f"/gps/ion {z} {a} 0 0" - ]) - else: - if particle_id not in GEANT4_PARTICLE_MAP or "name" not in GEANT4_PARTICLE_MAP[particle_id]: - raise ValueError(f"Invalid particle id={particle_id}") - name = GEANT4_PARTICLE_MAP[particle_id]["name"] - self.lines.append(f"/gps/particle {name}") - self._append_beam_shape(beam) - self.lines.extend([ - f"/gps/direction {direction[0]} {direction[1]} {direction[2]}", - "/gps/ene/type Gauss", - f"/gps/ene/mono {energy} MeV", - f"/gps/ene/sigma {sigma} MeV", - f"/gps/ene/max {energy_high} MeV\n" - f"/gps/ene/min {energy_min} MeV\n" - ]) - - def _append_beam_shape(self, beam: Dict[str, Any]) -> None: - """Set particle source shape and set its dimensions""" - shape_data = beam.get("sigma", {}) - shape_type = shape_data.get("type", None) - x = shape_data.get("x", 0) - y = shape_data.get("y", 0) - if shape_type == "Flat circular": - self.lines.append("/gps/pos/type Plane") - self.lines.append("/gps/pos/shape Circle") - if y > 0: # y defines radius, see SHIELD-HIT12A implementation - self.lines.append(f"/gps/pos/radius {y} cm") - elif shape_type == "Flat square": - self.lines.append("/gps/pos/type Plane") - self.lines.append("/gps/pos/shape Rectangle") - if x > 0: - self.lines.append(f"/gps/pos/halfx {x} cm") - if y > 0: - self.lines.append(f"/gps/pos/halfy {y} cm") - else: # default to type == "Gaussian" - self.lines.append("/gps/pos/type Beam") - if x > 0: - self.lines.append(f"/gps/pos/sigma_x {x} cm") - if y > 0: - self.lines.append(f"/gps/pos/sigma_y {y} cm") - - # -------------------- Scoring -------------------- - def _append_scoring(self) -> None: - """Append all detector scoring blocks.""" - self.lines.extend([ - "##########################################", - "################ Scoring #################", - "##########################################\n" - ]) - - detectors = {d["uuid"]: d for d in self.data.get("detectorManager", {}).get("detectors", [])} - outputs = self.data.get("scoringManager", {}).get("outputs", []) - filters = {f["uuid"]: f for f in self.data.get("scoringManager", {}).get("filters", [])} - - detector_quantities: Dict[str, List[Dict[str, Any]]] = {} - for output in outputs: - detector_uuid = output.get("detectorUuid") - detector_quantities.setdefault(detector_uuid, []).extend(output.get("quantities", [])) - - for detector_uuid, quantities in detector_quantities.items(): - detector = detectors.get(detector_uuid) - if not detector: - continue - self._append_detector_scoring(detector, quantities, filters) - - def _append_detector_scoring(self, detector: Dict[str, Any], quantities: List[Dict[str, Any]], - filters: Dict[str, Any]) -> None: - """Append scoring for a single detector including mesh/probe and quantities.""" - name = utils.get_detector_name(detector) - geom = detector.get("geometryData", {}) - geom_type = geom.get("geometryType", "Box") - params = geom.get("parameters", {}) - pos_det = geom.get("position", [0, 0, 0]) - - is_probe = any(q.get("keyword") == "KineticEnergySpectrum" for q in quantities) - - if is_probe: - self._append_probe(detector, geom_type, params, pos_det) - else: - self._append_mesh(detector, geom_type, params, pos_det) - - for quantity in quantities: - self._append_quantity(quantity, filters, name) - - self.lines.append("/score/close\n") - - def _append_mesh(self, detector: Dict[str, Any], geom_type: str, - params: Dict[str, Any], pos_det: List[float]) -> None: - """Append a mesh (cylinder or box) for a detector, if it's not a cylinder box will be added""" - name = utils.get_detector_name(detector) - if geom_type.lower() in ["cyl", "cylinder"]: - self.lines.append(f"/score/create/cylinderMesh {name}") - self.lines.append(f"/score/mesh/translate/xyz {pos_det[0]} {pos_det[1]} {pos_det[2]} cm") - radius = params.get("radius", 1) - depth = params.get("depth", 1) - n_radial = params.get("radialSegments", 1) - n_z = params.get("zSegments", 1) - self.lines.append(f"/score/mesh/cylinderSize {radius} {depth / 2} cm") - self.lines.append(f"/score/mesh/nBin {n_radial} {n_z} 1") - else: - self.lines.append(f"/score/create/boxMesh {name}") - self.lines.append(f"/score/mesh/translate/xyz {pos_det[0]} {pos_det[1]} {pos_det[2]} cm") - width = params.get("width", 1) - height = params.get("height", 1) - depth = params.get("depth", 1) - n_x = params.get("xSegments", 1) - n_y = params.get("ySegments", 1) - n_z = params.get("zSegments", 1) - self.lines.append(f"/score/mesh/boxSize {width / 2} {height / 2} {depth / 2} cm") - self.lines.append(f"/score/mesh/nBin {n_x} {n_y} {n_z}") - - def _append_probe(self, detector: Dict[str, Any], geom_type: str, - params: Dict[str, Any], pos_det: List[float]) -> None: - """Append a probe scoring for KineticEnergySpectrum quantities.""" - name = utils.get_detector_name(detector) - size = params.get("radius", 1) if geom_type.lower() in ["cyl", "cylinder"] \ - else max(params.get("width", 1), params.get("height", 1), params.get("depth", 1)) - self.lines.append(f"/score/create/probe {name} {size / 2} cm") - self.lines.append(f"/score/probe/locate {pos_det[0]} {pos_det[1]} {pos_det[2]} cm") - - def _append_quantity(self, quantity: Dict[str, Any], filters: Dict[str, Any], detector_name: str) -> None: - """Append a quantity and its filter for a detector.""" - keyword = quantity.get("keyword", "") - qname = quantity.get("name", keyword) - mapped_keyword = GEANT4_QUANTITY_MAP.get(keyword, keyword.lower()) - self.lines.append(f"/score/quantity/{mapped_keyword} {qname}") - - if keyword == "KineticEnergySpectrum": - self.probe_histograms.append({ - "quantity": qname, - "detector": detector_name, - "bins": quantity.get("histogramNBins", 1), - "min": quantity.get("histogramMin", 0), - "max": quantity.get("histogramMax", 1), - "unit": quantity.get("histogramUnit", "MeV"), - "XScale": quantity.get("histogramXScale", "none"), - "XBinScheme": quantity.get("histogramXBinScheme", "linear"), - }) - - filter_uuid = quantity.get("filter") - if filter_uuid and filter_uuid in filters: - filter_particles = filters[filter_uuid] - particle_types = filter_particles.get("data", {}).get("particleTypes", []) - if particle_types: - particles_metadata = [GEANT4_PARTICLE_MAP.get(pt["id"]) for pt in particle_types] - particles_metadata = filter(lambda x: x is not None, particles_metadata) - particle_names = " ".join([pm["name"] for pm in particles_metadata]) - self.lines.append(f"/score/filter/particle {filter_particles['name']} {particle_names}") - - # -------------------- The histogram for KineticEnergySpectrum -------------------- - def _append_histograms(self) -> None: - """Append a histogram for KineticEnergySpectrum probe""" - for hist in self.probe_histograms: - qname = hist["quantity"] - det_name = hist["detector"] - arbitrary_name = f"{qname}_differential_{self.probe_counter}" - self.lines.append(f"/analysis/h1/create {qname}_{self.probe_counter} {arbitrary_name} {hist['bins']} " - f"{hist['min']} {hist['max']} {hist['unit']} {hist['XScale']} {hist['XBinScheme']}") - self.lines.append(f"/score/fill1D {self.probe_counter} {det_name} {qname}") - self.probe_counter += 1 - - # -------------------- Run -------------------- - def _append_run(self) -> None: - """Append run section""" - beam = self.data.get("beam", {}) - self.lines.extend([ - "\n##########################################", - "################## Run ###################", - "##########################################\n", - f"/run/beamOn {beam.get('numberOfParticles', 10000)}\n" - ]) - - # -------------------- Results -------------------- - def _append_results(self) -> None: - """Append results section""" - detectors = {d["uuid"]: d for d in self.data.get("detectorManager", {}).get("detectors", [])} - outputs = self.data.get("scoringManager", {}).get("outputs", []) - - self.lines.extend([ - "##########################################", - "############ Collect results #############", - "##########################################\n" - ]) - - for output in outputs: - detector_uuid = output.get("detectorUuid") - detector = detectors.get(detector_uuid) - if not detector: - continue - name = utils.get_detector_name(detector) - for quantity in output.get("quantities", []): - qname = quantity.get("name", quantity.get("keyword", "UnknownQuantity")) - filename = f"{name}_{qname}.txt" - self.lines.append(f"/score/dumpQuantityToFile {name} {qname} {filename}") diff --git a/converter/geant4/constants.py b/converter/geant4/constants.py new file mode 100644 index 00000000..b42b587e --- /dev/null +++ b/converter/geant4/constants.py @@ -0,0 +1,73 @@ +HEAVY_ION_PARTICLE_ID = 25 + +GEANT4_PARTICLE_MAP = { + 1: { + "name": "neutron", + "allowed_units": ["MeV", "MeV/nucl"], + "target_unit": "MeV" + }, + 2: { + "name": "proton", + "allowed_units": ["MeV", "MeV/nucl"], + "target_unit": "MeV" + }, + 3: { + "name": "geantino", + "allowed_units": ["MeV"], + "target_unit": "MeV" + }, + 4: { + "name": "e-", + "allowed_units": ["MeV"], + "target_unit": "MeV" + }, + 5: { + "name": "e+", + "allowed_units": ["MeV"], + "target_unit": "MeV" + }, + 6: { + "name": "alpha", + "allowed_units": ["MeV", "MeV/nucl"], + "target_unit": "MeV" + }, + 7: { + "name": "mu-", + "allowed_units": ["MeV"], + "target_unit": "MeV" + }, + 8: { + "name": "mu+", + "allowed_units": ["MeV"], + "target_unit": "MeV" + }, + 9: { + "name": "pi-", + "allowed_units": ["MeV"], + "target_unit": "MeV" + }, + 10: { + "name": "pi+", + "allowed_units": ["MeV"], + "target_unit": "MeV" + }, + 11: { + "name": "geantino", + "allowed_units": ["MeV"], + "target_unit": "MeV" + }, + HEAVY_ION_PARTICLE_ID: { # equivalent of HEAVYION in other simulators + "name": "ion", + "allowed_units": ["MeV", "MeV/nucl"], + "target_unit": "MeV" + } +} + +GEANT4_QUANTITY_MAP = { + "DoseGy": "doseDeposit", + "Energy": "energyDeposit", + "Fluence": "cellFlux", + "KineticEnergySpectrum": "cellFlux", +} + +GEANT4_KINETIC_ENERGY_SPECTRUM = "KineticEnergySpectrum" diff --git a/converter/geant4/gdml/__init__.py b/converter/geant4/gdml/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/converter/geant4/gdml/builder.py b/converter/geant4/gdml/builder.py new file mode 100644 index 00000000..d8bab7cd --- /dev/null +++ b/converter/geant4/gdml/builder.py @@ -0,0 +1,94 @@ +from typing import Optional +# skipcq: BAN-B405 +import xml.etree.ElementTree as ET +from defusedxml.minidom import parseString + +from converter.geant4 import utils +from .materials import get_materials, create_material_elements +from .structure import build_structure + + +def generate_gdml_entry_point(world_json: Optional[dict]) -> str: + """Generate GDML string for provided world or an empty world.""" + if world_json: + return _generate_gdml(world_json) + return _generate_empty() + + +def _generate_gdml(world: dict) -> str: + """Generate GDML from geometry node.""" + gdml_root = ET.Element("gdml", { + "xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance", + "xsi:noNamespaceSchemaLocation": + "http://cern.ch/service-spi/app/releases/GDML/schema/gdml.xsd", + }) + + ET.SubElement(gdml_root, "define") + + used_materials = get_materials(world) + + materials_xml = ET.SubElement(gdml_root, "materials") + for mat in create_material_elements(used_materials): + materials_xml.append(mat) + + solids_xml = ET.SubElement(gdml_root, "solids") + structure_xml = ET.SubElement(gdml_root, "structure") + + counters = {"solid": {}, "logic": {}, "phys": {}} + + solids, volumes, _, _ = build_structure(world, counters) + + for s in solids: + solids_xml.append(s) + + for v in volumes: + structure_xml.append(v) + + setup = ET.SubElement(gdml_root, "setup", { + "name": "Default", + "version": "1.0", + }) + ET.SubElement(setup, "world", { + "ref": utils.choose_logic_name(world), + }) + + return _prettify_xml(gdml_root) + + +def _generate_empty() -> str: + """Generate empty GDML with a default world geometry.""" + root = ET.Element("gdml", { + "xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance", + "xsi:noNamespaceSchemaLocation": + "http://cern.ch/service-spi/app/releases/GDML/schema/gdml.xsd", + }) + + ET.SubElement(root, "define") + ET.SubElement(root, "materials") + + solids = ET.SubElement(root, "solids") + ET.SubElement(solids, "box", { + "name": "solidWorld", + "x": utils.to_mm_str(100), + "y": utils.to_mm_str(100), + "z": utils.to_mm_str(100), + "lunit": "mm", + }) + + structure = ET.SubElement(root, "structure") + vol = ET.SubElement(structure, "volume", {"name": "logicWorld"}) + ET.SubElement(vol, "materialref", {"ref": "G4_Galactic"}) + ET.SubElement(vol, "solidref", {"ref": "solidWorld"}) + + setup = ET.SubElement(root, "setup", {"name": "Default", "version": "1.0"}) + ET.SubElement(setup, "world", {"ref": "logicWorld"}) + + return _prettify_xml(root) + + +def _prettify_xml(root: ET.Element) -> str: + """Return a pretty-printed XML string for a given ElementTree root.""" + xml_bytes = ET.tostring(root, "utf-8") + pretty = parseString(xml_bytes).toprettyxml(indent=" ") + no_decl = "\n".join(pretty.split("\n")[1:]) + return '\n' + no_decl diff --git a/converter/geant4/gdml/materials.py b/converter/geant4/gdml/materials.py new file mode 100644 index 00000000..45d6ef5e --- /dev/null +++ b/converter/geant4/gdml/materials.py @@ -0,0 +1,37 @@ +# skipcq: BAN-B405 +import xml.etree.ElementTree as ET + + +def get_materials(node: dict) -> set[str]: + """ + Recursively traverse a geometry node tree and return a set of all + Geant4 material names ("geant4_name") used in the subtree rooted at `node`. + """ + materials: set[str] = set() + + mat = node.get("simulationMaterial", {}).get("geant4_name") + if mat: + materials.add(mat) + + for ch in node.get("children", []): + materials.update(get_materials(ch)) + + return materials + + +def create_material_elements(used: set[str]) -> list[ET.Element]: + """Emit material definitions to GDML, including BlackHole if present in the used materials set""" + elements: list[ET.Element] = [] + + if "BlackHole" in used: + mat = ET.Element("material", { + "name": "BlackHole", + "state": "solid", + }) + ET.SubElement(mat, "T", {"unit": "K", "value": "293.15"}) + ET.SubElement(mat, "MEE", {"unit": "eV", "value": "823"}) + ET.SubElement(mat, "D", {"unit": "g/cm3", "value": "1e+8"}) + ET.SubElement(mat, "fraction", {"n": "1", "ref": "G4_Pb"}) + elements.append(mat) + + return elements diff --git a/converter/geant4/gdml/solids.py b/converter/geant4/gdml/solids.py new file mode 100644 index 00000000..68dd790d --- /dev/null +++ b/converter/geant4/gdml/solids.py @@ -0,0 +1,47 @@ +# skipcq: BAN-B405 +import xml.etree.ElementTree as ET +from converter.geant4 import utils +from typing import Optional + + +def create_solid_element(node: dict, solid_name: str) -> Optional[ET.Element]: + """Create and return a GDML element for a given geometry node.""" + geo = node.get("geometryData", {}) + geom_type = geo.get("geometryType") + params = geo.get("parameters", {}) + + if geom_type in ("HollowCylinderGeometry", "CylinderGeometry"): + return ET.Element("tube", { + "name": solid_name, + "rmin": utils.to_mm_str(float(params.get("innerRadius", 0))), + "rmax": utils.to_mm_str(float(params.get("radius", 0))), + "z": utils.to_mm_str(float(params.get("depth", 0))), + "startphi": "0", + "deltaphi": "360", + "aunit": "deg", + "lunit": "mm", + }) + + if geom_type == "SphereGeometry": + return ET.Element("sphere", { + "name": solid_name, + "rmin": "0", + "rmax": utils.to_mm_str(float(params.get("radius", 0))), + "startphi": "0", + "deltaphi": "360", + "starttheta": "0", + "deltatheta": "180", + "aunit": "deg", + "lunit": "mm", + }) + + if geom_type == "BoxGeometry": + return ET.Element("box", { + "name": solid_name, + "x": utils.to_mm_str(float(params.get("width", 0))), + "y": utils.to_mm_str(float(params.get("height", 0))), + "z": utils.to_mm_str(float(params.get("depth", 0))), + "lunit": "mm", + }) + + return None diff --git a/converter/geant4/gdml/structure.py b/converter/geant4/gdml/structure.py new file mode 100644 index 00000000..0195463e --- /dev/null +++ b/converter/geant4/gdml/structure.py @@ -0,0 +1,58 @@ +# skipcq: BAN-B405 +import xml.etree.ElementTree as ET +from converter.geant4 import utils +from .solids import create_solid_element + + +def build_structure( + node: dict, + counters: dict, +) -> tuple[list[ET.Element], list[ET.Element], str, str]: + """Build GDML solids and structure elements for a node and its children.""" + solids: list[ET.Element] = [] + volumes: list[ET.Element] = [] + + child_infos = [] + + for ch in node.get("children", []): + ch_solids, ch_volumes, ch_logic, _ = build_structure(ch, counters) + solids.extend(ch_solids) + volumes.extend(ch_volumes) + + pos = ch.get("geometryData", {}).get("position", [0, 0, 0]) + child_infos.append((ch, ch_logic, pos)) + + solid_name = utils.choose_solid_name(node, counters) + solid_element = create_solid_element(node, solid_name) + if solid_element is not None: + solids.append(solid_element) + + logic_name = utils.choose_logic_name(node, counters) + vol = ET.Element("volume", {"name": logic_name}) + + material = node.get("simulationMaterial", {}).get( + "geant4_name", "G4_Galactic" + ) + ET.SubElement(vol, "materialref", {"ref": material}) + ET.SubElement(vol, "solidref", {"ref": solid_name}) + + for child, child_logic, (x, y, z) in child_infos: + phys_name = utils.choose_phys_name(child, counters) + phys = ET.SubElement(vol, "physvol", { + "copynumber": "1", + "name": phys_name, + }) + ET.SubElement(phys, "volumeref", {"ref": child_logic}) + + if abs(x) > utils.EPS or abs(y) > utils.EPS or abs(z) > utils.EPS: + ET.SubElement(phys, "position", { + "name": f"{phys_name}_pos", + "unit": "mm", + "x": utils.to_mm_str(x), + "y": utils.to_mm_str(y), + "z": utils.to_mm_str(z), + }) + + volumes.append(vol) + + return solids, volumes, logic_name, solid_name diff --git a/converter/geant4/macro/__init__.py b/converter/geant4/macro/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/converter/geant4/macro/beam_parser.py b/converter/geant4/macro/beam_parser.py new file mode 100644 index 00000000..a1eed3fa --- /dev/null +++ b/converter/geant4/macro/beam_parser.py @@ -0,0 +1,107 @@ +from typing import Dict, Any, List +from converter.geant4.constants import GEANT4_PARTICLE_MAP, HEAVY_ION_PARTICLE_ID +from converter.common import convert_beam_energy +from converter.geant4.macro.defaults.beam_defaults import (DEFAULT_PARTICLE_ID, DEFAULT_PARTICLE_A, + DEFAULT_BEAM_POSITION, DEFAULT_BEAM_DIRECTION) + + +def generate_beam_lines(data: Dict[str, Any]) -> List[str]: + """Parse the beam configuration and append GEANT4 /gps commands.""" + lines: List[str] = [] + beam = data.get("beam", {}) + particle = beam.get("particle", {}) + + particle_id = particle.get("id", DEFAULT_PARTICLE_ID) + pos = beam.get("position", DEFAULT_BEAM_POSITION) + direction = beam.get("direction", DEFAULT_BEAM_DIRECTION) + a = particle.get("a", DEFAULT_PARTICLE_A) + z = particle.get("z", a) + + lines.extend(_particle_commands(particle_id, a, z, pos)) + _append_beam_shape(lines, beam) + lines.extend(_compute_direction(direction)) + lines.extend(_compute_energy(beam, particle_id, a)) + + return lines + + +def _particle_commands(particle_id: int, a: int, z: int, pos: List[float]) -> List[str]: + """Return GEANT4 header + particle commands.""" + header = [ + "/run/initialize\n", + "##########################################", + "####### Particle Source definition #######", + "##########################################\n", + "/gps/verbose 0", + f"/gps/position {pos[0]} {pos[1]} {pos[2]} cm" + ] + + if particle_id == HEAVY_ION_PARTICLE_ID: + particle_cmd = [ + "/gps/particle ion", + f"/gps/ion {z} {a} 0 0" + ] + else: + if particle_id not in GEANT4_PARTICLE_MAP or "name" not in GEANT4_PARTICLE_MAP[particle_id]: + raise ValueError(f"Invalid particle id={particle_id}") + + name = GEANT4_PARTICLE_MAP[particle_id]["name"] + particle_cmd = [f"/gps/particle {name}"] + + return header + particle_cmd + + +def _compute_energy(beam: Dict[str, Any], particle_id: int, a: int) -> List[str]: + """Compute energy values *and* return ready-to-append /gps output lines.""" + input_energy = beam.get("energy", 0) + unit = beam.get("energyUnit", "MeV") + + energy, _, scale = convert_beam_energy( + GEANT4_PARTICLE_MAP, particle_id, a, input_energy, unit + ) + + sigma = beam.get("energySpread", 0) * scale + high = beam.get("energyHighCutoff", 1000) * scale + low = beam.get("energyLowCutoff", 0) * scale + + return [ + "/gps/ene/type Gauss", + f"/gps/ene/mono {energy} MeV", + f"/gps/ene/sigma {sigma} MeV", + f"/gps/ene/max {high} MeV", + f"/gps/ene/min {low} MeV", + ] + + +def _compute_direction(direction: List[float]) -> List[str]: + """Return /gps/direction command.""" + return [ + f"/gps/direction {direction[0]} {direction[1]} {direction[2]}" + ] + + +def _append_beam_shape(lines: List[str], beam: Dict[str, Any]) -> None: + """Append commands describing the beam's spatial distribution.""" + shape_data = beam.get("sigma", {}) + shape_type = shape_data.get("type") + x = shape_data.get("x", 0) + y = shape_data.get("y", 0) + + if shape_type == "Flat circular": + lines.append("/gps/pos/type Plane") + lines.append("/gps/pos/shape Circle") + if y > 0: + lines.append(f"/gps/pos/radius {y} cm") + elif shape_type == "Flat square": + lines.append("/gps/pos/type Plane") + lines.append("/gps/pos/shape Rectangle") + if x > 0: + lines.append(f"/gps/pos/halfx {x} cm") + if y > 0: + lines.append(f"/gps/pos/halfy {y} cm") + else: + lines.append("/gps/pos/type Beam") + if x > 0: + lines.append(f"/gps/pos/sigma_x {x} cm") + if y > 0: + lines.append(f"/gps/pos/sigma_y {y} cm") diff --git a/converter/geant4/macro/builder.py b/converter/geant4/macro/builder.py new file mode 100644 index 00000000..2d5aee6e --- /dev/null +++ b/converter/geant4/macro/builder.py @@ -0,0 +1,29 @@ +from typing import Dict +from .beam_parser import generate_beam_lines +from .scoring_parser import generate_scoring_lines +from .histogram_parser import generate_histogram_lines +from .run_parser import generate_run_lines +from .result_parser import generate_result_lines + + +def generate_macro_entry_point(data: Dict) -> str: + """Central builder for Geant4 macro, combining all parsers.""" + lines = [] + + # Beam + lines.extend(generate_beam_lines(data)) + + # Scoring + scoring_lines, probe_histograms = generate_scoring_lines(data) + lines.extend(scoring_lines) + + # Histogram + lines.extend(generate_histogram_lines(probe_histograms)) + + # Run + lines.extend(generate_run_lines(data)) + + # Results + lines.extend(generate_result_lines(data)) + + return "\n".join(lines) diff --git a/converter/geant4/macro/defaults/__init__.py b/converter/geant4/macro/defaults/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/converter/geant4/macro/defaults/beam_defaults.py b/converter/geant4/macro/defaults/beam_defaults.py new file mode 100644 index 00000000..f0ccdeb6 --- /dev/null +++ b/converter/geant4/macro/defaults/beam_defaults.py @@ -0,0 +1,7 @@ +from typing import List + +DEFAULT_PARTICLE_ID: int = 2 +DEFAULT_PARTICLE_A: int = 1 +DEFAULT_PARTICLE_Z_FROM_A: bool = True +DEFAULT_BEAM_POSITION: List[int] = [0, 0, 0] +DEFAULT_BEAM_DIRECTION: List[int] = [0, 0, 1] diff --git a/converter/geant4/macro/histogram_parser.py b/converter/geant4/macro/histogram_parser.py new file mode 100644 index 00000000..17f46a44 --- /dev/null +++ b/converter/geant4/macro/histogram_parser.py @@ -0,0 +1,16 @@ +from typing import List, Dict, Any + + +def generate_histogram_lines(probe_histograms: List[Dict[str, Any]]) -> List[str]: + """Append histogram output commands for all probe quantities.""" + lines: List[str] = [] + for idx, hist in enumerate(probe_histograms): + qname = hist["quantity"] + det_name = hist["detector"] + arbitrary_name = f"{qname}_differential_{idx}" + lines.append( + f"/analysis/h1/create {qname}_{idx} {arbitrary_name} {hist['bins']} " + f"{hist['min']} {hist['max']} {hist['unit']} {hist['XScale']} {hist['XBinScheme']}" + ) + lines.append(f"/score/fill1D {idx} {det_name} {qname}") + return lines diff --git a/converter/geant4/macro/result_parser.py b/converter/geant4/macro/result_parser.py new file mode 100644 index 00000000..dc861562 --- /dev/null +++ b/converter/geant4/macro/result_parser.py @@ -0,0 +1,28 @@ +from typing import Dict, List +import converter.geant4.utils as utils + + +def generate_result_lines(data: Dict) -> List[str]: + """Generate commands for writing scoring results to output files.""" + lines: List[str] = [] + detectors = {d["uuid"]: d for d in data.get("detectorManager", {}).get("detectors", [])} + outputs = data.get("scoringManager", {}).get("outputs", []) + + lines.extend([ + "##########################################", + "############ Collect results #############", + "##########################################\n" + ]) + + for output in outputs: + detector_uuid = output.get("detectorUuid") + detector = detectors.get(detector_uuid) + if not detector: + continue + name = utils.get_detector_name(detector) + for quantity in output.get("quantities", []): + qname = quantity.get("name", quantity.get("keyword", "UnknownQuantity")) + filename = f"{name}_{qname}.txt" + lines.append(f"/score/dumpQuantityToFile {name} {qname} {filename}") + + return lines diff --git a/converter/geant4/macro/run_parser.py b/converter/geant4/macro/run_parser.py new file mode 100644 index 00000000..1db24f01 --- /dev/null +++ b/converter/geant4/macro/run_parser.py @@ -0,0 +1,15 @@ +from typing import Dict, List + + +def generate_run_lines(data: Dict) -> List[str]: + """Generate run section.""" + lines: List[str] = [] + beam = data.get("beam", {}) + lines.extend([ + "\n##########################################", + "################## Run ###################", + "##########################################\n", + f"/run/beamOn {beam.get('numberOfParticles', 10000)}\n" + ]) + + return lines diff --git a/converter/geant4/macro/scoring_parser.py b/converter/geant4/macro/scoring_parser.py new file mode 100644 index 00000000..01bc7dfb --- /dev/null +++ b/converter/geant4/macro/scoring_parser.py @@ -0,0 +1,132 @@ +from typing import Dict, Any, List, Tuple +import converter.geant4.utils as utils +from converter.geant4.constants import GEANT4_PARTICLE_MAP, GEANT4_QUANTITY_MAP, GEANT4_KINETIC_ENERGY_SPECTRUM + + +def generate_scoring_lines(data: Dict[str, Any]) -> Tuple[List[str], List[Dict[str, Any]]]: + """Generate Scoring commands based on configuration.""" + lines: List[str] = [ + "\n##########################################", + "################ Scoring #################", + "##########################################\n" + ] + probe_histograms: List[Dict[str, Any]] = [] + + detectors = {d["uuid"]: d for d in data.get("detectorManager", {}).get("detectors", [])} + outputs = data.get("scoringManager", {}).get("outputs", []) + filters = {f["uuid"]: f for f in data.get("scoringManager", {}).get("filters", [])} + + detector_quantities: Dict[str, List[Dict[str, Any]]] = {} + for output in outputs: + detector_uuid = output.get("detectorUuid") + detector_quantities.setdefault(detector_uuid, []).extend(output.get("quantities", [])) + + for detector_uuid, quantities in detector_quantities.items(): + detector = detectors.get(detector_uuid) + if detector: + _append_detector_scoring_lines(detector, quantities, filters, lines, probe_histograms) + + return lines, probe_histograms + + +def _append_detector_scoring_lines( + detector: Dict[str, Any], + quantities: List[Dict[str, Any]], + filters: Dict[str, Any], + lines: List[str], + probe_histograms: List[Dict[str, Any]], +) -> None: + """Append all scoring quantities and filters for a given detector.""" + name = utils.get_detector_name(detector) + geom = detector.get("geometryData", {}) + geom_type = geom.get("geometryType", "Box") + params = geom.get("parameters", {}) + pos_det = geom.get("position", [0, 0, 0]) + + is_probe = any(q.get("keyword") == GEANT4_KINETIC_ENERGY_SPECTRUM for q in quantities) + if is_probe: + _append_probe_lines(detector, geom_type, params, pos_det, lines) + else: + _append_mesh_lines(detector, geom_type, params, pos_det, lines) + + for quantity in quantities: + _append_quantity_lines(quantity, filters, name, lines, probe_histograms) + + lines.append("/score/close\n") + + +def _append_mesh_lines(detector: Dict[str, Any], geom_type: str, params: Dict[str, Any], + pos_det: List[float], lines: List[str]) -> None: + """Append a mesh-type scoring detector definition to the macro.""" + name = utils.get_detector_name(detector) + if geom_type.lower() in ["cyl", "cylinder"]: + lines.append(f"/score/create/cylinderMesh {name}") + lines.append(f"/score/mesh/translate/xyz {pos_det[0]} {pos_det[1]} {pos_det[2]} cm") + radius = params.get("radius", 1) + depth = params.get("depth", 1) + n_radial = params.get("radialSegments", 1) + n_z = params.get("zSegments", 1) + lines.append(f"/score/mesh/cylinderSize {radius} {depth / 2} cm") + lines.append(f"/score/mesh/nBin {n_radial} {n_z} 1") + else: + lines.append(f"/score/create/boxMesh {name}") + lines.append(f"/score/mesh/translate/xyz {pos_det[0]} {pos_det[1]} {pos_det[2]} cm") + width = params.get("width", 1) + height = params.get("height", 1) + depth = params.get("depth", 1) + n_x = params.get("xSegments", 1) + n_y = params.get("ySegments", 1) + n_z = params.get("zSegments", 1) + lines.append(f"/score/mesh/boxSize {width / 2} {height / 2} {depth / 2} cm") + lines.append(f"/score/mesh/nBin {n_x} {n_y} {n_z}") + + +def _append_probe_lines( + detector: Dict[str, Any], + geom_type: str, + params: Dict[str, Any], + pos_det: List[float], + lines: List[str], +) -> None: + """Append a probe-type detector scoring definition to the macro.""" + name = utils.get_detector_name(detector) + size = params.get("radius", 1) if geom_type.lower() in ["cyl", "cylinder"] \ + else max(params.get("width", 1), params.get("height", 1), params.get("depth", 1)) + lines.append(f"/score/create/probe {name} {size / 2} cm") + lines.append(f"/score/probe/locate {pos_det[0]} {pos_det[1]} {pos_det[2]} cm") + + +def _append_quantity_lines( + quantity: Dict[str, Any], + filters: Dict[str, Dict[str, Any]], + detector_name: str, + lines: List[str], + probe_histograms: List[Dict[str, Any]], +) -> None: + """Append a scoring quantity definition to the macro.""" + keyword = quantity.get("keyword", "") + qname = quantity.get("name", keyword) + mapped_keyword = GEANT4_QUANTITY_MAP.get(keyword, keyword.lower()) + lines.append(f"/score/quantity/{mapped_keyword} {qname}") + + if keyword == GEANT4_KINETIC_ENERGY_SPECTRUM: + probe_histograms.append({ + "quantity": qname, + "detector": detector_name, + "bins": quantity.get("histogramNBins", 1), + "min": quantity.get("histogramMin", 0), + "max": quantity.get("histogramMax", 1), + "unit": quantity.get("histogramUnit", "MeV"), + "XScale": quantity.get("histogramXScale", "none"), + "XBinScheme": quantity.get("histogramXBinScheme", "linear"), + }) + + filter_uuid = quantity.get("filter") + if filter_uuid and filter_uuid in filters: + filter_particles = filters[filter_uuid] + particle_types = filter_particles.get("data", {}).get("particleTypes", []) + if particle_types: + particles_metadata = [GEANT4_PARTICLE_MAP.get(pt["id"]) for pt in particle_types] + particles_metadata = filter(lambda x: x is not None, particles_metadata) + particle_names = " ".join([pm["name"] for pm in particles_metadata]) + lines.append(f"/score/filter/particle {filter_particles['name']} {particle_names}") diff --git a/converter/geant4/parser.py b/converter/geant4/parser.py index e57aa9aa..35ed9b6d 100644 --- a/converter/geant4/parser.py +++ b/converter/geant4/parser.py @@ -1,189 +1,31 @@ from converter.common import Parser -import converter.geant4.utils as utils -from converter.geant4.Geant4MacroGenerator import Geant4MacroGenerator -# skipcq: BAN-B405 -import xml.etree.ElementTree as ET -from defusedxml.minidom import parseString -from typing import Dict, Tuple, Optional, Set +from converter.geant4.macro.builder import generate_macro_entry_point +from converter.geant4.gdml.builder import generate_gdml_entry_point class Geant4Parser(Parser): - """Parser that converts JSON to GDML format for geant4 simulations""" + """Parser that converts JSON simulation configurations into GDML geometry and GEANT4 macro scripts.""" def __init__(self) -> None: - """Init the parser with empty GDML content.""" super().__init__() self.info["simulator"] = "geant4" - self._gdml_content: str = "" - self._macro_content: str = "" + self._gdml_content = "" + self._macro_content = "" def parse_configs(self, json_data: dict) -> None: - """Parse the provided JSON configuration and generate GDML content.""" + """Parse full JSON configuration into internal GDML and macro builders.""" + world = None if "figureManager" in json_data and json_data["figureManager"]["figures"]: - # we assume that first figure in json is always World (the root of our tree) - world_figure_json = json_data["figureManager"]["figures"][0] - self._gdml_content = self._generate_gdml(world_figure_json) - else: - self._gdml_content = self._generate_empty_gdml() + world = json_data["figureManager"]["figures"][0] - self._initialize_macro(json_data) - - def _initialize_macro(self, json_data: dict) -> None: - """Initialize macro content from JSON data.""" - macro_gen = Geant4MacroGenerator(json_data) - self._macro_content = macro_gen.generate() + self._gdml_content = generate_gdml_entry_point(world) + self._macro_content = generate_macro_entry_point(json_data) def get_configs_json(self) -> dict: - """Return dictionary from gdml content""" - configs_json = super().get_configs_json() - configs_json.update({ + """Return the full configuration JSON including generated GDML and macro data.""" + cfg = super().get_configs_json() + cfg.update({ "geometry.gdml": self._gdml_content, "run.mac": self._macro_content, }) - return configs_json - - @staticmethod - def _prettify_xml(root: ET.Element) -> str: - """Return a pretty-printed XML string for a given ElementTree root.""" - xml_bytes = ET.tostring(root, "utf-8") - pretty = parseString(xml_bytes).toprettyxml(indent=" ") - pretty_no_decl = "\n".join(pretty.split("\n")[1:]) - return '\n' + pretty_no_decl - - def _generate_gdml(self, world: dict) -> str: - """Generate GDML from gemetry node""" - gdml_root = ET.Element("gdml", { - "xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance", - "xsi:noNamespaceSchemaLocation": "http://cern.ch/service-spi/app/releases/GDML/schema/gdml.xsd", - }) - - ET.SubElement(gdml_root, "define") - used_materials: Set[str] = set() - self._collect_materials(world, used_materials) - - materials_xml = ET.SubElement(gdml_root, "materials") - if "BlackHole" in used_materials: - mat = ET.SubElement(materials_xml, "material", {"name": "BlackHole", "state": "solid"}) - ET.SubElement(mat, "T", {"unit": "K", "value": "293.15"}) - ET.SubElement(mat, "MEE", {"unit": "eV", "value": "823"}) - ET.SubElement(mat, "D", {"unit": "g/cm3", "value": "1e+8"}) - ET.SubElement(mat, "fraction", {"n": "1", "ref": "G4_Pb"}) - - solids_xml = ET.SubElement(gdml_root, "solids") - structure_xml = ET.SubElement(gdml_root, "structure") - name_counters = {"solid": {}, "logic": {}, "phys": {}} - - self._emit_node_postorder(world, solids_xml, structure_xml, name_counters) - - setup_xml = ET.SubElement(gdml_root, "setup", {"name": "Default", "version": "1.0"}) - world_logic_name = utils.choose_logic_name(world) - ET.SubElement(setup_xml, "world", {"ref": world_logic_name}) - - return self._prettify_xml(gdml_root) - - def _generate_empty_gdml(self) -> str: - """Generate empty GDML from gemetry node""" - root = ET.Element("gdml", { - "xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance", - "xsi:noNamespaceSchemaLocation": "http://cern.ch/service-spi/app/releases/GDML/schema/gdml.xsd", - }) - ET.SubElement(root, "define") - ET.SubElement(root, "materials") - solids = ET.SubElement(root, "solids") - ET.SubElement(solids, "box", { - "name": "solidWorld", - "x": utils.to_mm_str(100), - "y": utils.to_mm_str(100), - "z": utils.to_mm_str(100), - "lunit": "mm" - }) - structure = ET.SubElement(root, "structure") - vol = ET.SubElement(structure, "volume", {"name": "logicWorld"}) - ET.SubElement(vol, "materialref", {"ref": "G4_Galactic"}) - ET.SubElement(vol, "solidref", {"ref": "solidWorld"}) - setup = ET.SubElement(root, "setup", {"name": "Default", "version": "1.0"}) - ET.SubElement(setup, "world", {"ref": "logicWorld"}) - - return self._prettify_xml(root) - - def _collect_materials(self, node: dict, acc: Set[str]) -> None: - """Collect all material nodes recursively""" - mat = node.get("simulationMaterial", {}) - g4 = mat.get("geant4_name") - if g4: - acc.add(g4) - for ch in node.get("children", []): - self._collect_materials(ch, acc) - - def _emit_node_postorder( - self, - node: dict, - solids_xml: ET.Element, - structure_xml: ET.Element, - name_counters: Dict[str, Dict[str, int]], - ) -> Tuple[str, str]: - """Recursively emit GDML solids and structure definitions for a node and its children""" - geo = node.get("geometryData", {}) - params = geo.get("parameters", {}) - geom_type = geo.get("geometryType") - children_info = [] - for ch in node.get("children", []): - ch_logic_name, _ = self._emit_node_postorder(ch, solids_xml, structure_xml, name_counters) - children_info.append((ch, ch_logic_name, ch.get("geometryData", {}).get("position", [0, 0, 0]))) - - solid_name = utils.choose_solid_name(node, name_counters) - if geom_type in ("HollowCylinderGeometry", "CylinderGeometry") : - rmin_cm = float(params.get("innerRadius", 0)) - rmax_cm = float(params.get("radius", 0)) - z_cm = float(params.get("depth", 0)) - ET.SubElement(solids_xml, "tube", { - "name": solid_name, - "rmin": utils.to_mm_str(rmin_cm), - "rmax": utils.to_mm_str(rmax_cm), - "z": utils.to_mm_str(z_cm), - "startphi": "0", "deltaphi": "360", - "aunit": "deg", "lunit": "mm" - }) - elif geom_type == "SphereGeometry": - r_cm = float(params.get("radius", 0)) - ET.SubElement(solids_xml, "sphere", { - "name": solid_name, - "rmin": "0", - "rmax": utils.to_mm_str(r_cm), - "startphi": "0", - "deltaphi": "360", - "starttheta": "0", - "deltatheta": "180", - "aunit": "deg", - "lunit": "mm" - }) - elif geom_type == "BoxGeometry": - ET.SubElement(solids_xml, "box", { - "name": solid_name, - "x": utils.to_mm_str(float(params.get("width", 0))), - "y": utils.to_mm_str(float(params.get("height", 0))), - "z": utils.to_mm_str(float(params.get("depth", 0))), - "lunit": "mm" - }) - - logic_name = utils.choose_logic_name(node, name_counters) - vol = ET.SubElement(structure_xml, "volume", {"name": logic_name}) - material_name = node.get("simulationMaterial", {}).get("geant4_name", "G4_Galactic") - ET.SubElement(vol, "materialref", {"ref": material_name}) - ET.SubElement(vol, "solidref", {"ref": solid_name}) - - for (child_node, child_logic_name, child_pos_cm) in children_info: - phys_name = utils.choose_phys_name(child_node, name_counters) - phys = ET.SubElement(vol, "physvol", {"copynumber": "1", "name": phys_name}) - ET.SubElement(phys, "volumeref", {"ref": child_logic_name}) - x_cm, y_cm, z_cm = map(float, child_pos_cm) - if abs(x_cm) > utils.EPS or abs(y_cm) > utils.EPS or abs(z_cm) > utils.EPS: - ET.SubElement(phys, "position", { - "name": f"{phys_name}_pos", - "unit": "mm", - "x": utils.to_mm_str(x_cm), - "y": utils.to_mm_str(y_cm), - "z": utils.to_mm_str(z_cm), - }) - - return logic_name, solid_name + return cfg