Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
BiteTheDDDDt committed Dec 12, 2024
1 parent afdd975 commit fe5e99e
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 10 deletions.
19 changes: 10 additions & 9 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,6 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
!p.get_local_state(state)._eos) {
return Base::close(state, exec_status);
}
bool blocked_by_complete_build_stage =
p._shared_hashtable_controller && !p._shared_hash_table_context->complete_build_stage;
bool blocked_by_shared_hash_table_signal = !_should_build_hash_table &&
p._shared_hashtable_controller &&
!p._shared_hash_table_context->signaled;

try {
if (state->get_task()->wake_up_early()) {
Expand All @@ -162,14 +157,17 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
SCOPED_TIMER(_runtime_filter_compute_timer);
_runtime_filter_slots->insert(block);
}
} else {
RETURN_IF_ERROR(
_runtime_filter_slots->copy_from_shared_context(p._shared_hash_table_context));
}

SCOPED_TIMER(_publish_runtime_filter_timer);
RETURN_IF_ERROR(_runtime_filter_slots->publish(state, !_should_build_hash_table));
} catch (Exception& e) {
bool blocked_by_complete_build_stage = p._shared_hashtable_controller &&
!p._shared_hash_table_context->complete_build_stage;
bool blocked_by_shared_hash_table_signal = !_should_build_hash_table &&
p._shared_hashtable_controller &&
!p._shared_hash_table_context->signaled;

return Status::InternalError(
"rf process meet error: {}, wake_up_early: {}, should_build_hash_table: "
"{}, _finish_dependency: {}, blocked_by_complete_build_stage: {}, "
Expand Down Expand Up @@ -471,7 +469,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());

local_state._eos = eos;
if (local_state._should_build_hash_table) {
// If eos or have already met a null value using short-circuit strategy, we do not need to pull
// data from probe side.
Expand Down Expand Up @@ -548,6 +545,9 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
return _shared_hash_table_context->status;
}

RETURN_IF_ERROR(local_state._runtime_filter_slots->copy_from_shared_context(
_shared_hash_table_context));

local_state.profile()->add_info_string(
"SharedHashTableFrom",
print_id(
Expand All @@ -573,6 +573,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
}

if (eos) {
local_state._eos = true;
local_state.init_short_circuit_for_probe();
// Since the comparison of null values is meaningless, null aware left anti/semi join should not output null
// when the build side is not empty.
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ void TaskScheduler::_do_work(int index) {
#else
uint32_t core_id = sched_getcpu();
#endif

ASSIGN_STATUS_IF_CATCH_EXCEPTION(
//TODO: use a better enclose to abstracting these
if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
Expand Down

0 comments on commit fe5e99e

Please sign in to comment.