Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opt](exec)lazy deserialize pblock in VDataStreamRecvr::SenderQueue #44378

Merged
merged 2 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 27 additions & 33 deletions be/src/vec/runtime/vdata_stream_recvr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ VDataStreamRecvr::SenderQueue::~SenderQueue() {
}

Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) {
std::lock_guard<std::mutex> l(_lock); // protect _block_queue
#ifndef NDEBUG
if (!_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
Expand All @@ -83,21 +82,33 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) {
}

Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block, bool* eos) {
if (_is_cancelled) {
RETURN_IF_ERROR(_cancel_status);
return Status::Cancelled("Cancelled");
}
BlockItem block_item;
{
std::lock_guard<std::mutex> l(_lock);
//check and get block_item from data_queue
if (_is_cancelled) {
RETURN_IF_ERROR(_cancel_status);
return Status::Cancelled("Cancelled");
}

if (_block_queue.empty()) {
DCHECK_EQ(_num_remaining_senders, 0);
*eos = true;
return Status::OK();
}
if (_block_queue.empty()) {
DCHECK_EQ(_num_remaining_senders, 0);
*eos = true;
return Status::OK();
}

DCHECK(!_block_queue.empty());
auto [next_block, block_byte_size] = std::move(_block_queue.front());
_block_queue.pop_front();
DCHECK(!_block_queue.empty());
block_item = std::move(_block_queue.front());
_block_queue.pop_front();
}
BlockUPtr next_block;
RETURN_IF_ERROR(block_item.get_block(next_block));
size_t block_byte_size = block_item.block_byte_size();
COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, block_item.deserialize_time());
COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time());
COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes());
_recvr->_parent->memory_used_counter()->update(-(int64_t)block_byte_size);
std::lock_guard<std::mutex> l(_lock);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rethink the logic

sub_blocks_memory_usage(block_byte_size);
_record_debug_info();
if (_block_queue.empty() && _source_dependency) {
Expand Down Expand Up @@ -163,30 +174,14 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num
}
}

BlockUPtr block = nullptr;
int64_t deserialize_time = 0;
{
SCOPED_RAW_TIMER(&deserialize_time);
block = Block::create_unique();
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(block->deserialize(pblock));
}

const auto rows = block->rows();
if (rows == 0) {
return Status::OK();
}
auto block_byte_size = block->allocated_bytes();
VLOG_ROW << "added #rows=" << rows << " batch_size=" << block_byte_size << "\n";
PBlock new_pblock = pblock;

std::lock_guard<std::mutex> l(_lock);
if (_is_cancelled) {
return Status::OK();
}

COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, deserialize_time);
COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time());
COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes());
COUNTER_UPDATE(_recvr->_rows_produced_counter, rows);
const auto block_byte_size = new_pblock.ByteSizeLong();
COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
if (_recvr->_max_wait_worker_time->value() < wait_for_worker) {
_recvr->_max_wait_worker_time->set(wait_for_worker);
Expand All @@ -196,7 +191,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num
_recvr->_max_find_recvr_time->set((int64_t)time_to_find_recvr);
}

_block_queue.emplace_back(std::move(block), block_byte_size);
_block_queue.emplace_back(std::move(new_pblock), block_byte_size);
COUNTER_UPDATE(_recvr->_remote_bytes_received_counter, block_byte_size);
_record_debug_info();
try_set_dep_ready_without_lock();
Expand Down Expand Up @@ -370,7 +365,6 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, pipeline::Exchang
_first_batch_wait_total_timer = ADD_TIMER(_profile, "FirstBatchArrivalWaitTime");
_decompress_timer = ADD_TIMER(_profile, "DecompressTime");
_decompress_bytes = ADD_COUNTER(_profile, "DecompressBytes", TUnit::BYTES);
_rows_produced_counter = ADD_COUNTER(_profile, "RowsProduced", TUnit::UNIT);
_blocks_produced_counter = ADD_COUNTER(_profile, "BlocksProduced", TUnit::UNIT);
_max_wait_worker_time = ADD_COUNTER(_profile, "MaxWaitForWorkerTime", TUnit::UNIT);
_max_wait_to_process_time = ADD_COUNTER(_profile, "MaxWaitToProcessTime", TUnit::UNIT);
Expand Down
38 changes: 35 additions & 3 deletions be/src/vec/runtime/vdata_stream_recvr.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#pragma once

#include <gen_cpp/Types_types.h>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: 'gen_cpp/Types_types.h' file not found [clang-diagnostic-error]

#include <gen_cpp/Types_types.h>
         ^

#include <gen_cpp/data.pb.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/callback.h>

Expand Down Expand Up @@ -157,8 +158,6 @@ class VDataStreamRecvr : public HasTaskExecutionCtx {
RuntimeProfile::Counter* _decompress_timer = nullptr;
RuntimeProfile::Counter* _decompress_bytes = nullptr;

// Number of rows received
RuntimeProfile::Counter* _rows_produced_counter = nullptr;
// Number of blocks received
RuntimeProfile::Counter* _blocks_produced_counter = nullptr;
RuntimeProfile::Counter* _max_wait_worker_time = nullptr;
Expand Down Expand Up @@ -260,7 +259,40 @@ class VDataStreamRecvr::SenderQueue {
Status _cancel_status;
int _num_remaining_senders;
std::unique_ptr<MemTracker> _queue_mem_tracker;
std::list<std::pair<BlockUPtr, size_t>> _block_queue;

// `BlockItem` is used in `_block_queue` to handle both local and remote exchange blocks.
// For local exchange blocks, `BlockUPtr` is used directly without any modification.
// For remote exchange blocks, the `pblock` is stored in `BlockItem`.
// When `get_block` is called, the `pblock` is deserialized into a usable block.
struct BlockItem {
    // Hands the held block over to the caller through `block`.
    //
    // If this item holds no block yet, a new Block is created and filled by
    // deserializing `_pblock`; the time spent doing so is accumulated into
    // `_deserialize_time` by SCOPED_RAW_TIMER.
    //
    // @param block  receives ownership of the block; whatever it owned before
    //               is released.
    // @return Status::OK() on success. If deserialization fails, the early
    //         return of RETURN_IF_ERROR_OR_CATCH_EXCEPTION is taken and
    //         `block` is left untouched.
    Status get_block(BlockUPtr& block) {
        if (!_block) {
            SCOPED_RAW_TIMER(&_deserialize_time);
            // Deserialize into a local first: if this fails we must not leave a
            // partially filled block in `_block`, otherwise a later call would
            // skip deserialization and return that block with an OK status.
            BlockUPtr decoded = Block::create_unique();
            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(decoded->deserialize(_pblock));
            _block = std::move(decoded);
        }
        // Move-assignment transfers ownership and leaves `_block` null.
        block = std::move(_block);
        return Status::OK();
    }

    // Byte size recorded for this item when it was constructed.
    size_t block_byte_size() const { return _block_byte_size; }
    // Time accumulated by SCOPED_RAW_TIMER while deserializing in get_block();
    // stays 0 if get_block() never had to deserialize.
    int64_t deserialize_time() const { return _deserialize_time; }

    BlockItem() = default;

    // Item that already owns a ready-to-use block; get_block() will not deserialize.
    BlockItem(BlockUPtr&& block, size_t block_byte_size)
            : _block(std::move(block)), _block_byte_size(block_byte_size) {}

    // Item that stores a serialized `pblock`; it is deserialized on get_block().
    BlockItem(PBlock&& pblock, size_t block_byte_size)
            : _block(nullptr), _pblock(std::move(pblock)), _block_byte_size(block_byte_size) {}

private:
    // Ready-to-use block; null until deserialized when constructed from a PBlock.
    BlockUPtr _block;
    // Serialized form, used as the deserialization source when `_block` is null.
    PBlock _pblock;
    size_t _block_byte_size = 0;
    int64_t _deserialize_time = 0;
};

std::list<BlockItem> _block_queue;

// sender_id
std::unordered_set<int> _sender_eos_set;
Expand Down
Loading