Skip to content

Commit

Permalink
[SDESK-7445] - Planning: Migrate planning:purge_expired_locks command…
Browse files Browse the repository at this point in the history
… to async (#2142)

* Implement base async `planning.events` resource and service

SDESK-7441

* Fix linter issues

SDESK-7441

* Implement base async `planning` resource and service

SDESK-7441

* Update command to use new format

* Basic async models and services for `published` & `assignments`

SDESK-7441

* Copied get_expired_items to new Events async service

* Copied get_expired_items to new Planning async service

* Updated tests

* Added utils file

* Fix pytests and ~80% of behave tests

Fix behave tests partially

SDESK-7441

Allow behave tests to run async code

SDESK-7441

Fix pytests and use python 3.10 only

Disable some actions and add verbose mode 999

Remove python 3.8

Point sd core to fix branch

Revert "Fix linter issues"

This reverts commit 152cfb5.

Revert changes to ci-install

SDESK-7441

Fix first batch of tests

Reapply "Fix linter issues"

This reverts commit e5ac69a.

Fix second batch of tests

SDESK-7441

Fix tests batch 3

Fix tests batch 4

SDESK-7441

Fix superdesk-core dependency

Fix linter issues

SDESK-7441

* Update requirements to async branch

SDESK-7441

* Adjusted fields and indexes in assignments

SDESK-7441

* Fix for types according to PR review

SDESK-7441

* Suggested fixes

* Refactored celery call to run command

* Removed double import

* Add events module to test config

* Code refactor

* Update types based on review feedback

SDESK-7441

* Proper names and type

SDESK-7441

* Make some fields optional

SDESK-7441

* Changed purge_expired_locks to new command style and async

* Updated tests

* Await system update

* Instantiate the needed service in functions

* Fix index serializing issue

SDESK-7441

* Remove not needed import

SDESK-7441

* Fix typo

SDESK-7441

* Fix failing tests

* Fix failing tests

* Removed push_notification

* Add date_to_str in the filter

* Removed extra item on system_update

* Remove print statement and uncomment other tests

* Uncommented assignments

* Made planning_item optional in model

* Added planning items to assignment test items

* Fixes from PR suggestions

---------

Co-authored-by: Helmy Giacoman <[email protected]>
  • Loading branch information
BrianMwangi21 and eos87 authored Dec 5, 2024
1 parent 421a61b commit 9a1fe66
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 199 deletions.
3 changes: 2 additions & 1 deletion server/planning/assignments/service.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from planning.core.service import BasePlanningAsyncService
from planning.types import AssignmentResourceModel


class AssignmentsAsyncService(BasePlanningAsyncService):
class AssignmentsAsyncService(BasePlanningAsyncService[AssignmentResourceModel]):
pass
2 changes: 1 addition & 1 deletion server/planning/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from .delete_marked_assignments import DeleteMarkedAssignments # noqa
from .export_to_newsroom import ExportToNewsroom # noqa
from .export_scheduled_filters import ExportScheduledFilters # noqa
from .purge_expired_locks import PurgeExpiredLocks # noqa
from .purge_expired_locks import purge_expired_locks_handler # noqa
from .replace_deprecated_event_item_attribute import ReplaceDeprecatedEventItemAttributeCommand # noqa
from .async_cli import planning_cli, commands_blueprint # noqa

Expand Down
221 changes: 119 additions & 102 deletions server/planning/commands/purge_expired_locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,40 @@
# AUTHORS and LICENSE files distributed with this source code, or
# at https://www.sourcefabric.org/superdesk/license

import click
import logging
from datetime import timedelta
from eve.utils import date_to_str
from datetime import timedelta, datetime
from typing import AsyncGenerator, Any

from superdesk import Command, command, get_resource_service, Option
from superdesk import get_resource_service
from superdesk.core import get_app_config
from superdesk.core.utils import date_to_str
from superdesk.utc import utcnow
from superdesk.lock import lock, unlock
from superdesk.celery_task_utils import get_lock_id
from planning.item_lock import LOCK_ACTION, LOCK_SESSION, LOCK_TIME, LOCK_USER
from planning.utils import try_cast_object_id
from planning.utils import get_service, try_cast_object_id
from .async_cli import planning_cli

logger = logging.getLogger(__name__)


class PurgeExpiredLocks(Command):
@planning_cli.command("planning:purge_expired_locks")
@click.option(
"--resource",
"-r",
required=True,
help="The name of the resource to purge item locks for (e.g., events, planning, assignments, all)",
)
@click.option(
"--expire-hours",
"-e",
required=False,
type=int,
default=24,
help="Purges locks that are older than this many hours (default: 24 hours)",
)
async def purge_expired_locks_command(resource: str, expire_hours: int = 24):
"""
Purge item locks that are linked to a non-existing session
Expand All @@ -39,109 +57,108 @@ class PurgeExpiredLocks(Command):
$ python manage.py planning:purge_expired_locks -r all
$ python manage.py planning:purge_expired_locks -r all -e 48
"""
return await purge_expired_locks_handler(resource, expire_hours)

option_list = [
Option("--resource", "-r", required=True),
Option("--expire-hours", "-e", dest="expire_hours", required=False, type=int, default=24),
]

def run(self, resource: str, expire_hours: int = 24) -> None:
logger.info("Starting to purge expired item locks")
async def purge_expired_locks_handler(resource: str, expire_hours: int = 24):
logger.info("Starting to purge expired item locks")

if resource == "all":
resources = ["events", "planning", "assignments"]
elif resource not in ["events", "planning", "assignments"]:
raise ValueError(f"Invalid resource: {resource}")
else:
resources = [resource]
if resource == "all":
resources = ["events", "planning", "assignments"]
elif resource not in ["events", "planning", "assignments"]:
raise ValueError(f"Invalid resource: {resource}")
else:
resources = [resource]

lock_name = get_lock_id("purge_expired_locks", resource)
if not lock(lock_name, expire=600):
logger.info("purge expired locks task is already running")
return

expiry_datetime = utcnow() - timedelta(hours=expire_hours)
for resource_name in resources:
try:
await purge_item_locks(resource_name, expiry_datetime)
except Exception as err:
logger.exception(f"Failed to purge item locks ({err})")

unlock(lock_name)
logger.info("Completed purging expired item locks")

lock_name = get_lock_id("purge_expired_locks", resource)
if not lock(lock_name, expire=600):
logger.info("purge expired locks task is already running")
return

expiry_datetime = date_to_str(utcnow() - timedelta(hours=expire_hours))
for resource_name in resources:
async def purge_item_locks(resource: str, expiry_datetime: datetime):
logger.info(f"Purging expired locks for {resource}")
resource_service = get_service(resource)
try:
autosave_service = get_resource_service("event_autosave" if resource == "events" else f"{resource}_autosave")
except KeyError:
autosave_service = None

async for items in get_locked_items(resource, expiry_datetime):
failed_ids = []
for item in items:
try:
item_id = try_cast_object_id(item["_id"])
except KeyError:
logger.exception("Item ID not found, unable to purge its lock")
continue

try:
self._purge_item_locks(resource_name, expiry_datetime)
# Remove all lock information from this item
await resource_service.system_update(
item_id,
{
LOCK_USER: None,
LOCK_ACTION: None,
LOCK_SESSION: None,
LOCK_TIME: None,
},
)
except Exception as err:
logger.exception(f"Failed to purge item locks ({err})")
logger.exception(f"Failed to purge item lock ({err})")
failed_ids.append(item_id)
continue

unlock(lock_name)
logger.info("Completed purging expired item locks")
if autosave_service is None:
continue

def _purge_item_locks(self, resource: str, expiry_datetime: str):
logger.info(f"Purging expired locks for {resource}")
resource_service = get_resource_service(resource)
try:
autosave_service = get_resource_service(
"event_autosave" if resource == "events" else f"{resource}_autosave"
)
except KeyError:
autosave_service = None

for items in self.get_locked_items(resource, expiry_datetime):
failed_ids = []
for item in items:
try:
item_id = try_cast_object_id(item["_id"])
except KeyError:
logger.exception("Item ID not found, unable to purge its lock")
continue

try:
# Remove all lock information from this item
resource_service.system_update(
item_id,
{
LOCK_USER: None,
LOCK_ACTION: None,
LOCK_SESSION: None,
LOCK_TIME: None,
},
item,
push_notification=False,
)
except Exception as err:
logger.exception(f"Failed to purge item lock ({err})")
failed_ids.append(item_id)
continue

if autosave_service is None:
continue

try:
# Delete any autosave items associated with this item
autosave_service.delete_action(lookup={"_id": item_id})
except Exception as err:
logger.exception(f"Failed to delete autosave item(s) ({err})")

num_items = len(items)
num_success = num_items - len(failed_ids)
if num_success != num_items:
logger.warning(f"{num_success}/{num_items} {resource} locks purged. Failed IDs: {failed_ids}")
else:
logger.info(f"{num_items} {resource} locks purged")

def get_locked_items(self, resource: str, expiry_datetime: str):
service = get_resource_service(resource)
total_received = 0
query = {
"query": {"bool": {"filter": [{"range": {LOCK_TIME: {"lt": expiry_datetime}}}]}},
"size": get_app_config("MAX_EXPIRY_QUERY_LIMIT"),
"sort": [{LOCK_TIME: "asc"}],
}

for i in range(get_app_config("MAX_EXPIRY_LOOPS")):
query["from"] = total_received
results = list(service.search(query))
num_results = len(results)

if not num_results:
break

total_received += num_results
yield results


command("planning:purge_expired_locks", PurgeExpiredLocks())
try:
# Delete any autosave items associated with this item
autosave_service.delete_action(lookup={"_id": item_id})
except Exception as err:
logger.exception(f"Failed to delete autosave item(s) ({err})")

num_items = len(items)
num_success = num_items - len(failed_ids)
if num_success != num_items:
logger.warning(f"{num_success}/{num_items} {resource} locks purged. Failed IDs: {failed_ids}")
else:
logger.info(f"{num_items} {resource} locks purged")


async def get_locked_items(resource: str, expiry_datetime: datetime) -> AsyncGenerator[list[dict[str, Any]], None]:
resource_service = get_service(resource)
total_received = 0
query: dict[str, Any] = {
"query": {
"bool": {
"filter": {"range": {LOCK_TIME: {"lt": date_to_str(expiry_datetime)}}},
},
},
"size": get_app_config("MAX_EXPIRY_QUERY_LIMIT"),
"sort": [{LOCK_TIME: "asc"}],
}

for i in range(get_app_config("MAX_EXPIRY_LOOPS")):
query["from"] = total_received
results = await resource_service.search(query)
items = await results.to_list_raw()

num_results = len(items)

if not num_results:
break

total_received += num_results

yield items
Loading

0 comments on commit 9a1fe66

Please sign in to comment.