Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bad root ids #125

Merged
merged 20 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 57 additions & 6 deletions materializationengine/blueprints/materialize/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
)
from dynamicannotationdb.models import AnalysisVersion
from materializationengine.schemas import AnalysisTableSchema, AnalysisVersionSchema
from materializationengine.blueprints.materialize.schemas import BadRootsSchema
from middle_auth_client import auth_requires_admin, auth_requires_permission
from sqlalchemy import MetaData, Table
from sqlalchemy.engine.url import make_url
Expand All @@ -43,6 +44,7 @@
bulk_upload_parser.add_argument("schema", required=True, type=str)
bulk_upload_parser.add_argument("materialized_ts", type=float)


missing_chunk_parser = reqparse.RequestParser()
missing_chunk_parser.add_argument("chunks", required=True, type=list, location="json")
missing_chunk_parser.add_argument(
Expand Down Expand Up @@ -232,23 +234,72 @@ def post(self, datastack_name: str, table_name: str):
return 200


@mat_bp.route("/materialize/run/lookup_root_ids/datastack/<string:datastack_name>")
class LookupMissingRootIdsResource(Resource):
@mat_bp.route("/materialize/run/dense_lookup_root_ids/datastack/<string:datastack_name>")
class LookupDenseMissingRootIdsResource(Resource):
@reset_auth
@auth_requires_admin
@mat_bp.doc("process new annotations workflow", security="apikey")
@mat_bp.doc("Find all null root ids and lookup new roots", security="apikey")
def post(self, datastack_name: str):
"""Run workflow to lookup missing root ids and insert into database across
all tables in the database.

Args:
datastack_name (str): name of datastack from infoservice
"""
from materializationengine.workflows.ingest_new_annotations import (
process_dense_missing_roots_workflow,
)

datastack_info = get_datastack_info(datastack_name)
process_dense_missing_roots_workflow.s(datastack_info).apply_async()
return 200


@mat_bp.route("/materialize/run/sparse_lookup_root_ids/datastack/<string:datastack_name>/table/<string:table_name>")
class LookupSparseMissingRootIdsResource(Resource):
@reset_auth
@auth_requires_admin
@mat_bp.doc("Find null root ids in table and lookup new root ids", security="apikey")
def post(self, datastack_name: str, table_name: str):
"""Finds null root ids in a given table and lookups new root ids
using last updated time stamp.

Args:
datastack_name (str): name of datastack from infoservice
table_name (str): name of table
"""
from materializationengine.workflows.ingest_new_annotations import (
process_sparse_missing_roots_workflow,
)

datastack_info = get_datastack_info(datastack_name)
process_sparse_missing_roots_workflow.s(datastack_info, table_name).apply_async()
return 200


@mat_bp.route(
"/materialize/run/remove_bad_root_ids/datastack/<string:datastack_name>/table/<string:table_name>"
)
class SetBadRootsToNullResource(Resource):
@reset_auth
@auth_requires_admin
@accepts("BadRootsSchema", schema=BadRootsSchema, api=mat_bp)
@mat_bp.doc("set bad roots to None", security="apikey")
def post(self, datastack_name: str, table_name: str):
"""Run workflow to lookup missing root ids and insert into database

Args:
datastack_name (str): name of datastack from infoservice
"""
from materializationengine.workflows.ingest_new_annotations import (
process_missing_roots_workflow,
fix_root_id_workflow,
)

data = request.parsed_obj
bad_roots_ids = data["bad_roots"]

datastack_info = get_datastack_info(datastack_name)
process_missing_roots_workflow.s(datastack_info).apply_async()
fix_root_id_workflow.s(datastack_info, table_name, bad_roots_ids).apply_async()
return 200


Expand Down Expand Up @@ -317,7 +368,7 @@ class UpdateExpiredRootIdsResource(Resource):
@reset_auth
@auth_requires_admin
@mat_bp.expect(get_roots_parser)
@mat_bp.doc("Lookup root ids", security="apikey")
@mat_bp.doc("Update expired root ids", security="apikey")
def post(self, datastack_name: str):
"""Lookup root ids

Expand Down
4 changes: 4 additions & 0 deletions materializationengine/blueprints/materialize/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,7 @@ class VirtualVersionSchema(Schema):
target_version = fields.Integer()
tables_to_include = fields.List(fields.Str(), example=None)
virtual_version_name = fields.Str()


class BadRootsSchema(Schema):
bad_roots = fields.List(fields.Int(), example=None)
2 changes: 2 additions & 0 deletions materializationengine/workflows/complete_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
)
from materializationengine.workflows.ingest_new_annotations import (
ingest_new_annotations_workflow,
find_missing_root_ids_workflow
)
from materializationengine.task import LockedTask
from materializationengine.workflows.update_root_ids import (
Expand Down Expand Up @@ -78,6 +79,7 @@ def run_complete_workflow(
if mat_metadata.get("segmentation_table_name"):
workflow = chain(
ingest_new_annotations_workflow(mat_metadata),
# find_missing_root_ids_workflow(mat_metadata), # skip for now
update_root_ids_workflow(mat_metadata),
)
else:
Expand Down
Loading
Loading