Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
BiteTheDDDDt committed Dec 16, 2024
1 parent 87b9092 commit 11233d1
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 3 deletions.
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ void ExchangeSinkLocalState::on_channel_finished(InstanceLoId channel_id) {
_finished_channels.emplace(channel_id);
if (_working_channels_count.fetch_sub(1) == 1) {
set_reach_limit();
if (_finish_dependency) {
_finish_dependency->set_ready();
}
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
}
}};

if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled() ||
!p.get_local_state(state)._eos) {
if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled() || !_eos) {
return Base::close(state, exec_status);
}

Expand Down
5 changes: 4 additions & 1 deletion be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,10 @@ Status PipelineTask::execute(bool* eos) {
RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, eos));
}

if (*eos) {
RETURN_IF_ERROR(close(Status::OK(), false));
}

if (_block->rows() != 0 || *eos) {
SCOPED_TIMER(_sink_timer);
Status status = _sink->sink(_state, block, *eos);
Expand All @@ -383,7 +387,6 @@ Status PipelineTask::execute(bool* eos) {
}

if (*eos) { // just return, the scheduler will do finish work
RETURN_IF_ERROR(close(status, false));
_task_profile->add_info_string("TaskState", "Finished");
_eos = true;
return Status::OK();
Expand Down

0 comments on commit 11233d1

Please sign in to comment.