From 291194a34d8a6f350bd5c97facadc8002dce1da5 Mon Sep 17 00:00:00 2001 From: Mryange Date: Sun, 24 Nov 2024 19:04:16 +0800 Subject: [PATCH] upd --- be/src/vec/runtime/vdata_stream_recvr.cpp | 62 ++++++++++------------- be/src/vec/runtime/vdata_stream_recvr.h | 34 +++++++++++-- 2 files changed, 59 insertions(+), 37 deletions(-) diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index b48b9f780b8754e..156e99ba80d04e5 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -27,6 +27,7 @@ #include #include "common/logging.h" +#include "common/status.h" #include "pipeline/exec/exchange_sink_operator.h" #include "pipeline/exec/exchange_source_operator.h" #include "runtime/memory/mem_tracker.h" @@ -69,7 +70,6 @@ VDataStreamRecvr::SenderQueue::~SenderQueue() { } Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) { - std::lock_guard l(_lock); // protect _block_queue #ifndef NDEBUG if (!_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0) { throw doris::Exception(ErrorCode::INTERNAL_ERROR, @@ -83,21 +83,32 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) { } Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block, bool* eos) { - if (_is_cancelled) { - RETURN_IF_ERROR(_cancel_status); - return Status::Cancelled("Cancelled"); - } - - if (_block_queue.empty()) { - DCHECK_EQ(_num_remaining_senders, 0); - *eos = true; - return Status::OK(); - } + BlockItem block_item; + { + std::lock_guard l(_lock); + //check and get block_item from data_queue + if (_is_cancelled) { + RETURN_IF_ERROR(_cancel_status); + return Status::Cancelled("Cancelled"); + } - DCHECK(!_block_queue.empty()); - auto [next_block, block_byte_size] = std::move(_block_queue.front()); - _block_queue.pop_front(); + if (_block_queue.empty()) { + DCHECK_EQ(_num_remaining_senders, 0); + *eos = true; + return Status::OK(); + } + DCHECK(!_block_queue.empty()); + block_item = std::move(_block_queue.front()); + _block_queue.pop_front(); + } + BlockUPtr next_block; + RETURN_IF_ERROR(block_item.get_block(next_block)); + size_t block_byte_size = block_item.block_byte_size(); + COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, block_item.deserialize_time()); + COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time()); + COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes()); _recvr->_parent->memory_used_counter()->update(-(int64_t)block_byte_size); + std::lock_guard l(_lock); sub_blocks_memory_usage(block_byte_size); _record_debug_info(); if (_block_queue.empty() && _source_dependency) { @@ -163,30 +174,14 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num } } - BlockUPtr block = nullptr; - int64_t deserialize_time = 0; - { - SCOPED_RAW_TIMER(&deserialize_time); - block = Block::create_unique(); - RETURN_IF_ERROR_OR_CATCH_EXCEPTION(block->deserialize(pblock)); - } - - const auto rows = block->rows(); - if (rows == 0) { - return Status::OK(); - } - auto block_byte_size = block->allocated_bytes(); - VLOG_ROW << "added #rows=" << rows << " batch_size=" << block_byte_size << "\n"; + PBlock new_pblock = pblock; std::lock_guard l(_lock); if (_is_cancelled) { return Status::OK(); } - COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, deserialize_time); - COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time()); - COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes()); - COUNTER_UPDATE(_recvr->_rows_produced_counter, rows); + const auto block_byte_size = new_pblock.ByteSizeLong(); COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1); if (_recvr->_max_wait_worker_time->value() < wait_for_worker) { _recvr->_max_wait_worker_time->set(wait_for_worker); @@ -196,7 +191,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num _recvr->_max_find_recvr_time->set((int64_t)time_to_find_recvr); } - _block_queue.emplace_back(std::move(block), block_byte_size); + _block_queue.emplace_back(std::move(new_pblock), block_byte_size); COUNTER_UPDATE(_recvr->_remote_bytes_received_counter, block_byte_size); _record_debug_info(); try_set_dep_ready_without_lock(); @@ -370,7 +365,6 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, pipeline::Exchang _first_batch_wait_total_timer = ADD_TIMER(_profile, "FirstBatchArrivalWaitTime"); _decompress_timer = ADD_TIMER(_profile, "DecompressTime"); _decompress_bytes = ADD_COUNTER(_profile, "DecompressBytes", TUnit::BYTES); - _rows_produced_counter = ADD_COUNTER(_profile, "RowsProduced", TUnit::UNIT); _blocks_produced_counter = ADD_COUNTER(_profile, "BlocksProduced", TUnit::UNIT); _max_wait_worker_time = ADD_COUNTER(_profile, "MaxWaitForWorkerTime", TUnit::UNIT); _max_wait_to_process_time = ADD_COUNTER(_profile, "MaxWaitToProcessTime", TUnit::UNIT); diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index 08fb004f3b1a8ea..130e845b49311b2 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include #include @@ -157,8 +158,6 @@ class VDataStreamRecvr : public HasTaskExecutionCtx { RuntimeProfile::Counter* _decompress_timer = nullptr; RuntimeProfile::Counter* _decompress_bytes = nullptr; - // Number of rows received - RuntimeProfile::Counter* _rows_produced_counter = nullptr; // Number of blocks received RuntimeProfile::Counter* _blocks_produced_counter = nullptr; RuntimeProfile::Counter* _max_wait_worker_time = nullptr; @@ -260,7 +259,36 @@ class VDataStreamRecvr::SenderQueue { Status _cancel_status; int _num_remaining_senders; std::unique_ptr _queue_mem_tracker; - std::list> _block_queue; + + struct BlockItem { + Status get_block(BlockUPtr& block) { + if (!_block) { + SCOPED_RAW_TIMER(&_deserialize_time); + _block = Block::create_unique(); + RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_block->deserialize(_pblock)); + } + block.swap(_block); + _block.reset(); + return Status::OK(); + } + + size_t block_byte_size() const { return _block_byte_size; } + int64_t deserialize_time() const { return _deserialize_time; } + BlockItem() = default; + BlockItem(BlockUPtr&& block, size_t block_byte_size) + : _block(std::move(block)), _block_byte_size(block_byte_size) {} + + BlockItem(PBlock&& pblock, size_t block_byte_size) + : _block(nullptr), _pblock(std::move(pblock)), _block_byte_size(block_byte_size) {} + + private: + BlockUPtr _block; + PBlock _pblock; + size_t _block_byte_size = 0; + int64_t _deserialize_time = 0; + }; + + std::list _block_queue; // sender_id std::unordered_set _sender_eos_set;