Skip to content

Commit

Permalink
[opt](inverted index) add performance profiling for remote io access …
Browse files Browse the repository at this point in the history
…in inverted index (#43542)

Problem Summary:

1. Implement the accounting of remote I/O operations for the inverted
index in profiling statistics.

### Release note

Add performance profiling for remote IO access in inverted index
  • Loading branch information
zzzxl1993 authored Nov 17, 2024
1 parent 3ed9984 commit 74238ae
Show file tree
Hide file tree
Showing 28 changed files with 373 additions and 125 deletions.
2 changes: 1 addition & 1 deletion be/src/index-tools/index_tool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ void search(lucene::store::Directory* dir, std::string& field, std::string& toke
std::vector<std::string> terms = split(token, '|');

doris::TQueryOptions queryOptions;
ConjunctionQuery conjunct_query(s, queryOptions);
ConjunctionQuery conjunct_query(s, queryOptions, nullptr);
conjunct_query.add(field_ws, terms);
conjunct_query.search(result);

Expand Down
9 changes: 3 additions & 6 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -613,11 +613,9 @@ Status Compaction::do_inverted_index_compaction() {
fs, std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)},
_cur_tablet_schema->get_inverted_index_storage_format(),
rowset->rowset_meta()->inverted_index_file_info(seg_id));
bool open_idx_file_cache = false;
RETURN_NOT_OK_STATUS_WITH_WARN(
inverted_index_file_reader->init(config::inverted_index_read_buffer_size,
open_idx_file_cache),
"inverted_index_file_reader init failed");
inverted_index_file_reader->init(config::inverted_index_read_buffer_size),
"inverted_index_file_reader init faiqled");
inverted_index_file_readers[m.second] = std::move(inverted_index_file_reader);
}

Expand Down Expand Up @@ -785,9 +783,8 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) {
InvertedIndexDescriptor::get_index_file_path_prefix(*seg_path)},
_cur_tablet_schema->get_inverted_index_storage_format(),
rowset->rowset_meta()->inverted_index_file_info(i));
bool open_idx_file_cache = false;
auto st = inverted_index_file_reader->init(
config::inverted_index_read_buffer_size, open_idx_file_cache);
config::inverted_index_read_buffer_size);
index_file_path = inverted_index_file_reader->get_index_file_path(index_meta);
DBUG_EXECUTE_IF(
"Compaction::construct_skip_inverted_index_index_file_reader_init_"
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ Status ColumnReader::new_inverted_index_iterator(
{
std::shared_lock<std::shared_mutex> rlock(_load_index_lock);
if (_inverted_index) {
RETURN_IF_ERROR(_inverted_index->new_iterator(read_options.stats,
RETURN_IF_ERROR(_inverted_index->new_iterator(read_options.io_ctx, read_options.stats,
read_options.runtime_state, iterator));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
namespace doris::segment_v2 {

ConjunctionQuery::ConjunctionQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options)
const TQueryOptions& query_options, const io::IOContext* io_ctx)
: _searcher(searcher),
_io_ctx(io_ctx),
_index_version(_searcher->getReader()->getIndexVersion()),
_conjunction_ratio(query_options.inverted_index_conjunction_opt_threshold) {}

Expand All @@ -48,7 +49,7 @@ void ConjunctionQuery::add(const std::wstring& field_name, const std::vector<std
std::wstring ws_term = StringUtil::string_to_wstring(term);
Term* t = _CLNEW Term(field_name.c_str(), ws_term.c_str());
_terms.push_back(t);
TermDocs* term_doc = _searcher->getReader()->termDocs(t);
TermDocs* term_doc = _searcher->getReader()->termDocs(t, _io_ctx);
_term_docs.push_back(term_doc);
iterators.emplace_back(term_doc);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace doris::segment_v2 {
class ConjunctionQuery : public Query {
public:
ConjunctionQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options);
const TQueryOptions& query_options, const io::IOContext* io_ctx);
~ConjunctionQuery() override;

void add(const std::wstring& field_name, const std::vector<std::string>& terms) override;
Expand All @@ -41,6 +41,7 @@ class ConjunctionQuery : public Query {

public:
std::shared_ptr<lucene::search::IndexSearcher> _searcher;
const io::IOContext* _io_ctx = nullptr;

IndexVersion _index_version = IndexVersion::kV0;
int32_t _conjunction_ratio = 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
namespace doris::segment_v2 {

DisjunctionQuery::DisjunctionQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options)
: _searcher(searcher) {}
const TQueryOptions& query_options, const io::IOContext* io_ctx)
: _searcher(searcher), _io_ctx(io_ctx) {}

void DisjunctionQuery::add(const std::wstring& field_name, const std::vector<std::string>& terms) {
if (terms.empty()) {
Expand All @@ -36,7 +36,7 @@ void DisjunctionQuery::search(roaring::Roaring& roaring) {
auto func = [this, &roaring](const std::string& term, bool first) {
std::wstring ws_term = StringUtil::string_to_wstring(term);
auto* t = _CLNEW Term(_field_name.c_str(), ws_term.c_str());
auto* term_doc = _searcher->getReader()->termDocs(t);
auto* term_doc = _searcher->getReader()->termDocs(t, _io_ctx);
TermIterator iterator(term_doc);

DocRange doc_range;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ namespace doris::segment_v2 {
class DisjunctionQuery : public Query {
public:
DisjunctionQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options);
const TQueryOptions& query_options, const io::IOContext* io_ctx);
~DisjunctionQuery() override = default;

void add(const std::wstring& field_name, const std::vector<std::string>& terms) override;
void search(roaring::Roaring& roaring) override;

private:
std::shared_ptr<lucene::search::IndexSearcher> _searcher;
const io::IOContext* _io_ctx = nullptr;

std::wstring _field_name;
std::vector<std::string> _terms;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
namespace doris::segment_v2 {

PhraseEdgeQuery::PhraseEdgeQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options)
const TQueryOptions& query_options, const io::IOContext* io_ctx)
: _searcher(searcher),
_query(std::make_unique<CL_NS(search)::MultiPhraseQuery>()),
_max_expansions(query_options.inverted_index_max_expansions) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace doris::segment_v2 {
class PhraseEdgeQuery : public Query {
public:
PhraseEdgeQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options);
const TQueryOptions& query_options, const io::IOContext* io_ctx);
~PhraseEdgeQuery() override = default;

void add(const std::wstring& field_name, const std::vector<std::string>& terms) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
namespace doris::segment_v2 {

PhrasePrefixQuery::PhrasePrefixQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options)
const TQueryOptions& query_options,
const io::IOContext* io_ctx)
: _searcher(searcher),
_query(std::make_unique<CL_NS(search)::MultiPhraseQuery>()),
_max_expansions(query_options.inverted_index_max_expansions) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace doris::segment_v2 {
class PhrasePrefixQuery : public Query {
public:
PhrasePrefixQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options);
const TQueryOptions& query_options, const io::IOContext* io_ctx);
~PhrasePrefixQuery() override = default;

void add(const std::wstring& field_name, const std::vector<std::string>& terms) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ bool OrderedSloppyPhraseMatcher::stretch_to_order(PostingsAndPosition* prev_post
}

PhraseQuery::PhraseQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options)
: _searcher(searcher) {}
const TQueryOptions& query_options, const io::IOContext* io_ctx)
: _searcher(searcher), _io_ctx(io_ctx) {}

PhraseQuery::~PhraseQuery() {
for (auto& term_doc : _term_docs) {
Expand Down Expand Up @@ -173,7 +173,7 @@ void PhraseQuery::add(const std::wstring& field_name, const std::vector<std::str
std::wstring ws_term = StringUtil::string_to_wstring(terms[0]);
Term* t = _CLNEW Term(field_name.c_str(), ws_term.c_str());
_terms.push_back(t);
TermDocs* term_doc = _searcher->getReader()->termDocs(t);
TermDocs* term_doc = _searcher->getReader()->termDocs(t, _io_ctx);
_term_docs.push_back(term_doc);
_lead1 = TermIterator(term_doc);
return;
Expand All @@ -185,7 +185,7 @@ void PhraseQuery::add(const std::wstring& field_name, const std::vector<std::str
std::wstring ws_term = StringUtil::string_to_wstring(term);
Term* t = _CLNEW Term(field_name.c_str(), ws_term.c_str());
_terms.push_back(t);
TermPositions* term_pos = _searcher->getReader()->termPositions(t);
TermPositions* term_pos = _searcher->getReader()->termPositions(t, _io_ctx);
_term_docs.push_back(term_pos);
if (is_save_iter) {
iterators.emplace_back(term_pos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ using Matcher = std::variant<ExactPhraseMatcher, OrderedSloppyPhraseMatcher>;
class PhraseQuery : public Query {
public:
PhraseQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options);
const TQueryOptions& query_options, const io::IOContext* io_ctx);
~PhraseQuery() override;

void add(const InvertedIndexQueryInfo& query_info) override;
Expand All @@ -112,6 +112,7 @@ class PhraseQuery : public Query {

private:
std::shared_ptr<lucene::search::IndexSearcher> _searcher;
const io::IOContext* _io_ctx = nullptr;

TermIterator _lead1;
TermIterator _lead2;
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/segment_v2/inverted_index/query/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <memory>

#include "common/status.h"
#include "io/io_common.h"
#include "roaring/roaring.hh"

CL_NS_USE(index)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
namespace doris::segment_v2 {

RegexpQuery::RegexpQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options)
const TQueryOptions& query_options, const io::IOContext* io_ctx)
: _searcher(searcher),
_max_expansions(query_options.inverted_index_max_expansions),
_query(searcher, query_options) {}
_query(searcher, query_options, io_ctx) {}

void RegexpQuery::add(const std::wstring& field_name, const std::vector<std::string>& patterns) {
if (patterns.size() != 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace doris::segment_v2 {
class RegexpQuery : public Query {
public:
RegexpQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options);
const TQueryOptions& query_options, const io::IOContext* io_ctx);
~RegexpQuery() override = default;

void add(const std::wstring& field_name, const std::vector<std::string>& patterns) override;
Expand Down
15 changes: 15 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ class CSIndexInput : public lucene::store::BufferedIndexInput {
CL_NS(store)::IndexInput* base;
int64_t fileOffset;
int64_t _length;
const io::IOContext* _io_ctx = nullptr;
bool _is_index_file = false; // Indicates if the file is a TII file

protected:
void readInternal(uint8_t* /*b*/, const int32_t /*len*/) override;
Expand All @@ -75,6 +77,8 @@ class CSIndexInput : public lucene::store::BufferedIndexInput {
const char* getDirectoryType() const override { return DorisCompoundReader::getClassName(); }
const char* getObjectName() const override { return getClassName(); }
static const char* getClassName() { return "CSIndexInput"; }
void setIoContext(const void* io_ctx) override;
void setIndexFile(bool isIndexFile) override;
};

CSIndexInput::CSIndexInput(CL_NS(store)::IndexInput* base, const int64_t fileOffset,
Expand All @@ -92,9 +96,12 @@ void CSIndexInput::readInternal(uint8_t* b, const int32_t len) {
if (start + len > _length) {
_CLTHROWA(CL_ERR_IO, "read past EOF");
}
base->setIoContext(_io_ctx);
base->setIndexFile(_is_index_file);
base->seek(fileOffset + start);
bool read_from_buffer = true;
base->readBytes(b, len, read_from_buffer);
base->setIoContext(nullptr);
}

CSIndexInput::~CSIndexInput() = default;
Expand All @@ -111,6 +118,14 @@ CSIndexInput::CSIndexInput(const CSIndexInput& clone) : BufferedIndexInput(clone

void CSIndexInput::close() {}

void CSIndexInput::setIoContext(const void* io_ctx) {
_io_ctx = static_cast<const io::IOContext*>(io_ctx);
}

void CSIndexInput::setIndexFile(bool isIndexFile) {
_is_index_file = isIndexFile;
}

DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream, int32_t read_buffer_size)
: _ram_dir(new lucene::store::RAMDirectory()),
_stream(stream),
Expand Down
5 changes: 1 addition & 4 deletions be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@

namespace doris::segment_v2 {

Status InvertedIndexFileReader::init(int32_t read_buffer_size, bool open_idx_file_cache) {
Status InvertedIndexFileReader::init(int32_t read_buffer_size) {
if (!_inited) {
_read_buffer_size = read_buffer_size;
_open_idx_file_cache = open_idx_file_cache;
if (_storage_format == InvertedIndexStorageFormatPB::V2) {
auto st = _init_from_v2(read_buffer_size);
if (!st.ok()) {
Expand Down Expand Up @@ -76,7 +75,6 @@ Status InvertedIndexFileReader::_init_from_v2(int32_t read_buffer_size) {
"CLuceneError occur when open idx file {}, error msg: {}", index_file_full_path,
err.what());
}
index_input->setIdxFileCache(_open_idx_file_cache);
_stream = std::unique_ptr<CL_NS(store)::IndexInput>(index_input);

// 3. read file
Expand Down Expand Up @@ -198,7 +196,6 @@ Result<std::unique_ptr<DorisCompoundReader>> InvertedIndexFileReader::_open(
}

// 3. read file in DorisCompoundReader
index_input->setIdxFileCache(_open_idx_file_cache);
compound_reader = std::make_unique<DorisCompoundReader>(index_input, _read_buffer_size);
} catch (CLuceneError& err) {
return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
Expand Down
4 changes: 1 addition & 3 deletions be/src/olap/rowset/segment_v2/inverted_index_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ class InvertedIndexFileReader {
_storage_format(storage_format),
_idx_file_info(idx_file_info) {}

Status init(int32_t read_buffer_size = config::inverted_index_read_buffer_size,
bool open_idx_file_cache = false);
Status init(int32_t read_buffer_size = config::inverted_index_read_buffer_size);
Result<std::unique_ptr<DorisCompoundReader>> open(const TabletIndex* index_meta) const;
void debug_file_entries();
std::string get_index_file_cache_key(const TabletIndex* index_meta) const;
Expand All @@ -80,7 +79,6 @@ class InvertedIndexFileReader {
const io::FileSystemSPtr _fs;
std::string _index_path_prefix;
int32_t _read_buffer_size = -1;
bool _open_idx_file_cache = false;
InvertedIndexStorageFormatPB _storage_format;
mutable std::shared_mutex _mutex; // Use mutable for const read operations
bool _inited = false;
Expand Down
37 changes: 36 additions & 1 deletion be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,27 @@ void DorisFSDirectory::FSIndexInput::close() {
}*/
}

void DorisFSDirectory::FSIndexInput::setIoContext(const void* io_ctx) {
if (io_ctx) {
const auto& ctx = static_cast<const io::IOContext*>(io_ctx);
_io_ctx.reader_type = ctx->reader_type;
_io_ctx.query_id = ctx->query_id;
_io_ctx.file_cache_stats = ctx->file_cache_stats;
} else {
_io_ctx.reader_type = ReaderType::UNKNOWN;
_io_ctx.query_id = nullptr;
_io_ctx.file_cache_stats = nullptr;
}
}

const void* DorisFSDirectory::FSIndexInput::getIoContext() {
return &_io_ctx;
}

void DorisFSDirectory::FSIndexInput::setIndexFile(bool isIndexFile) {
_io_ctx.is_index_data = isIndexFile;
}

void DorisFSDirectory::FSIndexInput::seekInternal(const int64_t position) {
CND_PRECONDITION(position >= 0 && position < _handle->_length, "Seeking out of range");
_pos = position;
Expand All @@ -239,9 +260,23 @@ void DorisFSDirectory::FSIndexInput::readInternal(uint8_t* b, const int32_t len)
_handle->_fpos = _pos;
}

DBUG_EXECUTE_IF(
"DorisFSDirectory::FSIndexInput::readInternal", ({
static thread_local std::unordered_map<const TUniqueId*, io::FileCacheStatistics*>
thread_file_cache_map;
auto it = thread_file_cache_map.find(_io_ctx.query_id);
if (it != thread_file_cache_map.end()) {
if (_io_ctx.file_cache_stats != it->second) {
_CLTHROWA(CL_ERR_IO, "File cache statistics mismatch");
}
} else {
thread_file_cache_map[_io_ctx.query_id] = _io_ctx.file_cache_stats;
}
}));

Slice result {b, (size_t)len};
size_t bytes_read = 0;
auto st = _handle->_reader->read_at(_pos, result, &bytes_read, &_io_ctx);
Status st = _handle->_reader->read_at(_pos, result, &bytes_read, &_io_ctx);
DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexInput::readInternal_reader_read_at_error", {
st = Status::InternalError(
"debug point: DorisFSDirectory::FSIndexInput::readInternal_reader_read_at_error");
Expand Down
7 changes: 3 additions & 4 deletions be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,6 @@ class DorisFSDirectory::FSIndexInput : public lucene::store::BufferedIndexInput
: BufferedIndexInput(buffer_size) {
this->_pos = 0;
this->_handle = std::move(handle);
this->_io_ctx.reader_type = ReaderType::READER_QUERY;
this->_io_ctx.is_index_data = false;
}

protected:
Expand All @@ -199,8 +197,9 @@ class DorisFSDirectory::FSIndexInput : public lucene::store::BufferedIndexInput
const char* getDirectoryType() const override { return DorisFSDirectory::getClassName(); }
const char* getObjectName() const override { return getClassName(); }
static const char* getClassName() { return "FSIndexInput"; }

void setIdxFileCache(bool index) override { _io_ctx.is_index_data = index; }
void setIoContext(const void* io_ctx) override;
const void* getIoContext() override;
void setIndexFile(bool isIndexFile) override;

std::mutex _this_lock;

Expand Down
Loading

0 comments on commit 74238ae

Please sign in to comment.