diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 88c64eb517c368..460ad5e9580652 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -277,30 +277,43 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data return _status; } -Status TabletStream::close() { - if (!_status.ok()) { - return _status; - } - - SCOPED_TIMER(_close_wait_timer); +Status TabletStream::_run_in_heavy_work_pool(std::function fn) { bthread::Mutex mu; std::unique_lock lock(mu); bthread::ConditionVariable cv; - auto wait_func = [this, &mu, &cv] { + auto st = Status::OK(); + auto func = [this, &mu, &cv, &st, &fn] { signal::set_signal_task_id(_load_id); - for (auto& token : _flush_tokens) { - token->wait(); - } + st = fn(); std::lock_guard lock(mu); cv.notify_one(); }; - bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(wait_func); - if (ret) { - cv.wait(lock); - } else { - _status = Status::Error( + bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(func); + if (!ret) { + return Status::Error( "there is not enough thread resource for close load"); - return _status; + } + cv.wait(lock); + return st; +} + +void TabletStream::pre_close() { + if (!_status.ok()) { + return; + } + + SCOPED_TIMER(_close_wait_timer); + _status = _run_in_heavy_work_pool([this]() { + for (auto& token : _flush_tokens) { + token->wait(); + } + return Status::OK(); + }); + // it is necessary to check status after wait_func, + // for create_rowset could fail during add_segment when loading to MOW table, + // in this case, should skip close to avoid submit_calc_delete_bitmap_task which could cause coredump. + if (!_status.ok()) { + return; } DBUG_EXECUTE_IF("TabletStream.close.segment_num_mismatch", { _num_segments++; }); @@ -308,32 +321,19 @@ Status TabletStream::close() { _status = Status::Corruption( "segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id, _num_segments, _next_segid.load(), print_id(_load_id)); - return _status; + return; } - // it is necessary to check status after wait_func, - // for create_rowset could fail during add_segment when loading to MOW table, - // in this case, should skip close to avoid submit_calc_delete_bitmap_task which could cause coredump. + _status = _run_in_heavy_work_pool([this]() { return _load_stream_writer->pre_close(); }); +} + +Status TabletStream::close() { if (!_status.ok()) { return _status; } - auto close_func = [this, &mu, &cv]() { - signal::set_signal_task_id(_load_id); - auto st = _load_stream_writer->close(); - if (!st.ok() && _status.ok()) { - _status = st; - } - std::lock_guard lock(mu); - cv.notify_one(); - }; - ret = _load_stream_mgr->heavy_work_pool()->try_offer(close_func); - if (ret) { - cv.wait(lock); - } else { - _status = Status::Error( - "there is not enough thread resource for close load"); - } + SCOPED_TIMER(_close_wait_timer); + _status = _run_in_heavy_work_pool([this]() { return _load_stream_writer->close(); }); return _status; } @@ -402,6 +402,10 @@ void IndexStream::close(const std::vector& tablets_to_commit, } } + for (auto& [_, tablet_stream] : _tablet_streams_map) { + tablet_stream->pre_close(); + } + for (auto& [_, tablet_stream] : _tablet_streams_map) { auto st = tablet_stream->close(); if (st.ok()) { diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h index 3b649c688355fe..c156eb45c8bddb 100644 --- a/be/src/runtime/load_stream.h +++ b/be/src/runtime/load_stream.h @@ -54,12 +54,15 @@ class TabletStream { Status add_segment(const PStreamHeader& header, butil::IOBuf* data); void add_num_segments(int64_t num_segments) { _num_segments += num_segments; } void disable_num_segments_check() { _check_num_segments = false; } + void pre_close(); Status close(); int64_t id() const { return _id; } friend std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream); private: + Status _run_in_heavy_work_pool(std::function fn); + int64_t _id; LoadStreamWriterSharedPtr _load_stream_writer; std::vector> _flush_tokens; diff --git a/be/src/runtime/load_stream_writer.cpp b/be/src/runtime/load_stream_writer.cpp index 37243fab14bdb3..83f852630b7f47 100644 --- a/be/src/runtime/load_stream_writer.cpp +++ b/be/src/runtime/load_stream_writer.cpp @@ -245,8 +245,7 @@ Status LoadStreamWriter::_calc_file_size(uint32_t segid, FileType file_type, siz return Status::OK(); } -Status LoadStreamWriter::close() { - std::lock_guard l(_lock); +Status LoadStreamWriter::_pre_close() { SCOPED_ATTACH_TASK(_query_thread_context); if (!_is_init) { // if this delta writer is not initialized, but close() is called. @@ -306,6 +305,15 @@ Status LoadStreamWriter::close() { RETURN_IF_ERROR(_rowset_builder->build_rowset()); RETURN_IF_ERROR(_rowset_builder->submit_calc_delete_bitmap_task()); + _pre_closed = true; + return Status::OK(); +} + +Status LoadStreamWriter::close() { + std::lock_guard l(_lock); + if (!_pre_closed) { + RETURN_IF_ERROR(_pre_close()); + } RETURN_IF_ERROR(_rowset_builder->wait_calc_delete_bitmap()); // FIXME(plat1ko): No `commit_txn` operation in cloud mode, need better abstractions RETURN_IF_ERROR(static_cast(_rowset_builder.get())->commit_txn()); diff --git a/be/src/runtime/load_stream_writer.h b/be/src/runtime/load_stream_writer.h index b22817cb85cb47..8815b0f0e3e70a 100644 --- a/be/src/runtime/load_stream_writer.h +++ b/be/src/runtime/load_stream_writer.h @@ -70,14 +70,23 @@ class LoadStreamWriter { Status add_segment(uint32_t segid, const SegmentStatistics& stat, TabletSchemaSPtr flush_chema); - Status _calc_file_size(uint32_t segid, FileType file_type, size_t* file_size); + Status pre_close() { + std::lock_guard l(_lock); + return _pre_close(); + } // wait for all memtables to be flushed. Status close(); private: + Status _calc_file_size(uint32_t segid, FileType file_type, size_t* file_size); + + // without lock + Status _pre_close(); + bool _is_init = false; bool _is_canceled = false; + bool _pre_closed = false; WriteRequest _req; std::unique_ptr _rowset_builder; std::shared_ptr _rowset_writer;