Skip to content

Commit

Permalink
upd
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange committed Nov 28, 2024
1 parent 5e54970 commit 3db1dd9
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 7 deletions.
19 changes: 12 additions & 7 deletions be/src/pipeline/exec/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,15 +260,17 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori
}
vectorized::Block input_block = *origin_block;

std::vector<int> result_column_ids;
for (const auto& projections : local_state->_intermediate_projections) {
result_column_ids.resize(projections.size());
for (int i = 0; i < projections.size(); i++) {
RETURN_IF_ERROR(projections[i]->execute(&input_block, &result_column_ids[i]));
{
SCOPED_TIMER(local_state->_intermediate_projection_timer);
std::vector<int> result_column_ids;
for (const auto& projections : local_state->_intermediate_projections) {
result_column_ids.resize(projections.size());
for (int i = 0; i < projections.size(); i++) {
RETURN_IF_ERROR(projections[i]->execute(&input_block, &result_column_ids[i]));
}
input_block.shuffle_columns(result_column_ids);
}
input_block.shuffle_columns(result_column_ids);
}

DCHECK_EQ(rows, input_block.rows());
auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) {
if (to->is_nullable() && !from->is_nullable()) {
Expand All @@ -293,6 +295,7 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
*_output_row_descriptor);
if (rows != 0) {
SCOPED_TIMER(local_state->_final_projection_timer);
auto& mutable_columns = mutable_block.mutable_columns();
DCHECK(mutable_columns.size() == local_state->_projections.size());
for (int i = 0; i < mutable_columns.size(); ++i) {
Expand Down Expand Up @@ -480,6 +483,8 @@ Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
_blocks_returned_counter =
ADD_COUNTER_WITH_LEVEL(_runtime_profile, "BlocksProduced", TUnit::UNIT, 1);
_projection_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ProjectionTime", 1);
_intermediate_projection_timer = ADD_TIMER(_runtime_profile, "ProjectionIntermediateTime");
_final_projection_timer = ADD_TIMER(_runtime_profile, "ProjectionFinalTime");
_init_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "InitTime", 1);
_open_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "OpenTime", 1);
_close_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "CloseTime", 1);
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ class PipelineXLocalStateBase {
RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr;
RuntimeProfile::Counter* _memory_used_counter = nullptr;
RuntimeProfile::Counter* _projection_timer = nullptr;
RuntimeProfile::Counter* _intermediate_projection_timer = nullptr;
RuntimeProfile::Counter* _final_projection_timer = nullptr;
RuntimeProfile::Counter* _exec_timer = nullptr;
// Account for peak memory used by this node
RuntimeProfile::HighWaterMarkCounter* _peak_memory_usage_counter = nullptr;
Expand Down

0 comments on commit 3db1dd9

Please sign in to comment.