Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Skip tablet schema in rowset meta during ingestion. #50873

Merged
merged 12 commits into from
Oct 12, 2024
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1460,6 +1460,8 @@ CONF_mBool(enable_lake_compaction_use_partial_segments, "false");
// chunk size used by lake compaction
CONF_mInt32(lake_compaction_chunk_size, "4096");

CONF_mBool(skip_schema_in_rowset_meta, "true");

CONF_mBool(enable_bit_unpack_simd, "true");

} // namespace starrocks::config
1 change: 1 addition & 0 deletions be/src/storage/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ class BaseTablet : public std::enable_shared_from_this<BaseTablet> {
for (const RowsetMetaSharedPtr& rowset_meta : _tablet_meta->all_rs_metas()) {
if (!rowset_meta->has_tablet_schema_pb()) {
rowset_meta->set_tablet_schema(tablet_schema());
rowset_meta->set_skip_tablet_schema(true);
flag = true;
}
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/compaction_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ class CompactionTask : public BackgroundTask {
}
std::vector<RowsetSharedPtr> to_replace;
_tablet->modify_rowsets_without_lock({_output_rowset}, _input_rowsets, &to_replace);
_tablet->save_meta();
_tablet->save_meta(config::skip_schema_in_rowset_meta);
Rowset::close_rowsets(_input_rowsets);
for (auto& rs : to_replace) {
StorageEngine::instance()->add_unused_rowset(rs);
Expand Down
20 changes: 2 additions & 18 deletions be/src/storage/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,15 +399,7 @@ Status DataDir::load() {
if (!rowset_meta->tablet_schema()) {
auto tablet_schema_ptr = tablet->tablet_schema();
rowset_meta->set_tablet_schema(tablet_schema_ptr);
RowsetMetaPB meta_pb;
rowset_meta->get_full_meta_pb(&meta_pb);
Status rs_meta_save_status = RowsetMetaManager::save(get_meta(), rowset_meta->tablet_uid(), meta_pb);
if (!rs_meta_save_status.ok()) {
LOG(WARNING) << "Failed to save rowset meta, rowset=" << rowset_meta->rowset_id()
<< " tablet=" << rowset_meta->tablet_id() << " txn_id: " << rowset_meta->txn_id();
error_rowset_count++;
return true;
}
rowset_meta->set_skip_tablet_schema(true);
}
Status commit_txn_status = _txn_manager->commit_txn(
_kv_store, rowset_meta->partition_id(), rowset_meta->txn_id(), rowset_meta->tablet_id(),
Expand All @@ -426,15 +418,7 @@ Status DataDir::load() {
Status publish_status = tablet->load_rowset(rowset);
if (!rowset_meta->tablet_schema()) {
rowset_meta->set_tablet_schema(tablet->tablet_schema());
RowsetMetaPB meta_pb;
rowset_meta->get_full_meta_pb(&meta_pb);
Status rs_meta_save_status = RowsetMetaManager::save(get_meta(), rowset_meta->tablet_uid(), meta_pb);
if (!rs_meta_save_status.ok()) {
LOG(WARNING) << "Failed to save rowset meta, rowset=" << rowset_meta->rowset_id()
<< " tablet=" << rowset_meta->tablet_id() << " txn_id: " << rowset_meta->txn_id();
error_rowset_count++;
return true;
}
rowset_meta->set_skip_tablet_schema(true);
}
if (!publish_status.ok() && !publish_status.is_already_exist()) {
LOG(WARNING) << "Fail to add visible rowset=" << rowset->rowset_id()
Expand Down
11 changes: 11 additions & 0 deletions be/src/storage/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,17 @@ struct RowsetId {
}
};

struct HashOfRowsetId {
size_t operator()(const RowsetId& rowset_id) const {
size_t seed = 0;
size_t h1 = HashUtil::hash64(&rowset_id.hi, sizeof(rowset_id.hi), seed);
size_t h2 = HashUtil::hash64(&rowset_id.mi, sizeof(rowset_id.mi), seed);
size_t h3 = HashUtil::hash64(&rowset_id.lo, sizeof(rowset_id.lo), seed);
// Combine the three hash values (hi, mi, lo)
return h1 ^ (h2 << 1) ^ (h3 << 2);
}
};

struct TabletSegmentId {
int64_t tablet_id = INT64_MAX;
uint32_t segment_id = UINT32_MAX;
Expand Down
24 changes: 15 additions & 9 deletions be/src/storage/rowset/rowset_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,14 +242,16 @@ class RowsetMeta {
// new rowset.
// Before calling it, please confirm if you need a complete `rowset_meta` that includes `tablet_schema_pb`.
// If not, perhaps `get_meta_pb_without_schema()` is enough.
void get_full_meta_pb(RowsetMetaPB* rs_meta_pb, const TabletSchemaCSPtr& tablet_schema = nullptr) const {
void get_full_meta_pb(RowsetMetaPB* rs_meta_pb, bool skip_schema = false,
const TabletSchemaCSPtr& tablet_schema = nullptr) const {
*rs_meta_pb = *_rowset_meta_pb;
const TabletSchemaCSPtr& target_schema = (tablet_schema != nullptr) ? tablet_schema : _schema;

if (target_schema != nullptr) {
rs_meta_pb->clear_tablet_schema();
TabletSchemaPB* ts_pb = rs_meta_pb->mutable_tablet_schema();
target_schema->to_schema_pb(ts_pb);
if (!skip_schema) {
const TabletSchemaCSPtr& target_schema = (tablet_schema != nullptr) ? tablet_schema : _schema;
if (target_schema != nullptr) {
rs_meta_pb->clear_tablet_schema();
TabletSchemaPB* ts_pb = rs_meta_pb->mutable_tablet_schema();
target_schema->to_schema_pb(ts_pb);
}
}
}

Expand Down Expand Up @@ -277,6 +279,9 @@ class RowsetMeta {

bool has_tablet_schema_pb() { return _has_tablet_schema_pb; }

void set_skip_tablet_schema(bool skip_tablet_schema) { _skip_tablet_schema = skip_tablet_schema; }
bool skip_tablet_schema() { return _skip_tablet_schema; }

private:
bool _deserialize_from_pb(std::string_view value) {
return _rowset_meta_pb->ParseFromArray(value.data(), value.size());
Expand All @@ -297,8 +302,8 @@ class RowsetMeta {
_schema = TabletSchema::create(_rowset_meta_pb->tablet_schema());
}
}
_has_tablet_schema_pb = _rowset_meta_pb->has_tablet_schema();

_has_tablet_schema_pb = _rowset_meta_pb->has_tablet_schema();
// clear does not release memory but only set it to default value, so we need to copy a new _rowset_meta_pb
_rowset_meta_pb->clear_tablet_schema();
std::unique_ptr<RowsetMetaPB> ptr = std::make_unique<RowsetMetaPB>(*_rowset_meta_pb);
Expand Down Expand Up @@ -332,6 +337,7 @@ class RowsetMeta {
bool _is_removed_from_rowset_meta = false;
TabletSchemaCSPtr _schema = nullptr;
bool _has_tablet_schema_pb = false;
bool _skip_tablet_schema = false;
};

} // namespace starrocks
} // namespace starrocks
7 changes: 7 additions & 0 deletions be/src/storage/rowset/rowset_meta_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,11 @@ Status RowsetMetaManager::traverse_rowset_metas(
return meta->iterate(META_COLUMN_FAMILY_INDEX, ROWSET_PREFIX, traverse_rowset_meta_func);
}

Status RowsetMetaManager::get_rowset_meta_value(KVStore* meta, const TabletUid& tablet_uid, const RowsetId& rowset_id,
std::string* value) {
std::string key = get_rowset_meta_key(tablet_uid, rowset_id);
RETURN_IF_ERROR(meta->get(META_COLUMN_FAMILY_INDEX, key, value));
return Status::OK();
}

} // namespace starrocks
3 changes: 3 additions & 0 deletions be/src/storage/rowset/rowset_meta_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ class RowsetMetaManager {

static Status traverse_rowset_metas(
KVStore* meta, std::function<bool(const TabletUid&, const RowsetId&, std::string_view)> const& func);

static Status get_rowset_meta_value(KVStore* meta, const TabletUid& tablet_uid, const RowsetId& rowset_id,
std::string* value);
};

} // namespace starrocks
61 changes: 44 additions & 17 deletions be/src/storage/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,19 +83,7 @@ Tablet::Tablet(const TabletMetaSharedPtr& tablet_meta, DataDir* data_dir)
_cumulative_point(kInvalidCumulativePoint) {
// change _rs_graph to _timestamped_version_tracker
_timestamped_version_tracker.construct_versioned_tracker(_tablet_meta->all_rs_metas());

// if !_tablet_meta->all_rs_metas()[0]->tablet_schema(),
// that mean the tablet_meta is still no upgrade to support-light-schema-change versions.
// Before support-light-schema-change version, rowset metas don't have tablet schema.
// And when upgrade to starrocks support-light-schema-change version,
// all rowset metas will be set the tablet schema from tablet meta.
if (_tablet_meta->all_rs_metas().empty() || !_tablet_meta->all_rs_metas()[0]->tablet_schema()) {
_max_version_schema = BaseTablet::tablet_schema();
} else {
_max_version_schema =
TabletMeta::rowset_meta_with_max_rowset_version(_tablet_meta->all_rs_metas())->tablet_schema();
}

_max_version_schema = BaseTablet::tablet_schema();
MEM_TRACKER_SAFE_CONSUME(GlobalEnv::GetInstance()->tablet_metadata_mem_tracker(), _mem_usage());
}

Expand Down Expand Up @@ -172,8 +160,8 @@ Status Tablet::init() {

// should save tablet meta to remote meta store
// if it's a primary replica
void Tablet::save_meta() {
auto st = _tablet_meta->save_meta(_data_dir);
void Tablet::save_meta(bool skip_tablet_schema) {
auto st = _tablet_meta->save_meta(_data_dir, skip_tablet_schema);
CHECK(st.ok()) << "fail to save tablet_meta: " << st;
}

Expand Down Expand Up @@ -583,7 +571,11 @@ Status Tablet::add_inc_rowset(const RowsetSharedPtr& rowset, int64_t version) {
}

RowsetMetaPB rowset_meta_pb;
rowset->rowset_meta()->get_full_meta_pb(&rowset_meta_pb);
if (rowset->rowset_meta()->skip_tablet_schema()) {
rowset_meta_pb = rowset->rowset_meta()->get_meta_pb_without_schema();
} else {
rowset->rowset_meta()->get_full_meta_pb(&rowset_meta_pb);
}
// No matter whether contains the version, the rowset meta should always be saved. TxnManager::publish_txn
// will remove the in-memory txn information if Status::AlreadlyExist, but not the committed rowset meta
// (RowsetStatePB = COMMITTED) saved in rocksdb. Here modify the rowset to visible, and save it again
Expand Down Expand Up @@ -635,6 +627,20 @@ Status Tablet::add_inc_rowset(const RowsetSharedPtr& rowset, int64_t version) {
return Status::OK();
}

bool Tablet::add_committed_rowset(const RowsetSharedPtr& rowset) {
if (rowset->rowset_meta()->tablet_schema() != nullptr &&
rowset->rowset_meta()->tablet_schema()->id() != TabletSchema::invalid_id() &&
rowset->rowset_meta()->tablet_schema()->id() == _max_version_schema->id()) {
_committed_rs_map[rowset->rowset_id()] = rowset;
return true;
}
return false;
}

void Tablet::erase_committed_rowset(const RowsetSharedPtr& rowset) {
_committed_rs_map.erase(rowset->rowset_id());
}

void Tablet::overwrite_rowset(const RowsetSharedPtr& rowset, int64_t version) {
std::unique_lock wrlock(_meta_lock);
vector<RowsetSharedPtr> origin_rowsets;
Expand Down Expand Up @@ -1806,17 +1812,38 @@ const TabletSchemaCSPtr Tablet::thread_safe_get_tablet_schema() const {
return _max_version_schema;
}

// for non-pk tablet, all published rowset will be rewrite when save `tablet_meta`
// for pk tablet, we need to get the rowset which without `tablet_schema` and rewrite
// the rowsets in `_committed_rs_map` is committed success but not publish yet, so if we update the
// tablet schema, we need to rewrite.
void Tablet::_get_rewrite_meta_rs(std::vector<RowsetSharedPtr>& rewrite_meta_rs) {
for (auto& [_, rs] : _committed_rs_map) {
if (rs->rowset_meta()->skip_tablet_schema()) {
rewrite_meta_rs.emplace_back(rs);
}
}

if (_updates) {
_updates->rewrite_rs_meta();
}
}

void Tablet::update_max_version_schema(const TabletSchemaCSPtr& tablet_schema) {
sevev marked this conversation as resolved.
Show resolved Hide resolved
std::lock_guard l0(_meta_lock);
std::lock_guard l1(_schema_lock);
DeferOp defer([&]() { _update_schema_running.store(false); });
_update_schema_running.store(true);
// Double Check for concurrent update
if (!_max_version_schema || tablet_schema->schema_version() > _max_version_schema->schema_version()) {
if (tablet_schema->id() == TabletSchema::invalid_id()) {
_max_version_schema = tablet_schema;
} else {
_max_version_schema = GlobalTabletSchemaMap::Instance()->emplace(tablet_schema).first;
}
_tablet_meta->save_tablet_schema(_max_version_schema, _data_dir);
std::vector<RowsetSharedPtr> rewrite_meta_rs;
_get_rewrite_meta_rs(rewrite_meta_rs);
_tablet_meta->save_tablet_schema(_max_version_schema, rewrite_meta_rs, _data_dir);
_committed_rs_map.clear();
}
}

Expand Down
18 changes: 17 additions & 1 deletion be/src/storage/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#include "storage/utils.h"
#include "storage/version_graph.h"
#include "util/once.h"
#include "util/phmap/phmap.h"

namespace starrocks {

Expand Down Expand Up @@ -99,7 +100,7 @@ class Tablet : public BaseTablet {
void register_tablet_into_dir();
void deregister_tablet_from_dir();

void save_meta();
void save_meta(bool skip_tablet_schema = false);
// Used in clone task, to update local meta when finishing a clone job
Status revise_tablet_meta(const std::vector<RowsetMetaSharedPtr>& rowsets_to_clone,
const std::vector<Version>& versions_to_delete);
Expand Down Expand Up @@ -335,6 +336,13 @@ class Tablet : public BaseTablet {
// set true when start to drop tablet. only set in `TabletManager::drop_tablet` right now
void set_is_dropping(bool is_dropping) { _is_dropping = is_dropping; }

[[nodiscard]] bool is_update_schema_running() const { return _update_schema_running.load(); }
void set_update_schema_running(bool is_running) { _update_schema_running.store(is_running); }
std::shared_mutex& get_schema_lock() { return _schema_lock; }
bool add_committed_rowset(const RowsetSharedPtr& rowset);
void erase_committed_rowset(const RowsetSharedPtr& rowset);
int64_t committed_rowset_size() { return _committed_rs_map.size(); }

void on_shutdown() override;

private:
Expand Down Expand Up @@ -372,6 +380,7 @@ class Tablet : public BaseTablet {
// those binlog is deleted. Return true the meta has been changed, and needs to be persisted
bool _check_useless_binlog_and_update_meta(int64_t current_second);
void _pick_candicate_rowset_before_specify_version(vector<RowsetSharedPtr>* candidcate_rowsets, int64_t version);
void _get_rewrite_meta_rs(std::vector<RowsetSharedPtr>& rewrite_meta_rs);

friend class TabletUpdates;
static const int64_t kInvalidCumulativePoint = -1;
Expand Down Expand Up @@ -415,6 +424,12 @@ class Tablet : public BaseTablet {
// this policy is judged and computed by TimestampedVersionTracker.
std::unordered_map<Version, RowsetSharedPtr, HashOfVersion> _stale_rs_version_map;

// Keep the rowsets committed but not publish which rowset meta without schema
phmap::parallel_flat_hash_map<RowsetId, std::shared_ptr<Rowset>, HashOfRowsetId, std::equal_to<RowsetId>,
std::allocator<std::pair<const RowsetId, std::shared_ptr<Rowset>>>, 5,
phmap::NullMutex, true>
_committed_rs_map;

// gtid -> version
std::map<int64_t, int64_t> _gtid_to_version_map;

Expand Down Expand Up @@ -456,6 +471,7 @@ class Tablet : public BaseTablet {
bool _will_be_force_replaced = false;

std::atomic<bool> _is_dropping{false};
std::atomic<bool> _update_schema_running{false};
};

inline bool Tablet::init_succeeded() {
Expand Down
Loading
Loading