Skip to content

Commit

Permalink
[SDESK-7443] - Planning: Migrate planning:delete_spiked command to async (#2134)
Browse files Browse the repository at this point in the history

* Implement base async `planning.events` resource and service

SDESK-7441

* Fix linter issues

SDESK-7441

* Implement base async `planning` resource and service

SDESK-7441

* Update command to use new format

* Basic async models and services for `published` & `assignments`

SDESK-7441

* Copied get_expired_items to new Events async service

* Copied get_expired_items to new Planning async service

* Updated tests

* Added utils file

* Fix pytests and ~80% of behave tests

Fix behave tests partially

SDESK-7441

Allow behave tests to run async code

SDESK-7441

Fix pytests and use python 3.10 only

Disable some actions and add verbose mode 999

Remove python 3.8

Point sd core to fix branch

Revert "Fix linter issues"

This reverts commit 152cfb5.

Revert changes to ci-install

SDESK-7441

Fix first batch of tests

Reapply "Fix linter issues"

This reverts commit e5ac69a.

Fix second batch of tests

SDESK-7441

Fix tests batch 3

Fix tests batch 4

SDESK-7441

Fix superdesk-core dependency

Fix linter issues

SDESK-7441

* Update requirements to async branch

SDESK-7441

* Adjusted fields and indexes in assignments

SDESK-7441

* Fix for types according to PR review

SDESK-7441

* Suggested fixes

* Refactored celery call to run command

* Removed double import

* Add events module to test config

* Code refactor

* Update types based on review feedback

SDESK-7441

* Proper names and type

SDESK-7441

* Make some fields optional

SDESK-7441

* Fix index serializing issue

SDESK-7441

* Remove not needed import

SDESK-7441

* Fix typo

SDESK-7441

* Fix failing tests

* Fix failing tests

* Fix failing tests

* fix: Types warning on default factory:

* improve: Use TestCase async setup from superdesk-core

* fix: couple bugs in event/planning services

* fix: couple bugs in delete_spiked_commands

* fix lint and type issues

---------

Co-authored-by: Helmy Giacoman <[email protected]>
Co-authored-by: Mark Pittaway <[email protected]>
  • Loading branch information
3 people authored Dec 4, 2024
1 parent a4f7d42 commit 421a61b
Show file tree
Hide file tree
Showing 11 changed files with 567 additions and 320 deletions.
11 changes: 4 additions & 7 deletions server/planning/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,7 @@
from superdesk import register_jinja_filter
from .common import get_formatted_address

from .commands import (
FlagExpiredItems,
DeleteSpikedItems,
DeleteMarkedAssignments,
ExportScheduledFilters,
)
from .commands import FlagExpiredItems, DeleteMarkedAssignments, ExportScheduledFilters, delete_spiked_items_handler
import planning.commands # noqa
import planning.feeding_services # noqa
import planning.feed_parsers # noqa
Expand Down Expand Up @@ -331,7 +326,9 @@ def flag_expired():

@celery.task(soft_time_limit=600)
def delete_spiked():
    # Celery task bodies are synchronous, so bridge into the async
    # handler with asyncio.run (local import keeps module import light).
    import asyncio

    asyncio.run(delete_spiked_items_handler())


@celery.task(soft_time_limit=600)
Expand Down
3 changes: 2 additions & 1 deletion server/planning/assignments/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@
from .assignments_history import AssignmentsHistoryResource, AssignmentsHistoryService
from .delivery import DeliveryResource

from .service import AssignmentsAsyncService
from .module import assignments_resource_config

__all__ = ["assignments_resource_config"]
__all__ = ["assignments_resource_config", "AssignmentsAsyncService"]


def init_app(app):
Expand Down
12 changes: 11 additions & 1 deletion server/planning/commands/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
from .flag_expired_items import FlagExpiredItems # noqa
from .delete_spiked_items import DeleteSpikedItems # noqa
from .delete_spiked_items import delete_spiked_items_handler # noqa
from .delete_marked_assignments import DeleteMarkedAssignments # noqa
from .export_to_newsroom import ExportToNewsroom # noqa
from .export_scheduled_filters import ExportScheduledFilters # noqa
from .purge_expired_locks import PurgeExpiredLocks # noqa
from .replace_deprecated_event_item_attribute import ReplaceDeprecatedEventItemAttributeCommand # noqa
from .async_cli import planning_cli, commands_blueprint # noqa


def configure_cli(app) -> None:
    """
    Register the planning commands blueprint on the given app.

    NOTE(review): the previous docstring also claimed this sets the current
    app instance into the `AsyncAppGroup`, but no such call is visible here —
    confirm whether that happens inside `register_blueprint`.
    """

    app.register_blueprint(commands_blueprint)
3 changes: 3 additions & 0 deletions server/planning/commands/async_cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from superdesk.core.cli import create_commands_blueprint

# Blueprint + CLI group for the `planning` commands; the blueprint is
# registered on the app by `configure_cli` in `commands/__init__.py`.
commands_blueprint, planning_cli = create_commands_blueprint("planning")
195 changes: 103 additions & 92 deletions server/planning/commands/delete_spiked_items.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,27 @@
# at https://www.sourcefabric.org/superdesk/license

from datetime import timedelta
from contextvars import ContextVar

from superdesk.core import get_app_config
from superdesk.resource_fields import ID_FIELD
from superdesk import Command, command, get_resource_service
from superdesk.logging import logger
from superdesk.utc import utcnow
from superdesk.celery_task_utils import get_lock_id
from superdesk.lock import lock, unlock, remove_locks
from planning.common import WORKFLOW_STATE
from planning.events import EventsAsyncService
from planning.events.utils import get_recurring_timeline
from planning.planning import PlanningAsyncService
from planning.assignments import AssignmentsAsyncService
from .async_cli import planning_cli


# Shared log-message prefix for the helper coroutines below; a ContextVar
# keeps it scoped to the current task without threading it through calls.
log_msg_context: ContextVar[str] = ContextVar("log_msg", default="")


@planning_cli.command("planning:delete_spiked")
async def delete_spiked_items_command():
    """
    Delete expired spiked `Events` and `Planning` items.

    Example:
    ::

        $ python manage.py planning:delete_spiked

    """
    # Thin CLI wrapper: all the work lives in the shared handler so the
    # celery task and this command run the exact same code path.
    return await delete_spiked_items_handler()


async def delete_spiked_items_handler():
    """
    Delete expired spiked `Events` and `Planning` items.

    Reads ``PLANNING_DELETE_SPIKED_MINUTES`` from app config (0 disables the
    job), takes a distributed lock so only one worker runs at a time, and
    runs the events and planning phases independently so a failure in one
    does not prevent the other.
    """
    now = utcnow()
    log_msg = f"Delete Spiked Items Time: {now}."
    # Publish the prefix for the helper coroutines (see log_msg_context).
    log_msg_context.set(log_msg)

    # BUG FIX: previously `log_msg` was reset to "" right here, which
    # blanked the prefix on the very first log line below.
    logger.info(f"{log_msg} Starting to delete spiked items at.")

    expire_interval = get_app_config("PLANNING_DELETE_SPIKED_MINUTES", 0)
    if expire_interval == 0:
        logger.info(f"{log_msg} PLANNING_DELETE_SPIKED_MINUTES=0, not spiking any items")
        return

    lock_name = get_lock_id("planning", "delete_spiked")
    if not lock(lock_name, expire=610):
        logger.info(f"{log_msg} Delete spiked items task is already running")
        return

    expiry_datetime = now - timedelta(minutes=expire_interval)

    # Each phase is best-effort: log and continue on failure.
    try:
        await delete_spiked_events(expiry_datetime)
    except Exception as e:
        logger.exception(e)

    try:
        await delete_spiked_planning(expiry_datetime)
    except Exception as e:
        logger.exception(e)

    unlock(lock_name)

    logger.info(f"{log_msg} Completed deleting spiked items.")
    remove_locks()

async def delete_spiked_events(expiry_datetime):
    """
    Delete spiked events that expired before ``expiry_datetime``.

    Standalone spiked events are deleted individually; a recurring series is
    only deleted when every remaining event in it is spiked and expired.
    """
    log_msg = log_msg_context.get()
    logger.info(f"{log_msg} Starting to delete spiked events")
    events_service = EventsAsyncService()

    events_deleted = set()
    series_to_delete = dict()

    # Obtain the full list of Events that we're to process first,
    # as subsequent queries will change the list of returned items.
    events = dict()
    async for items in events_service.get_expired_items(expiry_datetime, spiked_events_only=True):
        events.update({item[ID_FIELD]: item for item in items})

    for event_id, event in events.items():
        if event.get("recurrence_id") and event["recurrence_id"] not in series_to_delete:
            # Use a distinct name so we don't clobber the `events` dict
            # that is still being iterated above.
            spiked, series_events = await is_series_expired_and_spiked(event, expiry_datetime)
            if spiked:
                series_to_delete[event["recurrence_id"]] = series_events
        else:
            await events_service.delete_many(lookup={"_id": event_id})
            events_deleted.add(event_id)

    # Delete recurring series in bulk.
    for recurrence_id, series_events in series_to_delete.items():
        await events_service.delete_many(lookup={"recurrence_id": recurrence_id})
        # BUG FIX: previously `events_deleted.add([...])` was called with a
        # list, which raises TypeError (lists are unhashable). Record the
        # individual ids instead, tolerating one level of nesting in case
        # the series helper returns grouped lists.
        for item in series_events:
            if isinstance(item, dict):
                events_deleted.add(item[ID_FIELD])
            else:
                events_deleted.update(entry[ID_FIELD] for entry in item)

    logger.info(f"{log_msg} {len(events_deleted)} Events deleted: {list(events_deleted)}")

async def is_series_expired_and_spiked(event, expiry_datetime):
    """
    Check whether the whole recurring series of ``event`` is spiked and expired.

    Returns a ``(expired_and_spiked, events)`` tuple where ``events`` is the
    flat list of events in the series, or an empty list when the series is
    not fully expired and spiked.
    """
    historic, past, future = await get_recurring_timeline(event, spiked=True, postponed=True)

    # There are future events, so the entire series is not expired.
    if len(future) > 0:
        return False, []

    def check_series_expired_and_spiked(series):
        # Every event must be spiked and already past the expiry cutoff.
        for item in series:
            if item.get("state") != WORKFLOW_STATE.SPIKED or item["dates"]["end"] > expiry_datetime:
                return False

        return True

    if check_series_expired_and_spiked(historic) and check_series_expired_and_spiked(past):
        # BUG FIX: previously returned `[historic + past]` — the combined
        # list wrapped inside another list — while callers iterate the
        # events directly. Return the flat list.
        return True, historic + past

    return False, []

async def delete_spiked_planning(expiry_datetime):
    """
    Delete spiked planning items that expired before ``expiry_datetime``,
    along with the assignments attached to their coverages.
    """
    log_msg = log_msg_context.get()
    logger.info(f"{log_msg} Starting to delete spiked planning items")
    planning_service = PlanningAsyncService()

    # Snapshot the full list of Planning items first, as the deletion
    # queries below will change the list of returned items.
    plans = dict()
    async for items in planning_service.get_expired_items(expiry_datetime, spiked_planning_only=True):
        plans.update({item[ID_FIELD]: item for item in items})

    plans_deleted = set()
    assignments_deleted = set()
    assignments_to_delete = []

    for plan_id, plan in plans.items():
        # Collect assignment ids from the plan's coverages before deleting it.
        for coverage in plan.get("coverages") or []:
            assignment_id = (coverage.get("assigned_to") or {}).get("assignment_id")
            if assignment_id:
                assignments_to_delete.append(assignment_id)

        # Now, delete the planning item
        await planning_service.delete_many(lookup={"_id": plan_id})
        plans_deleted.add(plan_id)

    # Delete assignments
    assignment_service = AssignmentsAsyncService()
    for assign_id in assignments_to_delete:
        await assignment_service.delete_many(lookup={"_id": assign_id})
        assignments_deleted.add(assign_id)

    logger.info(f"{log_msg} {len(assignments_deleted)} Assignments deleted: {list(assignments_deleted)}")
    logger.info(f"{log_msg} {len(plans_deleted)} Planning items deleted: {list(plans_deleted)}")
Loading

0 comments on commit 421a61b

Please sign in to comment.