From 25ce050856506c8e5dc939ae2c1aeabf1c8d3373 Mon Sep 17 00:00:00 2001 From: Thomas Bosch Date: Wed, 30 Apr 2025 13:51:15 +0200 Subject: [PATCH 1/5] First implementation of stress testing. Testing on both old and new mCSD update algorithms. --- .gitignore | 3 + app/application.py | 12 +- app/config.py | 1 - app/db/db.py | 14 + app/routers/update_router.py | 3 +- app/services/fhir/model_factory.py | 2 +- .../fhir/references/reference_extractor.py | 2 - .../mcsd_services/update_consumer_service.py | 241 ++++++++++ .../update/update_all_consumers_service.py | 52 ++ app/stats.py | 89 ++-- poetry.lock | 65 +++ pyproject.toml | 1 + seeds/generate_data.py | 453 ++++++------------ tests/conftest.py | 23 +- tests/services/test_consumer_update.py | 170 +++++++ tests/services/test_new_update.py | 232 +++++++++ tests/test_config.py | 4 +- tests/unit_tests/__init__.py | 0 tests/unit_tests/test_stats.py | 65 +++ tests/utils/__init__.py | 0 tests/utils/mcsd_resource_gen.py | 325 +++++++++++++ tests/utils/utils.py | 92 ++++ 22 files changed, 1490 insertions(+), 359 deletions(-) create mode 100644 app/services/mcsd_services/update_consumer_service.py create mode 100644 app/services/update/update_all_consumers_service.py create mode 100644 tests/services/test_consumer_update.py create mode 100644 tests/services/test_new_update.py create mode 100644 tests/unit_tests/__init__.py create mode 100644 tests/unit_tests/test_stats.py create mode 100644 tests/utils/__init__.py create mode 100644 tests/utils/mcsd_resource_gen.py create mode 100644 tests/utils/utils.py diff --git a/.gitignore b/.gitignore index 9e944dc9..aab7540f 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,6 @@ secrets/* *.pub docker-compose.override.yml coverage.xml +tests/mock_data +tests/testing_results +tests/new_testing_results diff --git a/app/application.py b/app/application.py index 0fb22524..9daebf97 100644 --- a/app/application.py +++ b/app/application.py @@ -16,7 +16,7 @@ from app.routers.directory_health import router as directory_health_router from app.routers.ignore_list_router import router as ignore_list_router from app.config import get_config -from app.stats import setup_stats +from app.stats import StatsdMiddleware, setup_stats from app.telemetry import setup_telemetry @@ -50,12 +50,12 @@ def run() -> None: def create_fastapi_app() -> FastAPI: - application_init() - fastapi = setup_fastapi() - if get_config().stats.enabled: setup_stats() + application_init() + fastapi = setup_fastapi() + if get_config().telemetry.enabled: setup_telemetry(fastapi) @@ -108,4 +108,8 @@ def setup_fastapi() -> FastAPI: for router in routers: fastapi.include_router(router) + stats_conf = get_config().stats + if stats_conf.enabled and stats_conf.host is not None and stats_conf.port is not None: + fastapi.add_middleware(StatsdMiddleware, module_name=stats_conf.module_name) + return fastapi diff --git a/app/config.py b/app/config.py index 460b839c..13866b0d 100644 --- a/app/config.py +++ b/app/config.py @@ -104,7 +104,6 @@ class ConfigStats(BaseModel): port: int | None module_name: str | None - class ConfigMcsd(BaseModel): consumer_url: str authentication: Union[str] = Field( diff --git a/app/db/db.py b/app/db/db.py index 6d3166c4..90401dda 100644 --- a/app/db/db.py +++ b/app/db/db.py @@ -30,6 +30,20 @@ def generate_tables(self) -> None: logger.info("Generating tables...") Base.metadata.create_all(self.engine) + def truncate_tables(self) -> None: + logger.info("Truncating all tables...") + try: + metadata = MetaData()
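+ # Reflect the live schema so the loop below sees every table; iterating sorted_tables in reverse deletes child rows before their parents, which keeps foreign-key constraints satisfied.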
+ metadata.reflect(bind=self.engine) + with Session(self.engine) as session: + for table in reversed(metadata.sorted_tables): + session.execute(text(f"DELETE FROM {table.name}")) + session.commit() + logger.info("All tables truncated successfully.") + except Exception as e: + logger.error("Error while truncating tables: %s", e) + raise e + def is_healthy(self) -> bool: """ Check if the database is healthy diff --git a/app/routers/update_router.py b/app/routers/update_router.py index 5b1ea196..61b3d0d7 100644 --- a/app/routers/update_router.py +++ b/app/routers/update_router.py @@ -11,6 +11,7 @@ from app.services.entity.supplier_ignored_directory_service import SupplierIgnoredDirectoryService from app.services.update.update_consumer_service import UpdateConsumerService from app.services.supplier_provider.supplier_provider import SupplierProvider +from app.stats import get_stats router = APIRouter(prefix="/update_resources", tags=["Update consumer resources"]) @@ -18,7 +19,7 @@ class UpdateQueryParams(BaseModel): since: datetime | None = Field(default=None) - @router.post("/{supplier_id}", response_model=None, summary="Update by supplier ID") @router.post("", response_model=None, summary="Update all suppliers") +@get_stats().timer("update_resources") def update_supplier( diff --git a/app/services/fhir/model_factory.py b/app/services/fhir/model_factory.py index 628322f4..7bb4b4f4 100644 --- a/app/services/fhir/model_factory.py +++ b/app/services/fhir/model_factory.py @@ -45,7 +45,7 @@ def create_endpoint(data: Dict[str, Any], strict: bool) -> Endpoint: if "payloadType" not in data: data["payloadType"] = [] - logger.warning(warning_message("Endpoint", "payload_type", "empty array")) + logger.warning(warning_message("Endpoint", "payloadType", "empty array")) if "address" not in data: data["address"] = "" logger.warning(warning_message("Endpoint", "address", "empty string")) diff --git a/app/services/fhir/references/reference_extractor.py b/app/services/fhir/references/reference_extractor.py index 70d8ec3f..c8f76938 100644 --- a/app/services/fhir/references/reference_extractor.py +++ b/app/services/fhir/references/reference_extractor.py @@ -32,12 +32,10 @@ def _get_org_references(model: Organization) -> List[Reference]: ref = extract_references(endpoint) if ref is not None: refs.append(ref) - if part_of is not None: new_ref = extract_references(part_of) if new_ref is not None: refs.append(new_ref) - return refs diff --git a/app/services/mcsd_services/update_consumer_service.py b/app/services/mcsd_services/update_consumer_service.py new file mode 100644 index 00000000..45ac4044 --- /dev/null +++ b/app/services/mcsd_services/update_consumer_service.py @@ -0,0 +1,241 @@ +import logging +from typing import Dict, Any, Tuple +from datetime import datetime + +from app.models.resource_map.dto import ResourceMapDto, ResourceMapUpdateDto +from app.models.supplier_update.dto import UpdateLookup, UpdateLookupEntry +from app.services.bundle_tools import ( + get_resource_type_and_id_from_entry, + get_request_method_from_entry, +) +from app.services.fhir.fhir_service import FhirService +from app.services.request_services.supplier_request_service import ( + SupplierRequestsService, +) +from app.services.entity_services.resource_map_service import ResourceMapService +from app.services.request_services.consumer_request_service import ( + ConsumerRequestService, +) +from app.models.fhir.r4.types import Bundle, Request, Entry, Resource +from app.stats import Stats + +logger = logging.getLogger(__name__) + + +class
UpdateConsumerService: + def __init__( + self, + supplier_request_service: SupplierRequestsService, + consumer_request_service: ConsumerRequestService, + resource_map_service: ResourceMapService, + strict_validation: bool, + stats: Stats, + ): + self.__supplier_request_service = supplier_request_service + self.__consumer_request_service = consumer_request_service + self.__resource_map_service = resource_map_service + self.__fhir_service = FhirService(strict_validation=strict_validation) + self.stats = stats + + def update_supplier( + self, + supplier_id: str, + _since: datetime | None = None, + ) -> Dict[str, Any]: + # Fetch the history of all resources from this supplier (can take a while) + supplier_history = self.__supplier_request_service.get_resource_history( + supplier_id=supplier_id, _since=_since + ) + entries = supplier_history.entry if supplier_history.entry else [] + + # Map all the resources into a set of tuples (resource_type, resource_id) + resource_ids: set[Tuple[str, str]] = set() # Resource type & ID + for entry in entries: + (resource_type, resource_id) = get_resource_type_and_id_from_entry( + entry + ) + if resource_type is not None and resource_id is not None: + resource_ids.add((resource_type, resource_id)) + + for resource_type, resource_id in resource_ids: + resource_history = self.__supplier_request_service.get_resource_history( + supplier_id, resource_type, resource_id, None + ) + if resource_history.entry is None or len(resource_history.entry) == 0: + continue + + # Update this resource to the latest entry (if needed) + latest_entry = resource_history.entry[0] + self.update(supplier_id, len(resource_history.entry), latest_entry) + + return { + "message": "organizations and endpoints are updated", + "data": self.__resource_map_service.find(supplier_id=supplier_id), + } + + def update(self, supplier_id: str, history_size: int, latest_entry: Entry) -> None: + _, main_resource_id = get_resource_type_and_id_from_entry(latest_entry) + request_method = get_request_method_from_entry(latest_entry) + + main_resource = ( + self.__fhir_service.create_resource(latest_entry.resource.model_dump()) + if latest_entry.resource + else None + ) + unique_refs = ( + self.__fhir_service.get_references(main_resource) + if main_resource is not None + else [] + ) + + update_lookup: UpdateLookup = {} + if main_resource_id: + update_lookup.update( + { + main_resource_id: UpdateLookupEntry( + history_size=history_size, entry=latest_entry + ) + } + ) + + for ref in unique_refs: + res_type, id = self.__fhir_service.split_reference(ref) + data = self.__supplier_request_service.get_resource_history( + resource_type=res_type, resource_id=id, supplier_id=supplier_id + ) + entry = data.entry + if entry is not None and len(entry) > 0 and id is not None: + update_lookup.update( + {id: UpdateLookupEntry(history_size=len(entry), entry=entry[0])} + ) + + # prepare bundle + new_bundle = Bundle(type="transaction", entry=[]) + for id, lookup_data in update_lookup.items(): + resource_type, resource_id = get_resource_type_and_id_from_entry( + lookup_data.entry + ) + resource_map = self.__resource_map_service.get( + supplier_id, resource_type, resource_id + ) + request_method = get_request_method_from_entry(lookup_data.entry) + original_resource = ( + self.__fhir_service.create_resource( + lookup_data.entry.resource.model_dump() + ) + if lookup_data.entry.resource is not None + else None + ) + + # resource new and method is delete + if resource_map is None and request_method == "DELETE": + logger.info( + 
f"resource {id} is new and already DELETED from supplier {supplier_id} ...skipping" + ) + continue + + # resource up to date + if ( + resource_map is not None + and resource_map.history_size == lookup_data.history_size + ): + logger.info( + f"resource {resource_id} from {supplier_id} is up to date with consumer id: {resource_map.consumer_resource_id} ...skipping" + ) + continue + + # resource is new + if ( + resource_map is None + and request_method != "DELETE" + and original_resource is not None + ): + # replace references + + logger.info( + f"resource {resource_id} from {supplier_id} is new ...processing" + ) + new_id = f"{supplier_id}-{resource_id}" + new_resource = self.__fhir_service.namespace_resource_references( + original_resource, supplier_id + ) + new_resource.id = new_id + new_entry = Entry( + resource=Resource(**new_resource.model_dump()), + request=Request(method="PUT", url=f"{resource_type}/{new_id}"), + ) + new_bundle.entry.append(new_entry) + lookup_data.resource_map = ResourceMapDto( + supplier_id=supplier_id, + supplier_resource_id=resource_id, # type: ignore + resource_type=resource_type, # type: ignore + consumer_resource_id=new_id, + history_size=lookup_data.history_size, + ) + + # resource needs to be delete + if resource_map is not None and request_method == "DELETE": + logger.info( + f"resource {resource_id} from {supplier_id} needs to be deleted with consumer id: {resource_map.consumer_resource_id} ...processing" + ) + new_entry = Entry( + request=Request( + method="DELETE", + url=f"{resource_type}/{resource_map.consumer_resource_id}", + ) + ) + new_bundle.entry.append(new_entry) + lookup_data.resource_map = ResourceMapUpdateDto( + supplier_id=supplier_id, + resource_type=resource_map.resource_type, + supplier_resource_id=resource_map.supplier_resource_id, + history_size=lookup_data.history_size, + ) + + if ( + resource_map is not None + and request_method != "DELETE" + and original_resource is not None + ): + logger.info( + f"resource {resource_id} from {supplier_id} needs to be updated with consumer id: {resource_map.consumer_resource_id} ...processing" + ) + # replace id with one from resource_map + original_resource.id = resource_map.consumer_resource_id + new_resource = self.__fhir_service.namespace_resource_references( + original_resource, supplier_id + ) + new_entry = Entry( + resource=Resource(**new_resource.model_dump()), + request=Request( + method="PUT", + url=f"{resource_type}/{resource_map.consumer_resource_id}", + ), + ) + new_bundle.entry.append(new_entry) + lookup_data.resource_map = ResourceMapUpdateDto( + supplier_id=supplier_id, + resource_type=resource_map.resource_type, + supplier_resource_id=resource_map.supplier_resource_id, + history_size=lookup_data.history_size, + ) + + # only post when something has changed + if len(new_bundle.entry) > 0: + logger.info(f"detected changes from {supplier_id} ...updating data") + self.__consumer_request_service.post_bundle(new_bundle) + logger.info(f"data from {supplier_id} has been updated successfully!!") + + for v in update_lookup.values(): + if isinstance(v.resource_map, ResourceMapDto): + logger.info( + f"new resource map entry with {v.resource_map.__repr__()} ...creating" + ) + self.__resource_map_service.add_one(v.resource_map) + if isinstance(v.resource_map, ResourceMapUpdateDto): + self.__resource_map_service.update_one(v.resource_map) + logger.info( + f"new resource map entry with {v.resource_map.__repr__()} ...updating" + ) + + logger.info("resource map has been updated successfully!!!") diff 
--git a/app/services/update/update_all_consumers_service.py b/app/services/update/update_all_consumers_service.py new file mode 100644 index 00000000..60618ad3 --- /dev/null +++ b/app/services/update/update_all_consumers_service.py @@ -0,0 +1,52 @@ +import logging +from datetime import datetime, timedelta +from typing import Any + +from app.services.entity.supplier_info_service import SupplierInfoService +from app.services.api.suppliers_api import SuppliersApi +from app.services.update.update_consumer_service import UpdateConsumerService +from app.stats import Stats + +logger = logging.getLogger(__name__) + + +class UpdateAllConsumersService: + def __init__( + self, + update_consumer_service: UpdateConsumerService, + supplier_service: SuppliersApi, + supplier_info_service: SupplierInfoService, + stats: Stats, + ) -> None: + self.__supplier_service = supplier_service + self.__update_consumer_service = update_consumer_service + self.__supplier_info_service = supplier_info_service + self.__stats = stats + + def update_all(self) -> list[dict[str, Any]]: + with self.__stats.timer("update_from_all_suppliers"): + all_suppliers = self.__supplier_service.get_all() + data: list[dict[str, Any]] = [] + for supplier in all_suppliers: + with self.__stats.timer(f"get_supplier_info_{supplier.id}"): + info = self.__supplier_info_service.get_supplier_info(supplier.id) + new_updated = datetime.now() - timedelta(seconds=60) + try: + with self.__stats.timer(f"update_from_supplier_{supplier.id}"): + data.append( + self.__update_consumer_service.update( + supplier.id, info.last_success_sync + ) + ) + + info.last_success_sync = new_updated + info.failed_attempts = 0 + self.__supplier_info_service.update_supplier_info(info) + except Exception as e: + logger.error(f"Failed to update supplier {supplier.id}: {e}") + info.failed_attempts += 1 + info.failed_sync_count += 1 + self.__supplier_info_service.update_supplier_info(info) + + return data diff --git a/app/stats.py b/app/stats.py index d62c40bb..f7ff59ee 100644 --- a/app/stats.py +++ b/app/stats.py @@ -1,7 +1,9 @@ +from datetime import timedelta import time -from typing import Callable, Awaitable +from typing import Any, Callable, Awaitable import statsd +from statsd.client.timer import Timer from starlette.middleware.base import BaseHTTPMiddleware from starlette.requests import Request from starlette.responses import Response @@ -23,6 +25,9 @@ def dec(self, key: str, count: int = 1, rate: int = 1) -> None: def gauge(self, key: str, value: int, delta: bool = False) -> None: raise NotImplementedError + def timer(self, key: str) -> Timer: + raise NotImplementedError + class NoopStats(Stats): def timing(self, key: str, value: int) -> None: @@ -37,10 +42,51 @@ def dec(self, key: str, count: int = 1, rate: int = 1) -> None: def gauge(self, key: str, value: int, delta: bool = False) -> None: pass + def timer(self, key: str) -> Timer: + # Hand out a timer bound to a throwaway in-memory client so timing with stats disabled is a harmless no-op instead of a crash (the update router calls get_stats().timer(...) at import time, before setup_stats runs). + return Timer(MemoryClient(), key) + + +class MemoryClient: + def __init__(self) -> None: + self.memory: dict[str, Any] = {} + + def timer(self, stat: str, rate: int = 1) -> Timer: + return Timer(self, stat, rate) + + def timing(self, stat: str, delta: timedelta | float, rate: int = 1) -> None: + """Record a timing stat. | Warning: Own implementation, not from statsd.""" + if isinstance(delta, timedelta): + # Convert timedelta to number of milliseconds.
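+ # (statsd's StatsClient also reports timings in milliseconds, so MemoryClient stores the same unit it would send over the wire)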
+ delta = delta.total_seconds() * 1000.0 + if stat not in self.memory: + self.memory[stat] = [] + self.memory[stat].append(delta) + + def incr(self, stat: str, count: int = 1, rate: int = 1) -> None: + """Increment a stat by `count`. | Warning: Own implementation, not from statsd.""" + if stat not in self.memory: + self.memory[stat] = 0 + self.memory[stat] += count + + + def decr(self, stat: str, count: int = 1, rate: int = 1) -> None: + """Decrement a stat by `count`. | Warning: Own implementation, not from statsd.""" + self.incr(stat, -count, rate) + + def gauge(self, stat: str, value: int, rate: int = 1, delta: bool = False) -> None: + """Set a gauge value. | Warning: Own implementation, not from statsd.""" + if stat not in self.memory: + self.memory[stat] = [] + snapshot = {"value": value, "timestamp": time.time()} + self.memory[stat].append(snapshot) + + def get_memory(self) -> dict[str, Any]: + return self.memory + class Statsd(Stats): - def __init__(self, host: str, port: int): - self.client = statsd.StatsClient(host, port) + def __init__(self, client: statsd.StatsClient | MemoryClient): + self.client = client def timing(self, key: str, value: int) -> None: self.client.timing(key, value) @@ -54,6 +100,9 @@ def dec(self, key: str, count: int = 1, rate: int = 1) -> None: def gauge(self, key: str, value: int, delta: bool = False) -> None: self.client.gauge(key, value, delta) + def timer(self, key: str) -> Timer: + return self.client.timer(key) + _STATS: Stats = NoopStats() @@ -63,12 +112,12 @@ def setup_stats() -> None: if config.stats.enabled is False: return - - config.stats.host = config.stats.host or "localhost" - config.stats.port = config.stats.port or 8125 - + if config.stats.host is not None and config.stats.port is not None: + client = statsd.StatsClient(config.stats.host, config.stats.port) + else: + client = MemoryClient() global _STATS - _STATS = Statsd(config.stats.host, config.stats.port) + _STATS = Statsd(client) def get_stats() -> Stats: @@ -85,9 +134,7 @@ def __init__(self, app: ASGIApp, module_name: str): self.module_name = module_name async def dispatch(self, request: Request, call_next: Callable[[Request], Awaitable[Response]]) -> Response: - normalized_path = self.normalize_path(request) - - key = f"{self.module_name}.http.request.{request.method.lower()}.{normalized_path}" + key = f"{self.module_name}.http.request.{request.method.lower()}.{request.url.path}" get_stats().inc(key) start_time = time.monotonic() @@ -98,23 +145,3 @@ async def dispatch(self, request: Request, call_next: Callable[[Request], Awaita get_stats().timing(f"{self.module_name}.http.response_time", response_time) return response - - @staticmethod - def normalize_path(request: Request) -> str: - """ - Normalize the path to remove resource IDs. 
This makes it easier to group similar requests together in the stats - """ - if not request.url.path.startswith("/resource/"): - return request.url.path - - parts = request.url.path.split("/") - - if len(parts) >= 4 and parts[3] != "_search": - parts[3] = "%resource_id%" # Remove Resource ID - if len(parts) >= 5 and parts[4] == "_history": - parts[5] = "%version_id%" # Remove Version ID - return '/'.join(parts) - - - - diff --git a/poetry.lock b/poetry.lock index 15c0adf6..a2c247c0 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1329,6 +1329,71 @@ files = [ {file = "mypy_extensions-1.0.0.tar.gz", hash = "sha256:75dbf8955dc00442a438fc4d0666508a9a97b6bd41aa2f0ffe9d2f2725af0782"}, ] +[[package]] +name = "numpy" +version = "2.2.5" +description = "Fundamental package for array computing in Python" +optional = false +python-versions = ">=3.10" +groups = ["dev"] +files = [ + {file = "numpy-2.2.5-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:1f4a922da1729f4c40932b2af4fe84909c7a6e167e6e99f71838ce3a29f3fe26"}, + {file = "numpy-2.2.5-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:b6f91524d31b34f4a5fee24f5bc16dcd1491b668798b6d85585d836c1e633a6a"}, + {file = "numpy-2.2.5-cp310-cp310-macosx_14_0_arm64.whl", hash = "sha256:19f4718c9012e3baea91a7dba661dcab2451cda2550678dc30d53acb91a7290f"}, + {file = "numpy-2.2.5-cp310-cp310-macosx_14_0_x86_64.whl", hash = "sha256:eb7fd5b184e5d277afa9ec0ad5e4eb562ecff541e7f60e69ee69c8d59e9aeaba"}, + {file = "numpy-2.2.5-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6413d48a9be53e183eb06495d8e3b006ef8f87c324af68241bbe7a39e8ff54c3"}, + {file = "numpy-2.2.5-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7451f92eddf8503c9b8aa4fe6aa7e87fd51a29c2cfc5f7dbd72efde6c65acf57"}, + {file = "numpy-2.2.5-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:0bcb1d057b7571334139129b7f941588f69ce7c4ed15a9d6162b2ea54ded700c"}, + {file = "numpy-2.2.5-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:36ab5b23915887543441efd0417e6a3baa08634308894316f446027611b53bf1"}, + {file = "numpy-2.2.5-cp310-cp310-win32.whl", hash = "sha256:422cc684f17bc963da5f59a31530b3936f57c95a29743056ef7a7903a5dbdf88"}, + {file = "numpy-2.2.5-cp310-cp310-win_amd64.whl", hash = "sha256:e4f0b035d9d0ed519c813ee23e0a733db81ec37d2e9503afbb6e54ccfdee0fa7"}, + {file = "numpy-2.2.5-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:c42365005c7a6c42436a54d28c43fe0e01ca11eb2ac3cefe796c25a5f98e5e9b"}, + {file = "numpy-2.2.5-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:498815b96f67dc347e03b719ef49c772589fb74b8ee9ea2c37feae915ad6ebda"}, + {file = "numpy-2.2.5-cp311-cp311-macosx_14_0_arm64.whl", hash = "sha256:6411f744f7f20081b1b4e7112e0f4c9c5b08f94b9f086e6f0adf3645f85d3a4d"}, + {file = "numpy-2.2.5-cp311-cp311-macosx_14_0_x86_64.whl", hash = "sha256:9de6832228f617c9ef45d948ec1cd8949c482238d68b2477e6f642c33a7b0a54"}, + {file = "numpy-2.2.5-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:369e0d4647c17c9363244f3468f2227d557a74b6781cb62ce57cf3ef5cc7c610"}, + {file = "numpy-2.2.5-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:262d23f383170f99cd9191a7c85b9a50970fe9069b2f8ab5d786eca8a675d60b"}, + {file = "numpy-2.2.5-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:aa70fdbdc3b169d69e8c59e65c07a1c9351ceb438e627f0fdcd471015cd956be"}, + {file = "numpy-2.2.5-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:37e32e985f03c06206582a7323ef926b4e78bdaa6915095ef08070471865b906"}, + {file = 
"numpy-2.2.5-cp311-cp311-win32.whl", hash = "sha256:f5045039100ed58fa817a6227a356240ea1b9a1bc141018864c306c1a16d4175"}, + {file = "numpy-2.2.5-cp311-cp311-win_amd64.whl", hash = "sha256:b13f04968b46ad705f7c8a80122a42ae8f620536ea38cf4bdd374302926424dd"}, + {file = "numpy-2.2.5-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:ee461a4eaab4f165b68780a6a1af95fb23a29932be7569b9fab666c407969051"}, + {file = "numpy-2.2.5-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:ec31367fd6a255dc8de4772bd1658c3e926d8e860a0b6e922b615e532d320ddc"}, + {file = "numpy-2.2.5-cp312-cp312-macosx_14_0_arm64.whl", hash = "sha256:47834cde750d3c9f4e52c6ca28a7361859fcaf52695c7dc3cc1a720b8922683e"}, + {file = "numpy-2.2.5-cp312-cp312-macosx_14_0_x86_64.whl", hash = "sha256:2c1a1c6ccce4022383583a6ded7bbcda22fc635eb4eb1e0a053336425ed36dfa"}, + {file = "numpy-2.2.5-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:9d75f338f5f79ee23548b03d801d28a505198297534f62416391857ea0479571"}, + {file = "numpy-2.2.5-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3a801fef99668f309b88640e28d261991bfad9617c27beda4a3aec4f217ea073"}, + {file = "numpy-2.2.5-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:abe38cd8381245a7f49967a6010e77dbf3680bd3627c0fe4362dd693b404c7f8"}, + {file = "numpy-2.2.5-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:5a0ac90e46fdb5649ab6369d1ab6104bfe5854ab19b645bf5cda0127a13034ae"}, + {file = "numpy-2.2.5-cp312-cp312-win32.whl", hash = "sha256:0cd48122a6b7eab8f06404805b1bd5856200e3ed6f8a1b9a194f9d9054631beb"}, + {file = "numpy-2.2.5-cp312-cp312-win_amd64.whl", hash = "sha256:ced69262a8278547e63409b2653b372bf4baff0870c57efa76c5703fd6543282"}, + {file = "numpy-2.2.5-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:059b51b658f4414fff78c6d7b1b4e18283ab5fa56d270ff212d5ba0c561846f4"}, + {file = "numpy-2.2.5-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:47f9ed103af0bc63182609044b0490747e03bd20a67e391192dde119bf43d52f"}, + {file = "numpy-2.2.5-cp313-cp313-macosx_14_0_arm64.whl", hash = "sha256:261a1ef047751bb02f29dfe337230b5882b54521ca121fc7f62668133cb119c9"}, + {file = "numpy-2.2.5-cp313-cp313-macosx_14_0_x86_64.whl", hash = "sha256:4520caa3807c1ceb005d125a75e715567806fed67e315cea619d5ec6e75a4191"}, + {file = "numpy-2.2.5-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3d14b17b9be5f9c9301f43d2e2a4886a33b53f4e6fdf9ca2f4cc60aeeee76372"}, + {file = "numpy-2.2.5-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2ba321813a00e508d5421104464510cc962a6f791aa2fca1c97b1e65027da80d"}, + {file = "numpy-2.2.5-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:a4cbdef3ddf777423060c6f81b5694bad2dc9675f110c4b2a60dc0181543fac7"}, + {file = "numpy-2.2.5-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:54088a5a147ab71a8e7fdfd8c3601972751ded0739c6b696ad9cb0343e21ab73"}, + {file = "numpy-2.2.5-cp313-cp313-win32.whl", hash = "sha256:c8b82a55ef86a2d8e81b63da85e55f5537d2157165be1cb2ce7cfa57b6aef38b"}, + {file = "numpy-2.2.5-cp313-cp313-win_amd64.whl", hash = "sha256:d8882a829fd779f0f43998e931c466802a77ca1ee0fe25a3abe50278616b1471"}, + {file = "numpy-2.2.5-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:e8b025c351b9f0e8b5436cf28a07fa4ac0204d67b38f01433ac7f9b870fa38c6"}, + {file = "numpy-2.2.5-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:8dfa94b6a4374e7851bbb6f35e6ded2120b752b063e6acdd3157e4d2bb922eba"}, + {file = "numpy-2.2.5-cp313-cp313t-macosx_14_0_arm64.whl", hash = 
"sha256:97c8425d4e26437e65e1d189d22dff4a079b747ff9c2788057bfb8114ce1e133"}, + {file = "numpy-2.2.5-cp313-cp313t-macosx_14_0_x86_64.whl", hash = "sha256:352d330048c055ea6db701130abc48a21bec690a8d38f8284e00fab256dc1376"}, + {file = "numpy-2.2.5-cp313-cp313t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8b4c0773b6ada798f51f0f8e30c054d32304ccc6e9c5d93d46cb26f3d385ab19"}, + {file = "numpy-2.2.5-cp313-cp313t-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:55f09e00d4dccd76b179c0f18a44f041e5332fd0e022886ba1c0bbf3ea4a18d0"}, + {file = "numpy-2.2.5-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:02f226baeefa68f7d579e213d0f3493496397d8f1cff5e2b222af274c86a552a"}, + {file = "numpy-2.2.5-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:c26843fd58f65da9491165072da2cccc372530681de481ef670dcc8e27cfb066"}, + {file = "numpy-2.2.5-cp313-cp313t-win32.whl", hash = "sha256:1a161c2c79ab30fe4501d5a2bbfe8b162490757cf90b7f05be8b80bc02f7bb8e"}, + {file = "numpy-2.2.5-cp313-cp313t-win_amd64.whl", hash = "sha256:d403c84991b5ad291d3809bace5e85f4bbf44a04bdc9a88ed2bb1807b3360bb8"}, + {file = "numpy-2.2.5-pp310-pypy310_pp73-macosx_10_15_x86_64.whl", hash = "sha256:b4ea7e1cff6784e58fe281ce7e7f05036b3e1c89c6f922a6bfbc0a7e8768adbe"}, + {file = "numpy-2.2.5-pp310-pypy310_pp73-macosx_14_0_x86_64.whl", hash = "sha256:d7543263084a85fbc09c704b515395398d31d6395518446237eac219eab9e55e"}, + {file = "numpy-2.2.5-pp310-pypy310_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0255732338c4fdd00996c0421884ea8a3651eea555c3a56b84892b66f696eb70"}, + {file = "numpy-2.2.5-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:d2e3bdadaba0e040d1e7ab39db73e0afe2c74ae277f5614dad53eadbecbbb169"}, + {file = "numpy-2.2.5.tar.gz", hash = "sha256:a9c0d994680cd991b1cb772e8b297340085466a6fe964bc9d4e80f5e2f43c291"}, +] + [[package]] name = "opentelemetry-api" version = "1.29.0" diff --git a/pyproject.toml b/pyproject.toml index 8383b9a6..cee5ffb0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ safety = "^3.3.0" codespell = "^2.2.6" types-requests = "^2.32" faker = "^37.1.0" +numpy = "^2.2.5" [build-system] requires = ["poetry-core"] diff --git a/seeds/generate_data.py b/seeds/generate_data.py index 1a2d849c..7cbc109c 100644 --- a/seeds/generate_data.py +++ b/seeds/generate_data.py @@ -1,16 +1,10 @@ -from uuid import UUID, uuid4 +from typing import List -from faker import Faker -from fhir.resources.R4B.address import Address from fhir.resources.R4B.codeableconcept import CodeableConcept from fhir.resources.R4B.coding import Coding from fhir.resources.R4B.endpoint import Endpoint -from fhir.resources.R4B.humanname import HumanName -from fhir.resources.R4B.identifier import Identifier from fhir.resources.R4B.organization import Organization -from fhir.resources.R4B.organization import OrganizationContact -from fhir.resources.R4B.reference import Reference from fhir.resources.R4B.location import Location from fhir.resources.R4B.healthcareservice import HealthcareService from fhir.resources.R4B.organizationaffiliation import OrganizationAffiliation @@ -19,50 +13,21 @@ class DataGenerator: - fake: Faker - - def __init__(self) -> None: - self.fake = Faker() - def generate_organization( self, - endpoint_id: UUID | None = None, - part_of: UUID | None = None, + endpoint_ids: List[str | None] = None, + res_id: str | None = None, + part_of: str | None = None, ) -> Organization: return Organization( - identifier=[ - (Identifier(system="http://example.org/org", value=str(uuid4()))), - 
( - Identifier( - system="http://fhir.nl/fhir/NamingSystem/ura", - value=str(self.fake.random_number(8, True)), - ) - ), - ], - active=self.fake.boolean(), - type=[ - CodeableConcept( - coding=[ - Coding( - system="http://example.org/org-type", - code=self.fake.random_element( - elements=("hospital", "clinic", "pharmacy", "lab") - ), - display=self.fake.random_element( - elements=("Hospital", "Clinic", "Pharmacy", "Lab") - ), - ) - ] - ) - ], - name=self.fake.company(), - contact=[self.generate_fake_contact()], - address=[self.generate_address()], - endpoint=( - [{"reference": f"Endpoint/{endpoint_id}"}] + id=res_id, + endpoint=[ + {"reference": f"Endpoint/{endpoint_id}"} + for endpoint_id in endpoint_ids if endpoint_id is not None - else None - ), + ] + if endpoint_ids is not None + else None, partOf=( {"reference": f"Organization/{part_of}"} if part_of is not None @@ -70,357 +35,213 @@ def generate_organization( ), ) - def generate_fake_contact(self) -> OrganizationContact: - return OrganizationContact( - name=HumanName( - use=self.fake.random_element( - elements=( - "usual", - "official", - "temp", - "nickname", - "anonymous", - "old", - "maiden", - ) - ), - text=self.fake.name(), - family=self.fake.last_name(), - given=[self.fake.first_name()], - ), - address=self.generate_address(), - ) - - def generate_address(self): - return Address( - use=self.fake.random_element( - elements=("home", "work", "temp", "old", "billing") - ), - type=self.fake.random_element(elements=("postal", "physical", "both")), - text=self.fake.address(), - city=self.fake.city(), - state=self.fake.state_abbr(), - postalCode=self.fake.postcode(), - country=self.fake.country(), - ) - def generate_endpoint( self, + res_id: str | None = None, org_fhir_id: str | None = None, ) -> Endpoint: return Endpoint( - identifier=[ - (Identifier(system="http://example.org/endpoint", value=str(uuid4()))), - ], + id=res_id, connectionType=Coding( system="http://example.org/connection-type", - code=self.fake.random_element( - elements=("hl7-fhir-rest", "hl7-fhir-messaging") - ), - display=self.fake.random_element( - elements=("HL7 FHIR REST", "HL7 FHIR Messaging") - ), + code="hl7-fhir-rest", + display="HL7 FHIR REST", ), - name=self.fake.company(), payloadType=[ CodeableConcept( coding=[ Coding( system="http://example.org/payload-type", - code=self.fake.random_element( - elements=( - "application/fhir+json", - "application/fhir+xml", - ) - ), - display=self.fake.random_element( - elements=("FHIR JSON", "FHIR XML") - ), + code="application/fhir+json", + display="FHIR JSON", ) ] ) ], managingOrganization=( - Reference.construct(reference="Organization/" + str(org_fhir_id)) - if org_fhir_id - else None - ), - address=self.fake.uri(), - header=[ - self.fake.random_element( - elements=( - "Authorization: Bearer token", - "Content-Type: application/fhir+json", - ) - ) - ], - payloadMimeType=[ - self.fake.random_element( - elements=("application/fhir+json", "application/fhir+xml") - ) - ], - status=( - self.fake.random_element( - elements=("active", "suspended", "error", "off", "entered-in-error") - ) + {"reference": f"Organization/{org_fhir_id}"} if org_fhir_id else None ), + address="http://example.org/", + status="active", ) def generate_location( self, - part_of: UUID | None = None, - endpoint_id: UUID | None = None, - organization_id: UUID | None = None, + endpoint_ids: List[str | None] = None, + res_id: str | None = None, + part_of_location: str | None = None, + managing_org: str | None = None, ) -> Location: return Location( - 
identifier=[ - Identifier(system="http://example.org/location", value=str(uuid4())) - ], - status=self.fake.random_element( - elements=("active", "suspended", "inactive") - ), - name=self.fake.company(), - description=self.fake.sentence(), + id=res_id, managingOrganization=( - Reference.construct(reference="Organization/" + str(organization_id)) - if organization_id - else None + {"reference": f"Organization/{managing_org}"} if managing_org else None ), partOf=( - Reference.construct(reference="Location/" + str(part_of)) - if part_of is not None + {"reference": f"Location/{part_of_location}"} + if part_of_location is not None else None ), - address=self.generate_address(), - endpoint=( - [Reference.construct(reference="Endpoint/" + str(endpoint_id))] + endpoint=[ + {"reference": f"Endpoint/{endpoint_id}"} + for endpoint_id in endpoint_ids if endpoint_id is not None - else None - ), + ] + if endpoint_ids is not None + else None, ) def generate_healthcare_service( self, - provided_by: UUID | None = None, - location_id: UUID | None = None, - coverage_area: UUID | None = None, - endpoint_id: UUID | None = None, + location_ids: List[str | None] = None, + coverage_area_locations: List[str | None] = None, + endpoint_ids: List[str | None] = None, + res_id: str | None = None, + provided_by_org: str | None = None, ) -> HealthcareService: return HealthcareService( - identifier=[ - Identifier( - system="http://example.org/healthcare-service", value=str(uuid4()) - ) - ], + id=res_id, providedBy=( - Reference.construct(reference="Organization/" + str(provided_by)) - if provided_by is not None + {"reference": f"Organization/{provided_by_org}"} + if provided_by_org is not None else None ), - location=( - [Reference.construct(reference="Location/" + str(location_id))] + location=[ + {"reference": f"Location/{location_id}"} + for location_id in location_ids if location_id is not None - else None - ), - name=self.fake.company(), - comment=self.fake.sentence(), - extraDetails=self.fake.sentence(), - coverageArea=( - [Reference.construct(reference="Location/" + str(coverage_area))] - if coverage_area is not None - else None - ), - appointmentRequired=self.fake.boolean(), - endpoint=( - [Reference.construct(reference="Endpoint/" + str(endpoint_id))] + ] + if location_ids is not None + else None, + coverageArea=[ + {"reference": f"Location/{location_id}"} + for location_id in coverage_area_locations + if location_id is not None + ] + if coverage_area_locations is not None + else None, + endpoint=[ + {"reference": f"Endpoint/{endpoint_id}"} + for endpoint_id in endpoint_ids if endpoint_id is not None - else None - ), + ] + if endpoint_ids is not None + else None, ) def generate_organization_affiliation( self, - organization_id: UUID | None = None, - participating_organization_id: UUID | None = None, - network_id: list[UUID] = None, - location_id: UUID | None = None, - healthcare_service_id: UUID | None = None, - endpoint_id: UUID | None = None, + res_id: str | None = None, + organization_id: str | None = None, + participating_organization_id: str | None = None, + network_org_ids: List[str | None] = None, + location_ids: List[str | None] = None, + healthcare_service_ids: List[str | None] = None, + endpoint_ids: List[str | None] = None, ) -> OrganizationAffiliation: - if network_id is None: - network_id = [] return OrganizationAffiliation( - identifier=[ - Identifier( - system="http://example.org/organization-affiliation", - value=str(uuid4()), - ) - ], - active=self.fake.boolean(), + id=res_id, organization=( - 
(Reference.construct(reference="Organization/" + str(organization_id))) + {"reference": f"Organization/{organization_id}"} if organization_id is not None else None ), participatingOrganization=( - ( - Reference.construct( - reference="Organization/" + str(participating_organization_id) - ) - ) + {"reference": f"Organization/{participating_organization_id}"} if participating_organization_id is not None else None ), network=[ - Reference.construct(reference="Organization/" + str(network)) - for network in network_id - ], - location=( - [Reference.construct(reference="Location/" + str(location_id))] + {"reference": f"Organization/{org_id}"} + for org_id in network_org_ids + if org_id is not None + ] + if network_org_ids is not None + else None, + location=[ + {"reference": f"Location/{location_id}"} + for location_id in location_ids if location_id is not None - else None - ), - healthcareService=( - [ - Reference.construct( - reference="HealthcareService/" + str(healthcare_service_id) - ) - ] + ] + if location_ids is not None + else None, + healthcareService=[ + {"reference": f"HealthcareService/{healthcare_service_id}"} + for healthcare_service_id in healthcare_service_ids if healthcare_service_id is not None - else None - ), - endpoint=( - [Reference.construct(reference="Endpoint/" + str(endpoint_id))] + ] + if healthcare_service_ids is not None + else None, + endpoint=[ + {"reference": f"Endpoint/{endpoint_id}"} + for endpoint_id in endpoint_ids if endpoint_id is not None - else None - ), + ] + if endpoint_ids is not None + else None, ) - def generate_practioner( + def generate_practitioner( self, - organization_id: UUID | None = None, + res_id: str | None = None, + qualification_issuer_org_ids: List[str | None] = None, ) -> Practitioner: return Practitioner( - identifier=[ - Identifier(system="http://example.org/practitioner", value=str(uuid4())) - ], - active=self.fake.boolean(), - name=[ - HumanName( - use=self.fake.random_element( - elements=( - "usual", - "official", - "temp", - "nickname", - "anonymous", - "old", - "maiden", - ) - ), - text=self.fake.name(), - family=self.fake.last_name(), - given=[self.fake.first_name()], - ) - ], - address=[self.generate_address()], - qualification=[self.generate_practitioner_qualification(organization_id)], + id=res_id, + qualification=[ + self.generate_practitioner_qualification(qualification_issuer_org_id) + for qualification_issuer_org_id in qualification_issuer_org_ids + if qualification_issuer_org_id is not None + ] + if qualification_issuer_org_ids is not None + else None, ) def generate_practitioner_qualification( - self, organization_id: UUID | None = None + self, qualification_issuer_org_id: str | None = None ) -> PractitionerQualification: return PractitionerQualification( code=CodeableConcept( coding=[ Coding( system="http://example.org/practitioner-qualification", - code=self.fake.random_element( - elements=("MD", "DO", "NP", "PA", "RN", "LPN") - ), - display=self.fake.random_element( - elements=( - "Doctor of Medicine", - "Doctor of Osteopathy", - "Nurse Practitioner", - "Physician Assistant", - "Registered Nurse", - "Licensed Practical Nurse", - ) - ), + code="MD", + display="Doctor of Medicine", ) ] ), - issuer=( - Reference.construct(reference="Organization/" + str(organization_id)) - if organization_id is not None - else None - ), - identifier=[ - Identifier( - system="http://example.org/practitioner-qualification", - value=str(uuid4()), - ) - ], + issuer={"reference": f"Organization/{qualification_issuer_org_id}"} + if 
qualification_issuer_org_id is not None + else None, ) def generate_practitioner_role( self, - practitioner_id: UUID | None = None, - organization_id: UUID | None = None, - location_id: UUID | None = None, - healthcare_service_id: UUID | None = None, - endpoint_id: UUID | None = None, + res_id: str | None = None, + practitioner_id: str | None = None, + organization_id: str | None = None, + location_ids: List[str | None] = None, + healthcare_service_ids: List[str | None] = None, + endpoint_ids: List[str | None] = None, ) -> PractitionerRole: return PractitionerRole( - identifier=[ - Identifier( - system="http://example.org/practitioner-role", value=str(uuid4()) - ) - ], - active=self.fake.boolean(), - practitioner=( - Reference.construct(reference="Practitioner/" + str(practitioner_id)) - if practitioner_id is not None - else None - ), - organization=( - Reference.construct(reference="Organization/" + str(organization_id)) - if organization_id is not None - else None - ), - code=[ - CodeableConcept( - coding=[ - Coding( - system="http://example.org/practitioner-role", - code=self.fake.random_element( - elements=("doctor", "nurse", "pharmacist", "researcher") - ), - display=self.fake.random_element( - elements=("Doctor", "Nurse", "Pharmacist", "Researcher") - ), - ) - ] - ) - ], - location=( - [Reference.construct(reference="Location/" + str(location_id))] - if location_id is not None - else None - ), - healthcareService=( - [ - Reference.construct( - reference="HealthcareService/" + str(healthcare_service_id) - ) - ] - if healthcare_service_id is not None - else None - ), - endpoint=( - [Reference.construct(reference="Endpoint/" + str(endpoint_id))] - if endpoint_id is not None - else None - ), + id=res_id, + practitioner={"reference": f"Practitioner/{practitioner_id}"} + if practitioner_id is not None + else None, + organization={"reference": f"Organization/{organization_id}"} + if organization_id is not None + else None, + location=[ + {"reference": f"Location/{location_id}"} + for location_id in location_ids if location_id is not None + ] if location_ids is not None else None, + healthcareService=[ + {"reference": f"HealthcareService/{healthcare_service_id}"} + for healthcare_service_id in healthcare_service_ids if healthcare_service_id is not None + ] if healthcare_service_ids is not None else None, + endpoint=[ + {"reference": f"Endpoint/{endpoint_id}"} + for endpoint_id in endpoint_ids if endpoint_id is not None + ] if endpoint_ids is not None else None, ) diff --git a/tests/conftest.py b/tests/conftest.py index 560ac513..7d6673f0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,8 @@ from typing import Any, Dict import copy +import pathlib +from typing import Any + from collections.abc import Generator from fastapi import FastAPI from fastapi.testclient import TestClient @@ -13,9 +16,20 @@ from app.services.fhir.fhir_service import FhirService from tests.test_config import get_test_config from tests.mock_data import organization, endpoint, location +import os +import shutil +# Don't search for tests in the mock_data directory +def pytest_ignore_collect(collection_path: pathlib.Path, config: Any) -> bool: + return "tests/mock_data" in str(collection_path) -@pytest.fixture() +# Disable the test if running in GitHub Actions +def pytest_collection_modifyitems(config: Any, items: Any) -> None: + if os.getenv('GITHUB_ACTIONS') == 'true': + if "test_consumer_update_with_timing" in str(items): + items[:] = [item for item in items if "test_consumer_update_with_timing" not in 
str(item)] + +@pytest.fixture def database() -> Generator[Database, Any, None]: try: db = Database("sqlite:///:memory:") @@ -63,3 +77,10 @@ def mock_ep() -> Dict[str, Any]: @pytest.fixture() def mock_location() -> Dict[str, Any]: return copy.deepcopy(location) + +# Disable this if you want the mock data to persist after tests +@pytest.fixture(scope="session", autouse=True) +def cleanup_after_tests() -> Generator[None, None, None]: + yield + if os.path.exists("tests/mock_data"): + shutil.rmtree("tests/mock_data") diff --git a/tests/services/test_consumer_update.py b/tests/services/test_consumer_update.py new file mode 100644 index 00000000..9ac5650a --- /dev/null +++ b/tests/services/test_consumer_update.py @@ -0,0 +1,170 @@ +from datetime import datetime +import json +import time +import threading +import psutil +import pytest +import os +from fastapi.testclient import TestClient +from unittest.mock import patch +from fastapi.responses import JSONResponse +from app.container import get_database +from app.models.fhir.r4.types import Bundle +from utils.utils import ( + check_if_in_db, + create_file_structure, + generate_report, +) + +from app.stats import Stats, get_stats + +MOCK_DATA_PATH = "tests/mock_data" +TEST_RESULTS_PATH = "tests/testing_results" + +monitor_data = [] +monitoring = False +iteration = 0 + + +def monitor_resources(interval: float = 0.01) -> None: + process = psutil.Process() + while monitoring: + mem = process.memory_info().rss / 1024**2 + cpu = process.cpu_percent(interval=None) + io = process.io_counters() + monitor_data.append( + { + "memory_mb": mem, + "cpu_percent": cpu, + "read_bytes": io.read_bytes, + "write_bytes": io.write_bytes, + } + ) + time.sleep(interval) + +def mock_get_resource_history( + supplier_id: str, + resource_type: str | None = None, + resource_id: str | None = None, + _since: datetime | None = None, + ) -> Bundle: + def _read_mock_data(file_path: str) -> Bundle: + with open(file_path, "r") as file: + return Bundle(**json.load(file)) + global iteration + with get_stats().timer(f"{iteration}.mock_get_resource_history"): + if resource_type is None and resource_id is None: + print("Total history bundle is requested") + return _read_mock_data( + f"{MOCK_DATA_PATH}/all_resources_history.json" + ) + if resource_id is None: + print( + f"All from resourcetype history bundle is requested: {resource_type}" + ) + return _read_mock_data( + f"{MOCK_DATA_PATH}/{resource_type}/{resource_type}_history.json" + ) + # print(f"Specific resource history bundle is requested: {resource_type} - {resource_id}") + return _read_mock_data( + f"{MOCK_DATA_PATH}/{resource_type}/{resource_id}/{resource_id}_history.json" + ) + +@pytest.mark.filterwarnings("ignore:Pydantic serializer warnings") +@pytest.mark.parametrize( + "resource_count, version_count, max_depth, test_name", + [ + (1, 1, 1, "very_simple"), + (2, 2, 1, "no_depth"), + (5, 3, 1, "more_resources"), + # (5, 3, 2, "just_a_little_depth"), + # (5, 3, 4, "medium"), + # (5, 3, 5, "difficult"), + # (5, 3, 6, "hard"), + # (5, 3, 7, "very_hard"), + # (5, 3, 8, "extreme"), + # (5, 3, 9, "crazy"), + # (5, 3, 10, "insane"), + # (10, 5, 10, "impossible"), + ], +) +@patch( + "app.services.request_services.supplier_request_service.SupplierRequestsService.get_resource_history", + side_effect=mock_get_resource_history, +) +@patch( + "app.services.request_services.consumer_request_service.ConsumerRequestService.post_bundle", + return_value=JSONResponse({"message": "ok"}), +) +def test_consumer_update_with_timing( + mock_post_bundle: 
JSONResponse, + mock_get_resource_history: Bundle, + api_client: TestClient, resource_count: int, version_count: int, max_depth: int, test_name: str +) -> None: + global monitoring + monitoring = True + + global monitor_data + monitor_data = [] + + errors = 0 + iterations = 30 + global iteration + + + print(f"Running test: {test_name}") + db = get_database() + db.truncate_tables() + + create_file_structure( + base_path=f"{MOCK_DATA_PATH}", + resource_count=resource_count, + version_count=version_count, + max_depth=max_depth, + ) + + monitor_thread = threading.Thread(target=monitor_resources) + try: + monitor_thread.start() + for iteration in range(iterations): + print(f"Iteration {iteration + 1}/{iterations}") + response = api_client.post("/update_resources/test-supplier") + print("Update done: mCSD resources are updated") + assert response.status_code == 200 + assert response.json()["message"] == "organizations and endpoints are updated" + + finally: + monitoring = False + monitor_thread.join() + + ids_set = set() + with open( + f"{MOCK_DATA_PATH}/all_resources_history.json", "r" + ) as file: + bundle_data = json.load(file) + for entry in bundle_data["entry"]: + res_id = entry["resource"]["id"] + ids_set.add(res_id) + + errors += check_if_in_db(ids_set=ids_set) + + print("Generating test report") + + stats = get_stats() + assert isinstance(stats, Stats) + assert hasattr(stats, "client") + report = stats.client.get_memory() + + durations = [] + for idx, item in enumerate(report["mcsd.update_supplier"]): + durations.append(item - sum(report[f"{idx}.mock_get_resource_history"])) + + report = generate_report( + test_name, durations, iterations, monitor_data, total_resources=len(ids_set) + ) + + + if not os.path.exists(TEST_RESULTS_PATH): + os.makedirs(TEST_RESULTS_PATH) + with open(f"{TEST_RESULTS_PATH}/test_{test_name}.json", "w") as f: + json.dump(report, f, indent=4) diff --git a/tests/services/test_new_update.py b/tests/services/test_new_update.py new file mode 100644 index 00000000..c06c5d7c --- /dev/null +++ b/tests/services/test_new_update.py @@ -0,0 +1,232 @@ +import json +import os +import time +import threading +import psutil +import pytest + +from typing import Any, Dict, List +from fastapi.testclient import TestClient +from unittest.mock import patch +import requests +from yarl import URL +from app.container import get_database +from fhir.resources.R4B.bundle import Bundle, BundleEntry +from utils.utils import ( + check_if_in_db, + create_file_structure, + generate_report, +) + +from app.stats import Stats, get_stats + +MOCK_DATA_PATH = "tests/mock_data" +TEST_RESULTS_PATH = "tests/new_testing_results" + +monitor_data = [] +monitoring = False +iteration = 0 + + +def monitor_resources(interval: float = 0.01) -> None: + process = psutil.Process() + while monitoring: + mem = process.memory_info().rss / 1024**2 + cpu = process.cpu_percent(interval=None) + io = process.io_counters() + monitor_data.append( + { + "memory_mb": mem, + "cpu_percent": cpu, + "read_bytes": io.read_bytes, + "write_bytes": io.write_bytes, + } + ) + time.sleep(interval) + +def _read_mock_data(file_path: str) -> Bundle: + with open(file_path, "r") as file: + return Bundle(**json.load(file)) + +def mock_do_request(method: str, url: URL, json_data: Dict[str, Any] | None = None) -> requests.Response: + global iteration + with get_stats().timer(f"{iteration}.mock_get_resource_history"): + if str(url) == "http://testserver/test": + return_bundle = Bundle(type="batch-response", entry=[]) + assert json_data is not None 
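+        # Supplier batch endpoint: answer every GET entry in the request bundle with the canned history file for that resource type.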
+ for entry in json_data.get('entry', []): + if not entry.get('request'): # + continue + if entry['request'].get('method') == 'GET': + resource_type = entry['request'].get('url').split('/')[1] + file_path = f"{MOCK_DATA_PATH}/{resource_type}/{resource_type}_history.json" + bundle = _read_mock_data(file_path) + return_bundle.entry.append(BundleEntry(resource=bundle)) + response = requests.Response() + response.status_code = 200 + response._content = return_bundle.model_dump_json(indent=4).encode('utf-8') + return response + if str(url) == "http://testserver/consumer/test": + response = requests.Response() + assert json_data is not None + for entry in json_data.get('entry', []): + if not entry.get('request'): + continue + if entry['request'].get('method') == 'POST' or entry['request'].get('method') == 'PUT': + resource = entry['resource'] + store_consumer(resource) + response._content = b'{"resourceType": "Bundle", "type": "batch-response", "entry": []}' + response.status_code = 200 + if entry['request'].get('method') == 'GET': + resource_type = entry['request'].get('url').split('/')[1] + resource_id = entry['request'].get('url').split('/')[2] + resource = get_from_consumer(resource_type, resource_id) + + if resource is not None: + return_bundle = Bundle(type="batch-response", entry=[]) + return_bundle.entry.append(BundleEntry(resource=resource)) + response._content = return_bundle.model_dump_json(indent=4).encode('utf-8') + response.status_code = 200 + + if response.status_code != 200: + response = requests.Response() + response.status_code = 200 + response._content = json.dumps({ + "resourceType": "OperationOutcome", + "issue": [ + { + "severity": "information", + "code": "informational", + "details": { + "text": f"Resource not found: method {method}, url {url}, json_data {json_data}" + } + } + ] + }).encode('utf-8') + return response + assert False, "Should not reach here" + +def store_consumer(resource: Dict[str, Any]) -> None: + if not os.path.exists(f"{MOCK_DATA_PATH}/consumer_data/{resource['resourceType']}"): + os.makedirs(f"{MOCK_DATA_PATH}/consumer_data/{resource['resourceType']}") + file_path = f"{MOCK_DATA_PATH}/consumer_data/{resource['resourceType']}/{resource['id']}.json" + with open(file_path, "w") as file: + json.dump(resource, file) + +def get_from_consumer(resource_type: str, resource_id: str) -> Any | None: + file_path = f"{MOCK_DATA_PATH}/consumer_data/{resource_type}/{resource_id}.json" + if not os.path.exists(file_path): + return None + with open(file_path, "r") as file: + return json.load(file) + +def mock_get_history_batch(url: URL) -> tuple[URL | None, List[BundleEntry]]: + global iteration + with get_stats().timer(f"{iteration}.mock_get_resource_history"): + resource_type = url.path.split("/")[2] # type:ignore + file_path = f"{MOCK_DATA_PATH}/{resource_type}/{resource_type}_history.json" + with open(file_path, "r") as file: + bundle = Bundle(**json.load(file)) + return None, bundle.entry + assert False, "Should not reach here" + return None, [] + +@pytest.mark.filterwarnings("ignore:Pydantic serializer warnings") +@pytest.mark.parametrize( + "resource_count, version_count, max_depth, test_name", + [ + (1, 1, 1, "very_simple"), + (2, 2, 1, "no_depth"), + (5, 3, 1, "more_resources"), + # (5, 3, 2, "just_a_little_depth"), + # (5, 3, 4, "medium"), + # (5, 3, 5, "difficult"), + # (5, 3, 6, "hard"), + # (5, 3, 7, "very_hard"), + # (5, 3, 8, "extreme"), + # (5, 3, 9, "crazy"), + # (5, 3, 10, "insane"), + # (10, 5, 10, "impossible"), + ], +) +@patch( + 
"app.services_new.api.fhir_api.FhirApi.get_history_batch", + side_effect=mock_get_history_batch, +) +@patch( + "app.services_new.api.api_service.AuthenticationBasedApi.do_request", + side_effect=mock_do_request, +) +def test_consumer_update_with_timing( + mock_do_request: requests.Response, + mock_get_history_batch: Bundle, + api_client: TestClient, resource_count: int, version_count: int, max_depth: int, test_name: str +) -> None: + global monitoring + monitoring = True + + global monitor_data + monitor_data = [] + + errors = 0 + iterations = 30 + global iteration + + + print(f"Running test: {test_name}") + db = get_database() + db.truncate_tables() + + create_file_structure( + base_path=f"{MOCK_DATA_PATH}", + resource_count=resource_count, + version_count=version_count, + max_depth=max_depth, + ) + + monitor_thread = threading.Thread(target=monitor_resources) + try: + monitor_thread.start() + for iteration in range(iterations): + print(f"Iteration {iteration + 1}/{iterations}") + _response = api_client.post("/update_resources/new/test-supplier") + print("Update done: mCSD resources are updated") + + # ok_status_code = response.status_code == 200 or False + # good_updated_message = response.json()[0]["message"] == "organizations and endpoints are updated" or False + # if ok_status_code is False or good_updated_message is False: + # errors += 1 + finally: + monitoring = False + monitor_thread.join() + + ids_set = set() + with open( + f"{MOCK_DATA_PATH}/all_resources_history.json", "r" + ) as file: + bundle_data = json.load(file) + for entry in bundle_data["entry"]: + res_id = entry["resource"]["id"] + ids_set.add(res_id) + + errors += check_if_in_db(ids_set=ids_set) + + print("Generating test report") + + stats = get_stats() + assert isinstance(stats, Stats) + assert hasattr(stats, "client") + report = stats.client.get_memory() + + durations = [] + for idx, item in enumerate(report["mcsd.update_supplier"]): + durations.append(item - sum(report[f"{idx}.mock_get_resource_history"])) + + report = generate_report( + test_name, durations, iterations, monitor_data, total_resources=len(ids_set) + ) + + if not os.path.exists(TEST_RESULTS_PATH): + os.makedirs(TEST_RESULTS_PATH) + with open(f"{TEST_RESULTS_PATH}/test_{test_name}.json", "w") as f: + json.dump(report, f, indent=4) \ No newline at end of file diff --git a/tests/test_config.py b/tests/test_config.py index ae563f13..f8e5fbab 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -40,7 +40,7 @@ def get_test_config() -> Config: ssl_key_file=None, ), mcsd=ConfigMcsd( - consumer_url="http://testserver/test", + consumer_url="http://testserver/consumer/test", authentication="off", request_count=20, strict_validation=False, @@ -52,7 +52,7 @@ def get_test_config() -> Config: tracer_name=None, ), stats=ConfigStats( - enabled=False, + enabled=True, host=None, port=None, module_name=None, diff --git a/tests/unit_tests/__init__.py b/tests/unit_tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit_tests/test_stats.py b/tests/unit_tests/test_stats.py new file mode 100644 index 00000000..ed0f09ca --- /dev/null +++ b/tests/unit_tests/test_stats.py @@ -0,0 +1,65 @@ +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient +from app.config import set_config +from app.stats import MemoryClient, Statsd, StatsdMiddleware, setup_stats, get_stats +from get_test_config import get_test_config + +@pytest.fixture +def memory_client() -> MemoryClient: + return MemoryClient() + +def 
test_memory_client_gauge(memory_client: MemoryClient) -> None:
+    memory_client.gauge("test.metric", 100)
+    memory = memory_client.get_memory()
+    assert "test.metric" in memory
+    assert len(memory["test.metric"]) == 1
+    assert memory["test.metric"][0]["value"] == 100
+
+def test_memory_client_timing(memory_client: MemoryClient) -> None:
+    memory_client.timing("test.timing", 500)
+    memory = memory_client.get_memory()
+    assert "test.timing" in memory
+    assert len(memory["test.timing"]) == 1
+    assert memory["test.timing"][0] == 500
+
+def test_memory_client_incr(memory_client: MemoryClient) -> None:
+    memory_client.incr("test.counter")
+    memory = memory_client.get_memory()
+    # incr keeps a running counter per key in the in-memory report
+    assert memory == {'test.counter': 1}
+
+def test_memory_client_decr(memory_client: MemoryClient) -> None:
+    memory_client.decr("test.counter")
+    memory = memory_client.get_memory()
+    # decr decrements the in-memory counter for the key
+    assert memory == {'test.counter': -1}
+
+def test_statsd_middleware() -> None:
+    test_conf = get_test_config()
+    test_conf.stats.enabled = True
+    test_conf.stats.host = None
+    test_conf.stats.port = None
+    test_conf.stats.module_name = "test_module"
+    set_config(test_conf)
+
+    app = FastAPI()
+
+    @app.get("/test")
+    async def test_endpoint() -> dict[str, str]:
+        return {"message": "ok"}
+
+    app.add_middleware(StatsdMiddleware, module_name="test_module")
+    setup_stats()
+    client = TestClient(app)
+
+    response = client.get("/test")
+    assert response.status_code == 200
+
+    stats = get_stats()
+    assert isinstance(stats, Statsd)
+    assert isinstance(stats.client, MemoryClient)
+    memory = stats.client.get_memory()
+    print(memory)
+    assert "test_module.http.request.get./test" in memory
+    assert "test_module.http.response_time" in memory
\ No newline at end of file
diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/utils/mcsd_resource_gen.py b/tests/utils/mcsd_resource_gen.py
new file mode 100644
index 00000000..06640b64
--- /dev/null
+++ b/tests/utils/mcsd_resource_gen.py
@@ -0,0 +1,325 @@
+from random import choice
+from typing import Any, Dict
+from uuid import uuid4
+
+from app.services.request_services.supplier_request_service import McsdResources
+from seeds.generate_data import DataGenerator
+import os
+import json
+import copy
+
+dg = DataGenerator()
+
+
+def setup_fhir_resource(
+    base_path: str,
+    mcsd_res_type: McsdResources,
+    max_depth: int,
+    version: int = 0,
+    resource_id: str | None = None,
+) -> str | None:
+    if resource_id is None:
+        resource_id = str(uuid4())
+    if max_depth <= 0:
+        return None
+    match mcsd_res_type:
+        case McsdResources.ORGANIZATION_AFFILIATION:
+            resource = dg.generate_organization_affiliation(
+                res_id=resource_id,
+                organization_id=setup_fhir_resource(
+                    base_path, McsdResources.ORGANIZATION, max_depth - 1, 0
+                ),
+                participating_organization_id=setup_fhir_resource(
+                    base_path, McsdResources.ORGANIZATION, max_depth - 1, 0
+                ),
+                network_org_ids=[
+                    setup_fhir_resource(
+                        base_path, McsdResources.ORGANIZATION, max_depth - 1, 0
+                    ),
+                    setup_fhir_resource(
+                        base_path, McsdResources.ORGANIZATION, max_depth - 1, 0
+                    ),
+                ],
+                location_ids=[
+                    setup_fhir_resource(
+                        base_path, McsdResources.LOCATION, max_depth - 1, 0
+                    ),
+                    setup_fhir_resource(
+                        base_path, McsdResources.LOCATION, max_depth - 1, 0
+                    ),
+                ],
+                healthcare_service_ids=[
+                    setup_fhir_resource(
+                        base_path, 
McsdResources.HEALTH_CARE_SERVICE, max_depth - 1, 0 + ), + setup_fhir_resource( + base_path, McsdResources.HEALTH_CARE_SERVICE, max_depth - 1, 0 + ), + ], + endpoint_ids=[ + setup_fhir_resource( + base_path, McsdResources.ENDPOINT, max_depth - 1, 0 + ) + ], + ).model_dump_json() + case McsdResources.PRACTITIONER_ROLE: + resource = dg.generate_practitioner_role( + res_id=resource_id, + practitioner_id=setup_fhir_resource( + base_path, McsdResources.PRACTITIONER, max_depth - 1, 0 + ), + organization_id=setup_fhir_resource( + base_path, McsdResources.ORGANIZATION, max_depth - 1, 0 + ), + location_ids=[ + setup_fhir_resource( + base_path, McsdResources.LOCATION, max_depth - 1, 0 + ), + setup_fhir_resource( + base_path, McsdResources.LOCATION, max_depth - 1, 0 + ), + ], + healthcare_service_ids=[ + setup_fhir_resource( + base_path, McsdResources.HEALTH_CARE_SERVICE, max_depth - 1, 0 + ), + setup_fhir_resource( + base_path, McsdResources.HEALTH_CARE_SERVICE, max_depth - 1, 0 + ), + ], + endpoint_ids=[ + setup_fhir_resource( + base_path, McsdResources.ENDPOINT, max_depth - 1, 0 + ) + ], + ).model_dump_json() + case McsdResources.HEALTH_CARE_SERVICE: + resource = dg.generate_healthcare_service( + res_id=resource_id, + location_ids=[ + setup_fhir_resource( + base_path, McsdResources.LOCATION, max_depth - 1, 0 + ), + setup_fhir_resource( + base_path, McsdResources.LOCATION, max_depth - 1, 0 + ), + ], + coverage_area_locations=[ + setup_fhir_resource( + base_path, McsdResources.LOCATION, max_depth - 1, 0 + ), + setup_fhir_resource( + base_path, McsdResources.LOCATION, max_depth - 1, 0 + ), + ], + provided_by_org=setup_fhir_resource( + base_path, McsdResources.ORGANIZATION, max_depth - 1, 0 + ), + endpoint_ids=[ + setup_fhir_resource( + base_path, McsdResources.ENDPOINT, max_depth - 1, 0 + ) + ], + ).model_dump_json() + case McsdResources.LOCATION: + resource = dg.generate_location( + res_id=resource_id, + part_of_location=setup_fhir_resource( + base_path, McsdResources.LOCATION, max_depth - 1, 0 + ), + endpoint_ids=[ + setup_fhir_resource( + base_path, McsdResources.ENDPOINT, max_depth - 1, 0 + ) + ], + managing_org=setup_fhir_resource( + base_path, McsdResources.ORGANIZATION, max_depth - 1, 0 + ), + ).model_dump_json() + case McsdResources.PRACTITIONER: + resource = dg.generate_practitioner( + res_id=resource_id, + qualification_issuer_org_ids=[ + setup_fhir_resource( + base_path, McsdResources.ORGANIZATION, max_depth - 1, 0 + ), + setup_fhir_resource( + base_path, McsdResources.ORGANIZATION, max_depth - 1, 0 + ), + ], + ).model_dump_json() + case McsdResources.ENDPOINT: + resource = dg.generate_endpoint( + res_id=resource_id, + org_fhir_id=setup_fhir_resource( + base_path, McsdResources.ORGANIZATION, max_depth - 1, 0 + ), + ).model_dump_json() + case McsdResources.ORGANIZATION: + resource = dg.generate_organization( + res_id=resource_id, + endpoint_ids=[ + setup_fhir_resource( + base_path, McsdResources.ENDPOINT, max_depth - 1, 0 + ) + ], + part_of=setup_fhir_resource( + base_path, McsdResources.ORGANIZATION, max_depth - 1, 0 + ), + ).model_dump_json() + save_resource_version( + resource_id_folder=f"{base_path}/{mcsd_res_type.value}/{resource_id}", + resource=resource, + version=version, + ) + return resource_id + + +def save_resource_version( + resource_id_folder: str, resource: Dict[str, Any], version: int = 0 +) -> None: + os.makedirs(resource_id_folder, exist_ok=True) + version_file = os.path.join(resource_id_folder, f"{version + 1}.json") + with open(version_file, "w") as f: + if 
isinstance(resource, str): + resource = json.loads(resource) + json.dump(resource, f, indent=4) + + +def generate_post_bundle(base_path: str) -> None: + global_post_bundle = { + "resourceType": "Bundle", + "id": str(uuid4()), + "type": "transaction", + "total": 0, + "entry": [], + } + global_post_bundle_2 = { + "resourceType": "Bundle", + "id": str(uuid4()), + "type": "transaction", + "total": 0, + "entry": [], + } + for resource_type in McsdResources: + resource_folder = os.path.join(base_path, resource_type.value) + for resource_id in os.listdir(resource_folder): + resource_id_folder = os.path.join(resource_folder, resource_id) + if "_history.json" in resource_id_folder: + continue + versions = sorted(os.listdir(resource_id_folder)) + for version in versions: + if "_history.json" in version: + continue + version_file = os.path.join(resource_id_folder, version) + with open(version_file, "r") as f: + resource = json.load(f) + entry = { + "fullUrl": f"{resource_type.value}/{resource_id}", + "resource": resource, + "request": { + "method": "PUT", + "url": f"{resource_type.value}/{resource_id}", + }, + } + resource_2 = copy.deepcopy(resource) + resource_2["identifier"] = [{ + "system": f"http://example.com/{str(uuid4())}", + "value": str(uuid4()) + }] + entry_2 = { + "fullUrl": f"{resource_type.value}/{resource_id}", + "resource": resource_2, + "request": { + "method": "PUT", + "url": f"{resource_type.value}/{resource_id}", + }, + } + assert isinstance(global_post_bundle["entry"], list) + assert isinstance(global_post_bundle["total"], int) + + global_post_bundle["entry"].append(entry) + global_post_bundle["total"] += 1 + + assert isinstance(global_post_bundle_2["entry"], list) + assert isinstance(global_post_bundle_2["total"], int) + + global_post_bundle_2["entry"].append(entry_2) + global_post_bundle_2["total"] += 1 + break + with open(os.path.join(base_path, "post_resources.json"), "w") as f: + json.dump(global_post_bundle, f, indent=4) + with open(os.path.join(base_path, "post_resources_2.json"), "w") as f: + json.dump(global_post_bundle_2, f, indent=4) + + +def generate_history_bundles(base_path: str) -> None: + global_history_bundle = { + "resourceType": "Bundle", + "id": str(uuid4()), + "type": "history", + "total": 0, + "entry": [], + } + + for resource_type in McsdResources: + resource_folder = os.path.join(base_path, resource_type.value) + resource_history_bundle = { + "resourceType": "Bundle", + "id": str(uuid4()), + "type": "history", + "total": 0, + "entry": [], + } + for resource_id in os.listdir(resource_folder): + resource_id_folder = os.path.join(resource_folder, resource_id) + versions = sorted(os.listdir(resource_id_folder)) + history_bundle = { + "resourceType": "Bundle", + "id": str(uuid4()), + "type": "history", + "total": 0, + "entry": [], + } + + for version in versions: + version_file = os.path.join(resource_id_folder, version) + with open(version_file, "r") as f: + resource = json.load(f) + methods = ["POST"] + if version.replace(".json", "") != "1": + methods.extend(["PUT", "DELETE"]) + entry = { + "fullUrl": f"{resource_type.value}/{resource_id}/{version.replace('.json', '')}", + "resource": resource, + "request": { + "method": choice(methods), + "url": f"{resource_type.value}/{resource_id}/{version.replace('.json', '')}", + }, + } + assert isinstance(history_bundle["entry"], list) + assert isinstance(resource_history_bundle["entry"], list) + assert isinstance(global_history_bundle["entry"], list) + assert isinstance(history_bundle["total"], int) + assert 
isinstance(resource_history_bundle["total"], int) + assert isinstance(global_history_bundle["total"], int) + history_bundle["entry"].append(entry) + history_bundle["total"] += 1 + resource_history_bundle["entry"].append(entry) + resource_history_bundle["total"] += 1 + global_history_bundle["entry"].append(entry) + global_history_bundle["total"] += 1 + + with open( + os.path.join( + resource_folder, resource_id, f"{resource_id}_history.json" + ), + "w", + ) as f: + json.dump(history_bundle, f, indent=4) + with open( + os.path.join(resource_folder, f"{resource_type.value}_history.json"), "w" + ) as f: + json.dump(resource_history_bundle, f, indent=4) + with open(os.path.join(base_path, "all_resources_history.json"), "w") as f: + json.dump(global_history_bundle, f, indent=4) diff --git a/tests/utils/utils.py b/tests/utils/utils.py new file mode 100644 index 00000000..98f694d9 --- /dev/null +++ b/tests/utils/utils.py @@ -0,0 +1,92 @@ +import os +import platform +import shutil +from typing import Any, Dict, List, Set +from uuid import uuid4 + +import numpy as np +import psutil +from app.container import get_database +from app.db.db import Database +from app.db.repositories.resource_map_repository import ResourceMapRepository +from app.services.request_services.supplier_request_service import McsdResources +from .mcsd_resource_gen import generate_history_bundles, generate_post_bundle, setup_fhir_resource + +def create_file_structure( + base_path: str, resource_count: int, version_count: int, max_depth: int = 1 +) -> None: + shutil.rmtree(base_path, ignore_errors=True) + os.makedirs(base_path, exist_ok=True) + + for resource_type in McsdResources: + resource_folder = os.path.join(base_path, resource_type.value) + os.makedirs(resource_folder, exist_ok=True) + + for _ in range(resource_count): + resource_id = str(uuid4()) + for version in range(version_count): + setup_fhir_resource( + base_path, + resource_type, + max_depth=max_depth, + version=version, + resource_id=resource_id, + ) + + generate_history_bundles(base_path) + generate_post_bundle(base_path) + +def generate_report(test_name: str, durations: List[float], iterations: int, monitor_data: List[Dict[str, Any]], total_resources: int) -> Dict[str, Any]: + memory = [x["memory_mb"] for x in monitor_data] + cpu = [x["cpu_percent"] for x in monitor_data] + reads = [x["read_bytes"] for x in monitor_data] + writes = [x["write_bytes"] for x in monitor_data] + + def summarize(values: List[float]) -> Dict[str, Any]: + return { + "p1": np.percentile(values, 1), + "p50": np.percentile(values, 50), + "p75": np.percentile(values, 75), + "p99": np.percentile(values, 99), + "max": max(values), + } + + return { + "test_name": test_name, + "hardware": { + "cpu": platform.processor(), + "cores": psutil.cpu_count(logical=False), + "logical_cores": psutil.cpu_count(logical=True), + "ram_gb": round(psutil.virtual_memory().total / (1024**3), 2), + "platform": platform.platform(), + }, + "iterations": iterations, + "duration_milliseconds": summarize(durations), + "total_resources": total_resources, + "resource_usage_summary": { + "cpu_percent": summarize(cpu), + "memory_mb": summarize(memory), + "disk_read_mb": summarize([b / 1024**2 for b in reads]), + "disk_write_mb": summarize([b / 1024**2 for b in writes]), + }, + } + + +def check_if_in_db(ids_set: Set[str]) -> int: + errors = 0 + print("Checking if all resources are in the database") + db = get_database() + for _id in ids_set: + if not check_if_id_in_database(_id, db): + errors += 1 + return errors + +def 
check_if_id_in_database(id: str, db: Database) -> bool:
+    with db.get_db_session() as session:
+        repo = session.get_repository(ResourceMapRepository)
+        result = repo.find(supplier_resource_id=id)
+        # print(f"finding {id} in database")
+        for resource_map in result:
+            if id == resource_map.supplier_resource_id:
+                return True
+    return False
\ No newline at end of file

From db5ba77b2815fe90e73a7ad525d9ec4531f878bd Mon Sep 17 00:00:00 2001
From: Thomas Bosch
Date: Thu, 1 May 2025 13:09:14 +0200
Subject: [PATCH 2/5] Resolved some comments

---
 app/application.py                     | 3 ++-
 app/stats.py                           | 6 ++----
 seeds/seed_hapi.py                     | 1 -
 tests/services/test_consumer_update.py | 3 +--
 tests/services/test_new_update.py      | 7 +------
 tests/utils/utils.py                   | 3 +--
 6 files changed, 7 insertions(+), 16 deletions(-)

diff --git a/app/application.py b/app/application.py
index 9daebf97..338e7e10 100644
--- a/app/application.py
+++ b/app/application.py
@@ -109,7 +109,8 @@ def setup_fastapi() -> FastAPI:
         fastapi.include_router(router)
 
     stats_conf = get_config().stats
-    if stats_conf.enabled and stats_conf.host is not None and stats_conf.port is not None:
+    keep_in_memory = (stats_conf.enabled and stats_conf.host is not None and stats_conf.port is not None) or False
+    if keep_in_memory:
         fastapi.add_middleware(StatsdMiddleware, module_name=stats_conf.module_name)
 
     return fastapi
diff --git a/app/stats.py b/app/stats.py
index f7ff59ee..45f48344 100644
--- a/app/stats.py
+++ b/app/stats.py
@@ -112,10 +112,8 @@ def setup_stats() -> None:
     if config.stats.enabled is False:
         return
 
-    if config.stats.host is not None and config.stats.port is not None:
-        client = statsd.StatsClient(config.stats.host, config.stats.port)
-    else:
-        client = MemoryClient()
+    in_memory = config.stats.host is None and config.stats.port is None
+    client = MemoryClient() if in_memory else statsd.StatsClient(config.stats.host, config.stats.port)
 
     global _STATS
     _STATS = Statsd(client)
diff --git a/seeds/seed_hapi.py b/seeds/seed_hapi.py
index 0ded140d..3f4d32de 100644
--- a/seeds/seed_hapi.py
+++ b/seeds/seed_hapi.py
@@ -6,7 +6,6 @@
 generator = DataGenerator()
 
 def generate_data():
-    print(sys.argv)
     if len(sys.argv) != 2:
         raise ValueError("Usage: python seed_hapi.py <base_url>\n For example: python seed_hapi.py http://example.com/fhir/")
 
diff --git a/tests/services/test_consumer_update.py b/tests/services/test_consumer_update.py
index 9ac5650a..6dcdf15d 100644
--- a/tests/services/test_consumer_update.py
+++ b/tests/services/test_consumer_update.py
@@ -60,12 +60,11 @@ def _read_mock_data(file_path: str) -> Bundle:
             )
         if resource_id is None:
             print(
-                f"All from resourcetype history bundle is requested: {resource_type}"
+                f"All history bundle is requested for: {resource_type}"
             )
             return _read_mock_data(
                 f"{MOCK_DATA_PATH}/{resource_type}/{resource_type}_history.json"
             )
-        # print(f"Specific resource history bundle is requested: {resource_type} - {resource_id}")
         return _read_mock_data(
             f"{MOCK_DATA_PATH}/{resource_type}/{resource_id}/{resource_id}_history.json"
         )
diff --git a/tests/services/test_new_update.py b/tests/services/test_new_update.py
index c06c5d7c..296fb4d4 100644
--- a/tests/services/test_new_update.py
+++ b/tests/services/test_new_update.py
@@ -55,7 +55,7 @@ def mock_do_request(method: str, url: URL, json_data: Dict[str, Any] | None = No
         return_bundle = Bundle(type="batch-response", entry=[])
         assert json_data is not None
         for entry in json_data.get('entry', []):
-            if not entry.get('request'): #
+            if not entry.get('request'):
                 continue
             if 
entry['request'].get('method') == 'GET':
                 resource_type = entry['request'].get('url').split('/')[1]
@@ -191,11 +191,6 @@ def test_consumer_update_with_timing(
             print(f"Iteration {iteration + 1}/{iterations}")
             _response = api_client.post("/update_resources/new/test-supplier")
             print("Update done: mCSD resources are updated")
-
-            # ok_status_code = response.status_code == 200 or False
-            # good_updated_message = response.json()[0]["message"] == "organizations and endpoints are updated" or False
-            # if ok_status_code is False or good_updated_message is False:
-            #     errors += 1
     finally:
         monitoring = False
         monitor_thread.join()
diff --git a/tests/utils/utils.py b/tests/utils/utils.py
index 98f694d9..fa9fa692 100644
--- a/tests/utils/utils.py
+++ b/tests/utils/utils.py
@@ -1,11 +1,11 @@
 import os
 import platform
 import shutil
+import psutil
 from typing import Any, Dict, List, Set
 from uuid import uuid4
 
 import numpy as np
-import psutil
 from app.container import get_database
 from app.db.db import Database
 from app.db.repositories.resource_map_repository import ResourceMapRepository
@@ -85,7 +85,6 @@ def check_if_id_in_database(id: str, db: Database) -> bool:
     with db.get_db_session() as session:
         repo = session.get_repository(ResourceMapRepository)
         result = repo.find(supplier_resource_id=id)
-        # print(f"finding {id} in database")
         for resource_map in result:
             if id == resource_map.supplier_resource_id:
                 return True

From 39a6b2cea9048dbf7b74ec07297816ffea3afde7 Mon Sep 17 00:00:00 2001
From: Thomas Bosch
Date: Wed, 7 May 2025 14:21:01 +0200
Subject: [PATCH 3/5] Removed old tests

---
 app/application.py                             |   2 +-
 app/routers/update_router.py                   |   3 +-
 .../mcsd_services/update_consumer_service.py   | 241 ------------------
 tests/services/test_consumer_update.py         | 169 ------------
 .../{test_new_update.py => test_update.py}     |  11 +-
 tests/test_config.py                           |   2 +-
 tests/unit_tests/test_stats.py                 |   2 +-
 tests/utils/mcsd_resource_gen.py               |   2 +-
 tests/utils/utils.py                           |   2 +-
 9 files changed, 13 insertions(+), 421 deletions(-)
 delete mode 100644 app/services/mcsd_services/update_consumer_service.py
 delete mode 100644 tests/services/test_consumer_update.py
 rename tests/services/{test_new_update.py => test_update.py} (96%)

diff --git a/app/application.py b/app/application.py
index 338e7e10..6bb4752a 100644
--- a/app/application.py
+++ b/app/application.py
@@ -109,7 +109,7 @@ def setup_fastapi() -> FastAPI:
         fastapi.include_router(router)
 
     stats_conf = get_config().stats
-    keep_in_memory = (stats_conf.enabled and stats_conf.host is not None and stats_conf.port is not None) or False
+    keep_in_memory = stats_conf.enabled and stats_conf.host is None and stats_conf.port is None
     if keep_in_memory:
         fastapi.add_middleware(StatsdMiddleware, module_name=stats_conf.module_name)
 
diff --git a/app/routers/update_router.py b/app/routers/update_router.py
index 61b3d0d7..49852205 100644
--- a/app/routers/update_router.py
+++ b/app/routers/update_router.py
@@ -19,9 +19,10 @@ class UpdateQueryParams(BaseModel):
     since: datetime | None = Field(default=None)
 
-@get_stats().timer("update_resources")
+
 @router.post("/{supplier_id}", response_model=None, summary="Update by supplier ID")
 @router.post("", response_model=None, summary="Update all suppliers")
+@get_stats().timer("update_resources")
 def update_supplier(
     supplier_id: str | None = None,
     query_params: UpdateQueryParams = Depends(),
diff --git a/app/services/mcsd_services/update_consumer_service.py b/app/services/mcsd_services/update_consumer_service.py
deleted file mode 100644
index 45ac4044..00000000 --- a/app/services/mcsd_services/update_consumer_service.py +++ /dev/null @@ -1,241 +0,0 @@ -import logging -from typing import Dict, Any, Tuple -from datetime import datetime - -from app.models.resource_map.dto import ResourceMapDto, ResourceMapUpdateDto -from app.models.supplier_update.dto import UpdateLookup, UpdateLookupEntry -from app.services.bundle_tools import ( - get_resource_type_and_id_from_entry, - get_request_method_from_entry, -) -from app.services.fhir.fhir_service import FhirService -from app.services.request_services.supplier_request_service import ( - SupplierRequestsService, -) -from app.services.entity_services.resource_map_service import ResourceMapService -from app.services.request_services.consumer_request_service import ( - ConsumerRequestService, -) -from app.models.fhir.r4.types import Bundle, Request, Entry, Resource -from app.stats import Stats - -logger = logging.getLogger(__name__) - - -class UpdateConsumerService: - def __init__( - self, - supplier_request_service: SupplierRequestsService, - consumer_request_service: ConsumerRequestService, - resource_map_service: ResourceMapService, - strict_validation: bool, - stats: Stats, - ): - self.__supplier_request_service = supplier_request_service - self.__consumer_request_service = consumer_request_service - self.__resource_map_service = resource_map_service - self.__fhir_service = FhirService(strict_validation=strict_validation) - self.stats = stats - - def update_supplier( - self, - supplier_id: str, - _since: datetime | None = None, - ) -> Dict[str, Any]: - # Fetch the history of all resources from this supplier (can take a while) - supplier_history = self.__supplier_request_service.get_resource_history( - supplier_id=supplier_id, _since=_since - ) - entries = supplier_history.entry if supplier_history.entry else [] - - # Map all the resources into a set of tuples (resource_type, resource_id) - resource_ids: set[Tuple[str, str]] = set() # Resource type & ID - for entry in entries: - (resource_type, resource_id) = get_resource_type_and_id_from_entry( - entry - ) - if resource_type is not None and resource_id is not None: - resource_ids.add((resource_type, resource_id)) - - for resource_type, resource_id in resource_ids: - resource_history = self.__supplier_request_service.get_resource_history( - supplier_id, resource_type, resource_id, None - ) - if resource_history.entry is None or len(resource_history.entry) == 0: - continue - - # Update this resource to the latest entry (if needed) - latest_entry = resource_history.entry[0] - self.update(supplier_id, len(resource_history.entry), latest_entry) - - return { - "message": "organizations and endpoints are updated", - "data": self.__resource_map_service.find(supplier_id=supplier_id), - } - - def update(self, supplier_id: str, history_size: int, latest_entry: Entry) -> None: - _, main_resource_id = get_resource_type_and_id_from_entry(latest_entry) - request_method = get_request_method_from_entry(latest_entry) - - main_resource = ( - self.__fhir_service.create_resource(latest_entry.resource.model_dump()) - if latest_entry.resource - else None - ) - unique_refs = ( - self.__fhir_service.get_references(main_resource) - if main_resource is not None - else [] - ) - - update_lookup: UpdateLookup = {} - if main_resource_id: - update_lookup.update( - { - main_resource_id: UpdateLookupEntry( - history_size=history_size, entry=latest_entry - ) - } - ) - - for ref in unique_refs: - res_type, id = self.__fhir_service.split_reference(ref) - data = 
self.__supplier_request_service.get_resource_history( - resource_type=res_type, resource_id=id, supplier_id=supplier_id - ) - entry = data.entry - if entry is not None and len(entry) > 0 and id is not None: - update_lookup.update( - {id: UpdateLookupEntry(history_size=len(entry), entry=entry[0])} - ) - - # prepare bundle - new_bundle = Bundle(type="transaction", entry=[]) - for id, lookup_data in update_lookup.items(): - resource_type, resource_id = get_resource_type_and_id_from_entry( - lookup_data.entry - ) - resource_map = self.__resource_map_service.get( - supplier_id, resource_type, resource_id - ) - request_method = get_request_method_from_entry(lookup_data.entry) - original_resource = ( - self.__fhir_service.create_resource( - lookup_data.entry.resource.model_dump() - ) - if lookup_data.entry.resource is not None - else None - ) - - # resource new and method is delete - if resource_map is None and request_method == "DELETE": - logger.info( - f"resource {id} is new and already DELETED from supplier {supplier_id} ...skipping" - ) - continue - - # resource up to date - if ( - resource_map is not None - and resource_map.history_size == lookup_data.history_size - ): - logger.info( - f"resource {resource_id} from {supplier_id} is up to date with consumer id: {resource_map.consumer_resource_id} ...skipping" - ) - continue - - # resource is new - if ( - resource_map is None - and request_method != "DELETE" - and original_resource is not None - ): - # replace references - - logger.info( - f"resource {resource_id} from {supplier_id} is new ...processing" - ) - new_id = f"{supplier_id}-{resource_id}" - new_resource = self.__fhir_service.namespace_resource_references( - original_resource, supplier_id - ) - new_resource.id = new_id - new_entry = Entry( - resource=Resource(**new_resource.model_dump()), - request=Request(method="PUT", url=f"{resource_type}/{new_id}"), - ) - new_bundle.entry.append(new_entry) - lookup_data.resource_map = ResourceMapDto( - supplier_id=supplier_id, - supplier_resource_id=resource_id, # type: ignore - resource_type=resource_type, # type: ignore - consumer_resource_id=new_id, - history_size=lookup_data.history_size, - ) - - # resource needs to be delete - if resource_map is not None and request_method == "DELETE": - logger.info( - f"resource {resource_id} from {supplier_id} needs to be deleted with consumer id: {resource_map.consumer_resource_id} ...processing" - ) - new_entry = Entry( - request=Request( - method="DELETE", - url=f"{resource_type}/{resource_map.consumer_resource_id}", - ) - ) - new_bundle.entry.append(new_entry) - lookup_data.resource_map = ResourceMapUpdateDto( - supplier_id=supplier_id, - resource_type=resource_map.resource_type, - supplier_resource_id=resource_map.supplier_resource_id, - history_size=lookup_data.history_size, - ) - - if ( - resource_map is not None - and request_method != "DELETE" - and original_resource is not None - ): - logger.info( - f"resource {resource_id} from {supplier_id} needs to be updated with consumer id: {resource_map.consumer_resource_id} ...processing" - ) - # replace id with one from resource_map - original_resource.id = resource_map.consumer_resource_id - new_resource = self.__fhir_service.namespace_resource_references( - original_resource, supplier_id - ) - new_entry = Entry( - resource=Resource(**new_resource.model_dump()), - request=Request( - method="PUT", - url=f"{resource_type}/{resource_map.consumer_resource_id}", - ), - ) - new_bundle.entry.append(new_entry) - lookup_data.resource_map = ResourceMapUpdateDto( 
- supplier_id=supplier_id, - resource_type=resource_map.resource_type, - supplier_resource_id=resource_map.supplier_resource_id, - history_size=lookup_data.history_size, - ) - - # only post when something has changed - if len(new_bundle.entry) > 0: - logger.info(f"detected changes from {supplier_id} ...updating data") - self.__consumer_request_service.post_bundle(new_bundle) - logger.info(f"data from {supplier_id} has been updated successfully!!") - - for v in update_lookup.values(): - if isinstance(v.resource_map, ResourceMapDto): - logger.info( - f"new resource map entry with {v.resource_map.__repr__()} ...creating" - ) - self.__resource_map_service.add_one(v.resource_map) - if isinstance(v.resource_map, ResourceMapUpdateDto): - self.__resource_map_service.update_one(v.resource_map) - logger.info( - f"new resource map entry with {v.resource_map.__repr__()} ...updating" - ) - - logger.info("resource map has been updated successfully!!!") diff --git a/tests/services/test_consumer_update.py b/tests/services/test_consumer_update.py deleted file mode 100644 index 6dcdf15d..00000000 --- a/tests/services/test_consumer_update.py +++ /dev/null @@ -1,169 +0,0 @@ -from datetime import datetime -import json -import time -import threading -import psutil -import pytest -import os -from fastapi.testclient import TestClient -from unittest.mock import patch -from fastapi.responses import JSONResponse -from app.container import get_database -from app.models.fhir.r4.types import Bundle -from utils.utils import ( - check_if_in_db, - create_file_structure, - generate_report, -) - -from app.stats import Stats, get_stats - -MOCK_DATA_PATH = "tests/mock_data" -TEST_RESULTS_PATH = "tests/testing_results" - -monitor_data = [] -monitoring = False -iteration = 0 - - -def monitor_resources(interval: float = 0.01) -> None: - process = psutil.Process() - while monitoring: - mem = process.memory_info().rss / 1024**2 - cpu = process.cpu_percent(interval=None) - io = process.io_counters() - monitor_data.append( - { - "memory_mb": mem, - "cpu_percent": cpu, - "read_bytes": io.read_bytes, - "write_bytes": io.write_bytes, - } - ) - time.sleep(interval) - -def mock_get_resource_history( - supplier_id: str, - resource_type: str | None = None, - resource_id: str | None = None, - _since: datetime | None = None, - ) -> Bundle: - def _read_mock_data(file_path: str) -> Bundle: - with open(file_path, "r") as file: - return Bundle(**json.load(file)) - global iteration - with get_stats().timer(f"{iteration}.mock_get_resource_history"): - if resource_type is None and resource_id is None: - print("Total history bundle is requested") - return _read_mock_data( - f"{MOCK_DATA_PATH}/all_resources_history.json" - ) - if resource_id is None: - print( - f"All history bundle is requested for: {resource_type}" - ) - return _read_mock_data( - f"{MOCK_DATA_PATH}/{resource_type}/{resource_type}_history.json" - ) - return _read_mock_data( - f"{MOCK_DATA_PATH}/{resource_type}/{resource_id}/{resource_id}_history.json" - ) - -@pytest.mark.filterwarnings("ignore:Pydantic serializer warnings") -@pytest.mark.parametrize( - "resource_count, version_count, max_depth, test_name", - [ - (1, 1, 1, "very_simple"), - (2, 2, 1, "no_depth"), - (5, 3, 1, "more_resources"), - # (5, 3, 2, "just_a_little_depth"), - # (5, 3, 4, "medium"), - # (5, 3, 5, "difficult"), - # (5, 3, 6, "hard"), - # (5, 3, 7, "very_hard"), - # (5, 3, 8, "extreme"), - # (5, 3, 9, "crazy"), - # (5, 3, 10, "insane"), - # (10, 5, 10, "impossible"), - ], -) -@patch( - 
"app.services.request_services.supplier_request_service.SupplierRequestsService.get_resource_history", - side_effect=mock_get_resource_history, -) -@patch( - "app.services.request_services.consumer_request_service.ConsumerRequestService.post_bundle", - return_value=JSONResponse({"message": "ok"}), -) -def test_consumer_update_with_timing( - mock_post_bundle: JSONResponse, - mock_get_resource_history: Bundle, - api_client: TestClient, resource_count: int, version_count: int, max_depth: int, test_name: str -) -> None: - global monitoring - monitoring = True - - global monitor_data - monitor_data = [] - - errors = 0 - iterations = 30 - global iteration - - - print(f"Running test: {test_name}") - db = get_database() - db.truncate_tables() - - create_file_structure( - base_path=f"{MOCK_DATA_PATH}", - resource_count=resource_count, - version_count=version_count, - max_depth=max_depth, - ) - - monitor_thread = threading.Thread(target=monitor_resources) - try: - monitor_thread.start() - for iteration in range(iterations): - print(f"Iteration {iteration + 1}/{iterations}") - response = api_client.post("/update_resources/test-supplier") - print("Update done: mCSD resources are updated") - assert response.status_code == 200 - assert response.json()["message"] == "organizations and endpoints are updated" - - finally: - monitoring = False - monitor_thread.join() - - ids_set = set() - with open( - f"{MOCK_DATA_PATH}/all_resources_history.json", "r" - ) as file: - bundle_data = json.load(file) - for entry in bundle_data["entry"]: - res_id = entry["resource"]["id"] - ids_set.add(res_id) - - errors += check_if_in_db(ids_set=ids_set) - - print("Generating test report") - - stats = get_stats() - assert isinstance(stats, Stats) - assert hasattr(stats, "client") - report = stats.client.get_memory() - - durations = [] - for idx, item in enumerate(report["mcsd.update_supplier"]): - durations.append(item - sum(report[f"{idx}.mock_get_resource_history"])) - - report = generate_report( - test_name, durations, iterations, monitor_data, total_resources=len(ids_set) - ) - - - if not os.path.exists(TEST_RESULTS_PATH): - os.makedirs(TEST_RESULTS_PATH) - with open(f"{TEST_RESULTS_PATH}/test_{test_name}.json", "w") as f: - json.dump(report, f, indent=4) diff --git a/tests/services/test_new_update.py b/tests/services/test_update.py similarity index 96% rename from tests/services/test_new_update.py rename to tests/services/test_update.py index 296fb4d4..8b3fa53c 100644 --- a/tests/services/test_new_update.py +++ b/tests/services/test_update.py @@ -12,7 +12,7 @@ from yarl import URL from app.container import get_database from fhir.resources.R4B.bundle import Bundle, BundleEntry -from utils.utils import ( +from tests.utils.utils import ( check_if_in_db, create_file_structure, generate_report, @@ -21,7 +21,7 @@ from app.stats import Stats, get_stats MOCK_DATA_PATH = "tests/mock_data" -TEST_RESULTS_PATH = "tests/new_testing_results" +TEST_RESULTS_PATH = "tests/testing_results" monitor_data = [] monitoring = False @@ -150,11 +150,11 @@ def mock_get_history_batch(url: URL) -> tuple[URL | None, List[BundleEntry]]: ], ) @patch( - "app.services_new.api.fhir_api.FhirApi.get_history_batch", + "app.services.api.fhir_api.FhirApi.get_history_batch", side_effect=mock_get_history_batch, ) @patch( - "app.services_new.api.api_service.AuthenticationBasedApi.do_request", + "app.services.api.api_service.AuthenticationBasedApiService.do_request", side_effect=mock_do_request, ) def test_consumer_update_with_timing( @@ -189,7 +189,8 @@ def 
test_consumer_update_with_timing( monitor_thread.start() for iteration in range(iterations): print(f"Iteration {iteration + 1}/{iterations}") - _response = api_client.post("/update_resources/new/test-supplier") + with get_stats().timer("mcsd.update_supplier"): + _response = api_client.post("/update_resources/test-supplier") print("Update done: mCSD resources are updated") finally: monitoring = False diff --git a/tests/test_config.py b/tests/test_config.py index f8e5fbab..f6ee95c2 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -55,7 +55,7 @@ def get_test_config() -> Config: enabled=True, host=None, port=None, - module_name=None, + module_name="test_stats", ), azure_oauth2=None, aws=None, diff --git a/tests/unit_tests/test_stats.py b/tests/unit_tests/test_stats.py index ed0f09ca..8c475b9c 100644 --- a/tests/unit_tests/test_stats.py +++ b/tests/unit_tests/test_stats.py @@ -3,7 +3,7 @@ from fastapi.testclient import TestClient from app.config import set_config from app.stats import MemoryClient, Statsd, StatsdMiddleware, setup_stats, get_stats -from get_test_config import get_test_config +from tests.get_test_config import get_test_config @pytest.fixture def memory_client() -> MemoryClient: diff --git a/tests/utils/mcsd_resource_gen.py b/tests/utils/mcsd_resource_gen.py index 06640b64..f17af83a 100644 --- a/tests/utils/mcsd_resource_gen.py +++ b/tests/utils/mcsd_resource_gen.py @@ -2,7 +2,7 @@ from typing import Any, Dict from uuid import uuid4 -from app.services.request_services.supplier_request_service import McsdResources +from app.services.update.update_consumer_service import McsdResources from seeds.generate_data import DataGenerator import os import json diff --git a/tests/utils/utils.py b/tests/utils/utils.py index fa9fa692..9084f9cd 100644 --- a/tests/utils/utils.py +++ b/tests/utils/utils.py @@ -9,7 +9,7 @@ from app.container import get_database from app.db.db import Database from app.db.repositories.resource_map_repository import ResourceMapRepository -from app.services.request_services.supplier_request_service import McsdResources +from app.services.update.update_consumer_service import McsdResources from .mcsd_resource_gen import generate_history_bundles, generate_post_bundle, setup_fhir_resource def create_file_structure( From 6fb53496a4ff7f189331dcb0182c04da6c73f86e Mon Sep 17 00:00:00 2001 From: Thomas Bosch Date: Wed, 14 May 2025 11:05:06 +0200 Subject: [PATCH 4/5] rebased to main with fixes --- app/container.py | 5 +- app/db/db.py | 2 +- app/routers/update_router.py | 2 - .../update/mass_update_consumer_service.py | 75 ++++++++++--------- app/stats.py | 8 +- poetry.lock | 2 +- tests/conftest.py | 5 +- .../{test_update.py => test_stress_test.py} | 13 +++- .../test_mass_update_consumer_service.py | 2 + tests/unit_tests/test_stats.py | 3 +- 10 files changed, 70 insertions(+), 47 deletions(-) rename tests/services/{test_update.py => test_stress_test.py} (93%) diff --git a/app/container.py b/app/container.py index f21068ec..cb370921 100644 --- a/app/container.py +++ b/app/container.py @@ -12,6 +12,8 @@ from app.services.update.update_consumer_service import UpdateConsumerService from typing import cast +from app.stats import get_stats + def container_config(binder: inject.Binder) -> None: @@ -57,7 +59,8 @@ def container_config(binder: inject.Binder) -> None: supplier_provider=supplier_provider, supplier_info_service=supplier_info_service, supplier_ignored_directory_service=supplier_ignored_directory_service, - 
cleanup_client_directory_after_success_timeout_seconds=config.scheduler.cleanup_client_directory_after_success_timeout_in_sec # type: ignore + cleanup_client_directory_after_success_timeout_seconds=config.scheduler.cleanup_client_directory_after_success_timeout_in_sec, # type: ignore + stats=get_stats(), ) diff --git a/app/db/db.py b/app/db/db.py index 90401dda..8febbe6f 100644 --- a/app/db/db.py +++ b/app/db/db.py @@ -1,6 +1,6 @@ import logging -from sqlalchemy import StaticPool, create_engine, text +from sqlalchemy import MetaData, StaticPool, create_engine, text from sqlalchemy.orm import Session from app.db.entities.base import Base diff --git a/app/routers/update_router.py b/app/routers/update_router.py index 49852205..5b1ea196 100644 --- a/app/routers/update_router.py +++ b/app/routers/update_router.py @@ -11,7 +11,6 @@ from app.services.entity.supplier_ignored_directory_service import SupplierIgnoredDirectoryService from app.services.update.update_consumer_service import UpdateConsumerService from app.services.supplier_provider.supplier_provider import SupplierProvider -from app.stats import get_stats router = APIRouter(prefix="/update_resources", tags=["Update consumer resources"]) @@ -22,7 +21,6 @@ class UpdateQueryParams(BaseModel): @router.post("/{supplier_id}", response_model=None, summary="Update by supplier ID") @router.post("", response_model=None, summary="Update all suppliers") -@get_stats().timer("update_resources") def update_supplier( supplier_id: str | None = None, query_params: UpdateQueryParams = Depends(), diff --git a/app/services/update/mass_update_consumer_service.py b/app/services/update/mass_update_consumer_service.py index 0cb778f5..894edc56 100644 --- a/app/services/update/mass_update_consumer_service.py +++ b/app/services/update/mass_update_consumer_service.py @@ -6,6 +6,7 @@ from app.services.entity.supplier_info_service import SupplierInfoService from app.services.supplier_provider.supplier_provider import SupplierProvider from app.services.update.update_consumer_service import UpdateConsumerService +from app.stats import Stats logger = logging.getLogger(__name__) @@ -18,52 +19,56 @@ def __init__( supplier_info_service: SupplierInfoService, supplier_ignored_directory_service: SupplierIgnoredDirectoryService, cleanup_client_directory_after_success_timeout_seconds: int, + stats: Stats, ) -> None: self.__supplier_provider = supplier_provider self.__update_consumer_service = update_consumer_service self.__supplier_info_service = supplier_info_service self.__supplier_ignored_directory_service = supplier_ignored_directory_service self.__cleanup_client_directory_after_success_timeout_seconds = cleanup_client_directory_after_success_timeout_seconds + self.__stats = stats def update_all(self) -> list[dict[str, Any]]: - try: - all_suppliers = self.__supplier_provider.get_all_suppliers() - except Exception as e: - logging.error(f"Failed to retrieve suppliers: {e}") - return [] - data: list[dict[str, Any]] = [] - filtered_suppliers = self.__supplier_ignored_directory_service.filter_ignored_supplier_dto(all_suppliers) - for supplier in filtered_suppliers: - - info = self.__supplier_info_service.get_supplier_info(supplier.id) - new_updated = datetime.now() - timedelta(seconds=60) + with self.__stats.timer("update_all_suppliers"): try: - data.append( - self.__update_consumer_service.update( - supplier, info.last_success_sync + all_suppliers = self.__supplier_provider.get_all_suppliers() + except Exception as e: + logging.error(f"Failed to retrieve suppliers: {e}") + 
return [] + data: list[dict[str, Any]] = [] + filtered_suppliers = self.__supplier_ignored_directory_service.filter_ignored_supplier_dto(all_suppliers) + for supplier in filtered_suppliers: + + info = self.__supplier_info_service.get_supplier_info(supplier.id) + new_updated = datetime.now() - timedelta(seconds=60) + try: + data.append( + self.__update_consumer_service.update( + supplier, info.last_success_sync + ) ) - ) - info.last_success_sync = new_updated - info.failed_attempts = 0 - info.last_success_sync = new_updated - self.__supplier_info_service.update_supplier_info(info) - except Exception as e: - logging.error(f"Failed to update supplier {supplier.id}: {e}") - info.failed_attempts += 1 - info.failed_sync_count += 1 - self.__supplier_info_service.update_supplier_info(info) + info.last_success_sync = new_updated + info.failed_attempts = 0 + info.last_success_sync = new_updated + self.__supplier_info_service.update_supplier_info(info) + except Exception as e: + logging.error(f"Failed to update supplier {supplier.id}: {e}") + info.failed_attempts += 1 + info.failed_sync_count += 1 + self.__supplier_info_service.update_supplier_info(info) - return data + return data def cleanup_old_directories(self) -> None: - all_suppliers = self.__supplier_info_service.get_all_suppliers_info() - suppliers = self.__supplier_ignored_directory_service.filter_ignored_supplier_info(all_suppliers) - for supplier_info in suppliers: - if supplier_info.last_success_sync is not None: - elapsed_time = (datetime.now(timezone.utc) - supplier_info.last_success_sync.astimezone(timezone.utc)).total_seconds() - if elapsed_time > self.__cleanup_client_directory_after_success_timeout_seconds: - logging.info(f"Cleaning up directory for outdated supplier {supplier_info.supplier_id}") - self.__update_consumer_service.cleanup(supplier_info.supplier_id) - self.__supplier_ignored_directory_service.add_directory_to_ignore_list(supplier_info.supplier_id) - \ No newline at end of file + with self.__stats.timer("cleanup_old_directories"): + all_suppliers = self.__supplier_info_service.get_all_suppliers_info() + suppliers = self.__supplier_ignored_directory_service.filter_ignored_supplier_info(all_suppliers) + for supplier_info in suppliers: + if supplier_info.last_success_sync is not None: + elapsed_time = (datetime.now(timezone.utc) - supplier_info.last_success_sync.astimezone(timezone.utc)).total_seconds() + if elapsed_time > self.__cleanup_client_directory_after_success_timeout_seconds: + logging.info(f"Cleaning up directory for outdated supplier {supplier_info.supplier_id}") + self.__update_consumer_service.cleanup(supplier_info.supplier_id) + self.__supplier_ignored_directory_service.add_directory_to_ignore_list(supplier_info.supplier_id) + \ No newline at end of file diff --git a/app/stats.py b/app/stats.py index 45f48344..e6f1cce7 100644 --- a/app/stats.py +++ b/app/stats.py @@ -1,6 +1,7 @@ +from contextlib import contextmanager from datetime import timedelta import time -from typing import Any, Callable, Awaitable +from typing import Any, Callable, Awaitable, Generator import statsd from statsd.client.timer import Timer @@ -43,7 +44,10 @@ def gauge(self, key: str, value: int, delta: bool = False) -> None: pass def timer(self, key: str) -> Timer: - raise NotImplementedError + @contextmanager + def noop_context_manager() -> Generator[Any, Any, Any]: + yield + return noop_context_manager() class MemoryClient: diff --git a/poetry.lock b/poetry.lock index a2c247c0..0f2efcfa 100644 --- a/poetry.lock +++ b/poetry.lock @@ 
-3001,4 +3001,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = "^3.11" -content-hash = "7acb0d8cf639c73c26cfbfc2a168cc9f51487d34f963f088301fdf4282f3f63c" +content-hash = "e5986d72313706937d366e0b9acce6efc6ac75ab1b625a616b9ba6f584a8fff4" diff --git a/tests/conftest.py b/tests/conftest.py index 7d6673f0..c290e40a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,7 +1,6 @@ from typing import Any, Dict import copy import pathlib -from typing import Any from collections.abc import Generator from fastapi import FastAPI @@ -26,8 +25,8 @@ def pytest_ignore_collect(collection_path: pathlib.Path, config: Any) -> bool: # Disable the test if running in GitHub Actions def pytest_collection_modifyitems(config: Any, items: Any) -> None: if os.getenv('GITHUB_ACTIONS') == 'true': - if "test_consumer_update_with_timing" in str(items): - items[:] = [item for item in items if "test_consumer_update_with_timing" not in str(item)] + if "test_stress_test_update" in str(items): + items[:] = [item for item in items if "test_stress_test_update" not in str(item)] @pytest.fixture def database() -> Generator[Database, Any, None]: diff --git a/tests/services/test_update.py b/tests/services/test_stress_test.py similarity index 93% rename from tests/services/test_update.py rename to tests/services/test_stress_test.py index 8b3fa53c..2c8e61ca 100644 --- a/tests/services/test_update.py +++ b/tests/services/test_stress_test.py @@ -12,6 +12,7 @@ from yarl import URL from app.container import get_database from fhir.resources.R4B.bundle import Bundle, BundleEntry +from app.models.supplier.dto import SupplierDto from tests.utils.utils import ( check_if_in_db, create_file_structure, @@ -157,9 +158,19 @@ def mock_get_history_batch(url: URL) -> tuple[URL | None, List[BundleEntry]]: "app.services.api.api_service.AuthenticationBasedApiService.do_request", side_effect=mock_do_request, ) -def test_consumer_update_with_timing( +@patch( + "app.services.supplier_provider.api_provider.SupplierApiProvider.get_all_suppliers", + return_value=[SupplierDto(id="test-supplier", name="Test Supplier", endpoint="http://testserver/test", is_deleted=False)], +) +@patch( + "app.services.supplier_provider.api_provider.SupplierApiProvider.get_one_supplier", + return_value=SupplierDto(id="test-supplier", name="Test Supplier", endpoint="http://testserver/test", is_deleted=False), +) +def test_stress_test_update( mock_do_request: requests.Response, mock_get_history_batch: Bundle, + mock_get_all_suppliers: List[SupplierDto], + mock_get_one_supplier: SupplierDto, api_client: TestClient, resource_count: int, version_count: int, max_depth: int, test_name: str ) -> None: global monitoring diff --git a/tests/services/update/test_mass_update_consumer_service.py b/tests/services/update/test_mass_update_consumer_service.py index c063b0c3..81be90d0 100644 --- a/tests/services/update/test_mass_update_consumer_service.py +++ b/tests/services/update/test_mass_update_consumer_service.py @@ -2,6 +2,7 @@ from datetime import datetime, timedelta, timezone from app.services.update.mass_update_consumer_service import MassUpdateConsumerService from app.db.entities.supplier_info import SupplierInfo +from app.stats import NoopStats @@ -32,6 +33,7 @@ def test_cleanup_old_directories() -> None: supplier_info_service=mock_supplier_info_service, supplier_ignored_directory_service=mock_supplier_ignored_directory_service, cleanup_client_directory_after_success_timeout_seconds=3600, # less than 2 hours + stats=NoopStats(), # Assuming NoopStats is 
a mock or a real class that does nothing ) service.cleanup_old_directories() diff --git a/tests/unit_tests/test_stats.py b/tests/unit_tests/test_stats.py index 8c475b9c..f25a5484 100644 --- a/tests/unit_tests/test_stats.py +++ b/tests/unit_tests/test_stats.py @@ -3,7 +3,8 @@ from fastapi.testclient import TestClient from app.config import set_config from app.stats import MemoryClient, Statsd, StatsdMiddleware, setup_stats, get_stats -from tests.get_test_config import get_test_config +from tests.test_config import get_test_config + @pytest.fixture def memory_client() -> MemoryClient: From 9a5bd5159ece9377fc853a9186413c9bb8b877fb Mon Sep 17 00:00:00 2001 From: Thomas Bosch Date: Wed, 14 May 2025 14:50:48 +0200 Subject: [PATCH 5/5] Extra fixes, use cache instead of opening files --- tests/conftest.py | 2 +- tests/services/test_stress_test.py | 111 +++++++++++------- .../test_mass_update_consumer_service.py | 2 +- tests/utils/utils.py | 32 ++--- 4 files changed, 88 insertions(+), 59 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index c290e40a..6f7c201d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,3 +1,4 @@ +import shutil from typing import Any, Dict import copy import pathlib @@ -16,7 +17,6 @@ from tests.test_config import get_test_config from tests.mock_data import organization, endpoint, location import os -import shutil # Don't search for tests in the mock_data directory def pytest_ignore_collect(collection_path: pathlib.Path, config: Any) -> bool: diff --git a/tests/services/test_stress_test.py b/tests/services/test_stress_test.py index 2c8e61ca..8d675b47 100644 --- a/tests/services/test_stress_test.py +++ b/tests/services/test_stress_test.py @@ -5,7 +5,7 @@ import psutil import pytest -from typing import Any, Dict, List +from typing import Any, Dict, List, Tuple from fastapi.testclient import TestClient from unittest.mock import patch import requests @@ -23,13 +23,28 @@ MOCK_DATA_PATH = "tests/mock_data" TEST_RESULTS_PATH = "tests/testing_results" +CACHE: Dict[str, Any] = {} monitor_data = [] monitoring = False iteration = 0 +def mock_bundles_cache() -> Dict[str, Any]: + global CACHE + if CACHE: + return CACHE + cache = {} + for root, _, files in os.walk(MOCK_DATA_PATH): + for f in files: + if f.endswith(".json"): + path = os.path.join(root, f) + with open(path, "r") as file: + cache[path] = json.load(file) + CACHE = cache + return CACHE -def monitor_resources(interval: float = 0.01) -> None: + +def monitor_resources(interval: float = 0.1) -> None: process = psutil.Process() while monitoring: mem = process.memory_info().rss / 1024**2 @@ -45,13 +60,9 @@ def monitor_resources(interval: float = 0.01) -> None: ) time.sleep(interval) -def _read_mock_data(file_path: str) -> Bundle: - with open(file_path, "r") as file: - return Bundle(**json.load(file)) - def mock_do_request(method: str, url: URL, json_data: Dict[str, Any] | None = None) -> requests.Response: global iteration - with get_stats().timer(f"{iteration}.mock_get_resource_history"): + with get_stats().timer(f"{iteration}.patch_timing"): if str(url) == "http://testserver/test": return_bundle = Bundle(type="batch-response", entry=[]) assert json_data is not None @@ -61,7 +72,7 @@ def mock_do_request(method: str, url: URL, json_data: Dict[str, Any] | None = No if entry['request'].get('method') == 'GET': resource_type = entry['request'].get('url').split('/')[1] file_path = f"{MOCK_DATA_PATH}/{resource_type}/{resource_type}_history.json" - bundle = _read_mock_data(file_path) + bundle = 
Bundle(**CACHE[file_path]) return_bundle.entry.append(BundleEntry(resource=bundle)) response = requests.Response() response.status_code = 200 @@ -108,29 +119,22 @@ def mock_do_request(method: str, url: URL, json_data: Dict[str, Any] | None = No assert False, "Should not reach here" def store_consumer(resource: Dict[str, Any]) -> None: - if not os.path.exists(f"{MOCK_DATA_PATH}/consumer_data/{resource['resourceType']}"): - os.makedirs(f"{MOCK_DATA_PATH}/consumer_data/{resource['resourceType']}") - file_path = f"{MOCK_DATA_PATH}/consumer_data/{resource['resourceType']}/{resource['id']}.json" - with open(file_path, "w") as file: - json.dump(resource, file) + key = f"consumer_data/{resource['resourceType']}/{resource['id']}.json" + CACHE[key] = resource def get_from_consumer(resource_type: str, resource_id: str) -> Any | None: - file_path = f"{MOCK_DATA_PATH}/consumer_data/{resource_type}/{resource_id}.json" - if not os.path.exists(file_path): - return None - with open(file_path, "r") as file: - return json.load(file) + key = f"consumer_data/{resource_type}/{resource_id}.json" + return CACHE.get(key) def mock_get_history_batch(url: URL) -> tuple[URL | None, List[BundleEntry]]: global iteration - with get_stats().timer(f"{iteration}.mock_get_resource_history"): + with get_stats().timer(f"{iteration}.patch_timing"): resource_type = url.path.split("/")[2] # type:ignore file_path = f"{MOCK_DATA_PATH}/{resource_type}/{resource_type}_history.json" - with open(file_path, "r") as file: - bundle = Bundle(**json.load(file)) - return None, bundle.entry - assert False, "Should not reach here" - return None, [] + if file_path not in CACHE: + assert False, f"File {file_path} not found in cache" + bundle = Bundle(**CACHE[file_path]) + return None, bundle.entry @pytest.mark.filterwarnings("ignore:Pydantic serializer warnings") @pytest.mark.parametrize( @@ -152,7 +156,7 @@ def mock_get_history_batch(url: URL) -> tuple[URL | None, List[BundleEntry]]: ) @patch( "app.services.api.fhir_api.FhirApi.get_history_batch", - side_effect=mock_get_history_batch, + side_effect=mock_get_history_batch ) @patch( "app.services.api.api_service.AuthenticationBasedApiService.do_request", @@ -168,7 +172,7 @@ def mock_get_history_batch(url: URL) -> tuple[URL | None, List[BundleEntry]]: ) def test_stress_test_update( mock_do_request: requests.Response, - mock_get_history_batch: Bundle, + mock_get_history_batch: Tuple[URL | None, List[BundleEntry]], mock_get_all_suppliers: List[SupplierDto], mock_get_one_supplier: SupplierDto, api_client: TestClient, resource_count: int, version_count: int, max_depth: int, test_name: str @@ -179,8 +183,7 @@ def test_stress_test_update( global monitor_data monitor_data = [] - errors = 0 - iterations = 30 + iterations = 10 global iteration @@ -195,6 +198,10 @@ def test_stress_test_update( max_depth=max_depth, ) + global CACHE + CACHE = {} + CACHE = mock_bundles_cache() + monitor_thread = threading.Thread(target=monitor_resources) try: monitor_thread.start() @@ -202,21 +209,35 @@ def test_stress_test_update( print(f"Iteration {iteration + 1}/{iterations}") with get_stats().timer("mcsd.update_supplier"): _response = api_client.post("/update_resources/test-supplier") - print("Update done: mCSD resources are updated") + print(f"Update done: mCSD resources are updated: {_response.json()}") finally: monitoring = False monitor_thread.join() - ids_set = set() - with open( - f"{MOCK_DATA_PATH}/all_resources_history.json", "r" - ) as file: - bundle_data = json.load(file) - for entry in bundle_data["entry"]: - 
res_id = entry["resource"]["id"] - ids_set.add(res_id) + latest_entries: Dict[str, Tuple[int, str]] = {} + + all_histories = CACHE.get("tests/mock_data/all_resources_history.json") + assert all_histories is not None, "All resources history file not found in cache" + + for entry in all_histories.get("entry", []): + request_url = entry["request"]["url"] + method = entry["request"]["method"] - errors += check_if_in_db(ids_set=ids_set) + parts = request_url.split("/") + if len(parts) < 3: + assert False, f"Invalid request URL: {request_url}" + + res_id = parts[1] + version = int(parts[2]) + + if res_id not in latest_entries or version > latest_entries[res_id][0]: + latest_entries[res_id] = (version, method) + + final_resources = { + res_id for res_id, (version, method) in latest_entries.items() if method != "DELETE" + } + + assert check_if_in_db(supplier_id="test-supplier", ids_set=final_resources) == 0 print("Generating test report") @@ -225,12 +246,20 @@ def test_stress_test_update( assert hasattr(stats, "client") report = stats.client.get_memory() - durations = [] + true_update_durations = [] + patch_durations = [] for idx, item in enumerate(report["mcsd.update_supplier"]): - durations.append(item - sum(report[f"{idx}.mock_get_resource_history"])) + patch_durations.append(sum(report[f"{idx}.patch_timing"])) + true_update_durations.append(item - sum(report[f"{idx}.patch_timing"])) + + durations = { + "total": report["mcsd.update_supplier"], + "true_update": true_update_durations, + "patch": patch_durations, + } report = generate_report( - test_name, durations, iterations, monitor_data, total_resources=len(ids_set) + test_name, durations, iterations, monitor_data, total_resources=len(final_resources) ) if not os.path.exists(TEST_RESULTS_PATH): diff --git a/tests/services/update/test_mass_update_consumer_service.py b/tests/services/update/test_mass_update_consumer_service.py index 81be90d0..c4d70403 100644 --- a/tests/services/update/test_mass_update_consumer_service.py +++ b/tests/services/update/test_mass_update_consumer_service.py @@ -33,7 +33,7 @@ def test_cleanup_old_directories() -> None: supplier_info_service=mock_supplier_info_service, supplier_ignored_directory_service=mock_supplier_ignored_directory_service, cleanup_client_directory_after_success_timeout_seconds=3600, # less than 2 hours - stats=NoopStats(), # Assuming NoopStats is a mock or a real class that does nothing + stats=NoopStats(), ) service.cleanup_old_directories() diff --git a/tests/utils/utils.py b/tests/utils/utils.py index 9084f9cd..b46573a3 100644 --- a/tests/utils/utils.py +++ b/tests/utils/utils.py @@ -7,7 +7,6 @@ import numpy as np from app.container import get_database -from app.db.db import Database from app.db.repositories.resource_map_repository import ResourceMapRepository from app.services.update.update_consumer_service import McsdResources from .mcsd_resource_gen import generate_history_bundles, generate_post_bundle, setup_fhir_resource @@ -36,7 +35,7 @@ def create_file_structure( generate_history_bundles(base_path) generate_post_bundle(base_path) -def generate_report(test_name: str, durations: List[float], iterations: int, monitor_data: List[Dict[str, Any]], total_resources: int) -> Dict[str, Any]: +def generate_report(test_name: str, durations: Dict[str, List[float]], iterations: int, monitor_data: List[Dict[str, Any]], total_resources: int) -> Dict[str, Any]: memory = [x["memory_mb"] for x in monitor_data] cpu = [x["cpu_percent"] for x in monitor_data] reads = [x["read_bytes"] for x in monitor_data] @@ 
-50,7 +49,7 @@ def summarize(values: List[float]) -> Dict[str, Any]:
             "p99": np.percentile(values, 99),
             "max": max(values),
         }
-    
+
     return {
         "test_name": test_name,
         "hardware": {
@@ -61,7 +60,11 @@ def summarize(values: List[float]) -> Dict[str, Any]:
             "platform": platform.platform(),
         },
         "iterations": iterations,
-        "duration_milliseconds": summarize(durations),
+        "duration_milliseconds": {
+            "total_with_patch": summarize(durations["total"]),
+            "true_update": summarize(durations["true_update"]),
+            "patch": summarize(durations["patch"]),
+        },
         "total_resources": total_resources,
         "resource_usage_summary": {
             "cpu_percent": summarize(cpu),
@@ -72,20 +75,17 @@ def summarize(values: List[float]) -> Dict[str, Any]:
     }
 
 
-def check_if_in_db(ids_set: Set[str]) -> int:
+def check_if_in_db(supplier_id: str, ids_set: Set[str]) -> int:
     errors = 0
     print("Checking if all resources are in the database")
     db = get_database()
-    for _id in ids_set:
-        if not check_if_id_in_database(_id, db):
-            errors += 1
-    return errors
-
-def check_if_id_in_database(id: str, db: Database) -> bool:
     with db.get_db_session() as session:
         repo = session.get_repository(ResourceMapRepository)
-        result = repo.find(supplier_resource_id=id)
-        for resource_map in result:
-            if id == resource_map.supplier_resource_id:
-                return True
-    return False
\ No newline at end of file
+        result = repo.find(supplier_id=supplier_id)
+        for resource_item in result:
+            if resource_item.supplier_resource_id in ids_set:  # ids_set only contains resources whose latest version is not a DELETE
+                ids_set.discard(resource_item.supplier_resource_id)
+    if ids_set:
+        print(f"Resources not found in the database: {ids_set}")
+        errors += len(ids_set)
+    return errors
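
Note on the timing post-processing: test_stress_test.py separates the service's own work from mock overhead by subtracting the per-iteration "<iteration>.patch_timing" samples from each "mcsd.update_supplier" total before summarizing. A minimal sketch of that subtraction, assuming (as in the tests above) that the MemoryClient report maps metric keys to lists of millisecond values; split_durations is an illustrative name, not a helper that exists in this codebase:

    from typing import Dict, List

    def split_durations(report: Dict[str, List[float]], iterations: int) -> Dict[str, List[float]]:
        # One total per iteration is recorded under "mcsd.update_supplier";
        # zero or more mocked I/O timings per iteration are recorded under
        # "<iteration>.patch_timing" (a hypothetical key layout mirroring the test).
        totals = report["mcsd.update_supplier"]
        patch = [sum(report.get(f"{i}.patch_timing", [])) for i in range(iterations)]
        # Subtracting the patched time leaves the update service's own cost.
        return {
            "total": totals,
            "patch": patch,
            "true_update": [t - p for t, p in zip(totals, patch)],
        }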