Skip to content

Commit

Permalink
I decided a try-with-resources block to handle the implicit closing o…
Browse files Browse the repository at this point in the history
…f progress tracker is a good compromise

Also because trying to backfit a test around this would be a headache
  • Loading branch information
lassewesth committed Jan 3, 2024
1 parent db8d5c7 commit 5fc02b5
Showing 1 changed file with 50 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@
/**
* This is relationship writes as needed by path finding algorithms (for now).
*/
class ShortestPathWriteStep<CONFIGURATION extends WriteRelationshipConfig & WritePathOptionsConfig> implements
MutateOrWriteStep<PathFindingResult> {
class ShortestPathWriteStep<CONFIGURATION extends WriteRelationshipConfig & WritePathOptionsConfig> implements MutateOrWriteStep<PathFindingResult> {
private final Log log;

private final RelationshipStreamExporterBuilder exporterBuilder;
Expand All @@ -62,7 +61,8 @@ class ShortestPathWriteStep<CONFIGURATION extends WriteRelationshipConfig & Writ
Log log,
RelationshipStreamExporterBuilder exporterBuilder,
TaskRegistryFactory taskRegistryFactory,
TerminationFlag terminationFlag, CONFIGURATION configuration
TerminationFlag terminationFlag,
CONFIGURATION configuration
) {
this.log = log;
this.exporterBuilder = exporterBuilder;
Expand All @@ -85,58 +85,61 @@ public <RESULT_TO_CALLER> void execute(
var writeNodeIds = configuration.writeNodeIds();
var writeCosts = configuration.writeCosts();

// we are not timing this bit, is that on purpose?
var relationshipStream = result
.mapPaths(
pathResult -> ImmutableExportedRelationship.of(
pathResult.sourceNode(),
pathResult.targetNode(),
createValues(graph, pathResult, writeNodeIds, writeCosts)
/*
* We have to ensure the stream closes, so that progress tracker closes.
* It is abominable that we have to do this. To be fixed in the future, somehow.
* The problem is that apparently progress tracker is keyed off of ths stream,
* and that we cannot rely on whatever plugged in exporter comes along takes responsibility for these things.
* Ergo we need this little block, but really we should engineer it all better.
*/
try (
var relationshipStream = result
.mapPaths(
pathResult -> ImmutableExportedRelationship.of(
pathResult.sourceNode(),
pathResult.targetNode(),
createValues(graph, pathResult, writeNodeIds, writeCosts)
)
)
) {
var progressTracker = new TaskProgressTracker(
RelationshipStreamExporter.baseTask("Write shortest Paths"),
(org.neo4j.logging.Log) log.getNeo4jLog(),
1,
taskRegistryFactory
);

var progressTracker = new TaskProgressTracker(
RelationshipStreamExporter.baseTask("Write shortest Paths"),
(org.neo4j.logging.Log) log.getNeo4jLog(),
1,
taskRegistryFactory
);

// configure the exporter
var relationshipStreamExporter = exporterBuilder
.withArrowConnectionInfo(
configuration.arrowConnectionInfo(),
graphStore.databaseInfo().remoteDatabaseId().map(DatabaseId::databaseName)
)
.withIdMappingOperator(graph::toOriginalNodeId)
.withProgressTracker(progressTracker)
.withRelationships(relationshipStream)
.withTerminationFlag(terminationFlag)
.build();

var writeRelationshipType = configuration.writeRelationshipType();

/*
* The actual export, with timing.
* Notice that originally we had a CloseableResourceRegistry thing going on here - no longer.
* Because all we are doing is, processing a stream using the exporter, synchronously.
* We are not handing it out to upper layers for sporadic consumption.
* It is done right here, and when we complete, the stream is exhausted.
*/
try (ProgressTimer ignored = ProgressTimer.start(resultBuilder::withPostProcessingMillis)) {
var keys = createKeys(writeNodeIds, writeCosts);
var types = createTypes(writeNodeIds, writeCosts);
// configure the exporter
var relationshipStreamExporter = exporterBuilder
.withArrowConnectionInfo(
configuration.arrowConnectionInfo(),
graphStore.databaseInfo().remoteDatabaseId().map(DatabaseId::databaseName)
)
.withIdMappingOperator(graph::toOriginalNodeId)
.withProgressTracker(progressTracker)
.withRelationships(relationshipStream)
.withTerminationFlag(terminationFlag)
.build();

var relationshipsWritten = relationshipStreamExporter.write(writeRelationshipType, keys, types);
var writeRelationshipType = configuration.writeRelationshipType();

/*
* We could do this, here. Is it necessary? I don't think so.
* But maybe I don't quite understand the progress tracker?
* The actual export, with timing.
* Notice that originally we had a CloseableResourceRegistry thing going on here - no longer.
* Because all we are doing is, processing a stream using the exporter, synchronously.
* We are not handing it out to upper layers for sporadic consumption.
* It is done right here, and when we complete, the stream is exhausted.
* We still explicitly close the stream tho because, yeah, confusion I guess.
*/
// relationshipStream.close();
try (ProgressTimer ignored = ProgressTimer.start(resultBuilder::withPostProcessingMillis)) {
var keys = createKeys(writeNodeIds, writeCosts);
var types = createTypes(writeNodeIds, writeCosts);

var relationshipsWritten = relationshipStreamExporter.write(writeRelationshipType, keys, types);

// the final result is the side effect of writing to the database, plus this metadata
resultBuilder.withRelationshipsWritten(relationshipsWritten);
// the final result is the side effect of writing to the database, plus this metadata
resultBuilder.withRelationshipsWritten(relationshipsWritten);
}
}
}

Expand Down

0 comments on commit 5fc02b5

Please sign in to comment.