diff --git a/server/planning/events/events_history_async_service.py b/server/planning/events/events_history_async_service.py index 275e15149..43d4a6409 100644 --- a/server/planning/events/events_history_async_service.py +++ b/server/planning/events/events_history_async_service.py @@ -41,11 +41,11 @@ async def on_item_created(self, items: list[EventResourceModel | Any], operation await super().on_item_created(created_from_planning, "created_from_planning") await super().on_item_created(regular_events) - async def on_item_deleted(self, doc): + async def on_item_deleted(self, doc: dict[str, Any]): lookup = {"event_id": doc[ID_FIELD]} await self.delete_many(lookup=lookup) - async def on_item_updated(self, updates: dict[str, Any], original, operation: str | None = None): + async def on_item_updated(self, updates: dict[str, Any], original: dict[str, Any], operation: str | None = None): item = deepcopy(original) if list(item.keys()) == ["_id"]: diff = self._remove_unwanted_fields(updates) @@ -76,8 +76,8 @@ async def _save_history(self, item, update: dict[str, Any], operation: str | Non history["operation"] = "ingested" await self.create([history]) - async def on_update_repetitions(self, updates: dict[str, Any], event_id, operation: str | None = None): + async def on_update_repetitions(self, updates: dict[str, Any], event_id: str, operation: str | None = None): await self.on_item_updated(updates, {"_id": event_id}, operation or "update_repetitions") - async def on_update_time(self, updates: dict[str, Any], original): + async def on_update_time(self, updates: dict[str, Any], original: dict[str, Any]): await self.on_item_updated(updates, original, "update_time") diff --git a/server/planning/history_async_service.py b/server/planning/history_async_service.py index ef97ef38b..fa740452d 100644 --- a/server/planning/history_async_service.py +++ b/server/planning/history_async_service.py @@ -47,7 +47,7 @@ class HistoryAsyncService(AsyncResourceService[Generic[HistoryResourceModelType]]): """Provide common async methods for tracking history of Creation, Updates and Spiking to collections""" - async def on_item_created(self, items, operation: str | None = None): + async def on_item_created(self, items: list[Any], operation: str | None = None): for item in items: if not item.get("duplicate_from"): await self._save_history( @@ -92,7 +92,7 @@ async def get_user_id(self): if user: return user.get("_id") - async def _changes(self, original, updates): + async def _changes(self, original: dict[str, Any], updates: dict[str, Any]): """ Given the original record and the updates calculate what has changed and what is new diff --git a/server/planning/planning/planning_history_async_service.py b/server/planning/planning/planning_history_async_service.py index 9742ccacc..1ec660caa 100644 --- a/server/planning/planning/planning_history_async_service.py +++ b/server/planning/planning/planning_history_async_service.py @@ -44,7 +44,7 @@ async def on_item_created(self, items: list[PlanningResourceModel], operation=No async def _save_history(self, item, update: dict[str, Any], operation: str | None = None): user = await self.get_user_id() # confirmation could be from external fulfillment, so set the user to the assignor - if operation == ASSIGNMENT_HISTORY_ACTIONS.CONFIRM and user is None: + if operation == ASSIGNMENT_HISTORY_ACTIONS.CONFIRM and user is None: # type: ignore assigned_to = update.get("assigned_to") if assigned_to is not None: user = update.get( @@ -70,7 +70,7 @@ async def on_item_updated( if list(item.keys()) == ["_id"]: diff = self._remove_unwanted_fields(updates) else: - diff = await self._changes(original, updates) + diff = await self._changes(original.to_dict(), updates) diff.pop("coverages", None) if updates: item.update(updates) @@ -87,7 +87,7 @@ async def on_item_updated( await self._save_history(item, diff, operation) - await self._save_coverage_history(updates, original) + await self._save_coverage_history(updates, original.to_dict()) async def on_cancel(self, updates: dict[str, Any], original): await self.on_item_updated( @@ -96,9 +96,9 @@ async def on_cancel(self, updates: dict[str, Any], original): "planning_cancel" if original.get("lock_action") in ["planning_cancel", "edit"] else "events_cancel", ) - async def _get_coverage_diff(self, updates: dict[str, Any], original): + async def _get_coverage_diff(self, updates: dict[str, Any], original: dict[str, Any]): diff = {"coverage_id": original.get("coverage_id")} - cov_plan_diff = await self._changes(original.get("planning"), updates.get("planning")) + cov_plan_diff = await self._changes(original.get("planning", {}), updates.get("planning", {})) if cov_plan_diff: diff["planning"] = cov_plan_diff @@ -108,7 +108,7 @@ async def _get_coverage_diff(self, updates: dict[str, Any], original): return diff - async def _save_coverage_history(self, updates: dict[str, Any], original): + async def _save_coverage_history(self, updates: dict[str, Any], original: dict[str, Any]): """Save the coverage history for the planning item""" item = deepcopy(original) original_coverages = {c.get("coverage_id"): c for c in (original or {}).get("coverages") or []} @@ -128,7 +128,7 @@ async def _save_coverage_history(self, updates: dict[str, Any], original): deleted = [coverage for cid, coverage in original_coverages.items() if cid not in updates_coverages] for cov in added: - if cov.get("assigned_to", {}).get("state") == ASSIGNMENT_WORKFLOW_STATE.ASSIGNED: + if cov.get("assigned_to", {}).get("state") == ASSIGNMENT_WORKFLOW_STATE.ASSIGNED: # type: ignore diff = {"coverage_id": cov.get("coverage_id")} diff.update(cov) await self._save_history( @@ -142,7 +142,7 @@ async def _save_coverage_history(self, updates: dict[str, Any], original): await self._save_history(item, cov, "coverage_created") for cov in updated: - original_coverage = original_coverages.get(cov.get("coverage_id")) + original_coverage = original_coverages.get(cov.get("coverage_id"), {}) diff = await self._get_coverage_diff(cov, original_coverage) if len(diff.keys()) > 1: await self._save_history(item, diff, "coverage_edited") @@ -160,7 +160,7 @@ async def _save_coverage_history(self, updates: dict[str, Any], original): if not original.get(LOCK_ACTION): operation = "events_cancel" elif ( - original.get(LOCK_ACTION) == ITEM_ACTIONS.PLANNING_CANCEL + original.get(LOCK_ACTION) == ITEM_ACTIONS.PLANNING_CANCEL # type: ignore or updates.get("state") == WORKFLOW_STATE.CANCELLED ): # If cancelled through item action or through editor @@ -181,7 +181,7 @@ async def _save_coverage_history(self, updates: dict[str, Any], original): for cov in deleted: await self._save_history(item, {"coverage_id": cov.get("coverage_id")}, "coverage_deleted") - async def on_spike(self, updates: dict[str, Any], original): + async def on_spike(self, updates: dict[str, Any], original: PlanningResourceModel): """Spike event On spike of a planning item the history of any agendas that the item belongs to will have an entry added to @@ -192,17 +192,17 @@ async def on_spike(self, updates: dict[str, Any], original): """ await super().on_spike(updates, original) - async def on_unspike(self, updates: dict[str, Any], original): + async def on_unspike(self, updates: dict[str, Any], original: PlanningResourceModel): await super().on_unspike(updates, original) - async def on_duplicate(self, parent, duplicate): + async def on_duplicate(self, parent: dict[str, Any], duplicate: dict[str, Any]): await self._save_history( {ID_FIELD: str(parent[ID_FIELD])}, {"duplicate_id": str(duplicate[ID_FIELD])}, "duplicate", ) - async def on_duplicate_from(self, item: dict[str, Any], duplicate_id): + async def on_duplicate_from(self, item: dict[str, Any], duplicate_id: str): new_plan = deepcopy(item) new_plan["duplicate_id"] = duplicate_id await self._save_history({ID_FIELD: str(item[ID_FIELD])}, new_plan, "duplicate_from")