diff --git a/core-write/src/main/java/org/neo4j/gds/core/write/NativeRelationshipStreamExporter.java b/core-write/src/main/java/org/neo4j/gds/core/write/NativeRelationshipStreamExporter.java index 9467bb5db5..f2ffe6056d 100644 --- a/core-write/src/main/java/org/neo4j/gds/core/write/NativeRelationshipStreamExporter.java +++ b/core-write/src/main/java/org/neo4j/gds/core/write/NativeRelationshipStreamExporter.java @@ -42,6 +42,8 @@ public final class NativeRelationshipStreamExporter extends StatementApi implements RelationshipStreamExporter { + private static final int QUEUE_CAPACITY = 2; + private final LongUnaryOperator toOriginalId; private final Stream relationships; private final int batchSize; diff --git a/core-write/src/main/java/org/neo4j/gds/core/write/resultstore/ResultStoreNodeLabelExporter.java b/core-write/src/main/java/org/neo4j/gds/core/write/resultstore/ResultStoreNodeLabelExporter.java index 19f8fd76ba..72e01440bd 100644 --- a/core-write/src/main/java/org/neo4j/gds/core/write/resultstore/ResultStoreNodeLabelExporter.java +++ b/core-write/src/main/java/org/neo4j/gds/core/write/resultstore/ResultStoreNodeLabelExporter.java @@ -42,6 +42,7 @@ public class ResultStoreNodeLabelExporter implements NodeLabelExporter { @Override public void write(String nodeLabel) { + resultStore.addNodeLabel(nodeLabel, nodeCount, toOriginalId); resultStore.add(jobId, new ResultStoreEntry.NodeLabel(nodeLabel, nodeCount, toOriginalId)); } diff --git a/core-write/src/main/java/org/neo4j/gds/core/write/resultstore/ResultStoreNodePropertyExporter.java b/core-write/src/main/java/org/neo4j/gds/core/write/resultstore/ResultStoreNodePropertyExporter.java index db761886b6..26d4aac62b 100644 --- a/core-write/src/main/java/org/neo4j/gds/core/write/resultstore/ResultStoreNodePropertyExporter.java +++ b/core-write/src/main/java/org/neo4j/gds/core/write/resultstore/ResultStoreNodePropertyExporter.java @@ -62,6 +62,11 @@ public void write(Collection nodeProperties) { var propertyKeys = new ArrayList(); var propertyValues = new ArrayList(); nodeProperties.forEach(nodeProperty -> { + resultStore.addNodePropertyValues( + nodeLabels, + nodeProperty.propertyKey(), + nodeProperty.properties() + ); propertyKeys.add(nodeProperty.propertyKey()); propertyValues.add(nodeProperty.properties()); writtenProperties += nodeProperty.properties().nodeCount(); diff --git a/core-write/src/main/java/org/neo4j/gds/core/write/resultstore/ResultStoreRelationshipExporter.java b/core-write/src/main/java/org/neo4j/gds/core/write/resultstore/ResultStoreRelationshipExporter.java index a4c4b7c425..03728fabe3 100644 --- a/core-write/src/main/java/org/neo4j/gds/core/write/resultstore/ResultStoreRelationshipExporter.java +++ b/core-write/src/main/java/org/neo4j/gds/core/write/resultstore/ResultStoreRelationshipExporter.java @@ -50,6 +50,7 @@ public class ResultStoreRelationshipExporter implements RelationshipExporter { @Override public void write(String relationshipType) { + resultStore.addRelationship(relationshipType, graph, toOriginalId); resultStore.add(jobId, new ResultStoreEntry.RelationshipTopology(relationshipType, graph, toOriginalId)); } @@ -64,6 +65,7 @@ public void write( String propertyKey, @Nullable RelationshipWithPropertyConsumer afterWriteConsumer ) { + resultStore.addRelationship(relationshipType, propertyKey, graph, toOriginalId); resultStore.add(jobId, new ResultStoreEntry.RelationshipsFromGraph(relationshipType, propertyKey, graph, toOriginalId)); } } diff --git a/core-write/src/main/java/org/neo4j/gds/core/write/resultstore/ResultStoreRelationshipPropertiesExporter.java b/core-write/src/main/java/org/neo4j/gds/core/write/resultstore/ResultStoreRelationshipPropertiesExporter.java index 5faa02d2b1..94813b4a12 100644 --- a/core-write/src/main/java/org/neo4j/gds/core/write/resultstore/ResultStoreRelationshipPropertiesExporter.java +++ b/core-write/src/main/java/org/neo4j/gds/core/write/resultstore/ResultStoreRelationshipPropertiesExporter.java @@ -49,13 +49,16 @@ public class ResultStoreRelationshipPropertiesExporter implements RelationshipPr public void write(String relationshipType, List propertyKeys) { if (propertyKeys.isEmpty()) { var graph = graphStore.getGraph(RelationshipType.of(relationshipType)); + resultStore.addRelationship(relationshipType, graph, graph::toOriginalNodeId); resultStore.add(jobId, new ResultStoreEntry.RelationshipTopology(relationshipType, graph, graph::toOriginalNodeId)); } else if (propertyKeys.size() == 1) { var propertyKey = propertyKeys.get(0); var graph = graphStore.getGraph(RelationshipType.of(relationshipType), Optional.of(propertyKey)); + resultStore.addRelationship(relationshipType, propertyKey, graph, graph::toOriginalNodeId); resultStore.add(jobId, new ResultStoreEntry.RelationshipsFromGraph(relationshipType, propertyKey, graph, graph::toOriginalNodeId)); } else { var relationshipIterator = graphStore.getCompositeRelationshipIterator(RelationshipType.of(relationshipType), propertyKeys); + resultStore.addRelationshipIterator(relationshipType, propertyKeys, relationshipIterator, graphStore.nodes()::toOriginalNodeId); resultStore.add( jobId, new ResultStoreEntry.RelationshipIterators(relationshipType, diff --git a/core-write/src/main/java/org/neo4j/gds/core/write/resultstore/ResultStoreRelationshipStreamExporter.java b/core-write/src/main/java/org/neo4j/gds/core/write/resultstore/ResultStoreRelationshipStreamExporter.java index 3d04ba1134..40475a6ef0 100644 --- a/core-write/src/main/java/org/neo4j/gds/core/write/resultstore/ResultStoreRelationshipStreamExporter.java +++ b/core-write/src/main/java/org/neo4j/gds/core/write/resultstore/ResultStoreRelationshipStreamExporter.java @@ -51,6 +51,7 @@ public class ResultStoreRelationshipStreamExporter implements RelationshipStream @Override public long write(String relationshipType, List propertyKeys, List propertyTypes) { + resultStore.addRelationshipStream(relationshipType, propertyKeys, propertyTypes, relationshipStream, toOriginalId); resultStore.add(jobId, new ResultStoreEntry.RelationshipStream(relationshipType, propertyKeys, propertyTypes, relationshipStream, toOriginalId)); // TODO: return the number of relationships written return 0; diff --git a/core-write/src/test/java/org/neo4j/gds/core/write/resultstore/ResultStoreNodeLabelExporterTest.java b/core-write/src/test/java/org/neo4j/gds/core/write/resultstore/ResultStoreNodeLabelExporterTest.java index d9ba222296..4f51d603a5 100644 --- a/core-write/src/test/java/org/neo4j/gds/core/write/resultstore/ResultStoreNodeLabelExporterTest.java +++ b/core-write/src/test/java/org/neo4j/gds/core/write/resultstore/ResultStoreNodeLabelExporterTest.java @@ -40,6 +40,13 @@ void shouldWriteToResultStore() { assertThat(nodeLabelExporter.nodeLabelsWritten()).isEqualTo(5); + var nodeLabelEntry = resultStore.getNodeIdsByLabel("label"); + assertThat(nodeLabelEntry.nodeCount()).isEqualTo(5); + + for (int i = 0; i < 5; i++) { + assertThat(nodeLabelEntry.toOriginalId().applyAsLong(i)).isEqualTo(i + 42); + } + var entry = resultStore.get(jobId); assertThat(entry).isInstanceOf(ResultStoreEntry.NodeLabel.class); diff --git a/core-write/src/test/java/org/neo4j/gds/core/write/resultstore/ResultStoreNodePropertyExporterTest.java b/core-write/src/test/java/org/neo4j/gds/core/write/resultstore/ResultStoreNodePropertyExporterTest.java index 489a8a760f..f6d94f0c12 100644 --- a/core-write/src/test/java/org/neo4j/gds/core/write/resultstore/ResultStoreNodePropertyExporterTest.java +++ b/core-write/src/test/java/org/neo4j/gds/core/write/resultstore/ResultStoreNodePropertyExporterTest.java @@ -48,6 +48,7 @@ void shouldWriteSingleNodePropertyToResultStore() { nodePropertyExporter.write("prop", nodePropertyValues); assertThat(nodePropertyExporter.propertiesWritten()).isEqualTo(42L); + assertThat(resultStore.getNodePropertyValues(List.of("A"), "prop")).isEqualTo(nodePropertyValues); var entry = resultStore.get(jobId); assertThat(entry).isInstanceOf(ResultStoreEntry.NodeProperties.class); @@ -79,6 +80,8 @@ void shouldWriteMultipleNodePropertiesToResultStore() { ); assertThat(nodePropertyExporter.propertiesWritten()).isEqualTo(85L); + assertThat(resultStore.getNodePropertyValues(List.of(PROJECT_ALL), "prop1")).isEqualTo(nodePropertyValues1); + assertThat(resultStore.getNodePropertyValues(List.of(PROJECT_ALL), "prop2")).isEqualTo(nodePropertyValues2); var entry = resultStore.get(jobId); assertThat(entry).isInstanceOf(ResultStoreEntry.NodeProperties.class); diff --git a/core-write/src/test/java/org/neo4j/gds/core/write/resultstore/ResultStoreRelationshipExporterTest.java b/core-write/src/test/java/org/neo4j/gds/core/write/resultstore/ResultStoreRelationshipExporterTest.java index 8c4433a407..05c228c0af 100644 --- a/core-write/src/test/java/org/neo4j/gds/core/write/resultstore/ResultStoreRelationshipExporterTest.java +++ b/core-write/src/test/java/org/neo4j/gds/core/write/resultstore/ResultStoreRelationshipExporterTest.java @@ -40,6 +40,12 @@ void shouldWriteRelationshipWithoutPropertyToResultStore() { var toOriginalId = mock(LongUnaryOperator.class); new ResultStoreRelationshipExporter(jobId, resultStore, graph, toOriginalId).write("REL"); + var relationshipEntry = resultStore.getRelationship("REL"); + assertThat(relationshipEntry.graph()).isEqualTo(graph); + assertThat(relationshipEntry.toOriginalId()).isEqualTo(toOriginalId); + + assertThat(resultStore.getRelationship("REL", "foo")).isNull(); + var entry = resultStore.get(jobId); assertThat(entry).isInstanceOf(ResultStoreEntry.RelationshipTopology.class); @@ -57,6 +63,12 @@ void shouldWriteRelationshipWithPropertyToResultStore() { var toOriginalId = mock(LongUnaryOperator.class); new ResultStoreRelationshipExporter(jobId, resultStore, graph, toOriginalId).write("REL", "prop"); + var relationshipEntry = resultStore.getRelationship("REL", "prop"); + assertThat(relationshipEntry.graph()).isEqualTo(graph); + assertThat(relationshipEntry.toOriginalId()).isEqualTo(toOriginalId); + + assertThat(resultStore.getRelationship("REL")).isNull(); + var entry = resultStore.get(jobId); assertThat(entry).isInstanceOf(ResultStoreEntry.RelationshipsFromGraph.class); diff --git a/core-write/src/test/java/org/neo4j/gds/core/write/resultstore/ResultStoreRelationshipPropertiesExporterTest.java b/core-write/src/test/java/org/neo4j/gds/core/write/resultstore/ResultStoreRelationshipPropertiesExporterTest.java index ea15f9431e..a20d2f3410 100644 --- a/core-write/src/test/java/org/neo4j/gds/core/write/resultstore/ResultStoreRelationshipPropertiesExporterTest.java +++ b/core-write/src/test/java/org/neo4j/gds/core/write/resultstore/ResultStoreRelationshipPropertiesExporterTest.java @@ -48,6 +48,9 @@ void shouldWriteRelationshipIteratorsToResultStore() { new ResultStoreRelationshipPropertiesExporter(jobId, graphStore, resultStore) .write("TYPE", List.of("foo", "bar")); + var relationshipIteratorEntry = resultStore.getRelationshipIterator("TYPE", List.of("foo", "bar")); + assertThat(relationshipIteratorEntry.relationshipIterator()).isEqualTo(relationshipIterator); + var entry = resultStore.get(jobId); assertThat(entry).isInstanceOf(ResultStoreEntry.RelationshipIterators.class); var jobIdRelationshipIteratorEntry = (ResultStoreEntry.RelationshipIterators) entry; diff --git a/core-write/src/test/java/org/neo4j/gds/core/write/resultstore/ResultStoreRelationshipStreamExporterTest.java b/core-write/src/test/java/org/neo4j/gds/core/write/resultstore/ResultStoreRelationshipStreamExporterTest.java index fe6ae339d6..6b477bfe5e 100644 --- a/core-write/src/test/java/org/neo4j/gds/core/write/resultstore/ResultStoreRelationshipStreamExporterTest.java +++ b/core-write/src/test/java/org/neo4j/gds/core/write/resultstore/ResultStoreRelationshipStreamExporterTest.java @@ -21,6 +21,7 @@ import org.junit.jupiter.api.Test; import org.neo4j.gds.api.EphemeralResultStore; +import org.neo4j.gds.api.ExportedRelationship; import org.neo4j.gds.api.ImmutableExportedRelationship; import org.neo4j.gds.api.ResultStoreEntry; import org.neo4j.gds.api.nodeproperties.ValueType; @@ -51,16 +52,49 @@ void shouldWriteRelationshipStreamWithPropertiesToResultStore() { List.of("doubleProp", "doubleArrayProp"), List.of(ValueType.DOUBLE, ValueType.DOUBLE_ARRAY) ); + var relationshipStreamEntry = resultStore.getRelationshipStream("REL", List.of("doubleProp", "doubleArrayProp")); + assertThat(relationshipStreamEntry).isNotNull(); + assertRelationshipEntryWithProperties( + relationshipStreamEntry.propertyTypes(), + relationshipStreamEntry.relationshipStream(), + relationshipStreamEntry.toOriginalId(), + mappingOperator + ); var entry = resultStore.get(jobId); assertThat(entry).isInstanceOf(ResultStoreEntry.RelationshipStream.class); var jobIdRelationshipStreamEntry = (ResultStoreEntry.RelationshipStream) entry; - assertThat(jobIdRelationshipStreamEntry.relationshipType()).isEqualTo("REL"); assertThat(jobIdRelationshipStreamEntry.propertyKeys()).containsExactly("doubleProp", "doubleArrayProp"); - assertThat(jobIdRelationshipStreamEntry.propertyTypes()).containsExactly(ValueType.DOUBLE, ValueType.DOUBLE_ARRAY); - assertThat(jobIdRelationshipStreamEntry.relationshipStream()).isEqualTo(relationshipStream); + assertThat(jobIdRelationshipStreamEntry.propertyTypes()).isEqualTo(relationshipStreamEntry.propertyTypes()); + assertThat(jobIdRelationshipStreamEntry.relationshipStream()).isEqualTo(relationshipStreamEntry.relationshipStream()); + } + + private static void assertRelationshipEntryWithProperties( + List propertyTypes, + Stream relationshipStream, + LongUnaryOperator actualOperator, + LongUnaryOperator expectedOperator + ) { + assertThat(propertyTypes).isEqualTo(List.of(ValueType.DOUBLE, ValueType.DOUBLE_ARRAY)); + assertThat(actualOperator).isEqualTo(expectedOperator); + + var relationshipIterator = relationshipStream.iterator(); + + assertThat(relationshipIterator).hasNext(); + var firstRelationship = relationshipIterator.next(); + assertThat(firstRelationship.sourceNode()).isEqualTo(0L); + assertThat(firstRelationship.targetNode()).isEqualTo(1L); + assertThat(firstRelationship.values()).containsExactly(Values.doubleValue(42.0), Values.doubleArray(new double[]{ 43.0, 44.0 })); + + assertThat(relationshipIterator).hasNext(); + var secondRelationship = relationshipIterator.next(); + assertThat(secondRelationship.sourceNode()).isEqualTo(1L); + assertThat(secondRelationship.targetNode()).isEqualTo(2L); + assertThat(secondRelationship.values()).containsExactly(Values.doubleValue(45.0), Values.doubleArray(new double[]{ 46.0, 47.0 })); + + assertThat(relationshipIterator).isExhausted(); } @Test @@ -77,13 +111,47 @@ void shouldWriteRelationshipStreamWithoutPropertiesToResultStore() { List.of(), List.of() ); + var relationshipStreamEntry = resultStore.getRelationshipStream("REL", List.of()); + assertThat(relationshipStreamEntry).isNotNull(); + assertRelationshipEntryWithoutProperties( + relationshipStreamEntry.propertyTypes(), + relationshipStreamEntry.relationshipStream(), + relationshipStreamEntry.toOriginalId(), + mappingOperator + ); var entry = resultStore.get(jobId); assertThat(entry).isInstanceOf(ResultStoreEntry.RelationshipStream.class); var jobIdRelationshipStreamEntry = (ResultStoreEntry.RelationshipStream) entry; assertThat(jobIdRelationshipStreamEntry.relationshipType()).isEqualTo("REL"); assertThat(jobIdRelationshipStreamEntry.propertyKeys()).isEmpty(); - assertThat(jobIdRelationshipStreamEntry.propertyTypes()).isEmpty(); - assertThat(jobIdRelationshipStreamEntry.relationshipStream()).isEqualTo(relationshipStream); + assertThat(jobIdRelationshipStreamEntry.propertyTypes()).isEqualTo(relationshipStreamEntry.propertyTypes()); + assertThat(jobIdRelationshipStreamEntry.relationshipStream()).isEqualTo(relationshipStreamEntry.relationshipStream()); + } + + private static void assertRelationshipEntryWithoutProperties( + List propertyTypes, + Stream relationshipStream, + LongUnaryOperator actualOperator, + LongUnaryOperator expectedOperator + ) { + assertThat(propertyTypes).isEqualTo(List.of()); + assertThat(actualOperator).isEqualTo(expectedOperator); + + var relationshipIterator = relationshipStream.iterator(); + + assertThat(relationshipIterator).hasNext(); + var firstRelationship = relationshipIterator.next(); + assertThat(firstRelationship.sourceNode()).isEqualTo(0L); + assertThat(firstRelationship.targetNode()).isEqualTo(1L); + assertThat(firstRelationship.values()).isEmpty(); + + assertThat(relationshipIterator).hasNext(); + var secondRelationship = relationshipIterator.next(); + assertThat(secondRelationship.sourceNode()).isEqualTo(1L); + assertThat(secondRelationship.targetNode()).isEqualTo(2L); + assertThat(secondRelationship.values()).isEmpty(); + + assertThat(relationshipIterator).isExhausted(); } } diff --git a/core/src/main/java/org/neo4j/gds/api/EmptyResultStore.java b/core/src/main/java/org/neo4j/gds/api/EmptyResultStore.java index 02b0601e3a..a1943dd41f 100644 --- a/core/src/main/java/org/neo4j/gds/api/EmptyResultStore.java +++ b/core/src/main/java/org/neo4j/gds/api/EmptyResultStore.java @@ -19,8 +19,14 @@ */ package org.neo4j.gds.api; +import org.neo4j.gds.api.nodeproperties.ValueType; +import org.neo4j.gds.api.properties.nodes.NodePropertyValues; import org.neo4j.gds.core.utils.progress.JobId; +import java.util.List; +import java.util.function.LongUnaryOperator; +import java.util.stream.Stream; + public class EmptyResultStore implements ResultStore { @Override @@ -42,4 +48,135 @@ public boolean hasEntry(JobId jobId) { public void remove(JobId jobId) { } + + @Override + public void addNodePropertyValues(List nodeLabels, String propertyKey, NodePropertyValues propertyValues) { + + } + + @Override + public NodePropertyValues getNodePropertyValues(List nodeLabels, String propertyKey) { + return null; + } + + @Override + public void removeNodePropertyValues(List nodeLabels, String propertyKey) { + + } + + @Override + public void addNodeLabel(String nodeLabel, long nodeCount, LongUnaryOperator toOriginalId) { + + } + + @Override + public boolean hasNodeLabel(String nodeLabel) { + return false; + } + + @Override + public NodeLabelEntry getNodeIdsByLabel(String nodeLabel) { + return null; + } + + @Override + public void removeNodeLabel(String nodeLabel) { + + } + + @Override + public void addRelationship(String relationshipType, Graph graph, LongUnaryOperator toOriginalId) { + + } + + @Override + public void addRelationship( + String relationshipType, + String propertyKey, + Graph graph, + LongUnaryOperator toOriginalId + ) { + + } + + @Override + public RelationshipEntry getRelationship(String relationshipType) { + return null; + } + + @Override + public RelationshipEntry getRelationship(String relationshipType, String propertyKey) { + return null; + } + + @Override + public void removeRelationship(String relationshipType) { + + } + + @Override + public void removeRelationship(String relationshipType, String propertyKey) { + + } + + @Override + public void addRelationshipStream( + String relationshipType, + List propertyKeys, + List propertyTypes, + Stream relationshipStream, + LongUnaryOperator toOriginalId + ) { + + } + + @Override + public RelationshipStreamEntry getRelationshipStream(String relationshipType, List propertyKeys) { + return null; + } + + @Override + public void removeRelationshipStream(String relationshipType, List propertyKeys) { + + } + + @Override + public void addRelationshipIterator( + String relationshipType, + List propertyKeys, + CompositeRelationshipIterator relationshipIterator, + LongUnaryOperator toOriginalId + ) { + + } + + @Override + public RelationshipIteratorEntry getRelationshipIterator(String relationshipType, List propertyKeys) { + return null; + } + + @Override + public void removeRelationshipIterator(String relationshipType, List propertyKeys) { + + } + + @Override + public boolean hasRelationshipIterator(String relationshipType, List propertyKeys) { + return false; + } + + @Override + public boolean hasRelationship(String relationshipType) { + return false; + } + + @Override + public boolean hasRelationship(String relationshipType, List propertyKeys) { + return false; + } + + @Override + public boolean hasRelationshipStream(String relationshipType, List propertyKeys) { + return false; + } } diff --git a/core/src/main/java/org/neo4j/gds/api/EphemeralResultStore.java b/core/src/main/java/org/neo4j/gds/api/EphemeralResultStore.java index 56fefdd29c..421243c952 100644 --- a/core/src/main/java/org/neo4j/gds/api/EphemeralResultStore.java +++ b/core/src/main/java/org/neo4j/gds/api/EphemeralResultStore.java @@ -24,22 +24,40 @@ import com.github.benmanes.caffeine.cache.Scheduler; import com.github.benmanes.caffeine.cache.Ticker; import org.jetbrains.annotations.Nullable; +import org.neo4j.gds.api.nodeproperties.ValueType; +import org.neo4j.gds.api.properties.nodes.NodePropertyValues; import org.neo4j.gds.core.concurrency.ExecutorServiceUtil; import org.neo4j.gds.core.utils.ClockService; import org.neo4j.gds.core.utils.progress.JobId; import java.time.Duration; import java.time.temporal.ChronoUnit; +import java.util.Collection; +import java.util.List; import java.util.concurrent.ScheduledExecutorService; +import java.util.function.LongUnaryOperator; +import java.util.stream.Stream; public class EphemeralResultStore implements ResultStore { + private static final String NO_PROPERTY_KEY = ""; + private static final List NO_PROPERTIES_LIST = List.of(NO_PROPERTY_KEY); static final Duration CACHE_EVICTION_DURATION = Duration.of(10, ChronoUnit.MINUTES); + private final Cache nodeProperties; + private final Cache nodeIdsByLabel; + private final Cache relationships; + private final Cache relationshipStreams; + private final Cache relationshipIterators; private final Cache resultEntries; public EphemeralResultStore() { var singleThreadScheduler = ExecutorServiceUtil.createSingleThreadScheduler("GDS-ResultStore"); + this.nodeProperties = createCache(singleThreadScheduler); + this.nodeIdsByLabel = createCache(singleThreadScheduler); + this.relationships = createCache(singleThreadScheduler); + this.relationshipStreams = createCache(singleThreadScheduler); + this.relationshipIterators = createCache(singleThreadScheduler); this.resultEntries = createCache(singleThreadScheduler); } @@ -64,6 +82,143 @@ public void remove(JobId jobId) { this.resultEntries.invalidate(jobId); } + @Override + public void addNodePropertyValues(List nodeLabels, String propertyKey, NodePropertyValues propertyValues) { + this.nodeProperties.put(new NodeKey(nodeLabels, propertyKey), propertyValues); + } + + @Override + public NodePropertyValues getNodePropertyValues(List nodeLabels, String propertyKey) { + return this.nodeProperties.getIfPresent(new NodeKey(nodeLabels, propertyKey)); + } + + @Override + public void removeNodePropertyValues(List nodeLabels, String propertyKey) { + this.nodeProperties.invalidate(new NodeKey(nodeLabels, propertyKey)); + } + + @Override + public void addNodeLabel(String nodeLabel, long nodeCount, LongUnaryOperator toOriginalId) { + this.nodeIdsByLabel.put(nodeLabel, new NodeLabelEntry(nodeCount, toOriginalId)); + } + + @Override + public boolean hasNodeLabel(String nodeLabel) { + return this.nodeIdsByLabel.getIfPresent(nodeLabel) != null; + } + + @Override + public NodeLabelEntry getNodeIdsByLabel(String nodeLabel) { + return this.nodeIdsByLabel.getIfPresent(nodeLabel); + } + + @Override + public void removeNodeLabel(String nodeLabel) { + this.nodeIdsByLabel.invalidate(nodeLabel); + } + + @Override + public void addRelationship(String relationshipType, Graph graph, LongUnaryOperator toOriginalId) { + addRelationship(relationshipType, NO_PROPERTY_KEY, graph, toOriginalId); + } + + @Override + public void addRelationship( + String relationshipType, + String propertyKey, + Graph graph, + LongUnaryOperator toOriginalId + ) { + this.relationships.put(new RelationshipKey(relationshipType, List.of(propertyKey)), new RelationshipEntry(graph, toOriginalId)); + } + + @Override + public void addRelationshipStream( + String relationshipType, + List propertyKeys, + List propertyTypes, + Stream relationshipStream, + LongUnaryOperator toOriginalId + ) { + this.relationshipStreams.put( + new RelationshipKey(relationshipType, propertyKeys), + new RelationshipStreamEntry(relationshipStream, propertyTypes, toOriginalId) + ); + } + + @Override + public RelationshipStreamEntry getRelationshipStream(String relationshipType, List propertyKeys) { + return this.relationshipStreams.getIfPresent(new RelationshipKey(relationshipType, propertyKeys)); + } + + @Override + public void removeRelationshipStream(String relationshipType, List propertyKeys) { + this.relationshipStreams.invalidate(new RelationshipKey(relationshipType, propertyKeys)); + } + + @Override + public RelationshipEntry getRelationship(String relationshipType) { + return getRelationship(relationshipType, NO_PROPERTY_KEY); + } + + @Override + public RelationshipEntry getRelationship(String relationshipType, String propertyKey) { + return this.relationships.getIfPresent(new RelationshipKey(relationshipType, List.of(propertyKey))); + } + + @Override + public void removeRelationship(String relationshipType) { + removeRelationship(relationshipType, NO_PROPERTY_KEY); + } + + @Override + public void removeRelationship(String relationshipType, String propertyKey) { + this.relationships.invalidate(new RelationshipKey(relationshipType, List.of(propertyKey))); + } + + @Override + public void addRelationshipIterator( + String relationshipType, + List propertyKeys, + CompositeRelationshipIterator relationshipIterator, + LongUnaryOperator toOriginalId + ) { + this.relationshipIterators.put( + new RelationshipKey(relationshipType, propertyKeys), + new RelationshipIteratorEntry(relationshipIterator, toOriginalId) + ); + } + + @Override + public RelationshipIteratorEntry getRelationshipIterator(String relationshipType, List propertyKeys) { + return this.relationshipIterators.getIfPresent(new RelationshipKey(relationshipType, propertyKeys)); + } + + @Override + public void removeRelationshipIterator(String relationshipType, List propertyKeys) { + this.relationshipIterators.invalidate(new RelationshipKey(relationshipType, propertyKeys)); + } + + @Override + public boolean hasRelationship(String relationshipType) { + return hasRelationship(relationshipType, NO_PROPERTIES_LIST); + } + + @Override + public boolean hasRelationship(String relationshipType, List propertyKeys) { + return this.relationships.getIfPresent(new RelationshipKey(relationshipType, propertyKeys)) != null; + } + + @Override + public boolean hasRelationshipStream(String relationshipType, List propertyKeys) { + return this.relationshipStreams.getIfPresent(new RelationshipKey(relationshipType, propertyKeys)) != null; + } + + @Override + public boolean hasRelationshipIterator(String relationshipType, List propertyKeys) { + return this.relationshipIterators.getIfPresent(new RelationshipKey(relationshipType, propertyKeys)) != null; + } + private static Cache createCache(ScheduledExecutorService singleThreadScheduler) { return Caffeine.newBuilder() .expireAfterAccess(CACHE_EVICTION_DURATION) @@ -77,5 +232,10 @@ private static class ClockServiceWrappingTicker implements Ticker { public long read() { return ClockService.clock().millis() * 1000000; } + } + + private record NodeKey(List nodeLabels, String propertyKey) {} + + private record RelationshipKey(String relationshipType, Collection propertyKeys) {} } diff --git a/core/src/main/java/org/neo4j/gds/api/ResultStore.java b/core/src/main/java/org/neo4j/gds/api/ResultStore.java index fc3343e41d..a12e77ac7f 100644 --- a/core/src/main/java/org/neo4j/gds/api/ResultStore.java +++ b/core/src/main/java/org/neo4j/gds/api/ResultStore.java @@ -19,8 +19,14 @@ */ package org.neo4j.gds.api; +import org.neo4j.gds.api.nodeproperties.ValueType; +import org.neo4j.gds.api.properties.nodes.NodePropertyValues; import org.neo4j.gds.core.utils.progress.JobId; +import java.util.List; +import java.util.function.LongUnaryOperator; +import java.util.stream.Stream; + /** * A store for write results that are not immediately persisted in the database. * This is mainly used for the session architecture, where algorithms results are first @@ -49,5 +55,160 @@ public interface ResultStore { */ void remove(JobId jobId); + /** + * Stores node property values under the given property key. + */ + void addNodePropertyValues(List nodeLabels, String propertyKey, NodePropertyValues propertyValues); + + /** + * Retrieves node property values from this store based on the property key. + */ + NodePropertyValues getNodePropertyValues(List nodeLabels, String propertyKey); + + /** + * Removes node property values from this store based on the property key. + */ + void removeNodePropertyValues(List nodeLabels, String propertyKey); + + /** + * Stores node id information for the given label in this store. + */ + void addNodeLabel(String nodeLabel, long nodeCount, LongUnaryOperator toOriginalId); + + /** + * Checks if this store has node id information for the given label. + */ + boolean hasNodeLabel(String nodeLabel); + + /** + * Retrieves node id information for the given label. + */ + NodeLabelEntry getNodeIdsByLabel(String nodeLabel); + + /** + * Removes node id information for the given label from this store. + */ + void removeNodeLabel(String nodeLabel); + + /** + * Stores a relationship information in this store, held by the given graph. + * + * @param toOriginalId a mapping function to for the relationship source and target ids + */ + void addRelationship(String relationshipType, Graph graph, LongUnaryOperator toOriginalId); + + /** + * Stores a relationship information in this store, held by the given graph. + * + * @param propertyKey the property key for the relationship + * @param toOriginalId a mapping function to for the relationship source and target ids + */ + void addRelationship(String relationshipType, String propertyKey, Graph graph, LongUnaryOperator toOriginalId); + + /** + * Retrieves relationship information from this store based on the relationship type. + */ + RelationshipEntry getRelationship(String relationshipType); + + /** + * Retrieves relationship information from this store based on the relationship type and property key. + */ + RelationshipEntry getRelationship(String relationshipType, String propertyKey); + + /** + * Removes relationship information from this store based on the relationship type. + */ + void removeRelationship(String relationshipType); + + /** + * Removes relationship information from this store based on the relationship type and property key. + */ + void removeRelationship(String relationshipType, String propertyKey); + + /** + * Stores a stream of relationships in this store. + * + * @param propertyKeys the property keys for the relationships + * @param propertyTypes the property types for the relationship properties, expected to match the order of the property keys + * @param toOriginalId a mapping function to for the relationship source and target ids + */ + void addRelationshipStream( + String relationshipType, + List propertyKeys, + List propertyTypes, + Stream relationshipStream, + LongUnaryOperator toOriginalId + ); + + /** + * Retrieves a stream of relationships from this store based on the relationship type and property keys. + */ + RelationshipStreamEntry getRelationshipStream(String relationshipType, List propertyKeys); + + /** + * Removes a stream of relationships from this store based on the relationship type and property keys. + */ + void removeRelationshipStream(String relationshipType, List propertyKeys); + + /** + * Stores a composite relationship iterator in this store. + * Composite relationship iterators store multiple properties + * for a given adjacency cursor. + */ + void addRelationshipIterator( + String relationshipType, + List propertyKeys, + CompositeRelationshipIterator relationshipIterator, + LongUnaryOperator toOriginalId + ); + + /** + * Retrieves a relationship iterator representing the given relationship type and property keys. + */ + RelationshipIteratorEntry getRelationshipIterator( + String relationshipType, + List propertyKeys + ); + + /** + * Removes a relationship iterator from this store based on the relationship type and property keys. + */ + void removeRelationshipIterator( + String relationshipType, + List propertyKeys + ); + + /** + * Checks if this store has a relationship of the given type. + * Does not include relationship streams. + */ + boolean hasRelationship(String relationshipType); + + /** + * Checks if this store has a relationship of the given type and property keys. + * Does not include other kinds of stored relationships. + */ + boolean hasRelationship(String relationshipType, List propertyKeys); + + /** + * Checks if this store has a relationship stream of the given type and properties. + * Does not include other kinds of stored relationships. + */ + boolean hasRelationshipStream(String relationshipType, List propertyKeys); + + /** + * Checks if this store has a relationship iterator of the given type and properties. + * Does not include other kinds of stored relationships. + */ + boolean hasRelationshipIterator(String relationshipType, List propertyKeys); + + record NodeLabelEntry(long nodeCount, LongUnaryOperator toOriginalId) {} + + record RelationshipEntry(Graph graph, LongUnaryOperator toOriginalId) {} + + record RelationshipStreamEntry(Stream relationshipStream, List propertyTypes, LongUnaryOperator toOriginalId) {} + + record RelationshipIteratorEntry(CompositeRelationshipIterator relationshipIterator, LongUnaryOperator toOriginalId) {} + ResultStore EMPTY = new EmptyResultStore(); } diff --git a/core/src/test/java/org/neo4j/gds/api/EphemeralResultStoreTest.java b/core/src/test/java/org/neo4j/gds/api/EphemeralResultStoreTest.java index f3d4935b90..35db11325f 100644 --- a/core/src/test/java/org/neo4j/gds/api/EphemeralResultStoreTest.java +++ b/core/src/test/java/org/neo4j/gds/api/EphemeralResultStoreTest.java @@ -20,6 +20,7 @@ package org.neo4j.gds.api; import org.junit.jupiter.api.Test; +import org.neo4j.gds.api.nodeproperties.ValueType; import org.neo4j.gds.api.properties.nodes.NodePropertyValues; import org.neo4j.gds.core.utils.progress.JobId; import org.neo4j.gds.extension.FakeClockExtension; @@ -28,6 +29,7 @@ import java.util.List; import java.util.function.LongUnaryOperator; +import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; @@ -120,4 +122,302 @@ void shouldEvictEntryAfter10Minutes() throws InterruptedException { assertThat(resultStore.hasEntry(JOB_ID)).isFalse(); } + + @Test + void shouldStoreNodeProperty() { + var resultStore = new EphemeralResultStore(); + + var propertyValues = mock(NodePropertyValues.class); + resultStore.addNodePropertyValues(List.of("A", "B"), "foo", propertyValues); + + assertThat(resultStore.getNodePropertyValues(List.of("A", "B"), "foo")).isEqualTo(propertyValues); + } + + @Test + void shouldNotResolveNodePropertiesWhenLabelsDoNotMatch() { + var resultStore = new EphemeralResultStore(); + + var propertyValues = mock(NodePropertyValues.class); + resultStore.addNodePropertyValues(List.of("A"), "foo", propertyValues); + + assertThat(resultStore.getNodePropertyValues(List.of("B"), "foo")).isNull(); + } + + @Test + void shouldRemoveNodeProperty() { + var resultStore = new EphemeralResultStore(); + + var propertyValues = mock(NodePropertyValues.class); + resultStore.addNodePropertyValues(List.of("A", "B"), "foo", propertyValues); + + assertThat(resultStore.getNodePropertyValues(List.of("A", "B"), "foo")).isNotNull(); + + resultStore.removeNodePropertyValues(List.of("A", "B"), "foo"); + + assertThat(resultStore.getNodePropertyValues(List.of("A", "B"), "foo")).isNull(); + } + + @Test + void shouldEvictNodePropertyEntryAfter10Minutes() throws InterruptedException { + var resultStore = new EphemeralResultStore(); + + var propertyValues = mock(NodePropertyValues.class); + resultStore.addNodePropertyValues(List.of("A"), "prop", propertyValues); + + assertThat(resultStore.getNodePropertyValues(List.of("A"), "prop")).isNotNull(); + + clock.forward(EphemeralResultStore.CACHE_EVICTION_DURATION.plusMinutes(1)); + // make some room for the cache eviction thread to trigger a cleanup + Thread.sleep(100); + + assertThat(resultStore.getNodePropertyValues(List.of("A"), "prop")).isNull(); + } + + @Test + void shouldStoreNodeLabel() { + var resultStore = new EphemeralResultStore(); + + var nodeCount = 1337L; + var toOriginalId = mock(LongUnaryOperator.class); + resultStore.addNodeLabel("Label", nodeCount, toOriginalId); + + var nodeLabelEntry = resultStore.getNodeIdsByLabel("Label"); + assertThat(nodeLabelEntry.nodeCount()).isEqualTo(1337L); + assertThat(nodeLabelEntry.toOriginalId()).isEqualTo(toOriginalId); + + assertThat(resultStore.hasNodeLabel("Label")).isTrue(); + } + + @Test + void shouldRemoveNodeLabel() { + var resultStore = new EphemeralResultStore(); + + var nodeCount = 1337L; + var toOriginalId = mock(LongUnaryOperator.class); + resultStore.addNodeLabel("Label", nodeCount, toOriginalId); + + assertThat(resultStore.hasNodeLabel("Label")).isTrue(); + + resultStore.removeNodeLabel("Label"); + + assertThat(resultStore.hasNodeLabel("Label")).isFalse(); + } + + @Test + void shouldEvictNodeLabelEntryAfter10Minutes() throws InterruptedException { + var resultStore = new EphemeralResultStore(); + + var nodeCount = 1337L; + var toOriginalId = mock(LongUnaryOperator.class); + resultStore.addNodeLabel("Label", nodeCount, toOriginalId); + + assertThat(resultStore.hasNodeLabel("Label")).isTrue(); + + clock.forward(EphemeralResultStore.CACHE_EVICTION_DURATION.plusMinutes(1)); + // make some room for the cache eviction thread to trigger a cleanup + Thread.sleep(100); + + assertThat(resultStore.hasNodeLabel("Label")).isFalse(); + } + + @Test + void shouldStoreGraphBasedRelationshipsWithoutProperty() { + var resultStore = new EphemeralResultStore(); + + var graph = mock(Graph.class); + var toOriginalId = mock(LongUnaryOperator.class); + + resultStore.addRelationship("Type", graph, toOriginalId); + + var relationshipEntry = resultStore.getRelationship("Type"); + assertThat(relationshipEntry.graph()).isEqualTo(graph); + assertThat(relationshipEntry.toOriginalId()).isEqualTo(toOriginalId); + + assertThat(resultStore.hasRelationship("Type")).isTrue(); + } + + @Test + void shouldRemoveGraphBasedRelationshipsWithoutProperty() { + var resultStore = new EphemeralResultStore(); + + var graph = mock(Graph.class); + var toOriginalId = mock(LongUnaryOperator.class); + + resultStore.addRelationship("Type", graph, toOriginalId); + + assertThat(resultStore.hasRelationship("Type")).isTrue(); + + resultStore.removeRelationship("Type"); + + assertThat(resultStore.hasRelationship("Type")).isFalse(); + } + + @Test + void shouldEvictGraphBasedRelationshipsWithoutPropertyAfter10Minutes() throws InterruptedException { + var resultStore = new EphemeralResultStore(); + + var graph = mock(Graph.class); + var toOriginalId = mock(LongUnaryOperator.class); + + resultStore.addRelationship("Type", graph, toOriginalId); + + assertThat(resultStore.hasRelationship("Type")).isTrue(); + + clock.forward(EphemeralResultStore.CACHE_EVICTION_DURATION.plusMinutes(1)); + // make some room for the cache eviction thread to trigger a cleanup + Thread.sleep(100); + + assertThat(resultStore.hasRelationship("Type")).isFalse(); + } + + @Test + void shouldStoreGraphBasedRelationshipsWithProperty() { + var resultStore = new EphemeralResultStore(); + + var graph = mock(Graph.class); + var toOriginalId = mock(LongUnaryOperator.class); + + resultStore.addRelationship("Type", "prop", graph, toOriginalId); + + var relationshipEntry = resultStore.getRelationship("Type", "prop"); + assertThat(relationshipEntry.graph()).isEqualTo(graph); + assertThat(relationshipEntry.toOriginalId()).isEqualTo(toOriginalId); + + assertThat(resultStore.hasRelationship("Type", List.of("prop"))).isTrue(); + } + + @Test + void shouldRemoveGraphBasedRelationshipsWithProperty() { + var resultStore = new EphemeralResultStore(); + + var graph = mock(Graph.class); + var toOriginalId = mock(LongUnaryOperator.class); + + resultStore.addRelationship("Type", "prop", graph, toOriginalId); + + assertThat(resultStore.hasRelationship("Type", List.of("prop"))).isTrue(); + + resultStore.removeRelationship("Type", "prop"); + + assertThat(resultStore.hasRelationship("Type", List.of("prop"))).isFalse(); + } + + @Test + void shouldEvictGraphBasedRelationshipsWithPropertyAfter10Minutes() throws InterruptedException { + var resultStore = new EphemeralResultStore(); + + var graph = mock(Graph.class); + var toOriginalId = mock(LongUnaryOperator.class); + + resultStore.addRelationship("Type", "prop", graph, toOriginalId); + + assertThat(resultStore.hasRelationship("Type", List.of("prop"))).isTrue(); + + + clock.forward(EphemeralResultStore.CACHE_EVICTION_DURATION.plusMinutes(1)); + // make some room for the cache eviction thread to trigger a cleanup + Thread.sleep(100); + + assertThat(resultStore.hasRelationship("Type", List.of("prop"))).isFalse(); + } + + @Test + void shouldStoreStreamBasedRelationships() { + var resultStore = new EphemeralResultStore(); + + var relationshipStream = mock(Stream.class); + var toOriginalId = mock(LongUnaryOperator.class); + + resultStore.addRelationshipStream("Type", List.of("foo"), List.of(ValueType.DOUBLE), relationshipStream, toOriginalId); + + var relationshipStreamEntry = resultStore.getRelationshipStream("Type", List.of("foo")); + assertThat(relationshipStreamEntry.relationshipStream()).isEqualTo(relationshipStream); + assertThat(relationshipStreamEntry.toOriginalId()).isEqualTo(toOriginalId); + + assertThat(resultStore.hasRelationshipStream("Type", List.of("foo"))).isTrue(); + assertThat(resultStore.hasRelationshipStream("Type", List.of())).isFalse(); + } + + @Test + void shouldRemoveStreamBasedRelationships() { + var resultStore = new EphemeralResultStore(); + + var relationshipStream = mock(Stream.class); + var toOriginalId = mock(LongUnaryOperator.class); + + resultStore.addRelationshipStream("Type", List.of("foo"), List.of(ValueType.DOUBLE), relationshipStream, toOriginalId); + + assertThat(resultStore.hasRelationshipStream("Type", List.of("foo"))).isTrue(); + + resultStore.removeRelationshipStream("Type", List.of("foo")); + + assertThat(resultStore.hasRelationshipStream("Type", List.of("foo"))).isFalse(); + } + + @Test + void shouldEvictStreamBasedRelationshipsAfter10Minutes() throws InterruptedException { + var resultStore = new EphemeralResultStore(); + + var relationshipStream = mock(Stream.class); + var toOriginalId = mock(LongUnaryOperator.class); + + resultStore.addRelationshipStream("Type", List.of("foo"), List.of(ValueType.DOUBLE), relationshipStream, toOriginalId); + + assertThat(resultStore.hasRelationshipStream("Type", List.of("foo"))).isTrue(); + + clock.forward(EphemeralResultStore.CACHE_EVICTION_DURATION.plusMinutes(1)); + // make some room for the cache eviction thread to trigger a cleanup + Thread.sleep(100); + + assertThat(resultStore.hasRelationshipStream("Type", List.of("foo"))).isFalse(); + } + + @Test + void shouldStoreRelationshipIterator() { + var resultStore = new EphemeralResultStore(); + + var relationshipIterator = mock(CompositeRelationshipIterator.class); + var toOriginalId = mock(LongUnaryOperator.class); + + resultStore.addRelationshipIterator("Type", List.of("foo", "bar"), relationshipIterator, toOriginalId); + + var relationshipIteratorEntry = resultStore.getRelationshipIterator("Type", List.of("foo", "bar")); + assertThat(relationshipIteratorEntry.relationshipIterator()).isEqualTo(relationshipIterator); + assertThat(relationshipIteratorEntry.toOriginalId()).isEqualTo(toOriginalId); + assertThat(resultStore.hasRelationshipIterator("Type", List.of("foo", "bar"))).isTrue(); + } + + @Test + void shouldRemoveRelationshipIterator() { + var resultStore = new EphemeralResultStore(); + + var relationshipIterator = mock(CompositeRelationshipIterator.class); + var toOriginalId = mock(LongUnaryOperator.class); + + resultStore.addRelationshipIterator("Type", List.of("foo", "bar"), relationshipIterator, toOriginalId); + + assertThat(resultStore.hasRelationshipIterator("Type", List.of("foo", "bar"))).isTrue(); + + resultStore.removeRelationshipIterator("Type", List.of("foo", "bar")); + + assertThat(resultStore.hasRelationshipIterator("Type", List.of("foo", "bar"))).isFalse(); + } + + @Test + void shouldEvictIteratorBasedRelationshipsAfter10Minutes() throws InterruptedException { + var resultStore = new EphemeralResultStore(); + + var relationshipIterator = mock(CompositeRelationshipIterator.class); + var toOriginalId = mock(LongUnaryOperator.class); + + resultStore.addRelationshipIterator("Type", List.of("foo"), relationshipIterator, toOriginalId); + + assertThat(resultStore.hasRelationshipIterator("Type", List.of("foo"))).isTrue(); + + clock.forward(EphemeralResultStore.CACHE_EVICTION_DURATION.plusMinutes(1)); + // make some room for the cache eviction thread to trigger a cleanup + Thread.sleep(100); + + assertThat(resultStore.hasRelationshipIterator("Type", List.of("foo"))).isFalse(); + } }