Skip to content

Commit

Permalink
[Enhancement] improve persistent index amp score generate
Browse files Browse the repository at this point in the history
Signed-off-by: luohaha <[email protected]>
  • Loading branch information
luohaha committed Mar 20, 2024
1 parent 936d309 commit 1081317
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 59 deletions.
18 changes: 3 additions & 15 deletions be/src/storage/persistent_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3447,7 +3447,6 @@ Status PersistentIndex::on_commited() {
_dump_snapshot = false;
_flushed = false;
_need_bloom_filter = false;
_calc_write_amp_score();

return Status::OK();
}
Expand Down Expand Up @@ -4712,7 +4711,6 @@ StatusOr<EditVersion> PersistentIndex::_major_compaction_impl(
}
}
RETURN_IF_ERROR(writer->finish());
_write_amp_score.store(0.0);
std::stringstream debug_str;
major_compaction_debug_str(l2_versions, l2_vec, new_l2_version, writer, debug_str);
LOG(INFO) << "PersistentIndex background compact l2 : " << debug_str.str() << " cost: " << watch.elapsed_time();
Expand Down Expand Up @@ -4884,27 +4882,17 @@ Status PersistentIndex::test_flush_varlen_to_immutable_index(const std::string&
return writer.finish();
}

double PersistentIndex::major_compaction_score(size_t l1_count, size_t l2_count) {
double PersistentIndex::major_compaction_score(const PersistentIndexMetaPB& index_meta) {
// return 0.0, so scheduler can skip this index, if l2 less than 2.
const size_t l1_count = index_meta.has_l1_version() ? 1 : 0;
const size_t l2_count = index_meta.l2_versions_size();
if (l2_count <= 1) return 0.0;
double l1_l2_count = (double)(l1_count + l2_count);
// write amplification
// = 1 + 1 + (l1 and l2 file count + config::l0_l1_merge_ratio) / (l1 and l2 file count) / 0.85
return 2.0 + (l1_l2_count + (double)config::l0_l1_merge_ratio) / l1_l2_count / 0.85;
}

void PersistentIndex::_calc_write_amp_score() {
_write_amp_score.store(major_compaction_score(_has_l1 ? 1 : 0, _l2_versions.size()));
}

double PersistentIndex::get_write_amp_score() const {
if (_major_compaction_running.load()) {
return 0.0;
} else {
return _write_amp_score.load();
}
}

Status PersistentIndex::reset(Tablet* tablet, EditVersion version, PersistentIndexMetaPB* index_meta) {
std::unique_lock wrlock(_lock);
_cancel_major_compaction = true;
Expand Down
8 changes: 1 addition & 7 deletions be/src/storage/persistent_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -763,9 +763,7 @@ class PersistentIndex {

Status TEST_major_compaction(PersistentIndexMetaPB& index_meta);

double get_write_amp_score() const;

static double major_compaction_score(size_t l1_count, size_t l2_count);
static double major_compaction_score(const PersistentIndexMetaPB& index_meta);

// not thread safe, just for unit test
size_t kv_num_in_immutable_index() {
Expand Down Expand Up @@ -853,8 +851,6 @@ class PersistentIndex {

bool _enable_minor_compaction();

void _calc_write_amp_score();

size_t _get_tmp_l1_count();

bool _l0_is_full(int64_t l1_l2_size = 0);
Expand Down Expand Up @@ -909,8 +905,6 @@ class PersistentIndex {
// std::vector<std::unique_ptr<BloomFilter>> _bf_vec;
// set if major compaction is running
std::atomic<bool> _major_compaction_running{false};
// write amplification score, 0.0 means this index doesn't need major compaction
std::atomic<double> _write_amp_score{0.0};
// Latest major compaction time. In second.
int64_t _latest_compaction_time = 0;
};
Expand Down
8 changes: 0 additions & 8 deletions be/src/storage/primary_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1436,14 +1436,6 @@ std::unique_ptr<PrimaryIndex> TEST_create_primary_index(const Schema& pk_schema)
return std::make_unique<PrimaryIndex>(pk_schema);
}

double PrimaryIndex::get_write_amp_score() {
if (_persistent_index != nullptr) {
return _persistent_index->get_write_amp_score();
} else {
return 0.0;
}
}

Status PrimaryIndex::major_compaction(DataDir* data_dir, int64_t tablet_id, std::timed_mutex* mutex) {
if (_persistent_index != nullptr) {
return _persistent_index->major_compaction(data_dir, tablet_id, mutex);
Expand Down
2 changes: 0 additions & 2 deletions be/src/storage/primary_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,6 @@ class PrimaryIndex {

Status on_commited();

double get_write_amp_score();

Status major_compaction(DataDir* data_dir, int64_t tablet_id, std::timed_mutex* mutex);

Status abort();
Expand Down
35 changes: 9 additions & 26 deletions be/src/storage/tablet_updates.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1075,6 +1075,7 @@ void TabletUpdates::_apply_column_partial_update_commit(const EditVersionInfo& v
failure_handler("primary index on_commit failed", st);
return;
}
_pk_index_write_amp_score.store(PersistentIndex::major_compaction_score(index_meta));

_update_total_stats(version_info.rowsets, nullptr, nullptr);
}
Expand Down Expand Up @@ -1587,6 +1588,7 @@ void TabletUpdates::_apply_normal_rowset_commit(const EditVersionInfo& version_i
failure_handler(msg, false);
return;
}
_pk_index_write_amp_score.store(PersistentIndex::major_compaction_score(index_meta));

// if `enable_persistent_index` of tablet is change(maybe changed by alter table)
// we should try to remove the index_entry from cache
Expand Down Expand Up @@ -2203,6 +2205,7 @@ void TabletUpdates::_apply_compaction_commit(const EditVersionInfo& version_info
failure_handler(msg);
return;
}
_pk_index_write_amp_score.store(PersistentIndex::major_compaction_score(index_meta));

{
// Update the stats of affected rowsets.
Expand Down Expand Up @@ -4355,31 +4358,6 @@ void TabletUpdates::get_basic_info_extra(TabletBasicInfo& info) {
}
}

static double get_pk_index_write_amp_score_from_meta(Tablet* tablet) {
PersistentIndexMetaPB index_meta;
auto st = TabletMetaManager::get_persistent_index_meta(tablet->data_dir(), tablet->tablet_id(), &index_meta);
if (!st.ok()) {
// skip compaction if get index meta fail
return 0.0;
}
return PersistentIndex::major_compaction_score((index_meta.has_l1_version() ? 1 : 0),
index_meta.l2_versions_size());
}

double TabletUpdates::get_pk_index_write_amp_score() {
double score = 0.0;
auto& index_cache = StorageEngine::instance()->update_manager()->index_cache();
auto index_entry = index_cache.get(_tablet.tablet_id());
if (index_entry != nullptr) {
auto& index = index_entry->value();
score = index.get_write_amp_score();
index_cache.release(index_entry);
} else {
score = get_pk_index_write_amp_score_from_meta(&_tablet);
}
return score;
}

Status TabletUpdates::pk_index_major_compaction() {
auto manager = StorageEngine::instance()->update_manager();
auto index_entry = manager->index_cache().get_or_create(_tablet.tablet_id());
Expand All @@ -4402,7 +4380,12 @@ Status TabletUpdates::pk_index_major_compaction() {
}
});
manager->index_cache().update_object_size(index_entry, index.memory_usage());
return index.major_compaction(_tablet.data_dir(), _tablet.tablet_id(), _tablet.updates()->get_index_lock());
st = index.major_compaction(_tablet.data_dir(), _tablet.tablet_id(), _tablet.updates()->get_index_lock());
if (st.ok()) {
// reset score after major compaction finish
_pk_index_write_amp_score.store(0.0);
}
return st;
}

void TabletUpdates::_to_updates_pb_unlocked(TabletUpdatesPB* updates_pb) const {
Expand Down
4 changes: 3 additions & 1 deletion be/src/storage/tablet_updates.h
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ class TabletUpdates {

Status get_rowset_and_segment_idx_by_rssid(uint32_t rssid, RowsetSharedPtr* rowset, uint32_t* segment_idx);

double get_pk_index_write_amp_score();
double get_pk_index_write_amp_score() const { return _pk_index_write_amp_score.load(); }

Status pk_index_major_compaction();

Expand Down Expand Up @@ -535,6 +535,8 @@ class TabletUpdates {
// the whole BE, and more more operation on this tablet is allowed
std::atomic<bool> _error{false};
std::string _error_msg;

std::atomic<double> _pk_index_write_amp_score{0.0};
};

} // namespace starrocks
42 changes: 42 additions & 0 deletions be/test/storage/tablet_updates_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,48 @@ TEST_F(TabletUpdatesTest, writeread_with_persistent_index) {
test_writeread(true);
}

TEST_F(TabletUpdatesTest, test_pk_index_write_amp_score) {
srand(GetCurrentTimeMicros());
_tablet = create_tablet(rand(), rand());
_tablet->set_enable_persistent_index(true);
// write
const int N = 8000;
std::vector<int64_t> keys;
std::vector<int64_t> keys2;
std::vector<int64_t> keys3;
std::vector<int64_t> keys4;
for (int i = 0; i < N; i++) {
keys.push_back(i);
keys2.push_back(i + N);
keys3.push_back(i + N * 2);
keys4.push_back(i + N * 3);
}
const int64_t old_l0_max_mem_usage = config::l0_max_mem_usage;
// make sure generate l1
config::l0_max_mem_usage = 10;
auto rs0 = create_rowset(_tablet, keys);
ASSERT_TRUE(_tablet->rowset_commit(2, rs0).ok());
// read
ASSERT_EQ(N, read_tablet(_tablet, 2));
// check score
ASSERT_TRUE(_tablet->updates()->get_pk_index_write_amp_score() == 0);
ASSERT_EQ(2, _tablet->updates()->max_version());
auto rs1 = create_rowset(_tablet, keys2);
ASSERT_TRUE(_tablet->rowset_commit(3, rs1).ok());
ASSERT_EQ(3, _tablet->updates()->max_version());
auto rs2 = create_rowset(_tablet, keys3);
ASSERT_TRUE(_tablet->rowset_commit(4, rs2).ok());
ASSERT_EQ(4, _tablet->updates()->max_version());
auto rs3 = create_rowset(_tablet, keys4);
ASSERT_TRUE(_tablet->rowset_commit(5, rs3).ok());
ASSERT_EQ(5, _tablet->updates()->max_version());
// read
ASSERT_EQ(N * 4, read_tablet(_tablet, 5));
// check score
ASSERT_TRUE(_tablet->updates()->get_pk_index_write_amp_score() > 0);
config::l0_max_mem_usage = old_l0_max_mem_usage;
}

TEST_F(TabletUpdatesTest, writeread_with_sort_key) {
srand(GetCurrentTimeMicros());
_tablet = create_tablet_with_sort_key(rand(), rand(), {1});
Expand Down

0 comments on commit 1081317

Please sign in to comment.