Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
bobhan1 committed Oct 15, 2024
1 parent b15d0b4 commit e82ccc0
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 21 deletions.
19 changes: 9 additions & 10 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,13 @@ void MemTable::_sort_one_column(std::vector<RowInBlock*>& row_in_blocks, Tie& ti
}
}

// For load which is not flexible partial update, when aggregating rows for a specified key,
// there will be at most one row which allocates memory from arena to store the aggregated result
// of the current key, so we can use Arena::clear() to release the memory in _finalize_one_row() directly.
// But for flexible partial update, to keep the correctness of the result, we may batch more than one row
// for a specified key. Thus there may be more than one row which allocates memory from arena to store
// the aggregated result. So we can't release the memory in _finalize_one_row(), instead, we should
// release memory after these batched rows are all flushed to the new block.
template <bool is_final, bool clear_arena>
void MemTable::_finalize_one_row(RowInBlock* row,
const vectorized::ColumnsWithTypeAndName& block_data,
Expand Down Expand Up @@ -656,17 +663,9 @@ void MemTable::_aggregate_for_flexible_partial_update_with_seq_col(
};
auto get_idx = [](bool with_seq_col, bool has_delete_sign) {
if (!with_seq_col) {
if (has_delete_sign) {
return 0;
} else {
return 1;
}
return (has_delete_sign ? 0 : 1);
} else {
if (has_delete_sign) {
return 2;
} else {
return 3;
}
return (has_delete_sign ? 2 : 3);
}
};
auto add_row = [&](RowInBlock* row, bool with_seq_col, bool has_delete_sign) {
Expand Down
23 changes: 12 additions & 11 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,8 @@ Status VerticalSegmentWriter::_merge_rows_for_sequence_column(
VLOG_DEBUG << fmt::format(
"VerticalSegmentWriter::_merge_rows_for_sequence_column enter: data.block:{}\n",
data.block->dump_data());
// there will be at most 4 rows for a specified key in block when control flow reaches here
// after this function, there will be at most 2 rows for a specified key
auto seq_col_unique_id = _tablet_schema->column(_tablet_schema->sequence_col_idx()).unique_id();
auto delete_sign_col_unique_id =
_tablet_schema->column(_tablet_schema->delete_sign_idx()).unique_id();
Expand All @@ -916,24 +918,21 @@ Status VerticalSegmentWriter::_merge_rows_for_sequence_column(
bool has_same_rows {false};
auto get_idx = [](bool with_seq_col, bool has_delete_sign) {
if (!with_seq_col) {
if (has_delete_sign) {
return 0;
} else {
return 1;
}
return (has_delete_sign ? 0 : 1);
} else {
if (has_delete_sign) {
return 2;
} else {
return 3;
}
return (has_delete_sign ? 2 : 3);
}
};

auto find_rows_to_filter = [&](const std::string& key) {
bool has_row_with_seq_col = (batched_rows[0] != -1 || batched_rows[1] != -1);
bool has_row_without_seq_col = (batched_rows[2] != -1 || batched_rows[3] != -1);
if (has_row_with_seq_col && has_row_without_seq_col) {
// for rows with the same key in block, if some of them specify the sequence column,
// some of them not, we need to filter them here. After this function, there will be
// only one kind of them remains
// If the block only has one kind of them, duplicate rows will be filtered in
// `_merge_rows_for_insert_after_delete`
RowLocation loc;
RowsetSharedPtr rowset;
std::string previous_encoded_seq_value {};
Expand Down Expand Up @@ -974,6 +973,7 @@ Status VerticalSegmentWriter::_merge_rows_for_sequence_column(

// the encoded value is order-preserving, so we can use Slice::compare() to compare them
if (batched_rows[2] != -1 && batched_rows[3] != -1) {
// it's guaranteed that the sequence value of batched_rows[2] is strictly smaller than that of batched_rows[3]
if (previous_seq_slice.compare(Slice {row_with_delete_sign_encoded_seq_value}) <=
0) {
remove_rows_without_seq();
Expand Down Expand Up @@ -1051,10 +1051,11 @@ Status VerticalSegmentWriter::_merge_rows_for_insert_after_delete(
vectorized::IOlapColumnDataAccessor* seq_column, const signed char* delete_signs,
const std::vector<RowsetSharedPtr>& specified_rowsets,
std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) {
// TODO(bobhan1): handle sequence column later
VLOG_DEBUG << fmt::format(
"VerticalSegmentWriter::_merge_rows_for_insert_after_delete enter: data.block:{}\n",
data.block->dump_data());
// there will be at most 2 rows for a specified key in block when control flow reaches here
// after this function, there will not be duplicate rows in block
auto filter_column = vectorized::ColumnUInt8::create(data.num_rows, 1);
auto* __restrict filter_map = filter_column->get_data().data();
std::string previous_key {};
Expand Down

0 comments on commit e82ccc0

Please sign in to comment.