move the logic about falling back to native reader from be to fe
suxiaogang223 committed Dec 1, 2024
1 parent b1d4669 commit 99d8d5b
Showing 6 changed files with 45 additions and 43 deletions.
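Taken together, the hunks below delete the JNI-to-native fallback branch from the BE scanner (vfile_scanner.cpp) and re-create the same decision in the FE scan nodes, which now stamp the resolved format on each range through a new TFileRangeDesc.format_type field. The following self-contained sketch distills the decision being moved; it is plain Java written for this page, and every name in it (FileFormat, NativeReaderFallback) is invented for illustration, not an actual Doris class.

// Illustrative sketch of the fallback decision this commit moves from BE to FE.
// FileFormat and NativeReaderFallback are invented names, not Doris classes.
enum FileFormat { JNI, PARQUET, ORC }

final class NativeReaderFallback {
    // Hudi: with no delta logs the split is effectively read-optimized, so the
    // parquet base file can be read by the native reader.
    static FileFormat forHudi(boolean forceJniScanner, boolean hasDeltaLogs) {
        return (!forceJniScanner && !hasDeltaLogs) ? FileFormat.PARQUET : FileFormat.JNI;
    }

    // Paimon: a split scheduled without a JNI split object is a plain data file,
    // readable by the native reader in the file's own format.
    static FileFormat forPaimon(boolean hasJniSplit, String dataFileFormat) {
        if (hasJniSplit) {
            return FileFormat.JNI;
        }
        switch (dataFileFormat) {
            case "orc": return FileFormat.ORC;
            case "parquet": return FileFormat.PARQUET;
            default: throw new IllegalArgumentException("Unsupported file format: " + dataFileFormat);
        }
    }
}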
19 changes: 0 additions & 19 deletions be/src/vec/exec/scan/vfile_scanner.cpp
@@ -754,25 +754,6 @@ Status VFileScanner::_get_next_reader() {
     // JNI reader can only push down column value range
     bool push_down_predicates =
             !_is_load && _params->format_type != TFileFormatType::FORMAT_JNI;
-    if (!_params->force_jni_reader && format_type == TFileFormatType::FORMAT_JNI &&
-        range.__isset.table_format_params) {
-        if (range.table_format_params.table_format_type == "hudi" &&
-            range.table_format_params.hudi_params.delta_logs.empty()) {
-            // fall back to native reader if there is no log file
-            format_type = TFileFormatType::FORMAT_PARQUET;
-        } else if (range.table_format_params.table_format_type == "paimon" &&
-                   !range.table_format_params.paimon_params.__isset.paimon_split) {
-            // use native reader
-            auto format = range.table_format_params.paimon_params.file_format;
-            if (format == "orc") {
-                format_type = TFileFormatType::FORMAT_ORC;
-            } else if (format == "parquet") {
-                format_type = TFileFormatType::FORMAT_PARQUET;
-            } else {
-                return Status::InternalError("Not supported paimon file format: {}", format);
-            }
-        }
-    }
     bool need_to_get_parsed_schema = false;
     switch (format_type) {
     case TFileFormatType::FORMAT_JNI: {

@@ -374,7 +374,6 @@ public void createScanRangeLocations() throws UserException {
                 scanBackendIds.add(backend.getId());
             }
         }
-        params.setForceJniReader(ConnectContext.get().getSessionVariable().isForceJniScanner());
 
         getSerializedTable().ifPresent(params::setSerializedTable);
 
@@ -434,6 +433,8 @@ private TScanRangeLocations splitToScanRange(
             }
         }
 
+        // set the file format type; it may fall back to a native format in setScanParams
+        rangeDesc.setFormatType(getFileFormatType());
         setScanParams(rangeDesc, fileSplit);
         curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
         TScanRangeLocation location = new TScanRangeLocation();
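Note the ordering this hunk establishes: setFormatType() runs before setScanParams(), because the Hudi and Paimon overrides read the initial value back to decide whether to downgrade it. A minimal model of that flow, reusing the illustrative FileFormat enum from the sketch near the top of this page (RangeDesc is an invented stand-in for the generated TFileRangeDesc):

// Illustrative model of the per-range flow; RangeDesc is an invented stand-in.
final class RangeDesc {
    FileFormat formatType;
}

final class ScanRangeFlow {
    // Stamp the initial format first; a table-specific override may downgrade it.
    static RangeDesc buildRange(FileFormat initial, boolean canUseNative, FileFormat nativeFormat) {
        RangeDesc rangeDesc = new RangeDesc();
        rangeDesc.formatType = initial; // rangeDesc.setFormatType(getFileFormatType())
        if (rangeDesc.formatType == FileFormat.JNI && canUseNative) {
            rangeDesc.formatType = nativeFormat; // what the setScanParams() overrides do
        }
        return rangeDesc;
    }
}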

@@ -88,7 +88,7 @@ public class HudiScanNode extends HiveScanNode {

     private static final Logger LOG = LogManager.getLogger(HudiScanNode.class);
 
-    private boolean isCowOrRoTable;
+    private boolean isCowTable;
 
     private final AtomicLong noLogsSplitNum = new AtomicLong(0);
 
@@ -128,9 +128,9 @@ public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumn
             Optional<TableScanParams> scanParams, Optional<IncrementalRelation> incrementalRelation,
             SessionVariable sessionVariable) {
         super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv);
-        isCowOrRoTable = hmsTable.isHoodieCowTable();
+        isCowTable = hmsTable.isHoodieCowTable();
         if (LOG.isDebugEnabled()) {
-            if (isCowOrRoTable) {
+            if (isCowTable) {
                 LOG.debug("Hudi table {} can read as cow/read optimize table", hmsTable.getFullQualifiers());
             } else {
                 LOG.debug("Hudi table {} is a mor table, and will use JNI to read data in BE",
@@ -191,13 +191,13 @@ protected void doInitialize() throws UserException {
             throw new UserException("Not support function '" + scanParams.getParamType() + "' in hudi table");
         }
         if (incrementalRead) {
-            if (isCowOrRoTable) {
+            if (isCowTable) {
                 try {
                     Map<String, String> serd = hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters();
                     if ("true".equals(serd.get("hoodie.query.as.ro.table"))
                             && hmsTable.getRemoteTable().getTableName().endsWith("_ro")) {
                         // Incremental read RO table as RT table, I don't know why?
-                        isCowOrRoTable = false;
+                        isCowTable = false;
                         LOG.warn("Execute incremental read on RO table: {}", hmsTable.getFullQualifiers());
                     }
                 } catch (Exception e) {
@@ -242,7 +242,15 @@ protected Map<String, String> getLocationProperties() throws UserException {
     @Override
     protected void setScanParams(TFileRangeDesc rangeDesc, Split split) {
         if (split instanceof HudiSplit) {
-            setHudiParams(rangeDesc, (HudiSplit) split);
+            HudiSplit hudiSplit = (HudiSplit) split;
+            if (rangeDesc.getFormatType() == TFileFormatType.FORMAT_JNI
+                    && !sessionVariable.isForceJniScanner()
+                    && hudiSplit.getHudiDeltaLogs().isEmpty()) {
+                // no delta logs, so this is a read-optimized read; fall back to the native reader
+                // TODO: do Hudi base files only support parquet for now?
+                rangeDesc.setFormatType(TFileFormatType.FORMAT_PARQUET);
+            }
+            setHudiParams(rangeDesc, hudiSplit);
         }
     }
 
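In terms of the illustrative sketch near the top of this page, the new Hudi branch is the forHudi case: a COW/RO split with no delta logs and force_jni_scanner off lands on the native parquet reader. A small demo, assuming it is compiled next to that sketch:

// Demo of the Hudi fallback using the illustrative sketch above.
public class HudiFallbackDemo {
    public static void main(String[] args) {
        System.out.println(NativeReaderFallback.forHudi(false, false)); // PARQUET
        System.out.println(NativeReaderFallback.forHudi(false, true));  // JNI (delta logs present)
        System.out.println(NativeReaderFallback.forHudi(true, false));  // JNI (forced by session)
    }
}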
@@ -261,13 +269,12 @@ private void setHudiParams(TFileRangeDesc rangeDesc, HudiSplit hudiSplit) {
         fileDesc.setColumnTypes(hudiSplit.getHudiColumnTypes());
         // TODO(gaoxin): support complex types
         // fileDesc.setNestedFields(hudiSplit.getNestedFields());
-        fileDesc.setHudiJniScanner(hudiSplit.getHudiJniScanner());
         tableFormatFileDesc.setHudiParams(fileDesc);
         rangeDesc.setTableFormatParams(tableFormatFileDesc);
     }
 
     private boolean canUseNativeReader() {
-        return !sessionVariable.isForceJniScanner() && isCowOrRoTable;
+        return !sessionVariable.isForceJniScanner() && isCowTable;
     }
 
     private List<HivePartition> getPrunedPartitions(
@@ -504,7 +511,6 @@ private HudiSplit generateHudiSplit(FileSlice fileSlice, List<String> partitionV
         split.setHudiColumnNames(columnNames);
         split.setHudiColumnTypes(columnTypes);
         split.setInstantTime(queryInstant);
-        split.setHudiJniScanner(sessionVariable.getHudiJniScanner());
         return split;
     }
 

@@ -40,5 +40,4 @@ public HudiSplit(LocationPath file, long start, long length, long fileLength, St
     private List<String> hudiColumnNames;
     private List<String> hudiColumnTypes;
     private List<String> nestedFields;
-    private String hudiJniScanner;
 }

@@ -105,9 +105,9 @@ public String toString() {
     private String serializedTable;
 
     public PaimonScanNode(PlanNodeId id,
-            TupleDescriptor desc,
-            boolean needCheckColumnPriv,
-            SessionVariable sessionVariable) {
+                          TupleDescriptor desc,
+                          boolean needCheckColumnPriv,
+                          SessionVariable sessionVariable) {
         super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE, needCheckColumnPriv);
         this.sessionVariable = sessionVariable;
     }
@@ -127,8 +127,7 @@ protected void convertPredicate() {
         predicates = paimonPredicateConverter.convertToPaimonExpr(conjuncts);
     }
 
-    private static final Base64.Encoder BASE64_ENCODER =
-            java.util.Base64.getUrlEncoder().withoutPadding();
+    private static final Base64.Encoder BASE64_ENCODER = java.util.Base64.getUrlEncoder().withoutPadding();
 
     public static <T> String encodeObjectToString(T t) {
         try {
@@ -142,6 +141,19 @@ public static <T> String encodeObjectToString(T t) {
     @Override
     protected void setScanParams(TFileRangeDesc rangeDesc, Split split) {
         if (split instanceof PaimonSplit) {
+            PaimonSplit paimonSplit = (PaimonSplit) split;
+            if (rangeDesc.getFormatType() == TFileFormatType.FORMAT_JNI
+                    && paimonSplit.getSplit() == null) {
+                // no JNI split attached, fall back to the native reader
+                String fileFormat = getFileFormat(paimonSplit.getPathString());
+                if (fileFormat.equals("orc")) {
+                    rangeDesc.setFormatType(TFileFormatType.FORMAT_ORC);
+                } else if (fileFormat.equals("parquet")) {
+                    rangeDesc.setFormatType(TFileFormatType.FORMAT_PARQUET);
+                } else {
+                    throw new RuntimeException("Unsupported file format: " + fileFormat);
+                }
+            }
             setPaimonParams(rangeDesc, (PaimonSplit) split);
         }
     }
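Unlike the Hudi branch, this one does not re-check isForceJniScanner(): presumably getSplits() (below) only creates splits without a JNI split object when the session does not force JNI, so checking getSplit() alone is enough here. In terms of the illustrative sketch near the top of this page, again assuming it is compiled next to that sketch:

// Demo of the Paimon fallback using the illustrative sketch above.
public class PaimonFallbackDemo {
    public static void main(String[] args) {
        System.out.println(NativeReaderFallback.forPaimon(false, "orc"));     // ORC
        System.out.println(NativeReaderFallback.forPaimon(false, "parquet")); // PARQUET
        System.out.println(NativeReaderFallback.forPaimon(true, "orc"));      // JNI
    }
}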
@@ -172,7 +184,8 @@ private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit)
         fileDesc.setTblId(source.getTargetTable().getId());
         fileDesc.setLastUpdateTime(source.getTargetTable().getUpdateTime());
         fileDesc.setPaimonTable(encodeObjectToString(source.getPaimonTable()));
-        // The hadoop conf should be same with PaimonExternalCatalog.createCatalog()#getConfiguration()
+        // The hadoop conf should be same with
+        // PaimonExternalCatalog.createCatalog()#getConfiguration()
         fileDesc.setHadoopConf(source.getCatalog().getCatalogProperty().getHadoopProperties());
         Optional<DeletionFile> optDeletionFile = paimonSplit.getDeletionFile();
         if (optDeletionFile.isPresent()) {
@@ -190,8 +203,8 @@ private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit)
     @Override
     public List<Split> getSplits() throws UserException {
         boolean forceJniScanner = sessionVariable.isForceJniScanner();
-        SessionVariable.IgnoreSplitType ignoreSplitType =
-                SessionVariable.IgnoreSplitType.valueOf(sessionVariable.getIgnoreSplitType());
+        SessionVariable.IgnoreSplitType ignoreSplitType = SessionVariable.IgnoreSplitType
+                .valueOf(sessionVariable.getIgnoreSplitType());
         List<Split> splits = new ArrayList<>();
         int[] projected = desc.getSlots().stream().mapToInt(
                 slot -> (source.getPaimonTable().rowType().getFieldNames().indexOf(slot.getColumn().getName())))
@@ -288,7 +301,8 @@ public List<Split> getSplits() throws UserException {
         }
         this.selectedPartitionNum = selectedPartitionValues.size();
         // TODO: get total partition number
-        // We should set fileSplitSize at the end because fileSplitSize may be modified in splitFile.
+        // We should set fileSplitSize at the end because fileSplitSize may be modified
+        // in splitFile.
         splits.forEach(s -> s.setTargetSplitSize(fileSplitSize));
         return splits;
     }
@@ -318,8 +332,9 @@ public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundExce

     @Override
     public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException {
-        // return new ArrayList<>(source.getPaimonTable().partitionKeys());
-        //Paymon is not aware of partitions and bypasses some existing logic by returning an empty list
+        // return new ArrayList<>(source.getPaimonTable().partitionKeys());
+        // Paimon is not aware of partitions and bypasses some existing logic by
+        // returning an empty list
         return new ArrayList<>();
     }
 
4 changes: 2 additions & 2 deletions gensrc/thrift/PlanNodes.thrift
@@ -406,6 +406,7 @@ enum TTextSerdeType {
 struct TFileScanRangeParams {
     // deprecated, move to TFileScanRange
     1: optional Types.TFileType file_type;
+    // deprecated, move to TFileScanRange
     2: optional TFileFormatType format_type;
     // deprecated, move to TFileScanRange
     3: optional TFileCompressType compress_type;
@@ -453,8 +454,6 @@ struct TFileScanRangeParams {
     // 1. Reduce the access to HMS and HDFS on the JNI side.
     // 2. There will be no inconsistency between the fe and be tables.
     24: optional string serialized_table
-    // if set true, be will be forced to use jni reader
-    25: bool force_jni_reader;
 }
 
 struct TFileRangeDesc {
@@ -482,6 +481,7 @@ struct TFileRangeDesc {
     // for hive table, different files may have different fs,
     // so fs_name should be with TFileRangeDesc
     12: optional string fs_name
+    13: optional TFileFormatType format_type;
 }
 
 struct TSplitSource {
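The deprecated per-params format_type (field 2) stays for compatibility, so a consumer should prefer the new per-range field 13 and only fall back to the per-params value for plans produced by older frontends. The BE hunk above shows only the deletion, so the following resolution rule is an assumption about the intended consuming side, written against the invented FileFormat type from the sketch near the top of this page:

// Assumed resolution rule on the consuming side; not taken from the Doris BE.
final class FormatResolution {
    // Prefer TFileRangeDesc.format_type (field 13); fall back to the deprecated
    // TFileScanRangeParams.format_type (field 2) when the range does not set it.
    static FileFormat effectiveFormat(FileFormat rangeFormat, FileFormat paramsFormat) {
        return rangeFormat != null ? rangeFormat : paramsFormat;
    }
}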