Skip to content

Commit

Permalink
[SDESK-7442] Migrate Events resource service to async (#2150)
Browse files Browse the repository at this point in the history
* Move first method and utility functions

SDESK-7442

* Migrated `create` & `on_create` methods

Along with another set of utils functions

SDESK-7442

* Move `on_create`, `on_created`, `on_update` & `on_updated`

SDESK-7442

* Move `on_delete` method

SDESK-7442

* Update `ExportToNewsroomTest` to use async service

SDESK-7442

* Fix tests

Still pending: fix the test broken by the prod_api app context issue

SDESK-7442

* Minor fixes

SDESK-7442

* Improvements after feedback

SDESK-7442

* Update events.py

* Move `on_updated` method

SDESK-7442
  • Loading branch information
eos87 authored Dec 10, 2024
1 parent 9a1fe66 commit 44cf909
Show file tree
Hide file tree
Showing 29 changed files with 1,219 additions and 242 deletions.
6 changes: 5 additions & 1 deletion server/planning/assignments/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import superdesk
from quart_babel import lazy_gettext
from superdesk.services import BaseService

from planning import signals
from .assignments import AssignmentsResource, AssignmentsService
from .assignments_content import AssignmentsContentResource, AssignmentsContentService
from .assignments_link import AssignmentsLinkResource, AssignmentsLinkService
Expand Down Expand Up @@ -93,13 +95,15 @@ def init_app(app):
delivery_service = BaseService("delivery", backend=superdesk.get_backend())
DeliveryResource("delivery", app=app, service=delivery_service)

# listen to async signals
signals.events_update.connect(assignments_publish_service.on_events_updated)

# Updating data/lock on assignments based on content item updates from authoring
app.on_updated_archive += assignments_publish_service.update_assignment_on_archive_update
app.on_archive_item_updated += assignments_publish_service.update_assignment_on_archive_operation
app.on_item_lock += assignments_publish_service.validate_assignment_lock
app.on_item_locked += assignments_publish_service.sync_assignment_lock
app.on_item_unlocked += assignments_publish_service.sync_assignment_unlock
app.on_updated_events += assignments_publish_service.on_events_updated

# Track updates for an assignment if it's news story was updated
if app.config.get("PLANNING_LINK_UPDATES_TO_COVERAGES", True):
Expand Down
5 changes: 3 additions & 2 deletions server/planning/assignments/assignments.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
planning_auto_assign_to_workflow,
)

from planning.types import EventResourceModel
from planning.planning_notifications import PlanningNotifications
from planning.common import format_address, get_assginment_name
from .assignments_history import ASSIGNMENT_HISTORY_ACTIONS
Expand Down Expand Up @@ -1020,10 +1021,10 @@ def update_assignment_on_archive_operation(self, updates, original, operation=No
lock_service = get_component(LockService)
lock_service.unlock(assignment, user_id, get_auth()["_id"], "assignments")

def on_events_updated(self, updates, original):
def on_events_updated(self, updates: dict[str, Any], original: EventResourceModel):
"""Send assignment notifications if any relevant Event metadata has changed"""

event = deepcopy(original)
event = deepcopy(original.to_dict())
event.update(updates)
plannings = get_related_planning_for_events([event[ID_FIELD]], "primary")

Expand Down
1 change: 1 addition & 0 deletions server/planning/autosave.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def on_create(self, docs):

def on_delete(self, doc):
    """Clean up event files when an autosaved event item is deleted.

    Only acts on items of type "event"; delegates the actual file
    removal to the events service.
    """
    if doc.get(ITEM_TYPE) == "event":
        # TODO-ASYNC: replace with equivalent in `EventsAsyncService`
        get_resource_service("events").delete_event_files(None, doc)

@staticmethod
Expand Down
2 changes: 1 addition & 1 deletion server/planning/commands/delete_spiked_items.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from superdesk.lock import lock, unlock, remove_locks
from planning.common import WORKFLOW_STATE
from planning.events import EventsAsyncService
from planning.events.utils import get_recurring_timeline
from planning.events.events_utils import get_recurring_timeline
from planning.planning import PlanningAsyncService
from planning.assignments import AssignmentsAsyncService
from .async_cli import planning_cli
Expand Down
2 changes: 2 additions & 0 deletions server/planning/commands/delete_spiked_items_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ async def asyncSetUp(self):
self.planning_service = PlanningAsyncService()
self.assignment_service = AssignmentsAsyncService()

self.setup_test_user()

async def assertDeleteOperation(self, item_type, ids, not_deleted=False):
service = self.event_service if item_type == "events" else self.planning_service

Expand Down
41 changes: 24 additions & 17 deletions server/planning/commands/export_to_newsroom_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,17 @@
# For the full copyright and license information, please see the
# AUTHORS and LICENSE files distributed with this source code, or
# at https://www.sourcefabric.org/superdesk/license

import mock
from datetime import timedelta
from .export_to_newsroom import ExportToNewsroom
from superdesk import get_resource_service

from superdesk.utc import utcnow
from superdesk import get_resource_service

from planning.tests import TestCase
from planning.events.events_service import EventsAsyncService

from .export_to_newsroom import ExportToNewsroom


class MockTransmitter:
Expand All @@ -27,17 +32,19 @@ def transmit(self, queue_item):


class ExportToNewsroomTest(TestCase):
def setUp(self):
super().setUp()
async def asyncSetUp(self):
await super().asyncSetUp()

self.event_service = get_resource_service("events")
self.event_service = EventsAsyncService()
self.planning_service = get_resource_service("planning")

def setUp_data(self):
async def setup_data(self):
utc_now = utcnow()
self.setup_test_user()

events = [
{
"_id": "draft",
"guid": "draft",
"dates": {
"start": utc_now,
"end": utc_now + timedelta(days=1),
Expand All @@ -48,7 +55,7 @@ def setUp_data(self):
"type": "event",
},
{
"_id": "scheduled",
"guid": "scheduled",
"dates": {
"start": utc_now,
"end": utc_now + timedelta(days=1),
Expand All @@ -60,7 +67,7 @@ def setUp_data(self):
"type": "event",
},
{
"_id": "postponed",
"guid": "postponed",
"dates": {
"start": utc_now,
"end": utc_now + timedelta(days=1),
Expand All @@ -72,7 +79,7 @@ def setUp_data(self):
"type": "event",
},
{
"_id": "rescheduled",
"guid": "rescheduled",
"dates": {
"start": utc_now,
"end": utc_now + timedelta(days=1),
Expand All @@ -84,7 +91,7 @@ def setUp_data(self):
"type": "event",
},
{
"_id": "cancelled",
"guid": "cancelled",
"dates": {
"start": utc_now,
"end": utc_now + timedelta(days=1),
Expand All @@ -96,7 +103,7 @@ def setUp_data(self):
"type": "event",
},
{
"_id": "killed",
"guid": "killed",
"dates": {
"start": utc_now,
"end": utc_now + timedelta(days=1),
Expand All @@ -108,7 +115,7 @@ def setUp_data(self):
"type": "event",
},
{
"_id": "postponed-not-published",
"guid": "postponed-not-published",
"dates": {
"start": utc_now,
"end": utc_now + timedelta(days=1),
Expand All @@ -119,7 +126,7 @@ def setUp_data(self):
"type": "event",
},
{
"_id": "rescheduled-not-published",
"guid": "rescheduled-not-published",
"dates": {
"start": utc_now,
"end": utc_now + timedelta(days=1),
Expand All @@ -130,7 +137,7 @@ def setUp_data(self):
"type": "event",
},
{
"_id": "cancelled-not-published",
"guid": "cancelled-not-published",
"dates": {
"start": utc_now,
"end": utc_now + timedelta(days=1),
Expand Down Expand Up @@ -213,13 +220,13 @@ def setUp_data(self):
},
]

self.event_service.create(events)
await self.event_service.create(events)
self.planning_service.create(planning)

@mock.patch("planning.commands.export_to_newsroom.NewsroomHTTPTransmitter")
async def test_events_events_planning(self, mock_transmitter):
async with self.app.app_context():
self.setUp_data()
await self.setup_data()

mock_transmitter.return_value = MockTransmitter()
ExportToNewsroom().run(assets_url="foo", resource_url="bar")
Expand Down
2 changes: 2 additions & 0 deletions server/planning/commands/purge_expired_locks_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ async def asyncSetUp(self) -> None:
await super().asyncSetUp()

async with self.app.app_context():
self.setup_test_user()

await self.insert(
"events",
[
Expand Down
3 changes: 3 additions & 0 deletions server/planning/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,9 @@ def post_required(updates, original):

def update_post_item(updates, original):
"""Method to update(re-post) a posted item after the item is updated"""
# TODO-ASYNC: update once `events_post` & `planning_post` are async
# also to use pydantic models instead of dicts

pub_status = None
# Save&Post or Save&Unpost
if updates.get("pubstatus"):
Expand Down
10 changes: 9 additions & 1 deletion server/planning/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import superdesk
from quart_babel import lazy_gettext

from planning import signals
from .events import EventsResource, EventsService
from .events_spike import (
EventsSpikeResource,
Expand Down Expand Up @@ -44,7 +46,7 @@
)
from planning.autosave import AutosaveService

from .service import EventsAsyncService
from .events_service import EventsAsyncService
from .module import events_resource_config

__all__ = [
Expand Down Expand Up @@ -137,8 +139,14 @@ def init_app(app):
service=recent_events_template_service,
)

# listen to async signals
signals.events_created.connect(events_history_service.on_item_created)

app.on_updated_events += events_history_service.on_item_updated

# TODO-ASYNC: remove `on_inserted_events` when `events_reschedule` is async
app.on_inserted_events += events_history_service.on_item_created

app.on_deleted_item_events -= events_history_service.on_item_deleted
app.on_deleted_item_events += events_history_service.on_item_deleted
app.on_updated_events_spike += events_history_service.on_spike
Expand Down
46 changes: 14 additions & 32 deletions server/planning/events/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@

"""Superdesk Events"""

from typing import Dict, Any, Optional, List, Tuple

import re
import pytz
import logging
import itertools

from copy import deepcopy
from datetime import timedelta

import pytz
import re
from typing import Dict, Any, Optional, List, Tuple
from eve.methods.common import resolve_document_etag
from eve.utils import date_to_str
from dateutil.rrule import (
Expand All @@ -35,9 +36,9 @@
SU,
)

import superdesk
from superdesk.core import get_app_config, get_current_app
from superdesk.resource_fields import ID_FIELD
import superdesk
from superdesk import get_resource_service
from superdesk.errors import SuperdeskApiError
from superdesk.metadata.utils import generate_guid
Expand All @@ -48,13 +49,8 @@
from apps.auth import get_user, get_user_id
from apps.archive.common import get_auth, update_dates_for

from planning.types import (
Event,
EmbeddedPlanning,
EmbeddedCoverageItem,
PlanningRelatedEventLink,
PLANNING_RELATED_EVENT_LINK_TYPE,
)
from planning.types import Event, PlanningRelatedEventLink, PLANNING_RELATED_EVENT_LINK_TYPE
from planning.types.event import EmbeddedPlanning
from planning.common import (
UPDATE_SINGLE,
UPDATE_FUTURE,
Expand All @@ -74,7 +70,6 @@
set_ingest_version_datetime,
is_new_version,
update_ingest_on_patch,
TEMP_ID_PREFIX,
)
from planning.utils import (
get_planning_event_link_method,
Expand All @@ -84,6 +79,7 @@
from .events_base_service import EventsBaseService
from .events_schema import events_schema
from .events_sync import sync_event_metadata_with_planning_items
from .events_utils import get_events_embedded_planning

logger = logging.getLogger(__name__)

Expand All @@ -99,22 +95,6 @@
}


def get_events_embedded_planning(event: Event) -> List[EmbeddedPlanning]:
    """Extract and remove the ``embedded_planning`` entries from *event*.

    Each entry is converted into an ``EmbeddedPlanning`` instance; any
    coverage without a ``coverage_id`` is assigned a freshly generated
    temporary id (so it can be referenced before persistence).
    """

    def ensure_coverage_id(coverage: EmbeddedCoverageItem) -> str:
        # Assign a temporary id to coverages that do not have one yet
        if not coverage.get("coverage_id"):
            coverage["coverage_id"] = TEMP_ID_PREFIX + "-" + generate_guid(type=GUID_NEWSML)
        return coverage["coverage_id"]

    embedded: List[EmbeddedPlanning] = []
    for planning in event.pop("embedded_planning", []):
        coverages = {ensure_coverage_id(coverage): coverage for coverage in planning.get("coverages") or []}
        embedded.append(
            EmbeddedPlanning(
                planning_id=planning.get("planning_id"),
                update_method=planning.get("update_method") or "single",
                coverages=coverages,
            )
        )
    return embedded


def get_subject_str(subject: Dict[str, str]) -> str:
return ":".join(
[
Expand Down Expand Up @@ -298,9 +278,9 @@ def create(self, docs: List[Event], **kwargs):

embedded_planning_lists: List[Tuple[Event, List[EmbeddedPlanning]]] = []
for event in docs:
embedded_planning = get_events_embedded_planning(event)
if len(embedded_planning):
embedded_planning_lists.append((event, embedded_planning))
emb_planning = get_events_embedded_planning(event)
if len(emb_planning):
embedded_planning_lists.append((event, emb_planning)) # type: ignore

ids = self.backend.create(self.datasource, docs, **kwargs)

Expand Down Expand Up @@ -877,6 +857,7 @@ class EventsResource(superdesk.Resource):
merge_nested_documents = True


# TODO-ASYNC: moved to `events_utils.py`. Remove when it is no longer referenced
def generate_recurring_dates(
start,
frequency,
Expand Down Expand Up @@ -1000,6 +981,7 @@ def generate_recurring_events(event, recurrence_id=None):
for key in list(new_event.keys()):
if key.startswith("_") or key.startswith("lock_"):
new_event.pop(key)

elif key == "embedded_planning":
if not embedded_planning_added:
# If this is the first Event in the series, then keep
Expand Down
Loading

0 comments on commit 44cf909

Please sign in to comment.