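"""Celery workflow that keeps the 'live' annotation database current by
ingesting segmentation data for new annotations and updating expired
root ids across all configured datastacks."""
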
import datetime
import json
import os
from celery import chain
from celery.utils.log import get_task_logger
from materializationengine.blueprints.materialize.api import get_datastack_info
from materializationengine.celery_init import celery
from materializationengine.shared_tasks import (
get_materialization_info,
monitor_workflow_state,
workflow_complete,
fin,
)
from materializationengine.task import LockedTask
from materializationengine.utils import get_config_param
from materializationengine.workflows.ingest_new_annotations import (
ingest_new_annotations_workflow,
find_missing_root_ids_workflow,
)
from materializationengine.workflows.update_root_ids import (
update_root_ids_workflow,
)

celery_logger = get_task_logger(__name__)


@celery.task(name="workflow:run_periodic_database_update")
def run_periodic_database_update(datastack: str = None) -> bool:
"""
Run update database workflow. Steps are as follows:
1. Find missing segmentation data in a given datastack and lookup.
2. Update expired root ids
"""
if datastack:
datastacks = [datastack]
else:
try:
datastacks = json.loads(os.environ["DATASTACKS"])
except Exception:
datastacks = get_config_param("DATASTACKS")
for datastack in datastacks:
try:
celery_logger.info(f"Start periodic database update job for {datastack}")
datastack_info = get_datastack_info(datastack)
task = update_database_workflow.s(datastack_info)
task.apply_async(kwargs={"Datastack": datastack})
        except Exception as e:
            celery_logger.error(f"Periodic database update failed for {datastack}: {e}")
            raise
    return True
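
# Illustrative usage (not part of this module): trigger the update manually
# for a single datastack; "my_datastack" is a placeholder name.
#
#   run_periodic_database_update.si("my_datastack").apply_async()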


@celery.task(
    bind=True,
    base=LockedTask,
    timeout=60 * 60 * 24,  # Task locked for 1 day
    name="workflow:update_database_workflow",
)
def update_database_workflow(self, datastack_info: dict, **kwargs):
"""Updates 'live' database:
- Find all annotations with missing segmentation rows
and lookup supervoxel_id and root_id
- Lookup all expired root_ids and update them
Args:
datastack_info (dict): [description]
days_to_expire (int, optional): [description]. Defaults to 5.
"""
materialization_time_stamp = datetime.datetime.utcnow()
mat_info = get_materialization_info(
datastack_info=datastack_info,
analysis_version=None,
materialization_time_stamp=materialization_time_stamp,
)
    celery_logger.debug(mat_info)
    update_live_database_workflow = []
    # Look up missing segmentation data for new annotations and update
    # expired root_ids; tables over 1,000,000 rows are skipped for performance.
try:
for mat_metadata in mat_info:
if mat_metadata.get("segmentation_table_name"):
workflow = chain(
ingest_new_annotations_workflow(mat_metadata),
# find_missing_root_ids_workflow(mat_metadata), # skip for now
update_root_ids_workflow(mat_metadata),
)
update_live_database_workflow.append(workflow)
else:
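            # No segmentation table: append the placeholder `fin` signature
            # so the chain still has one entry per table.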
update_live_database_workflow.append(fin.si())
run_update_database_workflow = chain(
*update_live_database_workflow, workflow_complete.si("update_root_ids")
).apply_async(kwargs={"Datastack": datastack_info["datastack"]})
except Exception as e:
celery_logger.error(f"An error has occurred: {e}")
raise e
tasks_completed = monitor_workflow_state(run_update_database_workflow)
if tasks_completed:
return True
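
# A minimal scheduling sketch (assumption: how this task could be wired into
# celery beat; the entry name and crontab below are illustrative, not this
# project's actual configuration):
#
#   from celery.schedules import crontab
#
#   celery.conf.beat_schedule = {
#       "run-periodic-database-update": {
#           "task": "workflow:run_periodic_database_update",
#           "schedule": crontab(minute=0),  # hourly, on the hour
#       },
#   }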