Skip to content

Commit

Permalink
feat(tasks): add endpoint to unlock tasks locked for over 3 days and …
Browse files Browse the repository at this point in the history
…reset entities after an hour
  • Loading branch information
Anuj-Gupta4 committed Dec 19, 2024
1 parent 1d0a0a2 commit 5850be5
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 3 deletions.
13 changes: 12 additions & 1 deletion src/backend/app/central/central_crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import csv
import json
from asyncio import gather
from datetime import datetime
from io import BytesIO, StringIO
from typing import Optional, Union

Expand Down Expand Up @@ -672,6 +673,7 @@ async def get_entities_data(
odk_id: int,
dataset_name: str = "features",
fields: str = "__system/updatedAt, osm_id, status, task_id",
filter_date: Optional[datetime] = None,
) -> list:
"""Get all the entity mapping statuses.
Expand All @@ -683,17 +685,26 @@ async def get_entities_data(
dataset_name (str): The dataset / Entity list name in ODK Central.
fields (str): Extra fields to include in $select filter.
__id is included by default.
filter_date (datetime): Filter entities last updated after this date.
Returns:
list: JSON list containing Entity info. If updated_at is included,
the format is string 2022-01-31T23:59:59.999Z.
"""
try:
url_params = f"$select=__id{',' if fields else ''} {fields}"

filters = []
if filter_date:
filters.append(f"__system/updatedAt gt {filter_date}")
if filters:
url_params += f"&$filter={' and '.join(filters)}"

async with central_deps.get_odk_dataset(odk_creds) as odk_central:
entities = await odk_central.getEntityData(
odk_id,
dataset_name,
url_params=f"$select=__id{',' if fields else ''} {fields}",
url_params=url_params,
)
except Exception as e:
log.exception(f"Error: {e}", stack_info=True)
Expand Down
125 changes: 123 additions & 2 deletions src/backend/app/tasks/task_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#
"""Routes for FMTM tasks."""

from datetime import datetime, timedelta, timezone
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException
Expand All @@ -25,9 +26,10 @@

from app.auth.auth_schemas import ProjectUserDict
from app.auth.roles import mapper
from app.central.central_crud import get_entities_data, update_entity_mapping_status
from app.db.database import db_conn
from app.db.enums import HTTPStatus
from app.db.models import DbTask, DbTaskEvent
from app.db.enums import EntityState, HTTPStatus
from app.db.models import DbOdkEntities, DbProject, DbTask, DbTaskEvent
from app.tasks import task_crud, task_schemas

router = APIRouter(
Expand Down Expand Up @@ -115,3 +117,122 @@ async def get_task_event_history(
):
"""Get the detailed history for a task."""
return await DbTaskEvent.all(db, task_id=task_id, days=days, comments=comments)


@router.post("/unlock-tasks")
async def trigger_unlock_tasks(db: Annotated[Connection, Depends(db_conn)]):
"""Endpoint to trigger unlock_old_locked_tasks manually."""
active_projects_query = """
SELECT DISTINCT project_id
FROM task_events
WHERE created_at >= NOW() - INTERVAL '3 days'
"""
async with db.cursor() as cur:
await cur.execute(active_projects_query)
active_projects = await cur.fetchall()

time_now = datetime.now(timezone.utc)
threedaysago = (time_now - timedelta(days=3)).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
onehourago = (time_now - timedelta(hours=1)).strftime("%Y-%m-%dT%H:%M:%S.%fZ")

for (project_id,) in active_projects:
project = await DbProject.one(db, project_id, True)

recent_project_entities = await get_entities_data(
project.odk_credentials, project.odkid, filter_date=threedaysago
)
# If there are recent entity updates, skip this project
if recent_project_entities:
continue

await reset_entities_status(db, project.id, onehourago)

# Only unlock tasks if there are no recent entity updates
await unlock_old_locked_tasks(db, project.id)
return {"message": "Old locked tasks unlocked successfully."}


async def reset_entities_status(db, project_id, filter_date):
"""Reset status for entities that have been 'open in ODK' for more than 1hr."""
project = await DbProject.one(db, project_id, True)
recent_opened_entities = await get_entities_data(
project.odk_credentials,
project.odkid,
filter_date=filter_date,
)
for entity in recent_opened_entities:
if entity["status"] != str(EntityState.OPENED_IN_ODK):
continue
await update_entity_mapping_status(
project.odk_credentials,
project.odkid,
entity["id"],
f"Task {entity['task_id']} Feature {entity['osm_id']}",
str(EntityState.READY),
)

# Sync ODK entities in our database
project_entities = await get_entities_data(project.odk_credentials, project.odkid)
await DbOdkEntities.upsert(db, project.id, project_entities)


async def unlock_old_locked_tasks(db, project_id):
"""Unlock tasks locked for more than 3 days."""
disable_trigger_query = """
ALTER TABLE task_events DISABLE TRIGGER task_event_state_trigger;
"""
unlock_query = """
WITH svc_user AS (
SELECT id AS svc_user_id, username AS svc_username
FROM users
WHERE username = 'svcfmtm'
),
recent_events AS (
SELECT DISTINCT ON (t.id, t.project_id)
t.id AS task_id,
t.project_id,
the.created_at AS last_event_time,
the.event AS last_event
FROM tasks t
JOIN task_events the
ON t.id = the.task_id
AND t.project_id = the.project_id
AND the.comment IS NULL
WHERE t.project_id = %(project_id)s
ORDER BY t.id, t.project_id, the.created_at DESC
),
filtered_events AS (
SELECT *
FROM recent_events
WHERE last_event IN ('MAP', 'ASSIGN')
AND last_event_time < NOW() - INTERVAL '3 days'
)
INSERT INTO task_events (
event_id,
task_id,
project_id,
event,
user_id,
state,
created_at,
username
)
SELECT
gen_random_uuid(),
fe.task_id,
fe.project_id,
'MAP'::taskevent,
svc.svc_user_id,
'UNLOCKED_TO_MAP'::mappingstate,
NOW(),
svc.svc_username
FROM filtered_events fe
CROSS JOIN svc_user svc;
"""
enable_trigger_query = """
ALTER TABLE task_events ENABLE TRIGGER task_event_state_trigger;
"""
async with db.cursor() as cur:
await cur.execute(disable_trigger_query)
await cur.execute(unlock_query, {"project_id": project_id})
await cur.execute(enable_trigger_query)

0 comments on commit 5850be5

Please sign in to comment.