Skip to content

Commit

Permalink
[performance](load) fix broker load scan ranges for unsplittable files
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen committed Nov 3, 2024
1 parent 5f07b88 commit 87a37bb
Showing 1 changed file with 64 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.Util;
Expand Down Expand Up @@ -53,7 +54,9 @@

import java.net.URI;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/**
* FileTable encapsulates a set of files to be scanned into a Table like structure,
Expand Down Expand Up @@ -84,6 +87,7 @@ public enum JobType {
private boolean strictMode;
private int loadParallelism;
// set by getFileStatusAndCalcInstance
private long numInstances = 1;
private long bytesPerInstance = 0;
// used for stream load, FILE_LOCAL or FILE_STREAM
private TFileType fileType;
Expand Down Expand Up @@ -189,7 +193,6 @@ public void getFileStatusAndCalcInstance(FederationBackendPolicy backendPolicy)
throw new UserException("No source file in this table(" + targetTable.getName() + ").");
}

int numInstances = 1;
if (jobType == JobType.BULK_LOAD) {
long totalBytes = 0;
for (TBrokerFileStatus fileStatus : fileStatuses) {
Expand All @@ -208,6 +211,7 @@ public void getFileStatusAndCalcInstance(FederationBackendPolicy backendPolicy)
}
} else {
// stream load: no need to split
numInstances = 1;
bytesPerInstance = Long.MAX_VALUE;
}
LOG.info("number instance of file scan node is: {}, bytes per instance: {}", numInstances, bytesPerInstance);
Expand All @@ -216,6 +220,60 @@ public void getFileStatusAndCalcInstance(FederationBackendPolicy backendPolicy)
/**
 * Creates the scan range locations for this file group by delegating to either
 * {@link #createScanRangeLocationsSplittable} or {@link #createScanRangeLocationsUnsplittable}.
 *
 * <p>The splittable path is taken only when every file is uncompressed and is either
 * a plain CSV file whose status marks it as splittable, or a JSON file.
 *
 * @param context            carries the file group and scan params used to resolve format/compress type
 * @param backendPolicy      passed through to the chosen strategy
 * @param scanRangeLocations output list that the chosen strategy appends to
 * @throws UserException propagated from format detection or from the chosen strategy
 */
public void createScanRangeLocations(FileLoadScanNode.ParamCreateContext context,
        FederationBackendPolicy backendPolicy,
        List<TScanRangeLocations> scanRangeLocations) throws UserException {
    // Mixed file types (or compress types) are not supported yet, so a single
    // unsplittable file makes the whole set of files unsplittable.
    boolean allSplittable = true;
    for (TBrokerFileStatus status : fileStatuses) {
        TFileFormatType format = formatType(context.fileGroup.getFileFormat(), status.path);
        TFileCompressType compress =
                Util.getOrInferCompressType(context.fileGroup.getCompressType(), status.path);
        // Only uncompressed plain text can be split for now.
        boolean splittableCsv = format == TFileFormatType.FORMAT_CSV_PLAIN && status.isSplitable;
        boolean json = format == TFileFormatType.FORMAT_JSON;
        if (compress != TFileCompressType.PLAIN || !(splittableCsv || json)) {
            allSplittable = false;
            break;
        }
    }

    if (!allSplittable) {
        createScanRangeLocationsUnsplittable(context, backendPolicy, scanRangeLocations);
        return;
    }
    createScanRangeLocationsSplittable(context, backendPolicy, scanRangeLocations);
}

/**
 * Creates scan range locations for files that must each be read as a whole.
 *
 * <p>Up to {@code numInstances} scan ranges are created (never more than the number of
 * files). Files are visited from largest to smallest, and each file is added as a single
 * range to the scan range that currently has the fewest bytes assigned.
 *
 * <p>The sort is done on a local copy so the order of the {@code fileStatuses} field is
 * left untouched.
 *
 * @param context            carries the file group and the scan params; the params' format and
 *                           compress type are overwritten for every file visited
 * @param backendPolicy      used when creating each new scan range location
 * @param scanRangeLocations output list; the created scan ranges are appended to it
 * @throws UserException propagated from format detection, path-column parsing or
 *                       scan range creation
 */
public void createScanRangeLocationsUnsplittable(FileLoadScanNode.ParamCreateContext context,
        FederationBackendPolicy backendPolicy,
        List<TScanRangeLocations> scanRangeLocations)
        throws UserException {
    // Sort a copy: reordering the shared field would be a hidden side effect for other readers.
    List<TBrokerFileStatus> filesBySizeDesc = new ArrayList<>(fileStatuses);
    filesBySizeDesc.sort((a, b) -> Long.compare(b.size, a.size));

    // At least one scan range whenever there is a file; otherwise poll() below would return null.
    long instanceCount = Math.min(filesBySizeDesc.size(), Math.max(1L, numInstances));

    // Min-heap keyed by the number of bytes already assigned to each scan range.
    PriorityQueue<Pair<Long, TScanRangeLocations>> pq = new PriorityQueue<>(Comparator.comparingLong(Pair::key));
    for (long i = 0; i < instanceCount; i++) {
        pq.add(Pair.of(0L, newLocations(context.params, brokerDesc, backendPolicy)));
    }

    for (TBrokerFileStatus fileStatus : filesBySizeDesc) {
        TFileFormatType formatType = formatType(context.fileGroup.getFileFormat(), fileStatus.path);
        context.params.setFormatType(formatType);
        TFileCompressType compressType =
                Util.getOrInferCompressType(context.fileGroup.getCompressType(), fileStatus.path);
        context.params.setCompressType(compressType);
        List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
                context.fileGroup.getColumnNamesFromPath());

        // Take the least-loaded scan range, give it the whole file, and re-queue it with its new load.
        Pair<Long, TScanRangeLocations> leastLoaded = pq.poll();
        TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, fileStatus.size, columnsFromPath);
        leastLoaded.value().getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
        pq.add(Pair.of(leastLoaded.key() + fileStatus.size, leastLoaded.value()));
    }

    for (Pair<Long, TScanRangeLocations> entry : pq) {
        scanRangeLocations.add(entry.value());
    }
}

public void createScanRangeLocationsSplittable(FileLoadScanNode.ParamCreateContext context,
FederationBackendPolicy backendPolicy,
List<TScanRangeLocations> scanRangeLocations) throws UserException {

TScanRangeLocations curLocations = newLocations(context.params, brokerDesc, backendPolicy);
long curInstanceBytes = 0;
long curFileOffset = 0;
Expand All @@ -234,27 +292,16 @@ public void createScanRangeLocations(FileLoadScanNode.ParamCreateContext context
// Assign scan range locations only for broker load.
// stream load has only one file, and no need to set multi scan ranges.
if (tmpBytes > bytesPerInstance && jobType != JobType.STREAM_LOAD) {
// Now only support split plain text
if (compressType == TFileCompressType.PLAIN
&& ((formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable)
|| formatType == TFileFormatType.FORMAT_JSON)) {
long rangeBytes = bytesPerInstance - curInstanceBytes;
TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, rangeBytes,
columnsFromPath);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
curFileOffset += rangeBytes;
} else {
TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, leftBytes,
columnsFromPath);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
i++;
}
long rangeBytes = bytesPerInstance - curInstanceBytes;
TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, rangeBytes,
columnsFromPath);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
curFileOffset += rangeBytes;

// New one scan
scanRangeLocations.add(curLocations);
curLocations = newLocations(context.params, brokerDesc, backendPolicy);
curInstanceBytes = 0;

} else {
TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes, columnsFromPath);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
Expand Down

0 comments on commit 87a37bb

Please sign in to comment.