Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDESK-7445] - Planning: Migrate planning:purge_expired_locks command to async #2142

Merged
merged 50 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
663523e
Implement base async `planning.events` resource and service
eos87 Nov 20, 2024
a452a92
Fix linter issues
eos87 Nov 20, 2024
98d4e37
Merge branch 'async' into hg/SDESK-7441-base-async-services-resources
eos87 Nov 20, 2024
2cdb25a
Implement base async `planning` resource and service
eos87 Nov 21, 2024
d7da0c9
Update command to use new format
BrianMwangi21 Nov 22, 2024
2112bad
Basic async models and services for `published` & `assignments`
eos87 Nov 22, 2024
6e60d55
Merge branch 'hg/SDESK-7441-base-async-services-resources' into SDESK…
BrianMwangi21 Nov 25, 2024
c131916
Copied get_expired_items to new Events async service
BrianMwangi21 Nov 25, 2024
db03451
Merge branch 'hg/SDESK-7441-base-async-services-resources' into SDESK…
BrianMwangi21 Nov 26, 2024
f79172c
Copied get_expired_items to new Planning async service
BrianMwangi21 Nov 26, 2024
f9763d8
Updated tests
BrianMwangi21 Nov 26, 2024
deb9f01
Added utils file
BrianMwangi21 Nov 27, 2024
9ef8423
Fix pytests and ~80% of behave tests
eos87 Nov 20, 2024
1dc92ec
Merge branch 'async' into hg/SDESK-7441-base-async-services-resources
eos87 Nov 27, 2024
9d87ded
Update requirements to async branch
eos87 Nov 27, 2024
a062037
Adjusted fields and indexes in assignments
eos87 Nov 28, 2024
5a12856
Fix for types according to PR review
eos87 Nov 28, 2024
e2713f0
Suggested fixes
BrianMwangi21 Nov 28, 2024
5b19216
Merge branch 'hg/SDESK-7441-base-async-services-resources' into SDESK…
BrianMwangi21 Nov 28, 2024
62dd304
Refactored celery call to run command
BrianMwangi21 Nov 28, 2024
114a507
Removed double import
BrianMwangi21 Nov 28, 2024
5b82274
Add events module to test config
BrianMwangi21 Nov 28, 2024
882f818
Code refactor
BrianMwangi21 Nov 28, 2024
2fd8799
Update types based on review feedback
eos87 Nov 28, 2024
b85a4e2
Proper names and type
eos87 Nov 28, 2024
d6cb71c
Make some fields optional
eos87 Nov 29, 2024
7c91bc4
Merge branch 'hg/SDESK-7441-base-async-services-resources' into SDESK…
BrianMwangi21 Nov 29, 2024
5739d2c
Changed purge_expired_locks to new command style and async
BrianMwangi21 Dec 2, 2024
ed3b37a
Updated tests
BrianMwangi21 Dec 2, 2024
8d32942
Await system update
BrianMwangi21 Dec 2, 2024
a39d49f
Instantiate the needed service in functions
BrianMwangi21 Dec 2, 2024
c8731f3
Fix index serializing issue
eos87 Dec 2, 2024
84c6f2c
Remove not needed import
eos87 Dec 2, 2024
36bd997
Merge branch 'hg/SDESK-7441-base-async-services-resources' into SDESK…
BrianMwangi21 Dec 2, 2024
0f26b18
Fix typo
eos87 Dec 2, 2024
9102f9c
Merge branch 'hg/SDESK-7441-base-async-services-resources' into SDESK…
BrianMwangi21 Dec 2, 2024
dd3feef
Fix failing tests
BrianMwangi21 Dec 2, 2024
2cb97c2
Merge branch 'SDESK-7443' into SDESK-7445
BrianMwangi21 Dec 2, 2024
183e76f
Merge branch 'async' into SDESK-7443
BrianMwangi21 Dec 3, 2024
650a801
Merge branch 'SDESK-7443' into SDESK-7445
BrianMwangi21 Dec 3, 2024
2bc4d05
Merge branch 'async' into SDESK-7445
BrianMwangi21 Dec 4, 2024
cf0573a
Fix failing tests
BrianMwangi21 Dec 4, 2024
95550ce
Removed push_notification
BrianMwangi21 Dec 4, 2024
804575a
Add date_to_str in the filter
BrianMwangi21 Dec 4, 2024
2b2408e
Removed extra item on system_update
BrianMwangi21 Dec 4, 2024
dd529b4
Remove print statement and uncomment other tests
BrianMwangi21 Dec 4, 2024
37156b9
Uncommented assignments
BrianMwangi21 Dec 4, 2024
1ab3097
Made planning_item optional in model
BrianMwangi21 Dec 4, 2024
f0fcf2a
Added planning items to assignment test items
BrianMwangi21 Dec 4, 2024
e3b559f
Fixes from PR suggestions
BrianMwangi21 Dec 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion server/planning/assignments/service.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from planning.core.service import BasePlanningAsyncService
from planning.types import AssignmentResourceModel


class AssignmentsAsyncService(BasePlanningAsyncService):
class AssignmentsAsyncService(BasePlanningAsyncService[AssignmentResourceModel]):
pass
2 changes: 1 addition & 1 deletion server/planning/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from .delete_marked_assignments import DeleteMarkedAssignments # noqa
from .export_to_newsroom import ExportToNewsroom # noqa
from .export_scheduled_filters import ExportScheduledFilters # noqa
from .purge_expired_locks import PurgeExpiredLocks # noqa
from .purge_expired_locks import purge_expired_locks_handler # noqa
from .replace_deprecated_event_item_attribute import ReplaceDeprecatedEventItemAttributeCommand # noqa
from .async_cli import planning_cli, commands_blueprint # noqa

Expand Down
221 changes: 119 additions & 102 deletions server/planning/commands/purge_expired_locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,40 @@
# AUTHORS and LICENSE files distributed with this source code, or
# at https://www.sourcefabric.org/superdesk/license

import click
import logging
from datetime import timedelta
from eve.utils import date_to_str
from datetime import timedelta, datetime
from typing import AsyncGenerator, Any

from superdesk import Command, command, get_resource_service, Option
from superdesk import get_resource_service
from superdesk.core import get_app_config
from superdesk.core.utils import date_to_str
from superdesk.utc import utcnow
from superdesk.lock import lock, unlock
from superdesk.celery_task_utils import get_lock_id
from planning.item_lock import LOCK_ACTION, LOCK_SESSION, LOCK_TIME, LOCK_USER
from planning.utils import try_cast_object_id
from planning.utils import get_service, try_cast_object_id
from .async_cli import planning_cli

logger = logging.getLogger(__name__)


class PurgeExpiredLocks(Command):
@planning_cli.command("planning:purge_expired_locks")
@click.option(
"--resource",
"-r",
required=True,
help="The name of the resource to purge item locks for (e.g., events, planning, assignments, all)",
)
@click.option(
"--expire-hours",
"-e",
required=False,
type=int,
default=24,
help="Purges locks that are older than this many hours (default: 24 hours)",
)
async def purge_expired_locks_command(resource: str, expire_hours: int = 24):
"""
Purge item locks that are linked to a non-existing session

Expand All @@ -39,109 +57,108 @@ class PurgeExpiredLocks(Command):
$ python manage.py planning:purge_expired_locks -r all
$ python manage.py planning:purge_expired_locks -r all -e 48
"""
return await purge_expired_locks_handler(resource, expire_hours)

option_list = [
Option("--resource", "-r", required=True),
Option("--expire-hours", "-e", dest="expire_hours", required=False, type=int, default=24),
]

def run(self, resource: str, expire_hours: int = 24) -> None:
logger.info("Starting to purge expired item locks")
async def purge_expired_locks_handler(resource: str, expire_hours: int = 24):
logger.info("Starting to purge expired item locks")

if resource == "all":
resources = ["events", "planning", "assignments"]
elif resource not in ["events", "planning", "assignments"]:
raise ValueError(f"Invalid resource: {resource}")
else:
resources = [resource]
if resource == "all":
resources = ["events", "planning", "assignments"]
elif resource not in ["events", "planning", "assignments"]:
raise ValueError(f"Invalid resource: {resource}")
else:
resources = [resource]

lock_name = get_lock_id("purge_expired_locks", resource)
if not lock(lock_name, expire=600):
logger.info("purge expired locks task is already running")
return

expiry_datetime = utcnow() - timedelta(hours=expire_hours)
for resource_name in resources:
try:
await purge_item_locks(resource_name, expiry_datetime)
except Exception as err:
logger.exception(f"Failed to purge item locks ({err})")

unlock(lock_name)
logger.info("Completed purging expired item locks")

lock_name = get_lock_id("purge_expired_locks", resource)
if not lock(lock_name, expire=600):
logger.info("purge expired locks task is already running")
return

expiry_datetime = date_to_str(utcnow() - timedelta(hours=expire_hours))
for resource_name in resources:
async def purge_item_locks(resource: str, expiry_datetime: datetime):
logger.info(f"Purging expired locks for {resource}")
resource_service = get_service(resource)
try:
autosave_service = get_resource_service("event_autosave" if resource == "events" else f"{resource}_autosave")
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MarkLark86 we are making use of autosave services here. They are yet to be converted to async. Do they remain as is and just add a TODO to get back to them when finished ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds fine

except KeyError:
autosave_service = None

async for items in get_locked_items(resource, expiry_datetime):
failed_ids = []
for item in items:
try:
item_id = try_cast_object_id(item["_id"])
except KeyError:
logger.exception("Item ID not found, unable to purge its lock")
continue

try:
self._purge_item_locks(resource_name, expiry_datetime)
# Remove all lock information from this item
await resource_service.system_update(
item_id,
{
LOCK_USER: None,
LOCK_ACTION: None,
LOCK_SESSION: None,
LOCK_TIME: None,
},
)
except Exception as err:
logger.exception(f"Failed to purge item locks ({err})")
logger.exception(f"Failed to purge item lock ({err})")
failed_ids.append(item_id)
continue

unlock(lock_name)
logger.info("Completed purging expired item locks")
if autosave_service is None:
continue

def _purge_item_locks(self, resource: str, expiry_datetime: str):
logger.info(f"Purging expired locks for {resource}")
resource_service = get_resource_service(resource)
try:
autosave_service = get_resource_service(
"event_autosave" if resource == "events" else f"{resource}_autosave"
)
except KeyError:
autosave_service = None

for items in self.get_locked_items(resource, expiry_datetime):
failed_ids = []
for item in items:
try:
item_id = try_cast_object_id(item["_id"])
except KeyError:
logger.exception("Item ID not found, unable to purge its lock")
continue

try:
# Remove all lock information from this item
resource_service.system_update(
item_id,
{
LOCK_USER: None,
LOCK_ACTION: None,
LOCK_SESSION: None,
LOCK_TIME: None,
},
item,
push_notification=False,
)
except Exception as err:
logger.exception(f"Failed to purge item lock ({err})")
failed_ids.append(item_id)
continue

if autosave_service is None:
continue

try:
# Delete any autosave items associated with this item
autosave_service.delete_action(lookup={"_id": item_id})
except Exception as err:
logger.exception(f"Failed to delete autosave item(s) ({err})")

num_items = len(items)
num_success = num_items - len(failed_ids)
if num_success != num_items:
logger.warning(f"{num_success}/{num_items} {resource} locks purged. Failed IDs: {failed_ids}")
else:
logger.info(f"{num_items} {resource} locks purged")

def get_locked_items(self, resource: str, expiry_datetime: str):
service = get_resource_service(resource)
total_received = 0
query = {
"query": {"bool": {"filter": [{"range": {LOCK_TIME: {"lt": expiry_datetime}}}]}},
"size": get_app_config("MAX_EXPIRY_QUERY_LIMIT"),
"sort": [{LOCK_TIME: "asc"}],
}

for i in range(get_app_config("MAX_EXPIRY_LOOPS")):
query["from"] = total_received
results = list(service.search(query))
num_results = len(results)

if not num_results:
break

total_received += num_results
yield results


command("planning:purge_expired_locks", PurgeExpiredLocks())
try:
# Delete any autosave items associated with this item
autosave_service.delete_action(lookup={"_id": item_id})
except Exception as err:
logger.exception(f"Failed to delete autosave item(s) ({err})")

num_items = len(items)
num_success = num_items - len(failed_ids)
if num_success != num_items:
logger.warning(f"{num_success}/{num_items} {resource} locks purged. Failed IDs: {failed_ids}")
else:
logger.info(f"{num_items} {resource} locks purged")


async def get_locked_items(resource: str, expiry_datetime: datetime) -> AsyncGenerator[list[dict[str, Any]], None]:
resource_service = get_service(resource)
total_received = 0
query: dict[str, Any] = {
"query": {
"bool": {
"filter": {"range": {LOCK_TIME: {"lt": date_to_str(expiry_datetime)}}},
},
},
"size": get_app_config("MAX_EXPIRY_QUERY_LIMIT"),
"sort": [{LOCK_TIME: "asc"}],
}

for i in range(get_app_config("MAX_EXPIRY_LOOPS")):
query["from"] = total_received
results = await resource_service.search(query)
items = await results.to_list_raw()

num_results = len(items)

if not num_results:
break

total_received += num_results

yield items
Loading
Loading