From fd3f86d1b575ddc09b6bcfdd3adcb0980cce75f7 Mon Sep 17 00:00:00 2001 From: zhangstar333 Date: Mon, 18 Nov 2024 00:09:10 +0800 Subject: [PATCH] update --- be/src/pipeline/exec/exchange_sink_operator.cpp | 11 ++++------- be/src/vec/sink/vrow_distribution.h | 1 + 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 9b619baac3d632..1559a35236eb04 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -276,7 +276,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { } Status ExchangeSinkLocalState::_send_new_partition_batch(vectorized::Block* input_block) { - if (_row_distribution.need_deal_batching()) { // maybe try_close more than 1 time + if (_row_distribution.batching_rows() > 0) { // maybe try_close more than 1 time RETURN_IF_ERROR(_row_distribution.automatic_create_partition()); auto& p = _parent->cast(); // these order is unique. @@ -284,9 +284,9 @@ Status ExchangeSinkLocalState::_send_new_partition_batch(vectorized::Block* inpu // 2. deal batched block // 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that. _row_distribution.clear_batching_stats(); + _row_distribution._batching_block->clear_column_data(); RETURN_IF_ERROR(p.sink(_state, input_block, false)); // Recovery back - _row_distribution._batching_block->clear_column_data(); _row_distribution._deal_batched = false; } return Status::OK(); @@ -518,7 +518,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block old_channel_mem_usage += channel->mem_usage(); } // check out of limit - RETURN_IF_ERROR(local_state._send_new_partition_batch(block)); std::shared_ptr convert_block = std::make_shared(); const auto& num_channels = local_state._partition_count; std::vector> channel2rows; @@ -544,10 +543,8 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block } } - if (eos) { - local_state._row_distribution._deal_batched = true; - RETURN_IF_ERROR(local_state._send_new_partition_batch(block)); - } + RETURN_IF_ERROR(local_state._send_new_partition_batch(block)); + { SCOPED_TIMER(local_state._distribute_rows_into_channels_timer); // the convert_block maybe different with block after execute exprs diff --git a/be/src/vec/sink/vrow_distribution.h b/be/src/vec/sink/vrow_distribution.h index 87fd801984ad73..40202556290ea8 100644 --- a/be/src/vec/sink/vrow_distribution.h +++ b/be/src/vec/sink/vrow_distribution.h @@ -132,6 +132,7 @@ class VRowDistribution { std::vector& row_part_tablet_ids, int64_t& rows_stat_val); bool need_deal_batching() const { return _deal_batched && _batching_rows > 0; } + size_t batching_rows() const { return _batching_rows; } // create partitions when need for auto-partition table using #_partitions_need_create. Status automatic_create_partition(); void clear_batching_stats();