Skip to content

Commit

Permalink
moving SPSP Dijkstra write to being reusable
Browse files Browse the repository at this point in the history
  • Loading branch information
lassewesth committed Jan 3, 2024
1 parent c1b31ef commit be6d68e
Show file tree
Hide file tree
Showing 10 changed files with 414 additions and 43 deletions.
2 changes: 2 additions & 0 deletions applications/algorithms/path-finding/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ group = 'org.neo4j.gds'

dependencies {
compileOnly(group: 'org.neo4j', name: 'neo4j-logging', version: ver.'neo4j') { transitive = false }
compileOnly(group: 'org.neo4j', name: 'neo4j-values', version: ver.'neo4j') { transitive = false }

implementation project(':algo')
implementation project(':algo-common')
implementation project(':config-api')
implementation project(':core')
implementation project(':core-write')
implementation project(':logging')
implementation project(':memory-usage')
implementation project(':metrics-api')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
package org.neo4j.gds.applications.algorithms.pathfinding;

import org.neo4j.gds.api.GraphName;
import org.neo4j.gds.core.utils.progress.TaskRegistryFactory;
import org.neo4j.gds.core.write.RelationshipStreamExporterBuilder;
import org.neo4j.gds.logging.Log;
import org.neo4j.gds.paths.astar.AStarMemoryEstimateDefinition;
import org.neo4j.gds.paths.astar.config.ShortestPathAStarStreamConfig;
import org.neo4j.gds.paths.dijkstra.DijkstraMemoryEstimateDefinition;
Expand All @@ -28,6 +31,8 @@
import org.neo4j.gds.paths.dijkstra.config.AllShortestPathsDijkstraStreamConfig;
import org.neo4j.gds.paths.dijkstra.config.ShortestPathDijkstraMutateConfig;
import org.neo4j.gds.paths.dijkstra.config.ShortestPathDijkstraStreamConfig;
import org.neo4j.gds.paths.dijkstra.config.ShortestPathDijkstraWriteConfig;
import org.neo4j.gds.termination.TerminationFlag;

import java.util.Optional;

Expand All @@ -48,15 +53,29 @@
* But importantly, this is where we decide which, if any, mutate or write hooks need to be injected.
*/
public class PathFindingAlgorithmsFacade {
// Collaborators that are handed on to the write step created by this facade.
private final Log log;

// Every visible facade method delegates the actual run to this template.
private final AlgorithmProcessingTemplate algorithmProcessingTemplate;
private final RelationshipStreamExporterBuilder relationshipStreamExporterBuilder;
private final TaskRegistryFactory taskRegistryFactory;
private final TerminationFlag terminationFlag;

// The algorithm implementations invoked from the computation lambdas.
private final PathFindingAlgorithms pathFindingAlgorithms;

/**
 * Stores the given collaborators in the fields of the same name; no other work happens here.
 */
public PathFindingAlgorithmsFacade(
    Log log,
    AlgorithmProcessingTemplate algorithmProcessingTemplate,
    RelationshipStreamExporterBuilder relationshipStreamExporterBuilder,
    TaskRegistryFactory taskRegistryFactory,
    TerminationFlag terminationFlag,
    PathFindingAlgorithms pathFindingAlgorithms
) {
    // assignments follow the parameter order
    this.log = log;
    this.algorithmProcessingTemplate = algorithmProcessingTemplate;
    this.relationshipStreamExporterBuilder = relationshipStreamExporterBuilder;
    this.taskRegistryFactory = taskRegistryFactory;
    this.terminationFlag = terminationFlag;
    this.pathFindingAlgorithms = pathFindingAlgorithms;
}

public <RESULT> RESULT singlePairShortestPathAStarStream(
Expand All @@ -80,13 +99,15 @@ public <RESULT> RESULT singlePairShortestPathDijkstraMutate(
ShortestPathDijkstraMutateConfig configuration,
ResultBuilder<PathFindingResult, RESULT> resultBuilder
) {
var mutateStep = new ShortestPathMutateStep(configuration);

return algorithmProcessingTemplate.processAlgorithm(
graphName,
configuration,
"Dijkstra",
() -> new DijkstraMemoryEstimateDefinition().memoryEstimation(configuration),
graph -> pathFindingAlgorithms.singlePairShortestPathDijkstra(graph, configuration),
Optional.of(new ShortestPathMutateStep(configuration)),
Optional.of(mutateStep),
resultBuilder
);
}
Expand All @@ -107,6 +128,30 @@ public <RESULT> RESULT singlePairShortestPathDijkstraStream(
);
}

/**
 * Single-pair shortest path (Dijkstra) in write mode.
 * Builds a {@link ShortestPathWriteStep} from this facade's collaborators and the given configuration,
 * then hands it to the processing template as the write hook.
 *
 * @param graphName     name of the graph to run on
 * @param configuration the write configuration; also used for memory estimation and the computation
 * @param resultBuilder passed through to the processing template
 * @return whatever the processing template returns for the given result builder
 */
public <RESULT> RESULT singlePairShortestPathDijkstraWrite(
    GraphName graphName,
    ShortestPathDijkstraWriteConfig configuration,
    ResultBuilder<PathFindingResult, RESULT> resultBuilder
) {
    // the hook that is injected into the template for this mode
    ShortestPathWriteStep<ShortestPathDijkstraWriteConfig> relationshipWriter = new ShortestPathWriteStep<>(
        log,
        relationshipStreamExporterBuilder,
        taskRegistryFactory,
        terminationFlag,
        configuration
    );

    return algorithmProcessingTemplate.processAlgorithm(
        graphName,
        configuration,
        "Dijkstra",
        () -> new DijkstraMemoryEstimateDefinition().memoryEstimation(configuration),
        graph -> pathFindingAlgorithms.singlePairShortestPathDijkstra(graph, configuration),
        Optional.of(relationshipWriter),
        resultBuilder
    );
}

public <RESULT> RESULT singleSourceShortestPathDijkstraMutate(
GraphName graphName,
AllShortestPathsDijkstraMutateConfig configuration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ public void withNodePropertiesWritten(long nodePropertiesWritten) {
this.nodePropertiesWritten = nodePropertiesWritten;
}

public void withRelationshipsWritten(long relationshipPropertiesWritten) {
this.relationshipsWritten = relationshipPropertiesWritten;
public void withRelationshipsWritten(long relationshipsWritten) {
this.relationshipsWritten = relationshipsWritten;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.gds.applications.algorithms.pathfinding;

import org.neo4j.gds.api.DatabaseId;
import org.neo4j.gds.api.Graph;
import org.neo4j.gds.api.GraphStore;
import org.neo4j.gds.api.IdMap;
import org.neo4j.gds.api.nodeproperties.ValueType;
import org.neo4j.gds.config.WriteRelationshipConfig;
import org.neo4j.gds.core.utils.ProgressTimer;
import org.neo4j.gds.core.utils.progress.TaskRegistryFactory;
import org.neo4j.gds.core.utils.progress.tasks.TaskProgressTracker;
import org.neo4j.gds.core.write.ImmutableExportedRelationship;
import org.neo4j.gds.core.write.RelationshipStreamExporter;
import org.neo4j.gds.core.write.RelationshipStreamExporterBuilder;
import org.neo4j.gds.logging.Log;
import org.neo4j.gds.paths.PathResult;
import org.neo4j.gds.paths.WritePathOptionsConfig;
import org.neo4j.gds.paths.dijkstra.PathFindingResult;
import org.neo4j.gds.termination.TerminationFlag;
import org.neo4j.values.storable.Value;
import org.neo4j.values.storable.Values;

import java.util.List;

import static org.neo4j.gds.paths.dijkstra.config.ShortestPathDijkstraWriteConfig.COSTS_KEY;
import static org.neo4j.gds.paths.dijkstra.config.ShortestPathDijkstraWriteConfig.NODE_IDS_KEY;
import static org.neo4j.gds.paths.dijkstra.config.ShortestPathDijkstraWriteConfig.TOTAL_COST_KEY;

/**
 * This is relationship writes as needed by path finding algorithms (for now).
 * <p>
 * Each path of a {@link PathFindingResult} is turned into one exported relationship from the path's source node
 * to its target node. The relationship always carries the total cost, and optionally the node ids and the
 * individual costs along the path, depending on the configuration.
 */
class ShortestPathWriteStep<CONFIGURATION extends WriteRelationshipConfig & WritePathOptionsConfig> implements
    MutateOrWriteStep<PathFindingResult> {
    private final Log log;

    private final RelationshipStreamExporterBuilder exporterBuilder;
    private final TaskRegistryFactory taskRegistryFactory;
    private final TerminationFlag terminationFlag;

    private final CONFIGURATION configuration;

    ShortestPathWriteStep(
        Log log,
        RelationshipStreamExporterBuilder exporterBuilder,
        TaskRegistryFactory taskRegistryFactory,
        TerminationFlag terminationFlag,
        CONFIGURATION configuration
    ) {
        this.log = log;
        this.exporterBuilder = exporterBuilder;
        this.taskRegistryFactory = taskRegistryFactory;
        this.terminationFlag = terminationFlag;
        this.configuration = configuration;
    }

    /**
     * Here we translate and write relationships from path finding algorithms back to the database.
     * We do it synchronously, time it, and gather metadata about how many relationships we wrote.
     *
     * @param graph         used to translate internal node ids to original node ids
     * @param graphStore    consulted for the remote database id when configuring the exporter
     * @param result        the paths to write
     * @param resultBuilder receives the timing callback and the number of relationships written
     */
    @Override
    public <RESULT_TO_CALLER> void execute(
        Graph graph,
        GraphStore graphStore,
        PathFindingResult result,
        ResultBuilder<PathFindingResult, RESULT_TO_CALLER> resultBuilder
    ) {
        var writeNodeIds = configuration.writeNodeIds();
        var writeCosts = configuration.writeCosts();

        // NOTE(review): setting up the mapping happens outside the timed section below - confirm that is intended.
        var relationshipStream = result
            .mapPaths(
                pathResult -> ImmutableExportedRelationship.of(
                    pathResult.sourceNode(),
                    pathResult.targetNode(),
                    createValues(graph, pathResult, writeNodeIds, writeCosts)
                )
            );

        var progressTracker = new TaskProgressTracker(
            RelationshipStreamExporter.baseTask("Write shortest Paths"),
            (org.neo4j.logging.Log) log.getNeo4jLog(),
            1,
            taskRegistryFactory
        );

        // configure the exporter
        var relationshipStreamExporter = exporterBuilder
            .withArrowConnectionInfo(
                configuration.arrowConnectionInfo(),
                graphStore.databaseInfo().remoteDatabaseId().map(DatabaseId::databaseName)
            )
            .withIdMappingOperator(graph::toOriginalNodeId)
            .withProgressTracker(progressTracker)
            .withRelationships(relationshipStream)
            .withTerminationFlag(terminationFlag)
            .build();

        var writeRelationshipType = configuration.writeRelationshipType();

        /*
         * The actual export, with timing.
         * Notice that originally we had a CloseableResourceRegistry thing going on here - no longer.
         * Because all we are doing is, processing a stream using the exporter, synchronously.
         * We are not handing it out to upper layers for sporadic consumption.
         * It is done right here, and when we complete, the stream is exhausted.
         */
        try (ProgressTimer ignored = ProgressTimer.start(resultBuilder::withPostProcessingMillis)) {
            var keys = createKeys(writeNodeIds, writeCosts);
            var types = createTypes(writeNodeIds, writeCosts);

            var relationshipsWritten = relationshipStreamExporter.write(writeRelationshipType, keys, types);

            // NOTE(review): the relationship stream is not closed explicitly here;
            // confirm whether the progress tracker relies on a close handler.

            // the final result is the side effect of writing to the database, plus this metadata
            resultBuilder.withRelationshipsWritten(relationshipsWritten);
        }
    }

    /**
     * Builds the property values for one path, in the same order as {@link #createKeys} and {@link #createTypes}:
     * total cost first, then node ids (if requested), then costs (if requested).
     */
    private Value[] createValues(IdMap idMap, PathResult pathResult, boolean writeNodeIds, boolean writeCosts) {
        var propertyCount = 1 + (writeNodeIds ? 1 : 0) + (writeCosts ? 1 : 0);
        var values = new Value[propertyCount];

        var index = 0;
        values[index++] = Values.doubleValue(pathResult.totalCost());
        if (writeNodeIds) {
            values[index++] = Values.longArray(toOriginalIds(idMap, pathResult.nodeIds()));
        }
        if (writeCosts) {
            values[index] = Values.doubleArray(pathResult.costs());
        }
        return values;
    }

    /**
     * Maps the given internal ids to original ids.
     * The result is a new array, so the array handed in (which belongs to the path result) is left untouched.
     */
    private long[] toOriginalIds(IdMap idMap, long[] internalIds) {
        var originalIds = new long[internalIds.length];
        for (int i = 0; i < internalIds.length; i++) {
            originalIds[i] = idMap.toOriginalNodeId(internalIds[i]);
        }
        return originalIds;
    }

    /**
     * The property keys to write; the order must line up with {@link #createValues} and {@link #createTypes}.
     */
    private List<String> createKeys(boolean writeNodeIds, boolean writeCosts) {
        if (writeNodeIds && writeCosts) {
            return List.of(TOTAL_COST_KEY, NODE_IDS_KEY, COSTS_KEY);
        }
        if (writeNodeIds) {
            return List.of(TOTAL_COST_KEY, NODE_IDS_KEY);
        }
        if (writeCosts) {
            return List.of(TOTAL_COST_KEY, COSTS_KEY);
        }
        return List.of(TOTAL_COST_KEY);
    }

    /**
     * The property types to write; the order must line up with {@link #createValues} and {@link #createKeys}.
     */
    private List<ValueType> createTypes(boolean writeNodeIds, boolean writeCosts) {
        if (writeNodeIds && writeCosts) {
            return List.of(ValueType.DOUBLE, ValueType.LONG_ARRAY, ValueType.DOUBLE_ARRAY);
        }
        if (writeNodeIds) {
            return List.of(ValueType.DOUBLE, ValueType.LONG_ARRAY);
        }
        if (writeCosts) {
            return List.of(ValueType.DOUBLE, ValueType.DOUBLE_ARRAY);
        }
        return List.of(ValueType.DOUBLE);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.neo4j.gds.core.write.RelationshipStreamExporterBuilder;
import org.neo4j.gds.executor.ExecutionContext;
import org.neo4j.gds.executor.MemoryEstimationExecutor;
import org.neo4j.gds.executor.ProcedureExecutor;
import org.neo4j.gds.procedures.GraphDataScience;
import org.neo4j.gds.results.MemoryEstimateResult;
import org.neo4j.gds.results.StandardWriteRelationshipsResult;
import org.neo4j.procedure.Context;
Expand All @@ -39,7 +39,9 @@
import static org.neo4j.procedure.Mode.WRITE;

public class ShortestPathDijkstraWriteProc extends BaseProc {

@Context
public GraphDataScience facade;

@Context
public RelationshipStreamExporterBuilder relationshipStreamExporterBuilder;

Expand All @@ -49,10 +51,7 @@ public Stream<StandardWriteRelationshipsResult> write(
@Name(value = "graphName") String graphName,
@Name(value = "configuration", defaultValue = "{}") Map<String, Object> configuration
) {
return new ProcedureExecutor<>(
new ShortestPathDijkstraWriteSpec(),
executionContext()
).compute(graphName, configuration);
return facade.pathFinding().singlePairShortestPathDijkstraWrite(graphName, configuration);
}

@Procedure(name = "gds.shortestPath.dijkstra.write.estimate", mode = READ)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ public class GraphDataScience {
private final DeprecatedProceduresMetricService deprecatedProceduresMetricService;

public GraphDataScience(
Log log, CatalogFacade catalogFacade,
Log log,
CatalogFacade catalogFacade,
CentralityProcedureFacade centralityProcedureFacade,
CommunityProcedureFacade communityProcedureFacade,
NodeEmbeddingsProcedureFacade nodeEmbeddingsProcedureFacade,
Expand Down
Loading

0 comments on commit be6d68e

Please sign in to comment.