Skip to content

Commit

Permalink
Merge pull request #320 from tomtomhdx/pseudoknots_networkx
Browse files Browse the repository at this point in the history
Use NetworkX for resolving conflicting pseudoknot cliques
  • Loading branch information
padix-key authored Jun 3, 2021
2 parents e5321f6 + 3991045 commit 8aff54e
Showing 1 changed file with 62 additions and 135 deletions.
197 changes: 62 additions & 135 deletions src/biotite/structure/pseudoknots.py
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@
__all__ = ["pseudoknots"]

import numpy as np
import networkx as nx
from itertools import chain, product

def pseudoknots(base_pairs, scores=None, max_pseudoknot_order=None):
@@ -216,10 +217,10 @@ def _find_regions(base_pairs, scores):
Returns
-------
regions : set {_region, ...}
The regions representing the consecutively nested base pairs.
regions : Graph
The ``_Region`` objects as graph, where the edges represent
conflicts.
"""

# Make sure the lower residue is on the left for each row
sorted_base_pairs = np.sort(base_pairs, axis=1)

@@ -268,57 +269,77 @@ def _find_regions(base_pairs, scores):
# new region.
regions.add(_Region(base_pairs, np.array(region_pairs), scores))

return regions
# Return the graphical representation of the conflicting regions
return _generate_graphical_representation(regions)


def _find_non_conflicting_regions(regions):
def _generate_graphical_representation(regions):
"""
Find regions that are not conflicting.
Find the conflicting regions and represent them graphically using
the ``Graph`` class from ``Networkx``.
Parameters
----------
regions : set {_region, ...}
Regions including conflicting regions.
The regions representing the consecutively nested base pairs.
Returns
-------
regions : set {_region, ...}
The non-conflicting regions.
regions : Graph
The ``_Region`` objects as graph, where the edges represent
conflicts.
"""

# Create a graph
region_graph = nx.Graph()

# Add the regions to the graph as nodes
region_graph.add_nodes_from(regions)

# Get the region array and a boolean array, where the start of each
# region is ``True``.
region_array, (start_stops,) = _get_region_array_for(
regions, content=[lambda a : [True, False]], dtype=['bool']
)
starts = np.nonzero(start_stops)[0]

# Regions that are not conflicting
non_conflicting_regions = set()
for start_index in starts:
# Get the current regions start and stop indices in the region
# array
stop_index = _get_first_occurrence_for(
region_array[start_index+1:], region_array[start_index]
)
stop_index = start_index + 1 + stop_index

# Count the occurrence of each individual region between the
# start and stop indices of the regions
_, counts = np.unique(
region_array[start_index+1:stop_index], return_counts=True
)
# if no regions are between the start and stop indices the
# region is non-conflicting
if len(counts) == 0:
non_conflicting_regions.add(region_array[start_index])
# if every region between the start and stop indices has its
# start and stop between the current region's start and stop
# indices the current region is not conflicting
elif np.amin(counts) == 2:
non_conflicting_regions.add(region_array[start_index])
# Check each region for conflicts with other regions
for start, region in enumerate(region_array):
# Check each region only once
if not start_stops[start]:
continue

# Return the non-conflicting regions
return non_conflicting_regions
# Find the index of the stopping of the region in the region
# array
stop = _get_first_occurrence_for(region_array[start+1:], region)
stop += (start + 1)

# Store regions the current region conflicts with
conflicts = set()

# Iterate over the regions between the starting and stopping
# point of the current region
for other_region in region_array[start+1:stop]:
# If the other region is not already a conflict, add it to
# the conflict set
if other_region not in conflicts:
conflicts.add(other_region)
# If the other region is twice between the starting and
# stopping point of the current region, its starting and
# stopping point lie between the current region and it is
# thus non-conflicting
else:
conflicts.remove(other_region)

# Conflicts between regions are represented as graph edges
edges = []

# Convert the edges in a ``NetworkX`` compatible format
for conflict in conflicts:
edges.append((region, conflict))

# Add the edges to the graph
region_graph.add_edges_from(edges)
return region_graph


def _get_first_occurrence_for(iterable, wanted_object):
@@ -402,101 +423,6 @@ def _get_region_array_for(regions, content=[], dtype=[]):
return region_array, content_list


def _conflict_cliques(regions):
"""
Separate regions into mutually conflicting regions.
Parameters
----------
regions : set {_region, ...}
The regions to be separated.
Returns
-------
regions : list [set {_region, ...}, ...]
The separated mutually conflicting regions.
"""
# Get a region array and an array where each region start is denoted
# as ``True`` and each stop is denoted as ``False``
region_array, (start_stops,) = _get_region_array_for(
regions, content=[lambda a : [True, False]], dtype=['bool']
)
starts = np.nonzero(start_stops)[0]

# Mutually conflicting regions
cliques = []
# All regions that have been assigned a clique
seen = set()

# Iterate region starting points from left to right
for start_index in starts:
# Skip if the current region has already been assigned a clique
if region_array[start_index] in seen:
continue
# Members of the current clique that have not been assigned yet
queue = set([start_index])
# The current clíque
clique = set()

# Execute until all regions belonging to the current region have
# been assigned.
while queue != set():
# Assign new region of the queue to the current clique
current_index = queue.pop()
clique.add(region_array[current_index])
seen.add(region_array[current_index])

# Get regions conflicting with current region
mutually_conflicting = _conflicting_regions(
region_array, current_index
)
# Add conflicting regions to queue, if they are not part of
# the clique yet
for region_index in mutually_conflicting:
if region_array[region_index] not in clique:
queue.add(_get_first_occurrence_for(
region_array, region_array[region_index])
)
# Once all regions conflicting with the current region have
# been found, add clique to list of cliques
cliques.append(clique)

# Return the conflict cliques as list of sets
return cliques

def _conflicting_regions(region_array, start_index):
"""
Get regions conflicting with a specific region in a ``region_array``
as returned by :func:`get_region_array_for()`.
Parameters
----------
region_array : ndarray, dtype=object
The array of ordered region objects.
start_index : int
The start index of the region to find conflicts with
Returns
-------
conflicting_regions : ndarray, dtype=int
Start indices of the conflicting regions
"""
# Get the current regions start and stop indices in the region array
stop_index = _get_first_occurrence_for(
region_array[start_index+1:], region_array[start_index]
)
stop_index = start_index + 1 + stop_index
# Count the occurrence of each individual region between the start
# and stop indices of the regions
_, index, counts = np.unique(
region_array[start_index+1:stop_index],
return_counts=True, return_index=True
)

# Conflicting regions only have either their starting or stoping
# point within the starting and stoping points of the given region.
return index[counts==1] + start_index + 1

def _remove_pseudoknots(regions):
"""
Get the optimal solutions according to the algorithm referenced in
@@ -658,8 +584,8 @@ def _get_results(regions, results, max_pseudoknot_order, order=0):
"""

# Remove non-conflicting regions
non_conflicting = _find_non_conflicting_regions(regions)
regions = regions - non_conflicting
non_conflicting = [isolate for isolate in nx.isolates(regions)]
regions.remove_nodes_from(non_conflicting)

# Non-conflicting regions are of the current order:
index_list_non_conflicting = list(
@@ -677,7 +603,7 @@ def _get_results(regions, results, max_pseudoknot_order, order=0):

# Get the optimal solutions for given regions. Evaluate each clique
# of mutually conflicting regions seperately
cliques = _conflict_cliques(regions)
cliques = [component for component in nx.connected_components(regions)]
solutions = [set(chain(*e)) for e in product(
*[_remove_pseudoknots(clique) for clique in cliques]
)]
@@ -691,7 +617,8 @@ def _get_results(regions, results, max_pseudoknot_order, order=0):
for i, solution in enumerate(solutions):

# Get the pseudoknotted regions
pseudoknotted_regions = regions - solution
pseudoknotted_regions = regions.copy()
pseudoknotted_regions.remove_nodes_from(solution)

# Get an index list of the unknotted base pairs
index_list_unknotted = list(

0 comments on commit 8aff54e

Please sign in to comment.