Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangstar333 committed Nov 17, 2024
1 parent 79d70c1 commit fd3f86d
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 7 deletions.
11 changes: 4 additions & 7 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,17 +276,17 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
}

Status ExchangeSinkLocalState::_send_new_partition_batch(vectorized::Block* input_block) {
if (_row_distribution.need_deal_batching()) { // maybe try_close more than 1 time
if (_row_distribution.batching_rows() > 0) { // maybe try_close more than 1 time
RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
auto& p = _parent->cast<ExchangeSinkOperatorX>();
// these order is unique.
// 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block.
// 2. deal batched block
// 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.
_row_distribution.clear_batching_stats();
_row_distribution._batching_block->clear_column_data();
RETURN_IF_ERROR(p.sink(_state, input_block, false));
// Recovery back
_row_distribution._batching_block->clear_column_data();
_row_distribution._deal_batched = false;
}
return Status::OK();
Expand Down Expand Up @@ -518,7 +518,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
old_channel_mem_usage += channel->mem_usage();
}
// check out of limit
RETURN_IF_ERROR(local_state._send_new_partition_batch(block));
std::shared_ptr<vectorized::Block> convert_block = std::make_shared<vectorized::Block>();
const auto& num_channels = local_state._partition_count;
std::vector<std::vector<uint32>> channel2rows;
Expand All @@ -544,10 +543,8 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
}
}

if (eos) {
local_state._row_distribution._deal_batched = true;
RETURN_IF_ERROR(local_state._send_new_partition_batch(block));
}
RETURN_IF_ERROR(local_state._send_new_partition_batch(block));

{
SCOPED_TIMER(local_state._distribute_rows_into_channels_timer);
// the convert_block maybe different with block after execute exprs
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/sink/vrow_distribution.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ class VRowDistribution {
std::vector<RowPartTabletIds>& row_part_tablet_ids,
int64_t& rows_stat_val);
bool need_deal_batching() const { return _deal_batched && _batching_rows > 0; }
size_t batching_rows() const { return _batching_rows; }
// create partitions when need for auto-partition table using #_partitions_need_create.
Status automatic_create_partition();
void clear_batching_stats();
Expand Down

0 comments on commit fd3f86d

Please sign in to comment.