From 44cf909e8edd146d55e6578a132549c76b426961 Mon Sep 17 00:00:00 2001 From: Helmy Giacoman Date: Tue, 10 Dec 2024 16:33:07 +0100 Subject: [PATCH] [SDESK-7442] Migrate Events resource service to async (#2150) * Move first method and utility functions SDESK-7442 * Migrated `create` & `on_create` methods Along with another set of utils functions SDESK-7442 * Move `on_create`, `on_created`, `on_update` & `on_updated` SDESK-7442 * Move `on_delete` method SDESK-7442 * Update `ExportToNewsroomTest` to use async service SDESK-7442 * Fix tests Pending to fix test broken because of prod_api app context issue SDESK-7442 * Minor fixes SDESK-7442 * Improvements after feedback SDESK-7442 * Update events.py * Move `on_updated` method SDESK-7442 --- server/planning/assignments/__init__.py | 6 +- server/planning/assignments/assignments.py | 5 +- server/planning/autosave.py | 1 + .../planning/commands/delete_spiked_items.py | 2 +- .../commands/delete_spiked_items_test.py | 2 + .../commands/export_to_newsroom_test.py | 41 +- .../commands/purge_expired_locks_test.py | 2 + server/planning/common.py | 3 + server/planning/events/__init__.py | 10 +- server/planning/events/events.py | 46 +- server/planning/events/events_service.py | 819 ++++++++++++++++++ .../planning/events/events_sync/__init__.py | 19 +- .../events/events_sync/embedded_planning.py | 6 +- server/planning/events/events_tests.py | 3 +- server/planning/events/events_utils.py | 221 +++++ server/planning/events/module.py | 2 +- server/planning/events/service.py | 61 -- server/planning/events/utils.py | 92 -- server/planning/planning/__init__.py | 5 +- server/planning/planning/planning_history.py | 7 +- server/planning/planning/service.py | 26 +- server/planning/signals.py | 22 + server/planning/tests/__init__.py | 9 + server/planning/types/__init__.py | 9 +- server/planning/types/base.py | 4 +- server/planning/types/common.py | 7 +- server/planning/types/enums.py | 6 +- server/planning/types/event.py | 10 +- server/planning/types/event_dates.py | 15 +- 29 files changed, 1219 insertions(+), 242 deletions(-) create mode 100644 server/planning/events/events_service.py create mode 100644 server/planning/events/events_utils.py delete mode 100644 server/planning/events/service.py delete mode 100644 server/planning/events/utils.py diff --git a/server/planning/assignments/__init__.py b/server/planning/assignments/__init__.py index 5e8bae079..cc904eeae 100644 --- a/server/planning/assignments/__init__.py +++ b/server/planning/assignments/__init__.py @@ -11,6 +11,8 @@ import superdesk from quart_babel import lazy_gettext from superdesk.services import BaseService + +from planning import signals from .assignments import AssignmentsResource, AssignmentsService from .assignments_content import AssignmentsContentResource, AssignmentsContentService from .assignments_link import AssignmentsLinkResource, AssignmentsLinkService @@ -93,13 +95,15 @@ def init_app(app): delivery_service = BaseService("delivery", backend=superdesk.get_backend()) DeliveryResource("delivery", app=app, service=delivery_service) + # listen to async signals + signals.events_update.connect(assignments_publish_service.on_events_updated) + # Updating data/lock on assignments based on content item updates from authoring app.on_updated_archive += assignments_publish_service.update_assignment_on_archive_update app.on_archive_item_updated += assignments_publish_service.update_assignment_on_archive_operation app.on_item_lock += assignments_publish_service.validate_assignment_lock app.on_item_locked += 
assignments_publish_service.sync_assignment_lock app.on_item_unlocked += assignments_publish_service.sync_assignment_unlock - app.on_updated_events += assignments_publish_service.on_events_updated # Track updates for an assignment if it's news story was updated if app.config.get("PLANNING_LINK_UPDATES_TO_COVERAGES", True): diff --git a/server/planning/assignments/assignments.py b/server/planning/assignments/assignments.py index 8f4029055..f8d5d6853 100644 --- a/server/planning/assignments/assignments.py +++ b/server/planning/assignments/assignments.py @@ -71,6 +71,7 @@ planning_auto_assign_to_workflow, ) +from planning.types import EventResourceModel from planning.planning_notifications import PlanningNotifications from planning.common import format_address, get_assginment_name from .assignments_history import ASSIGNMENT_HISTORY_ACTIONS @@ -1020,10 +1021,10 @@ def update_assignment_on_archive_operation(self, updates, original, operation=No lock_service = get_component(LockService) lock_service.unlock(assignment, user_id, get_auth()["_id"], "assignments") - def on_events_updated(self, updates, original): + def on_events_updated(self, updates: dict[str, Any], original: EventResourceModel): """Send assignment notifications if any relevant Event metadata has changed""" - event = deepcopy(original) + event = deepcopy(original.to_dict()) event.update(updates) plannings = get_related_planning_for_events([event[ID_FIELD]], "primary") diff --git a/server/planning/autosave.py b/server/planning/autosave.py index 89759ca3a..df51ea9f1 100644 --- a/server/planning/autosave.py +++ b/server/planning/autosave.py @@ -27,6 +27,7 @@ def on_create(self, docs): def on_delete(self, doc): if doc.get(ITEM_TYPE) == "event": + # TODO-ASYNC: replace with equivalent in `EventsAsyncService` get_resource_service("events").delete_event_files(None, doc) @staticmethod diff --git a/server/planning/commands/delete_spiked_items.py b/server/planning/commands/delete_spiked_items.py index 2a0753c5d..6a81e4556 100644 --- a/server/planning/commands/delete_spiked_items.py +++ b/server/planning/commands/delete_spiked_items.py @@ -19,7 +19,7 @@ from superdesk.lock import lock, unlock, remove_locks from planning.common import WORKFLOW_STATE from planning.events import EventsAsyncService -from planning.events.utils import get_recurring_timeline +from planning.events.events_utils import get_recurring_timeline from planning.planning import PlanningAsyncService from planning.assignments import AssignmentsAsyncService from .async_cli import planning_cli diff --git a/server/planning/commands/delete_spiked_items_test.py b/server/planning/commands/delete_spiked_items_test.py index 95765bee3..b0a462699 100644 --- a/server/planning/commands/delete_spiked_items_test.py +++ b/server/planning/commands/delete_spiked_items_test.py @@ -83,6 +83,8 @@ async def asyncSetUp(self): self.planning_service = PlanningAsyncService() self.assignment_service = AssignmentsAsyncService() + self.setup_test_user() + async def assertDeleteOperation(self, item_type, ids, not_deleted=False): service = self.event_service if item_type == "events" else self.planning_service diff --git a/server/planning/commands/export_to_newsroom_test.py b/server/planning/commands/export_to_newsroom_test.py index 36ca93889..09fbd39a2 100644 --- a/server/planning/commands/export_to_newsroom_test.py +++ b/server/planning/commands/export_to_newsroom_test.py @@ -7,12 +7,17 @@ # For the full copyright and license information, please see the # AUTHORS and LICENSE files distributed with this 
source code, or # at https://www.sourcefabric.org/superdesk/license + import mock from datetime import timedelta -from .export_to_newsroom import ExportToNewsroom -from superdesk import get_resource_service + from superdesk.utc import utcnow +from superdesk import get_resource_service + from planning.tests import TestCase +from planning.events.events_service import EventsAsyncService + +from .export_to_newsroom import ExportToNewsroom class MockTransmitter: @@ -27,17 +32,19 @@ def transmit(self, queue_item): class ExportToNewsroomTest(TestCase): - def setUp(self): - super().setUp() + async def asyncSetUp(self): + await super().asyncSetUp() - self.event_service = get_resource_service("events") + self.event_service = EventsAsyncService() self.planning_service = get_resource_service("planning") - def setUp_data(self): + async def setup_data(self): utc_now = utcnow() + self.setup_test_user() + events = [ { - "_id": "draft", + "guid": "draft", "dates": { "start": utc_now, "end": utc_now + timedelta(days=1), @@ -48,7 +55,7 @@ def setUp_data(self): "type": "event", }, { - "_id": "scheduled", + "guid": "scheduled", "dates": { "start": utc_now, "end": utc_now + timedelta(days=1), @@ -60,7 +67,7 @@ def setUp_data(self): "type": "event", }, { - "_id": "postponed", + "guid": "postponed", "dates": { "start": utc_now, "end": utc_now + timedelta(days=1), @@ -72,7 +79,7 @@ def setUp_data(self): "type": "event", }, { - "_id": "rescheduled", + "guid": "rescheduled", "dates": { "start": utc_now, "end": utc_now + timedelta(days=1), @@ -84,7 +91,7 @@ def setUp_data(self): "type": "event", }, { - "_id": "cancelled", + "guid": "cancelled", "dates": { "start": utc_now, "end": utc_now + timedelta(days=1), @@ -96,7 +103,7 @@ def setUp_data(self): "type": "event", }, { - "_id": "killed", + "guid": "killed", "dates": { "start": utc_now, "end": utc_now + timedelta(days=1), @@ -108,7 +115,7 @@ def setUp_data(self): "type": "event", }, { - "_id": "postponed-not-published", + "guid": "postponed-not-published", "dates": { "start": utc_now, "end": utc_now + timedelta(days=1), @@ -119,7 +126,7 @@ def setUp_data(self): "type": "event", }, { - "_id": "rescheduled-not-published", + "guid": "rescheduled-not-published", "dates": { "start": utc_now, "end": utc_now + timedelta(days=1), @@ -130,7 +137,7 @@ def setUp_data(self): "type": "event", }, { - "_id": "cancelled-not-published", + "guid": "cancelled-not-published", "dates": { "start": utc_now, "end": utc_now + timedelta(days=1), @@ -213,13 +220,13 @@ def setUp_data(self): }, ] - self.event_service.create(events) + await self.event_service.create(events) self.planning_service.create(planning) @mock.patch("planning.commands.export_to_newsroom.NewsroomHTTPTransmitter") async def test_events_events_planning(self, mock_transmitter): async with self.app.app_context(): - self.setUp_data() + await self.setup_data() mock_transmitter.return_value = MockTransmitter() ExportToNewsroom().run(assets_url="foo", resource_url="bar") diff --git a/server/planning/commands/purge_expired_locks_test.py b/server/planning/commands/purge_expired_locks_test.py index 4bc954841..1d30ac971 100644 --- a/server/planning/commands/purge_expired_locks_test.py +++ b/server/planning/commands/purge_expired_locks_test.py @@ -28,6 +28,8 @@ async def asyncSetUp(self) -> None: await super().asyncSetUp() async with self.app.app_context(): + self.setup_test_user() + await self.insert( "events", [ diff --git a/server/planning/common.py b/server/planning/common.py index c7134bdf0..e9542d3e3 100644 --- 
a/server/planning/common.py +++ b/server/planning/common.py @@ -349,6 +349,9 @@ def post_required(updates, original): def update_post_item(updates, original): """Method to update(re-post) a posted item after the item is updated""" + # TODO-ASYNC: update once `events_post` & `planning_post` are async + # also to use pydantic models instead of dicts + pub_status = None # Save&Post or Save&Unpost if updates.get("pubstatus"): diff --git a/server/planning/events/__init__.py b/server/planning/events/__init__.py index fd3d23be1..2ec6ad8a8 100644 --- a/server/planning/events/__init__.py +++ b/server/planning/events/__init__.py @@ -10,6 +10,8 @@ import superdesk from quart_babel import lazy_gettext + +from planning import signals from .events import EventsResource, EventsService from .events_spike import ( EventsSpikeResource, @@ -44,7 +46,7 @@ ) from planning.autosave import AutosaveService -from .service import EventsAsyncService +from .events_service import EventsAsyncService from .module import events_resource_config __all__ = [ @@ -137,8 +139,14 @@ def init_app(app): service=recent_events_template_service, ) + # listen to async signals + signals.events_created.connect(events_history_service.on_item_created) + app.on_updated_events += events_history_service.on_item_updated + + # TODO-ASYNC: remove `on_inserted_events` when `events_reschedule` is async app.on_inserted_events += events_history_service.on_item_created + app.on_deleted_item_events -= events_history_service.on_item_deleted app.on_deleted_item_events += events_history_service.on_item_deleted app.on_updated_events_spike += events_history_service.on_spike diff --git a/server/planning/events/events.py b/server/planning/events/events.py index bb041d407..ccb9ea908 100644 --- a/server/planning/events/events.py +++ b/server/planning/events/events.py @@ -10,14 +10,15 @@ """Superdesk Events""" -from typing import Dict, Any, Optional, List, Tuple + +import re +import pytz import logging import itertools + from copy import deepcopy from datetime import timedelta - -import pytz -import re +from typing import Dict, Any, Optional, List, Tuple from eve.methods.common import resolve_document_etag from eve.utils import date_to_str from dateutil.rrule import ( rrule, DAILY, WEEKLY, MONTHLY, YEARLY, MO, TU, WE, TH, FR, SA, SU, ) +import superdesk from superdesk.core import get_app_config, get_current_app from superdesk.resource_fields import ID_FIELD -import superdesk from superdesk import get_resource_service from superdesk.errors import SuperdeskApiError from superdesk.metadata.utils import generate_guid @@ -48,13 +49,8 @@ from apps.auth import get_user, get_user_id from apps.archive.common import get_auth, update_dates_for -from planning.types import ( - Event, - EmbeddedPlanning, - EmbeddedCoverageItem, - PlanningRelatedEventLink, - PLANNING_RELATED_EVENT_LINK_TYPE, -) +from planning.types import Event, PlanningRelatedEventLink, PLANNING_RELATED_EVENT_LINK_TYPE +from planning.types.event import EmbeddedPlanning from planning.common import ( UPDATE_SINGLE, UPDATE_FUTURE, @@ -74,7 +70,6 @@ set_ingest_version_datetime, is_new_version, update_ingest_on_patch, - TEMP_ID_PREFIX, ) from planning.utils import ( get_planning_event_link_method, @@ -84,6 +79,7 @@ ) from .events_base_service import EventsBaseService from .events_schema import events_schema from .events_sync import sync_event_metadata_with_planning_items +from .events_utils import get_events_embedded_planning logger = logging.getLogger(__name__) @@ -99,22 +95,6 @@ } -def get_events_embedded_planning(event: Event) -> List[EmbeddedPlanning]: 
def get_coverage_id(coverage: EmbeddedCoverageItem) -> str: - if not coverage.get("coverage_id"): - coverage["coverage_id"] = TEMP_ID_PREFIX + "-" + generate_guid(type=GUID_NEWSML) - return coverage["coverage_id"] - - return [ - EmbeddedPlanning( - planning_id=planning.get("planning_id"), - update_method=planning.get("update_method") or "single", - coverages={get_coverage_id(coverage): coverage for coverage in planning.get("coverages") or []}, - ) - for planning in event.pop("embedded_planning", []) - ] - - def get_subject_str(subject: Dict[str, str]) -> str: return ":".join( [ @@ -298,9 +278,9 @@ def create(self, docs: List[Event], **kwargs): embedded_planning_lists: List[Tuple[Event, List[EmbeddedPlanning]]] = [] for event in docs: - embedded_planning = get_events_embedded_planning(event) - if len(embedded_planning): - embedded_planning_lists.append((event, embedded_planning)) + emb_planning = get_events_embedded_planning(event) + if len(emb_planning): + embedded_planning_lists.append((event, emb_planning)) # type: ignore ids = self.backend.create(self.datasource, docs, **kwargs) @@ -877,6 +857,7 @@ class EventsResource(superdesk.Resource): merge_nested_documents = True +# TODO-ASYNC: moved to `events_utils.py`. Remove when it is no longer referenced def generate_recurring_dates( start, frequency, @@ -1000,6 +981,7 @@ def generate_recurring_events(event, recurrence_id=None): for key in list(new_event.keys()): if key.startswith("_") or key.startswith("lock_"): new_event.pop(key) + elif key == "embedded_planning": if not embedded_planning_added: # If this is the first Event in the series, then keep diff --git a/server/planning/events/events_service.py b/server/planning/events/events_service.py new file mode 100644 index 000000000..745168200 --- /dev/null +++ b/server/planning/events/events_service.py @@ -0,0 +1,819 @@ +import pytz +import itertools + +from copy import deepcopy +from bson import ObjectId +from typing import Any, AsyncGenerator, cast +from datetime import datetime, timedelta +from apps.archive.common import get_auth +from apps.auth import get_user, get_user_id + +from superdesk.utc import utcnow +from superdesk.core import get_app_config +from superdesk import get_resource_service +from superdesk.resource_fields import ID_FIELD +from superdesk.errors import SuperdeskApiError +from superdesk.metadata.item import GUID_NEWSML +from superdesk.notification import push_notification +from superdesk.core.utils import date_to_str, generate_guid + + +from planning import signals +from planning.types import ( + PLANNING_RELATED_EVENT_LINK_TYPE, + EventResourceModel, + PlanningRelatedEventLink, + PlanningSchedule, + PostStates, + UpdateMethods, + WorkflowState, +) +from planning.types.event import EmbeddedPlanning +from planning.common import ( + WorkflowStates, + format_address, + get_event_max_multi_day_duration, + get_max_recurrent_events, + remove_lock_information, + set_ingested_event_state, + post_required, + update_post_item, +) +from planning.planning import PlanningAsyncService +from planning.core.service import BasePlanningAsyncService +from planning.utils import ( + get_planning_event_link_method, + get_related_event_ids_for_planning, + get_related_planning_for_events, +) + +from .events_sync import sync_event_metadata_with_planning_items +from .events_utils import ( + generate_recurring_dates, + get_events_embedded_planning, + get_recurring_timeline, +) + + +class EventsAsyncService(BasePlanningAsyncService[EventResourceModel]): + resource_name = "events" + + async def 
get_expired_items( + self, expiry_datetime: datetime, spiked_events_only: bool = False + ) -> AsyncGenerator[list[dict[str, Any]], None]: + """Get the expired items + + Where end date is in the past + """ + query: dict[str, Any] = { + "query": { + "bool": { + "must_not": [{"term": {"expired": True}}], + "filter": {"range": {"dates.end": {"lte": date_to_str(expiry_datetime)}}}, + }, + }, + "sort": [{"dates.start": "asc"}], + "size": get_max_recurrent_events(), + } + + if spiked_events_only: + del query["query"]["bool"]["must_not"] + query["query"]["bool"]["must"] = [{"term": {"state": WorkflowState.SPIKED}}] + + total_received = 0 + total_events = -1 + + while True: + query["from"] = total_received + + results = await self.search(query) + items = await results.to_list_raw() + results_count = len(items) + + # If the total_events has not been set, then this is the first query + # In which case we need to store the total hits from the search + if total_events < 0: + total_events = results_count + + # If the search doesn't contain any results, return here + if total_events < 1: + break + + # If the last query doesn't contain any results, return here + if results_count == 0: + break + + total_received += results_count + + # Yield the results for iteration by the caller + yield items + + async def create(self, docs: list[EventResourceModel]): + """ + Extracts out the ``embedded_planning`` before saving the Event(s) + And then uses them to synchronise/process the associated Planning item(s) + """ + + docs = await self._convert_dicts_to_model(docs) + ids = await super().create(docs) + + embedded_planning_lists: list[tuple[EventResourceModel, list[EmbeddedPlanning]]] = [] + + for event in docs: + embedded_planning = get_events_embedded_planning(event) + if len(embedded_planning): + embedded_planning_lists.append((event.to_dict(), embedded_planning)) + + if len(embedded_planning_lists): + for event, embedded_planning in embedded_planning_lists: + sync_event_metadata_with_planning_items(None, event, embedded_planning) + + return ids + + async def on_create(self, docs: list[EventResourceModel]) -> None: + # events generated by recurring rules + generated_events = [] + for event in docs: + # generate a unique id + if not event.guid: + event.guid = generate_guid(type=GUID_NEWSML) + event.id = event.guid + + if not event.language: + try: + event.language = event.languages[0] + except IndexError: + event.language = get_app_config("DEFAULT_LANGUAGE") + + # TODO-ASYNC: consider moving this into base service later + event.original_creator = ObjectId(get_user_id()) or None + + # overwrite expiry date if needed + self._overwrite_event_expiry_date(event) + + # we ignore the 'update_method' on create + if event.update_method: + event.update_method = None + + # remove the 'expired' flag if it is set, as no new Event can be created as expired + if event.expired: + event.expired = False + + event.planning_schedule = self._create_planning_schedule(event) + original_planning_item = event.planning_item + + # validate event + self.validate_event(event) + + # If _created_externally is true, generate_recurring_events is restricted. 
+ if event.dates and event.dates.recurring_rule and not event.dates.recurring_rule._created_externally: + recurring_events = self._generate_recurring_events(event) + generated_events.extend(recurring_events) + + # Set the current Event to the first Event in the new series + # This will make sure the ID of the Event can be used when + # using 'event' from here on, such as when linking to a Planning item + event = recurring_events[0] + + # And set the Planning Item from the original + # (generate_recurring_events removes this field) + event.planning_item = original_planning_item + + if event.state == WorkflowStates.INGESTED: + events_history = get_resource_service("events_history") + events_history.on_item_created([event.to_dict()]) + + if original_planning_item: + await self._link_to_planning(event) + del event["_planning_item"] + + if generated_events: + docs.extend(generated_events) + + await super().on_create(docs) + + async def on_created(self, docs: list[EventResourceModel]): + """Send WebSocket Notifications for created Events + + Generate the list of IDs for recurring and non-recurring events, + then send this list off to the clients so they can fetch these events + """ + notifications_sent = [] + history_service = get_resource_service("events_history") + + for doc in docs: + event_id = doc.id + + # If we duplicated this event, update the history + if doc.duplicate_from: + parent_id = doc.duplicate_from + parent_event = await self.find_by_id(parent_id) + if not parent_event: + raise SuperdeskApiError.badRequestError("Parent event not found") + + history_service.on_item_updated({"duplicate_id": event_id}, parent_event.to_dict(), "duplicate") + history_service.on_item_updated({"duplicate_id": parent_id}, doc.to_dict(), "duplicate_from") + + duplicate_ids = parent_event.duplicate_to or [] + duplicate_ids.append(event_id) + + await super().update(parent_id, {"duplicate_to": duplicate_ids}) + + event_type = "events:created" + user_id = doc.original_creator or "" + + if doc.recurrence_id: + event_type = "events:created:recurring" + event_id = str(doc.recurrence_id) + + # Don't send notification if one has already been sent + # This is to ensure recurring events don't send multiple notifications + if event_id in notifications_sent or doc.previous_recurrence_id: + continue + + notifications_sent.append(event_id) + push_notification(event_type, item=event_id, user=user_id) + + async def on_update(self, updates: dict[str, Any], original: EventResourceModel): + """Update single or series of recurring events. + + Determine if the supplied event is a single event or a + series of recurring events, and call the appropriate method + for the event type. 
+ """ + if "skip_on_update" in updates: + # this is a recursive update (see below) + del updates["skip_on_update"] + return + + update_method = updates.pop("update_method", UpdateMethods.SINGLE) + + user = get_user() + user_id = user.get(ID_FIELD) if user else None + + if user_id: + updates["version_creator"] = user_id + set_ingested_event_state(updates, original.to_dict()) + + lock_user = original.lock_user or None + str_user_id = str(user.get(ID_FIELD)) if user_id else None + + if lock_user and str(lock_user) != str_user_id: + raise SuperdeskApiError.forbiddenError("The item was locked by another user") + + # If only the `recurring_rule` was provided, then fill in the rest from the original + # This can happen, for example, when converting a single Event to a series of Recurring Events + if list(updates.get("dates") or {}) == ["recurring_rule"]: + new_dates = original.to_dict()["dates"] + new_dates.update(updates["dates"]) + updates["dates"] = new_dates + + # validate event + self.validate_event(updates, original) + + # Run the specific methods based on if the original is a single or a series of recurring events + if not getattr((original.dates or {}), "recurring_rule") or update_method == UpdateMethods.SINGLE: + await self._update_single_event(updates, original) + else: + await self._update_recurring_events(updates, original, update_method) + + return await super().on_update(updates, original) + + async def update(self, event_id: str | ObjectId, updates: dict[str, Any], etag: str | None = None): + """Updates the event and also extracts out the ``embedded_planning`` before saving the Event + And then uses them to synchronise/process the associated Planning item(s) + """ + + updates.setdefault("versioncreated", utcnow()) + original_event = await self.find_by_id(event_id) + + if original_event is None: + raise SuperdeskApiError.badRequestError("Event not found") + + # Extract the ``embedded_planning`` from the updates + embedded_planning = get_events_embedded_planning(updates) + + await super().update(event_id, updates, etag) + + # Process ``embedded_planning`` field, and sync Event metadata with associated Planning/Coverages + sync_event_metadata_with_planning_items(original_event.to_dict(), updates, embedded_planning) + + async def on_updated(self, updates: dict[str, Any], original: EventResourceModel, from_ingest: bool = False): + # if this Event was converted to a recurring series + # then update all associated Planning items with the recurrence_id + if updates.get("recurrence_id") and not original.recurrence_id: + await PlanningAsyncService().on_event_converted_to_recurring(updates, original) + + if not updates.get("duplicate_to"): + posted = update_post_item(updates, original.to_dict()) + if posted: + new_event = await self.find_by_id(original.id) + assert new_event is not None + updates["_etag"] = new_event.etag + updates["state_reason"] = new_event.state_reason + + if original.lock_user and "lock_user" in updates and updates.get("lock_user") is None: + # when the event is unlocked by the patch. 
+ push_notification( + "events:unlock", + item=str(original.id), + user=str(get_user_id()), + lock_session=str(get_auth().get("_id")), + etag=updates["_etag"], + recurrence_id=original.recurrence_id or None, + from_ingest=from_ingest, + ) + + await self.delete_event_files(updates, original.files) + + if "location" not in updates and original.location: + updates["location"] = original.location + + updates[ID_FIELD] = original.id + self._enhance_event_item(updates) + + async def delete_event_files(self, updates: dict[str, Any], event_files: list[ObjectId]): + files = [f for f in event_files if f not in (updates or {}).get("files", [])] + files_service = get_resource_service("events_files") + + for file in files: + events_using_file = await self.find({"files": file}) + if (await events_using_file.count()) == 0: + files_service.delete_action(lookup={"_id": file}) + + async def on_deleted(self, doc: EventResourceModel): + push_notification( + "events:delete", + item=str(doc.id), + user=str(get_user_id()), + lock_session=str(get_auth().get("_id")), + ) + + def validate_event( + self, updated_event: dict[str, Any] | EventResourceModel, original_event: EventResourceModel | None = None + ): + """Validate the event""" + + if isinstance(updated_event, dict): + updated_event = EventResourceModel.from_dict(updated_event) + # mypy complains even when `from_dict` returns a model instance + updated_event = cast(EventResourceModel, updated_event) + + self._validate_multiday_event_duration(updated_event) + self._validate_dates(updated_event, original_event) + self._validate_convert_to_recurring(updated_event, original_event) + self._validate_template(updated_event, original_event) + + # TODO-ASYNC: migrate `sanitize_input_data` to support new models based on pydantic + # this function below allows both Event and Planning items + # sanitize_input_data(updates) + + def _validate_multiday_event_duration(self, event: EventResourceModel): + """Validate that the multiday event duration is not greater than PLANNING_MAX_MULTI_DAY_DURATION + + @:param dict event: event created or updated + """ + max_duration = get_event_max_multi_day_duration() + if not max_duration > 0: + return + + if not event.dates: + return + + assert event.dates.start is not None + assert event.dates.end is not None + + event_duration = event.dates.end - event.dates.start + if event_duration.days > max_duration: + raise SuperdeskApiError(message="Event duration is greater than {} days.".format(max_duration)) + + def _validate_dates(self, updated_event: EventResourceModel, original_event: EventResourceModel | None = None): + """Validate the dates + + @:param dict event: + """ + # TODO-ASYNC: consider/check if these validations should be in the pydantic model + event = updated_event if updated_event.dates or not original_event else original_event + + assert event.dates is not None + + start_date = event.dates.start + end_date = event.dates.end + + if not start_date or not end_date: + raise SuperdeskApiError(message="Event START DATE and END DATE are mandatory.") + + if end_date < start_date: + raise SuperdeskApiError(message="END TIME should be after START TIME") + + if event.dates.recurring_rule and not event.dates.recurring_rule.until and not event.dates.recurring_rule.count: + raise SuperdeskApiError(message="Recurring event should have an end (until or count)") + + def _validate_convert_to_recurring( + self, updated_event: dict[str, Any] | EventResourceModel, original: EventResourceModel | None = None + ): + """Validates if the convert to 
recurring action is valid. + + :param updates: + :param original: + :return: + """ + if original is None: + return + + if isinstance(updated_event, dict): + updated_event = EventResourceModel.from_dict(updated_event) + updated_event = cast(EventResourceModel, updated_event) + + if ( + original.lock_action == "convert_recurring" + and updated_event.dates + and updated_event.dates.recurring_rule is None + ): + raise SuperdeskApiError(message="Event recurring rules are mandatory for convert to recurring action.") + + if original.lock_action == "convert_recurring" and original.recurrence_id: + raise SuperdeskApiError(message="Event is already converted to recurring event.") + + @staticmethod + def _validate_template(updated_event: EventResourceModel, original_event: EventResourceModel | None = None): + """Ensures that event template can't be changed + + :param updates: updates to event that should be saved + :type updates: dict + :param original: original event before update + :type original: dict + :return: + """ + if original_event is None: + return + + # we can't change `template` id + if updated_event.template and updated_event.template != original_event.template: + raise SuperdeskApiError.badRequestError( + message="Request is not valid", + payload={"template": "This value can't be changed."}, + ) + + async def _update_single_event(self, updates: dict[str, Any], original: EventResourceModel): + """Updates the metadata of a single event. + + If recurring_rule is provided, we convert this single event into + a series of recurring events, otherwise we simply update this event. + """ + + if post_required(updates, original.to_dict()): + merged: EventResourceModel = original.model_copy(update=updates, deep=True) + + # TODO-ASYNC: replace when `event_post` is async + get_resource_service("events_post").validate_item(merged.to_dict()) + + # Determine if we're to convert this single event to a series of recurring events + if ( + original.lock_action == "convert_recurring" + and updates.get("dates", {}).get("recurring_rule", None) is not None + ): + generated_events = await self._convert_to_recurring_events(updates, original) + + # if the original event was "posted" then post all the generated events + if original.pubstatus in [PostStates.CANCELLED, PostStates.USABLE]: + post = { + "event": generated_events[0].id, + "etag": generated_events[0].etag, + "update_method": "all", + "pubstatus": original.pubstatus, + } + + # TODO-ASYNC: replace when `event_post` is async + get_resource_service("events_post").post([post]) + + push_notification( + "events:updated:recurring", + item=str(original.id), + user=str(updates.get("version_creator", "")), + recurrence_id=str(generated_events[0].recurrence_id), + ) + else: + if original.lock_action == "mark_completed" and updates.get("actioned_date"): + self.mark_event_complete(updates, original, False) + + # This updates Event metadata only + push_notification( + "events:updated", + item=str(original.id), + user=str(updates.get("version_creator", "")), + ) + + async def _update_recurring_events( + self, updates: dict[str, Any], original: EventResourceModel, update_method: UpdateMethods + ): + """Method to update recurring events. 
+ + If the recurring_rule has been removed for this event, process + it separately, otherwise update the event and/or its recurring rules + """ + # This method now only handles updating of Event metadata + # So make sure to remove any date information that might be in + # the updates + updates.pop("dates", None) + original_as_dict = original.to_dict() + + if update_method == UpdateMethods.FUTURE: + historic, past, future = await get_recurring_timeline(original_as_dict) + events = future + else: + historic, past, future = await get_recurring_timeline(original_as_dict) + events = historic + past + future + + events_post_service = get_resource_service("events_post") + + # First we want to validate that all events can be posted + for e in events: + if post_required(updates, e): + merged = deepcopy(e) + merged.update(updates) + events_post_service.validate_item(merged) + + # If this update is from assignToCalendar action + # Then we only want to update the calendars of each Event + only_calendars = original.lock_action == "assign_calendar" + original_calendar_qcodes = [calendar.qcode for calendar in original.calendars] + + # Get the list of calendars added + updated_calendars = [ + calendar for calendar in updates.get("calendars") or [] if calendar["qcode"] not in original_calendar_qcodes + ] + + mark_completed = original.lock_action == "mark_completed" and updates.get("actioned_date") + mark_complete_validated = False + for e in events: + event_id = e[ID_FIELD] + + new_updates = deepcopy(updates) + new_updates["skip_on_update"] = True + new_updates[ID_FIELD] = event_id + + if only_calendars: + # Get the original for this item, and add new calendars to it + # Skipping calendars already assigned to this item + original_event: EventResourceModel = await self.find_by_id(event_id) + assert original_event is not None + original_qcodes = [calendar.qcode for calendar in original_event.calendars] + + new_updates["calendars"] = deepcopy(original_event.calendars) + new_updates["calendars"].extend( + [calendar for calendar in updated_calendars if calendar["qcode"] not in original_qcodes] + ) + elif mark_completed: + ev = EventResourceModel.from_dict(e) + self.mark_event_complete(updates, ev, mark_complete_validated) + # It is validated if the previous function did not raise an error + mark_complete_validated = True + + # Remove ``embedded_planning`` before updating this event, as this should only be handled + # by the event provided to this update request + new_updates.pop("embedded_planning", None) + + await signals.events_update.send(new_updates, original) + + # And finally push a notification to connected clients + push_notification( + "events:updated:recurring", + item=str(original.id), + recurrence_id=str(original.recurrence_id), + user=str(updates.get("version_creator", "")), + ) + + def mark_event_complete(self, updates: dict[str, Any], event: EventResourceModel, mark_complete_validated: bool): + assert event.dates is not None + assert event.dates.start is not None + + # If the entire series is in the future, raise an error + if event.recurrence_id: + if not mark_complete_validated: + if event.dates.start.date() > updates["actioned_date"].date(): + raise SuperdeskApiError.badRequestError("Recurring series has not started.") + + # If we are marking an event as completed + # Update only those which are behind the 'actioned_date' + if event.dates.start < updates["actioned_date"]: + return + + for plan in get_related_planning_for_events([event.id], "primary"): + if plan.get("state") != 
WorkflowState.CANCELLED and len(plan.get("coverages", [])) > 0: + # TODO-ASYNC: replace when `planning_cancel` is async + get_resource_service("planning_cancel").patch( + plan[ID_FIELD], + { + "reason": "Event Completed", + "cancel_all_coverage": True, + }, + ) + + async def _convert_to_recurring_events(self, updates: dict[str, Any], original: EventResourceModel): + """Convert a single event to a series of recurring events""" + + self._validate_convert_to_recurring(updates, original) + updates["recurrence_id"] = original.id + + merged: EventResourceModel = original.model_copy(update=updates, deep=True) + + # Generated new events will be "draft" + merged.state = WorkflowState.DRAFT + generated_events = self._generate_recurring_events(merged, updates["recurrence_id"]) + updated_event = generated_events.pop(0) + + assert updated_event.dates is not None + assert updated_event.dates.start is not None + assert original.dates is not None + assert original.dates.start is not None + + # Check to see if the first generated event is different from original + # If yes, mark original as rescheduled with generated recurrence_id + if updated_event.dates.start.date() != original.dates.start.date(): + # Reschedule original event + updates["update_method"] = UpdateMethods.SINGLE + updates["dates"] = updated_event.dates + updates["_planning_schedule"] = [x.to_dict() for x in self._create_planning_schedule(updated_event)] + + event_reschedule_service = get_resource_service("events_reschedule") + event_reschedule_service.update_single_event(updates, original) + + if updates.get("state") == WorkflowState.RESCHEDULED: + history_service = get_resource_service("events_history") + history_service.on_reschedule(updates, original.to_dict()) + else: + # Original event falls as a part of the series + # Remove the first element in the list (the current event being updated) + # And update the start/end dates to be in line with the new recurring rules + updates["dates"]["start"] = updated_event.dates.start + updates["dates"]["end"] = updated_event.dates.end + updates["_planning_schedule"] = [x.to_dict() for x in self._create_planning_schedule(updated_event)] + remove_lock_information(item=updates) + + # Create the new events and generate their history + await self.create(generated_events) + await signals.events_created.send(generated_events) + + return generated_events + + def _set_planning_schedule(self, event: EventResourceModel): + if event.dates and event.dates.start: + event.planning_schedule = [PlanningSchedule(scheduled=event.dates.start)] + + def _create_planning_schedule(self, event: EventResourceModel) -> list[PlanningSchedule]: + if event.dates and event.dates.start: + return [PlanningSchedule(scheduled=event.dates.start)] + return [] + + def _overwrite_event_expiry_date(self, event: EventResourceModel): + if event.expiry: + assert event.dates is not None + assert event.dates.end is not None + + expiry_minutes = get_app_config("PLANNING_EXPIRY_MINUTES", None) + event.expiry = event.dates.end + timedelta(minutes=expiry_minutes or 0) + + def set_recurring_mode(self, event: EventResourceModel): + assert event.dates is not None + assert event.dates.recurring_rule is not None + + end_repeat_mode = event.dates.recurring_rule.end_repeat_mode + + if end_repeat_mode == "count": + event.dates.recurring_rule.until = None + elif end_repeat_mode == "until": + event.dates.recurring_rule.count = None + + def _reset_recurring_event_fields(self, event: EventResourceModel): + """ + Reset fields that are not required by the new 
(recurring) events + """ + fields_to_reset = ["lock_user", "lock_time", "lock_session", "lock_action"] + + for field in fields_to_reset: + setattr(event, field, None) + + def _generate_recurring_events( + self, event: EventResourceModel, recurrence_id: str | None = None + ) -> list[EventResourceModel]: + """ + Generate recurring events based on the recurrence rules of the given event. + + Args: + event (EventResourceModel): The original event used as a template for recurrence. + recurrence_id (str, optional): The ID of the recurrence group. Defaults to None. + + Returns: + list[EventResourceModel]: A list of newly generated recurring events. + """ + assert event.dates is not None + + self.set_recurring_mode(event) + generated_events = [] + + assert event.dates.start is not None + assert event.dates.end is not None + assert event.dates.recurring_rule is not None + + # compute the difference between start and end in the original event + time_delta = event.dates.end - event.dates.start + + max_recurring_events = get_max_recurrent_events() + recurring_dates = generate_recurring_dates( + start=event.dates.start, + tz=pytz.timezone(event.dates.tz or ""), + **event.dates.recurring_rule.to_dict(), + ) + + # for all the dates based on the recurring rules + # set a limit to prevent too many events from being created + recurring_dates_iter = itertools.islice(recurring_dates, 0, max_recurring_events) + for i, date in enumerate(recurring_dates_iter): + # prepare data for new recurring event + new_id = generate_guid(type=GUID_NEWSML) + recurring_event_updates = {"dates": dict(start=date, end=(date + time_delta)), "guid": new_id, "id": new_id} + + if not recurrence_id: + recurring_event_updates["recurrence_id"] = new_id + + # reset fields not required by new events + fields_to_reset = [ + "lock_user", + "lock_time", + "lock_session", + "lock_action", + "planning_schedule", + "reschedule_from_schedule", + "planning_item", + "pubstatus", + "reschedule_from", + ] + for field in fields_to_reset: + recurring_event_updates[field] = None + + # let's finally clone the original event & update it with recurring event data + new_event = event.model_copy(update=recurring_event_updates, deep=True) + + # reset embedded_planning on all Events but the first one, as this auto-generates + # an associated Planning item with Coverages for the event + if i > 0: + new_event.embedded_planning = [] + + # set expiry date + self._overwrite_event_expiry_date(new_event) + self._set_planning_schedule(new_event) + + generated_events.append(new_event) + + return generated_events + + @staticmethod + async def _link_to_planning(event: EventResourceModel): + """ + Links an Event to an existing Planning Item + + The Planning item remains locked, it is up to the client to release this lock + after this operation is complete + """ + planning_service = PlanningAsyncService() + planning_item = await planning_service.find_by_id(event.planning_item) + + if not planning_item: + raise SuperdeskApiError.badRequestError("Planning item not found") + + updates = {"related_events": planning_item.get("related_events") or []} + event_link_method = get_planning_event_link_method() + link_type: PLANNING_RELATED_EVENT_LINK_TYPE = ( + "primary" + if not len(get_related_event_ids_for_planning(planning_item, "primary")) + and event_link_method in ("one_primary", "one_primary_many_secondary") + else "secondary" + ) + related_planning = PlanningRelatedEventLink(_id=event.id, link_type=link_type) + updates["related_events"].append(related_planning) + + # Add 
``recurrence_id`` if the supplied Event is part of a series + if event.recurrence_id: + related_planning["recurrence_id"] = event.recurrence_id + if not planning_item.get("recurrence_id") and link_type == "primary": + updates["recurrence_id"] = event.recurrence_id + + # TODO-ASYNC: migrate `validate_on_update` method to async + # planning_service.validate_on_update(updates, planning_item, get_user()) + await planning_service.system_update(event.planning_item, updates) + + await signals.planning_update.send(updates, planning_item) + + def _enhance_event_item(self, doc: dict[str, Any]): + plannings = get_related_planning_for_events([doc[ID_FIELD]]) + + if len(plannings): + doc["planning_ids"] = [planning.get("_id") for planning in plannings] + + for location in doc.get("location") or []: + format_address(location) + + # this is to fix existing events that have original_creator set to an empty string + if not doc.get("original_creator"): + doc.pop("original_creator", None) diff --git a/server/planning/events/events_sync/__init__.py b/server/planning/events/events_sync/__init__.py index 50606aae1..d2b405d95 100644 --- a/server/planning/events/events_sync/__init__.py +++ b/server/planning/events/events_sync/__init__.py @@ -8,7 +8,7 @@ # AUTHORS and LICENSE files distributed with this source code, or # at https://www.sourcefabric.org/superdesk/license -from typing import Dict, Optional, List +from typing import Dict, Optional, List, cast from copy import deepcopy import pytz @@ -16,10 +16,11 @@ from superdesk import get_resource_service -from planning.types import Event, EmbeddedPlanning, StringFieldTranslation +from planning.types import Event, EmbeddedPlanningDict, StringFieldTranslation from planning.common import get_config_event_fields_to_sync_with_planning from planning.content_profiles.utils import AllContentProfileData from planning.utils import get_related_planning_for_events +from planning.types.event import EmbeddedPlanning as EmbeddedPlanningModel, EventResourceModel from .common import VocabsSyncData, SyncItemData, SyncData from .embedded_planning import ( @@ -39,9 +40,21 @@ def get_translated_fields(translations: List[StringFieldTranslation]) -> Dict[st return fields +# TODO-ASYNC: use resource models instead of typed dicts def sync_event_metadata_with_planning_items( - original: Optional[Event], updates: Event, embedded_planning: List[EmbeddedPlanning] + original: Optional[Event], + updates: Event | EventResourceModel, + embedded_planning: list[EmbeddedPlanningDict] | list[EmbeddedPlanningModel], ): + # TODO-ASYNC: remove these checks after this is migrated + if isinstance(updates, EventResourceModel): + updates = cast(Event, updates.to_dict()) + + embedded_planning = [ + cast(EmbeddedPlanningDict, obj.to_dict()) if isinstance(obj, EmbeddedPlanningModel) else obj + for obj in embedded_planning + ] + profiles = AllContentProfileData() if original is None: diff --git a/server/planning/events/events_sync/embedded_planning.py b/server/planning/events/events_sync/embedded_planning.py index 4a4447e80..7583e9d6b 100644 --- a/server/planning/events/events_sync/embedded_planning.py +++ b/server/planning/events/events_sync/embedded_planning.py @@ -16,7 +16,7 @@ from planning.types import ( Event, - EmbeddedPlanning, + EmbeddedPlanningDict, EmbeddedCoverageItem, Planning, Coverage, @@ -35,7 +35,7 @@ def create_new_plannings_from_embedded_planning( event: Event, event_translations: Dict[str, Dict[str, str]], - embedded_planning: List[EmbeddedPlanning], + embedded_planning: List[EmbeddedPlanningDict], 
profiles: AllContentProfileData, vocabs: VocabsSyncData, ): @@ -240,7 +240,7 @@ def create_new_coverage_from_event_and_planning( def get_existing_plannings_from_embedded_planning( event: Event, event_translations: Dict[str, Dict[str, str]], - embedded_planning: List[EmbeddedPlanning], + embedded_planning: List[EmbeddedPlanningDict], profiles: AllContentProfileData, vocabs: VocabsSyncData, ) -> Iterator[Tuple[Planning, Planning, bool]]: diff --git a/server/planning/events/events_tests.py b/server/planning/events/events_tests.py index 2a5356334..3c172bdf4 100644 --- a/server/planning/events/events_tests.py +++ b/server/planning/events/events_tests.py @@ -11,6 +11,7 @@ from datetime import datetime, timedelta from copy import deepcopy +from bson import ObjectId from pytest import mark import pytz from mock import Mock, patch @@ -431,7 +432,7 @@ async def test_planning_schedule_convert_to_recurring(self, get_user_mock): events = list(service.get_from_mongo(req=None, lookup=None)) self.assertPlanningSchedule(events, 1) lock_service = LockService(self.app) - locked_event = lock_service.lock(events[0], None, "session", "convert_recurring", "events") + locked_event = lock_service.lock(events[0], None, ObjectId(), "convert_recurring", "events") self.assertEqual(locked_event.get("lock_action"), "convert_recurring") schedule = deepcopy(events[0].get("dates")) schedule["start"] = datetime(2099, 11, 21, 12, 00, 00, tzinfo=pytz.UTC) diff --git a/server/planning/events/events_utils.py b/server/planning/events/events_utils.py new file mode 100644 index 000000000..ad7d9a036 --- /dev/null +++ b/server/planning/events/events_utils.py @@ -0,0 +1,221 @@ +import re +import pytz + +from datetime import date, datetime +from typing import AsyncGenerator, Any, Generator, Tuple, Literal, cast + +from dateutil.rrule import rrule, DAILY, WEEKLY, MONTHLY, YEARLY, MO, TU, WE, TH, FR, SA, SU + +from superdesk.utc import utcnow +from superdesk.resource_fields import ID_FIELD +from superdesk.metadata.item import GUID_NEWSML +from superdesk.metadata.utils import generate_guid +from superdesk.core.types import SortParam, SortListParam + +from planning.types import EventResourceModel, UpdateMethods +from planning.types.event import EmbeddedPlanning, EmbeddedPlanningCoverage +from planning.common import TEMP_ID_PREFIX, WORKFLOW_STATE, get_max_recurrent_events + + +FrequencyType = Literal["DAILY", "WEEKLY", "MONTHLY", "YEARLY"] + +FREQUENCIES = { + "DAILY": DAILY, + "WEEKLY": WEEKLY, + "MONTHLY": MONTHLY, + "YEARLY": YEARLY, +} + +DAYS = { + "MO": MO, + "TU": TU, + "WE": WE, + "TH": TH, + "FR": FR, + "SA": SA, + "SU": SU, +} + + +def generate_recurring_dates( + start: datetime, + frequency: FrequencyType, + interval: int = 1, + until: datetime | None = None, + byday: str | None = None, + count: int = 5, + tz: pytz.BaseTzInfo | None = None, + date_only: bool = False, +) -> Generator[datetime | date, None, None]: + """ + + Returns list of dates related to recurring rules + + :param start datetime: date when to start + :param frequency FrequencyType: DAILY, WEEKLY, MONTHLY, YEARLY + :param interval int: indicates how often the rule repeats as a positive integer + :param until datetime: date after which the recurrence rule expires + :param byday str or list: "MO TU" + :param count int: number of occurrences of the rule + :return Generator: list of datetime + + """ + # if tz is given, respect the timezone by starting from the local time + # NOTE: rrule uses only naive datetime + if tz: + try: + # start can already be localized + 
start = pytz.UTC.localize(start) + except ValueError: + pass + start = start.astimezone(tz).replace(tzinfo=None) + if until: + until = until.astimezone(tz).replace(tzinfo=None) + + if frequency == "DAILY": + byday = None + + # check format of the recurring_rule byday value + if byday and re.match(r"^-?[1-5]+.*", byday): + # byday uses monthly or yearly frequency rule with day of week and + # preceding day of month integer by day value + # examples: + # 1FR - first friday of the month + # -2MO - second to last monday of the month + if byday[:1] == "-": + day_of_month = int(byday[:2]) + day_of_week = byday[2:] + else: + day_of_month = int(byday[:1]) + day_of_week = byday[1:] + + byweekday = DAYS.get(day_of_week)(day_of_month) # type: ignore[misc] + else: + # byday uses DAYS constants + byweekday = byday and [DAYS.get(d) for d in byday.split()] or None + + # convert count of repeats to count of events + if count: + count = count * (len(byday.split()) if byday else 1) + + # TODO: use dateutil.rrule.rruleset to include ex_date and ex_rule + dates = rrule( + FREQUENCIES.get(frequency), + dtstart=start, + until=until, + byweekday=byweekday, + count=count, + interval=interval, + ) + # if a timezone has been applied, returns UTC + if tz: + if date_only: + return (tz.localize(dt).astimezone(pytz.UTC).replace(tzinfo=None).date() for dt in dates) + else: + return (tz.localize(dt).astimezone(pytz.UTC).replace(tzinfo=None) for dt in dates) + else: + if date_only: + return (date.date() for date in dates) + else: + return (date for date in dates) + + +def get_events_embedded_planning(event: dict[str, Any] | EventResourceModel) -> list[EmbeddedPlanning]: + if isinstance(event, dict): + event = EventResourceModel.from_dict(event) + event = cast(EventResourceModel, event) + + def _get_coverage_id(coverage: EmbeddedPlanningCoverage) -> str: + if not coverage.coverage_id: + coverage.coverage_id = TEMP_ID_PREFIX + "-" + generate_guid(type=GUID_NEWSML) + return coverage.coverage_id + + return [ + EmbeddedPlanning( + planning_id=planning.planning_id, + update_method=planning.update_method or UpdateMethods.SINGLE, + coverages={_get_coverage_id(coverage): coverage for coverage in planning.coverages}, + ) + for planning in event.embedded_planning + ] + + +async def get_series( + query: dict, sort: SortParam | None = None, max_results: int = 25 +) -> AsyncGenerator[EventResourceModel, None]: + events_service = EventResourceModel.get_service() + page = 1 + + while True: + # Get the results from mongo + results = await events_service.find(req=query, page=page, max_results=max_results, sort=sort, use_mongo=True) + + docs = await results.to_list() + if not docs: + break + + page += 1 + + # Yield the results for iteration by the caller + for doc in docs: + yield doc + + +async def get_recurring_timeline( + selected: dict[str, Any], + spiked: bool = False, + rescheduled: bool = False, + cancelled: bool = False, + postponed: bool = False, +) -> Tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]: + """Utility method to get all events in the series + + This splits up the series of events into 3 separate arrays. 
+ Historic: event.dates.start < utcnow() + Past: utcnow() < event.dates.start < selected.dates.start + Future: event.dates.start > selected.dates.start + """ + excluded_states = [] + + if not spiked: + excluded_states.append(WORKFLOW_STATE.SPIKED) + if not rescheduled: + excluded_states.append(WORKFLOW_STATE.RESCHEDULED) + if not cancelled: + excluded_states.append(WORKFLOW_STATE.CANCELLED) + if not postponed: + excluded_states.append(WORKFLOW_STATE.POSTPONED) + + query = { + "$and": [ + {"recurrence_id": selected["recurrence_id"]}, + {"_id": {"$ne": selected[ID_FIELD]}}, + ] + } + + if excluded_states: + query["$and"].append({"state": {"$nin": excluded_states}}) + + sort: SortListParam = [("dates.start", 1)] + max_results = get_max_recurrent_events() + selected_start = selected.get("dates", {}).get("start", utcnow()) + + # Make sure we are working with a datetime instance + if not isinstance(selected_start, datetime): + selected_start = datetime.strptime(selected_start, "%Y-%m-%dT%H:%M:%S%z") + + historic = [] + past = [] + future = [] + + async for event in get_series(query, sort, max_results): + end = event.dates.end if event.dates else None + start = event.dates.start if event.dates else None + if end and end < utcnow(): + historic.append(event.to_dict()) + elif start and start < selected_start: + past.append(event.to_dict()) + elif start and start > selected_start: + future.append(event.to_dict()) + + return historic, past, future diff --git a/server/planning/events/module.py b/server/planning/events/module.py index bc1ca661c..935345445 100644 --- a/server/planning/events/module.py +++ b/server/planning/events/module.py @@ -6,7 +6,7 @@ ) from planning.types import EventResourceModel -from .service import EventsAsyncService +from .events_service import EventsAsyncService events_resource_config = ResourceConfig( name="events", diff --git a/server/planning/events/service.py b/server/planning/events/service.py deleted file mode 100644 index 56fbd020e..000000000 --- a/server/planning/events/service.py +++ /dev/null @@ -1,61 +0,0 @@ -from typing import AsyncGenerator, Any -from datetime import datetime -from superdesk.core.utils import date_to_str - -from planning.types import EventResourceModel -from planning.common import get_max_recurrent_events, WORKFLOW_STATE -from planning.core.service import BasePlanningAsyncService - - -class EventsAsyncService(BasePlanningAsyncService[EventResourceModel]): - resource_name = "events" - - async def get_expired_items( - self, expiry_datetime: datetime, spiked_events_only: bool = False - ) -> AsyncGenerator[list[dict[str, Any]], None]: - """Get the expired items - - Where end date is in the past - """ - query: dict[str, Any] = { - "query": { - "bool": { - "must_not": [{"term": {"expired": True}}], - "filter": {"range": {"dates.end": {"lte": date_to_str(expiry_datetime)}}}, - }, - }, - "sort": [{"dates.start": "asc"}], - "size": get_max_recurrent_events(), - } - - if spiked_events_only: - del query["query"]["bool"]["must_not"] - query["query"]["bool"]["must"] = [{"term": {"state": WORKFLOW_STATE.SPIKED}}] - - total_received = 0 - total_events = -1 - - while True: - query["from"] = total_received - - results = await self.search(query) - items = await results.to_list_raw() - results_count = len(items) - - # If the total_events has not been set, then this is the first query - # In which case we need to store the total hits from the search - if total_events < 0: - total_events = results_count - - # If the search doesn't contain any results, return here - if 
total_events < 1: - break - - # If the last query doesn't contain any results, return here - if results_count == 0: - break - - total_received += results_count - - # Yield the results for iteration by the callee - yield items diff --git a/server/planning/events/utils.py b/server/planning/events/utils.py deleted file mode 100644 index 75e118253..000000000 --- a/server/planning/events/utils.py +++ /dev/null @@ -1,92 +0,0 @@ -from typing import AsyncGenerator, Any, Tuple -from datetime import datetime - -from planning.common import ( - WORKFLOW_STATE, - get_max_recurrent_events, -) -from planning.types import EventResourceModel -from superdesk.core.types import SortParam, SortListParam -from superdesk.resource_fields import ID_FIELD -from superdesk.utc import utcnow - - -async def get_series( - query: dict, sort: SortParam | None = None, max_results: int = 25 -) -> AsyncGenerator[EventResourceModel, None]: - events_service = EventResourceModel.get_service() - page = 1 - - while True: - # Get the results from mongo - results = await events_service.find(req=query, page=page, max_results=max_results, sort=sort, use_mongo=True) - - docs = await results.to_list() - if not docs: - break - - page += 1 - - # Yield the results for iteration by the callee - for doc in docs: - yield doc - - -async def get_recurring_timeline( - selected: dict[str, Any], - spiked: bool = False, - rescheduled: bool = False, - cancelled: bool = False, - postponed: bool = False, -) -> Tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]: - """Utility method to get all events in the series - - This splits up the series of events into 3 separate arrays. - Historic: event.dates.start < utcnow() - Past: utcnow() < event.dates.start < selected.dates.start - Future: event.dates.start > selected.dates.start - """ - excluded_states = [] - - if not spiked: - excluded_states.append(WORKFLOW_STATE.SPIKED) - if not rescheduled: - excluded_states.append(WORKFLOW_STATE.RESCHEDULED) - if not cancelled: - excluded_states.append(WORKFLOW_STATE.CANCELLED) - if not postponed: - excluded_states.append(WORKFLOW_STATE.POSTPONED) - - query = { - "$and": [ - {"recurrence_id": selected["recurrence_id"]}, - {"_id": {"$ne": selected[ID_FIELD]}}, - ] - } - - if excluded_states: - query["$and"].append({"state": {"$nin": excluded_states}}) - - sort: SortListParam = [("dates.start", 1)] - max_results = get_max_recurrent_events() - selected_start = selected.get("dates", {}).get("start", utcnow()) - - # Make sure we are working with a datetime instance - if not isinstance(selected_start, datetime): - selected_start = datetime.strptime(selected_start, "%Y-%m-%dT%H:%M:%S%z") - - historic = [] - past = [] - future = [] - - async for event in get_series(query, sort, max_results): - end = event.dates.end if event.dates else None - start = event.dates.start if event.dates else None - if end and end < utcnow(): - historic.append(event.to_dict()) - elif start and start < selected_start: - past.append(event.to_dict()) - elif start and start > selected_start: - future.append(event.to_dict()) - - return historic, past, future diff --git a/server/planning/planning/__init__.py b/server/planning/planning/__init__.py index 83ee4d7f7..14df7a88a 100644 --- a/server/planning/planning/__init__.py +++ b/server/planning/planning/__init__.py @@ -8,6 +8,7 @@ # AUTHORS and LICENSE files distributed with this source code, or # at https://www.sourcefabric.org/superdesk/license +from planning import signals from quart_babel import lazy_gettext import superdesk @@ 
-131,8 +132,10 @@ def init_app(app): planning_autosave_service = PlanningAutosaveService("planning_autosave", superdesk.get_backend()) PlanningAutosaveResource("planning_autosave", app=app, service=planning_autosave_service) + # listen to async signals + signals.planning_update.connect(planning_history_service.on_item_updated) + app.on_inserted_planning += planning_history_service.on_item_created - app.on_updated_planning += planning_history_service.on_item_updated app.on_updated_planning_spike += planning_history_service.on_spike app.on_updated_planning_unspike += planning_history_service.on_unspike app.on_updated_planning_cancel += planning_history_service.on_cancel diff --git a/server/planning/planning/planning_history.py b/server/planning/planning/planning_history.py index 697c0b86e..aad3bc9b7 100644 --- a/server/planning/planning/planning_history.py +++ b/server/planning/planning/planning_history.py @@ -10,12 +10,15 @@ import logging from copy import deepcopy +from typing import Any + from superdesk.flask import request from superdesk.resource_fields import ID_FIELD from superdesk import Resource, get_resource_service from superdesk.default_settings import strtobool +from planning.types import PlanningResourceModel from planning.history import HistoryService from planning.common import WORKFLOW_STATE, ITEM_ACTIONS, ASSIGNMENT_WORKFLOW_STATE from planning.item_lock import LOCK_ACTION @@ -71,8 +74,8 @@ def _save_history(self, planning, update, operation): self.post([history]) - def on_item_updated(self, updates, original, operation=None): - item = deepcopy(original) + def on_item_updated(self, updates: dict[str, Any], original: PlanningResourceModel, operation: str | None = None): + item = deepcopy(original.to_dict()) if list(item.keys()) == ["_id"]: diff = self._remove_unwanted_fields(updates) else: diff --git a/server/planning/planning/service.py b/server/planning/planning/service.py index 7163e6993..dfab6349d 100644 --- a/server/planning/planning/service.py +++ b/server/planning/planning/service.py @@ -1,10 +1,14 @@ -from typing import AsyncGenerator, Any from datetime import datetime +from typing import AsyncGenerator, Any + from superdesk.core.utils import date_to_str +from superdesk.resource_fields import ID_FIELD -from planning.types import PlanningResourceModel from planning.common import WORKFLOW_STATE +from planning.types import PlanningResourceModel +from planning.types.event import EventResourceModel from planning.core.service import BasePlanningAsyncService +from planning.utils import get_related_event_links_for_planning, get_related_planning_for_events class PlanningAsyncService(BasePlanningAsyncService[PlanningResourceModel]): @@ -82,3 +86,21 @@ async def get_expired_items( # Yield the results for iteration by the callee yield items + + async def on_event_converted_to_recurring(self, updates: dict[str, Any], original: EventResourceModel): + for item in get_related_planning_for_events([original.id]): + related_events = get_related_event_links_for_planning(item) + + # Set the ``recurrence_id`` in the ``planning.related_events`` field + for event in related_events: + if event["_id"] == original.id: + event["recurrence_id"] = updates["recurrence_id"] + break + + await self.update( + item[ID_FIELD], + { + "recurrence_id": updates["recurrence_id"], + "related_events": related_events, + }, + ) diff --git a/server/planning/signals.py b/server/planning/signals.py index 56a0c555b..5b61133e2 100644 --- a/server/planning/signals.py +++ b/server/planning/signals.py @@ -9,10 +9,15 @@ # 
at https://www.sourcefabric.org/superdesk/license
 
 import blinker
+from typing import Any
+
+from superdesk.core import AsyncSignal
+from planning.types import EventResourceModel, PlanningResourceModel
 
 __all__ = [
     "planning_created",
     "planning_ingested",
+    "events_update",
 ]
 
 signals = blinker.Namespace()
@@ -20,3 +25,20 @@
 planning_created = signals.signal("planning:created")
 planning_ingested = signals.signal("planning:ingested")
 assignment_content_create = signals.signal("planning:assignment_content_create")
+
+
+#: Signal for when an Event is about to be updated in the DB
+#: param updates: Event updates
+#: param original_event: `EventResourceModel` instance of the event to be updated
+events_update = AsyncSignal[dict[str, Any], EventResourceModel]("events:update")
+
+
+#: Signal for when a list of Events has been created in the DB
+#: param events: List of events created in the DB
+events_created = AsyncSignal[list[EventResourceModel]]("events:created")
+
+
+#: Signal for when a Planning item is about to be updated in the DB
+#: param updates: Planning item updates
+#: param planning_item: `PlanningResourceModel` instance of the planning item to be updated
+planning_update = AsyncSignal[dict[str, Any], PlanningResourceModel]("planning:update")
diff --git a/server/planning/tests/__init__.py b/server/planning/tests/__init__.py
index f1b97a352..f9d056f4d 100644
--- a/server/planning/tests/__init__.py
+++ b/server/planning/tests/__init__.py
@@ -1,10 +1,19 @@
 from typing import Any
+
+from bson import ObjectId
+from superdesk.flask import g
 from superdesk.tests import TestCase as BaseTestCase
 
 
 class TestCase(BaseTestCase):
     test_context = None  # avoid using test_request_context
+
     app_config: dict[str, Any] = {
         "INSTALLED_APPS": ["planning"],
         "MODULES": ["planning.module"],
     }
+
+    def setup_test_user(self):
+        user = {"_id": ObjectId()}
+        self.app.data.insert("users", [user])
+        g.user = user
diff --git a/server/planning/types/__init__.py b/server/planning/types/__init__.py
index 9debfe388..11e73fb32 100644
--- a/server/planning/types/__init__.py
+++ b/server/planning/types/__init__.py
@@ -14,10 +14,13 @@
 from .content_profiles import ContentFieldSchema, ContentFieldEditor, ContentProfile  # noqa
 
 from .base import BasePlanningModel
+from .common import PlanningSchedule
 from .event import EventResourceModel
 from .planning import PlanningResourceModel
 from .assignment import AssignmentResourceModel
 from .published import PublishedPlanningModel
 
+from .enums import PostStates, UpdateMethods, WorkflowState
+
 __all__ = [
     "BasePlanningModel",
@@ -25,6 +28,10 @@
     "PlanningResourceModel",
     "AssignmentResourceModel",
     "PublishedPlanningModel",
+    "PlanningSchedule",
+    "PostStates",
+    "UpdateMethods",
+    "WorkflowState",
 ]
 
 
@@ -55,7 +62,7 @@ class EmbeddedCoverageItem(TypedDict, total=False):
     priority: int
 
 
-class EmbeddedPlanning(TypedDict, total=False):
+class EmbeddedPlanningDict(TypedDict, total=False):
     planning_id: str
     update_method: UPDATE_METHOD
     coverages: Dict[str, EmbeddedCoverageItem]
diff --git a/server/planning/types/base.py b/server/planning/types/base.py
index d62de85b7..1bc343eec 100644
--- a/server/planning/types/base.py
+++ b/server/planning/types/base.py
@@ -5,5 +5,5 @@
 
 
 class BasePlanningModel(ResourceModel):
-    original_creator: Annotated[ObjectId, validate_data_relation_async("users")] = None
-    version_creator: Annotated[ObjectId, validate_data_relation_async("users")] = None
+    original_creator: Annotated[ObjectId, validate_data_relation_async("users")] | None = None
+    version_creator: Annotated[ObjectId, 
validate_data_relation_async("users")] | None = None diff --git a/server/planning/types/common.py b/server/planning/types/common.py index 5a9244c30..7bae389e2 100644 --- a/server/planning/types/common.py +++ b/server/planning/types/common.py @@ -3,7 +3,7 @@ from typing import Any, Annotated, Literal, TypeAlias from superdesk.utc import utcnow -from superdesk.core.resources import dataclass, fields +from superdesk.core.resources import dataclass, fields, Dataclass from superdesk.core.elastic.mapping import json_schema_to_elastic_mapping from superdesk.core.resources.validators import validate_data_relation_async @@ -63,9 +63,8 @@ class RelationshipItem: related: str | None = None -@dataclass -class PlanningSchedule: - scheduled: date | None = None +class PlanningSchedule(Dataclass): + scheduled: datetime | None = None coverage_id: fields.Keyword | None = None diff --git a/server/planning/types/enums.py b/server/planning/types/enums.py index 043b9fe95..8d82934d3 100644 --- a/server/planning/types/enums.py +++ b/server/planning/types/enums.py @@ -32,9 +32,9 @@ class PostStates(str, Enum): @unique class UpdateMethods(str, Enum): - UPDATE_SINGLE = "single" - UPDATE_FUTURE = "future" - UPDATE_ALL = "all" + SINGLE = "single" + FUTURE = "future" + ALL = "all" @unique diff --git a/server/planning/types/event.py b/server/planning/types/event.py index d057b608b..ded74ed7c 100644 --- a/server/planning/types/event.py +++ b/server/planning/types/event.py @@ -5,7 +5,7 @@ from content_api.items.model import CVItem, Place from superdesk.utc import utcnow -from superdesk.core.resources import fields, dataclass +from superdesk.core.resources import fields, dataclass, Dataclass from superdesk.core.resources.validators import validate_data_relation_async from .base import BasePlanningModel @@ -101,10 +101,10 @@ class EmbeddedPlanningCoverage: @dataclass -class EmbeddedPlanning: +class EmbeddedPlanning(Dataclass): planning_id: Annotated[str, validate_data_relation_async("planning")] update_method: Annotated[UpdateMethods, fields.keyword_mapping()] | None = None - coverages: list[EmbeddedPlanningCoverage] | None = Field(default_factory=list) + coverages: list[EmbeddedPlanningCoverage] = Field(default_factory=list) @dataclass @@ -123,7 +123,7 @@ class RelatedItem: class EventResourceModel(BasePlanningModel, LockFieldsMixin): - guid: fields.Keyword + guid: fields.Keyword | None = None unique_id: int | None = None unique_name: fields.Keyword | None = None version: int | None = None @@ -214,7 +214,7 @@ class EventResourceModel(BasePlanningModel, LockFieldsMixin): item_type: Annotated[fields.Keyword, Field(alias="type")] = "event" # Named Calendars - calendars: list[KeywordQCodeName] | None = None + calendars: list[KeywordQCodeName] = Field(default_factory=list) # The previous state the item was in before for example being spiked, # when un-spiked it will revert to this state diff --git a/server/planning/types/event_dates.py b/server/planning/types/event_dates.py index 22334eff3..a68267363 100644 --- a/server/planning/types/event_dates.py +++ b/server/planning/types/event_dates.py @@ -1,19 +1,20 @@ -from typing import List, Literal +from typing import List, Literal, TypeAlias from datetime import datetime, date from pydantic.fields import Field -from superdesk.core.resources import dataclass, fields +from superdesk.core.resources import dataclass, fields, Dataclass # NewsML-G2 Event properties See IPTC-G2-Implementation_Guide 15.4.3 +RepeatModeType: TypeAlias = Literal["count", "until"] -@dataclass -class 
RecurringRule: + +class RecurringRule(Dataclass): frequency: str | None = None interval: int | None = None - endRepeatMode: Literal["count", "until"] | None = None + end_repeat_mode: RepeatModeType | None = Field(default=None, alias="endRepeatMode") until: datetime | None = None count: int | None = None bymonth: str | None = None @@ -42,8 +43,8 @@ class OccurStatus: label: fields.Keyword | None = None -@dataclass -class EventDates: +class EventDates(Dataclass): + # TODO-ASYNC: double check which ones are mandatory start: datetime | None = None end: datetime | None = None tz: str | None = None
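
A minimal sketch of how downstream code is expected to consume the new async signals added in
signals.py (the handler name and body below are illustrative only; registration at init time
mirrors the init_app() hunks above):

    from typing import Any

    from planning import signals
    from planning.types import EventResourceModel


    async def log_event_update(updates: dict[str, Any], original: EventResourceModel) -> None:
        # AsyncSignal[dict[str, Any], EventResourceModel] delivers the pending
        # updates dict plus the current model instance before the DB write.
        print(f"event {original.id} updating fields: {sorted(updates.keys())}")


    # connect once during app init, as init_app() does for the history services
    signals.events_update.connect(log_event_update)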
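
Similarly, a sketch of calling the relocated get_recurring_timeline() helper from
events_utils.py (assumes the selected event dict carries recurrence_id and dates.start,
as the query it builds requires, and that it runs inside the async app context):

    from planning.events.events_utils import get_recurring_timeline


    async def print_series_summary(selected_event: dict) -> None:
        # returns three lists of plain dicts (model.to_dict()), ordered by dates.start;
        # spiked=True keeps spiked occurrences in the timeline
        historic, past, future = await get_recurring_timeline(selected_event, spiked=True)
        print(len(historic), len(past), len(future))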