Skip to content

Commit

Permalink
Merge pull request #9586 from neo-technology/remove-old-pbi-usage
Browse files Browse the repository at this point in the history
Remove old way of instantiating record PBI
  • Loading branch information
Mats-SX authored Sep 6, 2024
2 parents ff77597 + b8b09fb commit ddb2dc2
Show file tree
Hide file tree
Showing 13 changed files with 36 additions and 890 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
*/
package org.neo4j.gds.compat._522;

import org.neo4j.common.DependencyResolver;
import org.neo4j.configuration.Config;
import org.neo4j.gds.compat.batchimport.BatchImporter;
import org.neo4j.gds.compat.batchimport.ExecutionMonitor;
import org.neo4j.gds.compat.batchimport.ImportConfig;
import org.neo4j.gds.compat.batchimport.InputIterable;
import org.neo4j.gds.compat.batchimport.InputIterator;
Expand All @@ -36,18 +34,14 @@
import org.neo4j.gds.compat.batchimport.input.InputEntityVisitor;
import org.neo4j.gds.compat.batchimport.input.ReadableGroups;
import org.neo4j.internal.batchimport.AdditionalInitialIds;
import org.neo4j.internal.batchimport.BatchImporterFactory;
import org.neo4j.internal.batchimport.IndexConfig;
import org.neo4j.internal.batchimport.input.Collectors;
import org.neo4j.internal.batchimport.input.Groups;
import org.neo4j.internal.batchimport.staging.CoarseBoundedProgressExecutionMonitor;
import org.neo4j.internal.batchimport.staging.StageExecution;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.layout.DatabaseLayout;
import org.neo4j.io.pagecache.context.CursorContextFactory;
import org.neo4j.io.pagecache.tracing.PageCacheTracer;
import org.neo4j.kernel.impl.index.schema.IndexImporterFactoryImpl;
import org.neo4j.kernel.impl.transaction.log.EmptyLogTailMetadata;
import org.neo4j.kernel.impl.transaction.log.files.TransactionLogInitializer;
import org.neo4j.logging.internal.LogService;
import org.neo4j.memory.EmptyMemoryTracker;
Expand All @@ -58,13 +52,12 @@
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.function.LongConsumer;

public final class BatchImporterCompat {

private BatchImporterCompat() {}

static BatchImporter instantiateBlockBatchImporter(
static BatchImporter instantiateBatchImporter(
DatabaseLayout dbLayout,
FileSystemAbstraction fileSystem,
ImportConfig config,
Expand Down Expand Up @@ -104,40 +97,6 @@ private static final class MonitorAdapter implements org.neo4j.internal.batchimp

private MonitorAdapter(Monitor delegate) {this.delegate = delegate;}

@Override
public void doubleRelationshipRecordUnitsEnabled() {
delegate.doubleRelationshipRecordUnitsEnabled();
}

@Override
public void mayExceedNodeIdCapacity(long capacity, long estimatedCount) {
delegate.mayExceedNodeIdCapacity(capacity, estimatedCount);
}

@Override
public void mayExceedRelationshipIdCapacity(long capacity, long estimatedCount) {
delegate.mayExceedRelationshipIdCapacity(capacity, estimatedCount);
}

@Override
public void insufficientHeapSize(long optimalMinimalHeapSize, long heapSize) {
delegate.insufficientHeapSize(optimalMinimalHeapSize, heapSize);
}

@Override
public void abundantHeapSize(long optimalMinimalHeapSize, long heapSize) {
delegate.abundantHeapSize(optimalMinimalHeapSize, heapSize);
}

@Override
public void insufficientAvailableMemory(
long estimatedCacheSize,
long optimalMinimalHeapSize,
long availableMemory
) {
delegate.insufficientAvailableMemory(estimatedCacheSize, optimalMinimalHeapSize, availableMemory);
}

@Override
public void started() {
delegate.started();
Expand All @@ -154,38 +113,6 @@ public void completed(boolean success) {
}
}

static BatchImporter instantiateRecordBatchImporter(
DatabaseLayout directoryStructure,
FileSystemAbstraction fileSystem,
ImportConfig config,
ExecutionMonitor executionMonitor,
LogService logService,
Config dbConfig,
JobScheduler jobScheduler,
Collector badCollector
) {
var importer = BatchImporterFactory.withHighestPriority()
.instantiate(
directoryStructure,
fileSystem,
PageCacheTracer.NULL,
new ConfigurationAdapter(config),
logService,
new ExecutionMonitorAdapter(executionMonitor),
AdditionalInitialIds.EMPTY,
new EmptyLogTailMetadata(dbConfig),
dbConfig,
org.neo4j.internal.batchimport.Monitor.NO_MONITOR,
jobScheduler,
badCollector != null ? ((CollectorAdapter) badCollector).delegate : null,
TransactionLogInitializer.getLogFilesInitializer(),
new IndexImporterFactoryImpl(),
EmptyMemoryTracker.INSTANCE,
CursorContextFactory.NULL_CONTEXT_FACTORY
);
return new BatchImporterReverseAdapter(importer);
}

static final class ConfigurationAdapter implements org.neo4j.internal.batchimport.Configuration {
private final ImportConfig inner;

Expand Down Expand Up @@ -221,104 +148,6 @@ public IndexConfig indexConfig() {
}
}

static ExecutionMonitor newCoarseBoundedProgressExecutionMonitor(
long highNodeId,
long highRelationshipId,
int batchSize,
LongConsumer progress,
LongConsumer outNumberOfBatches
) {
var delegate = new CoarseBoundedProgressExecutionMonitor(
highNodeId,
highRelationshipId,
org.neo4j.internal.batchimport.Configuration.withBatchSize(
org.neo4j.internal.batchimport.Configuration.DEFAULT,
batchSize
)
) {
@Override
protected void progress(long l) {
progress.accept(l);
}

long numberOfBatches() {
return this.total();
}
};
// Note: this only works because we declare the delegate with `var`
outNumberOfBatches.accept(delegate.numberOfBatches());

return new ExecutionMonitor() {

@Override
public Monitor toMonitor() {
throw new UnsupportedOperationException("Cannot call `toMonitor` on this one");
}

@Override
public void start(StageExecution execution) {
delegate.start(execution);
}

@Override
public void end(StageExecution execution, long totalTimeMillis) {
delegate.end(execution, totalTimeMillis);
}

@Override
public void done(boolean successful, long totalTimeMillis, String additionalInformation) {
delegate.done(successful, totalTimeMillis, additionalInformation);
}

@Override
public long checkIntervalMillis() {
return delegate.checkIntervalMillis();
}

@Override
public void check(StageExecution execution) {
delegate.check(execution);
}
};
}

static final class ExecutionMonitorAdapter implements org.neo4j.internal.batchimport.staging.ExecutionMonitor {
private final ExecutionMonitor delegate;

ExecutionMonitorAdapter(ExecutionMonitor delegate) {this.delegate = delegate;}

@Override
public void initialize(DependencyResolver dependencyResolver) {
org.neo4j.internal.batchimport.staging.ExecutionMonitor.super.initialize(dependencyResolver);
this.delegate.initialize(dependencyResolver);
}

@Override
public void start(StageExecution stageExecution) {
this.delegate.start(stageExecution);
}

@Override
public void end(StageExecution stageExecution, long l) {
this.delegate.end(stageExecution, l);
}

@Override
public void done(boolean b, long l, String s) {
this.delegate.done(b, l, s);
}

@Override
public long checkIntervalMillis() {
return this.delegate.checkIntervalMillis();
}

@Override
public void check(StageExecution stageExecution) {
this.delegate.check(stageExecution);
}
}

static final class BatchImporterReverseAdapter implements BatchImporter {
private final org.neo4j.internal.batchimport.BatchImporter delegate;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.neo4j.gds.compat.CompatCallableProcedure;
import org.neo4j.gds.compat.Neo4jProxyApi;
import org.neo4j.gds.compat.batchimport.BatchImporter;
import org.neo4j.gds.compat.batchimport.ExecutionMonitor;
import org.neo4j.gds.compat.batchimport.ImportConfig;
import org.neo4j.gds.compat.batchimport.Monitor;
import org.neo4j.gds.compat.batchimport.input.Collector;
Expand All @@ -48,14 +47,13 @@
import org.neo4j.values.SequenceValue;

import java.io.OutputStream;
import java.util.function.LongConsumer;

import static org.neo4j.internal.helpers.collection.Iterators.asRawIterator;

public final class Neo4jProxyImpl implements Neo4jProxyApi {

@Override
public BatchImporter instantiateBlockBatchImporter(
public BatchImporter instantiateBatchImporter(
DatabaseLayout dbLayout,
FileSystemAbstraction fileSystem,
ImportConfig config,
Expand All @@ -65,7 +63,7 @@ public BatchImporter instantiateBlockBatchImporter(
JobScheduler jobScheduler,
Collector badCollector
) {
return BatchImporterCompat.instantiateBlockBatchImporter(
return BatchImporterCompat.instantiateBatchImporter(
dbLayout,
fileSystem,
config,
Expand All @@ -77,46 +75,6 @@ public BatchImporter instantiateBlockBatchImporter(
);
}

@Override
public BatchImporter instantiateRecordBatchImporter(
DatabaseLayout directoryStructure,
FileSystemAbstraction fileSystem,
ImportConfig config,
ExecutionMonitor executionMonitor,
LogService logService,
Config dbConfig,
JobScheduler jobScheduler,
Collector badCollector
) {
return BatchImporterCompat.instantiateRecordBatchImporter(
directoryStructure,
fileSystem,
config,
executionMonitor,
logService,
dbConfig,
jobScheduler,
badCollector
);
}

@Override
public ExecutionMonitor newCoarseBoundedProgressExecutionMonitor(
long highNodeId,
long highRelationshipId,
int batchSize,
LongConsumer progress,
LongConsumer outNumberOfBatches
) {
return BatchImporterCompat.newCoarseBoundedProgressExecutionMonitor(
highNodeId,
highRelationshipId,
batchSize,
progress,
outNumberOfBatches
);
}

@Override
public ReadableGroups newGroups() {
return BatchImporterCompat.newGroups();
Expand Down
Loading

0 comments on commit ddb2dc2

Please sign in to comment.