Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Add broker address in broker rpc error message #46085

Merged
merged 2 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 51 additions & 41 deletions be/src/fs/fs_broker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include "fs/fs.h"
#include "gen_cpp/FileBrokerService_types.h"
#include "gen_cpp/TFileBrokerService.h"
#include "gutil/strings/substitute.h"
#include "runtime/broker_mgr.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
Expand Down Expand Up @@ -56,30 +55,36 @@ inline BrokerServiceClientCache* client_cache() {
return ExecEnv::GetInstance()->broker_client_cache();
}

static Status to_status(const TBrokerOperationStatus& st) {
static Status to_status(const TBrokerOperationStatus& st, const TNetworkAddress& broker) {
switch (st.statusCode) {
case TBrokerOperationStatusCode::OK:
return Status::OK();
case TBrokerOperationStatusCode::END_OF_FILE:
return Status::EndOfFile(st.message);
return Status::EndOfFile(fmt::format("error={}, broker={}", st.message, broker.hostname));
case TBrokerOperationStatusCode::NOT_AUTHORIZED:
return Status::IOError("No broker permission, " + st.message);
return Status::IOError(fmt::format("No broker permission, error={}, broker={}", st.message, broker.hostname));
case TBrokerOperationStatusCode::DUPLICATE_REQUEST:
return Status::InternalError("Duplicate broker request, " + st.message);
return Status::InternalError(
fmt::format("Duplicate broker request, error={}, broker={}", st.message, broker.hostname));
case TBrokerOperationStatusCode::INVALID_INPUT_OFFSET:
return Status::InvalidArgument("Invalid broker offset, " + st.message);
return Status::InvalidArgument(
fmt::format("Invalid broker offset, error={}, broker={}", st.message, broker.hostname));
case TBrokerOperationStatusCode::INVALID_ARGUMENT:
return Status::InvalidArgument("Invalid broker argument, " + st.message);
return Status::InvalidArgument(
fmt::format("Invalid broker argument, error={}, broker={}", st.message, broker.hostname));
case TBrokerOperationStatusCode::INVALID_INPUT_FILE_PATH:
return Status::NotFound("Invalid broker file path, " + st.message);
return Status::NotFound(
fmt::format("Invalid broker file path, error={}, broker={}", st.message, broker.hostname));
case TBrokerOperationStatusCode::FILE_NOT_FOUND:
return Status::NotFound("Broker file not found, " + st.message);
return Status::NotFound(fmt::format("Broker file not found, error={}, broker={}", st.message, broker.hostname));
case TBrokerOperationStatusCode::TARGET_STORAGE_SERVICE_ERROR:
return Status::InternalError("Broker storage service error, " + st.message);
return Status::InternalError(
fmt::format("Broker storage service error, error={}, broker={}", st.message, broker.hostname));
case TBrokerOperationStatusCode::OPERATION_NOT_SUPPORTED:
return Status::NotSupported("Broker operation not supported, " + st.message);
return Status::NotSupported(
fmt::format("Broker operation not supported, error={}, broker={}", st.message, broker.hostname));
}
return Status::InternalError("Unknown broker error, " + st.message);
return Status::InternalError(fmt::format("Unknown broker error, error={}, broker={}", st.message, broker.hostname));
}

template <typename Method, typename Request, typename Response>
Expand All @@ -90,8 +95,8 @@ static Status call_method(const TNetworkAddress& broker, Method method, const Re
Status status;
BrokerServiceConnection conn(client_cache(), broker, timeout_ms, &status);
if (!status.ok()) {
LOG(WARNING) << "Fail to get broker client: " << status;
return status;
LOG(WARNING) << "Fail to get broker client, error=" << status << ", broker=" << broker.hostname;
return status.clone_and_append(fmt::format("broker={}", broker.hostname));
}
client = conn.get();
#else
Expand All @@ -110,10 +115,12 @@ static Status call_method(const TNetworkAddress& broker, Method method, const Re
if (retry_count-- > 0) {
std::this_thread::sleep_for(std::chrono::seconds(1));
} else {
return Status::ThriftRpcError(e.what());
return Status::ThriftRpcError(
fmt::format("Fail to call broker, error={}, broker={}", e.what(), broker.hostname));
}
} catch (apache::thrift::TException& e) {
return Status::ThriftRpcError(e.what());
return Status::ThriftRpcError(
fmt::format("Fail to call broker, error={}, broker={}", e.what(), broker.hostname));
}
}
}
Expand All @@ -136,7 +143,7 @@ static Status broker_pread(void* buff, const TNetworkAddress& broker, const TBro
if (response.opStatus.statusCode == TBrokerOperationStatusCode::END_OF_FILE) {
break;
} else if (response.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
return to_status(response.opStatus);
return to_status(response.opStatus, broker);
} else if (response.data.empty()) {
break;
}
Expand All @@ -155,7 +162,7 @@ static void broker_close_reader(const TNetworkAddress& broker, const TBrokerFD&
request.__set_fd(fd);

Status st = call_method(broker, &BrokerServiceClient::closeReader, request, &response);
LOG_IF(WARNING, !st.ok()) << "Fail to close broker reader, " << st.to_string();
LOG_IF(WARNING, !st.ok()) << "Fail to close broker reader, error=" << st.to_string();
}

static Status broker_close_writer(const TNetworkAddress& broker, const TBrokerFD& fd, int timeout_ms) {
Expand All @@ -167,12 +174,12 @@ static Status broker_close_writer(const TNetworkAddress& broker, const TBrokerFD

Status st = call_method(broker, &BrokerServiceClient::closeWriter, request, &response, 1, timeout_ms);
if (!st.ok()) {
LOG(WARNING) << "Fail to close broker writer: " << st;
LOG(WARNING) << "Fail to close broker writer, error=" << st;
return st;
}
if (response.statusCode != TBrokerOperationStatusCode::OK) {
LOG(WARNING) << "Fail to close broker writer: " << response.message;
return to_status(response);
LOG(WARNING) << "Fail to close broker writer, error=" << response.message << ", broker=" << broker.hostname;
return to_status(response, broker);
}
return Status::OK();
}
Expand Down Expand Up @@ -226,12 +233,13 @@ class BrokerWritableFile : public WritableFile {

Status st = call_method(_broker, &BrokerServiceClient::pwrite, request, &response, 0, _timeout_ms);
if (!st.ok()) {
LOG(WARNING) << "Fail to append " << _path << ": " << st;
LOG(WARNING) << "Fail to append " << _path << ", error=" << st;
return st;
}
if (response.statusCode != TBrokerOperationStatusCode::OK) {
LOG(WARNING) << "Fail to append " << _path << ": " << response.message;
return to_status(response);
LOG(WARNING) << "Fail to append " << _path << ", error=" << response.message
<< ", broker=" << _broker.hostname;
return to_status(response, _broker);
}
_offset += data.size;
return Status::OK();
Expand Down Expand Up @@ -289,12 +297,13 @@ StatusOr<std::unique_ptr<SequentialFile>> BrokerFileSystem::new_sequential_file(

Status st = call_method(_broker_addr, &BrokerServiceClient::openReader, request, &response);
if (!st.ok()) {
LOG(WARNING) << "Fail to open " << path << ": " << st;
LOG(WARNING) << "Fail to open " << path << ", error=" << st;
return st;
}
if (response.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
LOG(WARNING) << "Fail to open " << path << ": " << response.opStatus.message;
return to_status(response.opStatus);
LOG(WARNING) << "Fail to open " << path << ", error=" << response.opStatus.message
<< ", broker=" << _broker_addr.hostname;
return to_status(response.opStatus, _broker_addr);
}

// Get file size.
Expand All @@ -317,12 +326,13 @@ StatusOr<std::unique_ptr<RandomAccessFile>> BrokerFileSystem::new_random_access_

Status st = call_method(_broker_addr, &BrokerServiceClient::openReader, request, &response);
if (!st.ok()) {
LOG(WARNING) << "Fail to open " << path << ": " << st;
LOG(WARNING) << "Fail to open " << path << ", error=" << st;
return st;
}
if (response.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
LOG(WARNING) << "Fail to open " << path << ": " << response.opStatus.message;
return to_status(response.opStatus);
LOG(WARNING) << "Fail to open " << path << ", error=" << response.opStatus.message
<< ", broker=" << _broker_addr.hostname;
return to_status(response.opStatus, _broker_addr);
}

// Get file size.
Expand Down Expand Up @@ -354,8 +364,7 @@ StatusOr<std::unique_ptr<WritableFile>> BrokerFileSystem::new_writable_file(cons
fmt::format("Cannot open an already exists file through broker, path={}", path));
}
} else {
auto msg = strings::Substitute("Unsupported open mode $0", opts.mode);
return Status::NotSupported(msg);
return Status::NotSupported(fmt::format("Unsupported open mode {}", opts.mode));
}

TBrokerOpenWriterRequest request;
Expand All @@ -369,13 +378,14 @@ StatusOr<std::unique_ptr<WritableFile>> BrokerFileSystem::new_writable_file(cons

Status st = call_method(_broker_addr, &BrokerServiceClient::openWriter, request, &response, 1, _timeout_ms);
if (!st.ok()) {
LOG(WARNING) << "Fail to open " << path << ": " << st;
LOG(WARNING) << "Fail to open " << path << ", error=" << st;
return st;
}

if (response.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
LOG(WARNING) << "Fail to open " << path << ": " << response.opStatus.message;
return to_status(response.opStatus);
LOG(WARNING) << "Fail to open " << path << ", error=" << response.opStatus.message
<< ", broker=" << _broker_addr.hostname;
return to_status(response.opStatus, _broker_addr);
}

return std::make_unique<BrokerWritableFile>(_broker_addr, path, response.fd, 0, _timeout_ms);
Expand All @@ -393,7 +403,7 @@ Status BrokerFileSystem::_path_exists(const std::string& path) {
request.__set_version(TBrokerVersion::VERSION_ONE);
RETURN_IF_ERROR(call_method(_broker_addr, &BrokerServiceClient::checkPathExist, request, &response));
if (response.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
return to_status(response.opStatus);
return to_status(response.opStatus, _broker_addr);
}
return response.isPathExist ? Status::OK() : Status::NotFound(path);
}
Expand Down Expand Up @@ -430,14 +440,14 @@ Status BrokerFileSystem::_delete_file(const std::string& path) {

Status st = call_method(_broker_addr, &BrokerServiceClient::deletePath, request, &response);
if (!st.ok()) {
LOG(WARNING) << "Fail to delete " << path << ": " << st.message();
LOG(WARNING) << "Fail to delete " << path << ", error=" << st.message() << ", broker=" << _broker_addr.hostname;
return st;
}
st = to_status(response);
st = to_status(response, _broker_addr);
if (st.ok()) {
LOG(INFO) << "Deleted " << path;
} else {
LOG(WARNING) << "Fail to delete " << path << ": " << st.message();
LOG(WARNING) << "Fail to delete " << path << ", error=" << st.message();
}
return st;
}
Expand Down Expand Up @@ -508,12 +518,12 @@ Status BrokerFileSystem::_list_file(const std::string& path, TBrokerFileStatus*
return Status::NotFound(path);
} else if (response.opStatus.statusCode == TBrokerOperationStatusCode::OK) {
if (response.files.size() != 1) {
return Status::InternalError(strings::Substitute("unexpected file list size=$0", response.files.size()));
return Status::InternalError(fmt::format("unexpected file list size={}", response.files.size()));
}
swap(*stat, response.files[0]);
return Status::OK();
} else {
return to_status(response.opStatus);
return to_status(response.opStatus, _broker_addr);
}
}

wyb marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
15 changes: 12 additions & 3 deletions be/test/fs/fs_broker_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class MockBrokerServer {
response.__set_statusCode(TBrokerOperationStatusCode::OK);
} else {
response.__set_statusCode(TBrokerOperationStatusCode::FILE_NOT_FOUND);
response.__set_message("File not exist");
}
}

Expand All @@ -95,6 +96,7 @@ class MockBrokerServer {
_readers[fd.low] = std::move(res).value();
} else if (res.status().is_not_found()) {
response.opStatus.__set_statusCode(TBrokerOperationStatusCode::FILE_NOT_FOUND);
response.opStatus.__set_message("File not exist");
} else {
// I don't know how real broker handle this case, just return an non-OK status here.
response.opStatus.__set_statusCode(TBrokerOperationStatusCode::INVALID_ARGUMENT);
Expand Down Expand Up @@ -326,8 +328,13 @@ TEST_F(EnvBrokerTest, test_open_non_exist_file) {

// Check the specific (mocked) error code is meaningless, because
// I don't know what it is.
ASSERT_FALSE(_fs.new_sequential_file(url).ok());
ASSERT_FALSE(_fs.new_random_access_file(url).ok());
auto st = _fs.new_sequential_file(url);
ASSERT_TRUE(st.status().is_not_found());
ASSERT_EQ("Broker file not found, error=File not exist, broker=127.0.0.1", st.status().message());

auto st2 = _fs.new_random_access_file(url);
ASSERT_TRUE(st2.status().is_not_found());
ASSERT_EQ("Broker file not found, error=File not exist, broker=127.0.0.1", st2.status().message());
}

// NOLINTNEXTLINE
Expand Down Expand Up @@ -407,7 +414,9 @@ TEST_F(EnvBrokerTest, test_write_exist_file) {
TEST_F(EnvBrokerTest, test_delete_file) {
const std::string path = "/tmp/1.txt";

ASSERT_FALSE(_fs.delete_file(path).ok());
auto st = _fs.delete_file(path);
ASSERT_TRUE(st.is_not_found());
ASSERT_EQ("Broker file not found, error=File not exist, broker=127.0.0.1", st.message());

ASSERT_OK(_fs_mem->create_file(path));
ASSERT_OK(_fs_mem->append_file(path, "file content"));
Expand Down
Loading