Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangstar333 committed Nov 28, 2024
1 parent b7fa249 commit 72625c2
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 3 deletions.
5 changes: 2 additions & 3 deletions be/src/pipeline/exec/nested_loop_join_build_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta
}
}

profile()->add_info_string("ShareCollectedEnabled",
profile()->add_info_string("SharedCollectedEnabled",
std::to_string(state->enable_share_hash_table_for_broadcast_join()));
profile()->add_info_string("ShouldCollectedBlocks", std::to_string(_should_collected_blocks));
if (!_should_collected_blocks) {
Expand Down Expand Up @@ -147,12 +147,12 @@ Status NestedLoopJoinBuildSinkOperatorX::sink(doris::RuntimeState* state, vector

auto rows = block->rows();
if (rows != 0) {
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)rows);
if (_match_all_build || _is_right_semi_anti) {
local_state._shared_state->build_side_visited_flags.emplace_back(
vectorized::ColumnUInt8::create(rows, 0));
}
if (local_state._should_collected_blocks) {
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)rows);
auto mem_usage = block->allocated_bytes();
local_state._build_rows += rows;
local_state._total_mem_usage += mem_usage;
Expand Down Expand Up @@ -199,7 +199,6 @@ Status NestedLoopJoinBuildSinkOperatorX::sink(doris::RuntimeState* state, vector
print_id(_shared_collected_data_controller->get_builder_fragment_instance_id(
node_id())));
}

if (eos) {
// optimize `in bitmap`, see https://github.com/apache/doris/issues/14338
if (_is_output_left_side_only && ((_join_op == TJoinOp::type::LEFT_SEMI_JOIN &&
Expand Down
6 changes: 6 additions & 0 deletions be/src/pipeline/exec/nested_loop_join_build_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ class NestedLoopJoinBuildSinkOperatorX final

Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;

bool should_dry_run(RuntimeState* state) override {
return !state->get_sink_local_state()
->cast<NestedLoopJoinBuildSinkLocalState>()
._should_collected_blocks;
}

DataDistribution required_data_distribution() const override {
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
return {ExchangeType::NOOP};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1658,6 +1658,10 @@ public PlanFragment visitPhysicalNestedLoopJoin(
if (nestedLoopJoin.getStats() != null) {
nestedLoopJoinNode.setCardinality((long) nestedLoopJoin.getStats().getRowCount());
}
PlanNode rightPlanRoot = rightFragment.getPlanRoot();
Preconditions.checkState(rightPlanRoot instanceof ExchangeNode,
"right child of nested loop join must be ExchangeNode but it is " + rightPlanRoot);
((ExchangeNode) rightPlanRoot).setRightChildOfBroadcastHashJoin(true);
nestedLoopJoinNode.setChild(0, leftFragment.getPlanRoot());
nestedLoopJoinNode.setChild(1, rightFragment.getPlanRoot());
setPlanRoot(leftFragment, nestedLoopJoinNode, nestedLoopJoin);
Expand Down

0 comments on commit 72625c2

Please sign in to comment.