diff --git a/be/src/vec/sink/autoinc_buffer.cpp b/be/src/vec/sink/autoinc_buffer.cpp index 4bc87dff48958c..f2288b62ec2f3f 100644 --- a/be/src/vec/sink/autoinc_buffer.cpp +++ b/be/src/vec/sink/autoinc_buffer.cpp @@ -26,6 +26,7 @@ #include "common/status.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" +#include "util/debug_points.h" #include "util/runtime_profile.h" #include "util/thrift_rpc_helper.h" @@ -44,10 +45,18 @@ void AutoIncIDBuffer::set_batch_size_at_least(size_t batch_size) { } Result AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) { + LOG_INFO( + "[AutoIncIDBuffer::_fetch_ids_from_fe] begin to fetch auto-increment values from fe, " + "db_id={}, table_id={}, column_id={}, length={}", + _db_id, _table_id, _column_id, length); constexpr uint32_t FETCH_AUTOINC_MAX_RETRY_TIMES = 3; _rpc_status = Status::OK(); TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; for (uint32_t retry_times = 0; retry_times < FETCH_AUTOINC_MAX_RETRY_TIMES; retry_times++) { + DBUG_EXECUTE_IF("AutoIncIDBuffer::_fetch_ids_from_fe.failed", { + _rpc_status = Status::InternalError("injected error"); + break; + }); TAutoIncrementRangeRequest request; TAutoIncrementRangeResult result; request.__set_db_id(_db_id); @@ -67,8 +76,9 @@ Result AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) { if (_rpc_status.is()) { LOG_WARNING( - "Failed to fetch auto-incremnt range, requested to non-master FE@{}:{}, change " - "to request to FE@{}:{}. retry_time={}, db_id={}, table_id={}, column_id={}", + "Failed to fetch auto-increment range, requested to non-master FE@{}:{}, " + "change to request to FE@{}:{}. retry_time={}, db_id={}, table_id={}, " + "column_id={}", master_addr.hostname, master_addr.port, result.master_address.hostname, result.master_address.port, retry_times, _db_id, _table_id, _column_id); master_addr = result.master_address; @@ -78,7 +88,7 @@ Result AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) { if (!_rpc_status.ok()) { LOG_WARNING( - "Failed to fetch auto-incremnt range, encounter rpc failure. " + "Failed to fetch auto-increment range, encounter rpc failure. " "errmsg={}, retry_time={}, db_id={}, table_id={}, column_id={}", _rpc_status.to_string(), retry_times, _db_id, _table_id, _column_id); std::this_thread::sleep_for(std::chrono::milliseconds(10)); @@ -86,7 +96,7 @@ Result AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) { } if (result.length != length) [[unlikely]] { auto msg = fmt::format( - "Failed to fetch auto-incremnt range, request length={}, but get " + "Failed to fetch auto-increment range, request length={}, but get " "result.length={}, retry_time={}, db_id={}, table_id={}, column_id={}", length, result.length, retry_times, _db_id, _table_id, _column_id); LOG(WARNING) << msg; @@ -96,14 +106,14 @@ Result AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) { } LOG_INFO( - "get auto-incremnt range from FE@{}:{}, start={}, length={}, elapsed={}ms, " + "get auto-increment range from FE@{}:{}, start={}, length={}, elapsed={}ms, " "retry_time={}, db_id={}, table_id={}, column_id={}", master_addr.hostname, master_addr.port, result.start, result.length, get_auto_inc_range_rpc_ns / 1000000, retry_times, _db_id, _table_id, _column_id); return result.start; } CHECK(!_rpc_status.ok()); - return _rpc_status; + return ResultError(_rpc_status); } void AutoIncIDBuffer::_get_autoinc_ranges_from_buffers( @@ -153,10 +163,19 @@ Status AutoIncIDBuffer::_launch_async_fetch_task(size_t length) { RETURN_IF_ERROR(_rpc_token->submit_func([=, this]() { auto&& res = _fetch_ids_from_fe(length); if (!res.has_value()) [[unlikely]] { + auto&& err = res.error(); + LOG_WARNING( + "[AutoIncIDBuffer::_launch_async_fetch_task] failed to fetch auto-increment " + "values from fe, db_id={}, table_id={}, column_id={}, status={}", + _db_id, _table_id, _column_id, err); _is_fetching = false; return; } int64_t start = res.value(); + LOG_INFO( + "[AutoIncIDBuffer::_launch_async_fetch_task] successfully fetch auto-increment " + "values from fe, db_id={}, table_id={}, column_id={}, start={}, length={}", + _db_id, _table_id, _column_id, start, length); { std::lock_guard lock {_latch}; _buffers.emplace_back(start, length); diff --git a/regression-test/data/fault_injection_p0/test_auto_inc_fetch_fail.out b/regression-test/data/fault_injection_p0/test_auto_inc_fetch_fail.out new file mode 100644 index 00000000000000..453e378f9c43f6 --- /dev/null +++ b/regression-test/data/fault_injection_p0/test_auto_inc_fetch_fail.out @@ -0,0 +1,10 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +0 + +-- !sql -- +4 + +-- !sql -- +0 + diff --git a/regression-test/suites/fault_injection_p0/test_auto_inc_fetch_fail.groovy b/regression-test/suites/fault_injection_p0/test_auto_inc_fetch_fail.groovy new file mode 100644 index 00000000000000..e9bb6ae9a3c6ca --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_auto_inc_fetch_fail.groovy @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.junit.Assert +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean + +suite("test_auto_inc_fetch_fail", "nonConcurrent") { + + try { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + def table1 = "test_auto_inc_fetch_fail" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k` int, + `c1` int, + `c2` int, + `c3` int, + `id` BIGINT NOT NULL AUTO_INCREMENT(10000), + ) UNIQUE KEY(k) + DISTRIBUTED BY HASH(k) BUCKETS 1 + PROPERTIES ("replication_num" = "1"); """ + + GetDebugPoint().enableDebugPointForAllBEs("AutoIncIDBuffer::_fetch_ids_from_fe.failed") + + try { + sql """insert into ${table1}(k,c1,c2,c3) values(1,1,1,1),(2,2,2,2),(3,3,3,3),(4,4,4,4); """ + } catch (Exception e) { + logger.info("error : ${e}") + } + qt_sql "select count(*) from ${table1};" + + GetDebugPoint().clearDebugPointsForAllBEs() + + Thread.sleep(1000) + + sql """insert into ${table1}(k,c1,c2,c3) values(1,1,1,1),(2,2,2,2),(3,3,3,3),(4,4,4,4); """ + qt_sql "select count(*) from ${table1};" + qt_sql "select count(*) from ${table1} where id < 10000;" + + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } +}