Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Improve pk index compaction score calculation strategy (backport #42803) #42874

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 3 additions & 15 deletions be/src/storage/persistent_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3028,7 +3028,6 @@ Status PersistentIndex::on_commited() {
_dump_snapshot = false;
_flushed = false;
_need_bloom_filter = false;
_calc_write_amp_score();

return Status::OK();
}
Expand Down Expand Up @@ -4305,7 +4304,6 @@ StatusOr<EditVersion> PersistentIndex::_major_compaction_impl(
}
}
RETURN_IF_ERROR(writer->finish());
_write_amp_score.store(0.0);
std::stringstream debug_str;
major_compaction_debug_str(l2_versions, l2_vec, new_l2_version, writer, debug_str);
LOG(INFO) << "PersistentIndex background compact l2 : " << debug_str.str() << " cost: " << watch.elapsed_time();
Expand Down Expand Up @@ -4480,27 +4478,17 @@ Status PersistentIndex::test_flush_varlen_to_immutable_index(const std::string&
return writer.finish();
}

double PersistentIndex::major_compaction_score(size_t l1_count, size_t l2_count) {
double PersistentIndex::major_compaction_score(const PersistentIndexMetaPB& index_meta) {
// return 0.0, so scheduler can skip this index, if l2 less than 2.
const size_t l1_count = index_meta.has_l1_version() ? 1 : 0;
const size_t l2_count = index_meta.l2_versions_size();
if (l2_count <= 1) return 0.0;
double l1_l2_count = (double)(l1_count + l2_count);
// write amplification
// = 1 + 1 + (l1 and l2 file count + config::l0_l1_merge_ratio) / (l1 and l2 file count) / 0.85
return 2.0 + (l1_l2_count + (double)config::l0_l1_merge_ratio) / l1_l2_count / 0.85;
}

void PersistentIndex::_calc_write_amp_score() {
_write_amp_score.store(major_compaction_score(_has_l1 ? 1 : 0, _l2_versions.size()));
}

double PersistentIndex::get_write_amp_score() const {
if (_major_compaction_running.load()) {
return 0.0;
} else {
return _write_amp_score.load();
}
}

Status PersistentIndex::reset(Tablet* tablet, EditVersion version, PersistentIndexMetaPB* index_meta) {
std::unique_lock wrlock(_lock);
_cancel_major_compaction = true;
Expand Down
8 changes: 1 addition & 7 deletions be/src/storage/persistent_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -644,9 +644,7 @@ class PersistentIndex {

Status TEST_major_compaction(PersistentIndexMetaPB& index_meta);

double get_write_amp_score() const;

static double major_compaction_score(size_t l1_count, size_t l2_count);
static double major_compaction_score(const PersistentIndexMetaPB& index_meta);

// not thread safe, just for unit test
size_t kv_num_in_immutable_index() {
Expand Down Expand Up @@ -731,8 +729,6 @@ class PersistentIndex {

bool _enable_minor_compaction();

void _calc_write_amp_score();

size_t _get_tmp_l1_count();

bool _l0_is_full(int64_t l1_l2_size = 0);
Expand Down Expand Up @@ -785,8 +781,6 @@ class PersistentIndex {
std::vector<KeysInfo> _found_keys_info;
// set if major compaction is running
std::atomic<bool> _major_compaction_running{false};
// write amplification score, 0.0 means this index doesn't need major compaction
std::atomic<double> _write_amp_score{0.0};
// Latest major compaction time. In second.
int64_t _latest_compaction_time = 0;
};
Expand Down
8 changes: 0 additions & 8 deletions be/src/storage/primary_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1395,14 +1395,6 @@ std::unique_ptr<PrimaryIndex> TEST_create_primary_index(const vectorized::Schema
return std::make_unique<PrimaryIndex>(pk_schema);
}

double PrimaryIndex::get_write_amp_score() {
if (_persistent_index != nullptr) {
return _persistent_index->get_write_amp_score();
} else {
return 0.0;
}
}

Status PrimaryIndex::major_compaction(Tablet* tablet) {
if (_persistent_index != nullptr) {
return _persistent_index->major_compaction(tablet);
Expand Down
2 changes: 0 additions & 2 deletions be/src/storage/primary_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,6 @@ class PrimaryIndex {

Status on_commited();

double get_write_amp_score();

Status major_compaction(Tablet* tablet);

Status abort();
Expand Down
34 changes: 8 additions & 26 deletions be/src/storage/tablet_updates.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1234,6 +1234,7 @@ void TabletUpdates::_apply_rowset_commit(const EditVersionInfo& version_info) {
failure_handler(msg, false);
return;
}
_pk_index_write_amp_score.store(PersistentIndex::major_compaction_score(index_meta));

// if `enable_persistent_index` of tablet is change(maybe changed by alter table)
// we should try to remove the index_entry from cache
Expand Down Expand Up @@ -1794,6 +1795,7 @@ void TabletUpdates::_apply_compaction_commit(const EditVersionInfo& version_info
_set_error(msg);
return;
}
_pk_index_write_amp_score.store(PersistentIndex::major_compaction_score(index_meta));

// if `enable_persistent_index` of tablet is change(maybe changed by alter table)
// we should try to remove the index_entry from cache
Expand Down Expand Up @@ -3499,31 +3501,6 @@ void TabletUpdates::get_basic_info_extra(TabletBasicInfo& info) {
}
}

static double get_pk_index_write_amp_score_from_meta(Tablet* tablet) {
PersistentIndexMetaPB index_meta;
auto st = TabletMetaManager::get_persistent_index_meta(tablet->data_dir(), tablet->tablet_id(), &index_meta);
if (!st.ok()) {
// skip compaction if get index meta fail
return 0.0;
}
return PersistentIndex::major_compaction_score((index_meta.has_l1_version() ? 1 : 0),
index_meta.l2_versions_size());
}

double TabletUpdates::get_pk_index_write_amp_score() {
double score = 0.0;
auto& index_cache = StorageEngine::instance()->update_manager()->index_cache();
auto index_entry = index_cache.get(_tablet.tablet_id());
if (index_entry != nullptr) {
auto& index = index_entry->value();
score = index.get_write_amp_score();
index_cache.release(index_entry);
} else {
score = get_pk_index_write_amp_score_from_meta(&_tablet);
}
return score;
}

Status TabletUpdates::pk_index_major_compaction() {
auto manager = StorageEngine::instance()->update_manager();
auto index_entry = manager->index_cache().get_or_create(_tablet.tablet_id());
Expand All @@ -3546,7 +3523,12 @@ Status TabletUpdates::pk_index_major_compaction() {
}
});
manager->index_cache().update_object_size(index_entry, index.memory_usage());
return index.major_compaction(&_tablet);
st = index.major_compaction(&_tablet);
if (st.ok()) {
// reset score after major compaction finish
_pk_index_write_amp_score.store(0.0);
}
return st;
}

void TabletUpdates::_to_updates_pb_unlocked(TabletUpdatesPB* updates_pb) const {
Expand Down
3 changes: 2 additions & 1 deletion be/src/storage/tablet_updates.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ class TabletUpdates {

Status get_rowset_and_segment_idx_by_rssid(uint32_t rssid, RowsetSharedPtr* rowset, uint32_t* segment_idx);

double get_pk_index_write_amp_score();
double get_pk_index_write_amp_score() const { return _pk_index_write_amp_score.load(); }

Status pk_index_major_compaction();

Expand Down Expand Up @@ -471,6 +471,7 @@ class TabletUpdates {

TabletUpdates(const TabletUpdates&) = delete;
const TabletUpdates& operator=(const TabletUpdates&) = delete;
std::atomic<double> _pk_index_write_amp_score{0.0};
};

} // namespace starrocks
44 changes: 44 additions & 0 deletions be/test/storage/tablet_updates_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1026,6 +1026,50 @@ TEST_F(TabletUpdatesTest, writeread_with_persistent_index) {
test_writeread(true);
}

TEST_F(TabletUpdatesTest, test_pk_index_write_amp_score) {
srand(GetCurrentTimeMicros());
_tablet = create_tablet(rand(), rand());
_tablet->set_enable_persistent_index(true);
// write
const int N = 8000;
std::vector<int64_t> keys;
std::vector<int64_t> keys2;
std::vector<int64_t> keys3;
std::vector<int64_t> keys4;
for (int i = 0; i < N; i++) {
keys.push_back(i);
keys2.push_back(i + N);
keys3.push_back(i + N * 2);
keys4.push_back(i + N * 3);
}
const int64_t old_l0_max_mem_usage = config::l0_max_mem_usage;
// make sure generate l1
config::l0_max_mem_usage = 10;
auto rs0 = create_rowset(_tablet, keys);
ASSERT_TRUE(_tablet->rowset_commit(2, rs0).ok());
// read
ASSERT_EQ(N, read_tablet(_tablet, 2));
// check score
ASSERT_TRUE(_tablet->updates()->get_pk_index_write_amp_score() == 0);
ASSERT_EQ(2, _tablet->updates()->max_version());
auto rs1 = create_rowset(_tablet, keys2);
ASSERT_TRUE(_tablet->rowset_commit(3, rs1).ok());
ASSERT_EQ(3, _tablet->updates()->max_version());
auto rs2 = create_rowset(_tablet, keys3);
ASSERT_TRUE(_tablet->rowset_commit(4, rs2).ok());
ASSERT_EQ(4, _tablet->updates()->max_version());
auto rs3 = create_rowset(_tablet, keys4);
ASSERT_TRUE(_tablet->rowset_commit(5, rs3).ok());
ASSERT_EQ(5, _tablet->updates()->max_version());
// read
ASSERT_EQ(N * 4, read_tablet(_tablet, 5));
// check score
ASSERT_TRUE(_tablet->updates()->get_pk_index_write_amp_score() > 0);
ASSERT_TRUE(_tablet->updates()->pk_index_major_compaction().ok());
ASSERT_TRUE(_tablet->updates()->get_pk_index_write_amp_score() == 0);
config::l0_max_mem_usage = old_l0_max_mem_usage;
}

TEST_F(TabletUpdatesTest, writeread_with_sort_key) {
srand(GetCurrentTimeMicros());
_tablet = create_tablet_with_sort_key(rand(), rand(), {1});
Expand Down
Loading