From 2704ce95efa061c10675d9109dc6b52ebf987245 Mon Sep 17 00:00:00 2001 From: Yuta Okamoto Date: Wed, 13 Sep 2023 19:13:33 +0900 Subject: [PATCH] add type hints to Subscription --- asyncua/client/client.py | 4 +- asyncua/common/subscription.py | 194 +++++++++++++++++++++++------ asyncua/server/history.py | 6 +- asyncua/server/internal_session.py | 2 +- 4 files changed, 165 insertions(+), 41 deletions(-) diff --git a/asyncua/client/client.py b/asyncua/client/client.py index 8bf95a19b..e236e4ad8 100644 --- a/asyncua/client/client.py +++ b/asyncua/client/client.py @@ -14,7 +14,7 @@ from ..common.xmlexporter import XmlExporter from ..common.node import Node from ..common.manage_nodes import delete_nodes -from ..common.subscription import SubHandler, Subscription +from ..common.subscription import Subscription, SubscriptionHandler from ..common.shortcuts import Shortcuts from ..common.structures import load_type_definitions, load_enums from ..common.structures104 import load_data_type_definitions @@ -703,7 +703,7 @@ def get_node(self, nodeid: Union[Node, ua.NodeId, str, int]) -> Node: return Node(self.uaclient, nodeid) async def create_subscription( - self, period: Union[ua.CreateSubscriptionParameters, float], handler: SubHandler, publishing: bool = True + self, period: Union[ua.CreateSubscriptionParameters, float], handler: SubscriptionHandler, publishing: bool = True ) -> Subscription: """ Create a subscription. diff --git a/asyncua/common/subscription.py b/asyncua/common/subscription.py index 3ccbf52df..ac60875aa 100644 --- a/asyncua/common/subscription.py +++ b/asyncua/common/subscription.py @@ -2,10 +2,19 @@ high level interface to subscriptions """ import asyncio -import logging import collections.abc -from typing import Tuple, Union, List, Iterable, Optional +import logging +import sys +from typing import Any, Tuple, Union, List, Iterable, Optional, overload + +if sys.version_info >= (3, 8): + from typing import Protocol +else: + from typing_extensions import Protocol + +from asyncua.client.ua_client import UaClient from asyncua.common.ua_utils import copy_dataclass_attr +from asyncua.server.internal_session import InternalSession from asyncua import ua from .events import Event, get_filter_from_event_type @@ -44,25 +53,55 @@ def __str__(self): __repr__ = __str__ +class DataChangeNotificationHandler(Protocol): + def datachange_notification(self, node: Node, val: Any, data: DataChangeNotif) -> None: + """ + called for every datachange notification from server + """ + ... + + +class EventNotificationHandler(Protocol): + def event_notification(self, event: ua.EventNotificationList) -> None: + """ + called for every event notification from server + """ + ... + + +class StatusChangeNotificationHandler(Protocol): + def status_change_notification(self, status: ua.StatusChangeNotification) -> None: + """ + called for every status change notification from server + """ + ... + + +SubscriptionHandler = Union[DataChangeNotificationHandler, EventNotificationHandler, StatusChangeNotificationHandler] +""" +Protocol class representing subscription handlers to receive events from server. +""" + + class SubHandler: """ Subscription Handler. To receive events from server for a subscription This class is just a sample class. Whatever class having these methods can be used """ - def datachange_notification(self, node: Node, val, data: DataChangeNotif): + def datachange_notification(self, node: Node, val: Any, data: DataChangeNotif) -> None: """ called for every datachange notification from server """ pass - def event_notification(self, event: ua.EventNotificationList): + def event_notification(self, event: ua.EventNotificationList) -> None: """ called for every event notification from server """ pass - def status_change_notification(self, status: ua.StatusChangeNotification): + def status_change_notification(self, status: ua.StatusChangeNotification) -> None: """ called for every status change notification from server """ @@ -75,14 +114,19 @@ class Subscription: The object represent a subscription to an opc-ua server. This is a high level class, especially `subscribe_data_change` and `subscribe_events methods`. If more control is necessary look at code and/or use `create_monitored_items method`. - :param server: `InternalSession` or `UAClient` + :param server: `InternalSession` or `UaClient` """ - def __init__(self, server, params: ua.CreateSubscriptionParameters, handler: SubHandler): + def __init__( + self, + server: Union[InternalSession, UaClient], + params: ua.CreateSubscriptionParameters, + handler: SubscriptionHandler, + ): self.logger = logging.getLogger(__name__) self.server = server self._client_handle = 200 - self._handler: SubHandler = handler + self._handler: SubscriptionHandler = handler self.parameters: ua.CreateSubscriptionParameters = params # move to data class self._monitored_items = {} self.subscription_id: Optional[int] = None @@ -99,14 +143,16 @@ async def init(self) -> ua.CreateSubscriptionResult: async def update( self, params: ua.ModifySubscriptionParameters - ) -> ua.ModifySubscriptionResponse: + ) -> ua.ModifySubscriptionResult: + if not isinstance(self.server, UaClient): + raise ua.uaerrors.UaError(f"update() is not supported in {self.server}.") response = await self.server.update_subscription(params) self.logger.info('Subscription updated %s', params.SubscriptionId) # update the self.parameters attr with the updated values copy_dataclass_attr(params, self.parameters) return response - async def publish_callback(self, publish_result: ua.PublishResult): + async def publish_callback(self, publish_result: ua.PublishResult) -> None: """ Handle `PublishResult` callback. """ @@ -122,14 +168,14 @@ async def publish_callback(self, publish_result: ua.PublishResult): else: self.logger.warning("Notification type not supported yet for notification %s", notif) - async def delete(self): + async def delete(self) -> None: """ Delete subscription on server. This is automatically done by Client and Server classes on exit. """ results = await self.server.delete_subscriptions([self.subscription_id]) results[0].check() - async def _call_datachange(self, datachange: ua.DataChangeNotification): + async def _call_datachange(self, datachange: ua.DataChangeNotification) -> None: if not hasattr(self._handler, "datachange_notification"): self.logger.error("DataChange subscription created but handler has no datachange_notification method") return @@ -153,7 +199,7 @@ async def _call_datachange(self, datachange: ua.DataChangeNotification): except Exception as ex: self.logger.exception("Exception calling data change handler. Error: %s", ex) - async def _call_event(self, eventlist: ua.EventNotificationList): + async def _call_event(self, eventlist: ua.EventNotificationList) -> None: for event in eventlist.Events: if event.ClientHandle not in self._monitored_items: self.logger.warning("Received a notification for unknown handle: %s", event.ClientHandle) @@ -172,7 +218,7 @@ async def _call_event(self, eventlist: ua.EventNotificationList): else: self.logger.error("Event subscription created but handler has no event_notification method") - async def _call_status(self, status: ua.StatusChangeNotification): + async def _call_status(self, status: ua.StatusChangeNotification) -> None: if not hasattr(self._handler, "status_change_notification"): self.logger.error("DataChange subscription has no status_change_notification method") return @@ -184,6 +230,28 @@ async def _call_status(self, status: ua.StatusChangeNotification): except Exception: self.logger.exception("Exception calling status change handler") + @overload + async def subscribe_data_change( + self, + nodes: Node, + attr: ua.AttributeIds = ua.AttributeIds.Value, + queuesize: int = 0, + monitoring=ua.MonitoringMode.Reporting, + sampling_interval: ua.Duration = 0.0 + ) -> int: + ... + + @overload + async def subscribe_data_change( + self, + nodes: Union[Node, Iterable[Node]], + attr: ua.AttributeIds = ua.AttributeIds.Value, + queuesize: int = 0, + monitoring=ua.MonitoringMode.Reporting, + sampling_interval: ua.Duration = 0.0 + ) -> List[Union[int, ua.StatusCode]]: + ... + async def subscribe_data_change( self, nodes: Union[Node, Iterable[Node]], @@ -213,8 +281,10 @@ async def subscribe_data_change( nodes, attr, queuesize=queuesize, monitoring=monitoring, sampling_interval=sampling_interval ) - async def _create_eventfilter(self, evtypes: Union[ua.ObjectIds, List[ua.ObjectIds], ua.NodeId, List[ua.NodeId]], where_clause_generation: bool = True): - if not isinstance(evtypes, (list, tuple)): + async def _create_eventfilter( + self, evtypes: Union[int, ua.NodeId, Iterable[Union[int, ua.NodeId]]], where_clause_generation: bool = True + ) -> ua.EventFilter: + if isinstance(evtypes, (int, ua.NodeId)): evtypes = [evtypes] evtypes = [Node(self.server, evtype) for evtype in evtypes] # type: ignore[union-attr] evfilter = await get_filter_from_event_type(evtypes, where_clause_generation) # type: ignore[union-attr] @@ -222,9 +292,9 @@ async def _create_eventfilter(self, evtypes: Union[ua.ObjectIds, List[ua.ObjectI async def subscribe_events( self, - sourcenode: Node = ua.ObjectIds.Server, - evtypes: Union[ua.ObjectIds, List[ua.ObjectIds], ua.NodeId, List[ua.NodeId]] = ua.ObjectIds.BaseEventType, - evfilter: ua.EventFilter = None, + sourcenode: Union[Node, ua.NodeId, str, int] = ua.ObjectIds.Server, + evtypes: Union[int, ua.NodeId, Iterable[Union[ua.NodeId, int]]] = ua.ObjectIds.BaseEventType, + evfilter: Optional[ua.EventFilter] = None, queuesize: int = 0, where_clause_generation: bool = True ) -> int: @@ -245,7 +315,7 @@ async def subscribe_events( """ sourcenode = Node(self.server, sourcenode) if evfilter is None: - if not isinstance(evtypes, (list, tuple)) and evtypes == ua.ObjectIds.BaseEventType: + if evtypes == ua.ObjectIds.BaseEventType or evtypes == ua.NodeId(ua.ObjectIds.BaseEventType): # Remove where clause for base event type, for servers that have problems with long WhereClauses. # Also because BaseEventType wants every event we can ommit it. Issue: #1205 where_clause_generation = False @@ -254,9 +324,9 @@ async def subscribe_events( async def subscribe_alarms_and_conditions( self, - sourcenode: Node = ua.ObjectIds.Server, - evtypes: Union[ua.ObjectIds, List[ua.ObjectIds], ua.NodeId, List[ua.NodeId]] = ua.ObjectIds.ConditionType, - evfilter: ua.EventFilter = None, + sourcenode: Union[Node, ua.NodeId, str, int] = ua.ObjectIds.Server, + evtypes: Union[int, ua.NodeId, Iterable[Union[int, ua.NodeId]]] = ua.ObjectIds.ConditionType, + evfilter: Optional[ua.EventFilter] = None, queuesize: int = 0 ) -> int: """ @@ -275,11 +345,35 @@ async def subscribe_alarms_and_conditions( """ return await self.subscribe_events(sourcenode, evtypes, evfilter, queuesize) + @overload + async def _subscribe( + self, + nodes: Node, + attr=ua.AttributeIds.Value, + mfilter: Optional[ua.MonitoringFilter] = None, + queuesize: int = 0, + monitoring: ua.MonitoringMode = ua.MonitoringMode.Reporting, + sampling_interval: ua.Duration = 0.0 + ) -> int: + ... + + @overload + async def _subscribe( + self, + nodes: Iterable[Node], + attr=ua.AttributeIds.Value, + mfilter: Optional[ua.MonitoringFilter] = None, + queuesize: int = 0, + monitoring: ua.MonitoringMode = ua.MonitoringMode.Reporting, + sampling_interval: ua.Duration = 0.0 + ) -> List[Union[int, ua.StatusCode]]: + ... + async def _subscribe( self, nodes: Union[Node, Iterable[Node]], attr=ua.AttributeIds.Value, - mfilter: ua.MonitoringFilter = None, + mfilter: Optional[ua.MonitoringFilter] = None, queuesize: int = 0, monitoring: ua.MonitoringMode = ua.MonitoringMode.Reporting, sampling_interval: ua.Duration = 0.0 @@ -344,14 +438,14 @@ def _make_monitored_item_request( mir.RequestedParameters = mparams return mir - async def create_monitored_items(self, monitored_items: List[ua.MonitoredItemCreateRequest]) -> List[Union[int, ua.StatusCode]]: + async def create_monitored_items(self, monitored_items: Iterable[ua.MonitoredItemCreateRequest]) -> List[Union[int, ua.StatusCode]]: """ low level method to have full control over subscription parameters. Client handle must be unique since it will be used as key for internal registration of data. """ params = ua.CreateMonitoredItemsParameters() params.SubscriptionId = self.subscription_id - params.ItemsToCreate = monitored_items + params.ItemsToCreate = list(monitored_items) params.TimestampsToReturn = ua.TimestampsToReturn.Both # insert monitored item into map to avoid notification arrive before result return # server_handle is left as None in purpose as we don't get it yet. @@ -378,18 +472,18 @@ async def create_monitored_items(self, monitored_items: List[ua.MonitoredItemCre mids.append(result.MonitoredItemId) return mids - async def unsubscribe(self, handle: Union[int, List[int]]): + async def unsubscribe(self, handle: Union[int, Iterable[int]]) -> None: """ Unsubscribe from datachange or events using the handle returned while subscribing. If you delete the subscription, you do not need to unsubscribe. :param handle: The handle that was returned when subscribing to the node/nodes """ - handles: List[int] = [handle] if isinstance(handle, int) else handle + handles: Iterable[int] = [handle] if isinstance(handle, int) else handle if not handles: return params = ua.DeleteMonitoredItemsParameters() params.SubscriptionId = self.subscription_id - params.MonitoredItemIds = handles + params.MonitoredItemIds = list(handles) results = await self.server.delete_monitored_items(params) results[0].check() handle_map = {v.server_handle: k for k, v in self._monitored_items.items()} @@ -397,7 +491,9 @@ async def unsubscribe(self, handle: Union[int, List[int]]): if handle in handle_map: del self._monitored_items[handle_map[handle]] - async def modify_monitored_item(self, handle: int, new_samp_time: ua.Duration, new_queuesize: int = 0, mod_filter_val: int = -1): + async def modify_monitored_item( + self, handle: int, new_samp_time: ua.Duration, new_queuesize: int = 0, mod_filter_val: int = -1 + ) -> List[ua.MonitoredItemModifyResult]: """ Modify a monitored item. :param handle: Handle returned when originally subscribing @@ -439,7 +535,7 @@ def _modify_monitored_item_request( new_samp_time: ua.Duration, mod_filter: ua.DataChangeFilter, client_handle: ua.IntegerId - ): + ) -> ua.MonitoringParameters: req_params = ua.MonitoringParameters() req_params.ClientHandle = client_handle req_params.QueueSize = new_queuesize @@ -447,14 +543,36 @@ def _modify_monitored_item_request( req_params.SamplingInterval = new_samp_time return req_params - def deadband_monitor( + @overload + async def deadband_monitor( + self, + var: Node, + deadband_val: ua.Double, + deadbandtype: ua.UInt32 = 1, + queuesize: int = 0, + attr: ua.AttributeIds = ua.AttributeIds.Value + ) -> int: + ... + + @overload + async def deadband_monitor( + self, + var: Iterable[Node], + deadband_val: ua.Double, + deadbandtype: ua.UInt32 = 1, + queuesize: int = 0, + attr: ua.AttributeIds = ua.AttributeIds.Value + ) -> List[Union[int, ua.StatusCode]]: + ... + + async def deadband_monitor( self, var: Union[Node, Iterable[Node]], deadband_val: ua.Double, deadbandtype: ua.UInt32 = 1, queuesize: int = 0, attr: ua.AttributeIds = ua.AttributeIds.Value - ): + ) -> Union[int, List[Union[int, ua.StatusCode]]]: """ Method to create a subscription with a Deadband Value. Default deadband value type is absolute. @@ -471,9 +589,9 @@ def deadband_monitor( deadband_filter.DeadbandType = deadbandtype # absolute float value or from 0 to 100 for percentage deadband deadband_filter.DeadbandValue = deadband_val - return self._subscribe(var, attr, deadband_filter, queuesize) + return await self._subscribe(var, attr, deadband_filter, queuesize) - async def set_monitoring_mode(self, monitoring: ua.MonitoringMode) -> ua.uatypes.StatusCode: + async def set_monitoring_mode(self, monitoring: ua.MonitoringMode) -> List[ua.uatypes.StatusCode]: """ The monitoring mode parameter is used to enable/disable the sampling of MonitoredItems @@ -482,6 +600,8 @@ async def set_monitoring_mode(self, monitoring: ua.MonitoringMode) -> ua.uatypes :param monitoring: The monitoring mode to apply :return: Return a Set Monitoring Mode Result """ + if not isinstance(self.server, UaClient): + raise ua.uaerrors.UaError(f"set_monitoring_mode() is not supported in {self.server}.") node_handles = [] for mi in self._monitored_items.values(): node_handles.append(mi.server_handle) @@ -492,7 +612,7 @@ async def set_monitoring_mode(self, monitoring: ua.MonitoringMode) -> ua.uatypes params.MonitoringMode = monitoring return await self.server.set_monitoring_mode(params) - async def set_publishing_mode(self, publishing: bool) -> ua.uatypes.StatusCode: + async def set_publishing_mode(self, publishing: bool) -> List[ua.uatypes.StatusCode]: """ Disable publishing of NotificationMessages for the subscription, but doesn't discontinue the sending of keep-alive Messages, @@ -502,6 +622,8 @@ async def set_publishing_mode(self, publishing: bool) -> ua.uatypes.StatusCode: :return: Return a Set Publishing Mode Result """ self.logger.info("set_publishing_mode") + if not isinstance(self.server, UaClient): + raise ua.uaerrors.UaError(f"set_publishing_mode() is not supported in {self.server}.") params = ua.SetPublishingModeParameters() params.SubscriptionIds = [self.subscription_id] # type: ignore params.PublishingEnabled = publishing diff --git a/asyncua/server/history.py b/asyncua/server/history.py index b7cdf9487..609219642 100644 --- a/asyncua/server/history.py +++ b/asyncua/server/history.py @@ -1,10 +1,12 @@ +from __future__ import annotations + import asyncio import logging from datetime import timedelta from datetime import datetime from asyncua import ua -from ..common.subscription import Subscription, SubHandler +from ..common.subscription import Subscription, SubscriptionHandler, SubHandler from ..common.utils import Buffer @@ -240,7 +242,7 @@ def set_storage(self, storage): """ self.storage = storage - async def _create_subscription(self, handler): + async def _create_subscription(self, handler: SubscriptionHandler) -> Subscription: params = ua.CreateSubscriptionParameters() params.RequestedPublishingInterval = 10 params.RequestedLifetimeCount = 3000 diff --git a/asyncua/server/internal_session.py b/asyncua/server/internal_session.py index 940f6c9d6..a78034dfa 100644 --- a/asyncua/server/internal_session.py +++ b/asyncua/server/internal_session.py @@ -59,7 +59,7 @@ async def get_endpoints(self, params=None, sockname=None): def is_activated(self) -> bool: return self.state == SessionState.Activated - async def create_session(self, params: ua.CreateSessionParameters, sockname: Optional[Tuple[str, int]]=None): + async def create_session(self, params: ua.CreateSessionParameters, sockname: Optional[Tuple[str, int]] = None): self.logger.info('Create session request') result = ua.CreateSessionResult() result.SessionId = self.session_id