From 5fc02b5f1334f263c97b072a61dff45ea28be6c2 Mon Sep 17 00:00:00 2001 From: Lasse Westh-Nielsen Date: Wed, 3 Jan 2024 14:00:08 +0100 Subject: [PATCH] I decided a try-with-resources block to handle the implicit closing of progress tracker is a good compromise Also because trying to backfit a test around this would be a headache --- .../pathfinding/ShortestPathWriteStep.java | 97 ++++++++++--------- 1 file changed, 50 insertions(+), 47 deletions(-) diff --git a/applications/algorithms/path-finding/src/main/java/org/neo4j/gds/applications/algorithms/pathfinding/ShortestPathWriteStep.java b/applications/algorithms/path-finding/src/main/java/org/neo4j/gds/applications/algorithms/pathfinding/ShortestPathWriteStep.java index 09dd3ba56b..524cb9c1a3 100644 --- a/applications/algorithms/path-finding/src/main/java/org/neo4j/gds/applications/algorithms/pathfinding/ShortestPathWriteStep.java +++ b/applications/algorithms/path-finding/src/main/java/org/neo4j/gds/applications/algorithms/pathfinding/ShortestPathWriteStep.java @@ -48,8 +48,7 @@ /** * This is relationship writes as needed by path finding algorithms (for now). */ -class ShortestPathWriteStep implements - MutateOrWriteStep { +class ShortestPathWriteStep implements MutateOrWriteStep { private final Log log; private final RelationshipStreamExporterBuilder exporterBuilder; @@ -62,7 +61,8 @@ class ShortestPathWriteStep void execute( var writeNodeIds = configuration.writeNodeIds(); var writeCosts = configuration.writeCosts(); - // we are not timing this bit, is that on purpose? - var relationshipStream = result - .mapPaths( - pathResult -> ImmutableExportedRelationship.of( - pathResult.sourceNode(), - pathResult.targetNode(), - createValues(graph, pathResult, writeNodeIds, writeCosts) + /* + * We have to ensure the stream closes, so that progress tracker closes. + * It is abominable that we have to do this. To be fixed in the future, somehow. 
+ * The problem is that apparently progress tracker is keyed off of the stream, + * and that we cannot rely on whatever plugged-in exporter comes along to take responsibility for these things. + * Ergo we need this little block, but really we should engineer it all better. + */ + try ( + var relationshipStream = result + .mapPaths( + pathResult -> ImmutableExportedRelationship.of( + pathResult.sourceNode(), + pathResult.targetNode(), + createValues(graph, pathResult, writeNodeIds, writeCosts) + ) ) + ) { + var progressTracker = new TaskProgressTracker( + RelationshipStreamExporter.baseTask("Write shortest Paths"), + (org.neo4j.logging.Log) log.getNeo4jLog(), + 1, + taskRegistryFactory ); - var progressTracker = new TaskProgressTracker( - RelationshipStreamExporter.baseTask("Write shortest Paths"), - (org.neo4j.logging.Log) log.getNeo4jLog(), - 1, - taskRegistryFactory - ); - - // configure the exporter - var relationshipStreamExporter = exporterBuilder - .withArrowConnectionInfo( - configuration.arrowConnectionInfo(), - graphStore.databaseInfo().remoteDatabaseId().map(DatabaseId::databaseName) - ) - .withIdMappingOperator(graph::toOriginalNodeId) - .withProgressTracker(progressTracker) - .withRelationships(relationshipStream) - .withTerminationFlag(terminationFlag) - .build(); - - var writeRelationshipType = configuration.writeRelationshipType(); - - /* - * The actual export, with timing. - * Notice that originally we had a CloseableResourceRegistry thing going on here - no longer. - * Because all we are doing is, processing a stream using the exporter, synchronously. - * We are not handing it out to upper layers for sporadic consumption. - * It is done right here, and when we complete, the stream is exhausted.
- */ - try (ProgressTimer ignored = ProgressTimer.start(resultBuilder::withPostProcessingMillis)) { - var keys = createKeys(writeNodeIds, writeCosts); - var types = createTypes(writeNodeIds, writeCosts); + // configure the exporter + var relationshipStreamExporter = exporterBuilder + .withArrowConnectionInfo( + configuration.arrowConnectionInfo(), + graphStore.databaseInfo().remoteDatabaseId().map(DatabaseId::databaseName) + ) + .withIdMappingOperator(graph::toOriginalNodeId) + .withProgressTracker(progressTracker) + .withRelationships(relationshipStream) + .withTerminationFlag(terminationFlag) + .build(); - var relationshipsWritten = relationshipStreamExporter.write(writeRelationshipType, keys, types); + var writeRelationshipType = configuration.writeRelationshipType(); /* - * We could do this, here. Is it necessary? I don't think so. - * But maybe I don't quite understand the progress tracker? + * The actual export, with timing. + * Notice that originally we had a CloseableResourceRegistry thing going on here - no longer. + * Because all we are doing is, processing a stream using the exporter, synchronously. + * We are not handing it out to upper layers for sporadic consumption. + * It is done right here, and when we complete, the stream is exhausted. + * We still explicitly close the stream tho because, yeah, confusion I guess. */ - // relationshipStream.close(); + try (ProgressTimer ignored = ProgressTimer.start(resultBuilder::withPostProcessingMillis)) { + var keys = createKeys(writeNodeIds, writeCosts); + var types = createTypes(writeNodeIds, writeCosts); + + var relationshipsWritten = relationshipStreamExporter.write(writeRelationshipType, keys, types); - // the final result is the side effect of writing to the database, plus this metadata - resultBuilder.withRelationshipsWritten(relationshipsWritten); + // the final result is the side effect of writing to the database, plus this metadata + resultBuilder.withRelationshipsWritten(relationshipsWritten); + } } }