Skip to content

Commit

Permalink
Support `force_jni_reader` and `use_old_hudi_jni_reader` options for Hudi
Browse files Browse the repository at this point in the history
  • Loading branch information
suxiaogang223 committed Nov 25, 2024
1 parent d85b144 commit b26c594
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 13 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1391,6 +1391,8 @@ DEFINE_mBool(enable_delete_bitmap_merge_on_compaction, "false");
DEFINE_Bool(enable_table_size_correctness_check, "false");
DEFINE_Bool(force_regenerate_rowsetid_on_start_error, "false");

DEFINE_Bool(use_old_hudi_jni_reader, "false");

// clang-format off
#ifdef BE_TEST
// test s3
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1472,6 +1472,9 @@ DECLARE_mBool(enable_delete_bitmap_merge_on_compaction);
// Enable validation to check the correctness of table size.
DECLARE_Bool(enable_table_size_correctness_check);

// If true, fall back to the old Hudi JNI reader (HudiJniScanner) instead of the new HadoopHudiJniScanner
DECLARE_mBool(use_old_hudi_jni_reader);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
14 changes: 9 additions & 5 deletions be/src/vec/exec/format/table/hudi_jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
#include "hudi_jni_reader.h"

#include <map>
#include <ostream>

#include "common/config.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "runtime/types.h"
Expand Down Expand Up @@ -65,17 +65,21 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params,
{"input_format", _hudi_params.input_format}};

// Use compatible hadoop client to read data
for (auto& kv : _scan_params.properties) {
for (const auto& kv : _scan_params.properties) {
if (kv.first.starts_with(HOODIE_CONF_PREFIX)) {
params[kv.first] = kv.second;
} else {
params[HADOOP_CONF_PREFIX + kv.first] = kv.second;
}
}

// _jni_connector = std::make_unique<JniConnector>("org/apache/doris/hudi/HudiJniScanner", params,
_jni_connector = std::make_unique<JniConnector>("org/apache/doris/hudi/HadoopHudiJniScanner",
params, required_fields);
if (config::use_old_hudi_jni_reader) [[unlikely]] {
_jni_connector = std::make_unique<JniConnector>("org/apache/doris/hudi/HudiJniScanner",
params, required_fields);
} else {
_jni_connector = std::make_unique<JniConnector>(
"org/apache/doris/hudi/HadoopHudiJniScanner", params, required_fields);
}
}

Status HudiJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
Expand Down
4 changes: 1 addition & 3 deletions be/src/vec/exec/format/table/hudi_jni_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

#pragma once

#include <stddef.h>

#include <memory>
#include <cstddef>
#include <string>
#include <unordered_map>
#include <unordered_set>
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,8 @@ Status VFileScanner::_get_next_reader() {
// JNI reader can only push down column value range
bool push_down_predicates =
!_is_load && _params->format_type != TFileFormatType::FORMAT_JNI;
if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params) {
if (!_params->force_jni_reader && format_type == TFileFormatType::FORMAT_JNI &&
range.__isset.table_format_params) {
if (range.table_format_params.table_format_type == "hudi" &&
range.table_format_params.hudi_params.delta_logs.empty()) {
// fall back to native reader if there is no log file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ public void createScanRangeLocations() throws UserException {
scanBackendIds.add(backend.getId());
}
}
params.setForceJniReader(ConnectContext.get().getSessionVariable().isForceJniScanner());

if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setCreateScanRangeFinishTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,11 @@ public class HudiScanNode extends HiveScanNode {

/**
* External file scan node for Query Hudi table
* needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv
needCheckColumnPriv: some ExternalFileScanNode implementations do not need
to check column privileges
* eg: s3 tvf
* These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check
These scan nodes do not have corresponding catalog/database/table info,
so no privilege check is needed
*/
public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv,
Optional<TableScanParams> scanParams, Optional<IncrementalRelation> incrementalRelation) {
Expand Down Expand Up @@ -304,7 +306,8 @@ private List<HivePartition> getPrunedPartitions(
}
}
}
// unpartitioned table, create a dummy partition to save location and inputformat,
// unpartitioned table, create a dummy partition to save location and
// inputformat,
// so that we can unify the interface.
HivePartition dummyPartition = new HivePartition(hmsTable.getDbName(), hmsTable.getName(), true,
hmsTable.getRemoteTable().getSd().getInputFormat(),
Expand Down
3 changes: 2 additions & 1 deletion gensrc/thrift/PlanNodes.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,6 @@ struct TMaxComputeFileDesc {
1: optional string partition_spec // deprecated
2: optional string session_id
3: optional string table_batch_read_session

}

struct THudiFileDesc {
Expand Down Expand Up @@ -448,6 +447,8 @@ struct TFileScanRangeParams {
22: optional TTextSerdeType text_serde_type
// used by flexible partial update
23: optional string sequence_map_col
// If set to true, the BE will be forced to use the JNI reader
24: bool force_jni_reader;
}

struct TFileRangeDesc {
Expand Down

0 comments on commit b26c594

Please sign in to comment.