Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new project_to_dag algorithm #206

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contextualized/baselines/networks.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def predict_w(self, model, dataloader, project_to_dag=True):
preds = self.predict(model, dataloader)
W = model.W.detach() * model.diag_mask
if project_to_dag:
W = torch.tensor(project_to_dag_torch(W.numpy(force=True))[0])
W = torch.tensor(project_to_dag_torch(W.numpy(force=True)))
W_batch = W.unsqueeze(0).expand(len(preds), -1, -1)
return W_batch.numpy()

Expand Down
131 changes: 72 additions & 59 deletions contextualized/dags/graph_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,77 +88,90 @@ def _simulate_single_equation(X, w, scale):
X[:, j] = _simulate_single_equation(X[:, parents], W[parents, j], scale_vec[j])
return X

def is_dag(W):
G = ig.Graph.Weighted_Adjacency(W.tolist())
return G.is_dag()

def break_symmetry(w):
for i in range(w.shape[0]):
w[i][i] = 0.0
for j in range(i):
if np.abs(w[i][j]) > np.abs(w[j][i]):
w[j][i] = 0.0
else:
w[i][j] = 0.0
return w

def trim_params(w, thresh=0.2):
return w * (np.abs(w) > thresh)


# w is the weighted adjacency matrix
def project_to_dag_torch(w):
def project_to_dag_torch(w, thresh=0.0):
"""
Project a weight matrix to the closest DAG in Frobenius norm.
"""

if is_dag(w):
return w, 0.0
return w

w_dag = w.copy()
w_dag = break_symmetry(w_dag)

vals = sorted(list(set(np.abs(w_dag).flatten())))
low = 0
high = len(vals) - 1

def binary_search(arr, low, high, w): # low and high are indices
# Check base case
if high == low:
return high
if high > low:
mid = (high + low) // 2
if mid == 0:
return mid
result = trim_params(w, arr[mid])
if is_dag(result):
result2 = trim_params(w, arr[mid - 1])
if is_dag(result2): # middle value is too high. go lower.
return binary_search(arr, low, mid - 1, w)
else:
return mid # found it
else: # middle value is too low. go higher.
return binary_search(arr, mid + 1, high, w)
else:
# Element is not present in the array
print("this should be impossible")
return -1
# Easy case first: remove diagnoal entries.
w_dag *= 1 - np.eye(w.shape[0])

idx = binary_search(vals, low, high, w_dag) + 1
thresh = vals[idx]
# First, remove edges with weights smaller than the thresh.
w_dag = trim_params(w_dag, thresh)

# Now add back in edges with weights smaller than the thresh that don't violate DAG-ness.
# want a list of edges (i, j) with weight in decreasing order.
all_vals = np.abs(w_dag).flatten()
idxs_sorted = reversed(np.argsort(all_vals))
for idx in idxs_sorted:
i = idx // w_dag.shape[1]
j = idx % w_dag.shape[1]
if np.abs(w[i][j]) > thresh: # already retained
continue
w_dag[i][j] = w[i][j]
if not is_dag(w_dag):
w_dag[i][j] = 0.0
# Sort nodes by magnitude of edges pointing out.
order = np.argsort(np.abs(w_dag).sum(axis=1))[::-1]

# Re-order
w_dag = w_dag[order, :][:, order]

# Keep only forward edges (i.e. upper triangular part).
w_dag = np.triu(w_dag)

# Return to original order
w_dag = w_dag[np.argsort(order), :][:, np.argsort(order)]

assert is_dag(w_dag)
return w_dag, thresh
return w_dag


def is_dag(W):
G = ig.Graph.Weighted_Adjacency(W.tolist())
return G.is_dag()
def break_symmetry(w):
for i in range(w.shape[0]):
w[i][i] = 0.0
for j in range(i):
if np.abs(w[i][j]) > np.abs(w[j][i]):
w[j][i] = 0.0
else:
w[i][j] = 0.0
return w


def trim_params(w, thresh=0.2):
return w * (np.abs(w) > thresh)
def project_to_dag_search(W):
W = W.copy()
if ig.Graph.Weighted_Adjacency(W).is_dag():
return W
W_mag = np.abs(W)
W_flat = W_mag.flatten()

# Binary search for the minimum threshold where W is a DAG, O(|E|log|E|)
weights = np.sort(W_flat)
low = 0
mid = 0
high = len(weights) - 1
while low < high - 1:
new_mid = (low + high) // 2
mid = new_mid
# print(low, mid, high)
if ig.Graph.Weighted_Adjacency(W * (W_mag > weights[mid])).is_dag():
high = mid
else:
low = mid
W_dag = W * (W_mag > weights[high])

# Re-add edges we removed that don't violate the topological order, O(|E|)
p = len(W_dag)
weights_i = np.argsort(W_flat)
toposort = ig.Graph.Weighted_Adjacency(W_dag).topological_sorting()
toposort_lookup = np.zeros(p)
for topo_i, topo_node in enumerate(toposort):
toposort_lookup[topo_node] = topo_i
for sorted_i in range(high, -1, -1):
i = weights_i[sorted_i]
parent_i = i // p
child_i = i % p
if toposort_lookup[parent_i] < toposort_lookup[child_i]:
W_dag[parent_i, child_i] = weights[sorted_i]
return W_dag
2 changes: 1 addition & 1 deletion contextualized/dags/lightning_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ def _format_params(self, w_preds, **kwargs):
w_preds = self._project_factor_graph_to_var(w_preds)
if kwargs.get("project_to_dag", False):
try:
w_preds = np.array([project_to_dag_torch(w)[0] for w in w_preds])
w_preds = np.array([project_to_dag_torch(w) for w in w_preds])
except:
print("Error, couldn't project to dag. Returning normal predictions.")
return trim_params(w_preds, thresh=kwargs.get("threshold", 0.0))
Expand Down
44 changes: 44 additions & 0 deletions contextualized/dags/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,50 @@
from contextualized.dags.trainers import GraphTrainer
from contextualized.dags.losses import mse_loss as mse

class TestProjectToDag(unittest.TestCase):
"""
Test that the project_to_dag function works to create a DAG from a directed cyclic graph.
"""
def __init__(self, *args, **kwargs):
super(TestProjectToDag, self).__init__(*args, **kwargs)

def setUp(self):
"""
Shared unit test setup code.
"""
pass

def test_project_to_dag(self):
"""
Test that the project_to_dag function works to create a DAG from a directed cyclic graph.
"""
# Create a cyclic graph.
W = np.zeros((5, 5))
W[0, 1] = 1
W[1, 2] = 1
W[2, 3] = 1
W[3, 4] = 1
W[4, 0] = 1

# Project to a DAG.
dag = graph_utils.project_to_dag_torch(W)
assert graph_utils.is_dag(dag)

def test_project_to_dag_from_dag(self):
"""
Test that the project_to_dag function works to create a DAG from a DAG.
"""
# Create a DAG.
W = np.zeros((5, 5))
W[0, 1] = 1
W[1, 2] = 1
W[2, 3] = 1
W[3, 4] = 1

# Project to a DAG.
dag = graph_utils.project_to_dag_torch(W)
assert graph_utils.is_dag(dag)


class TestNOTMAD(unittest.TestCase):
"""Unit tests for NOTMAD."""
Expand Down
Loading
Loading