Skip to content

Commit

Permalink
feat: use find missing root ids in periodic workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
dlbrittain committed Sep 1, 2023
1 parent bba7d8e commit 2d857e2
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 2 deletions.
2 changes: 2 additions & 0 deletions materializationengine/workflows/complete_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
)
from materializationengine.workflows.ingest_new_annotations import (
ingest_new_annotations_workflow,
find_missing_root_ids_workflow
)
from materializationengine.task import LockedTask
from materializationengine.workflows.update_root_ids import (
Expand Down Expand Up @@ -78,6 +79,7 @@ def run_complete_workflow(
if mat_metadata.get("segmentation_table_name"):
workflow = chain(
ingest_new_annotations_workflow(mat_metadata),
find_missing_root_ids_workflow(mat_metadata),
update_root_ids_workflow(mat_metadata),
)
else:
Expand Down
6 changes: 4 additions & 2 deletions materializationengine/workflows/update_database_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
get_materialization_info,
monitor_workflow_state,
workflow_complete,
fin
fin,
)
from materializationengine.task import LockedTask
from materializationengine.utils import get_config_param
from materializationengine.workflows.ingest_new_annotations import (
ingest_new_annotations_workflow,
find_missing_root_ids_workflow,
)
from materializationengine.workflows.update_root_ids import (
update_root_ids_workflow,
Expand Down Expand Up @@ -84,13 +85,14 @@ def update_database_workflow(self, datastack_info: dict, **kwargs):
if mat_metadata.get("segmentation_table_name"):
workflow = chain(
ingest_new_annotations_workflow(mat_metadata),
find_missing_root_ids_workflow(mat_metadata),
update_root_ids_workflow(mat_metadata),
)

update_live_database_workflow.append(workflow)
else:
update_live_database_workflow.append(fin.si())

run_update_database_workflow = chain(
*update_live_database_workflow, workflow_complete.si("update_root_ids")
).apply_async(kwargs={"Datastack": datastack_info["datastack"]})
Expand Down

0 comments on commit 2d857e2

Please sign in to comment.