From 5850be5d3242eaa0447509e65e85d2c446889013 Mon Sep 17 00:00:00 2001 From: Anuj-Gupta4 Date: Thu, 19 Dec 2024 11:47:01 +0545 Subject: [PATCH] feat(tasks): add endpoint to unlock tasks locked for over 3 days and reset entities after an hour --- src/backend/app/central/central_crud.py | 13 ++- src/backend/app/tasks/task_routes.py | 125 +++++++++++++++++++++++- 2 files changed, 135 insertions(+), 3 deletions(-) diff --git a/src/backend/app/central/central_crud.py b/src/backend/app/central/central_crud.py index 7cbdf64d87..c95213c728 100644 --- a/src/backend/app/central/central_crud.py +++ b/src/backend/app/central/central_crud.py @@ -20,6 +20,7 @@ import csv import json from asyncio import gather +from datetime import datetime from io import BytesIO, StringIO from typing import Optional, Union @@ -672,6 +673,7 @@ async def get_entities_data( odk_id: int, dataset_name: str = "features", fields: str = "__system/updatedAt, osm_id, status, task_id", + filter_date: Optional[datetime] = None, ) -> list: """Get all the entity mapping statuses. @@ -683,17 +685,26 @@ async def get_entities_data( dataset_name (str): The dataset / Entity list name in ODK Central. fields (str): Extra fields to include in $select filter. __id is included by default. + filter_date (datetime): Filter entities last updated after this date. Returns: list: JSON list containing Entity info. If updated_at is included, the format is string 2022-01-31T23:59:59.999Z. """ try: + url_params = f"$select=__id{',' if fields else ''} {fields}" + + filters = [] + if filter_date: + filters.append(f"__system/updatedAt gt {filter_date}") + if filters: + url_params += f"&$filter={' and '.join(filters)}" + async with central_deps.get_odk_dataset(odk_creds) as odk_central: entities = await odk_central.getEntityData( odk_id, dataset_name, - url_params=f"$select=__id{',' if fields else ''} {fields}", + url_params=url_params, ) except Exception as e: log.exception(f"Error: {e}", stack_info=True) diff --git a/src/backend/app/tasks/task_routes.py b/src/backend/app/tasks/task_routes.py index 3529b838e0..3f86d2f800 100644 --- a/src/backend/app/tasks/task_routes.py +++ b/src/backend/app/tasks/task_routes.py @@ -17,6 +17,7 @@ # """Routes for FMTM tasks.""" +from datetime import datetime, timedelta, timezone from typing import Annotated from fastapi import APIRouter, Depends, HTTPException @@ -25,9 +26,10 @@ from app.auth.auth_schemas import ProjectUserDict from app.auth.roles import mapper +from app.central.central_crud import get_entities_data, update_entity_mapping_status from app.db.database import db_conn -from app.db.enums import HTTPStatus -from app.db.models import DbTask, DbTaskEvent +from app.db.enums import EntityState, HTTPStatus +from app.db.models import DbOdkEntities, DbProject, DbTask, DbTaskEvent from app.tasks import task_crud, task_schemas router = APIRouter( @@ -115,3 +117,122 @@ async def get_task_event_history( ): """Get the detailed history for a task.""" return await DbTaskEvent.all(db, task_id=task_id, days=days, comments=comments) + + +@router.post("/unlock-tasks") +async def trigger_unlock_tasks(db: Annotated[Connection, Depends(db_conn)]): + """Endpoint to trigger unlock_old_locked_tasks manually.""" + active_projects_query = """ + SELECT DISTINCT project_id + FROM task_events + WHERE created_at >= NOW() - INTERVAL '3 days' + """ + async with db.cursor() as cur: + await cur.execute(active_projects_query) + active_projects = await cur.fetchall() + + time_now = datetime.now(timezone.utc) + threedaysago = (time_now - timedelta(days=3)).strftime("%Y-%m-%dT%H:%M:%S.%fZ") + onehourago = (time_now - timedelta(hours=1)).strftime("%Y-%m-%dT%H:%M:%S.%fZ") + + for (project_id,) in active_projects: + project = await DbProject.one(db, project_id, True) + + recent_project_entities = await get_entities_data( + project.odk_credentials, project.odkid, filter_date=threedaysago + ) + # If there are recent entity updates, skip this project + if recent_project_entities: + continue + + await reset_entities_status(db, project.id, onehourago) + + # Only unlock tasks if there are no recent entity updates + await unlock_old_locked_tasks(db, project.id) + return {"message": "Old locked tasks unlocked successfully."} + + +async def reset_entities_status(db, project_id, filter_date): + """Reset status for entities that have been 'open in ODK' for more than 1hr.""" + project = await DbProject.one(db, project_id, True) + recent_opened_entities = await get_entities_data( + project.odk_credentials, + project.odkid, + filter_date=filter_date, + ) + for entity in recent_opened_entities: + if entity["status"] != str(EntityState.OPENED_IN_ODK): + continue + await update_entity_mapping_status( + project.odk_credentials, + project.odkid, + entity["id"], + f"Task {entity['task_id']} Feature {entity['osm_id']}", + str(EntityState.READY), + ) + + # Sync ODK entities in our database + project_entities = await get_entities_data(project.odk_credentials, project.odkid) + await DbOdkEntities.upsert(db, project.id, project_entities) + + +async def unlock_old_locked_tasks(db, project_id): + """Unlock tasks locked for more than 3 days.""" + disable_trigger_query = """ + ALTER TABLE task_events DISABLE TRIGGER task_event_state_trigger; + """ + unlock_query = """ + WITH svc_user AS ( + SELECT id AS svc_user_id, username AS svc_username + FROM users + WHERE username = 'svcfmtm' + ), + recent_events AS ( + SELECT DISTINCT ON (t.id, t.project_id) + t.id AS task_id, + t.project_id, + the.created_at AS last_event_time, + the.event AS last_event + FROM tasks t + JOIN task_events the + ON t.id = the.task_id + AND t.project_id = the.project_id + AND the.comment IS NULL + WHERE t.project_id = %(project_id)s + ORDER BY t.id, t.project_id, the.created_at DESC + ), + filtered_events AS ( + SELECT * + FROM recent_events + WHERE last_event IN ('MAP', 'ASSIGN') + AND last_event_time < NOW() - INTERVAL '3 days' + ) + INSERT INTO task_events ( + event_id, + task_id, + project_id, + event, + user_id, + state, + created_at, + username + ) + SELECT + gen_random_uuid(), + fe.task_id, + fe.project_id, + 'MAP'::taskevent, + svc.svc_user_id, + 'UNLOCKED_TO_MAP'::mappingstate, + NOW(), + svc.svc_username + FROM filtered_events fe + CROSS JOIN svc_user svc; + """ + enable_trigger_query = """ + ALTER TABLE task_events ENABLE TRIGGER task_event_state_trigger; + """ + async with db.cursor() as cur: + await cur.execute(disable_trigger_query) + await cur.execute(unlock_query, {"project_id": project_id}) + await cur.execute(enable_trigger_query)