diff --git a/canopen/emcy.py b/canopen/emcy.py index 1bbdeb75..520cfdc0 100644 --- a/canopen/emcy.py +++ b/canopen/emcy.py @@ -4,6 +4,9 @@ import time from typing import Callable, List, Optional +import canopen.network + + # Error code, error register, vendor specific data EMCY_STRUCT = struct.Struct(" None: + self.network: canopen.network.Network = canopen.network._UNINITIALIZED_NETWORK self._node_id = 0 self._data = None self.responses = queue.Queue() diff --git a/canopen/network.py b/canopen/network.py index cd754c8c..9d6e66c6 100644 --- a/canopen/network.py +++ b/canopen/network.py @@ -3,7 +3,7 @@ from collections.abc import MutableMapping import logging import threading -from typing import Callable, Dict, Iterator, List, Optional, Union +from typing import Callable, Dict, Final, Iterator, List, Optional, Union import can from can import Listener @@ -282,6 +282,21 @@ def __len__(self) -> int: return len(self.nodes) +class _UninitializedNetwork(Network): + """Empty network implementation as a placeholder before actual initialization.""" + + def __init__(self, bus: Optional[can.BusABC] = None): + """Do not initialize attributes, by skipping the parent constructor.""" + + def __getattribute__(self, name): + raise RuntimeError("No actual Network object was assigned, " + "try associating to a real network first.") + + +#: Singleton instance +_UNINITIALIZED_NETWORK: Final[Network] = _UninitializedNetwork() + + class PeriodicMessageTask: """ Task object to transmit a message periodically using python-can's diff --git a/canopen/nmt.py b/canopen/nmt.py index 125042d3..6f29e917 100644 --- a/canopen/nmt.py +++ b/canopen/nmt.py @@ -4,6 +4,8 @@ import time from typing import Callable, Optional, TYPE_CHECKING +import canopen.network + if TYPE_CHECKING: from canopen.network import PeriodicMessageTask @@ -49,7 +51,7 @@ class NmtBase: def __init__(self, node_id: int): self.id = node_id - self.network = None + self.network: canopen.network.Network = canopen.network._UNINITIALIZED_NETWORK self._state = 0 def on_command(self, can_id, data, timestamp): diff --git a/canopen/node/base.py b/canopen/node/base.py index bf72d959..45ad35b4 100644 --- a/canopen/node/base.py +++ b/canopen/node/base.py @@ -1,4 +1,6 @@ from typing import TextIO, Union + +import canopen.network from canopen.objectdictionary import ObjectDictionary, import_od @@ -17,10 +19,14 @@ def __init__( node_id: int, object_dictionary: Union[ObjectDictionary, str, TextIO], ): - self.network = None + self.network: canopen.network.Network = canopen.network._UNINITIALIZED_NETWORK if not isinstance(object_dictionary, ObjectDictionary): object_dictionary = import_od(object_dictionary, node_id) self.object_dictionary = object_dictionary self.id = node_id or self.object_dictionary.node_id + + def has_network(self) -> bool: + """Check whether the node has been associated to a network.""" + return not isinstance(self.network, canopen.network._UninitializedNetwork) diff --git a/canopen/node/local.py b/canopen/node/local.py index eb74b98d..eb614601 100644 --- a/canopen/node/local.py +++ b/canopen/node/local.py @@ -1,6 +1,9 @@ +from __future__ import annotations + import logging from typing import Dict, Union +import canopen.network from canopen.node.base import BaseNode from canopen.sdo import SdoServer, SdoAbortedError from canopen.pdo import PDO, TPDO, RPDO @@ -34,7 +37,7 @@ def __init__( self.add_write_callback(self.nmt.on_write) self.emcy = EmcyProducer(0x80 + self.id) - def associate_network(self, network): + def associate_network(self, network: canopen.network.Network): self.network = network self.sdo.network = network self.tpdo.network = network @@ -44,15 +47,15 @@ def associate_network(self, network): network.subscribe(self.sdo.rx_cobid, self.sdo.on_request) network.subscribe(0, self.nmt.on_command) - def remove_network(self): + def remove_network(self) -> None: self.network.unsubscribe(self.sdo.rx_cobid, self.sdo.on_request) self.network.unsubscribe(0, self.nmt.on_command) - self.network = None - self.sdo.network = None - self.tpdo.network = None - self.rpdo.network = None - self.nmt.network = None - self.emcy.network = None + self.network = canopen.network._UNINITIALIZED_NETWORK + self.sdo.network = canopen.network._UNINITIALIZED_NETWORK + self.tpdo.network = canopen.network._UNINITIALIZED_NETWORK + self.rpdo.network = canopen.network._UNINITIALIZED_NETWORK + self.nmt.network = canopen.network._UNINITIALIZED_NETWORK + self.emcy.network = canopen.network._UNINITIALIZED_NETWORK def add_read_callback(self, callback): self._read_callbacks.append(callback) diff --git a/canopen/node/remote.py b/canopen/node/remote.py index 4f3281db..b354b8f9 100644 --- a/canopen/node/remote.py +++ b/canopen/node/remote.py @@ -1,6 +1,9 @@ +from __future__ import annotations + import logging from typing import Union, TextIO +import canopen.network from canopen.sdo import SdoClient, SdoCommunicationError, SdoAbortedError from canopen.nmt import NmtMaster from canopen.emcy import EmcyConsumer @@ -46,7 +49,7 @@ def __init__( if load_od: self.load_configuration() - def associate_network(self, network): + def associate_network(self, network: canopen.network.Network): self.network = network self.sdo.network = network self.pdo.network = network @@ -59,18 +62,18 @@ def associate_network(self, network): network.subscribe(0x80 + self.id, self.emcy.on_emcy) network.subscribe(0, self.nmt.on_command) - def remove_network(self): + def remove_network(self) -> None: for sdo in self.sdo_channels: self.network.unsubscribe(sdo.tx_cobid, sdo.on_response) self.network.unsubscribe(0x700 + self.id, self.nmt.on_heartbeat) self.network.unsubscribe(0x80 + self.id, self.emcy.on_emcy) self.network.unsubscribe(0, self.nmt.on_command) - self.network = None - self.sdo.network = None - self.pdo.network = None - self.tpdo.network = None - self.rpdo.network = None - self.nmt.network = None + self.network = canopen.network._UNINITIALIZED_NETWORK + self.sdo.network = canopen.network._UNINITIALIZED_NETWORK + self.pdo.network = canopen.network._UNINITIALIZED_NETWORK + self.tpdo.network = canopen.network._UNINITIALIZED_NETWORK + self.rpdo.network = canopen.network._UNINITIALIZED_NETWORK + self.nmt.network = canopen.network._UNINITIALIZED_NETWORK def add_sdo(self, rx_cobid, tx_cobid): """Add an additional SDO channel. @@ -87,7 +90,7 @@ def add_sdo(self, rx_cobid, tx_cobid): """ client = SdoClient(rx_cobid, tx_cobid, self.object_dictionary) self.sdo_channels.append(client) - if self.network is not None: + if self.has_network(): self.network.subscribe(client.tx_cobid, client.on_response) return client diff --git a/canopen/pdo/base.py b/canopen/pdo/base.py index f2a7d205..dc81aa60 100644 --- a/canopen/pdo/base.py +++ b/canopen/pdo/base.py @@ -6,12 +6,12 @@ import logging import binascii +import canopen.network from canopen.sdo import SdoAbortedError from canopen import objectdictionary from canopen import variable if TYPE_CHECKING: - from canopen.network import Network from canopen import LocalNode, RemoteNode from canopen.pdo import RPDO, TPDO from canopen.sdo import SdoRecord @@ -30,7 +30,7 @@ class PdoBase(Mapping): """ def __init__(self, node: Union[LocalNode, RemoteNode]): - self.network: Optional[Network] = None + self.network: canopen.network.Network = canopen.network._UNINITIALIZED_NETWORK self.map: Optional[PdoMaps] = None self.node: Union[LocalNode, RemoteNode] = node diff --git a/canopen/sdo/base.py b/canopen/sdo/base.py index 81d5e710..39b10634 100644 --- a/canopen/sdo/base.py +++ b/canopen/sdo/base.py @@ -4,6 +4,7 @@ from typing import Iterator, Optional, Union from collections.abc import Mapping +import canopen.network from canopen import objectdictionary from canopen import variable from canopen.utils import pretty_index @@ -43,7 +44,7 @@ def __init__( """ self.rx_cobid = rx_cobid self.tx_cobid = tx_cobid - self.network = None + self.network: canopen.network.Network = canopen.network._UNINITIALIZED_NETWORK self.od = od def __getitem__(