diff --git a/execution_engine/omop/db/celida/tables.py b/execution_engine/omop/db/celida/tables.py index 092bfc85..0db17e26 100644 --- a/execution_engine/omop/db/celida/tables.py +++ b/execution_engine/omop/db/celida/tables.py @@ -26,8 +26,16 @@ from execution_engine.omop.db.omop.schema import SCHEMA_NAME as OMOP_SCHEMA_NAME from execution_engine.util.interval import IntervalType -IntervalTypeEnum = Enum(IntervalType, name="interval_type", schema=SCHEMA_NAME) -CohortCategoryEnum = Enum(CohortCategory, name="cohort_category", schema=SCHEMA_NAME) +# Use the "public" schema so that tables in different schemas can use +# these enums easily without introducing dependencies between the +# respective schemas. Note that replicating the enum definitions in each +# schema would not work when data must be exchanged between the +# schemas because enum definitions in separate schemas, even if +# identical in terms of enum values, are considered distinct and +# incompatible. +IntervalTypeEnum = Enum(IntervalType, name="interval_type", schema="public") +CohortCategoryEnum = Enum(CohortCategory, name="cohort_category", schema="public") + class Recommendation(Base): # noqa: D101 diff --git a/execution_engine/task/runner.py b/execution_engine/task/runner.py index 15ec7609..9379fb9a 100644 --- a/execution_engine/task/runner.py +++ b/execution_engine/task/runner.py @@ -80,7 +80,7 @@ def enqueue_ready_tasks(self) -> int: and all(dep.name() in self.completed_tasks for dep in task.dependencies) and task.name() not in self.enqueued_tasks ): - logging.info(f"Enqueuing task {task.name()}") + logging.debug(f"Enqueuing task {task.name()}") try: self.queue.put(task) @@ -191,6 +191,7 @@ def run(self, bind_params: dict) -> None: try: while len(self.completed_tasks) < len(self.tasks): self.enqueue_ready_tasks() + logging.info(f"{len(self.completed_tasks)}/{len(self.tasks)} tasks") if self.queue.empty() and not any( task.status == TaskStatus.RUNNING for task in self.tasks @@ -209,7 +210,7 @@ def 
run(self, bind_params: dict) -> None: with self.lock: # Update the set of completed tasks self.completed_tasks = set(self.shared_results.keys()) - logging.info( + logging.debug( f"Completed {len(self.completed_tasks)} of {len(self.tasks)} tasks" ) @@ -286,7 +287,7 @@ def task_executor_worker() -> None: self.stop_event.set() continue - logging.info(f"Got task {task.name()}") + logging.debug(f"Got task {task.name()}") try: self.run_task(task, bind_params) @@ -296,7 +297,7 @@ def task_executor_worker() -> None: f"Task {task.name()} failed with status {task.status}" ) - logging.info(f"Finished task {task.name()}") + logging.debug(f"Finished task {task.name()}") except TaskError as ex: logging.error(ex) self._error_queue.put(traceback.format_exc()) @@ -316,6 +317,7 @@ def task_executor_worker() -> None: break self.enqueue_ready_tasks() + logging.info(f"{len(self.completed_tasks)}/{len(self.tasks)} tasks") if self.completed_tasks == self.enqueued_tasks and len( self.completed_tasks diff --git a/execution_engine/task/task.py b/execution_engine/task/task.py index f40bba9a..3e1f2c20 100644 --- a/execution_engine/task/task.py +++ b/execution_engine/task/task.py @@ -119,7 +119,7 @@ def run( ) self.status = TaskStatus.RUNNING - logging.info(f"Running task '{self.name()}'") + logging.debug(f"Running task '{self.name()}'") try: if len(self.dependencies) == 0 or self.expr.is_Atom: