Skip to content

Commit

Permalink
[Opt](orc)Optimize the merge io when orc reader read multiple tiny st…
Browse files Browse the repository at this point in the history
…ripes. (apache#42004)

### What problem does this PR solve?

When reading orc files, we may encounter a scenario where the stripe
byte size is very small but the number of stripes is very large.

This pr introduces three session variables
`orc_tiny_stripe_threshold_bytes`, `orc_once_max_read_bytes`, and
`orc_max_merge_distance_bytes` to optimize io reading for the above
scenarios.

If a stripe byte size is less than `orc_tiny_stripe_threshold_bytes`, we
will consider it as a tiny stripe. For multiple tiny stripes, we will
perform IO merge reading according to the `orc_once_max_read_bytes` and
`orc_max_merge_distance_bytes` parameters. Among them,
`orc_once_max_read_bytes` indicates the maximum size of the merged IO.
You should not set `orc_once_max_read_bytes` less than
`orc_tiny_stripe_threshold_bytes`, although we will not force an error.
When using tiny stripe reading optimization, since tiny stripes are not
necessarily continuous, when the distance between two tiny stripes is
greater than `orc_max_merge_distance_bytes`, we will not merge them into
one IO.

If you don't want to use this optimization, you can `set
orc_tiny_stripe_threshold_bytes = 0`.


Default parameters:
```mysql
orc_tiny_stripe_threshold_bytes = 8388608 (8M)
orc_once_max_read_bytes = 8388608 (8M)
orc_max_merge_distance_bytes = 1048576 (1M)
```

We also add relevant profiles for this purpose so that parameters can be
adjusted to optimize reading.
`RangeCacheFileReader`:
1. `CacheRefreshCount`: how many IOs are merged
2. `ReadToCacheBytes`: how much data is actually read after merging
3. `ReadToCacheTime`: how long it takes to read data after merging
4. `RequestBytes`: how many bytes does the apache-orc library actually
need to read the orc file
5. `RequestIO`: how many times the apache-orc library calls this read
interface
6. `RequestTime`: how long it takes the apache-orc library to call this
read interface

It should be noted that `RangeCacheFileReader` is a wrapper of the
reader that actually reads data, such as the hdfs reader, so strictly
speaking, `CacheRefreshCount` is not equal to how many IOs are initiated
to hdfs, because each time the hdfs reader is requested, the hdfs reader
may not be able to read all the data at once.

This pr also involves changes to the apache-orc third-party library:
apache/doris-thirdparty#244.
Reference implementation:
https://github.com/trinodb/trino/blob/master/lib/trino-orc/src/main/java/io/trino/orc/OrcDataSourceUtils.java#L36

#### Summary:
```mysql
set orc_tiny_stripe_threshold_bytes = xxx;
set orc_once_max_read_bytes = xxx;
set orc_max_merge_distance_bytes = xxx;

# xxx is the size in bytes
```

### Release note
Introduces three session variables `orc_tiny_stripe_threshold_bytes`,
`orc_once_max_read_bytes`, and `orc_max_merge_distance_bytes` to
optimize io reading of scenarios where the orc stripe byte size is very
small but the number of stripes is very large.


Co-authored-by: kaka11chen <[email protected]>
Co-authored-by: daidai <[email protected]>
  • Loading branch information
3 people committed Nov 19, 2024
1 parent ab7f5ba commit a098012
Show file tree
Hide file tree
Showing 15 changed files with 3,038 additions and 16 deletions.
2 changes: 1 addition & 1 deletion be/src/apache-orc
102 changes: 102 additions & 0 deletions be/src/io/fs/buffered_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -871,5 +871,107 @@ Status DelegateReader::create_file_reader(RuntimeProfile* profile,
}
return Status();
}

Status LinearProbeRangeFinder::get_range_for(int64_t desired_offset,
io::PrefetchRange& result_range) {
while (index < _ranges.size()) {
io::PrefetchRange& range = _ranges[index];
if (range.end_offset > desired_offset) {
if (range.start_offset > desired_offset) [[unlikely]] {
return Status::InvalidArgument("Invalid desiredOffset");
}
result_range = range;
return Status::OK();
}
++index;
}
return Status::InvalidArgument("Invalid desiredOffset");
}

RangeCacheFileReader::RangeCacheFileReader(RuntimeProfile* profile, io::FileReaderSPtr inner_reader,
std::shared_ptr<RangeFinder> range_finder)
: _profile(profile),
_inner_reader(std::move(inner_reader)),
_range_finder(std::move(range_finder)) {
_size = _inner_reader->size();
uint64_t max_cache_size =
std::max((uint64_t)4096, (uint64_t)_range_finder->get_max_range_size());
_cache = OwnedSlice(max_cache_size);

if (_profile != nullptr) {
const char* random_profile = "RangeCacheFileReader";
ADD_TIMER_WITH_LEVEL(_profile, random_profile, 1);
_request_io =
ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "RequestIO", TUnit::UNIT, random_profile, 1);
_request_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "RequestBytes", TUnit::BYTES,
random_profile, 1);
_request_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "RequestTime", random_profile, 1);
_read_to_cache_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ReadToCacheTime", random_profile, 1);
_cache_refresh_count = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "CacheRefreshCount",
TUnit::UNIT, random_profile, 1);
_read_to_cache_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "ReadToCacheBytes",
TUnit::BYTES, random_profile, 1);
}
}

Status RangeCacheFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) {
auto request_size = result.size;

_cache_statistics.request_io++;
_cache_statistics.request_bytes += request_size;
SCOPED_RAW_TIMER(&_cache_statistics.request_time);

PrefetchRange range;
if (_range_finder->get_range_for(offset, range)) [[likely]] {
if (_current_start_offset != range.start_offset) { // need read new range to cache.
auto range_size = range.end_offset - range.start_offset;

_cache_statistics.cache_refresh_count++;
_cache_statistics.read_to_cache_bytes += range_size;
SCOPED_RAW_TIMER(&_cache_statistics.read_to_cache_time);

Slice cache_slice = {_cache.data(), range_size};
RETURN_IF_ERROR(
_inner_reader->read_at(range.start_offset, cache_slice, bytes_read, io_ctx));

if (*bytes_read != range_size) [[unlikely]] {
return Status::InternalError(
"RangeCacheFileReader use inner reader read bytes {} not eq expect size {}",
*bytes_read, range_size);
}

_current_start_offset = range.start_offset;
}

int64_t buffer_offset = offset - _current_start_offset;
memcpy(result.data, _cache.data() + buffer_offset, request_size);
*bytes_read = request_size;

return Status::OK();
} else {
return Status::InternalError("RangeCacheFileReader read not in Ranges. Offset = {}",
offset);
// RETURN_IF_ERROR(_inner_reader->read_at(offset, result , bytes_read, io_ctx));
// return Status::OK();
// think return error is ok,otherwise it will cover up the error.
}
}

void RangeCacheFileReader::_collect_profile_before_close() {
if (_profile != nullptr) {
COUNTER_UPDATE(_request_io, _cache_statistics.request_io);
COUNTER_UPDATE(_request_bytes, _cache_statistics.request_bytes);
COUNTER_UPDATE(_request_time, _cache_statistics.request_time);
COUNTER_UPDATE(_read_to_cache_time, _cache_statistics.read_to_cache_time);
COUNTER_UPDATE(_cache_refresh_count, _cache_statistics.cache_refresh_count);
COUNTER_UPDATE(_read_to_cache_bytes, _cache_statistics.read_to_cache_bytes);
if (_inner_reader != nullptr) {
_inner_reader->collect_profile_before_close();
}
}
}

} // namespace io
} // namespace doris
141 changes: 141 additions & 0 deletions be/src/io/fs/buffered_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,147 @@ struct PrefetchRange {
: start_offset(start_offset), end_offset(end_offset) {}

PrefetchRange() : start_offset(0), end_offset(0) {}

bool operator==(const PrefetchRange& other) const {
return (start_offset == other.start_offset) && (end_offset == other.end_offset);
}

bool operator!=(const PrefetchRange& other) const { return !(*this == other); }

PrefetchRange span(const PrefetchRange& other) const {
return {std::min(start_offset, other.end_offset), std::max(start_offset, other.end_offset)};
}
PrefetchRange seq_span(const PrefetchRange& other) const {
return {start_offset, other.end_offset};
}

//Ranges needs to be sorted.
static std::vector<PrefetchRange> merge_adjacent_seq_ranges(
const std::vector<PrefetchRange>& seq_ranges, int64_t max_merge_distance_bytes,
int64_t once_max_read_bytes) {
if (seq_ranges.empty()) {
return {};
}
// Merge overlapping ranges
std::vector<PrefetchRange> result;
PrefetchRange last = seq_ranges.front();
for (size_t i = 1; i < seq_ranges.size(); ++i) {
PrefetchRange current = seq_ranges[i];
PrefetchRange merged = last.seq_span(current);
if (merged.end_offset <= once_max_read_bytes + merged.start_offset &&
last.end_offset + max_merge_distance_bytes >= current.start_offset) {
last = merged;
} else {
result.push_back(last);
last = current;
}
}
result.push_back(last);
return result;
}
};

class RangeFinder {
public:
virtual ~RangeFinder() = default;
virtual Status get_range_for(int64_t desired_offset, io::PrefetchRange& result_range) = 0;
virtual size_t get_max_range_size() const = 0;
};

class LinearProbeRangeFinder : public RangeFinder {
public:
LinearProbeRangeFinder(std::vector<io::PrefetchRange>&& ranges) : _ranges(std::move(ranges)) {}

Status get_range_for(int64_t desired_offset, io::PrefetchRange& result_range) override;

size_t get_max_range_size() const override {
size_t max_range_size = 0;
for (const auto& range : _ranges) {
max_range_size = std::max(max_range_size, range.end_offset - range.start_offset);
}
return max_range_size;
}

~LinearProbeRangeFinder() override = default;

private:
std::vector<io::PrefetchRange> _ranges;
size_t index {0};
};

/**
* The reader provides a solution to read one range at a time. You can customize RangeFinder to meet your scenario.
* For me, since there will be tiny stripes when reading orc files, in order to reduce the requests to hdfs,
* I first merge the access to the orc files to be read (of course there is a problem of read amplification,
* but in my scenario, compared with reading hdfs multiple times, it is faster to read more data on hdfs at one time),
* and then because the actual reading of orc files is in order from front to back, I provide LinearProbeRangeFinder.
*/
class RangeCacheFileReader : public io::FileReader {
struct RangeCacheReaderStatistics {
int64_t request_io = 0;
int64_t request_bytes = 0;
int64_t request_time = 0;
int64_t read_to_cache_time = 0;
int64_t cache_refresh_count = 0;
int64_t read_to_cache_bytes = 0;
};

public:
RangeCacheFileReader(RuntimeProfile* profile, io::FileReaderSPtr inner_reader,
std::shared_ptr<RangeFinder> range_finder);

~RangeCacheFileReader() override = default;

Status close() override {
if (!_closed) {
_closed = true;
}
return Status::OK();
}

const io::Path& path() const override { return _inner_reader->path(); }

size_t size() const override { return _size; }

bool closed() const override { return _closed; }

protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;

void _collect_profile_before_close() override;

private:
RuntimeProfile* _profile = nullptr;
io::FileReaderSPtr _inner_reader;
std::shared_ptr<RangeFinder> _range_finder;

OwnedSlice _cache;
int64_t _current_start_offset = -1;

size_t _size;
bool _closed = false;

RuntimeProfile::Counter* _request_io = nullptr;
RuntimeProfile::Counter* _request_bytes = nullptr;
RuntimeProfile::Counter* _request_time = nullptr;
RuntimeProfile::Counter* _read_to_cache_time = nullptr;
RuntimeProfile::Counter* _cache_refresh_count = nullptr;
RuntimeProfile::Counter* _read_to_cache_bytes = nullptr;
RangeCacheReaderStatistics _cache_statistics;
/**
* `RangeCacheFileReader`:
* 1. `CacheRefreshCount`: how many IOs are merged
* 2. `ReadToCacheBytes`: how much data is actually read after merging
* 3. `ReadToCacheTime`: how long it takes to read data after merging
* 4. `RequestBytes`: how many bytes does the apache-orc library actually need to read the orc file
* 5. `RequestIO`: how many times the apache-orc library calls this read interface
* 6. `RequestTime`: how long it takes the apache-orc library to call this read interface
*
* It should be noted that `RangeCacheFileReader` is a wrapper of the reader that actually reads data,such as
* the hdfs reader, so strictly speaking, `CacheRefreshCount` is not equal to how many IOs are initiated to hdfs,
* because each time the hdfs reader is requested, the hdfs reader may not be able to read all the data at once.
*/
};

/**
Expand Down
80 changes: 67 additions & 13 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -857,28 +857,79 @@ Status OrcReader::set_fill_columns(
if (_colname_to_value_range == nullptr || !_init_search_argument(_colname_to_value_range)) {
_lazy_read_ctx.can_lazy_read = false;
}
try {
_row_reader_options.range(_range_start_offset, _range_size);
_row_reader_options.setTimezoneName(_ctz == "CST" ? "Asia/Shanghai" : _ctz);
_row_reader_options.include(_read_cols);
_row_reader_options.setEnableLazyDecoding(true);

if (!_lazy_read_ctx.can_lazy_read) {
for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
_lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
uint64_t number_of_stripes = _reader->getNumberOfStripes();
auto all_stripes_needed = _reader->getNeedReadStripes(_row_reader_options);

int64_t range_end_offset = _range_start_offset + _range_size;

// If you set "orc_tiny_stripe_threshold_bytes" = 0, the use tiny stripes merge io optimization will not be used.
int64_t orc_tiny_stripe_threshold_bytes = 8L * 1024L * 1024L;
int64_t orc_once_max_read_bytes = 8L * 1024L * 1024L;
int64_t orc_max_merge_distance_bytes = 1L * 1024L * 1024L;

if (_state != nullptr) {
orc_tiny_stripe_threshold_bytes =
_state->query_options().orc_tiny_stripe_threshold_bytes;
orc_once_max_read_bytes = _state->query_options().orc_once_max_read_bytes;
orc_max_merge_distance_bytes = _state->query_options().orc_max_merge_distance_bytes;
}
for (auto& kv : _lazy_read_ctx.predicate_missing_columns) {
_lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);

bool all_tiny_stripes = true;
std::vector<io::PrefetchRange> tiny_stripe_ranges;

for (uint64_t i = 0; i < number_of_stripes; i++) {
std::unique_ptr<orc::StripeInformation> strip_info = _reader->getStripe(i);
uint64_t strip_start_offset = strip_info->getOffset();
uint64_t strip_end_offset = strip_start_offset + strip_info->getLength();

if (strip_start_offset >= range_end_offset || strip_end_offset < _range_start_offset ||
!all_stripes_needed[i]) {
continue;
}
if (strip_info->getLength() > orc_tiny_stripe_threshold_bytes) {
all_tiny_stripes = false;
break;
}

tiny_stripe_ranges.emplace_back(strip_start_offset, strip_end_offset);
}
}
if (all_tiny_stripes && number_of_stripes > 0) {
std::vector<io::PrefetchRange> prefetch_merge_ranges =
io::PrefetchRange::merge_adjacent_seq_ranges(tiny_stripe_ranges,
orc_max_merge_distance_bytes,
orc_once_max_read_bytes);
auto range_finder =
std::make_shared<io::LinearProbeRangeFinder>(std::move(prefetch_merge_ranges));

_fill_all_columns = true;
auto* orc_input_stream_ptr = static_cast<ORCFileInputStream*>(_reader->getStream());
orc_input_stream_ptr->set_all_tiny_stripes();
auto& orc_file_reader = orc_input_stream_ptr->get_file_reader();
auto orc_inner_reader = orc_input_stream_ptr->get_inner_reader();
orc_file_reader = std::make_shared<io::RangeCacheFileReader>(_profile, orc_inner_reader,
range_finder);
}

// create orc row reader
try {
_row_reader_options.range(_range_start_offset, _range_size);
_row_reader_options.setTimezoneName(_ctz == "CST" ? "Asia/Shanghai" : _ctz);
_row_reader_options.include(_read_cols);
if (!_lazy_read_ctx.can_lazy_read) {
for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
_lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
}
for (auto& kv : _lazy_read_ctx.predicate_missing_columns) {
_lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
}
}

_fill_all_columns = true;
// create orc row reader
if (_lazy_read_ctx.can_lazy_read) {
_row_reader_options.filter(_lazy_read_ctx.predicate_orc_columns);
_orc_filter = std::unique_ptr<ORCFilterImpl>(new ORCFilterImpl(this));
}
_row_reader_options.setEnableLazyDecoding(true);
if (!_lazy_read_ctx.conjuncts.empty()) {
_string_dict_filter = std::make_unique<StringDictFilterImpl>(this);
}
Expand Down Expand Up @@ -2416,6 +2467,9 @@ MutableColumnPtr OrcReader::_convert_dict_column_to_string_column(
void ORCFileInputStream::beforeReadStripe(
std::unique_ptr<orc::StripeInformation> current_strip_information,
std::vector<bool> selected_columns) {
if (_is_all_tiny_stripes) {
return;
}
if (_file_reader != nullptr) {
_file_reader->collect_profile_before_close();
}
Expand Down
Loading

0 comments on commit a098012

Please sign in to comment.