Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
bobhan1 committed Nov 12, 2024
1 parent e2089b8 commit 986c0fa
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 21 deletions.
35 changes: 15 additions & 20 deletions be/src/vec/sink/autoinc_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,12 @@ Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {
_db_id, _table_id, _column_id, length);
constexpr uint32_t FETCH_AUTOINC_MAX_RETRY_TIMES = 3;
_rpc_status = Status::OK();

DBUG_EXECUTE_IF("AutoIncIDBuffer::_fetch_ids_from_fe.failed", {
_rpc_status = Status::InternalError<false>("injected error");
LOG_WARNING(
"AutoIncIDBuffer::_fetch_ids_from_fe.failed, "
"db_id={}, table_id={}, column_id={}, length={}",
_db_id, _table_id, _column_id, length);
return _rpc_status;
});

TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address;
for (uint32_t retry_times = 0; retry_times < FETCH_AUTOINC_MAX_RETRY_TIMES; retry_times++) {
DBUG_EXECUTE_IF("AutoIncIDBuffer::_fetch_ids_from_fe.failed", {
_rpc_status = Status::InternalError<false>("injected error");
break;
});
TAutoIncrementRangeRequest request;
TAutoIncrementRangeResult result;
request.__set_db_id(_db_id);
Expand All @@ -83,8 +77,9 @@ Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {

if (_rpc_status.is<ErrorCode::NOT_MASTER>()) {
LOG_WARNING(
"Failed to fetch auto-incremnt range, requested to non-master FE@{}:{}, change "
"to request to FE@{}:{}. retry_time={}, db_id={}, table_id={}, column_id={}",
"Failed to fetch auto-increment range, requested to non-master FE@{}:{}, "
"change to request to FE@{}:{}. retry_time={}, db_id={}, table_id={}, "
"column_id={}",
master_addr.hostname, master_addr.port, result.master_address.hostname,
result.master_address.port, retry_times, _db_id, _table_id, _column_id);
master_addr = result.master_address;
Expand All @@ -94,15 +89,15 @@ Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {

if (!_rpc_status.ok()) {
LOG_WARNING(
"Failed to fetch auto-incremnt range, encounter rpc failure. "
"Failed to fetch auto-increment range, encounter rpc failure. "
"errmsg={}, retry_time={}, db_id={}, table_id={}, column_id={}",
_rpc_status.to_string(), retry_times, _db_id, _table_id, _column_id);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
if (result.length != length) [[unlikely]] {
auto msg = fmt::format(
"Failed to fetch auto-incremnt range, request length={}, but get "
"Failed to fetch auto-increment range, request length={}, but get "
"result.length={}, retry_time={}, db_id={}, table_id={}, column_id={}",
length, result.length, retry_times, _db_id, _table_id, _column_id);
LOG(WARNING) << msg;
Expand All @@ -112,14 +107,14 @@ Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {
}

LOG_INFO(
"get auto-incremnt range from FE@{}:{}, start={}, length={}, elapsed={}ms, "
"get auto-increment range from FE@{}:{}, start={}, length={}, elapsed={}ms, "
"retry_time={}, db_id={}, table_id={}, column_id={}",
master_addr.hostname, master_addr.port, result.start, result.length,
get_auto_inc_range_rpc_ns / 1000000, retry_times, _db_id, _table_id, _column_id);
return result.start;
}
CHECK(!_rpc_status.ok());
return _rpc_status;
return ResultError(_rpc_status);
}

void AutoIncIDBuffer::_get_autoinc_ranges_from_buffers(
Expand Down Expand Up @@ -172,16 +167,16 @@ Status AutoIncIDBuffer::_launch_async_fetch_task(size_t length) {
auto&& err = res.error();
LOG_WARNING(
"[AutoIncIDBuffer::_launch_async_fetch_task] failed to fetch auto-increment "
"values from fe, status={}",
err);
"values from fe, db_id={}, table_id={}, column_id={}, status={}",
_db_id, _table_id, _column_id, err);
_is_fetching = false;
return;
}
int64_t start = res.value();
LOG_INFO(
"[AutoIncIDBuffer::_launch_async_fetch_task] successfully fetch auto-increment "
"values from fe, start={}, length={}",
start, length);
"values from fe, db_id={}, table_id={}, column_id={}, start={}, length={}",
_db_id, _table_id, _column_id, start, length);
{
std::lock_guard<std::mutex> lock {_latch};
_buffers.emplace_back(start, length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
0

-- !sql --
4
0

0 comments on commit 986c0fa

Please sign in to comment.