Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: auto unlock locked tasks after 3 days #1984

Open
wants to merge 1 commit into
base: development
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/backend/app/central/central_crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import csv
import json
from asyncio import gather
from datetime import datetime
from io import BytesIO, StringIO
from typing import Optional, Union

Expand Down Expand Up @@ -672,6 +673,7 @@ async def get_entities_data(
odk_id: int,
dataset_name: str = "features",
fields: str = "__system/updatedAt, osm_id, status, task_id",
filter_date: Optional[datetime] = None,
) -> list:
"""Get all the entity mapping statuses.

Expand All @@ -683,17 +685,26 @@ async def get_entities_data(
dataset_name (str): The dataset / Entity list name in ODK Central.
fields (str): Extra fields to include in $select filter.
__id is included by default.
filter_date (datetime): Filter entities last updated after this date.

Returns:
list: JSON list containing Entity info. If updated_at is included,
the format is string 2022-01-31T23:59:59.999Z.
"""
try:
url_params = f"$select=__id{',' if fields else ''} {fields}"

filters = []
if filter_date:
filters.append(f"__system/updatedAt gt {filter_date}")
if filters:
url_params += f"&$filter={' and '.join(filters)}"

async with central_deps.get_odk_dataset(odk_creds) as odk_central:
entities = await odk_central.getEntityData(
odk_id,
dataset_name,
url_params=f"$select=__id{',' if fields else ''} {fields}",
url_params=url_params,
)
except Exception as e:
log.exception(f"Error: {e}", stack_info=True)
Expand Down
125 changes: 123 additions & 2 deletions src/backend/app/tasks/task_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#
"""Routes for FMTM tasks."""

from datetime import datetime, timedelta, timezone
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException
Expand All @@ -25,9 +26,10 @@

from app.auth.auth_schemas import ProjectUserDict
from app.auth.roles import mapper
from app.central.central_crud import get_entities_data, update_entity_mapping_status
from app.db.database import db_conn
from app.db.enums import HTTPStatus
from app.db.models import DbTask, DbTaskEvent
from app.db.enums import EntityState, HTTPStatus
from app.db.models import DbOdkEntities, DbProject, DbTask, DbTaskEvent
from app.tasks import task_crud, task_schemas

router = APIRouter(
Expand Down Expand Up @@ -115,3 +117,122 @@ async def get_task_event_history(
):
"""Get the detailed history for a task."""
return await DbTaskEvent.all(db, task_id=task_id, days=days, comments=comments)


@router.post("/unlock-tasks")
async def trigger_unlock_tasks(db: Annotated[Connection, Depends(db_conn)]):
"""Endpoint to trigger unlock_old_locked_tasks manually."""
active_projects_query = """
SELECT DISTINCT project_id
FROM task_events
WHERE created_at >= NOW() - INTERVAL '3 days'
"""
async with db.cursor() as cur:
await cur.execute(active_projects_query)
active_projects = await cur.fetchall()

time_now = datetime.now(timezone.utc)
threedaysago = (time_now - timedelta(days=3)).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
onehourago = (time_now - timedelta(hours=1)).strftime("%Y-%m-%dT%H:%M:%S.%fZ")

for (project_id,) in active_projects:
project = await DbProject.one(db, project_id, True)

recent_project_entities = await get_entities_data(
project.odk_credentials, project.odkid, filter_date=threedaysago
)
# If there are recent entity updates, skip this project
if recent_project_entities:
continue

await reset_entities_status(db, project.id, onehourago)

# Only unlock tasks if there are no recent entity updates
await unlock_old_locked_tasks(db, project.id)
return {"message": "Old locked tasks unlocked successfully."}


async def reset_entities_status(db, project_id, filter_date):
"""Reset status for entities that have been 'open in ODK' for more than 1hr."""
project = await DbProject.one(db, project_id, True)
recent_opened_entities = await get_entities_data(
project.odk_credentials,
project.odkid,
filter_date=filter_date,
)
for entity in recent_opened_entities:
if entity["status"] != str(EntityState.OPENED_IN_ODK):
continue
await update_entity_mapping_status(
project.odk_credentials,
project.odkid,
entity["id"],
f"Task {entity['task_id']} Feature {entity['osm_id']}",
str(EntityState.READY),
)

# Sync ODK entities in our database
project_entities = await get_entities_data(project.odk_credentials, project.odkid)
await DbOdkEntities.upsert(db, project.id, project_entities)


async def unlock_old_locked_tasks(db, project_id):
"""Unlock tasks locked for more than 3 days."""
disable_trigger_query = """
ALTER TABLE task_events DISABLE TRIGGER task_event_state_trigger;
"""
unlock_query = """
WITH svc_user AS (
SELECT id AS svc_user_id, username AS svc_username
FROM users
WHERE username = 'svcfmtm'
),
recent_events AS (
SELECT DISTINCT ON (t.id, t.project_id)
t.id AS task_id,
t.project_id,
the.created_at AS last_event_time,
the.event AS last_event
FROM tasks t
JOIN task_events the
ON t.id = the.task_id
AND t.project_id = the.project_id
AND the.comment IS NULL
WHERE t.project_id = %(project_id)s
ORDER BY t.id, t.project_id, the.created_at DESC
),
filtered_events AS (
SELECT *
FROM recent_events
WHERE last_event IN ('MAP', 'ASSIGN')
AND last_event_time < NOW() - INTERVAL '3 days'
)
INSERT INTO task_events (
event_id,
task_id,
project_id,
event,
user_id,
state,
created_at,
username
)
SELECT
gen_random_uuid(),
fe.task_id,
fe.project_id,
'MAP'::taskevent,
svc.svc_user_id,
'UNLOCKED_TO_MAP'::mappingstate,
NOW(),
svc.svc_username
FROM filtered_events fe
CROSS JOIN svc_user svc;
"""
enable_trigger_query = """
ALTER TABLE task_events ENABLE TRIGGER task_event_state_trigger;
"""
async with db.cursor() as cur:
await cur.execute(disable_trigger_query)
await cur.execute(unlock_query, {"project_id": project_id})
await cur.execute(enable_trigger_query)
Loading