diff --git a/service/src/structure_comparer/data/profile.py b/service/src/structure_comparer/data/profile.py index b954c02..208a40a 100644 --- a/service/src/structure_comparer/data/profile.py +++ b/service/src/structure_comparer/data/profile.py @@ -1,7 +1,7 @@ import json import logging from pathlib import Path -from typing import Dict, List +from typing import Dict, List, Optional from uuid import uuid4 from fhir.resources.R4B.elementdefinition import ElementDefinition @@ -18,38 +18,36 @@ class Profile: def __init__(self, data: dict, package=None) -> None: self.__data = StructureDefinition.model_validate(data) - self.__fields: List[str, ProfileField] = None + self.__fields: Dict[str, ProfileField] = {} self.__init_fields() self.__package = package - def __str__(self) -> str: - return f"(name={self.name}, version={self.version}, fields={self.fields})" - - def __repr__(self) -> str: - return str(self) - def __init_fields(self) -> None: - self.__fields: Dict[str, ProfileField] = {} for elem in self.__data.snapshot.element: field = ProfileField(elem) - if field.path is not None: + if field.path: self.__fields[field.id] = field + def __str__(self) -> str: + return f"(name={self.name}, version={self.version}, fields={list(self.fields)})" + + def __repr__(self) -> str: + return str(self) + @staticmethod def from_json(path: Path, package=None) -> "Profile": if not path.exists(): raise FileNotFoundError( - f"The file {path} does not exist. Please check the file path and try again." + f"The file {path} does not exist. Please check the path and try again." ) try: - return Profile( - data=json.loads(path.read_text(encoding="utf-8")), package=package - ) - + data = json.loads(path.read_text(encoding="utf-8")) + return Profile(data=data, package=package) except Exception as e: - logger.error("failed to read file '%s'", str(path)) + logger.error("Failed to read profile from '%s'", path) logger.exception(e) + raise @property def name(self) -> str: @@ -78,7 +76,7 @@ def url(self) -> str: def __lt__(self, other: "Profile") -> bool: return self.key < other.key - def __to_dict(self) -> dict: + def _to_dict(self) -> dict: return { "id": self.id, "url": self.url, @@ -87,36 +85,28 @@ def __to_dict(self) -> dict: "version": self.version, } - def __to_pkg_dict(self) -> dict: - dict_ = self.__to_dict() - dict_["package"] = self.__package.id - - return dict_ + def _to_pkg_dict(self) -> dict: + d = self._to_dict() + d["package"] = self.__package.id if self.__package else None + return d - def to_model(self) -> ProfileModel: + def to_model(self) -> Optional[ProfileModel]: try: - model = ProfileModel(**self.__to_dict()) + return ProfileModel(**self._to_dict()) except ValidationError as e: - logger.exception(e) + logger.exception("Failed to convert Profile to ProfileModel: %s", e) + return None - else: - return model - - def to_pkg_model(self) -> ProfileModel: + def to_pkg_model(self) -> Optional[PackageProfileModel]: try: - model = PackageProfileModel(**self.__to_pkg_dict()) + return PackageProfileModel(**self._to_pkg_dict()) except ValidationError as e: - logger.exception(e) - - else: - return model + logger.exception("Failed to convert Profile to PackageProfileModel: %s", e) + return None class ProfileField: - def __init__( - self, - data: ElementDefinition, - ) -> None: + def __init__(self, data: ElementDefinition) -> None: self.__data = data self.__id = str(uuid4()) @@ -127,7 +117,11 @@ def __repr__(self) -> str: return str(self) def __eq__(self, value: object) -> bool: - return self.min == value.min and self.max == value.max + return ( + isinstance(value, ProfileField) + and self.min == value.min + and self.max == value.max + ) @property def id(self) -> str: @@ -138,10 +132,8 @@ def path_full(self) -> str: return self.__data.id @property - def path(self) -> str: - return ( - ("." + self.path_full.split(".", 1)[1]) if "." in self.path_full else None - ) + def path(self) -> Optional[str]: + return "." + self.path_full.split(".", 1)[1] if "." in self.path_full else None @property def min(self) -> int: @@ -157,29 +149,28 @@ def max_num(self) -> float: @property def must_support(self) -> bool: - return self.__data.mustSupport if self.__data.mustSupport else False + return bool(self.__data.mustSupport) @property - def ref_types(self) -> list[str]: - return ( - [ - p - for t in self.__data.type - if t.code == "Reference" - for p in t.targetProfile - ] - if self.__data.type is not None - else [] - ) + def ref_types(self) -> List[str]: + if not self.__data.type: + return [] + return [ + p + for t in self.__data.type + if t.code == "Reference" and t.targetProfile + for p in t.targetProfile + ] @property def is_default(self) -> bool: - return self == self.__data.base + return self.__data.base == self def to_model(self) -> ProfileFieldModel: + ref_types = self.ref_types return ProfileFieldModel( min=self.min, max=self.max, must_support=self.must_support, - ref_types=self.ref_types if len(self.ref_types) else None, + ref_types=ref_types if ref_types else None, ) diff --git a/service/src/structure_comparer/data/project.py b/service/src/structure_comparer/data/project.py index f840e42..f3daeca 100644 --- a/service/src/structure_comparer/data/project.py +++ b/service/src/structure_comparer/data/project.py @@ -1,5 +1,5 @@ from pathlib import Path -from typing import Dict +from typing import Dict, Optional from ..manual_entries import ManualEntries from ..model.project import Project as ProjectModel @@ -15,71 +15,66 @@ def __init__(self, path: Path): self.dir = path self.config = ProjectConfig.from_json(path / "config.json") - self.mappings: Dict[str, Mapping] - self.comparisons: Dict[str, Comparison] - self.manual_entries: ManualEntries + self.mappings: Dict[str, Mapping] = {} + self.comparisons: Dict[str, Comparison] = {} + self.manual_entries: ManualEntries = ManualEntries() + self.pkgs: list[Package] = [] - self.pkgs: list[Package] + self._load_packages() + self._load_comparisons() + self._load_mappings() + self._read_manual_entries() - self.__load_packages() - self.load_comparisons() - self.__load_mappings() - self.__read_manual_entries() + def _load_packages(self) -> None: + # Trigger creation of data_dir via property + data_dir = self.data_dir # <- erstellt Verzeichnis dank Property - def __load_packages(self) -> None: # Load packages from config - self.pkgs = [Package(self.data_dir, self, p) for p in self.config.packages] + self.pkgs = [Package(data_dir, self, cfg) for cfg in self.config.packages] - # Check for local packages not in config + # Add any local packages not yet in config for dir in self.data_dir.iterdir(): if dir.is_dir(): - name, version = dir.name.split("#") - if not self.__has_pkg(name, version): - # FHIR package brings own information with it + try: + name, version = dir.name.split("#", maxsplit=1) + except ValueError: + continue # skip invalid folder names + + if not self._has_package(name, version): if (dir / "package/package.json").exists(): self.pkgs.append(Package(dir, self)) - - # Create new config entry for package else: - cfg = PackageConfig(name=name, version=version) - self.config.packages.append(cfg) + new_cfg = PackageConfig(name=name, version=version) + self.config.packages.append(new_cfg) self.config.write() + self.pkgs.append(Package(dir, self, new_cfg)) - # Create and append package - self.pkgs.append(Package(dir, self, cfg)) - - def load_comparisons(self): + def _load_comparisons(self) -> None: self.comparisons = { - c.id: Comparison(c, self).init_ext() for c in self.config.comparisons + cmp.id: Comparison(cmp, self).init_ext() for cmp in self.config.comparisons } - def __load_mappings(self): + def _load_mappings(self) -> None: self.mappings = { - m.id: Mapping(m, self).init_ext() for m in self.config.mappings + mp.id: Mapping(mp, self).init_ext() for mp in self.config.mappings } - def __read_manual_entries(self): - manual_entries_file = self.dir / self.config.manual_entries_file + def _read_manual_entries(self) -> None: + manual_file = self.dir / self.config.manual_entries_file + manual_file.touch(exist_ok=True) - if not manual_entries_file.exists(): - manual_entries_file.touch() - - self.manual_entries = ManualEntries() - self.manual_entries.read(manual_entries_file) + self.manual_entries.read(manual_file) self.manual_entries.write() @staticmethod def create(path: Path, project_name: str) -> "Project": path.mkdir(parents=True, exist_ok=True) - # Create empty manual_entries.yaml file - manual_entries_file = path / "manual_entries.yaml" - manual_entries_file.touch() + (path / "manual_entries.yaml").touch() - # Create default config.json file - config_data = ProjectConfig(name=project_name) - config_data._file_path = path / "config.json" - config_data.write() + config = ProjectConfig(name=project_name) + config._file_path = path / "config.json" + config.write() return Project(path) @@ -87,32 +82,30 @@ def create(path: Path, project_name: str) -> "Project": def name(self) -> str: return self.config.name + @name.setter + def name(self, value: str) -> None: + self.config.name = value + self.config.write() + @property def key(self) -> str: return self.dir.name @property def url(self) -> str: - return "/project/" + self.key - - @name.setter - def name(self, value: str): - self.config.name = value - self.config.write() + return f"/project/{self.key}" @property def data_dir(self) -> Path: - return self.dir / self.config.data_dir + data_path = self.dir / self.config.data_dir + data_path.mkdir(parents=True, exist_ok=True) + return data_path - def write_config(self): + def write_config(self) -> None: self.config.write() - def get_package(self, id: str) -> Package | None: - for pkg in self.pkgs: - if pkg.id == id: - return pkg - - return None + def get_package(self, id: str) -> Optional[Package]: + return next((pkg for pkg in self.pkgs if pkg.id == id), None) def get_profile(self, id: str, url: str, version: str): for pkg in self.pkgs: @@ -121,19 +114,17 @@ def get_profile(self, id: str, url: str, version: str): profile.id == id or profile.url == url ) and profile.version == version: return profile - return None - def __has_pkg(self, name: str, version: str) -> bool: - return any([p.name == name and p.version == version for p in self.pkgs]) + def _has_package(self, name: str, version: str) -> bool: + return any(p.name == name and p.version == version for p in self.pkgs) def to_model(self) -> ProjectModel: - mappings = [m.to_base_model() for m in self.mappings.values()] - pkgs = [p.to_model() for p in self.pkgs] - comparisons = [c.to_overview_model() for c in self.comparisons.values()] - return ProjectModel( - name=self.name, mappings=mappings, comparisons=comparisons, packages=pkgs + name=self.name, + mappings=[m.to_base_model() for m in self.mappings.values()], + comparisons=[c.to_overview_model() for c in self.comparisons.values()], + packages=[p.to_model() for p in self.pkgs], ) def to_overview_model(self) -> ProjectOverviewModel: diff --git a/service/src/structure_comparer/handler/comparison.py b/service/src/structure_comparer/handler/comparison.py index 856372c..a2c9279 100644 --- a/service/src/structure_comparer/handler/comparison.py +++ b/service/src/structure_comparer/handler/comparison.py @@ -43,7 +43,7 @@ def create(self, project_key, input: ComparisonCreateModel): p.config.comparisons.append(cc) p.config.write() - p.load_comparisons() + p._load_comparisons() return p.comparisons[cc.id].to_overview_model() @@ -61,7 +61,7 @@ def delete(self, project_key, comparison_id): ] p.config.write() - p.load_comparisons() + p._load_comparisons() def _to_profiles_config(url: str) -> ComparisonProfileConfigModel: diff --git a/service/src/structure_comparer/manual_entries.py b/service/src/structure_comparer/manual_entries.py index ac8d4fb..978d821 100644 --- a/service/src/structure_comparer/manual_entries.py +++ b/service/src/structure_comparer/manual_entries.py @@ -1,6 +1,7 @@ import logging from enum import StrEnum from pathlib import Path +from typing import Iterator import yaml @@ -25,48 +26,62 @@ def __init__(self) -> None: def entries(self) -> list[ManualEntriesMappingModel]: if self._data is None: raise NotInitialized("ManualEntries data was not initialized") - return self._data.entries - def read(self, file: str | Path): + def read(self, file: str | Path) -> None: self._file = Path(file) - content = self._file.read_text(encoding="utf-8") - - if self._file.suffix == ".json": - self._data = ManualEntriesModel.model_validate_json(content) - elif self._file.suffix == ".yaml": - self._data = ManualEntriesModel.model_validate(yaml.safe_load(content)) - - def write(self): + content = self._file.read_text(encoding="utf-8").strip() + + if not content: + logger.warning( + f"ManualEntries file {self._file} is empty. Using default model." + ) + self._data = ManualEntriesModel(entries=[]) + return + + try: + if self._file.suffix == ".json": + self._data = ManualEntriesModel.model_validate_json(content) + elif self._file.suffix == ".yaml": + parsed = yaml.safe_load(content) or {} + self._data = ManualEntriesModel.model_validate(parsed) + else: + raise ValueError(f"Unsupported file extension: {self._file.suffix}") + except Exception as e: + logger.exception(f"Failed to read manual entries from {self._file}: {e}") + raise + + def write(self) -> None: if self._file is None: raise NotInitialized("ManualEntries file was not set") - if self._data is None: raise NotInitialized("ManualEntries data was not initialized") - content = None + content: str | None = None if self._file.suffix == ".json": content = self._data.model_dump_json(indent=4) elif self._file.suffix == ".yaml": content = yaml.safe_dump(self._data.model_dump()) + else: + raise ValueError(f"Unsupported file extension: {self._file.suffix}") - if content is not None: - self._file.write_text(content, encoding="utf-8") + self._file.write_text(content, encoding="utf-8") - def __iter__(self): + def __iter__(self) -> Iterator[ManualEntriesMappingModel]: return iter(self.entries) - def get(self, key, default=None) -> ManualEntriesMappingModel | None: + def get(self, key: str, default=None) -> ManualEntriesMappingModel | None: return next((e for e in self.entries if e.id == key), default) - def __getitem__(self, key) -> ManualEntriesMappingModel: - return next((e for e in self.entries if e.id == key)) - - def __setitem__(self, key, value) -> None: - i = next((i for i, e in enumerate(self.entries) if e.id == key), None) - - if i is None: - self.entries.append(value) - - else: - self.entries[i] = value + def __getitem__(self, key: str) -> ManualEntriesMappingModel: + for entry in self.entries: + if entry.id == key: + return entry + raise KeyError(f"No entry found with ID: {key}") + + def __setitem__(self, key: str, value: ManualEntriesMappingModel) -> None: + for i, entry in enumerate(self.entries): + if entry.id == key: + self.entries[i] = value + return + self.entries.append(value)