Skip to content

Commit

Permalink
[Enhancement] Skip tablet schema in rowset meta during ingestion. (St…
Browse files Browse the repository at this point in the history
…arRocks#50873)

## Why I'm doing:
Since version 3.2, Rowset has been designed to save its own schema, and the complete tablet schema is stored in the metadata. This can lead to the following issues:

1. **When the import frequency is very high**, and a large number of Rowsets are generated, the metadata in RocksDB grows, particularly in non-primary key (non-PK) table scenarios. This is because, in non-PK tables, each update to the tablet metadata rewrites the historical Rowset metadata, leading to a large amount of obsolete data in RocksDB.
2. **When the tablet has a very large number of columns (e.g., 10,000 columns)**, the time taken to persist the Rowset metadata increases, especially when the imported data volume is small.

These two issues can eventually reduce the efficiency of real-time imports.


## What I'm doing:
This PR attempts to solve the issue of reduced import efficiency caused by metadata bloat. 

One feasible solution is to store the tablet schema only once for all Rowsets that share the same schema. Instead of saving the complete schema in each Rowset's metadata, a reference or marker would be saved in each Rowset’s metadata to point to the corresponding tablet schema.

However, the issue with this solution is compatibility. In previous versions, each Rowset generated its corresponding schema based on its own metadata. If the system is upgraded and then rollback to an older version, the older version would not be able to locate the schema using a reference or marker. This would lead to the generation of incorrect schemas, as the previous versions expect the full schema to be included in each Rowset's metadata.

So I choose a more conservative solution, and the main changes are as follows:
1. **Skip the schema in Rowset meta during import if the Rowset's schema is identical to the latest tablet schema.**
2. **Update the RowsetMeta that does not store schema when updating the tablet schema.**

If the BE  exits at any given time and restarts, those Rowsets that have not saved their own schema will be initialized using the tablet's current schema. Since the Rowset meta without schemas is updated each time the tablet schema is modified, it ensures that after the BE restarts, every Rowset can find its corresponding schema.

Moreover, this logic is backward compatible with older versions, so even after an upgrade and subsequent downgrade, the BE will still be able to retrieve the correct schema.

Compared to imports, DDL operations can be considered low-frequency tasks. As a result, in most cases, the Rowset meta generated during imports will not carry the schema, which helps alleviate metadata bloat. 

However, there can still be some bad cases. For example, in non-PK tables, during the period between an alter operation and the deletion of outdated Rowset meta, if the number of outdated Rowsets is particularly large, the system will still rewrite all outdated Rowsets each time the tablet meta is saved. This can still lead to a decline in import performance.

To solve this issue, we need to resolve the problem of storing multiple copies of the same schema. I think we can first support downgrading and then resolve this issue, allowing for an iteration based on this PR.

Below is a test based on this PR: 
a table with 200 columns, one bucket, writing one row of data at a time, with 10 concurrent threads, executed 1,000 times.

| Branch | Table type |Total cost time |
|----------|----------|----------|
|  main-8f128b  | Duplicate  | 1043.77(s)  |
|  this pr  |  Duplicate  | 178.49(s)  | 
|  main-8f128b  | Primary  | 188.46(s)  |
|  this pr  |  Primary  | 186.68(s)  | 


Signed-off-by: sevev <[email protected]>
Signed-off-by: zhangqiang <[email protected]>
  • Loading branch information
sevev authored and ZiheLiu committed Oct 31, 2024
1 parent 47f16df commit 6c1998e
Show file tree
Hide file tree
Showing 19 changed files with 458 additions and 68 deletions.
4 changes: 4 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1477,6 +1477,10 @@ CONF_mBool(enable_lake_compaction_use_partial_segments, "false");
// chunk size used by lake compaction
CONF_mInt32(lake_compaction_chunk_size, "4096");

CONF_mBool(skip_schema_in_rowset_meta, "true");

CONF_mBool(enable_bit_unpack_simd, "true");

CONF_mInt32(max_committed_without_schema_rowset, "1000");

} // namespace starrocks::config
1 change: 1 addition & 0 deletions be/src/storage/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ class BaseTablet : public std::enable_shared_from_this<BaseTablet> {
for (const RowsetMetaSharedPtr& rowset_meta : _tablet_meta->all_rs_metas()) {
if (!rowset_meta->has_tablet_schema_pb()) {
rowset_meta->set_tablet_schema(tablet_schema());
rowset_meta->set_skip_tablet_schema(true);
flag = true;
}
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/compaction_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ class CompactionTask : public BackgroundTask {
}
std::vector<RowsetSharedPtr> to_replace;
_tablet->modify_rowsets_without_lock({_output_rowset}, _input_rowsets, &to_replace);
_tablet->save_meta();
_tablet->save_meta(config::skip_schema_in_rowset_meta);
Rowset::close_rowsets(_input_rowsets);
for (auto& rs : to_replace) {
StorageEngine::instance()->add_unused_rowset(rs);
Expand Down
20 changes: 2 additions & 18 deletions be/src/storage/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,15 +399,7 @@ Status DataDir::load() {
if (!rowset_meta->tablet_schema()) {
auto tablet_schema_ptr = tablet->tablet_schema();
rowset_meta->set_tablet_schema(tablet_schema_ptr);
RowsetMetaPB meta_pb;
rowset_meta->get_full_meta_pb(&meta_pb);
Status rs_meta_save_status = RowsetMetaManager::save(get_meta(), rowset_meta->tablet_uid(), meta_pb);
if (!rs_meta_save_status.ok()) {
LOG(WARNING) << "Failed to save rowset meta, rowset=" << rowset_meta->rowset_id()
<< " tablet=" << rowset_meta->tablet_id() << " txn_id: " << rowset_meta->txn_id();
error_rowset_count++;
return true;
}
rowset_meta->set_skip_tablet_schema(true);
}
Status commit_txn_status = _txn_manager->commit_txn(
_kv_store, rowset_meta->partition_id(), rowset_meta->txn_id(), rowset_meta->tablet_id(),
Expand All @@ -426,15 +418,7 @@ Status DataDir::load() {
Status publish_status = tablet->load_rowset(rowset);
if (!rowset_meta->tablet_schema()) {
rowset_meta->set_tablet_schema(tablet->tablet_schema());
RowsetMetaPB meta_pb;
rowset_meta->get_full_meta_pb(&meta_pb);
Status rs_meta_save_status = RowsetMetaManager::save(get_meta(), rowset_meta->tablet_uid(), meta_pb);
if (!rs_meta_save_status.ok()) {
LOG(WARNING) << "Failed to save rowset meta, rowset=" << rowset_meta->rowset_id()
<< " tablet=" << rowset_meta->tablet_id() << " txn_id: " << rowset_meta->txn_id();
error_rowset_count++;
return true;
}
rowset_meta->set_skip_tablet_schema(true);
}
if (!publish_status.ok() && !publish_status.is_already_exist()) {
LOG(WARNING) << "Fail to add visible rowset=" << rowset->rowset_id()
Expand Down
10 changes: 10 additions & 0 deletions be/src/storage/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,16 @@ struct RowsetId {
}
};

struct HashOfRowsetId {
size_t operator()(const RowsetId& rowset_id) const {
size_t seed = 0;
seed = HashUtil::hash64(&rowset_id.hi, sizeof(rowset_id.hi), seed);
seed = HashUtil::hash64(&rowset_id.mi, sizeof(rowset_id.mi), seed);
seed = HashUtil::hash64(&rowset_id.lo, sizeof(rowset_id.lo), seed);
return seed;
}
};

struct TabletSegmentId {
int64_t tablet_id = INT64_MAX;
uint32_t segment_id = UINT32_MAX;
Expand Down
31 changes: 22 additions & 9 deletions be/src/storage/rowset/rowset_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,14 +242,16 @@ class RowsetMeta {
// new rowset.
// Before calling it, please confirm if you need a complete `rowset_meta` that includes `tablet_schema_pb`.
// If not, perhaps `get_meta_pb_without_schema()` is enough.
void get_full_meta_pb(RowsetMetaPB* rs_meta_pb, const TabletSchemaCSPtr& tablet_schema = nullptr) const {
void get_full_meta_pb(RowsetMetaPB* rs_meta_pb, bool skip_schema = false,
const TabletSchemaCSPtr& tablet_schema = nullptr) const {
*rs_meta_pb = *_rowset_meta_pb;
const TabletSchemaCSPtr& target_schema = (tablet_schema != nullptr) ? tablet_schema : _schema;

if (target_schema != nullptr) {
rs_meta_pb->clear_tablet_schema();
TabletSchemaPB* ts_pb = rs_meta_pb->mutable_tablet_schema();
target_schema->to_schema_pb(ts_pb);
if (!skip_schema) {
const TabletSchemaCSPtr& target_schema = (tablet_schema != nullptr) ? tablet_schema : _schema;
if (target_schema != nullptr) {
rs_meta_pb->clear_tablet_schema();
TabletSchemaPB* ts_pb = rs_meta_pb->mutable_tablet_schema();
target_schema->to_schema_pb(ts_pb);
}
}
}

Expand Down Expand Up @@ -277,6 +279,16 @@ class RowsetMeta {

bool has_tablet_schema_pb() { return _has_tablet_schema_pb; }

void set_skip_tablet_schema(bool skip_tablet_schema) { _skip_tablet_schema = skip_tablet_schema; }
bool skip_tablet_schema() { return _skip_tablet_schema; }
bool check_schema_id(int64_t latest_tablet_schema_id) {
if (_schema != nullptr && _schema->id() != TabletSchema::invalid_id() &&
_schema->id() == latest_tablet_schema_id) {
return true;
}
return false;
}

private:
bool _deserialize_from_pb(std::string_view value) {
return _rowset_meta_pb->ParseFromArray(value.data(), value.size());
Expand All @@ -297,8 +309,8 @@ class RowsetMeta {
_schema = TabletSchema::create(_rowset_meta_pb->tablet_schema());
}
}
_has_tablet_schema_pb = _rowset_meta_pb->has_tablet_schema();

_has_tablet_schema_pb = _rowset_meta_pb->has_tablet_schema();
// clear does not release memory but only set it to default value, so we need to copy a new _rowset_meta_pb
_rowset_meta_pb->clear_tablet_schema();
std::unique_ptr<RowsetMetaPB> ptr = std::make_unique<RowsetMetaPB>(*_rowset_meta_pb);
Expand Down Expand Up @@ -332,6 +344,7 @@ class RowsetMeta {
bool _is_removed_from_rowset_meta = false;
TabletSchemaCSPtr _schema = nullptr;
bool _has_tablet_schema_pb = false;
bool _skip_tablet_schema = false;
};

} // namespace starrocks
} // namespace starrocks
7 changes: 7 additions & 0 deletions be/src/storage/rowset/rowset_meta_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,11 @@ Status RowsetMetaManager::traverse_rowset_metas(
return meta->iterate(META_COLUMN_FAMILY_INDEX, ROWSET_PREFIX, traverse_rowset_meta_func);
}

Status RowsetMetaManager::get_rowset_meta_value(KVStore* meta, const TabletUid& tablet_uid, const RowsetId& rowset_id,
std::string* value) {
std::string key = get_rowset_meta_key(tablet_uid, rowset_id);
RETURN_IF_ERROR(meta->get(META_COLUMN_FAMILY_INDEX, key, value));
return Status::OK();
}

} // namespace starrocks
3 changes: 3 additions & 0 deletions be/src/storage/rowset/rowset_meta_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ class RowsetMetaManager {

static Status traverse_rowset_metas(
KVStore* meta, std::function<bool(const TabletUid&, const RowsetId&, std::string_view)> const& func);

static Status get_rowset_meta_value(KVStore* meta, const TabletUid& tablet_uid, const RowsetId& rowset_id,
std::string* value);
};

} // namespace starrocks
69 changes: 51 additions & 18 deletions be/src/storage/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,19 +83,7 @@ Tablet::Tablet(const TabletMetaSharedPtr& tablet_meta, DataDir* data_dir)
_cumulative_point(kInvalidCumulativePoint) {
// change _rs_graph to _timestamped_version_tracker
_timestamped_version_tracker.construct_versioned_tracker(_tablet_meta->all_rs_metas());

// if !_tablet_meta->all_rs_metas()[0]->tablet_schema(),
// that mean the tablet_meta is still no upgrade to support-light-schema-change versions.
// Before support-light-schema-change version, rowset metas don't have tablet schema.
// And when upgrade to starrocks support-light-schema-change version,
// all rowset metas will be set the tablet schema from tablet meta.
if (_tablet_meta->all_rs_metas().empty() || !_tablet_meta->all_rs_metas()[0]->tablet_schema()) {
_max_version_schema = BaseTablet::tablet_schema();
} else {
_max_version_schema =
TabletMeta::rowset_meta_with_max_rowset_version(_tablet_meta->all_rs_metas())->tablet_schema();
}

_max_version_schema = BaseTablet::tablet_schema();
MEM_TRACKER_SAFE_CONSUME(GlobalEnv::GetInstance()->tablet_metadata_mem_tracker(), _mem_usage());
}

Expand Down Expand Up @@ -172,8 +160,8 @@ Status Tablet::init() {

// should save tablet meta to remote meta store
// if it's a primary replica
void Tablet::save_meta() {
auto st = _tablet_meta->save_meta(_data_dir);
void Tablet::save_meta(bool skip_tablet_schema) {
auto st = _tablet_meta->save_meta(_data_dir, skip_tablet_schema);
CHECK(st.ok()) << "fail to save tablet_meta: " << st;
}

Expand Down Expand Up @@ -583,7 +571,11 @@ Status Tablet::add_inc_rowset(const RowsetSharedPtr& rowset, int64_t version) {
}

RowsetMetaPB rowset_meta_pb;
rowset->rowset_meta()->get_full_meta_pb(&rowset_meta_pb);
if (rowset->rowset_meta()->skip_tablet_schema()) {
rowset_meta_pb = rowset->rowset_meta()->get_meta_pb_without_schema();
} else {
rowset->rowset_meta()->get_full_meta_pb(&rowset_meta_pb);
}
// No matter whether contains the version, the rowset meta should always be saved. TxnManager::publish_txn
// will remove the in-memory txn information if Status::AlreadlyExist, but not the committed rowset meta
// (RowsetStatePB = COMMITTED) saved in rocksdb. Here modify the rowset to visible, and save it again
Expand Down Expand Up @@ -635,6 +627,26 @@ Status Tablet::add_inc_rowset(const RowsetSharedPtr& rowset, int64_t version) {
return Status::OK();
}

bool Tablet::add_committed_rowset(const RowsetSharedPtr& rowset) {
if (_committed_rs_map.size() >= config::max_committed_without_schema_rowset) {
VLOG(1) << "tablet: " << tablet_id()
<< " too many committed without schema rowset : " << _committed_rs_map.size();
return false;
}

if (rowset->rowset_meta()->check_schema_id(_max_version_schema->id())) {
_committed_rs_map[rowset->rowset_id()] = rowset;
return true;
}
return false;
}

void Tablet::erase_committed_rowset(const RowsetSharedPtr& rowset) {
if (rowset != nullptr) {
_committed_rs_map.erase(rowset->rowset_id());
}
}

void Tablet::overwrite_rowset(const RowsetSharedPtr& rowset, int64_t version) {
std::unique_lock wrlock(_meta_lock);
vector<RowsetSharedPtr> origin_rowsets;
Expand Down Expand Up @@ -1436,7 +1448,7 @@ void Tablet::do_tablet_meta_checkpoint() {
return;
}
LOG(INFO) << "start to do tablet meta checkpoint, tablet=" << full_name();
save_meta();
save_meta(config::skip_schema_in_rowset_meta);
// if save meta successfully, then should remove the rowset meta existing in tablet
// meta from rowset meta store
for (auto& rs_meta : _tablet_meta->all_rs_metas()) {
Expand Down Expand Up @@ -1806,17 +1818,38 @@ const TabletSchemaCSPtr Tablet::thread_safe_get_tablet_schema() const {
return _max_version_schema;
}

// for non-pk tablet, all published rowset will be rewrite when save `tablet_meta`
// for pk tablet, we need to get the rowset which without `tablet_schema` and rewrite
// the rowsets in `_committed_rs_map` is committed success but not publish yet, so if we update the
// tablet schema, we need to rewrite.
void Tablet::_get_rewrite_meta_rs(std::vector<RowsetSharedPtr>& rewrite_meta_rs) {
for (auto& [_, rs] : _committed_rs_map) {
if (rs->rowset_meta()->skip_tablet_schema()) {
rewrite_meta_rs.emplace_back(rs);
}
}

if (_updates) {
_updates->rewrite_rs_meta();
}
}

void Tablet::update_max_version_schema(const TabletSchemaCSPtr& tablet_schema) {
std::lock_guard l0(_meta_lock);
std::lock_guard l1(_schema_lock);
DeferOp defer([&]() { _update_schema_running.store(false); });
_update_schema_running.store(true);
// Double Check for concurrent update
if (!_max_version_schema || tablet_schema->schema_version() > _max_version_schema->schema_version()) {
if (tablet_schema->id() == TabletSchema::invalid_id()) {
_max_version_schema = tablet_schema;
} else {
_max_version_schema = GlobalTabletSchemaMap::Instance()->emplace(tablet_schema).first;
}
_tablet_meta->save_tablet_schema(_max_version_schema, _data_dir);
std::vector<RowsetSharedPtr> rewrite_meta_rs;
_get_rewrite_meta_rs(rewrite_meta_rs);
_tablet_meta->save_tablet_schema(_max_version_schema, rewrite_meta_rs, _data_dir);
_committed_rs_map.clear();
}
}

Expand Down
18 changes: 17 additions & 1 deletion be/src/storage/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#include "storage/utils.h"
#include "storage/version_graph.h"
#include "util/once.h"
#include "util/phmap/phmap.h"

namespace starrocks {

Expand Down Expand Up @@ -99,7 +100,7 @@ class Tablet : public BaseTablet {
void register_tablet_into_dir();
void deregister_tablet_from_dir();

void save_meta();
void save_meta(bool skip_tablet_schema = false);
// Used in clone task, to update local meta when finishing a clone job
Status revise_tablet_meta(const std::vector<RowsetMetaSharedPtr>& rowsets_to_clone,
const std::vector<Version>& versions_to_delete);
Expand Down Expand Up @@ -335,6 +336,13 @@ class Tablet : public BaseTablet {
// set true when start to drop tablet. only set in `TabletManager::drop_tablet` right now
void set_is_dropping(bool is_dropping) { _is_dropping = is_dropping; }

[[nodiscard]] bool is_update_schema_running() const { return _update_schema_running.load(); }
void set_update_schema_running(bool is_running) { _update_schema_running.store(is_running); }
std::shared_mutex& get_schema_lock() { return _schema_lock; }
bool add_committed_rowset(const RowsetSharedPtr& rowset);
void erase_committed_rowset(const RowsetSharedPtr& rowset);
int64_t committed_rowset_size() { return _committed_rs_map.size(); }

void on_shutdown() override;

private:
Expand Down Expand Up @@ -372,6 +380,7 @@ class Tablet : public BaseTablet {
// those binlog is deleted. Return true the meta has been changed, and needs to be persisted
bool _check_useless_binlog_and_update_meta(int64_t current_second);
void _pick_candicate_rowset_before_specify_version(vector<RowsetSharedPtr>* candidcate_rowsets, int64_t version);
void _get_rewrite_meta_rs(std::vector<RowsetSharedPtr>& rewrite_meta_rs);

friend class TabletUpdates;
static const int64_t kInvalidCumulativePoint = -1;
Expand Down Expand Up @@ -415,6 +424,12 @@ class Tablet : public BaseTablet {
// this policy is judged and computed by TimestampedVersionTracker.
std::unordered_map<Version, RowsetSharedPtr, HashOfVersion> _stale_rs_version_map;

// Keep the rowsets committed but not publish which rowset meta without schema
phmap::parallel_flat_hash_map<RowsetId, std::shared_ptr<Rowset>, HashOfRowsetId, std::equal_to<RowsetId>,
std::allocator<std::pair<const RowsetId, std::shared_ptr<Rowset>>>, 5,
phmap::NullMutex, true>
_committed_rs_map;

// gtid -> version
std::map<int64_t, int64_t> _gtid_to_version_map;

Expand Down Expand Up @@ -456,6 +471,7 @@ class Tablet : public BaseTablet {
bool _will_be_force_replaced = false;

std::atomic<bool> _is_dropping{false};
std::atomic<bool> _update_schema_running{false};
};

inline bool Tablet::init_succeeded() {
Expand Down
Loading

0 comments on commit 6c1998e

Please sign in to comment.