Skip to content

Commit

Permalink
[fix](pipeline) only sub_running_sink_operators in close #43500 (#43726)
Browse files Browse the repository at this point in the history
#43500
### What problem does this PR solve?
Previously, sub_running_sink_operators was called only when encountering
EOS during sink
or when all sources were closed. However, this approach has issues, as
it’s possible
for the user to manually cancel, in which case there may be no EOS and
the sources may
not be closed. This would prevent running_sink_operators from reaching
zero, leading to errors.
```
PipelineTask[this = 0x7fc369fe9600, id = 0, open = true, eos = false, finish = false, dry run = false, elapse time = 26361.740784032s], block dependency = NULL, is running = true
operators: 
LOCAL_EXCHANGE_OPERATOR (LOCAL_MERGE_SORT): id=-5, parallel_tasks=4, _channel_id: 0, _num_partitions: 4, _num_senders: 4, _num_sources: 4, _running_sink_operators: 1, _running_source_operators: 1, mem_usage: 0, data queue info: Data Queue 0: [size approx = 0, eos = false], MemTrackers: 0: 0, 1: 34537472, 2: 5701632, 3: 0, 
  DATA_STREAM_SINK_OPERATOR: id=6, Sink Buffer: (_should_stop = false, _busy_channels = 0, _is_finishing = false), _reach_limit: false
0.   this=0x7fc376438f10, LOCAL_MERGE_EXCHANGE_OPERATOR_DEPENDENCY: id=-5, block task = 0, ready=true, _always_ready=true
0.   this=0x7fc3764bc110, LOCAL_MERGE_EXCHANGE_OPERATOR_DEPENDENCY: id=-5, block task = 0, ready=true, _always_ready=true
0.   this=0x7fc3764bc310, LOCAL_MERGE_EXCHANGE_OPERATOR_DEPENDENCY: id=-5, block task = 0, ready=true, _always_ready=true
0.   this=0x7fc3764bc510, LOCAL_MERGE_EXCHANGE_OPERATOR_DEPENDENCY: id=-5, block task = 0, ready=true, _always_ready=true
```
- [x] Confirm test cases
- [x] Confirm document
- [x] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
  • Loading branch information
Mryange authored Nov 16, 2024
1 parent 90254d8 commit 261c65f
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ Status LocalExchangeSinkLocalState::open(RuntimeState* state) {
return Status::OK();
}

Status LocalExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) {
SCOPED_TIMER(Base::exec_time_counter());
SCOPED_TIMER(Base::_close_timer);
if (Base::_closed) {
return Status::OK();
}
if (_shared_state) {
_shared_state->sub_running_sink_operators();
}
return Base::close(state, exec_status);
}

std::string LocalExchangeSinkLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer,
Expand Down Expand Up @@ -109,12 +121,8 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block*

// If all exchange sources ended due to limit reached, current task should also finish
if (local_state._exchanger->_running_source_operators == 0) {
local_state._shared_state->sub_running_sink_operators();
return Status::EndOfFile("receiver eof");
}
if (eos) {
local_state._shared_state->sub_running_sink_operators();
}

return Status::OK();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class LocalExchangeSinkLocalState final : public PipelineXSinkLocalState<LocalEx
Status open(RuntimeState* state) override;
std::string debug_string(int indentation_level) const override;

Status close(RuntimeState* state, Status exec_status) override;

private:
friend class LocalExchangeSinkOperatorX;
friend class ShuffleExchanger;
Expand Down

0 comments on commit 261c65f

Please sign in to comment.