Skip to content

Commit

Permalink
Merge branch 'branch-2.1' into sync_unique_case
Browse files Browse the repository at this point in the history
  • Loading branch information
dataroaring authored Jul 16, 2024
2 parents 0d0a532 + 253f929 commit 34eee13
Show file tree
Hide file tree
Showing 28 changed files with 1,616 additions and 603 deletions.
5 changes: 1 addition & 4 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -602,10 +602,7 @@ inline std::string Status::to_string() const {
}

// Renders this status as "[<code>]<message>": the result of code_as_string()
// wrapped in square brackets, immediately followed by msg(). Nothing else is
// appended to the returned string.
//
// The text is produced by a single fmt::format call; the earlier
// std::stringstream-based construction yielded the same characters and left
// this return statement unreachable, so only one form is kept.
inline std::string Status::to_string_no_stack() const {
    return fmt::format("[{}]{}", code_as_string(), msg());
}

// some generally useful macros
Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/bitmap_filter_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include "exprs/runtime_filter.h"
#include "olap/column_predicate.h"
#include "olap/wrapper_field.h"
#include "util/bitmap_value.h"
#include "vec/columns/column_dictionary.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_vector.h"
Expand Down Expand Up @@ -68,7 +67,6 @@ class BitmapFilterColumnPredicate : public ColumnPredicate {
CppType min_value = statistic.first->is_null() /* contains null values */
? 0
: get_zone_map_value<T, CppType>(statistic.first->cell_ptr());
;
return _specific_filter->contains_any(min_value, max_value);
}

Expand Down
44 changes: 39 additions & 5 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -560,8 +560,12 @@ Status Compaction::do_compaction_impl(int64_t permits) {
// format: rowsetId_segmentId
std::vector<std::unique_ptr<InvertedIndexFileWriter>> inverted_index_file_writers(
dest_segment_num);
for (int i = 0; i < dest_segment_num; ++i) {
auto prefix = dest_rowset_id.to_string() + "_" + std::to_string(i) + ".dat";

// Some columns have already been indexed
// key: seg_id, value: inverted index file size
std::unordered_map<int, int64_t> compacted_idx_file_size;
for (int seg_id = 0; seg_id < dest_segment_num; ++seg_id) {
auto prefix = dest_rowset_id.to_string() + "_" + std::to_string(seg_id) + ".dat";
auto inverted_index_file_reader = std::make_unique<InvertedIndexFileReader>(
fs, tablet_path, prefix,
_cur_tablet_schema->get_inverted_index_storage_format());
Expand All @@ -571,19 +575,34 @@ Status Compaction::do_compaction_impl(int64_t permits) {
if (st.ok()) {
auto index_not_need_to_compact =
DORIS_TRY(inverted_index_file_reader->get_all_directories());
// V1: each index is a separate file
// V2: all indexes are in a single file
if (_cur_tablet_schema->get_inverted_index_storage_format() !=
doris::InvertedIndexStorageFormatPB::V1) {
int64_t fsize = 0;
st = fs->file_size(InvertedIndexDescriptor::get_index_file_name(prefix),
&fsize);
if (!st.ok()) {
LOG(ERROR) << "file size error in index compaction, error:" << st.msg();
return st;
}
compacted_idx_file_size[seg_id] = fsize;
}
auto inverted_index_file_writer = std::make_unique<InvertedIndexFileWriter>(
fs, tablet_path, prefix,
_cur_tablet_schema->get_inverted_index_storage_format());
RETURN_NOT_OK_STATUS_WITH_WARN(
inverted_index_file_writer->initialize(index_not_need_to_compact),
"failed to initialize inverted_index_file_writer for " +
inverted_index_file_writer->get_index_file_name());
inverted_index_file_writers[i] = std::move(inverted_index_file_writer);
inverted_index_file_writers[seg_id] = std::move(inverted_index_file_writer);
} else if (st.is<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>()) {
auto inverted_index_file_writer = std::make_unique<InvertedIndexFileWriter>(
fs, tablet_path, prefix,
_cur_tablet_schema->get_inverted_index_storage_format());
inverted_index_file_writers[i] = std::move(inverted_index_file_writer);
inverted_index_file_writers[seg_id] = std::move(inverted_index_file_writer);
// no index file
compacted_idx_file_size[seg_id] = 0;
} else {
LOG(ERROR) << "init inverted index "
<< InvertedIndexDescriptor::get_index_file_name(prefix)
Expand Down Expand Up @@ -669,22 +688,37 @@ Status Compaction::do_compaction_impl(int64_t permits) {
status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(e.what());
}
}
for (auto& inverted_index_file_writer : inverted_index_file_writers) {
uint64_t inverted_index_file_size = 0;
for (int seg_id = 0; seg_id < dest_segment_num; ++seg_id) {
auto inverted_index_file_writer = inverted_index_file_writers[seg_id].get();
if (Status st = inverted_index_file_writer->close(); !st.ok()) {
status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(st.msg());
} else {
inverted_index_file_size += inverted_index_file_writer->get_index_file_size();
inverted_index_file_size -= compacted_idx_file_size[seg_id];
}
}
// check index compaction status. If status is not ok, we should return error and end this compaction round.
if (!status.ok()) {
return status;
}

// index compaction should update total disk size and index disk size
_output_rowset->rowset_meta()->set_data_disk_size(_output_rowset->data_disk_size() +
inverted_index_file_size);
_output_rowset->rowset_meta()->set_total_disk_size(_output_rowset->data_disk_size() +
inverted_index_file_size);
_output_rowset->rowset_meta()->set_index_disk_size(_output_rowset->index_disk_size() +
inverted_index_file_size);

COUNTER_UPDATE(_output_rowset_data_size_counter, _output_rowset->data_disk_size());
LOG(INFO) << "succeed to do index compaction"
<< ". tablet=" << _tablet->tablet_id()
<< ", input row number=" << _input_row_num
<< ", output row number=" << _output_rowset->num_rows()
<< ", input_rowset_size=" << _input_rowsets_size
<< ", output_rowset_size=" << _output_rowset->data_disk_size()
<< ", inverted index file size=" << inverted_index_file_size
<< ". elapsed time=" << inverted_watch.get_elapse_second() << "s.";
} else {
LOG(INFO) << "skip doing index compaction due to no output segments"
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/vertical_beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,7 @@ Status VerticalBetaRowsetWriter::_flush_columns(
_segment_num_rows.resize(_cur_writer_idx + 1);
_segment_num_rows[_cur_writer_idx] = _segment_writers[_cur_writer_idx]->row_count();
}
_total_index_size +=
static_cast<int64_t>(index_size) + (*segment_writer)->get_inverted_index_file_size();
_total_index_size += static_cast<int64_t>(index_size);
return Status::OK();
}

Expand Down Expand Up @@ -216,6 +215,7 @@ Status VerticalBetaRowsetWriter::final_flush() {
return st;
}
_total_data_size += segment_size + segment_writer->get_inverted_index_file_size();
_total_index_size += segment_writer->get_inverted_index_file_size();
segment_writer.reset();
}
return Status::OK();
Expand Down
Loading

0 comments on commit 34eee13

Please sign in to comment.