Skip to content

Commit

Permalink
Pass dedicated executor service for Aura background tasks
Browse files Browse the repository at this point in the history
They should not be in the same queue as algorithms, but be able to run independently
  • Loading branch information
FlorentinD committed Sep 28, 2023
1 parent aab847a commit 0668c70
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.neo4j.internal.helpers.NamedThreadFactory;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -48,6 +49,18 @@ public static ExecutorService createSingleThreadPool(String threadPrefix) {
return Executors.newSingleThreadExecutor(NamedThreadFactory.daemon(threadPrefix));
}

public static ExecutorService createThreadPool(String threadPrefix, int corePoolSize, int maxPoolSize) {
return new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
30L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(corePoolSize * 50),
NamedThreadFactory.daemon(threadPrefix),
new ExecutorServiceUtil.CallerBlocksPolicy()
);
}

public static ForkJoinPool createForkJoinPool(int concurrency) {
return new ForkJoinPool(concurrency, FJ_WORKER_THREAD_FACTORY, null, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.ArrayList;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

public class GraphStoreToFileExporter extends GraphStoreExporter<GraphStoreToFileExporterConfig> {
Expand All @@ -66,6 +67,7 @@ public class GraphStoreToFileExporter extends GraphStoreExporter<GraphStoreToFil
private final TaskRegistryFactory taskRegistryFactory;
private final Log log;
private final String rootTaskName;
private ExecutorService executorService;

public GraphStoreToFileExporter(
GraphStore graphStore,
Expand All @@ -84,7 +86,8 @@ public GraphStoreToFileExporter(
VisitorProducer<GraphPropertyVisitor> graphPropertyVisitorSupplier,
TaskRegistryFactory taskRegistryFactory,
Log log,
String rootTaskName
String rootTaskName,
ExecutorService executorService
) {
super(graphStore, config, neoNodeProperties, nodeLabelMapping);
this.nodeVisitorSupplier = nodeVisitorSupplier;
Expand All @@ -100,6 +103,7 @@ public GraphStoreToFileExporter(
this.taskRegistryFactory = taskRegistryFactory;
this.log = log;
this.rootTaskName = rootTaskName;
this.executorService = executorService;
}

@Override
Expand Down Expand Up @@ -168,6 +172,7 @@ private void exportNodes(
RunWithConcurrency.builder()
.concurrency(config.writeConcurrency())
.tasks(tasks)
.executor(executorService)
.run();
progressTracker.endSubTask();
}
Expand All @@ -192,6 +197,7 @@ private void exportRelationships(
RunWithConcurrency.builder()
.concurrency(config.writeConcurrency())
.tasks(tasks)
.executor(executorService)
.mayInterruptIfRunning(false)
.run();
progressTracker.endSubTask();
Expand All @@ -218,6 +224,7 @@ private void exportGraphProperties(
RunWithConcurrency.builder()
.concurrency(config.writeConcurrency())
.tasks(tasks)
.executor(executorService)
.run();
progressTracker.endSubTask();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import static org.neo4j.gds.core.io.GraphStoreExporter.DIRECTORY_IS_WRITABLE;
Expand All @@ -49,10 +50,11 @@ public static ExportToCsvResult export(
GraphStoreToFileExporterConfig config,
Optional<NeoNodeProperties> neoNodeProperties,
TaskRegistryFactory taskRegistryFactory,
Log log
Log log,
ExecutorService executorService
) {
try {
var exporter = GraphStoreToCsvExporter.create(graphStore, config, path, neoNodeProperties, taskRegistryFactory, log);
var exporter = GraphStoreToCsvExporter.create(graphStore, config, path, neoNodeProperties, taskRegistryFactory, log, executorService);

var start = System.nanoTime();
var exportedProperties = exporter.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.neo4j.gds.api.GraphStore;
import org.neo4j.gds.api.nodeproperties.ValueType;
import org.neo4j.gds.api.schema.MutableNodeSchema;
import org.neo4j.gds.core.concurrency.DefaultPool;
import org.neo4j.gds.core.io.NeoNodeProperties;
import org.neo4j.gds.core.io.NodeLabelMapping;
import org.neo4j.gds.core.io.file.GraphStoreToFileExporter;
Expand All @@ -35,6 +36,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

public final class GraphStoreToCsvExporter {

Expand All @@ -44,7 +46,14 @@ public static GraphStoreToFileExporter create(
GraphStoreToFileExporterConfig config,
Path exportPath
) {
return create(graphStore, config, exportPath, Optional.empty(), TaskRegistryFactory.empty(), NullLog.getInstance());
return create(graphStore,
config,
exportPath,
Optional.empty(),
TaskRegistryFactory.empty(),
NullLog.getInstance(),
DefaultPool.INSTANCE
);
}

public static GraphStoreToFileExporter create(
Expand All @@ -53,7 +62,8 @@ public static GraphStoreToFileExporter create(
Path exportPath,
Optional<NeoNodeProperties> neoNodeProperties,
TaskRegistryFactory taskRegistryFactory,
Log log
Log log,
ExecutorService executorService
) {
Set<String> headerFiles = ConcurrentHashMap.newKeySet();

Expand All @@ -79,7 +89,7 @@ public static GraphStoreToFileExporter create(
config,
neoNodeProperties,
nodeLabelMapping,
() -> new UserInfoVisitor(exportPath),
() -> new UserInfoVisitor(exportPath),
() -> new CsvGraphInfoVisitor(exportPath),
() -> new CsvNodeSchemaVisitor(exportPath),
() -> new CsvNodeLabelMappingVisitor(exportPath),
Expand All @@ -102,7 +112,8 @@ public static GraphStoreToFileExporter create(
),
taskRegistryFactory,
log,
"Csv"
"Csv",
executorService
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.neo4j.gds.compat.GraphDatabaseApiProxy;
import org.neo4j.gds.core.CypherMapWrapper;
import org.neo4j.gds.core.GraphDimensions;
import org.neo4j.gds.core.concurrency.DefaultPool;
import org.neo4j.gds.core.io.GraphStoreExporterBaseConfig;
import org.neo4j.gds.core.io.NeoNodeProperties;
import org.neo4j.gds.core.io.db.GraphStoreToDatabaseExporter;
Expand Down Expand Up @@ -160,7 +161,8 @@ public Stream<FileExportResult> csv(
exportConfig,
neoNodeProperties(exportConfig, graphStore),
executionContext().taskRegistryFactory(),
executionContext().log()
executionContext().log(),
DefaultPool.INSTANCE
);

return Stream.of(new FileExportResult(
Expand Down

0 comments on commit 0668c70

Please sign in to comment.