Skip to content

Commit

Permalink
[BugFix] fix concurrent issue between primary index unload and compac…
Browse files Browse the repository at this point in the history
…tion (#49341)

Signed-off-by: luohaha <[email protected]>
(cherry picked from commit 1c8df3a)

# Conflicts:
#	be/src/storage/lake/lake_primary_index.cpp
#	be/src/storage/persistent_index.h
  • Loading branch information
luohaha authored and mergify[bot] committed Aug 6, 2024
1 parent 983b5ac commit b5fc20c
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 18 deletions.
15 changes: 12 additions & 3 deletions be/src/storage/lake/lake_primary_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,8 @@ Status LakePrimaryIndex::_do_lake_load(TabletManager* tablet_mgr, const TabletMe
_set_schema(pkey_schema);

// load persistent index if enable persistent index meta
size_t fix_size = PrimaryKeyEncoder::get_encoded_fixed_size(pkey_schema);

if (metadata->enable_persistent_index() && (fix_size <= 128)) {
if (metadata->enable_persistent_index()) {
DCHECK(_persistent_index == nullptr);

switch (metadata->persistent_index_type()) {
Expand All @@ -103,11 +102,21 @@ Status LakePrimaryIndex::_do_lake_load(TabletManager* tablet_mgr, const TabletMe
RETURN_IF_ERROR(StorageEngine::instance()
->get_persistent_index_store(metadata->id())
->create_dir_if_path_not_exists(path));
_persistent_index = std::make_unique<LakeLocalPersistentIndex>(path);
_persistent_index = std::make_shared<LakeLocalPersistentIndex>(path);
set_enable_persistent_index(true);
return dynamic_cast<LakeLocalPersistentIndex*>(_persistent_index.get())
->load_from_lake_tablet(tablet_mgr, metadata, base_version, builder);
}
<<<<<<< HEAD
=======
case PersistentIndexTypePB::CLOUD_NATIVE: {
_persistent_index = std::make_shared<LakePersistentIndex>(tablet_mgr, metadata->id());
set_enable_persistent_index(true);
auto* lake_persistent_index = dynamic_cast<LakePersistentIndex*>(_persistent_index.get());
RETURN_IF_ERROR(lake_persistent_index->init(metadata->sstable_meta()));
return lake_persistent_index->load_from_lake_tablet(tablet_mgr, metadata, base_version, builder);
}
>>>>>>> 1c8df3aa98 ([BugFix] fix concurrent issue between primary index unload and compaction (#49341))
default:
LOG(WARNING) << "only support LOCAL lake_persistent_index_type for now";
return Status::InternalError("only support LOCAL lake_persistent_index_type for now");
Expand Down
14 changes: 10 additions & 4 deletions be/src/storage/persistent_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4878,8 +4878,8 @@ StatusOr<EditVersion> PersistentIndex::_major_compaction_impl(
return new_l2_version;
}

void PersistentIndex::modify_l2_versions(const std::vector<EditVersion>& input_l2_versions,
const EditVersion& output_l2_version, PersistentIndexMetaPB& index_meta) {
Status PersistentIndex::modify_l2_versions(const std::vector<EditVersion>& input_l2_versions,
const EditVersion& output_l2_version, PersistentIndexMetaPB& index_meta) {
// delete input l2 versions, and add output l2 version
std::vector<EditVersion> new_l2_versions;
std::vector<bool> new_l2_version_merged;
Expand All @@ -4899,6 +4899,11 @@ void PersistentIndex::modify_l2_versions(const std::vector<EditVersion>& input_l
new_l2_version_merged.push_back(index_meta.l2_version_merged(i));
}
}
// Check all input l2 has been removed. If not, that means index has been rebuilt.
if (new_l2_versions.size() + input_l2_versions.size() != index_meta.l2_versions_size() + 1) {
return Status::Aborted(fmt::format("PersistentIndex has been rebuilt, abort this compaction task. meta : {}",
index_meta.ShortDebugString()));
}
// rebuild l2 versions in meta
index_meta.clear_l2_versions();
index_meta.clear_l2_version_merged();
Expand All @@ -4908,6 +4913,7 @@ void PersistentIndex::modify_l2_versions(const std::vector<EditVersion>& input_l
for (const bool merge : new_l2_version_merged) {
index_meta.add_l2_version_merged(merge);
}
return Status::OK();
}

Status PersistentIndex::TEST_major_compaction(PersistentIndexMetaPB& index_meta) {
Expand All @@ -4929,7 +4935,7 @@ Status PersistentIndex::TEST_major_compaction(PersistentIndexMetaPB& index_meta)
}
// 2. merge l2 files to new l2 file
ASSIGN_OR_RETURN(EditVersion new_l2_version, _major_compaction_impl(l2_versions, l2_vec));
modify_l2_versions(l2_versions, new_l2_version, index_meta);
RETURN_IF_ERROR(modify_l2_versions(l2_versions, new_l2_version, index_meta));
// delete useless files
RETURN_IF_ERROR(_reload(index_meta));
RETURN_IF_ERROR(_delete_expired_index_file(
Expand Down Expand Up @@ -4991,7 +4997,7 @@ Status PersistentIndex::major_compaction(DataDir* data_dir, int64_t tablet_id, s
}
PersistentIndexMetaPB index_meta;
RETURN_IF_ERROR(TabletMetaManager::get_persistent_index_meta(data_dir, tablet_id, &index_meta));
modify_l2_versions(l2_versions, new_l2_version, index_meta);
RETURN_IF_ERROR(modify_l2_versions(l2_versions, new_l2_version, index_meta));
RETURN_IF_ERROR(TabletMetaManager::write_persistent_index_meta(data_dir, tablet_id, index_meta));
// reload new l2 versions
RETURN_IF_ERROR(_reload(index_meta));
Expand Down
6 changes: 6 additions & 0 deletions be/src/storage/persistent_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,12 @@ class PersistentIndex {

void reset_cancel_major_compaction();

<<<<<<< HEAD
=======
static Status modify_l2_versions(const std::vector<EditVersion>& input_l2_versions,
const EditVersion& output_l2_version, PersistentIndexMetaPB& index_meta);

>>>>>>> 1c8df3aa98 ([BugFix] fix concurrent issue between primary index unload and compaction (#49341))
Status pk_dump(PrimaryKeyDump* dump, PrimaryIndexMultiLevelPB* dump_pb);

void test_calc_memory_usage() { return _calc_memory_usage(); }
Expand Down
21 changes: 12 additions & 9 deletions be/src/storage/primary_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1184,14 +1184,13 @@ Status PrimaryIndex::_do_load(Tablet* tablet) {
_set_schema(pkey_schema);

// load persistent index if enable persistent index meta
size_t fix_size = PrimaryKeyEncoder::get_encoded_fixed_size(_pk_schema);

if (tablet->get_enable_persistent_index() && (fix_size <= 128)) {
if (tablet->get_enable_persistent_index()) {
// TODO
// PersistentIndex and tablet data are currently stored in the same directory
// We may need to support the separation of PersistentIndex and Tablet data
DCHECK(_persistent_index == nullptr);
_persistent_index = std::make_unique<PersistentIndex>(tablet->schema_hash_path());
_persistent_index = std::make_shared<PersistentIndex>(tablet->schema_hash_path());
return _persistent_index->load_from_tablet(tablet);
}

Expand Down Expand Up @@ -1557,8 +1556,14 @@ std::unique_ptr<PrimaryIndex> TEST_create_primary_index(const Schema& pk_schema)
}

Status PrimaryIndex::major_compaction(DataDir* data_dir, int64_t tablet_id, std::shared_timed_mutex* mutex) {
if (_persistent_index != nullptr) {
return _persistent_index->major_compaction(data_dir, tablet_id, mutex);
// `_persistent_index` could be reset when call `unload()`, so we need to fetch reference first.
std::shared_ptr<PersistentIndex> pindex;
{
std::lock_guard<std::mutex> lg(_lock);
pindex = _persistent_index;
}
if (pindex != nullptr) {
return pindex->major_compaction(data_dir, tablet_id, mutex);
} else {
return Status::OK();
}
Expand All @@ -1576,13 +1581,11 @@ Status PrimaryIndex::reset(Tablet* tablet, EditVersion version, PersistentIndexM
auto pkey_schema = ChunkHelper::convert_schema(tablet_schema_ptr, pk_columns);
_set_schema(pkey_schema);

size_t fix_size = PrimaryKeyEncoder::get_encoded_fixed_size(_pk_schema);

if (tablet->get_enable_persistent_index() && (fix_size <= 128)) {
if (tablet->get_enable_persistent_index()) {
if (_persistent_index != nullptr) {
_persistent_index.reset();
}
_persistent_index = std::make_unique<PersistentIndex>(tablet->schema_hash_path());
_persistent_index = std::make_shared<PersistentIndex>(tablet->schema_hash_path());
RETURN_IF_ERROR(_persistent_index->reset(tablet, version, index_meta));
} else {
if (_pkey_to_rssid_rowid != nullptr) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/primary_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ class PrimaryIndex {
std::atomic<bool> _loaded{false};
Status _status;
int64_t _tablet_id = 0;
std::unique_ptr<PersistentIndex> _persistent_index;
std::shared_ptr<PersistentIndex> _persistent_index;

private:
size_t _key_size = 0;
Expand Down
71 changes: 71 additions & 0 deletions be/test/storage/lake/local_pk_index_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,4 +349,75 @@ TEST_F(LocalPkIndexManagerTest, test_major_compaction) {
ASSERT_OK(publish_single_version(_tablet_metadata->id(), 3, txn_id).status());
}

TEST_F(LocalPkIndexManagerTest, test_major_compaction_with_unload) {
SyncPoint::GetInstance()->EnableProcessing();
SyncPoint::GetInstance()->SetCallBack("UpdateManager::pick_tablets_to_do_pk_index_major_compaction:1",
[](void* arg) { *(double*)arg = 1.0; });
std::vector<int> k0{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22};
std::vector<int> v0{2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 41, 44};

auto c0 = Int32Column::create();
auto c1 = Int32Column::create();
c0->append_numbers(k0.data(), k0.size() * sizeof(int));
c1->append_numbers(v0.data(), v0.size() * sizeof(int));

Chunk chunk0({c0, c1}, _schema);
auto rowset_txn_meta = std::make_unique<RowsetTxnMetaPB>();

int64_t txn_id = next_id();
std::shared_ptr<const TabletSchema> const_schema = _tablet_schema;
VersionedTablet tablet(_tablet_mgr.get(), _tablet_metadata);
ASSIGN_OR_ABORT(auto writer, tablet.new_writer(kHorizontal, txn_id));
ASSERT_OK(writer->open());

// write segment #1
ASSERT_OK(writer->write(chunk0));
ASSERT_OK(writer->finish());

// write txnlog
auto txn_log = std::make_shared<TxnLog>();
txn_log->set_tablet_id(_tablet_metadata->id());
txn_log->set_txn_id(txn_id);
auto op_write = txn_log->mutable_op_write();
for (auto& f : writer->files()) {
op_write->mutable_rowset()->add_segments(std::move(f.path));
}
op_write->mutable_rowset()->set_num_rows(writer->num_rows());
op_write->mutable_rowset()->set_data_size(writer->data_size());
op_write->mutable_rowset()->set_overlapped(false);

ASSERT_OK(_tablet_mgr->put_txn_log(txn_log));

writer->close();
ASSERT_OK(publish_single_version(_tablet_metadata->id(), 2, txn_id).status());
auto stores = StorageEngine::instance()->get_stores();
ASSERT_TRUE(stores.size() > 0);
ASSERT_OK(FileSystem::Default()->path_exists(stores[0]->get_persistent_index_path() + "/" +
std::to_string(_tablet_metadata->id())));
auto local_pk_index_manager = std::make_unique<LocalPkIndexManager>();
ASSERT_OK(local_pk_index_manager->init());
local_pk_index_manager->schedule(
[&]() { return local_pk_index_manager->pick_tablets_to_do_pk_index_major_compaction(_update_mgr.get()); });
// LocalPkIndexManager use the global update manager to do major compaction.
// But we are using _update_mgr constructed in ut, so we have to call pk_index_major_compaction explicitly.
std::vector<TabletAndScore> pick_tablets =
local_pk_index_manager->pick_tablets_to_do_pk_index_major_compaction(_update_mgr.get());
for (auto& tablet_score : pick_tablets) {
auto tablet_id = tablet_score.first;
auto* data_dir = StorageEngine::instance()->get_persistent_index_store(tablet_id);
if (data_dir == nullptr) {
continue;
}
std::vector<std::thread> jobs;
jobs.emplace_back([&]() { _update_mgr->pk_index_major_compaction(tablet_id, data_dir); });
jobs.emplace_back([&]() { _update_mgr->unload_primary_index(tablet_id); });
for (auto& job : jobs) {
job.join();
}
}

SyncPoint::GetInstance()->ClearCallBack("UpdateManager::pick_tablets_to_do_pk_index_major_compaction:1");
SyncPoint::GetInstance()->DisableProcessing();
}

} // namespace starrocks::lake
7 changes: 6 additions & 1 deletion be/test/storage/persistent_index_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3276,7 +3276,7 @@ TEST_P(PersistentIndexTest, pindex_major_compact_meta) {
input_l2_versions.emplace_back(1, 0);
input_l2_versions.emplace_back(1, 1);
input_l2_versions.emplace_back(3, 0);
PersistentIndex::modify_l2_versions(input_l2_versions, input_l2_versions.back(), index_meta);
ASSERT_TRUE(PersistentIndex::modify_l2_versions(input_l2_versions, input_l2_versions.back(), index_meta).ok());

// check result
ASSERT_EQ(index_meta.l2_versions_size(), index_meta.l2_version_merged_size());
Expand All @@ -3290,6 +3290,11 @@ TEST_P(PersistentIndexTest, pindex_major_compact_meta) {
ASSERT_FALSE(index_meta.l2_version_merged(i));
}
}

// rebuild index
index_meta.clear_l2_versions();
index_meta.clear_l2_version_merged();
ASSERT_FALSE(PersistentIndex::modify_l2_versions(input_l2_versions, input_l2_versions.back(), index_meta).ok());
}

INSTANTIATE_TEST_SUITE_P(PersistentIndexTest, PersistentIndexTest,
Expand Down

0 comments on commit b5fc20c

Please sign in to comment.