diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java
index 932e698e2cdec9..5bf1c95bc034fb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java
@@ -24,6 +24,7 @@
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.common.util.Util;
@@ -53,7 +54,9 @@
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
+import java.util.PriorityQueue;
 
 /**
  * FileTable encapsulates a set of files to be scanned into a Table like structure,
@@ -84,6 +87,7 @@ public enum JobType {
     private boolean strictMode;
     private int loadParallelism;
     // set by getFileStatusAndCalcInstance
+    private long numInstances = 1;
     private long bytesPerInstance = 0;
     // used for stream load, FILE_LOCAL or FILE_STREAM
     private TFileType fileType;
@@ -189,7 +193,6 @@ public void getFileStatusAndCalcInstance(FederationBackendPolicy backendPolicy)
             throw new UserException("No source file in this table(" + targetTable.getName() + ").");
         }
 
-        int numInstances = 1;
         if (jobType == JobType.BULK_LOAD) {
             long totalBytes = 0;
             for (TBrokerFileStatus fileStatus : fileStatuses) {
@@ -208,6 +211,7 @@ public void getFileStatusAndCalcInstance(FederationBackendPolicy backendPolicy)
             }
         } else {
             // stream load, not need to split
+            numInstances = 1;
             bytesPerInstance = Long.MAX_VALUE;
         }
         LOG.info("number instance of file scan node is: {}, bytes per instance: {}", numInstances, bytesPerInstance);
@@ -216,6 +220,60 @@ public void getFileStatusAndCalcInstance(FederationBackendPolicy backendPolicy)
 
     public void createScanRangeLocations(FileLoadScanNode.ParamCreateContext context,
             FederationBackendPolicy backendPolicy,
             List<TScanRangeLocations> scanRangeLocations) throws UserException {
+        // Currently, we do not support mixed file types (or compress types).
+        // If any of the files is unsplittable, all files will be treated as unsplittable.
+        boolean isSplittable = true;
+        for (TBrokerFileStatus fileStatus : fileStatuses) {
+            TFileFormatType formatType = formatType(context.fileGroup.getFileFormat(), fileStatus.path);
+            TFileCompressType compressType =
+                    Util.getOrInferCompressType(context.fileGroup.getCompressType(), fileStatus.path);
+            // Now only support split plain text
+            if (compressType == TFileCompressType.PLAIN
+                    && ((formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable)
+                    || formatType == TFileFormatType.FORMAT_JSON)) {
+                // is splittable
+            } else {
+                isSplittable = false;
+                break;
+            }
+        }
+
+        if (isSplittable) {
+            createScanRangeLocationsSplittable(context, backendPolicy, scanRangeLocations);
+        } else {
+            createScanRangeLocationsUnsplittable(context, backendPolicy, scanRangeLocations);
+        }
+    }
+
+    public void createScanRangeLocationsUnsplittable(FileLoadScanNode.ParamCreateContext context,
+            FederationBackendPolicy backendPolicy,
+            List<TScanRangeLocations> scanRangeLocations)
+            throws UserException {
+        PriorityQueue<Pair<Long, TScanRangeLocations>> pq = new PriorityQueue<>(Comparator.comparingLong(Pair::key));
+        for (int i = 0; i < Math.min(fileStatuses.size(), numInstances); i++) {
+            pq.add(Pair.of(0L, newLocations(context.params, brokerDesc, backendPolicy)));
+        }
+        fileStatuses.sort((a, b) -> Long.compare(b.size, a.size));
+        for (TBrokerFileStatus fileStatus : fileStatuses) {
+            TFileFormatType formatType = formatType(context.fileGroup.getFileFormat(), fileStatus.path);
+            context.params.setFormatType(formatType);
+            TFileCompressType compressType =
+                    Util.getOrInferCompressType(context.fileGroup.getCompressType(), fileStatus.path);
+            context.params.setCompressType(compressType);
+            List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
+                    context.fileGroup.getColumnNamesFromPath());
+            Pair<Long, TScanRangeLocations> p = pq.poll();
+            TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, fileStatus.size, columnsFromPath);
+            p.value().getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
+            pq.add(Pair.of(p.key() + fileStatus.size, p.value()));
+        }
+        pq.stream().map(Pair::value).forEach(scanRangeLocations::add);
+    }
+
+    public void createScanRangeLocationsSplittable(FileLoadScanNode.ParamCreateContext context,
+            FederationBackendPolicy backendPolicy,
+            List<TScanRangeLocations> scanRangeLocations) throws UserException {
+        TScanRangeLocations curLocations = newLocations(context.params, brokerDesc, backendPolicy);
         long curInstanceBytes = 0;
         long curFileOffset = 0;
@@ -234,27 +292,16 @@ public void createScanRangeLocations(FileLoadScanNode.ParamCreateContext context
             // Assign scan range locations only for broker load.
             // stream load has only one file, and no need to set multi scan ranges.
             if (tmpBytes > bytesPerInstance && jobType != JobType.STREAM_LOAD) {
-                // Now only support split plain text
-                if (compressType == TFileCompressType.PLAIN
-                        && ((formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable)
-                        || formatType == TFileFormatType.FORMAT_JSON)) {
-                    long rangeBytes = bytesPerInstance - curInstanceBytes;
-                    TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, rangeBytes,
-                            columnsFromPath);
-                    curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
-                    curFileOffset += rangeBytes;
-                } else {
-                    TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, leftBytes,
-                            columnsFromPath);
-                    curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
-                    i++;
-                }
+                long rangeBytes = bytesPerInstance - curInstanceBytes;
+                TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, rangeBytes,
+                        columnsFromPath);
+                curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
+                curFileOffset += rangeBytes;
                 // New one scan
                 scanRangeLocations.add(curLocations);
                 curLocations = newLocations(context.params, brokerDesc, backendPolicy);
                 curInstanceBytes = 0;
-
             } else {
                 TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes,
                         columnsFromPath);
                 curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
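Not part of the patch: a minimal standalone sketch of the assignment strategy that createScanRangeLocationsUnsplittable uses above. Files are sorted by size in descending order and each file goes to the instance with the smallest accumulated byte count, tracked in a min-heap, which keeps per-instance totals close to each other. All names here (UnsplittableAssignmentSketch, Instance, assign) are illustrative only; the real method works on TBrokerFileStatus and TScanRangeLocations instead of plain sizes.

// Greedy "largest file to least-loaded instance" sketch; assumes numInstances >= 1.
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class UnsplittableAssignmentSketch {

    // One entry per scan instance: accumulated bytes plus the file sizes assigned so far.
    static final class Instance {
        long assignedBytes = 0;
        final List<Long> fileSizes = new ArrayList<>();
    }

    static List<Instance> assign(List<Long> fileSizes, int numInstances) {
        // Min-heap keyed by the bytes already assigned to each instance.
        PriorityQueue<Instance> pq = new PriorityQueue<>(Comparator.comparingLong((Instance i) -> i.assignedBytes));
        for (int i = 0; i < Math.min(fileSizes.size(), numInstances); i++) {
            pq.add(new Instance());
        }
        // Largest files first, mirroring fileStatuses.sort((a, b) -> Long.compare(b.size, a.size)).
        List<Long> sorted = new ArrayList<>(fileSizes);
        sorted.sort(Comparator.reverseOrder());
        for (long size : sorted) {
            Instance least = pq.poll();       // currently least-loaded instance
            least.fileSizes.add(size);
            least.assignedBytes += size;
            pq.add(least);                    // push back with the updated load
        }
        return new ArrayList<>(pq);
    }

    public static void main(String[] args) {
        // Five unsplittable files spread over two instances.
        for (Instance inst : assign(List.of(900L, 700L, 400L, 300L, 100L), 2)) {
            System.out.println(inst.assignedBytes + " bytes -> " + inst.fileSizes);
        }
    }
}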