diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index c46b02cd02681d..de45fd05ed436c 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -68,10 +68,13 @@ std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) { } Status FlushToken::submit(std::unique_ptr mem_table) { - auto s = _flush_status.load(); - if (s != OK) { - return Status::Error(s, "FlushToken meet error"); + { + std::shared_lock rdlk(_flush_status_lock); + if (!_flush_status.ok()) { + return _flush_status; + } } + if (mem_table == nullptr || mem_table->empty()) { return Status::OK(); } @@ -88,8 +91,13 @@ void FlushToken::cancel() { Status FlushToken::wait() { _flush_token->wait(); - auto s = _flush_status.load(); - return s == OK ? Status::OK() : Status::Error(s, "FlushToken meet error"); + { + std::shared_lock rdlk(_flush_status_lock); + if (!_flush_status.ok()) { + return _flush_status; + } + } + return Status::OK(); } Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size) { @@ -116,8 +124,11 @@ void FlushToken::_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t uint64_t flush_wait_time_ns = MonotonicNanos() - submit_task_time; _stats.flush_wait_time_ns += flush_wait_time_ns; // If previous flush has failed, return directly - if (_flush_status.load() != OK) { - return; + { + std::shared_lock rdlk(_flush_status_lock); + if (!_flush_status.ok()) { + return; + } } MonotonicStopWatch timer; @@ -127,13 +138,16 @@ void FlushToken::_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t int64_t flush_size; Status s = _do_flush_memtable(memtable, segment_id, &flush_size); - if (!s) { - LOG(WARNING) << "Flush memtable failed with res = " << s; - // If s is not ok, ignore the code, just use other code is ok - _flush_status.store(s.code()); + { + std::shared_lock rdlk(_flush_status_lock); + if (!_flush_status.ok()) { + return; + } } - if (_flush_status.load() != OK) { - return; + if (!s.ok()) { + std::lock_guard wrlk(_flush_status_lock); + LOG(WARNING) << "Flush memtable failed with res = " << s; + _flush_status = s; } VLOG_CRITICAL << "flush memtable wait time:" << flush_wait_time_ns diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h index 61cc31ab39df19..cfe1672e38b158 100644 --- a/be/src/olap/memtable_flush_executor.h +++ b/be/src/olap/memtable_flush_executor.h @@ -58,7 +58,7 @@ std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat); class FlushToken { public: explicit FlushToken(std::unique_ptr flush_pool_token) - : _flush_token(std::move(flush_pool_token)), _flush_status(ErrorCode::OK) {} + : _flush_token(std::move(flush_pool_token)), _flush_status(Status::OK()) {} Status submit(std::unique_ptr mem_table); @@ -87,7 +87,8 @@ class FlushToken { // Records the current flush status of the tablet. // Note: Once its value is set to Failed, it cannot return to SUCCESS. - std::atomic _flush_status; + Status _flush_status; + std::shared_mutex _flush_status_lock; FlushStatistic _stats;