Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions execution_engine/omop/db/celida/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,16 @@
from execution_engine.omop.db.omop.schema import SCHEMA_NAME as OMOP_SCHEMA_NAME
from execution_engine.util.interval import IntervalType

IntervalTypeEnum = Enum(IntervalType, name="interval_type", schema=SCHEMA_NAME)
CohortCategoryEnum = Enum(CohortCategory, name="cohort_category", schema=SCHEMA_NAME)
# Use the "public" schema so that tables in different schemas can
# these enums easily without introducing depedencies between the
# respective schemas. Note that replicate the enum definitions in each
# schema would not work when data must be exchanged between the
# schemas because enum definitions in separate schemas, even if
# identical in terms of enum values, are considered distinct and
# incompatible.
IntervalTypeEnum = Enum(IntervalType, name="interval_type", schema="public")
CohortCategoryEnum = Enum(CohortCategory, name="cohort_category", schema="public")



class Recommendation(Base): # noqa: D101
Expand Down
10 changes: 6 additions & 4 deletions execution_engine/task/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
and all(dep.name() in self.completed_tasks for dep in task.dependencies)
and task.name() not in self.enqueued_tasks
):
logging.info(f"Enqueuing task {task.name()}")
logging.debug(f"Enqueuing task {task.name()}")

try:
self.queue.put(task)
Expand Down Expand Up @@ -191,6 +191,7 @@
try:
while len(self.completed_tasks) < len(self.tasks):
self.enqueue_ready_tasks()
logging.info(f"{len(self.completed_tasks)}/{len(self.tasks)} tasks")

if self.queue.empty() and not any(
task.status == TaskStatus.RUNNING for task in self.tasks
Expand All @@ -209,7 +210,7 @@
with self.lock:
# Update the set of completed tasks
self.completed_tasks = set(self.shared_results.keys())
logging.info(
logging.debug(
f"Completed {len(self.completed_tasks)} of {len(self.tasks)} tasks"
)

Expand Down Expand Up @@ -286,7 +287,7 @@
self.stop_event.set()
continue

logging.info(f"Got task {task.name()}")
logging.debug(f"Got task {task.name()}")

Check warning on line 290 in execution_engine/task/runner.py

View check run for this annotation

Codecov / codecov/patch

execution_engine/task/runner.py#L290

Added line #L290 was not covered by tests

try:
self.run_task(task, bind_params)
Expand All @@ -296,7 +297,7 @@
f"Task {task.name()} failed with status {task.status}"
)

logging.info(f"Finished task {task.name()}")
logging.debug(f"Finished task {task.name()}")

Check warning on line 300 in execution_engine/task/runner.py

View check run for this annotation

Codecov / codecov/patch

execution_engine/task/runner.py#L300

Added line #L300 was not covered by tests
except TaskError as ex:
logging.error(ex)
self._error_queue.put(traceback.format_exc())
Expand All @@ -316,6 +317,7 @@
break

self.enqueue_ready_tasks()
logging.info(f"{len(self.completed_tasks)}/{len(self.tasks)} tasks")

if self.completed_tasks == self.enqueued_tasks and len(
self.completed_tasks
Expand Down
2 changes: 1 addition & 1 deletion execution_engine/task/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def run(
)

self.status = TaskStatus.RUNNING
logging.info(f"Running task '{self.name()}'")
logging.debug(f"Running task '{self.name()}'")

try:
if len(self.dependencies) == 0 or self.expr.is_Atom:
Expand Down