Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
bobhan1 committed Sep 12, 2023
1 parent d3f1388 commit 2f2edc4
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 15 deletions.
40 changes: 27 additions & 13 deletions be/src/olap/memtable_flush_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,13 @@ std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
}

Status FlushToken::submit(std::unique_ptr<MemTable> mem_table) {
auto s = _flush_status.load();
if (s != OK) {
return Status::Error(s, "FlushToken meet error");
{
std::shared_lock rdlk(_flush_status_lock);
if (!_flush_status.ok()) {
return _flush_status;
}
}

if (mem_table == nullptr || mem_table->empty()) {
return Status::OK();
}
Expand All @@ -88,8 +91,13 @@ void FlushToken::cancel() {

Status FlushToken::wait() {
_flush_token->wait();
auto s = _flush_status.load();
return s == OK ? Status::OK() : Status::Error(s, "FlushToken meet error");
{
std::shared_lock rdlk(_flush_status_lock);
if (!_flush_status.ok()) {
return _flush_status;
}
}
return Status::OK();
}

Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size) {
Expand All @@ -116,8 +124,11 @@ void FlushToken::_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t
uint64_t flush_wait_time_ns = MonotonicNanos() - submit_task_time;
_stats.flush_wait_time_ns += flush_wait_time_ns;
// If previous flush has failed, return directly
if (_flush_status.load() != OK) {
return;
{
std::shared_lock rdlk(_flush_status_lock);
if (!_flush_status.ok()) {
return;
}
}

MonotonicStopWatch timer;
Expand All @@ -127,13 +138,16 @@ void FlushToken::_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t
int64_t flush_size;
Status s = _do_flush_memtable(memtable, segment_id, &flush_size);

if (!s) {
LOG(WARNING) << "Flush memtable failed with res = " << s;
// If s is not ok, ignore the code, just use other code is ok
_flush_status.store(s.code());
{
std::shared_lock rdlk(_flush_status_lock);
if (!_flush_status.ok()) {
return;
}
}
if (_flush_status.load() != OK) {
return;
if (!s.ok()) {
std::lock_guard wrlk(_flush_status_lock);
LOG(WARNING) << "Flush memtable failed with res = " << s;
_flush_status = s;
}

VLOG_CRITICAL << "flush memtable wait time:" << flush_wait_time_ns
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/memtable_flush_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat);
class FlushToken {
public:
explicit FlushToken(std::unique_ptr<ThreadPoolToken> flush_pool_token)
: _flush_token(std::move(flush_pool_token)), _flush_status(ErrorCode::OK) {}
: _flush_token(std::move(flush_pool_token)), _flush_status(Status::OK()) {}

Status submit(std::unique_ptr<MemTable> mem_table);

Expand Down Expand Up @@ -87,7 +87,8 @@ class FlushToken {

// Records the current flush status of the tablet.
// Note: Once its value is set to Failed, it cannot return to SUCCESS.
std::atomic<int> _flush_status;
Status _flush_status;
std::shared_mutex _flush_status_lock;

FlushStatistic _stats;

Expand Down

0 comments on commit 2f2edc4

Please sign in to comment.