Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] improve partial update by column mode overflow estimate strategy (backport #50300) #50537

Merged
merged 1 commit into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 24 additions & 18 deletions be/src/storage/rowset_column_update_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,25 +300,23 @@ Status RowsetColumnUpdateState::_finalize_partial_update_state(Tablet* tablet, R
return Status::OK();
}

static int64_t calc_upt_memory_usage_per_row_column(Rowset* rowset) {
const auto& txn_meta = rowset->rowset_meta()->get_meta_pb_without_schema().txn_meta();
const int64_t total_update_col_cnt = txn_meta.partial_update_column_ids_size();
// `num_rows_upt` and `total_update_row_size` could be zero when upgrade from old version,
int64_t RowsetColumnUpdateState::calc_upt_memory_usage_per_row(Rowset* rowset) {
// `num_rows_upt` could be zero after upgrading from an old version,
// in which case we return zero and apply no limit.
if ((rowset->num_rows_upt() * total_update_col_cnt) <= 0) return 0;
return rowset->total_update_row_size() / (rowset->num_rows_upt() * total_update_col_cnt);
if ((rowset->num_rows_upt()) <= 0) return 0;
return rowset->total_update_row_size() / rowset->num_rows_upt();
}

// Read chunk from source segment file and call `update_func` to update it.
// `update_func` accept ChunkUniquePtr and [start_rowid, end_rowid) range of this chunk.
static Status read_from_source_segment_and_update(Rowset* rowset, const Schema& schema, Tablet* tablet,
OlapReaderStatistics* stats, int64_t version,
RowsetSegmentId rowset_seg_id, const std::string& path,
const std::function<Status(StreamChunkContainer)>& update_func) {
static Status read_from_source_segment_and_update(
Rowset* rowset, const Schema& schema, Tablet* tablet, OlapReaderStatistics* stats, int64_t version,
RowsetSegmentId rowset_seg_id, const std::string& path,
const std::function<Status(StreamChunkContainer, bool, int64_t)>& update_func) {
CHECK_MEM_LIMIT("RowsetColumnUpdateState::read_from_source_segment");
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(rowset->rowset_path()));
// We need to estimate each update rows size before it has been actually updated.
const int64_t upt_memory_usage_per_row_column = calc_upt_memory_usage_per_row_column(rowset);
const int64_t upt_memory_usage_per_row = RowsetColumnUpdateState::calc_upt_memory_usage_per_row(rowset);
auto segment = Segment::open(fs, FileInfo{path}, rowset_seg_id.segment_id, rowset->schema());
if (!segment.ok()) {
LOG(WARNING) << "Fail to open " << path << ": " << segment.status();
Expand Down Expand Up @@ -354,13 +352,16 @@ static Status read_from_source_segment_and_update(Rowset* rowset, const Schema&
source_chunk_ptr->append(*tmp_chunk_ptr);
// To avoid excessive memory usage and Column overflow, we limit the source chunk's size.
if (source_chunk_ptr->num_rows() >= INT32_MAX ||
(int64_t)source_chunk_ptr->num_rows() * upt_memory_usage_per_row_column * (int64_t)schema.num_fields() >
(int64_t)source_chunk_ptr->num_rows() * upt_memory_usage_per_row >
config::partial_update_memory_limit_per_worker) {
// Because we handle columns group by group (group size defined by config::vertical_compaction_max_columns_per_group),
// using `upt_memory_usage_per_row` to estimate the source chunk's future memory cost will overestimate it.
// But it's better to overestimate than to underestimate.
StreamChunkContainer container = {
.chunk_ptr = source_chunk_ptr.get(),
.start_rowid = start_rowid,
.end_rowid = start_rowid + static_cast<uint32_t>(source_chunk_ptr->num_rows())};
RETURN_IF_ERROR(update_func(container));
RETURN_IF_ERROR(update_func(container, true /*print log*/, upt_memory_usage_per_row));
start_rowid += static_cast<uint32_t>(source_chunk_ptr->num_rows());
source_chunk_ptr->reset();
}
Expand All @@ -371,7 +372,7 @@ static Status read_from_source_segment_and_update(Rowset* rowset, const Schema&
.chunk_ptr = source_chunk_ptr.get(),
.start_rowid = start_rowid,
.end_rowid = start_rowid + static_cast<uint32_t>(source_chunk_ptr->num_rows())};
RETURN_IF_ERROR(update_func(container));
RETURN_IF_ERROR(update_func(container, false /*print log*/, upt_memory_usage_per_row));
start_rowid += static_cast<uint32_t>(source_chunk_ptr->num_rows());
source_chunk_ptr->reset();
}
Expand Down Expand Up @@ -759,10 +760,15 @@ Status RowsetColumnUpdateState::finalize(Tablet* tablet, Rowset* rowset, uint32_
rowset->rowset_path(), rowsetid_segid.unique_rowset_id, rowsetid_segid.segment_id);
RETURN_IF_ERROR(read_from_source_segment_and_update(
rowset, partial_schema, tablet, &stats, latest_applied_version.major_number(), rowsetid_segid,
seg_path, [&](StreamChunkContainer container) {
VLOG(2) << "RowsetColumnUpdateState read from source segment: [byte usage: "
<< container.chunk_ptr->bytes_usage() << " row cnt: " << container.chunk_ptr->num_rows()
<< "] row range : [" << container.start_rowid << ", " << container.end_rowid << ")";
seg_path, [&](StreamChunkContainer container, bool print_log, int64_t upt_memory_usage_per_row) {
if (print_log) {
LOG(INFO) << "RowsetColumnUpdateState read from source segment: tablet id:"
<< tablet->tablet_id() << " [byte usage: " << container.chunk_ptr->bytes_usage()
<< " row cnt: " << container.chunk_ptr->num_rows() << "] row range : ["
<< container.start_rowid << ", " << container.end_rowid
<< ") upt_memory_usage_per_row : " << upt_memory_usage_per_row
<< " update column cnt : " << update_column_ids.size();
}
const size_t source_chunk_size = container.chunk_ptr->memory_usage();
tracker->consume(source_chunk_size);
DeferOp tracker_defer([&]() { tracker->release(source_chunk_size); });
Expand Down
2 changes: 2 additions & 0 deletions be/src/storage/rowset_column_update_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ class RowsetColumnUpdateState {
// For UT test now
const std::vector<BatchPKsPtr>& upserts() const { return _upserts; }

static int64_t calc_upt_memory_usage_per_row(Rowset* rowset);

private:
Status _load_upserts(Rowset* rowset, MemTracker* update_mem_tracker, uint32_t start_idx, uint32_t* end_idx);

Expand Down
4 changes: 4 additions & 0 deletions be/test/storage/rowset_column_partial_update_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1219,6 +1219,10 @@ TEST_P(RowsetColumnPartialUpdateTest, partial_update_with_source_chunk_limit) {
return (int16_t)(k1 % 100 + 1) == v1 && (int32_t)(k1 % 1000 + 2) == v2;
}
}));
// check `calc_upt_memory_usage_per_row`
for (int i = 10; i < 20; i++) {
ASSERT_TRUE(RowsetColumnUpdateState::calc_upt_memory_usage_per_row(rowsets[i].get()) > 0);
}
config::vector_chunk_size = old_vector_chunk_size;
config::partial_update_memory_limit_per_worker = old_partial_update_memory_limit_per_worker;
final_check(tablet, rowsets);
Expand Down
Loading