Skip to content

Commit

Permalink
rename NodeInfo and NodeT
Browse files Browse the repository at this point in the history
To follow more standard python naming conventions.
Will require libraries using nxontology to update imports.
  • Loading branch information
dhimmel committed Jul 12, 2023
1 parent b7083ef commit b68411d
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 63 deletions.
12 changes: 6 additions & 6 deletions nxontology/imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from nxontology import NXOntology
from nxontology.exceptions import NodeNotFound
from nxontology.node import Node
from nxontology.node import NodeT

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -86,21 +86,21 @@ def from_file(handle: BinaryIO | str | PathLike[AnyStr]) -> NXOntology[str]:

def _pronto_edges_for_term(
term: Term, default_rel_type: str = "is a"
) -> list[tuple[Node, Node, str]]:
) -> list[tuple[NodeT, NodeT, str]]:
"""
Extract edges including "is a" relationships for a Pronto term.
https://github.com/althonos/pronto/issues/119#issuecomment-956541286
"""
rels = []
source_id = cast(Node, term.id)
source_id = cast(NodeT, term.id)
for target in term.superclasses(distance=1, with_self=False):
rels.append((source_id, cast(Node, target.id), default_rel_type))
rels.append((source_id, cast(NodeT, target.id), default_rel_type))
for rel_type, targets in term.relationships.items():
for target in sorted(targets):
rels.append(
(
cast(Node, term.id),
cast(Node, target.id),
cast(NodeT, term.id),
cast(NodeT, target.id),
rel_type.name or rel_type.id,
)
)
Expand Down
26 changes: 13 additions & 13 deletions nxontology/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@

# Type definitions. networkx does not declare types.
# https://github.com/networkx/networkx/issues/3988#issuecomment-639969263
Node = TypeVar("Node", bound=Hashable)
NodeT = TypeVar("NodeT", bound=Hashable)


class Node_Info(Freezable, Generic[Node]):
class NodeInfo(Freezable, Generic[NodeT]):
"""
Compute metrics and values for a node of an NXOntology.
Includes intrinsic information content (IC) metrics.
Expand All @@ -35,7 +35,7 @@ class Node_Info(Freezable, Generic[Node]):
Each ic_metric has a scaled version accessible by adding a _scaled suffix.
"""

def __init__(self, nxo: NXOntology[Node], node: Node):
def __init__(self, nxo: NXOntology[NodeT], node: NodeT):
if node not in nxo.graph:
raise NodeNotFound(f"{node} not in graph.")
self.nxo = nxo
Expand Down Expand Up @@ -98,12 +98,12 @@ def data(self) -> dict[Any, Any]:
return data

@property
def parents(self) -> set[Node]:
def parents(self) -> set[NodeT]:
"""Direct parent nodes of this node."""
return set(self.nxo.graph.predecessors(self.node))

@property
def parent(self) -> Node | None:
def parent(self) -> NodeT | None:
"""
Sole parent of this node, or None if this node is a root.
If this node has multiple parents, raise ValueError.
Expand All @@ -118,13 +118,13 @@ def parent(self) -> Node | None:
raise ValueError(f"Node {self!r} has multiple parents.")

@property
def children(self) -> set[Node]:
def children(self) -> set[NodeT]:
"""Direct child nodes of this node."""
return set(self.nxo.graph.successors(self.node))

@property
@cache_on_frozen
def ancestors(self) -> set[Node]:
def ancestors(self) -> set[NodeT]:
"""
Get ancestors of node in graph, including the node itself.
Ancestors refers to more general concepts in an ontology,
Expand All @@ -137,7 +137,7 @@ def ancestors(self) -> set[Node]:

@property
@cache_on_frozen
def descendants(self) -> set[Node]:
def descendants(self) -> set[NodeT]:
"""
Get descendants of node in graph, including the node itself.
Descendants refers to more specific concepts in an ontology,
Expand All @@ -160,13 +160,13 @@ def n_descendants(self) -> int:

@property
@cache_on_frozen
def roots(self) -> set[Node]:
def roots(self) -> set[NodeT]:
"""Ancestors of this node that are roots (top-level)."""
return self.ancestors & self.nxo.roots

@property
def leaves(self) -> set[Node]:
"""Descendents of this node that are leaves."""
def leaves(self) -> set[NodeT]:
"""Descendants of this node that are leaves."""
return self.descendants & self.nxo.leaves

@property
Expand All @@ -181,14 +181,14 @@ def depth(self) -> int:
return depth

@property
def paths_from_roots(self) -> Iterator[list[Node]]:
def paths_from_roots(self) -> Iterator[list[NodeT]]:
for root in self.roots:
yield from nx.all_simple_paths(
self.nxo.graph, source=root, target=self.node
)

@property
def paths_to_leaves(self) -> Iterator[list[Node]]:
def paths_to_leaves(self) -> Iterator[list[NodeT]]:
yield from nx.all_simple_paths(
self.nxo.graph, source=self.node, target=self.leaves
)
Expand Down
48 changes: 24 additions & 24 deletions nxontology/ontology.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@
from networkx.algorithms.isolate import isolates
from networkx.readwrite.json_graph import node_link_data, node_link_graph

from nxontology.node import Node
from nxontology.node import NodeT

from .exceptions import DuplicateError, NodeNotFound
from .node import Node_Info
from .node import NodeInfo
from .similarity import SimilarityIC
from .utils import Freezable, cache_on_frozen, get_datetime_now, get_nxontology_version

logger = logging.getLogger(__name__)


class NXOntology(Freezable, Generic[Node]):
class NXOntology(Freezable, Generic[NodeT]):
"""
Encapsulate a networkx.DiGraph to represent an ontology.
Regarding edge directionality, parent terms should point to child term.
Expand All @@ -39,7 +39,7 @@ def __init__(
# in case there are compatability issues in the future.
self._add_nxontology_metadata()
self.check_is_dag()
self._node_info_cache: dict[Node, Node_Info[Node]] = {}
self._node_info_cache: dict[NodeT, NodeInfo[NodeT]] = {}

def _add_nxontology_metadata(self) -> None:
self.graph.graph["nxontology_version"] = get_nxontology_version()
Expand Down Expand Up @@ -77,7 +77,7 @@ def write_node_link_json(self, path: str | PathLike[str]) -> None:
write_file.write("\n") # json.dump does not include a trailing newline

@classmethod
def read_node_link_json(cls, path: str | PathLike[str]) -> NXOntology[Node]:
def read_node_link_json(cls, path: str | PathLike[str]) -> NXOntology[NodeT]:
"""
Retrun a new graph from node-link format as written by `write_node_link_json`.
"""
Expand All @@ -90,7 +90,7 @@ def read_node_link_json(cls, path: str | PathLike[str]) -> NXOntology[Node]:
nxo = cls(digraph)
return nxo

def add_node(self, node_for_adding: Node, **attr: Any) -> None:
def add_node(self, node_for_adding: NodeT, **attr: Any) -> None:
"""
Like networkx.DiGraph.add_node but raises a DuplicateError
if the node already exists.
Expand All @@ -99,7 +99,7 @@ def add_node(self, node_for_adding: Node, **attr: Any) -> None:
raise DuplicateError(f"node already in graph: {node_for_adding}")
self.graph.add_node(node_for_adding, **attr)

def add_edge(self, u_of_edge: Node, v_of_edge: Node, **attr: Any) -> None:
def add_edge(self, u_of_edge: NodeT, v_of_edge: NodeT, **attr: Any) -> None:
"""
Like networkx.DiGraph.add_edge but
raises a NodeNotFound if either node does not exist
Expand All @@ -116,7 +116,7 @@ def add_edge(self, u_of_edge: Node, v_of_edge: Node, **attr: Any) -> None:

@property
@cache_on_frozen
def roots(self) -> set[Node]:
def roots(self) -> set[NodeT]:
"""
Return all top-level nodes, including isolates.
"""
Expand All @@ -127,7 +127,7 @@ def roots(self) -> set[Node]:
return roots

@property
def root(self) -> Node:
def root(self) -> NodeT:
"""
Sole root of this directed acyclic graph.
If this ontology has multiple roots, raise ValueError.
Expand All @@ -142,7 +142,7 @@ def root(self) -> Node:

@property
@cache_on_frozen
def leaves(self) -> set[Node]:
def leaves(self) -> set[NodeT]:
"""
Return all bottom-level nodes, including isolates.
"""
Expand All @@ -154,7 +154,7 @@ def leaves(self) -> set[Node]:

@property
@cache_on_frozen
def isolates(self) -> set[Node]:
def isolates(self) -> set[NodeT]:
"""
Return disconnected nodes.
"""
Expand All @@ -175,17 +175,17 @@ def frozen(self) -> bool:

def similarity(
self,
node_0: Node,
node_1: Node,
node_0: NodeT,
node_1: NodeT,
ic_metric: str = "intrinsic_ic_sanchez",
) -> SimilarityIC[Node]:
) -> SimilarityIC[NodeT]:
"""SimilarityIC instance for the specified nodes"""
return SimilarityIC(self, node_0, node_1, ic_metric)

def similarity_metrics(
self,
node_0: Node,
node_1: Node,
node_0: NodeT,
node_1: NodeT,
ic_metric: str = "intrinsic_ic_sanchez",
keys: list[str] | None = None,
) -> dict[str, Any]:
Expand All @@ -197,8 +197,8 @@ def similarity_metrics(

def compute_similarities(
self,
source_nodes: Iterable[Node],
target_nodes: Iterable[Node],
source_nodes: Iterable[NodeT],
target_nodes: Iterable[NodeT],
ic_metrics: list[str] | tuple[str, ...] = ("intrinsic_ic_sanchez",),
) -> Iterable[dict[str, Any]]:
"""
Expand All @@ -213,16 +213,16 @@ def compute_similarities(
yield metrics

@classmethod
def _get_node_info_cls(cls) -> type[Node_Info[Node]]:
def _get_node_info_cls(cls) -> type[NodeInfo[NodeT]]:
"""
Return the Node_Info class to use for this ontology.
Subclasses can override this to use a custom Node_Info class.
For the complexity of typing this method, see
<https://github.com/related-sciences/nxontology/pull/26>.
"""
return Node_Info
return NodeInfo

def node_info(self, node: Node) -> Node_Info[Node]:
def node_info(self, node: NodeT) -> NodeInfo[NodeT]:
"""
Return Node_Info instance for `node`.
If frozen, cache node info in `self._node_info_cache`.
Expand All @@ -235,8 +235,8 @@ def node_info(self, node: Node) -> Node_Info[Node]:
return self._node_info_cache[node]

@cache_on_frozen
def _get_name_to_node_info(self) -> dict[str, Node_Info[Node]]:
name_to_node_info: dict[str, Node_Info[Node]] = {}
def _get_name_to_node_info(self) -> dict[str, NodeInfo[NodeT]]:
name_to_node_info: dict[str, NodeInfo[NodeT]] = {}
for node in self.graph:
info = self.node_info(node)
name = info.name
Expand All @@ -249,7 +249,7 @@ def _get_name_to_node_info(self) -> dict[str, Node_Info[Node]]:
name_to_node_info[name] = info
return name_to_node_info

def node_info_by_name(self, name: str) -> Node_Info[Node]:
def node_info_by_name(self, name: str) -> NodeInfo[NodeT]:
"""
Return Node_Info instance using a lookup by name.
"""
Expand Down
24 changes: 12 additions & 12 deletions nxontology/similarity.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@

from networkx import shortest_path_length

from nxontology.node import Node, Node_Info
from nxontology.node import NodeInfo, NodeT
from nxontology.utils import Freezable, cache_on_frozen


class Similarity(Freezable, Generic[Node]):
class Similarity(Freezable, Generic[NodeT]):
"""
Compute intrinsic similarity metrics for a pair of nodes.
"""
Expand All @@ -29,7 +29,7 @@ class Similarity(Freezable, Generic[Node]):
"batet_log",
]

def __init__(self, nxo: NXOntology[Node], node_0: Node, node_1: Node):
def __init__(self, nxo: NXOntology[NodeT], node_0: NodeT, node_1: NodeT):
self.nxo = nxo
self.node_0 = node_0
self.node_1 = node_1
Expand Down Expand Up @@ -68,12 +68,12 @@ def depth(self) -> int | None:

@property
@cache_on_frozen
def common_ancestors(self) -> set[Node]:
def common_ancestors(self) -> set[NodeT]:
return self.info_0.ancestors & self.info_1.ancestors

@property
@cache_on_frozen
def union_ancestors(self) -> set[Node]:
def union_ancestors(self) -> set[NodeT]:
return self.info_0.ancestors | self.info_1.ancestors

@property
Expand Down Expand Up @@ -116,7 +116,7 @@ def results(self, keys: list[str] | None = None) -> dict[str, Any]:
return {key: getattr(self, key) for key in keys}


class SimilarityIC(Similarity[Node]):
class SimilarityIC(Similarity[NodeT]):
"""
Compute intrinsic similarity metrics for a pair of nodes,
including Information Content (IC) derived metrics.
Expand All @@ -125,9 +125,9 @@ class SimilarityIC(Similarity[Node]):

def __init__(
self,
nxo: NXOntology[Node],
node_0: Node,
node_1: Node,
nxo: NXOntology[NodeT],
node_0: NodeT,
node_1: NodeT,
ic_metric: str = "intrinsic_ic_sanchez",
):
super().__init__(nxo, node_0, node_1)
Expand All @@ -151,7 +151,7 @@ def __init__(
"jiang_seco",
]

def _get_ic(self, node_info: Node_Info[Node], ic_metric: str) -> float:
def _get_ic(self, node_info: NodeInfo[NodeT], ic_metric: str) -> float:
ic = getattr(node_info, ic_metric)
assert isinstance(ic, float)
return ic
Expand All @@ -174,7 +174,7 @@ def node_1_ic_scaled(self) -> float:

@property
@cache_on_frozen
def _resnik_mica(self) -> tuple[float, Node | None]:
def _resnik_mica(self) -> tuple[float, NodeT | None]:
if not self.common_ancestors:
return 0.0, None
resnik, mica = max(
Expand All @@ -185,7 +185,7 @@ def _resnik_mica(self) -> tuple[float, Node | None]:
return resnik, mica

@property
def mica(self) -> Node | None:
def mica(self) -> NodeT | None:
"""
Most informative common ancestor.
None if no common ancestors exist.
Expand Down
Loading

0 comments on commit b68411d

Please sign in to comment.