Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ secrets/*
*.pub
docker-compose.override.yml
coverage.xml
tests/mock_data
tests/testing_results
tests/new_testing_results
13 changes: 9 additions & 4 deletions app/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from app.routers.directory_health import router as directory_health_router
from app.routers.ignore_list_router import router as ignore_list_router
from app.config import get_config
from app.stats import setup_stats
from app.stats import StatsdMiddleware, setup_stats
from app.telemetry import setup_telemetry


Expand Down Expand Up @@ -50,12 +50,12 @@ def run() -> None:


def create_fastapi_app() -> FastAPI:
application_init()
fastapi = setup_fastapi()

if get_config().stats.enabled:
setup_stats()

application_init()
fastapi = setup_fastapi()

if get_config().telemetry.enabled:
setup_telemetry(fastapi)

Expand Down Expand Up @@ -108,4 +108,9 @@ def setup_fastapi() -> FastAPI:
for router in routers:
fastapi.include_router(router)

stats_conf = get_config().stats
keep_in_memory = not (stats_conf.enabled and stats_conf.host is not None and stats_conf.port is not None) or False
if keep_in_memory:
fastapi.add_middleware(StatsdMiddleware, module_name=stats_conf.module_name)

return fastapi
1 change: 0 additions & 1 deletion app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ class ConfigStats(BaseModel):
port: int | None
module_name: str | None


class ConfigMcsd(BaseModel):
consumer_url: str
authentication: Union[str] = Field(
Expand Down
5 changes: 4 additions & 1 deletion app/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from app.services.update.update_consumer_service import UpdateConsumerService
from typing import cast

from app.stats import get_stats



def container_config(binder: inject.Binder) -> None:
Expand Down Expand Up @@ -57,7 +59,8 @@ def container_config(binder: inject.Binder) -> None:
supplier_provider=supplier_provider,
supplier_info_service=supplier_info_service,
supplier_ignored_directory_service=supplier_ignored_directory_service,
cleanup_client_directory_after_success_timeout_seconds=config.scheduler.cleanup_client_directory_after_success_timeout_in_sec # type: ignore
cleanup_client_directory_after_success_timeout_seconds=config.scheduler.cleanup_client_directory_after_success_timeout_in_sec, # type: ignore
stats=get_stats(),
)


Expand Down
16 changes: 15 additions & 1 deletion app/db/db.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging

from sqlalchemy import StaticPool, create_engine, text
from sqlalchemy import MetaData, StaticPool, create_engine, text
from sqlalchemy.orm import Session

from app.db.entities.base import Base
Expand Down Expand Up @@ -30,6 +30,20 @@ def generate_tables(self) -> None:
logger.info("Generating tables...")
Base.metadata.create_all(self.engine)

def truncate_tables(self) -> None:
logger.info("Truncating all tables...")
try:
metadata = MetaData()
metadata.reflect(bind=self.engine)
with Session(self.engine) as session:
for table in reversed(metadata.sorted_tables):
session.execute(text(f"DELETE FROM {table.name}"))
session.commit()
logger.info("All tables truncated successfully.")
except Exception as e:
logger.error("Error while truncating tables: %s", e)
raise e

def is_healthy(self) -> bool:
"""
Check if the database is healthy
Expand Down
2 changes: 1 addition & 1 deletion app/services/fhir/model_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def create_endpoint(data: Dict[str, Any], strict: bool) -> Endpoint:
if "payloadType" not in data:
data["payloadType"] = []

logger.warning(warning_message("Endpoint", "payload_type", "empty array"))
logger.warning(warning_message("Endpoint", "payloadType", "empty array"))
if "address" not in data:
data["address"] = ""
logger.warning(warning_message("Endpoint", "address", "empty string"))
Expand Down
2 changes: 0 additions & 2 deletions app/services/fhir/references/reference_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,10 @@ def _get_org_references(model: Organization) -> List[Reference]:
ref = extract_references(endpoint)
if ref is not None:
refs.append(ref)

if part_of is not None:
new_ref = extract_references(part_of)
if new_ref is not None:
refs.append(new_ref)

return refs


Expand Down
75 changes: 40 additions & 35 deletions app/services/update/mass_update_consumer_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from app.services.entity.supplier_info_service import SupplierInfoService
from app.services.supplier_provider.supplier_provider import SupplierProvider
from app.services.update.update_consumer_service import UpdateConsumerService
from app.stats import Stats

logger = logging.getLogger(__name__)

Expand All @@ -18,52 +19,56 @@ def __init__(
supplier_info_service: SupplierInfoService,
supplier_ignored_directory_service: SupplierIgnoredDirectoryService,
cleanup_client_directory_after_success_timeout_seconds: int,
stats: Stats,
) -> None:
self.__supplier_provider = supplier_provider
self.__update_consumer_service = update_consumer_service
self.__supplier_info_service = supplier_info_service
self.__supplier_ignored_directory_service = supplier_ignored_directory_service
self.__cleanup_client_directory_after_success_timeout_seconds = cleanup_client_directory_after_success_timeout_seconds
self.__stats = stats

def update_all(self) -> list[dict[str, Any]]:
try:
all_suppliers = self.__supplier_provider.get_all_suppliers()
except Exception as e:
logging.error(f"Failed to retrieve suppliers: {e}")
return []
data: list[dict[str, Any]] = []
filtered_suppliers = self.__supplier_ignored_directory_service.filter_ignored_supplier_dto(all_suppliers)
for supplier in filtered_suppliers:

info = self.__supplier_info_service.get_supplier_info(supplier.id)
new_updated = datetime.now() - timedelta(seconds=60)
with self.__stats.timer("update_all_suppliers"):
try:
data.append(
self.__update_consumer_service.update(
supplier, info.last_success_sync
all_suppliers = self.__supplier_provider.get_all_suppliers()
except Exception as e:
logging.error(f"Failed to retrieve suppliers: {e}")
return []
data: list[dict[str, Any]] = []
filtered_suppliers = self.__supplier_ignored_directory_service.filter_ignored_supplier_dto(all_suppliers)
for supplier in filtered_suppliers:

info = self.__supplier_info_service.get_supplier_info(supplier.id)
new_updated = datetime.now() - timedelta(seconds=60)
try:
data.append(
self.__update_consumer_service.update(
supplier, info.last_success_sync
)
)
)

info.last_success_sync = new_updated
info.failed_attempts = 0
info.last_success_sync = new_updated
self.__supplier_info_service.update_supplier_info(info)
except Exception as e:
logging.error(f"Failed to update supplier {supplier.id}: {e}")
info.failed_attempts += 1
info.failed_sync_count += 1
self.__supplier_info_service.update_supplier_info(info)
info.last_success_sync = new_updated
info.failed_attempts = 0
info.last_success_sync = new_updated
self.__supplier_info_service.update_supplier_info(info)
except Exception as e:
logging.error(f"Failed to update supplier {supplier.id}: {e}")
info.failed_attempts += 1
info.failed_sync_count += 1
self.__supplier_info_service.update_supplier_info(info)

return data
return data

def cleanup_old_directories(self) -> None:
all_suppliers = self.__supplier_info_service.get_all_suppliers_info()
suppliers = self.__supplier_ignored_directory_service.filter_ignored_supplier_info(all_suppliers)
for supplier_info in suppliers:
if supplier_info.last_success_sync is not None:
elapsed_time = (datetime.now(timezone.utc) - supplier_info.last_success_sync.astimezone(timezone.utc)).total_seconds()
if elapsed_time > self.__cleanup_client_directory_after_success_timeout_seconds:
logging.info(f"Cleaning up directory for outdated supplier {supplier_info.supplier_id}")
self.__update_consumer_service.cleanup(supplier_info.supplier_id)
self.__supplier_ignored_directory_service.add_directory_to_ignore_list(supplier_info.supplier_id)

with self.__stats.timer("cleanup_old_directories"):
all_suppliers = self.__supplier_info_service.get_all_suppliers_info()
suppliers = self.__supplier_ignored_directory_service.filter_ignored_supplier_info(all_suppliers)
for supplier_info in suppliers:
if supplier_info.last_success_sync is not None:
elapsed_time = (datetime.now(timezone.utc) - supplier_info.last_success_sync.astimezone(timezone.utc)).total_seconds()
if elapsed_time > self.__cleanup_client_directory_after_success_timeout_seconds:
logging.info(f"Cleaning up directory for outdated supplier {supplier_info.supplier_id}")
self.__update_consumer_service.cleanup(supplier_info.supplier_id)
self.__supplier_ignored_directory_service.add_directory_to_ignore_list(supplier_info.supplier_id)

52 changes: 52 additions & 0 deletions app/services/update/update_all_consumers_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import logging
from datetime import datetime, timedelta
from typing import Any

from app.services.entity.supplier_info_service import SupplierInfoService
from app.services.api.suppliers_api import SuppliersApi
from app.services.update.update_consumer_service import UpdateConsumerService
from app.stats import Stats

logger = logging.getLogger(__name__)


class UpdateAllConsumersService:
def __init__(
self,
update_consumer_service: UpdateConsumerService,
supplier_service: SuppliersApi,
supplier_info_service: SupplierInfoService,
stats: Stats,
) -> None:
self.__supplier_service = supplier_service
self.__update_consumer_service = update_consumer_service
self.__supplier_info_service = supplier_info_service
self.__stats = stats

def update_all(self) -> list[dict[str, Any]]:
with self.__stats.timer("update_from_all_suppliers"):
all_suppliers = self.__supplier_service.get_all()
data: list[dict[str, Any]] = []
for supplier in all_suppliers:
with self.__stats.timer(f"get_supplier_info_{supplier.id}"):
info = self.__supplier_info_service.get_supplier_info(supplier.id)
new_updated = datetime.now() - timedelta(seconds=60)
try:
with self.__stats.timer(f"update_from_supplier_{supplier.id}"):
data.append(
self.__update_consumer_service.update(
supplier.id, info.last_success_sync
)
)

info.last_success_sync = new_updated
info.failed_attempts = 0
info.last_success_sync = new_updated
self.__supplier_info_service.update_supplier_info(info)
except Exception as e:
logging.error(f"Failed to update supplier {supplier.id}: {e}")
info.failed_attempts += 1
info.failed_sync_count += 1
self.__supplier_info_service.update_supplier_info(info)

return data
Loading
Loading