Skip to content

Commit

Permalink
upadte
Browse files Browse the repository at this point in the history
  • Loading branch information
bobhan1 committed Dec 20, 2024
1 parent db3aff9 commit f3a6432
Show file tree
Hide file tree
Showing 23 changed files with 1,450 additions and 26 deletions.
4 changes: 4 additions & 0 deletions be/src/cloud/pb_convert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, const RowsetMetaPB& in)
}
out->set_txn_expiration(in.txn_expiration());
out->set_segments_overlap_pb(in.segments_overlap_pb());
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
out->mutable_segments_file_size()->CopyFrom(in.segments_file_size());
out->set_index_id(in.index_id());
if (in.has_schema_version()) {
Expand Down Expand Up @@ -127,6 +128,7 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) {
}
out->set_txn_expiration(in.txn_expiration());
out->set_segments_overlap_pb(in.segments_overlap_pb());
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
out->mutable_segments_file_size()->Swap(in.mutable_segments_file_size());
out->set_index_id(in.index_id());
if (in.has_schema_version()) {
Expand Down Expand Up @@ -226,6 +228,7 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in,
}
out->set_txn_expiration(in.txn_expiration());
out->set_segments_overlap_pb(in.segments_overlap_pb());
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
out->mutable_segments_file_size()->CopyFrom(in.segments_file_size());
out->set_index_id(in.index_id());
if (in.has_schema_version()) {
Expand Down Expand Up @@ -279,6 +282,7 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in,
}
out->set_txn_expiration(in.txn_expiration());
out->set_segments_overlap_pb(in.segments_overlap_pb());
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
out->mutable_segments_file_size()->Swap(in.mutable_segments_file_size());
out->set_index_id(in.index_id());
if (in.has_schema_version()) {
Expand Down
4 changes: 4 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1406,6 +1406,10 @@ DEFINE_mBool(enable_sleep_between_delete_cumu_compaction, "false");

DEFINE_mInt32(compaction_num_per_round, "1");

DEFINE_mBool(enable_segments_key_bounds_truncation, "false");
// the max length of segments key bounds, in bytes
DEFINE_mInt32(segments_key_bounds_truncation_threshold, "100");

// clang-format off
#ifdef BE_TEST
// test s3
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1492,6 +1492,9 @@ DECLARE_mBool(enable_sleep_between_delete_cumu_compaction);

DECLARE_mInt32(compaction_num_per_round);

DECLARE_mBool(enable_segments_key_bounds_truncation);
DECLARE_mInt32(segments_key_bounds_truncation_threshold);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
17 changes: 14 additions & 3 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,16 @@ Status BaseTablet::lookup_row_data(const Slice& encoded_key, const RowLocation&
return Status::OK();
}

bool BaseTablet::key_is_not_in_segment(Slice key, const KeyBoundsPB& segment_key_bounds,
bool is_segments_key_bounds_truncated) {
Slice maybe_truncated_min_key {segment_key_bounds.min_key()};
Slice maybe_truncated_max_key {segment_key_bounds.max_key()};
return Slice::origin_is_strictly_less_than(key, false, maybe_truncated_min_key,
is_segments_key_bounds_truncated) ||
Slice::origin_is_strictly_less_than(maybe_truncated_max_key,
is_segments_key_bounds_truncated, key, false);
}

Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* latest_schema,
bool with_seq_col,
const std::vector<RowsetSharedPtr>& specified_rowsets,
Expand Down Expand Up @@ -474,13 +484,14 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* latest
delete_bitmap == nullptr ? _tablet_meta->delete_bitmap_ptr() : delete_bitmap;
for (size_t i = 0; i < specified_rowsets.size(); i++) {
const auto& rs = specified_rowsets[i];
const auto& segments_key_bounds = rs->rowset_meta()->get_segments_key_bounds();
std::vector<KeyBoundsPB> segments_key_bounds;
rs->rowset_meta()->get_segments_key_bounds(&segments_key_bounds);
int num_segments = cast_set<int>(rs->num_segments());
DCHECK_EQ(segments_key_bounds.size(), num_segments);
std::vector<uint32_t> picked_segments;
for (int j = num_segments - 1; j >= 0; j--) {
if (key_without_seq.compare(segments_key_bounds[j].max_key()) > 0 ||
key_without_seq.compare(segments_key_bounds[j].min_key()) < 0) {
if (key_is_not_in_segment(key_without_seq, segments_key_bounds[j],
rs->rowset_meta()->is_segments_key_bounds_truncated())) {
continue;
}
picked_segments.emplace_back(j);
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ class BaseTablet {
RowsetSharedPtr rowset, const TupleDescriptor* desc,
OlapReaderStatistics& stats, std::string& values,
bool write_to_cache = false);

bool key_is_not_in_segment(Slice key, const KeyBoundsPB& segment_key_bounds,
bool is_segments_key_bounds_truncated);
// Lookup the row location of `encoded_key`, the function sets `row_location` on success.
// NOTE: the method only works in unique key model with primary key index, you will got a
// not supported error in other data model.
Expand Down
12 changes: 8 additions & 4 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ using namespace ErrorCode;

namespace {

bool is_rowset_tidy(std::string& pre_max_key, const RowsetSharedPtr& rhs) {
bool is_rowset_tidy(std::string& pre_max_key, bool& pre_rs_key_bounds_truncated,
const RowsetSharedPtr& rhs) {
size_t min_tidy_size = config::ordered_data_compaction_min_segment_size;
if (rhs->num_segments() == 0) {
return true;
Expand All @@ -105,11 +106,13 @@ bool is_rowset_tidy(std::string& pre_max_key, const RowsetSharedPtr& rhs) {
if (!ret) {
return false;
}
if (min_key <= pre_max_key) {
bool cur_rs_key_bounds_truncated {rhs->is_segments_key_bounds_truncated()};
if (!Slice::origin_is_strictly_less_than(Slice {pre_max_key}, pre_rs_key_bounds_truncated,
Slice {min_key}, cur_rs_key_bounds_truncated)) {
return false;
}
CHECK(rhs->last_key(&pre_max_key));

pre_rs_key_bounds_truncated = cur_rs_key_bounds_truncated;
return true;
}

Expand Down Expand Up @@ -380,8 +383,9 @@ bool CompactionMixin::handle_ordered_data_compaction() {
// files to handle compaction
auto input_size = _input_rowsets.size();
std::string pre_max_key;
bool pre_rs_key_bounds_truncated {false};
for (auto i = 0; i < input_size; ++i) {
if (!is_rowset_tidy(pre_max_key, _input_rowsets[i])) {
if (!is_rowset_tidy(pre_max_key, pre_rs_key_bounds_truncated, _input_rowsets[i])) {
if (i <= input_size / 2) {
return false;
} else {
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/rowset/rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,10 @@ class Rowset : public std::enable_shared_from_this<Rowset>, public MetadataAdder
return true;
}

bool is_segments_key_bounds_truncated() const {
return _rowset_meta->is_segments_key_bounds_truncated();
}

bool check_rowset_segment();

[[nodiscard]] virtual Status add_to_binlog() { return Status::OK(); }
Expand Down
22 changes: 22 additions & 0 deletions be/src/olap/rowset/rowset_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,28 @@ int64_t RowsetMeta::segment_file_size(int seg_id) {
: -1;
}

void RowsetMeta::set_segments_key_bounds(const std::vector<KeyBoundsPB>& segments_key_bounds) {
for (const KeyBoundsPB& key_bounds : segments_key_bounds) {
KeyBoundsPB* new_key_bounds = _rowset_meta_pb.add_segments_key_bounds();
*new_key_bounds = key_bounds;
}

bool truncated {config::enable_segments_key_bounds_truncation &&
config::segments_key_bounds_truncation_threshold > 0};
set_segments_key_bounds_truncated(truncated);
if (truncated) {
int32_t threshold = config::segments_key_bounds_truncation_threshold;
for (auto& segment_key_bounds : *_rowset_meta_pb.mutable_segments_key_bounds()) {
if (segment_key_bounds.min_key().size() > threshold) {
segment_key_bounds.mutable_min_key()->resize(threshold);
}
if (segment_key_bounds.max_key().size() > threshold) {
segment_key_bounds.mutable_max_key()->resize(threshold);
}
}
}
}

void RowsetMeta::merge_rowset_meta(const RowsetMeta& other) {
set_num_segments(num_segments() + other.num_segments());
set_num_rows(num_rows() + other.num_rows());
Expand Down
17 changes: 11 additions & 6 deletions be/src/olap/rowset/rowset_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <string>
#include <vector>

#include "common/config.h"
#include "io/fs/file_system.h"
#include "olap/metadata_adder.h"
#include "olap/olap_common.h"
Expand Down Expand Up @@ -299,6 +300,15 @@ class RowsetMeta : public MetadataAdder<RowsetMeta> {

auto& get_segments_key_bounds() const { return _rowset_meta_pb.segments_key_bounds(); }

bool is_segments_key_bounds_truncated() const {
return _rowset_meta_pb.has_segments_key_bounds_truncated() &&
_rowset_meta_pb.segments_key_bounds_truncated();
}

void set_segments_key_bounds_truncated(bool truncated) {
_rowset_meta_pb.set_segments_key_bounds_truncated(truncated);
}

bool get_first_segment_key_bound(KeyBoundsPB* key_bounds) {
// for compatibility, old version has not segment key bounds
if (_rowset_meta_pb.segments_key_bounds_size() == 0) {
Expand All @@ -316,12 +326,7 @@ class RowsetMeta : public MetadataAdder<RowsetMeta> {
return true;
}

void set_segments_key_bounds(const std::vector<KeyBoundsPB>& segments_key_bounds) {
for (const KeyBoundsPB& key_bounds : segments_key_bounds) {
KeyBoundsPB* new_key_bounds = _rowset_meta_pb.add_segments_key_bounds();
*new_key_bounds = key_bounds;
}
}
void set_segments_key_bounds(const std::vector<KeyBoundsPB>& segments_key_bounds);

void add_segment_key_bounds(KeyBoundsPB segments_key_bounds) {
*_rowset_meta_pb.add_segments_key_bounds() = std::move(segments_key_bounds);
Expand Down
19 changes: 19 additions & 0 deletions be/src/util/slice.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,23 @@ Slice::Slice(const faststring& s)
data((char*)(s.data())),
size(s.size()) {}

bool Slice::origin_is_strictly_less_than(Slice X, bool X_is_truncated, Slice Y,
bool Y_is_truncated) {
// suppose X is a prefix of X', Y is a prefix of Y'
if (!X_is_truncated) {
// (X_is_truncated == false) means X' == X
// we have Y <= Y',
// so X < Y => X < Y',
// so X' = X < Y'
return X.compare(Y) < 0;
}

// let m = min(|X|,|Y|),
// we have Y[1..m] = Y'[1..m] <= Y'
// so X'[1..m] < Y[1..m] => X' < Y'
std::size_t m {std::min(X.get_size(), Y.get_size())};
Slice Y_to_cmp {Y.get_data(), m};
return X.compare(Y_to_cmp) < 0;
}

} // namespace doris
7 changes: 7 additions & 0 deletions be/src/util/slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,13 @@ struct Slice {
}
return buf;
}

// X is (maybe) a truncated prefix of string X'
// Y is (maybe) a truncated prefix of string Y'
// return true only if we can determine that X' is strictly less than Y'
// based on these maybe truncated prefixes
static bool origin_is_strictly_less_than(Slice X, bool X_is_truncated, Slice Y,
bool Y_is_truncated);
};

inline std::ostream& operator<<(std::ostream& os, const Slice& slice) {
Expand Down
17 changes: 11 additions & 6 deletions be/src/vec/olap/block_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ Status BlockReader::next_block_with_aggregation(Block* block, bool* eof) {
return res;
}

bool BlockReader::_rowsets_mono_asc_disjoint(const ReaderParams& read_params) {
std::string cur_rs_last_key;
bool BlockReader::_rowsets_not_mono_asc_disjoint(const ReaderParams& read_params) {
std::string pre_rs_last_key;
bool pre_rs_key_bounds_truncated {false};
const std::vector<RowSetSplits>& rs_splits = read_params.rs_splits;
for (const auto& rs_split : rs_splits) {
if (rs_split.rs_reader->rowset()->num_rows() == 0) {
Expand All @@ -87,13 +88,17 @@ bool BlockReader::_rowsets_mono_asc_disjoint(const ReaderParams& read_params) {
if (!has_first_key) {
return true;
}
if (rs_first_key <= cur_rs_last_key) {
bool cur_rs_key_bounds_truncated {
rs_split.rs_reader->rowset()->is_segments_key_bounds_truncated()};
if (!Slice::origin_is_strictly_less_than(Slice {pre_rs_last_key},
pre_rs_key_bounds_truncated, Slice {rs_first_key},
cur_rs_key_bounds_truncated)) {
return true;
}
bool has_last_key = rs_split.rs_reader->rowset()->last_key(&cur_rs_last_key);
bool has_last_key = rs_split.rs_reader->rowset()->last_key(&pre_rs_last_key);
pre_rs_key_bounds_truncated = cur_rs_key_bounds_truncated;
CHECK(has_last_key);
}

return false;
}

Expand All @@ -110,7 +115,7 @@ Status BlockReader::_init_collect_iter(const ReaderParams& read_params) {
// check if rowsets are noneoverlapping
{
SCOPED_RAW_TIMER(&_stats.block_reader_vcollect_iter_init_timer_ns);
_is_rowsets_overlapping = _rowsets_mono_asc_disjoint(read_params);
_is_rowsets_overlapping = _rowsets_not_mono_asc_disjoint(read_params);
_vcollect_iter.init(this, _is_rowsets_overlapping, read_params.read_orderby_key,
read_params.read_orderby_key_reverse);
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/olap/block_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ class BlockReader final : public TabletReader {

bool _get_next_row_same();

// return true if keys of rowsets are mono ascending and disjoint
bool _rowsets_mono_asc_disjoint(const ReaderParams& read_params);
// return false if keys of rowsets are mono ascending and disjoint
bool _rowsets_not_mono_asc_disjoint(const ReaderParams& read_params);

VCollectIterator _vcollect_iter;
IteratorRowRef _next_row {{}, -1, false};
Expand Down
1 change: 1 addition & 0 deletions be/test/olap/ordered_data_compaction_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class OrderedDataCompactionTest : public ::testing::Test {
ExecEnv::GetInstance()->set_storage_engine(std::move(engine));
config::enable_ordered_data_compaction = true;
config::ordered_data_compaction_min_segment_size = 10;
config::enable_segments_key_bounds_truncation = false;
}
void TearDown() override {
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(absolute_dir).ok());
Expand Down
Loading

0 comments on commit f3a6432

Please sign in to comment.