Skip to content

Commit

Permalink
Revert "Remove old remote writeback code"
Browse files Browse the repository at this point in the history
  • Loading branch information
Mats-SX authored Jun 3, 2024
1 parent 03a5559 commit c3e3d9e
Show file tree
Hide file tree
Showing 15 changed files with 870 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@

public final class NativeRelationshipStreamExporter extends StatementApi implements RelationshipStreamExporter {

private static final int QUEUE_CAPACITY = 2;

private final LongUnaryOperator toOriginalId;
private final Stream<ExportedRelationship> relationships;
private final int batchSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class ResultStoreNodeLabelExporter implements NodeLabelExporter {

@Override
public void write(String nodeLabel) {
resultStore.addNodeLabel(nodeLabel, nodeCount, toOriginalId);
resultStore.add(jobId, new ResultStoreEntry.NodeLabel(nodeLabel, nodeCount, toOriginalId));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public void write(Collection<NodeProperty> nodeProperties) {
var propertyKeys = new ArrayList<String>();
var propertyValues = new ArrayList<NodePropertyValues>();
nodeProperties.forEach(nodeProperty -> {
resultStore.addNodePropertyValues(
nodeLabels,
nodeProperty.propertyKey(),
nodeProperty.properties()
);
propertyKeys.add(nodeProperty.propertyKey());
propertyValues.add(nodeProperty.properties());
writtenProperties += nodeProperty.properties().nodeCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class ResultStoreRelationshipExporter implements RelationshipExporter {

@Override
public void write(String relationshipType) {
resultStore.addRelationship(relationshipType, graph, toOriginalId);
resultStore.add(jobId, new ResultStoreEntry.RelationshipTopology(relationshipType, graph, toOriginalId));
}

Expand All @@ -64,6 +65,7 @@ public void write(
String propertyKey,
@Nullable RelationshipWithPropertyConsumer afterWriteConsumer
) {
resultStore.addRelationship(relationshipType, propertyKey, graph, toOriginalId);
resultStore.add(jobId, new ResultStoreEntry.RelationshipsFromGraph(relationshipType, propertyKey, graph, toOriginalId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,16 @@ public class ResultStoreRelationshipPropertiesExporter implements RelationshipPr
public void write(String relationshipType, List<String> propertyKeys) {
if (propertyKeys.isEmpty()) {
var graph = graphStore.getGraph(RelationshipType.of(relationshipType));
resultStore.addRelationship(relationshipType, graph, graph::toOriginalNodeId);
resultStore.add(jobId, new ResultStoreEntry.RelationshipTopology(relationshipType, graph, graph::toOriginalNodeId));
} else if (propertyKeys.size() == 1) {
var propertyKey = propertyKeys.get(0);
var graph = graphStore.getGraph(RelationshipType.of(relationshipType), Optional.of(propertyKey));
resultStore.addRelationship(relationshipType, propertyKey, graph, graph::toOriginalNodeId);
resultStore.add(jobId, new ResultStoreEntry.RelationshipsFromGraph(relationshipType, propertyKey, graph, graph::toOriginalNodeId));
} else {
var relationshipIterator = graphStore.getCompositeRelationshipIterator(RelationshipType.of(relationshipType), propertyKeys);
resultStore.addRelationshipIterator(relationshipType, propertyKeys, relationshipIterator, graphStore.nodes()::toOriginalNodeId);
resultStore.add(
jobId,
new ResultStoreEntry.RelationshipIterators(relationshipType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class ResultStoreRelationshipStreamExporter implements RelationshipStream

@Override
public long write(String relationshipType, List<String> propertyKeys, List<ValueType> propertyTypes) {
resultStore.addRelationshipStream(relationshipType, propertyKeys, propertyTypes, relationshipStream, toOriginalId);
resultStore.add(jobId, new ResultStoreEntry.RelationshipStream(relationshipType, propertyKeys, propertyTypes, relationshipStream, toOriginalId));
// TODO: return the number of relationships written
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ void shouldWriteToResultStore() {

assertThat(nodeLabelExporter.nodeLabelsWritten()).isEqualTo(5);

var nodeLabelEntry = resultStore.getNodeIdsByLabel("label");
assertThat(nodeLabelEntry.nodeCount()).isEqualTo(5);

for (int i = 0; i < 5; i++) {
assertThat(nodeLabelEntry.toOriginalId().applyAsLong(i)).isEqualTo(i + 42);
}

var entry = resultStore.get(jobId);
assertThat(entry).isInstanceOf(ResultStoreEntry.NodeLabel.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ void shouldWriteSingleNodePropertyToResultStore() {
nodePropertyExporter.write("prop", nodePropertyValues);

assertThat(nodePropertyExporter.propertiesWritten()).isEqualTo(42L);
assertThat(resultStore.getNodePropertyValues(List.of("A"), "prop")).isEqualTo(nodePropertyValues);

var entry = resultStore.get(jobId);
assertThat(entry).isInstanceOf(ResultStoreEntry.NodeProperties.class);
Expand Down Expand Up @@ -79,6 +80,8 @@ void shouldWriteMultipleNodePropertiesToResultStore() {
);

assertThat(nodePropertyExporter.propertiesWritten()).isEqualTo(85L);
assertThat(resultStore.getNodePropertyValues(List.of(PROJECT_ALL), "prop1")).isEqualTo(nodePropertyValues1);
assertThat(resultStore.getNodePropertyValues(List.of(PROJECT_ALL), "prop2")).isEqualTo(nodePropertyValues2);

var entry = resultStore.get(jobId);
assertThat(entry).isInstanceOf(ResultStoreEntry.NodeProperties.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ void shouldWriteRelationshipWithoutPropertyToResultStore() {
var toOriginalId = mock(LongUnaryOperator.class);
new ResultStoreRelationshipExporter(jobId, resultStore, graph, toOriginalId).write("REL");

var relationshipEntry = resultStore.getRelationship("REL");
assertThat(relationshipEntry.graph()).isEqualTo(graph);
assertThat(relationshipEntry.toOriginalId()).isEqualTo(toOriginalId);

assertThat(resultStore.getRelationship("REL", "foo")).isNull();

var entry = resultStore.get(jobId);
assertThat(entry).isInstanceOf(ResultStoreEntry.RelationshipTopology.class);

Expand All @@ -57,6 +63,12 @@ void shouldWriteRelationshipWithPropertyToResultStore() {
var toOriginalId = mock(LongUnaryOperator.class);
new ResultStoreRelationshipExporter(jobId, resultStore, graph, toOriginalId).write("REL", "prop");

var relationshipEntry = resultStore.getRelationship("REL", "prop");
assertThat(relationshipEntry.graph()).isEqualTo(graph);
assertThat(relationshipEntry.toOriginalId()).isEqualTo(toOriginalId);

assertThat(resultStore.getRelationship("REL")).isNull();

var entry = resultStore.get(jobId);
assertThat(entry).isInstanceOf(ResultStoreEntry.RelationshipsFromGraph.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ void shouldWriteRelationshipIteratorsToResultStore() {
new ResultStoreRelationshipPropertiesExporter(jobId, graphStore, resultStore)
.write("TYPE", List.of("foo", "bar"));

var relationshipIteratorEntry = resultStore.getRelationshipIterator("TYPE", List.of("foo", "bar"));
assertThat(relationshipIteratorEntry.relationshipIterator()).isEqualTo(relationshipIterator);

var entry = resultStore.get(jobId);
assertThat(entry).isInstanceOf(ResultStoreEntry.RelationshipIterators.class);
var jobIdRelationshipIteratorEntry = (ResultStoreEntry.RelationshipIterators) entry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.junit.jupiter.api.Test;
import org.neo4j.gds.api.EphemeralResultStore;
import org.neo4j.gds.api.ExportedRelationship;
import org.neo4j.gds.api.ImmutableExportedRelationship;
import org.neo4j.gds.api.ResultStoreEntry;
import org.neo4j.gds.api.nodeproperties.ValueType;
Expand Down Expand Up @@ -51,16 +52,49 @@ void shouldWriteRelationshipStreamWithPropertiesToResultStore() {
List.of("doubleProp", "doubleArrayProp"),
List.of(ValueType.DOUBLE, ValueType.DOUBLE_ARRAY)
);
var relationshipStreamEntry = resultStore.getRelationshipStream("REL", List.of("doubleProp", "doubleArrayProp"));
assertThat(relationshipStreamEntry).isNotNull();
assertRelationshipEntryWithProperties(
relationshipStreamEntry.propertyTypes(),
relationshipStreamEntry.relationshipStream(),
relationshipStreamEntry.toOriginalId(),
mappingOperator
);

var entry = resultStore.get(jobId);
assertThat(entry).isInstanceOf(ResultStoreEntry.RelationshipStream.class);

var jobIdRelationshipStreamEntry = (ResultStoreEntry.RelationshipStream) entry;

assertThat(jobIdRelationshipStreamEntry.relationshipType()).isEqualTo("REL");
assertThat(jobIdRelationshipStreamEntry.propertyKeys()).containsExactly("doubleProp", "doubleArrayProp");
assertThat(jobIdRelationshipStreamEntry.propertyTypes()).containsExactly(ValueType.DOUBLE, ValueType.DOUBLE_ARRAY);
assertThat(jobIdRelationshipStreamEntry.relationshipStream()).isEqualTo(relationshipStream);
assertThat(jobIdRelationshipStreamEntry.propertyTypes()).isEqualTo(relationshipStreamEntry.propertyTypes());
assertThat(jobIdRelationshipStreamEntry.relationshipStream()).isEqualTo(relationshipStreamEntry.relationshipStream());
}

private static void assertRelationshipEntryWithProperties(
List<ValueType> propertyTypes,
Stream<ExportedRelationship> relationshipStream,
LongUnaryOperator actualOperator,
LongUnaryOperator expectedOperator
) {
assertThat(propertyTypes).isEqualTo(List.of(ValueType.DOUBLE, ValueType.DOUBLE_ARRAY));
assertThat(actualOperator).isEqualTo(expectedOperator);

var relationshipIterator = relationshipStream.iterator();

assertThat(relationshipIterator).hasNext();
var firstRelationship = relationshipIterator.next();
assertThat(firstRelationship.sourceNode()).isEqualTo(0L);
assertThat(firstRelationship.targetNode()).isEqualTo(1L);
assertThat(firstRelationship.values()).containsExactly(Values.doubleValue(42.0), Values.doubleArray(new double[]{ 43.0, 44.0 }));

assertThat(relationshipIterator).hasNext();
var secondRelationship = relationshipIterator.next();
assertThat(secondRelationship.sourceNode()).isEqualTo(1L);
assertThat(secondRelationship.targetNode()).isEqualTo(2L);
assertThat(secondRelationship.values()).containsExactly(Values.doubleValue(45.0), Values.doubleArray(new double[]{ 46.0, 47.0 }));

assertThat(relationshipIterator).isExhausted();
}

@Test
Expand All @@ -77,13 +111,47 @@ void shouldWriteRelationshipStreamWithoutPropertiesToResultStore() {
List.of(),
List.of()
);
var relationshipStreamEntry = resultStore.getRelationshipStream("REL", List.of());
assertThat(relationshipStreamEntry).isNotNull();
assertRelationshipEntryWithoutProperties(
relationshipStreamEntry.propertyTypes(),
relationshipStreamEntry.relationshipStream(),
relationshipStreamEntry.toOriginalId(),
mappingOperator
);

var entry = resultStore.get(jobId);
assertThat(entry).isInstanceOf(ResultStoreEntry.RelationshipStream.class);
var jobIdRelationshipStreamEntry = (ResultStoreEntry.RelationshipStream) entry;
assertThat(jobIdRelationshipStreamEntry.relationshipType()).isEqualTo("REL");
assertThat(jobIdRelationshipStreamEntry.propertyKeys()).isEmpty();
assertThat(jobIdRelationshipStreamEntry.propertyTypes()).isEmpty();
assertThat(jobIdRelationshipStreamEntry.relationshipStream()).isEqualTo(relationshipStream);
assertThat(jobIdRelationshipStreamEntry.propertyTypes()).isEqualTo(relationshipStreamEntry.propertyTypes());
assertThat(jobIdRelationshipStreamEntry.relationshipStream()).isEqualTo(relationshipStreamEntry.relationshipStream());
}

private static void assertRelationshipEntryWithoutProperties(
List<ValueType> propertyTypes,
Stream<ExportedRelationship> relationshipStream,
LongUnaryOperator actualOperator,
LongUnaryOperator expectedOperator
) {
assertThat(propertyTypes).isEqualTo(List.of());
assertThat(actualOperator).isEqualTo(expectedOperator);

var relationshipIterator = relationshipStream.iterator();

assertThat(relationshipIterator).hasNext();
var firstRelationship = relationshipIterator.next();
assertThat(firstRelationship.sourceNode()).isEqualTo(0L);
assertThat(firstRelationship.targetNode()).isEqualTo(1L);
assertThat(firstRelationship.values()).isEmpty();

assertThat(relationshipIterator).hasNext();
var secondRelationship = relationshipIterator.next();
assertThat(secondRelationship.sourceNode()).isEqualTo(1L);
assertThat(secondRelationship.targetNode()).isEqualTo(2L);
assertThat(secondRelationship.values()).isEmpty();

assertThat(relationshipIterator).isExhausted();
}
}
137 changes: 137 additions & 0 deletions core/src/main/java/org/neo4j/gds/api/EmptyResultStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,14 @@
*/
package org.neo4j.gds.api;

import org.neo4j.gds.api.nodeproperties.ValueType;
import org.neo4j.gds.api.properties.nodes.NodePropertyValues;
import org.neo4j.gds.core.utils.progress.JobId;

import java.util.List;
import java.util.function.LongUnaryOperator;
import java.util.stream.Stream;

public class EmptyResultStore implements ResultStore {

@Override
Expand All @@ -42,4 +48,135 @@ public boolean hasEntry(JobId jobId) {
public void remove(JobId jobId) {

}

@Override
public void addNodePropertyValues(List<String> nodeLabels, String propertyKey, NodePropertyValues propertyValues) {

}

@Override
public NodePropertyValues getNodePropertyValues(List<String> nodeLabels, String propertyKey) {
return null;
}

@Override
public void removeNodePropertyValues(List<String> nodeLabels, String propertyKey) {

}

@Override
public void addNodeLabel(String nodeLabel, long nodeCount, LongUnaryOperator toOriginalId) {

}

@Override
public boolean hasNodeLabel(String nodeLabel) {
return false;
}

@Override
public NodeLabelEntry getNodeIdsByLabel(String nodeLabel) {
return null;
}

@Override
public void removeNodeLabel(String nodeLabel) {

}

@Override
public void addRelationship(String relationshipType, Graph graph, LongUnaryOperator toOriginalId) {

}

@Override
public void addRelationship(
String relationshipType,
String propertyKey,
Graph graph,
LongUnaryOperator toOriginalId
) {

}

@Override
public RelationshipEntry getRelationship(String relationshipType) {
return null;
}

@Override
public RelationshipEntry getRelationship(String relationshipType, String propertyKey) {
return null;
}

@Override
public void removeRelationship(String relationshipType) {

}

@Override
public void removeRelationship(String relationshipType, String propertyKey) {

}

@Override
public void addRelationshipStream(
String relationshipType,
List<String> propertyKeys,
List<ValueType> propertyTypes,
Stream<ExportedRelationship> relationshipStream,
LongUnaryOperator toOriginalId
) {

}

@Override
public RelationshipStreamEntry getRelationshipStream(String relationshipType, List<String> propertyKeys) {
return null;
}

@Override
public void removeRelationshipStream(String relationshipType, List<String> propertyKeys) {

}

@Override
public void addRelationshipIterator(
String relationshipType,
List<String> propertyKeys,
CompositeRelationshipIterator relationshipIterator,
LongUnaryOperator toOriginalId
) {

}

@Override
public RelationshipIteratorEntry getRelationshipIterator(String relationshipType, List<String> propertyKeys) {
return null;
}

@Override
public void removeRelationshipIterator(String relationshipType, List<String> propertyKeys) {

}

@Override
public boolean hasRelationshipIterator(String relationshipType, List<String> propertyKeys) {
return false;
}

@Override
public boolean hasRelationship(String relationshipType) {
return false;
}

@Override
public boolean hasRelationship(String relationshipType, List<String> propertyKeys) {
return false;
}

@Override
public boolean hasRelationshipStream(String relationshipType, List<String> propertyKeys) {
return false;
}
}
Loading

0 comments on commit c3e3d9e

Please sign in to comment.