diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java index f229134e9d8319..7bd9fa631c8da3 100644 --- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java @@ -125,7 +125,7 @@ private void initReader() throws IOException { int[] projected = getProjected(); readBuilder.withProjection(projected); readBuilder.withFilter(getPredicates()); - reader = readBuilder.newRead().createReader(getSplit()); + reader = readBuilder.newRead().executeFilter().createReader(getSplit()); paimonDataTypeList = Arrays.stream(projected).mapToObj(i -> table.rowType().getTypeAt(i)).collect(Collectors.toList()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java index e85fed8b62a879..0d67a9e44b6f29 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java @@ -46,7 +46,8 @@ public abstract class ExternalScanNode extends ScanNode { protected boolean needCheckColumnPriv; protected final FederationBackendPolicy backendPolicy = (ConnectContext.get() != null - && ConnectContext.get().getSessionVariable().enableFileCache) + && (ConnectContext.get().getSessionVariable().enableFileCache + || ConnectContext.get().getSessionVariable().getUseConsistentHashForExternalScan())) ? new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING) : new FederationBackendPolicy(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java index a2b902fd744a7f..1e1787c1f64947 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java @@ -497,7 +497,7 @@ public void funnel(Backend backend, PrimitiveSink primitiveSink) { private static class SplitHash implements Funnel { @Override public void funnel(Split split, PrimitiveSink primitiveSink) { - primitiveSink.putBytes(split.getPathString().getBytes(StandardCharsets.UTF_8)); + primitiveSink.putBytes(split.getConsistentHashString().getBytes(StandardCharsets.UTF_8)); primitiveSink.putLong(split.getStart()); primitiveSink.putLong(split.getLength()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java index 7eaa87b74aab63..1ebb390e90438f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java @@ -47,6 +47,9 @@ public class FileSplit implements Split { // the location type for BE, eg: HDFS, LOCAL, S3 protected TFileType locationType; + public Long selfSplitWeight; + public Long targetSplitSize; + public FileSplit(LocationPath path, long start, long length, long fileLength, long modificationTime, String[] hosts, List partitionValues) { this.path = path; @@ -89,4 +92,20 @@ public Split create(LocationPath path, long start, long length, long fileLength, return new FileSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues); } } + + @Override + public void setTargetSplitSize(Long targetSplitSize) { + this.targetSplitSize = targetSplitSize; + } + + @Override + public SplitWeight getSplitWeight() { + if (selfSplitWeight != null && targetSplitSize != null) { + double computedWeight = selfSplitWeight * 1.0 / targetSplitSize; + // Clamp the value be between the minimum weight and 1.0 (standard weight) + return SplitWeight.fromProportion(Math.min(Math.max(computedWeight, 0.01), 1.0)); + } else { + return SplitWeight.standard(); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergDeleteFileFilter.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergDeleteFileFilter.java index 394bc849a56a49..b876732ff3f4e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergDeleteFileFilter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergDeleteFileFilter.java @@ -25,23 +25,25 @@ @Data public class IcebergDeleteFileFilter { private String deleteFilePath; + private long filesize; - public IcebergDeleteFileFilter(String deleteFilePath) { + public IcebergDeleteFileFilter(String deleteFilePath, long filesize) { this.deleteFilePath = deleteFilePath; + this.filesize = filesize; } public static PositionDelete createPositionDelete(String deleteFilePath, Long positionLowerBound, - Long positionUpperBound) { - return new PositionDelete(deleteFilePath, positionLowerBound, positionUpperBound); + Long positionUpperBound, long filesize) { + return new PositionDelete(deleteFilePath, positionLowerBound, positionUpperBound, filesize); } - public static EqualityDelete createEqualityDelete(String deleteFilePath, List fieldIds) { + public static EqualityDelete createEqualityDelete(String deleteFilePath, List fieldIds, long fileSize) { // todo: // Schema deleteSchema = TypeUtil.select(scan.schema(), new HashSet<>(fieldIds)); // StructLikeSet deleteSet = StructLikeSet.create(deleteSchema.asStruct()); // pass deleteSet to BE // compare two StructLike value, if equals, filtered - return new EqualityDelete(deleteFilePath, fieldIds); + return new EqualityDelete(deleteFilePath, fieldIds, fileSize); } static class PositionDelete extends IcebergDeleteFileFilter { @@ -49,8 +51,8 @@ static class PositionDelete extends IcebergDeleteFileFilter { private final Long positionUpperBound; public PositionDelete(String deleteFilePath, Long positionLowerBound, - Long positionUpperBound) { - super(deleteFilePath); + Long positionUpperBound, long fileSize) { + super(deleteFilePath, fileSize); this.positionLowerBound = positionLowerBound; this.positionUpperBound = positionUpperBound; } @@ -67,8 +69,8 @@ public OptionalLong getPositionUpperBound() { static class EqualityDelete extends IcebergDeleteFileFilter { private List fieldIds; - public EqualityDelete(String deleteFilePath, List fieldIds) { - super(deleteFilePath); + public EqualityDelete(String deleteFilePath, List fieldIds, long fileSize) { + super(deleteFilePath, fileSize); this.fieldIds = fieldIds; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index fe6c54cf53b976..56dda7b4fe28b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -282,7 +282,7 @@ private List doGetSplits() throws UserException { } selectedPartitionNum = partitionPathSet.size(); - + splits.forEach(s -> s.setTargetSplitSize(fileSplitSize)); return splits; } @@ -315,10 +315,11 @@ private List getDeleteFileFilters(FileScanTask spitTask .map(m -> m.get(MetadataColumns.DELETE_FILE_POS.fieldId())) .map(bytes -> Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes)); filters.add(IcebergDeleteFileFilter.createPositionDelete(delete.path().toString(), - positionLowerBound.orElse(-1L), positionUpperBound.orElse(-1L))); + positionLowerBound.orElse(-1L), positionUpperBound.orElse(-1L), + delete.fileSizeInBytes())); } else if (delete.content() == FileContent.EQUALITY_DELETES) { filters.add(IcebergDeleteFileFilter.createEqualityDelete( - delete.path().toString(), delete.equalityFieldIds())); + delete.path().toString(), delete.equalityFieldIds(), delete.fileSizeInBytes())); } else { throw new IllegalStateException("Unknown delete content: " + delete.content()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java index 46e8f96ba35daf..580d3cf1bb23f3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java @@ -47,6 +47,7 @@ public IcebergSplit(LocationPath file, long start, long length, long fileLength, this.formatVersion = formatVersion; this.config = config; this.originalPath = originalPath; + this.selfSplitWeight = length; } public long getRowCount() { @@ -56,4 +57,9 @@ public long getRowCount() { public void setRowCount(long rowCount) { this.rowCount = rowCount; } + + public void setDeleteFileFilters(List deleteFileFilters) { + this.deleteFileFilters = deleteFileFilters; + this.selfSplitWeight += deleteFileFilters.stream().mapToLong(IcebergDeleteFileFilter::getFilesize).sum(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index 02f831ba37bc1c..cd477cc9b290c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -28,7 +28,7 @@ import org.apache.doris.datasource.paimon.PaimonExternalCatalog; import org.apache.doris.datasource.paimon.PaimonExternalTable; import org.apache.doris.planner.PlanNodeId; -import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TExplainLevel; @@ -101,9 +101,14 @@ public String toString() { private int rawFileSplitNum = 0; private int paimonSplitNum = 0; private List splitStats = new ArrayList<>(); + private SessionVariable sessionVariable; - public PaimonScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { + public PaimonScanNode(PlanNodeId id, + TupleDescriptor desc, + boolean needCheckColumnPriv, + SessionVariable sessionVariable) { super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE, needCheckColumnPriv); + this.sessionVariable = sessionVariable; } @Override @@ -176,7 +181,9 @@ private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) @Override public List getSplits() throws UserException { - boolean forceJniScanner = ConnectContext.get().getSessionVariable().isForceJniScanner(); + boolean forceJniScanner = sessionVariable.isForceJniScanner(); + SessionVariable.IgnoreSplitType ignoreSplitType = + SessionVariable.IgnoreSplitType.valueOf(sessionVariable.getIgnoreSplitType()); List splits = new ArrayList<>(); int[] projected = desc.getSlots().stream().mapToInt( slot -> (source.getPaimonTable().rowType().getFieldNames().indexOf(slot.getColumn().getName()))) @@ -196,7 +203,11 @@ public List getSplits() throws UserException { selectedPartitionValues.add(partitionValue); Optional> optRawFiles = dataSplit.convertToRawFiles(); Optional> optDeletionFiles = dataSplit.deletionFiles(); + if (supportNativeReader(optRawFiles)) { + if (ignoreSplitType == SessionVariable.IgnoreSplitType.IGNORE_NATIVE) { + continue; + } splitStat.setType(SplitReadType.NATIVE); splitStat.setRawFileConvertable(true); List rawFiles = optRawFiles.get(); @@ -252,10 +263,16 @@ public List getSplits() throws UserException { } } } else { + if (ignoreSplitType == SessionVariable.IgnoreSplitType.IGNORE_JNI) { + continue; + } splits.add(new PaimonSplit(split)); ++paimonSplitNum; } } else { + if (ignoreSplitType == SessionVariable.IgnoreSplitType.IGNORE_JNI) { + continue; + } splits.add(new PaimonSplit(split)); ++paimonSplitNum; } @@ -263,6 +280,8 @@ public List getSplits() throws UserException { } this.selectedPartitionNum = selectedPartitionValues.size(); // TODO: get total partition number + // We should set fileSplitSize at the end because fileSplitSize may be modified in splitFile. + splits.forEach(s -> s.setTargetSplitSize(fileSplitSize)); return splits; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java index ffd063d77e8bab..3ab38c7db28e9e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java @@ -23,11 +23,14 @@ import org.apache.doris.datasource.TableFormatType; import com.google.common.collect.Maps; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.DeletionFile; import org.apache.paimon.table.source.Split; import java.util.List; import java.util.Optional; +import java.util.UUID; public class PaimonSplit extends FileSplit { private static final LocationPath DUMMY_PATH = new LocationPath("/dummyPath", Maps.newHashMap()); @@ -35,11 +38,20 @@ public class PaimonSplit extends FileSplit { private TableFormatType tableFormatType; private Optional optDeletionFile; + public PaimonSplit(Split split) { super(DUMMY_PATH, 0, 0, 0, 0, null, null); this.split = split; this.tableFormatType = TableFormatType.PAIMON; this.optDeletionFile = Optional.empty(); + + if (split instanceof DataSplit) { + List dataFileMetas = ((DataSplit) split).dataFiles(); + this.path = new LocationPath("/" + dataFileMetas.get(0).fileName()); + this.selfSplitWeight = dataFileMetas.stream().mapToLong(DataFileMeta::fileSize).sum(); + } else { + this.selfSplitWeight = split.rowCount(); + } } private PaimonSplit(LocationPath file, long start, long length, long fileLength, long modificationTime, @@ -47,6 +59,15 @@ private PaimonSplit(LocationPath file, long start, long length, long fileLength, super(file, start, length, fileLength, modificationTime, hosts, partitionList); this.tableFormatType = TableFormatType.PAIMON; this.optDeletionFile = Optional.empty(); + this.selfSplitWeight = length; + } + + @Override + public String getConsistentHashString() { + if (this.path == DUMMY_PATH) { + return UUID.randomUUID().toString(); + } + return getPathString(); } public Split getSplit() { @@ -66,6 +87,7 @@ public Optional getDeletionFile() { } public void setDeletionFile(DeletionFile deletionFile) { + this.selfSplitWeight += deletionFile.length(); this.optDeletionFile = Optional.of(deletionFile); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 3078fb36df681a..654ccc8ca1155a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -574,7 +574,8 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla } else if (table instanceof IcebergExternalTable) { scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false); } else if (table instanceof PaimonExternalTable) { - scanNode = new PaimonScanNode(context.nextPlanNodeId(), tupleDescriptor, false); + scanNode = new PaimonScanNode(context.nextPlanNodeId(), tupleDescriptor, false, + ConnectContext.get().getSessionVariable()); } else if (table instanceof TrinoConnectorExternalTable) { scanNode = new TrinoConnectorScanNode(context.nextPlanNodeId(), tupleDescriptor, false); } else if (table instanceof MaxComputeExternalTable) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 152bb7cc8813e2..d94ad0a2552240 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -1986,7 +1986,8 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true); break; case PAIMON_EXTERNAL_TABLE: - scanNode = new PaimonScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true); + scanNode = new PaimonScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, + ConnectContext.get().getSessionVariable()); break; case TRINO_CONNECTOR_EXTERNAL_TABLE: scanNode = new TrinoConnectorScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 52ea334a14200f..5f031843025274 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -958,6 +958,26 @@ public class SessionVariable implements Serializable, Writable { setter = "setPipelineTaskNum") public int parallelPipelineTaskNum = 0; + + public enum IgnoreSplitType { + NONE, + IGNORE_JNI, + IGNORE_NATIVE + } + + public static final String IGNORE_SPLIT_TYPE = "ignore_split_type"; + @VariableMgr.VarAttr(name = IGNORE_SPLIT_TYPE, + checker = "checkIgnoreSplitType", + options = {"NONE", "IGNORE_JNI", "IGNORE_NATIVE"}, + description = {"忽略指定类型的split", "Ignore splits of the specified type"}) + public String ignoreSplitType = IgnoreSplitType.NONE.toString(); + + public static final String USE_CONSISTENT_HASHING_FOR_EXTERNAL_SCAN = "use_consistent_hash_for_external_scan"; + @VariableMgr.VarAttr(name = USE_CONSISTENT_HASHING_FOR_EXTERNAL_SCAN, + description = {"对外表采用一致性hash的方式做split的分发", + "Use consistent hashing to split the appearance for external scan"}) + public boolean useConsistentHashForExternalScan = false; + @VariableMgr.VarAttr(name = PROFILE_LEVEL, fuzzy = true) public int profileLevel = 1; @@ -4380,6 +4400,22 @@ public boolean isForceJniScanner() { return forceJniScanner; } + public String getIgnoreSplitType() { + return ignoreSplitType; + } + + public void checkIgnoreSplitType(String value) { + try { + IgnoreSplitType.valueOf(value); + } catch (Exception e) { + throw new UnsupportedOperationException("We only support `NONE`, `IGNORE_JNI` and `IGNORE_NATIVE`"); + } + } + + public boolean getUseConsistentHashForExternalScan() { + return useConsistentHashForExternalScan; + } + public void setForceJniScanner(boolean force) { forceJniScanner = force; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java b/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java index e86b287ac939f1..412e4b6792f69a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java +++ b/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java @@ -48,4 +48,9 @@ default boolean isRemotelyAccessible() { void setAlternativeHosts(List alternativeHosts); + default String getConsistentHashString() { + return getPathString(); + } + + void setTargetSplitSize(Long targetSplitSize); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java index df2e7dd3932d65..3b3e2eeedf7421 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java @@ -741,4 +741,99 @@ private static boolean areMultimapsEqualIgnoringOrder( return entries1.containsAll(entries2) && entries2.containsAll(entries1); } + + @Test + public void testSplitWeight() { + FileSplit fileSplit = new FileSplit(new LocationPath("s1"), 0, 1000, 1000, 0, null, Collections.emptyList()); + fileSplit.setSelfSplitWeight(1000L); + + fileSplit.setTargetSplitSize(10L); + Assert.assertEquals(100L, fileSplit.getSplitWeight().getRawValue(), 100L); + + fileSplit.setTargetSplitSize(10000000L); + Assert.assertEquals(1L, fileSplit.getSplitWeight().getRawValue()); + + fileSplit.setTargetSplitSize(2000L); + Assert.assertEquals(50, fileSplit.getSplitWeight().getRawValue()); + } + + @Test + public void testBiggerSplit() throws UserException { + SystemInfoService service = new SystemInfoService(); + + Backend backend1 = new Backend(1L, "172.30.0.100", 9050); + backend1.setAlive(true); + service.addBackend(backend1); + Backend backend2 = new Backend(2L, "172.30.0.106", 9050); + backend2.setAlive(true); + service.addBackend(backend2); + Backend backend3 = new Backend(3L, "172.30.0.118", 9050); + backend3.setAlive(true); + service.addBackend(backend3); + + new MockUp() { + @Mock + public SystemInfoService getCurrentSystemInfo() { + return service; + } + }; + + List splits = new ArrayList<>(); + splits.add(genFileSplit("s1", 1000000L, 1000L)); // belong 2 + splits.add(genFileSplit("s2", 100000L, 1000L)); // belong 2 + splits.add(genFileSplit("s3", 200000L, 1000L)); // belong 2 + splits.add(genFileSplit("s4", 300000L, 1000L)); // belong 2 + splits.add(genFileSplit("s5", 800000L, 1000L)); // belong 1 + + FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING); + // Set these options to ensure that the consistent hash algorithm is consistent. + policy.setEnableSplitsRedistribution(false); + Config.split_assigner_min_consistent_hash_candidate_num = 1; + policy.init(); + Multimap assignment = policy.computeScanRangeAssignment(splits); + Map> backendListMap = mergeAssignment(assignment); + backendListMap.forEach((k, v) -> { + if (k.getId() == 1) { + Assert.assertEquals(800000L, v.stream().mapToLong(Split::getLength).sum()); + } else if (k.getId() == 2) { + Assert.assertEquals(1600000L, v.stream().mapToLong(Split::getLength).sum()); + } + }); + + Config.split_assigner_min_consistent_hash_candidate_num = 1; + FederationBackendPolicy policy2 = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING); + policy2.init(); + Multimap assignment2 = policy2.computeScanRangeAssignment(splits); + Map> backendListMap2 = mergeAssignment(assignment2); + backendListMap2.forEach((k, v) -> { + if (k.getId() == 1) { + Assert.assertEquals(900000L, v.stream().mapToLong(Split::getLength).sum()); + } else if (k.getId() == 2) { + Assert.assertEquals(500000L, v.stream().mapToLong(Split::getLength).sum()); + } else if (k.getId() == 3) { + Assert.assertEquals(1000000L, v.stream().mapToLong(Split::getLength).sum()); + } + }); + } + + private Map> mergeAssignment(Multimap ass) { + HashMap> map = new HashMap<>(); + ass.forEach((k, v) -> { + if (map.containsKey(k)) { + map.get(k).add(v); + } else { + ArrayList splits = new ArrayList<>(); + splits.add(v); + map.put(k, splits); + } + }); + return map; + } + + private FileSplit genFileSplit(String path, long length, long targetSplit) { + FileSplit s = new FileSplit(new LocationPath(path), 0, length, length, 0, null, Collections.emptyList()); + s.setSelfSplitWeight(length); + s.setTargetSplitSize(targetSplit); + return s; + } }