diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index d9b9a02260a4e6b..f580cf7c5a1dfe8 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1391,6 +1391,8 @@ DEFINE_mBool(enable_delete_bitmap_merge_on_compaction, "false"); DEFINE_Bool(enable_table_size_correctness_check, "false"); DEFINE_Bool(force_regenerate_rowsetid_on_start_error, "false"); +DEFINE_Bool(use_old_hudi_jni_reader, "false"); + // clang-format off #ifdef BE_TEST // test s3 diff --git a/be/src/common/config.h b/be/src/common/config.h index 7f18406eeee721b..537dc182017817e 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1472,6 +1472,9 @@ DECLARE_mBool(enable_delete_bitmap_merge_on_compaction); // Enable validation to check the correctness of table size. DECLARE_Bool(enable_table_size_correctness_check); +// Use old hudi jni reader +DECLARE_mBool(use_old_hudi_jni_reader); + #ifdef BE_TEST // test s3 DECLARE_String(test_s3_resource); diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.cpp b/be/src/vec/exec/format/table/hudi_jni_reader.cpp index ea801b798f377d1..f6cda63b6b5b4a6 100644 --- a/be/src/vec/exec/format/table/hudi_jni_reader.cpp +++ b/be/src/vec/exec/format/table/hudi_jni_reader.cpp @@ -18,8 +18,8 @@ #include "hudi_jni_reader.h" #include -#include +#include "common/config.h" #include "runtime/descriptors.h" #include "runtime/runtime_state.h" #include "runtime/types.h" @@ -65,7 +65,7 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params, {"input_format", _hudi_params.input_format}}; // Use compatible hadoop client to read data - for (auto& kv : _scan_params.properties) { + for (const auto& kv : _scan_params.properties) { if (kv.first.starts_with(HOODIE_CONF_PREFIX)) { params[kv.first] = kv.second; } else { @@ -73,9 +73,13 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params, } } - // _jni_connector = std::make_unique("org/apache/doris/hudi/HudiJniScanner", params, - _jni_connector = std::make_unique("org/apache/doris/hudi/HadoopHudiJniScanner", - params, required_fields); + if (config::use_old_hudi_jni_reader) [[unlikely]] { + _jni_connector = std::make_unique("org/apache/doris/hudi/HudiJniScanner", + params, required_fields); + } else { + _jni_connector = std::make_unique( + "org/apache/doris/hudi/HadoopHudiJniScanner", params, required_fields); + } } Status HudiJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.h b/be/src/vec/exec/format/table/hudi_jni_reader.h index e9bb55a69a77e7a..bfa0291a61035c6 100644 --- a/be/src/vec/exec/format/table/hudi_jni_reader.h +++ b/be/src/vec/exec/format/table/hudi_jni_reader.h @@ -17,9 +17,7 @@ #pragma once -#include - -#include +#include #include #include #include diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 9353887799207dd..1d805d914d869fb 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -758,7 +758,8 @@ Status VFileScanner::_get_next_reader() { // JNI reader can only push down column value range bool push_down_predicates = !_is_load && _params->format_type != TFileFormatType::FORMAT_JNI; - if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params) { + if (!_params->force_jni_reader && format_type == TFileFormatType::FORMAT_JNI && + range.__isset.table_format_params) { if (range.table_format_params.table_format_type == "hudi" && range.table_format_params.hudi_params.delta_logs.empty()) { // fall back to native reader if there is no log file diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java index 96dae05b7fd0459..7a7967fe6546de6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java @@ -368,6 +368,7 @@ public void createScanRangeLocations() throws UserException { scanBackendIds.add(backend.getId()); } } + params.setForceJniReader(ConnectContext.get().getSessionVariable().isForceJniScanner()); if (ConnectContext.get().getExecutor() != null) { ConnectContext.get().getExecutor().getSummaryProfile().setCreateScanRangeFinishTime(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java index ab32ee45993c01e..3115a964066cfb0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java @@ -116,9 +116,11 @@ public class HudiScanNode extends HiveScanNode { /** * External file scan node for Query Hudi table - * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv + * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column + * priv * eg: s3 tvf - * These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check + * These scan nodes do not have corresponding catalog/database/table info, so no + * need to do priv check */ public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, Optional scanParams, Optional incrementalRelation) { @@ -304,7 +306,8 @@ private List getPrunedPartitions( } } } - // unpartitioned table, create a dummy partition to save location and inputformat, + // unpartitioned table, create a dummy partition to save location and + // inputformat, // so that we can unify the interface. HivePartition dummyPartition = new HivePartition(hmsTable.getDbName(), hmsTable.getName(), true, hmsTable.getRemoteTable().getSd().getInputFormat(), diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index ec4497b267b22f0..ba5da21f79a583f 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -353,7 +353,6 @@ struct TMaxComputeFileDesc { 1: optional string partition_spec // deprecated 2: optional string session_id 3: optional string table_batch_read_session - } struct THudiFileDesc { @@ -448,6 +447,8 @@ struct TFileScanRangeParams { 22: optional TTextSerdeType text_serde_type // used by flexible partial update 23: optional string sequence_map_col + // if set true, be will be forced to use jni reader + 24: bool force_jni_reader; } struct TFileRangeDesc {