Skip to content

Commit

Permalink
[enhancement](multi-catalog)Add enable_parquet_merge_small_io and enabl…
Browse files Browse the repository at this point in the history
…e_orc_merge_small_io Session variables
  • Loading branch information
hubgeter committed Jan 13, 2025
1 parent 744691a commit c0ea299
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 7 deletions.
9 changes: 8 additions & 1 deletion be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state,
_enable_lazy_mat(enable_lazy_mat),
_enable_filter_by_min_max(
state == nullptr ? true : state->query_options().enable_orc_filter_by_min_max),
_enable_merge_small_io(
state == nullptr ? true : state->query_options().enable_orc_merge_small_io),
_dict_cols_has_converted(false) {
TimezoneUtils::find_cctz_time_zone(ctz, _time_zone);
VecDateTimeValue t;
Expand Down Expand Up @@ -252,7 +254,8 @@ Status OrcReader::_create_file_reader() {
_profile, _system_properties, _file_description, reader_options,
io::DelegateReader::AccessMode::RANDOM, _io_ctx));
_file_input_stream = std::make_unique<ORCFileInputStream>(
_scan_range.path, std::move(inner_reader), &_statistics, _io_ctx, _profile);
_scan_range.path, std::move(inner_reader), &_statistics, _io_ctx, _profile,
_enable_merge_small_io);
}
if (_file_input_stream->getLength() == 0) {
return Status::EndOfFile("empty orc file: " + _scan_range.path);
Expand Down Expand Up @@ -2664,6 +2667,10 @@ void ORCFileInputStream::beforeReadStripe(
if (_is_all_tiny_stripes) {
return;
}
if (!_enable_merge_small_io) {
_file_reader = _inner_reader;
return;
}
if (_file_reader != nullptr) {
_file_reader->collect_profile_before_close();
}
Expand Down
7 changes: 5 additions & 2 deletions be/src/vec/exec/format/orc/vorc_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,7 @@ class OrcReader : public GenericReader {
io::IOContext* _io_ctx = nullptr;
bool _enable_lazy_mat = true;
bool _enable_filter_by_min_max = true;
bool _enable_merge_small_io = true;

std::vector<DecimalScaleParams> _decimal_scale_params;
size_t _decimal_scale_params_index;
Expand Down Expand Up @@ -656,13 +657,14 @@ class ORCFileInputStream : public orc::InputStream, public ProfileCollector {
public:
ORCFileInputStream(const std::string& file_name, io::FileReaderSPtr inner_reader,
OrcReader::Statistics* statistics, const io::IOContext* io_ctx,
RuntimeProfile* profile)
RuntimeProfile* profile, bool enable_merge_small_io)
: _file_name(file_name),
_inner_reader(inner_reader),
_file_reader(inner_reader),
_statistics(statistics),
_io_ctx(io_ctx),
_profile(profile) {}
_profile(profile),
_enable_merge_small_io(enable_merge_small_io) {}

~ORCFileInputStream() override {
if (_file_reader != nullptr) {
Expand Down Expand Up @@ -700,5 +702,6 @@ class ORCFileInputStream : public orc::InputStream, public ProfileCollector {
OrcReader::Statistics* _statistics = nullptr;
const io::IOContext* _io_ctx = nullptr;
RuntimeProfile* _profile = nullptr;
bool _enable_merge_small_io = true;
};
} // namespace doris::vectorized
13 changes: 9 additions & 4 deletions be/src/vec/exec/format/parquet/vparquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ ParquetReader::ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams
_state(state),
_meta_cache(meta_cache),
_enable_lazy_mat(enable_lazy_mat),
_enable_merge_small_io(
state == nullptr ? true : state->query_options().enable_parquet_merge_small_io),
_enable_filter_by_min_max(
state == nullptr ? true
: state->query_options().enable_parquet_filter_by_min_max) {
Expand All @@ -105,6 +107,8 @@ ParquetReader::ParquetReader(const TFileScanRangeParams& params, const TFileRang
_io_ctx(io_ctx),
_state(state),
_enable_lazy_mat(enable_lazy_mat),
_enable_merge_small_io(
state == nullptr ? true : state->query_options().enable_parquet_merge_small_io),
_enable_filter_by_min_max(
state == nullptr ? true
: state->query_options().enable_parquet_filter_by_min_max) {
Expand Down Expand Up @@ -650,10 +654,11 @@ Status ParquetReader::_next_row_group_reader() {
_generate_random_access_ranges(row_group_index, &avg_io_size);
// The underlying page reader will prefetch data in column.
// Using both MergeRangeFileReader and BufferedStreamReader simultaneously would waste a lot of memory.
group_file_reader = avg_io_size < io::MergeRangeFileReader::SMALL_IO
? std::make_shared<io::MergeRangeFileReader>(
_profile, _file_reader, io_ranges)
: _file_reader;
group_file_reader =
avg_io_size < io::MergeRangeFileReader::SMALL_IO && _enable_merge_small_io
? std::make_shared<io::MergeRangeFileReader>(_profile, _file_reader,
io_ranges)
: _file_reader;
}
_current_group_reader.reset(new RowGroupReader(
group_file_reader, _read_columns, row_group_index.row_group_id, row_group, _ctz,
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/format/parquet/vparquet_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ class ParquetReader : public GenericReader {
// Maybe null if not used
FileMetaCache* _meta_cache = nullptr;
bool _enable_lazy_mat = true;
bool _enable_merge_small_io = true;
bool _enable_filter_by_min_max = true;
const TupleDescriptor* _tuple_descriptor = nullptr;
const RowDescriptor* _row_descriptor = nullptr;
Expand Down
39 changes: 39 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,10 @@ public class SessionVariable implements Serializable, Writable {

public static final String ENABLE_TEXT_VALIDATE_UTF8 = "enable_text_validate_utf8";

public static final String ENABLE_PARQUET_MERGE_SMALL_IO = "enable_parquet_merge_small_io";

public static final String ENABLE_ORC_MERGE_SMALL_IO = "enable_orc_merge_small_io";

/**
* If set false, user couldn't submit analyze SQL and FE won't allocate any related resources.
*/
Expand Down Expand Up @@ -1835,6 +1839,22 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) {
needForward = true)
public boolean enableOrcFilterByMinMax = true;

@VariableMgr.VarAttr(
name = ENABLE_PARQUET_MERGE_SMALL_IO,
description = {"控制 parquet reader 是否启用小 IO 合并。默认为 true。",
"Controls whether to merge small range io in parquet reader. "
+ "The default value is true."},
needForward = true)
public boolean enableParquetMergeSmallIO = true;

@VariableMgr.VarAttr(
name = ENABLE_ORC_MERGE_SMALL_IO,
description = {"控制 orc reader 是否启用小 IO 合并。默认为 true。",
"Controls whether to merge small range io in orc reader. "
+ "The default value is true."},
needForward = true)
public boolean enableOrcMergeSmallIO = true;

@VariableMgr.VarAttr(
name = EXTERNAL_TABLE_ANALYZE_PART_NUM,
description = {"收集外表统计信息行数时选取的采样分区数,默认-1表示全部分区",
Expand Down Expand Up @@ -2891,6 +2911,22 @@ public void setQueryTimeoutS(int queryTimeoutS) {
this.queryTimeoutS = queryTimeoutS;
}

/**
 * @return whether the parquet reader merges small range IOs (session variable
 *         {@code enable_parquet_merge_small_io}).
 */
public boolean isEnableParquetMergeSmallIO() {
    return this.enableParquetMergeSmallIO;
}

/**
 * Enables or disables small-IO merging in the parquet reader (session variable
 * {@code enable_parquet_merge_small_io}).
 *
 * @param enableParquetMergeSmallIO new value for the flag
 */
public void setEnableParquetMergeSmallIO(boolean enableParquetMergeSmallIO) {
    this.enableParquetMergeSmallIO = enableParquetMergeSmallIO;
}

/**
 * @return whether the orc reader merges small range IOs (session variable
 *         {@code enable_orc_merge_small_io}).
 */
public boolean isEnableOrcMergeSmallIO() {
    return this.enableOrcMergeSmallIO;
}

/**
 * Enables or disables small-IO merging in the orc reader (session variable
 * {@code enable_orc_merge_small_io}).
 *
 * @param enableOrcMergeSmallIO new value for the flag
 */
public void setEnableOrcMergeSmallIO(boolean enableOrcMergeSmallIO) {
    this.enableOrcMergeSmallIO = enableOrcMergeSmallIO;
}

// This method will be called by VariableMgr.replayGlobalVariableV2
// We dont want any potential exception is thrown during replay oplog
// so we do not check its validation. Here potential excaption
Expand Down Expand Up @@ -4055,6 +4091,9 @@ public TQueryOptions toThrift() {
tResult.setIgnoreRuntimeFilterError(ignoreRuntimeFilterError);
tResult.setEnableFixedLenToUint32V2(enableFixedLenToUint32V2);

tResult.setEnableParquetMergeSmallIo(enableParquetMergeSmallIO);
tResult.setEnableOrcMergeSmallIo(enableOrcMergeSmallIO);

return tResult;
}

Expand Down
3 changes: 3 additions & 0 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,9 @@ struct TQueryOptions {
145: optional bool enable_inverted_index_query_cache = true;
146: optional bool fuzzy_disable_runtime_filter_in_be = false;

147: optional bool enable_parquet_merge_small_io = true;
148: optional bool enable_orc_merge_small_io = true;

// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
// In read path, read from file cache or remote storage when execute query.
Expand Down

0 comments on commit c0ea299

Please sign in to comment.