Skip to content

Commit

Permalink
[Enhancement] improve partial update by column mode overflow estimate…
Browse files Browse the repository at this point in the history
… strategy (#50300)

Signed-off-by: luohaha <[email protected]>
(cherry picked from commit 0b2eebf)
  • Loading branch information
luohaha authored and mergify[bot] committed Sep 2, 2024
1 parent b69e91f commit bbdc39e
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 18 deletions.
42 changes: 24 additions & 18 deletions be/src/storage/rowset_column_update_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,25 +300,23 @@ Status RowsetColumnUpdateState::_finalize_partial_update_state(Tablet* tablet, R
return Status::OK();
}

// Estimate the average memory usage, in bytes, of a single updated row in `rowset`,
// computed as total_update_row_size / num_rows_upt.
// Returns 0 when `num_rows_upt` is zero — rowsets written by an old version may
// lack this statistic — and callers treat a result of 0 as "no memory limit".
int64_t RowsetColumnUpdateState::calc_upt_memory_usage_per_row(Rowset* rowset) {
    // `num_rows_upt` could be zero after an upgrade from an old version;
    // in that case return zero so no limit is applied (and avoid dividing by zero).
    if (rowset->num_rows_upt() <= 0) return 0;
    return rowset->total_update_row_size() / rowset->num_rows_upt();
}

// Read chunk from source segment file and call `update_func` to update it.
// `update_func` accept ChunkUniquePtr and [start_rowid, end_rowid) range of this chunk.
static Status read_from_source_segment_and_update(Rowset* rowset, const Schema& schema, Tablet* tablet,
OlapReaderStatistics* stats, int64_t version,
RowsetSegmentId rowset_seg_id, const std::string& path,
const std::function<Status(StreamChunkContainer)>& update_func) {
static Status read_from_source_segment_and_update(
Rowset* rowset, const Schema& schema, Tablet* tablet, OlapReaderStatistics* stats, int64_t version,
RowsetSegmentId rowset_seg_id, const std::string& path,
const std::function<Status(StreamChunkContainer, bool, int64_t)>& update_func) {
CHECK_MEM_LIMIT("RowsetColumnUpdateState::read_from_source_segment");
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(rowset->rowset_path()));
// We need to estimate each update row's size before it has actually been updated.
const int64_t upt_memory_usage_per_row_column = calc_upt_memory_usage_per_row_column(rowset);
const int64_t upt_memory_usage_per_row = RowsetColumnUpdateState::calc_upt_memory_usage_per_row(rowset);
auto segment = Segment::open(fs, FileInfo{path}, rowset_seg_id.segment_id, rowset->schema());
if (!segment.ok()) {
LOG(WARNING) << "Fail to open " << path << ": " << segment.status();
Expand Down Expand Up @@ -354,13 +352,16 @@ static Status read_from_source_segment_and_update(Rowset* rowset, const Schema&
source_chunk_ptr->append(*tmp_chunk_ptr);
// To avoid excessive memory usage and column overflow, we limit the source chunk's size.
if (source_chunk_ptr->num_rows() >= INT32_MAX ||
(int64_t)source_chunk_ptr->num_rows() * upt_memory_usage_per_row_column * (int64_t)schema.num_fields() >
(int64_t)source_chunk_ptr->num_rows() * upt_memory_usage_per_row >
config::partial_update_memory_limit_per_worker) {
// Because we handle columns group by group (defined by config::vertical_compaction_max_columns_per_group),
// using `upt_memory_usage_per_row` to estimate the source chunk's future memory cost will overestimate it.
// But it's better to overestimate than to underestimate.
StreamChunkContainer container = {
.chunk_ptr = source_chunk_ptr.get(),
.start_rowid = start_rowid,
.end_rowid = start_rowid + static_cast<uint32_t>(source_chunk_ptr->num_rows())};
RETURN_IF_ERROR(update_func(container));
RETURN_IF_ERROR(update_func(container, true /*print log*/, upt_memory_usage_per_row));
start_rowid += static_cast<uint32_t>(source_chunk_ptr->num_rows());
source_chunk_ptr->reset();
}
Expand All @@ -371,7 +372,7 @@ static Status read_from_source_segment_and_update(Rowset* rowset, const Schema&
.chunk_ptr = source_chunk_ptr.get(),
.start_rowid = start_rowid,
.end_rowid = start_rowid + static_cast<uint32_t>(source_chunk_ptr->num_rows())};
RETURN_IF_ERROR(update_func(container));
RETURN_IF_ERROR(update_func(container, false /*print log*/, upt_memory_usage_per_row));
start_rowid += static_cast<uint32_t>(source_chunk_ptr->num_rows());
source_chunk_ptr->reset();
}
Expand Down Expand Up @@ -759,10 +760,15 @@ Status RowsetColumnUpdateState::finalize(Tablet* tablet, Rowset* rowset, uint32_
rowset->rowset_path(), rowsetid_segid.unique_rowset_id, rowsetid_segid.segment_id);
RETURN_IF_ERROR(read_from_source_segment_and_update(
rowset, partial_schema, tablet, &stats, latest_applied_version.major_number(), rowsetid_segid,
seg_path, [&](StreamChunkContainer container) {
VLOG(2) << "RowsetColumnUpdateState read from source segment: [byte usage: "
<< container.chunk_ptr->bytes_usage() << " row cnt: " << container.chunk_ptr->num_rows()
<< "] row range : [" << container.start_rowid << ", " << container.end_rowid << ")";
seg_path, [&](StreamChunkContainer container, bool print_log, int64_t upt_memory_usage_per_row) {
if (print_log) {
LOG(INFO) << "RowsetColumnUpdateState read from source segment: tablet id:"
<< tablet->tablet_id() << " [byte usage: " << container.chunk_ptr->bytes_usage()
<< " row cnt: " << container.chunk_ptr->num_rows() << "] row range : ["
<< container.start_rowid << ", " << container.end_rowid
<< ") upt_memory_usage_per_row : " << upt_memory_usage_per_row
<< " update column cnt : " << update_column_ids.size();
}
const size_t source_chunk_size = container.chunk_ptr->memory_usage();
tracker->consume(source_chunk_size);
DeferOp tracker_defer([&]() { tracker->release(source_chunk_size); });
Expand Down
2 changes: 2 additions & 0 deletions be/src/storage/rowset_column_update_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ class RowsetColumnUpdateState {
// Exposed only for unit tests.
const std::vector<BatchPKsPtr>& upserts() const { return _upserts; }

static int64_t calc_upt_memory_usage_per_row(Rowset* rowset);

private:
Status _load_upserts(Rowset* rowset, MemTracker* update_mem_tracker, uint32_t start_idx, uint32_t* end_idx);

Expand Down
4 changes: 4 additions & 0 deletions be/test/storage/rowset_column_partial_update_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1219,6 +1219,10 @@ TEST_P(RowsetColumnPartialUpdateTest, partial_update_with_source_chunk_limit) {
return (int16_t)(k1 % 100 + 1) == v1 && (int32_t)(k1 % 1000 + 2) == v2;
}
}));
// check `calc_upt_memory_usage_per_row`
for (int i = 10; i < 20; i++) {
ASSERT_TRUE(RowsetColumnUpdateState::calc_upt_memory_usage_per_row(rowsets[i].get()) > 0);
}
config::vector_chunk_size = old_vector_chunk_size;
config::partial_update_memory_limit_per_worker = old_partial_update_memory_limit_per_worker;
final_check(tablet, rowsets);
Expand Down

0 comments on commit bbdc39e

Please sign in to comment.