Improvements after feedback
SDESK-7442
eos87 committed Dec 9, 2024
1 parent aea507d commit f8cdcc6
Showing 7 changed files with 79 additions and 42 deletions.
6 changes: 5 additions & 1 deletion server/planning/assignments/__init__.py
@@ -11,6 +11,8 @@
import superdesk
from quart_babel import lazy_gettext
from superdesk.services import BaseService

from planning import signals
from .assignments import AssignmentsResource, AssignmentsService
from .assignments_content import AssignmentsContentResource, AssignmentsContentService
from .assignments_link import AssignmentsLinkResource, AssignmentsLinkService
@@ -93,13 +95,15 @@ def init_app(app):
delivery_service = BaseService("delivery", backend=superdesk.get_backend())
DeliveryResource("delivery", app=app, service=delivery_service)

# listen to async signals
signals.events_update.connect(assignments_publish_service.on_events_updated)

# Updating data/lock on assignments based on content item updates from authoring
app.on_updated_archive += assignments_publish_service.update_assignment_on_archive_update
app.on_archive_item_updated += assignments_publish_service.update_assignment_on_archive_operation
app.on_item_lock += assignments_publish_service.validate_assignment_lock
app.on_item_locked += assignments_publish_service.sync_assignment_lock
app.on_item_unlocked += assignments_publish_service.sync_assignment_unlock
app.on_updated_events += assignments_publish_service.on_events_updated

# Track updates for an assignment if its news story was updated
if app.config.get("PLANNING_LINK_UPDATES_TO_COVERAGES", True):
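For reference, a minimal sketch (not part of this commit) of how a subscriber gets wired to the new async signal, mirroring the `signals.events_update.connect(...)` call above; the `log_event_update` handler is hypothetical and only illustrates the `(updates, original)` payload that `events_update.send()` passes along.

from typing import Any

from planning import signals
from planning.types import EventResourceModel


def log_event_update(updates: dict[str, Any], original: EventResourceModel) -> None:
    # hypothetical subscriber: receives the same (updates, original) pair
    # that `await signals.events_update.send(updates, original)` emits
    print(f"event {original.id} updated, changed fields: {sorted(updates)}")


signals.events_update.connect(log_event_update)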
5 changes: 3 additions & 2 deletions server/planning/assignments/assignments.py
@@ -71,6 +71,7 @@
planning_auto_assign_to_workflow,
)

from planning.types import EventResourceModel
from planning.planning_notifications import PlanningNotifications
from planning.common import format_address, get_assginment_name
from .assignments_history import ASSIGNMENT_HISTORY_ACTIONS
@@ -1020,10 +1021,10 @@ def update_assignment_on_archive_operation(self, updates, original, operation=No
lock_service = get_component(LockService)
lock_service.unlock(assignment, user_id, get_auth()["_id"], "assignments")

def on_events_updated(self, updates, original):
def on_events_updated(self, updates: dict[str, Any], original: EventResourceModel):
"""Send assignment notifications if any relevant Event metadata has changed"""

event = deepcopy(original)
event = deepcopy(original.to_dict())
event.update(updates)
plannings = get_related_planning_for_events([event[ID_FIELD]], "primary")

8 changes: 8 additions & 0 deletions server/planning/events/__init__.py
@@ -10,6 +10,8 @@

import superdesk
from quart_babel import lazy_gettext

from planning import signals
from .events import EventsResource, EventsService
from .events_spike import (
EventsSpikeResource,
@@ -137,8 +139,14 @@ def init_app(app):
service=recent_events_template_service,
)

# listen to async signals
signals.events_created.connect(events_history_service.on_item_created)

app.on_updated_events += events_history_service.on_item_updated

# TODO-ASYNC: remove `on_inserted_events` when `events_reschedule` is async
app.on_inserted_events += events_history_service.on_item_created

app.on_deleted_item_events -= events_history_service.on_item_deleted
app.on_deleted_item_events += events_history_service.on_item_deleted
app.on_updated_events_spike += events_history_service.on_spike
68 changes: 32 additions & 36 deletions server/planning/events/events_service.py
@@ -8,16 +8,17 @@
from apps.archive.common import get_auth
from apps.auth import get_user, get_user_id

import superdesk
from superdesk.utc import utcnow
from superdesk.core import get_app_config
from superdesk import get_resource_service
from superdesk.resource_fields import ID_FIELD
from superdesk.errors import SuperdeskApiError
from superdesk.metadata.item import GUID_NEWSML
from superdesk.notification import push_notification
from superdesk.core import get_app_config, get_current_app
from superdesk.core.utils import date_to_str, generate_guid


from planning import signals
from planning.types import (
PLANNING_RELATED_EVENT_LINK_TYPE,
EventResourceModel,
@@ -36,16 +36,16 @@
set_ingested_event_state,
post_required,
)
from planning.planning import PlanningAsyncService
from planning.core.service import BasePlanningAsyncService
from planning.utils import (
get_planning_event_link_method,
get_related_event_ids_for_planning,
get_related_planning_for_events,
)
from planning.events.events_base_service import EventsBaseService

from .events_sync import sync_event_metadata_with_planning_items
from .events_utils import generate_recurring_dates, get_events_embedded_planning
from .events_utils import generate_recurring_dates, get_events_embedded_planning, get_recurring_timeline


class EventsAsyncService(BasePlanningAsyncService[EventResourceModel]):
@@ -177,7 +178,7 @@ async def on_create(self, docs: list[EventResourceModel]) -> None:
events_history.on_item_created([event.to_dict()])

if original_planning_item:
self._link_to_planning(event)
await self._link_to_planning(event)
del event["_planning_item"]

if generated_events:
@@ -188,8 +189,8 @@ async def on_create(self, docs: list[EventResourceModel]) -> None:
async def on_created(self, docs: list[EventResourceModel]):
"""Send WebSocket Notifications for created Events
Generate the list of IDs for recurring and non-recurring events
Then send this list off to the clients so they can fetch these events
Generate the list of IDs for recurring and non-recurring events,
then send this list off to the clients so they can fetch these events
"""
notifications_sent = []
history_service = get_resource_service("events_history")
@@ -201,8 +202,8 @@ async def on_created(self, docs: list[EventResourceModel]):
if doc.duplicate_from:
parent_id = doc.duplicate_from
parent_event = await self.find_by_id(parent_id)

assert parent_event is not None
if not parent_event:
raise SuperdeskApiError.badRequestError("Parent event not found")

history_service.on_item_updated({"duplicate_id": event_id}, parent_event.to_dict(), "duplicate")
history_service.on_item_updated({"duplicate_id": parent_id}, doc.to_dict(), "duplicate_from")
@@ -423,10 +424,9 @@ async def _update_single_event(self, updates: dict[str, Any], original: EventRes
original.lock_action == "convert_recurring"
and updates.get("dates", {}).get("recurring_rule", None) is not None
):
generated_events = await self._convert_to_recurring_event(updates, original)
generated_events = await self._convert_to_recurring_events(updates, original)

# if the original event was "posted" then post all the generated events
# if original.get("pubstatus") in [ POST_STATE.CANCELLED, POST_STATE.USABLE]:
if original.pubstatus in [PostStates.CANCELLED, PostStates.USABLE]:
post = {
"event": generated_events[0].id,
@@ -470,10 +470,10 @@ async def _update_recurring_events(
original_as_dict = original.to_dict()

if update_method == UpdateMethods.FUTURE:
historic, past, future = self._get_recurring_timeline(original_as_dict)
historic, past, future = await get_recurring_timeline(original_as_dict)
events = future
else:
historic, past, future = self._get_recurring_timeline(original_as_dict)
historic, past, future = await get_recurring_timeline(original_as_dict)
events = historic + past + future

events_post_service = get_resource_service("events_post")
@@ -525,8 +525,7 @@ async def _update_recurring_events(
# by the event provided to this update request
new_updates.pop("embedded_planning", None)

app = get_current_app().as_any()
app.on_updated_events(new_updates, {"_id": event_id})
await signals.events_update.send(new_updates, original)

# And finally push a notification to connected clients
push_notification(
@@ -536,11 +535,6 @@ async def _update_recurring_events(
user=str(updates.get("version_creator", "")),
)

def _get_recurring_timeline(self, selected: dict[str, Any], spiked: bool = False):
# TODO-ASYNC: replace with an async service
events_base_service = EventsBaseService("events", backend=superdesk.get_backend())
return events_base_service.get_recurring_timeline(selected, postponed=True, spiked=spiked)

def mark_event_complete(self, updates: dict[str, Any], event: EventResourceModel, mark_complete_validated: bool):
assert event.dates is not None
assert event.dates.start is not None
@@ -567,7 +561,7 @@ def mark_event_complete(self, updates: dict[str, Any], event: EventResourceModel
},
)

async def _convert_to_recurring_event(self, updates: dict[str, Any], original: EventResourceModel):
async def _convert_to_recurring_events(self, updates: dict[str, Any], original: EventResourceModel):
"""Convert a single event to a series of recurring events"""

self._validate_convert_to_recurring(updates, original)
@@ -610,8 +604,7 @@ async def _convert_to_recurring_event(self, updates: dict[str, Any], original: E

# Create the new events and generate their history
await self.create(generated_events)
app = get_current_app().as_any()
app.on_inserted_events(generated_events)
await signals.events_created.send(generated_events)

return generated_events

@@ -653,14 +646,14 @@ def _reset_recurring_event_fields(self, event: EventResourceModel):
setattr(event, field, None)

def _generate_recurring_events(
self, event: EventResourceModel, recurrence_id: int | None = None
self, event: EventResourceModel, recurrence_id: str | None = None
) -> list[EventResourceModel]:
"""
Generate recurring events based on the recurrence rules of the given event.
Args:
event (EventResourceModel): The original event used as a template for recurrence.
recurrence_id (int, optional): The ID of the recurrence group. Defaults to None.
recurrence_id (str, optional): The ID of the recurrence group. Defaults to None.
Returns:
list[EventResourceModel]: A list of newly generated recurring events.
@@ -686,7 +679,8 @@ def _generate_recurring_events(

# for all the dates based on the recurring rules
# set a limit to prevent too many events to be created
for date in itertools.islice(recurring_dates, 0, max_recurring_events):
recurring_dates_iter = itertools.islice(recurring_dates, 0, max_recurring_events)
for i, date in enumerate(recurring_dates_iter):
# prepare data for new recurring event
new_id = generate_guid(type=GUID_NEWSML)
recurring_event_updates = {"dates": dict(start=date, end=(date + time_delta)), "guid": new_id, "id": new_id}
@@ -712,6 +706,11 @@ def _generate_recurring_events(
# let's finally clone the original event & update it with recurring event data
new_event = event.model_copy(update=recurring_event_updates, deep=True)

# reset embedded_planning on all Events but the first one, as it auto-generates
# an associated Planning item with Coverages for the event
if i > 0:
new_event.embedded_planning = []

# set expiry date
self._overwrite_event_expiry_date(new_event)
self._set_planning_schedule(new_event)
@@ -721,18 +720,15 @@ def _generate_recurring_events(
return generated_events

@staticmethod
def _link_to_planning(event: EventResourceModel):
async def _link_to_planning(event: EventResourceModel):
"""
Links an Event to an existing Planning Item
The Planning item remains locked, it is up to the client to release this lock
after this operation is complete
"""
# TODO-ASYNC: replace when planning service is async
planning_service = get_resource_service("planning")
plan_id = event.planning_item

planning_item = planning_service.find_one(req=None, _id=plan_id)
planning_service = PlanningAsyncService()
planning_item = await planning_service.find_by_id(event.planning_item)

if not planning_item:
raise SuperdeskApiError.badRequestError("Planning item not found")
@@ -754,8 +750,8 @@ def _link_to_planning(event: EventResourceModel):
if not planning_item.get("recurrence_id") and link_type == "primary":
updates["recurrence_id"] = event.recurrence_id

planning_service.validate_on_update(updates, planning_item, get_user())
planning_service.system_update(plan_id, updates, planning_item)
# TODO-ASYNC: migrate `validate_on_update` method to async
# planning_service.validate_on_update(updates, planning_item, get_user())
await planning_service.system_update(event.planning_item, updates)

app = get_current_app().as_any()
app.on_updated_planning(updates, planning_item)
await signals.planning_update.send(updates, planning_item)
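A standalone sketch of the capping pattern introduced in `_generate_recurring_events` above, using plain dicts instead of `EventResourceModel` (both the limit and the date generator below are stand-ins): `itertools.islice` bounds how many recurring dates get consumed, and the `enumerate` index lets every event after the first drop its `embedded_planning`, so only one linked Planning item with coverages is auto-created.

import itertools
from datetime import datetime, timedelta

MAX_RECURRING_EVENTS = 3  # stand-in for the configured limit
time_delta = timedelta(hours=1)

# stand-in for the dates produced by generate_recurring_dates()
recurring_dates = (datetime(2024, 12, 9) + timedelta(days=n) for n in range(365))

generated_events = []
for i, date in enumerate(itertools.islice(recurring_dates, 0, MAX_RECURRING_EVENTS)):
    event = {
        "dates": {"start": date, "end": date + time_delta},
        "embedded_planning": [{"coverages": []}],
    }
    if i > 0:
        # only the first occurrence keeps embedded_planning, so the associated
        # Planning item (with its coverages) is generated exactly once
        event["embedded_planning"] = []
    generated_events.append(event)

assert len(generated_events) == MAX_RECURRING_EVENTS
assert generated_events[0]["embedded_planning"] and not generated_events[1]["embedded_planning"]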
5 changes: 4 additions & 1 deletion server/planning/planning/__init__.py
@@ -8,6 +8,7 @@
# AUTHORS and LICENSE files distributed with this source code, or
# at https://www.sourcefabric.org/superdesk/license

from planning import signals
from quart_babel import lazy_gettext

import superdesk
@@ -131,8 +132,10 @@ def init_app(app):
planning_autosave_service = PlanningAutosaveService("planning_autosave", superdesk.get_backend())
PlanningAutosaveResource("planning_autosave", app=app, service=planning_autosave_service)

# listen to async signals
signals.planning_update.connect(planning_history_service.on_item_updated)

app.on_inserted_planning += planning_history_service.on_item_created
app.on_updated_planning += planning_history_service.on_item_updated
app.on_updated_planning_spike += planning_history_service.on_spike
app.on_updated_planning_unspike += planning_history_service.on_unspike
app.on_updated_planning_cancel += planning_history_service.on_cancel
7 changes: 5 additions & 2 deletions server/planning/planning/planning_history.py
@@ -10,12 +10,15 @@

import logging
from copy import deepcopy
from typing import Any


from superdesk.flask import request
from superdesk.resource_fields import ID_FIELD
from superdesk import Resource, get_resource_service
from superdesk.default_settings import strtobool

from planning.types import PlanningResourceModel
from planning.history import HistoryService
from planning.common import WORKFLOW_STATE, ITEM_ACTIONS, ASSIGNMENT_WORKFLOW_STATE
from planning.item_lock import LOCK_ACTION
@@ -71,8 +74,8 @@ def _save_history(self, planning, update, operation):

self.post([history])

def on_item_updated(self, updates, original, operation=None):
item = deepcopy(original)
def on_item_updated(self, updates: dict[str, Any], original: PlanningResourceModel, operation: str | None = None):
item = deepcopy(original.to_dict())
if list(item.keys()) == ["_id"]:
diff = self._remove_unwanted_fields(updates)
else:
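An illustrative sketch of why `on_item_updated` now snapshots the model before merging updates: the async signal delivers a Pydantic resource model rather than a dict, and the handler works on a copied dict so the model instance itself is never mutated. Stock Pydantic's `model_dump()` stands in here for the resource model's `to_dict()`, and the `Plan` model is hypothetical.

from copy import deepcopy

from pydantic import BaseModel


class Plan(BaseModel):  # hypothetical stand-in for PlanningResourceModel
    guid: str
    slugline: str = ""


original = Plan(guid="plan-1", slugline="old slugline")
updates = {"slugline": "new slugline"}

item = deepcopy(original.model_dump())  # plain-dict snapshot of the model
item.update(updates)  # merge the pending updates into the snapshot

assert item["slugline"] == "new slugline"
assert original.slugline == "old slugline"  # the model itself is untouched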
22 changes: 22 additions & 0 deletions server/planning/signals.py
@@ -9,14 +9,36 @@
# at https://www.sourcefabric.org/superdesk/license

import blinker
from typing import Any

from superdesk.core import AsyncSignal
from planning.types import EventResourceModel, PlanningResourceModel

__all__ = [
"planning_created",
"planning_ingested",
"events_update",
]

signals = blinker.Namespace()

planning_created = signals.signal("planning:created")
planning_ingested = signals.signal("planning:ingested")
assignment_content_create = signals.signal("planning:assignment_content_create")


#: Signal for when an Event is about to be updated in the DB
#: param updates: Event updates
#: param original_event: `EventResourceModel` instance of the event to be updated
events_update = AsyncSignal[dict[str, Any], EventResourceModel]("events:update")


#: Signal for when a list of Events has been recorded in the DB
#: param events: List of events registered in DB
events_created = AsyncSignal[list[EventResourceModel]]("events:created")


#: Signal for when a Planning item is about to be updated in the DB
#: param updates: Planning item updates
#: param planning_item: `PlanningResourceModel` instance of the planning item to be updated
planning_update = AsyncSignal[dict[str, Any], PlanningResourceModel]("planning:update")
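
Taken together, a minimal end-to-end sketch of the intended usage, based only on the calls visible in this commit (`AsyncSignal[...](name)`, `.connect()`, `await ... .send()`); the `demo_update` signal and the `audit_update` subscriber are hypothetical, and whether such a signal runs cleanly outside a full Superdesk app context is an assumption.

import asyncio
from typing import Any

from superdesk.core import AsyncSignal

# hypothetical signal, declared the same way as the signals above
demo_update = AsyncSignal[dict[str, Any], dict[str, Any]]("demo:update")


def audit_update(updates: dict[str, Any], original: dict[str, Any]) -> None:
    # hypothetical subscriber; connected callables receive the positional
    # arguments that were passed to send()
    print("updated", original.get("_id"), "with", sorted(updates))


demo_update.connect(audit_update)


async def main() -> None:
    await demo_update.send({"name": "new name"}, {"_id": "event1", "name": "old name"})


asyncio.run(main())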
