Skip to content

Commit

Permalink
[performance](move-memtable) async close tablet streams (apache#41156)
Browse files Browse the repository at this point in the history
Asynchronously close tablet streams to speed up load stream close.
  • Loading branch information
kaijchen committed Nov 18, 2024
1 parent ea61206 commit 06d6ee0
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 38 deletions.
76 changes: 40 additions & 36 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,62 +239,62 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
return _status;
}

Status TabletStream::close() {
if (!_status.ok()) {
return _status;
}

SCOPED_TIMER(_close_wait_timer);
Status TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) {
bthread::Mutex mu;
std::unique_lock<bthread::Mutex> lock(mu);
bthread::ConditionVariable cv;
auto wait_func = [this, &mu, &cv] {
auto st = Status::OK();
auto func = [this, &mu, &cv, &st, &fn] {
signal::set_signal_task_id(_load_id);
for (auto& token : _flush_tokens) {
token->wait();
}
st = fn();
std::lock_guard<bthread::Mutex> lock(mu);
cv.notify_one();
};
bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(wait_func);
if (ret) {
cv.wait(lock);
} else {
_status = Status::Error<ErrorCode::INTERNAL_ERROR>(
bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(func);
if (!ret) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"there is not enough thread resource for close load");
return _status;
}
cv.wait(lock);
return st;
}

void TabletStream::pre_close() {
if (!_status.ok()) {
return;
}

SCOPED_TIMER(_close_wait_timer);
_status = _run_in_heavy_work_pool([this]() {
for (auto& token : _flush_tokens) {
token->wait();
}
return Status::OK();
});
// it is necessary to check status after wait_func,
// for create_rowset could fail during add_segment when loading to MOW table,
// in this case, should skip close to avoid submit_calc_delete_bitmap_task which could cause coredump.
if (!_status.ok()) {
return;
}

if (_check_num_segments && (_next_segid.load() != _num_segments)) {
_status = Status::Corruption(
"segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id,
_num_segments, _next_segid.load(), print_id(_load_id));
return _status;
return;
}

// it is necessary to check status after wait_func,
// for create_rowset could fail during add_segment when loading to MOW table,
// in this case, should skip close to avoid submit_calc_delete_bitmap_task which could cause coredump.
_status = _run_in_heavy_work_pool([this]() { return _load_stream_writer->pre_close(); });
}

Status TabletStream::close() {
if (!_status.ok()) {
return _status;
}

auto close_func = [this, &mu, &cv]() {
signal::set_signal_task_id(_load_id);
auto st = _load_stream_writer->close();
if (!st.ok() && _status.ok()) {
_status = st;
}
std::lock_guard<bthread::Mutex> lock(mu);
cv.notify_one();
};
ret = _load_stream_mgr->heavy_work_pool()->try_offer(close_func);
if (ret) {
cv.wait(lock);
} else {
_status = Status::Error<ErrorCode::INTERNAL_ERROR>(
"there is not enough thread resource for close load");
}
SCOPED_TIMER(_close_wait_timer);
_status = _run_in_heavy_work_pool([this]() { return _load_stream_writer->close(); });
return _status;
}

Expand Down Expand Up @@ -363,6 +363,10 @@ void IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
}
}

for (auto& [_, tablet_stream] : _tablet_streams_map) {
tablet_stream->pre_close();
}

for (auto& [_, tablet_stream] : _tablet_streams_map) {
auto st = tablet_stream->close();
if (st.ok()) {
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/load_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,15 @@ class TabletStream {
Status add_segment(const PStreamHeader& header, butil::IOBuf* data);
void add_num_segments(int64_t num_segments) { _num_segments += num_segments; }
void disable_num_segments_check() { _check_num_segments = false; }
void pre_close();
Status close();
int64_t id() const { return _id; }

friend std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream);

private:
Status _run_in_heavy_work_pool(std::function<Status()> fn);

int64_t _id;
LoadStreamWriterSharedPtr _load_stream_writer;
std::vector<std::unique_ptr<ThreadPoolToken>> _flush_tokens;
Expand Down
12 changes: 10 additions & 2 deletions be/src/runtime/load_stream_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,7 @@ Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& st
return _rowset_writer->add_segment(segid, stat, flush_schema);
}

Status LoadStreamWriter::close() {
std::lock_guard<std::mutex> l(_lock);
Status LoadStreamWriter::_pre_close() {
SCOPED_ATTACH_TASK(_query_thread_context);
if (!_is_init) {
// if this delta writer is not initialized, but close() is called.
Expand All @@ -222,6 +221,15 @@ Status LoadStreamWriter::close() {

RETURN_IF_ERROR(_rowset_builder->build_rowset());
RETURN_IF_ERROR(_rowset_builder->submit_calc_delete_bitmap_task());
_pre_closed = true;
return Status::OK();
}

Status LoadStreamWriter::close() {
std::lock_guard<std::mutex> l(_lock);
if (!_pre_closed) {
RETURN_IF_ERROR(_pre_close());
}
RETURN_IF_ERROR(_rowset_builder->wait_calc_delete_bitmap());
RETURN_IF_ERROR(_rowset_builder->commit_txn());

Expand Down
11 changes: 11 additions & 0 deletions be/src/runtime/load_stream_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,23 @@ class LoadStreamWriter {

Status add_segment(uint32_t segid, const SegmentStatistics& stat, TabletSchemaSPtr flush_chema);

Status pre_close() {
std::lock_guard<std::mutex> l(_lock);
return _pre_close();
}

// wait for all memtables to be flushed.
Status close();

private:
Status _calc_file_size(uint32_t segid, FileType file_type, size_t* file_size);

// without lock
Status _pre_close();

bool _is_init = false;
bool _is_canceled = false;
bool _pre_closed = false;
WriteRequest _req;
std::unique_ptr<BaseRowsetBuilder> _rowset_builder;
std::shared_ptr<RowsetWriter> _rowset_writer;
Expand Down

0 comments on commit 06d6ee0

Please sign in to comment.