Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

branch-3.0: [performance](move-memtable) async close tablet streams #43618

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 40 additions & 36 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,63 +277,63 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
return _status;
}

Status TabletStream::close() {
if (!_status.ok()) {
return _status;
}

SCOPED_TIMER(_close_wait_timer);
Status TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) {
bthread::Mutex mu;
std::unique_lock<bthread::Mutex> lock(mu);
bthread::ConditionVariable cv;
auto wait_func = [this, &mu, &cv] {
auto st = Status::OK();
auto func = [this, &mu, &cv, &st, &fn] {
signal::set_signal_task_id(_load_id);
for (auto& token : _flush_tokens) {
token->wait();
}
st = fn();
std::lock_guard<bthread::Mutex> lock(mu);
cv.notify_one();
};
bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(wait_func);
if (ret) {
cv.wait(lock);
} else {
_status = Status::Error<ErrorCode::INTERNAL_ERROR>(
bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(func);
if (!ret) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"there is not enough thread resource for close load");
return _status;
}
cv.wait(lock);
return st;
}

void TabletStream::pre_close() {
if (!_status.ok()) {
return;
}

SCOPED_TIMER(_close_wait_timer);
_status = _run_in_heavy_work_pool([this]() {
for (auto& token : _flush_tokens) {
token->wait();
}
return Status::OK();
});
// it is necessary to check status after wait_func,
// for create_rowset could fail during add_segment when loading to MOW table,
// in this case, should skip close to avoid submit_calc_delete_bitmap_task which could cause coredump.
if (!_status.ok()) {
return;
}

DBUG_EXECUTE_IF("TabletStream.close.segment_num_mismatch", { _num_segments++; });
if (_check_num_segments && (_next_segid.load() != _num_segments)) {
_status = Status::Corruption(
"segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id,
_num_segments, _next_segid.load(), print_id(_load_id));
return _status;
return;
}

// it is necessary to check status after wait_func,
// for create_rowset could fail during add_segment when loading to MOW table,
// in this case, should skip close to avoid submit_calc_delete_bitmap_task which could cause coredump.
_status = _run_in_heavy_work_pool([this]() { return _load_stream_writer->pre_close(); });
}

Status TabletStream::close() {
if (!_status.ok()) {
return _status;
}

auto close_func = [this, &mu, &cv]() {
signal::set_signal_task_id(_load_id);
auto st = _load_stream_writer->close();
if (!st.ok() && _status.ok()) {
_status = st;
}
std::lock_guard<bthread::Mutex> lock(mu);
cv.notify_one();
};
ret = _load_stream_mgr->heavy_work_pool()->try_offer(close_func);
if (ret) {
cv.wait(lock);
} else {
_status = Status::Error<ErrorCode::INTERNAL_ERROR>(
"there is not enough thread resource for close load");
}
SCOPED_TIMER(_close_wait_timer);
_status = _run_in_heavy_work_pool([this]() { return _load_stream_writer->close(); });
return _status;
}

Expand Down Expand Up @@ -402,6 +402,10 @@ void IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
}
}

for (auto& [_, tablet_stream] : _tablet_streams_map) {
tablet_stream->pre_close();
}

for (auto& [_, tablet_stream] : _tablet_streams_map) {
auto st = tablet_stream->close();
if (st.ok()) {
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/load_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,15 @@ class TabletStream {
Status add_segment(const PStreamHeader& header, butil::IOBuf* data);
void add_num_segments(int64_t num_segments) { _num_segments += num_segments; }
void disable_num_segments_check() { _check_num_segments = false; }
void pre_close();
Status close();
int64_t id() const { return _id; }

friend std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream);

private:
Status _run_in_heavy_work_pool(std::function<Status()> fn);

int64_t _id;
LoadStreamWriterSharedPtr _load_stream_writer;
std::vector<std::unique_ptr<ThreadPoolToken>> _flush_tokens;
Expand Down
12 changes: 10 additions & 2 deletions be/src/runtime/load_stream_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,7 @@ Status LoadStreamWriter::_calc_file_size(uint32_t segid, FileType file_type, siz
return Status::OK();
}

Status LoadStreamWriter::close() {
std::lock_guard<std::mutex> l(_lock);
Status LoadStreamWriter::_pre_close() {
SCOPED_ATTACH_TASK(_query_thread_context);
if (!_is_init) {
// if this delta writer is not initialized, but close() is called.
Expand Down Expand Up @@ -306,6 +305,15 @@ Status LoadStreamWriter::close() {

RETURN_IF_ERROR(_rowset_builder->build_rowset());
RETURN_IF_ERROR(_rowset_builder->submit_calc_delete_bitmap_task());
_pre_closed = true;
return Status::OK();
}

Status LoadStreamWriter::close() {
std::lock_guard<std::mutex> l(_lock);
if (!_pre_closed) {
RETURN_IF_ERROR(_pre_close());
}
RETURN_IF_ERROR(_rowset_builder->wait_calc_delete_bitmap());
// FIXME(plat1ko): No `commit_txn` operation in cloud mode, need better abstractions
RETURN_IF_ERROR(static_cast<RowsetBuilder*>(_rowset_builder.get())->commit_txn());
Expand Down
11 changes: 10 additions & 1 deletion be/src/runtime/load_stream_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,23 @@ class LoadStreamWriter {

Status add_segment(uint32_t segid, const SegmentStatistics& stat, TabletSchemaSPtr flush_chema);

Status _calc_file_size(uint32_t segid, FileType file_type, size_t* file_size);
Status pre_close() {
std::lock_guard<std::mutex> l(_lock);
return _pre_close();
}

// wait for all memtables to be flushed.
Status close();

private:
Status _calc_file_size(uint32_t segid, FileType file_type, size_t* file_size);

// without lock
Status _pre_close();

bool _is_init = false;
bool _is_canceled = false;
bool _pre_closed = false;
WriteRequest _req;
std::unique_ptr<BaseRowsetBuilder> _rowset_builder;
std::shared_ptr<RowsetWriter> _rowset_writer;
Expand Down