Skip to content

Commit

Permalink
ready for release
Browse files Browse the repository at this point in the history
  • Loading branch information
ajs1998 committed Jun 24, 2022
1 parent 852cd03 commit 8d6186e
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 55 deletions.
15 changes: 8 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ It includes a `Dag<T>` interface so you can provide your own implementation.
```java
Dag<String> dag = new HashDag<>();

// Add nodes with (parent, child) relationships to the DAG
// Add nodes with source -> target relationships to the DAG
dag.put("Dorothy", "Shelby");
dag.put("Shelby", "Alex");
dag.put("Joe", "Alex");
Expand All @@ -24,8 +24,9 @@ dag.put("Joe", "Alex");
dag.add("Clare");
dag.add("Sarah");

// Find a reverse-topologically sorted list of the nodes
// Find a topologically sorted list of the nodes
// Ex: ["Alex", "Joe", "Sarah", "Shelby", "Dorothy", "Clare"]
// Ex: ["Dorothy", "Shelby", "Clare", "Joe", "Alex", "Sarah"]
List<String> sorted = dag.sort();

// Find the root nodes of the DAG
Expand All @@ -36,13 +37,13 @@ Set<String> roots = dag.getRoots();
// Ex: ["Alex", "Clare", "Sarah"]
Set<String> leaves = dag.getLeaves();

// Find the parents of a node
// Find a node's incoming nodes
// Ex: ["Joe", "Shelby"]
Set<String> parents = dag.getParents("Alex");
Set<String> incoming = dag.getIncoming("Alex");

// Find the children of a node
// Find a node's outgoing nodes
// Ex: ["Shelby"]
Set<String> children = dag.getChildren("Dorothy");
Set<String> outgoing = dag.getOutgoing("Dorothy");

// Find the ancestors of a node
// Ex: ["Joe", "Shelby", "Dorothy"]
Expand All @@ -62,7 +63,7 @@ Dag<String> copy = dag.clone();
### DAG Traversal

You can use a `DagTraversalTask` to run a task on each node in multiple threads. Each node is only visited once all of
its children have been visited. This is useful for running complex multithreaded pipelines on nodes with shared
its incoming nodes have been visited. This is useful for running complex multithreaded pipelines on nodes with shared
dependencies.

```java
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/me/alexjs/dag/Dag.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public interface Dag<T> extends Collection<T>, Cloneable, Serializable {
boolean removeEdge(T source, T target);

/**
* Orders the nodes of this DAG such that each source node comes before its target nodes in the ordering
* Orders the nodes of this DAG such that each node comes before its outgoing nodes in the ordering
*
* @return a list of nodes in topological order, or {@code null} if there's a circular dependency
* @see <a href="https://en.wikipedia.org/wiki/Topological_sorting">https://en.wikipedia.org/wiki/Topological_sorting</a>
Expand Down
26 changes: 13 additions & 13 deletions src/main/java/me/alexjs/dag/DagTraversalTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ public class DagTraversalTask<T> {
private final Dag<T> dag;
private final Consumer<T> task;
private final ListeningExecutorService executorService;
private final Map<T, Set<T>> parents;
private final Map<T, Set<T>> outgoingNodes;
private final Lock lock;
private final AtomicBoolean failed;

/**
* Create a task that traverses a DAG with an {@link java.util.concurrent.ExecutorService}
* <p>
* The nodes will be traversed in reverse-topological order,
* such that no node is visited until all its children have been visited.
* The nodes will be traversed in topological order,
* such that no node is visited until all its incoming nodes have been visited.
*
* @param dag the DAG to traverse
* @param task the task to apply to each node
Expand All @@ -49,21 +49,21 @@ public DagTraversalTask(Dag<T> dag, Consumer<T> task, ExecutorService executorSe
this.dag = dag.clone();
this.task = task;
this.executorService = MoreExecutors.listeningDecorator(executorService);
this.parents = new HashMap<>();
this.outgoingNodes = new HashMap<>();
this.lock = new ReentrantLock(true);
this.failed = new AtomicBoolean();

// Cache the parents of each node for this DAG
this.dag.getNodes().forEach(node -> this.parents.put(node, this.dag.getIncoming(node)));
// Cache each node's outgoing nodes for this DAG
this.dag.getNodes().forEach(node -> this.outgoingNodes.put(node, this.dag.getOutgoing(node)));

// Get the set of leaves for this dag
Set<T> leaves = this.dag.getLeaves();
Set<T> roots = this.dag.getRoots();

// If there are no leaves, then there are no nodes to visit
if (leaves.isEmpty()) {
if (roots.isEmpty()) {
executorService.shutdown();
} else {
visit(leaves);
visit(roots);
}

}
Expand Down Expand Up @@ -101,13 +101,13 @@ private void visit(Collection<T> nodes) {
executorService.shutdown();
}

Set<T> parents = this.parents.get(node);
parents.retainAll(dag.getNodes());
parents.removeIf(p -> !dag.getOutgoing(p).isEmpty());
Set<T> outgoing = this.outgoingNodes.get(node);
outgoing.retainAll(dag.getNodes());
outgoing.removeIf(p -> !dag.getIncoming(p).isEmpty());

lock.unlock();

visit(parents);
visit(outgoing);

}, MoreExecutors.directExecutor());
} catch (Throwable ignore) {
Expand Down
15 changes: 11 additions & 4 deletions src/main/java/me/alexjs/dag/HashDag.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,21 @@ public List<E> sort() {
List<E> sorted = new LinkedList<>();
Deque<E> s = new LinkedList<>(getRoots());

Dag<E> copy = clone();
Map<E, Collection<E>> copy = this.toMap();

while (!s.isEmpty()) {
E n = s.pop();
sorted.add(n);
for (E m : copy.getOutgoing(n)) {
copy.removeEdge(n, m);
if (copy.getIncoming(m).isEmpty()) {

for (E m : copy.remove(n)) {
boolean hasIncoming = false;
for (Collection<E> entry : copy.values()) {
if (entry.contains(m)) {
hasIncoming = true;
break;
}
}
if (!hasIncoming) {
s.add(m);
}
}
Expand Down
12 changes: 6 additions & 6 deletions src/test/java/me/alexjs/dag/TestDag.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public static void init() {
@RepeatedTest(50)
public void testSort() {

Dag<Integer> dag = helper.populateDag();
Dag<Integer> dag = helper.populateDagSimple();

List<Integer> sorted = dag.sort();

Expand Down Expand Up @@ -94,23 +94,23 @@ public void testEmptyDag() {
Set<Integer> leaves = dag.getLeaves();
Set<Integer> ancestors = dag.getAncestors(0);
Set<Integer> descendants = dag.getDescendants(0);
Set<Integer> parents = dag.getIncoming(0);
Set<Integer> children = dag.getOutgoing(0);
Set<Integer> incoming = dag.getIncoming(0);
Set<Integer> outgoing = dag.getOutgoing(0);

Assertions.assertTrue(roots.isEmpty());
Assertions.assertTrue(leaves.isEmpty());
Assertions.assertTrue(ancestors.isEmpty());
Assertions.assertTrue(descendants.isEmpty());
Assertions.assertTrue(parents.isEmpty());
Assertions.assertTrue(children.isEmpty());
Assertions.assertTrue(incoming.isEmpty());
Assertions.assertTrue(outgoing.isEmpty());

Iterator<Integer> it = dag.iterator();
Assertions.assertFalse(it.hasNext());

}

@Test
public void testNoChildren() {
public void testNoOutgoing() {

// Test with putAll()
Dag<Integer> dag = new HashDag<>();
Expand Down
28 changes: 17 additions & 11 deletions src/test/java/me/alexjs/dag/TestDagCollection.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,12 @@ public void testRemoveAndRetain() {
Assertions.assertTrue(dag.removeAll(List.of(5, 6, 7)));
Assertions.assertTrue(dag.isEmpty());

dag.put(10, 11);
dag.put(10, 12);
dag.put(13, 14);
dag.put(15, 14);
Assertions.assertEquals(2, dag.getOutgoing(10).size());
Assertions.assertEquals(2, dag.getIncoming(14).size());

Assertions.assertTrue(dag.retainAll(List.of(10, 14)));
Assertions.assertEquals(0, dag.getOutgoing(10).size());
Assertions.assertEquals(0, dag.getIncoming(14).size());
dag.put(8, 9);
Assertions.assertTrue(dag.removeEdge(8, 9));
Assertions.assertTrue(dag.contains(8));
Assertions.assertTrue(dag.contains(9));
Assertions.assertTrue(dag.getOutgoing(8).isEmpty());
Assertions.assertTrue(dag.getIncoming(9).isEmpty());

}

Expand All @@ -58,12 +54,22 @@ public void testSize() {

Assertions.assertTrue(dag.size() > 0);
Assertions.assertFalse(dag.isEmpty());

dag.clear();

Assertions.assertEquals(0, dag.size());
Assertions.assertTrue(dag.isEmpty());

dag.put(1, 2);
dag.put(1, 3);
dag.put(4, 6);
dag.put(5, 6);
Assertions.assertEquals(2, dag.getOutgoing(1).size());
Assertions.assertEquals(2, dag.getIncoming(6).size());

Assertions.assertTrue(dag.retainAll(List.of(1, 6)));
Assertions.assertEquals(0, dag.getOutgoing(1).size());
Assertions.assertEquals(0, dag.getIncoming(6).size());

}

@Test
Expand Down
23 changes: 10 additions & 13 deletions src/test/java/me/alexjs/dag/TestingHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,10 @@ public TestingHelper() {

public void assertOrder(Dag<Integer> dag, List<Integer> sorted) {
Assertions.assertEquals(dag.getNodes().size(), sorted.size());
for (Integer parent : sorted) {
// If a parent comes before any of its children, then fail
for (Integer child : dag.getOutgoing(parent)) {
if (sorted.indexOf(parent) < sorted.indexOf(child)) {
Assertions.fail();
return;
}
for (Integer node : sorted) {
// If a node comes before any of its outgoing nodes, then fail
for (Integer outgoing : dag.getOutgoing(node)) {
Assertions.assertTrue(sorted.indexOf(node) < sorted.indexOf(outgoing));
}
}
}
Expand All @@ -47,17 +44,17 @@ public int getMiddleNode(Dag<Integer> dag) {

public Dag<Integer> populateDag() {

// Add a ton of parent-child relationships. Many nodes will have multiple children
// Add a ton of source-target relationships. Many nodes will have multiple outgoing edges
Dag<Integer> dag = new HashDag<>();
int nodes = random.nextInt(5000) + 5000;
for (int i = 0; i < nodes; i++) {
// A parent will always be strictly less than its children to ensure no circular dependencies
int parent = random.nextInt(500);
int child = parent + random.nextInt(500) + 1;
dag.put(parent, child);
// Each node will always be strictly less than its outgoing nodes to ensure no circular dependencies
int source = random.nextInt(500);
int target = source + random.nextInt(500) + 1;
dag.put(source, target);
}

// Nodes that are guaranteed to have no parents or children
// Nodes that are guaranteed to have no incoming or outgoing edges
ArrayList<Integer> orphans = IntStream.generate(() -> random.nextInt(2000) + 1000)
.limit(100)
.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
Expand Down

0 comments on commit 8d6186e

Please sign in to comment.