Skip to content

Commit

Permalink
Use caches instead of hash maps
Browse files Browse the repository at this point in the history
Also test that cache eviction works correctly
  • Loading branch information
soerenreichardt committed Apr 24, 2024
1 parent bc0106b commit df4ebed
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 21 deletions.
65 changes: 44 additions & 21 deletions core/src/main/java/org/neo4j/gds/api/EphemeralResultStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,19 @@
*/
package org.neo4j.gds.api;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.github.benmanes.caffeine.cache.Ticker;
import org.neo4j.gds.api.nodeproperties.ValueType;
import org.neo4j.gds.api.properties.nodes.NodePropertyValues;
import org.neo4j.gds.core.concurrency.ExecutorServiceUtil;
import org.neo4j.gds.core.utils.ClockService;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongUnaryOperator;
import java.util.stream.Stream;

Expand All @@ -34,16 +40,17 @@ public class EphemeralResultStore implements ResultStore {
private static final String NO_PROPERTY_KEY = "";
private static final List<String> NO_PROPERTIES_LIST = List.of(NO_PROPERTY_KEY);

private final Map<NodeKey, NodePropertyValues> nodeProperties;
private final Map<String, NodeLabelEntry> nodeIdsByLabel;
private final Map<RelationshipKey, RelationshipEntry> relationships;
private final Map<RelationshipKey, RelationshipStreamEntry> relationshipStreams;
private final Cache<NodeKey, NodePropertyValues> nodeProperties;
private final Cache<String, NodeLabelEntry> nodeIdsByLabel;
private final Cache<RelationshipKey, RelationshipEntry> relationships;
private final Cache<RelationshipKey, RelationshipStreamEntry> relationshipStreams;

public EphemeralResultStore() {
this.nodeProperties = new HashMap<>();
this.nodeIdsByLabel = new HashMap<>();
this.relationships = new HashMap<>();
this.relationshipStreams = new HashMap<>();
var singleThreadScheduler = ExecutorServiceUtil.createSingleThreadScheduler("GDS-ResultStore");
this.nodeProperties = createCache(singleThreadScheduler);
this.nodeIdsByLabel = createCache(singleThreadScheduler);
this.relationships = createCache(singleThreadScheduler);
this.relationshipStreams = createCache(singleThreadScheduler);
}

@Override
Expand All @@ -53,12 +60,12 @@ public void addNodePropertyValues(List<String> nodeLabels, String propertyKey, N

@Override
public NodePropertyValues getNodePropertyValues(List<String> nodeLabels, String propertyKey) {
return this.nodeProperties.get(new NodeKey(nodeLabels, propertyKey));
return this.nodeProperties.getIfPresent(new NodeKey(nodeLabels, propertyKey));
}

@Override
public void removeNodePropertyValues(List<String> nodeLabels, String propertyKey) {
this.nodeProperties.remove(new NodeKey(nodeLabels, propertyKey));
this.nodeProperties.invalidate(new NodeKey(nodeLabels, propertyKey));
}

@Override
Expand All @@ -68,17 +75,17 @@ public void addNodeLabel(String nodeLabel, long nodeCount, LongUnaryOperator toO

@Override
public boolean hasNodeLabel(String nodeLabel) {
return this.nodeIdsByLabel.containsKey(nodeLabel);
return this.nodeIdsByLabel.getIfPresent(nodeLabel) != null;
}

@Override
public NodeLabelEntry getNodeIdsByLabel(String nodeLabel) {
return this.nodeIdsByLabel.get(nodeLabel);
return this.nodeIdsByLabel.getIfPresent(nodeLabel);
}

@Override
public void removeNodeLabel(String nodeLabel) {
this.nodeIdsByLabel.remove(nodeLabel);
this.nodeIdsByLabel.invalidate(nodeLabel);
}

@Override
Expand Down Expand Up @@ -112,12 +119,12 @@ public void addRelationshipStream(

@Override
public RelationshipStreamEntry getRelationshipStream(String relationshipType, List<String> propertyKeys) {
return this.relationshipStreams.get(new RelationshipKey(relationshipType, propertyKeys));
return this.relationshipStreams.getIfPresent(new RelationshipKey(relationshipType, propertyKeys));
}

@Override
public void removeRelationshipStream(String relationshipType, List<String> propertyKeys) {
this.relationshipStreams.remove(new RelationshipKey(relationshipType, propertyKeys));
this.relationshipStreams.invalidate(new RelationshipKey(relationshipType, propertyKeys));
}

@Override
Expand All @@ -127,7 +134,7 @@ public RelationshipEntry getRelationship(String relationshipType) {

@Override
public RelationshipEntry getRelationship(String relationshipType, String propertyKey) {
return this.relationships.get(new RelationshipKey(relationshipType, List.of(propertyKey)));
return this.relationships.getIfPresent(new RelationshipKey(relationshipType, List.of(propertyKey)));
}

@Override
Expand All @@ -137,7 +144,7 @@ public void removeRelationship(String relationshipType) {

@Override
public void removeRelationship(String relationshipType, String propertyKey) {
this.relationships.remove(new RelationshipKey(relationshipType, List.of(propertyKey)));
this.relationships.invalidate(new RelationshipKey(relationshipType, List.of(propertyKey)));
}

@Override
Expand All @@ -147,12 +154,28 @@ public boolean hasRelationship(String relationshipType) {

@Override
public boolean hasRelationship(String relationshipType, List<String> propertyKeys) {
return this.relationships.containsKey(new RelationshipKey(relationshipType, propertyKeys));
return this.relationships.getIfPresent(new RelationshipKey(relationshipType, propertyKeys)) != null;
}

@Override
public boolean hasRelationshipStream(String relationshipType, List<String> propertyKeys) {
return this.relationshipStreams.containsKey(new RelationshipKey(relationshipType, propertyKeys));
return this.relationshipStreams.getIfPresent(new RelationshipKey(relationshipType, propertyKeys)) != null;
}

private static <K, V> Cache<K, V> createCache(ScheduledExecutorService singleThreadScheduler) {
return Caffeine.newBuilder()
.expireAfterAccess(10, TimeUnit.MINUTES)
.ticker(new ClockServiceWrappingTicker())
.executor(singleThreadScheduler)
.scheduler(Scheduler.forScheduledExecutorService(singleThreadScheduler))
.build();
}

private static class ClockServiceWrappingTicker implements Ticker {
@Override
public long read() {
return ClockService.clock().millis() * 1000000;
}
}

private record NodeKey(List<String> nodeLabels, String propertyKey) {}
Expand Down
96 changes: 96 additions & 0 deletions core/src/test/java/org/neo4j/gds/api/EphemeralResultStoreTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,24 @@
import org.junit.jupiter.api.Test;
import org.neo4j.gds.api.nodeproperties.ValueType;
import org.neo4j.gds.api.properties.nodes.NodePropertyValues;
import org.neo4j.gds.extension.FakeClockExtension;
import org.neo4j.gds.extension.Inject;
import org.neo4j.time.FakeClock;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.LongUnaryOperator;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

@FakeClockExtension
class EphemeralResultStoreTest {

@Inject
FakeClock clock;

@Test
void shouldStoreNodeProperty() {
var resultStore = new EphemeralResultStore();
Expand Down Expand Up @@ -66,6 +74,22 @@ void shouldRemoveNodeProperty() {
assertThat(resultStore.getNodePropertyValues(List.of("A", "B"), "foo")).isNull();
}

@Test
void shouldEvictNodePropertyEntryAfter10Minutes() throws InterruptedException {
var resultStore = new EphemeralResultStore();

var propertyValues = mock(NodePropertyValues.class);
resultStore.addNodePropertyValues(List.of("A"), "prop", propertyValues);

assertThat(resultStore.getNodePropertyValues(List.of("A"), "prop")).isNotNull();

clock.forward(11, TimeUnit.MINUTES);
// make some room for the cache eviction thread to trigger a cleanup
Thread.sleep(100);

assertThat(resultStore.getNodePropertyValues(List.of("A"), "prop")).isNull();
}

@Test
void shouldStoreNodeLabel() {
var resultStore = new EphemeralResultStore();
Expand Down Expand Up @@ -96,6 +120,23 @@ void shouldRemoveNodeLabel() {
assertThat(resultStore.hasNodeLabel("Label")).isFalse();
}

@Test
void shouldEvictNodeLabelEntryAfter10Minutes() throws InterruptedException {
var resultStore = new EphemeralResultStore();

var nodeCount = 1337L;
var toOriginalId = mock(LongUnaryOperator.class);
resultStore.addNodeLabel("Label", nodeCount, toOriginalId);

assertThat(resultStore.hasNodeLabel("Label")).isTrue();

clock.forward(11, TimeUnit.MINUTES);
// make some room for the cache eviction thread to trigger a cleanup
Thread.sleep(100);

assertThat(resultStore.hasNodeLabel("Label")).isFalse();
}

@Test
void shouldStoreGraphBasedRelationshipsWithoutProperty() {
var resultStore = new EphemeralResultStore();
Expand Down Expand Up @@ -128,6 +169,24 @@ void shouldRemoveGraphBasedRelationshipsWithoutProperty() {
assertThat(resultStore.hasRelationship("Type")).isFalse();
}

@Test
void shouldEvictGraphBasedRelationshipsWithoutPropertyAfter10Minutes() throws InterruptedException {
var resultStore = new EphemeralResultStore();

var graph = mock(Graph.class);
var toOriginalId = mock(LongUnaryOperator.class);

resultStore.addRelationship("Type", graph, toOriginalId);

assertThat(resultStore.hasRelationship("Type")).isTrue();

clock.forward(11, TimeUnit.MINUTES);
// make some room for the cache eviction thread to trigger a cleanup
Thread.sleep(100);

assertThat(resultStore.hasRelationship("Type")).isFalse();
}

@Test
void shouldStoreGraphBasedRelationshipsWithProperty() {
var resultStore = new EphemeralResultStore();
Expand Down Expand Up @@ -160,6 +219,25 @@ void shouldRemoveGraphBasedRelationshipsWithProperty() {
assertThat(resultStore.hasRelationship("Type", List.of("prop"))).isFalse();
}

@Test
void shouldEvictGraphBasedRelationshipsWithPropertyAfter10Minutes() throws InterruptedException {
var resultStore = new EphemeralResultStore();

var graph = mock(Graph.class);
var toOriginalId = mock(LongUnaryOperator.class);

resultStore.addRelationship("Type", "prop", graph, toOriginalId);

assertThat(resultStore.hasRelationship("Type", List.of("prop"))).isTrue();


clock.forward(11, TimeUnit.MINUTES);
// make some room for the cache eviction thread to trigger a cleanup
Thread.sleep(100);

assertThat(resultStore.hasRelationship("Type", List.of("prop"))).isFalse();
}

@Test
void shouldStoreStreamBasedRelationships() {
var resultStore = new EphemeralResultStore();
Expand Down Expand Up @@ -192,4 +270,22 @@ void shouldRemoveStreamBasedRelationships() {

assertThat(resultStore.hasRelationshipStream("Type", List.of("foo"))).isFalse();
}

@Test
void shouldEvictStreamBasedRelationshipsAfter10Minutes() throws InterruptedException {
var resultStore = new EphemeralResultStore();

var relationshipStream = mock(Stream.class);
var toOriginalId = mock(LongUnaryOperator.class);

resultStore.addRelationshipStream("Type", List.of("foo"), List.of(ValueType.DOUBLE), relationshipStream, toOriginalId);

assertThat(resultStore.hasRelationshipStream("Type", List.of("foo"))).isTrue();

clock.forward(11, TimeUnit.MINUTES);
// make some room for the cache eviction thread to trigger a cleanup
Thread.sleep(100);

assertThat(resultStore.hasRelationshipStream("Type", List.of("foo"))).isFalse();
}
}

0 comments on commit df4ebed

Please sign in to comment.