Skip to content

Commit

Permalink
[Enhancement] Add broker address in broker rpc error message (#46085)
Browse files Browse the repository at this point in the history
Signed-off-by: wyb <[email protected]>
  • Loading branch information
wyb authored Aug 8, 2024
1 parent 51ce589 commit 5708243
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 44 deletions.
92 changes: 51 additions & 41 deletions be/src/fs/fs_broker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include "fs/fs.h"
#include "gen_cpp/FileBrokerService_types.h"
#include "gen_cpp/TFileBrokerService.h"
#include "gutil/strings/substitute.h"
#include "runtime/broker_mgr.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
Expand Down Expand Up @@ -56,30 +55,36 @@ inline BrokerServiceClientCache* client_cache() {
return ExecEnv::GetInstance()->broker_client_cache();
}

static Status to_status(const TBrokerOperationStatus& st) {
static Status to_status(const TBrokerOperationStatus& st, const TNetworkAddress& broker) {
switch (st.statusCode) {
case TBrokerOperationStatusCode::OK:
return Status::OK();
case TBrokerOperationStatusCode::END_OF_FILE:
return Status::EndOfFile(st.message);
return Status::EndOfFile(fmt::format("error={}, broker={}", st.message, broker.hostname));
case TBrokerOperationStatusCode::NOT_AUTHORIZED:
return Status::IOError("No broker permission, " + st.message);
return Status::IOError(fmt::format("No broker permission, error={}, broker={}", st.message, broker.hostname));
case TBrokerOperationStatusCode::DUPLICATE_REQUEST:
return Status::InternalError("Duplicate broker request, " + st.message);
return Status::InternalError(
fmt::format("Duplicate broker request, error={}, broker={}", st.message, broker.hostname));
case TBrokerOperationStatusCode::INVALID_INPUT_OFFSET:
return Status::InvalidArgument("Invalid broker offset, " + st.message);
return Status::InvalidArgument(
fmt::format("Invalid broker offset, error={}, broker={}", st.message, broker.hostname));
case TBrokerOperationStatusCode::INVALID_ARGUMENT:
return Status::InvalidArgument("Invalid broker argument, " + st.message);
return Status::InvalidArgument(
fmt::format("Invalid broker argument, error={}, broker={}", st.message, broker.hostname));
case TBrokerOperationStatusCode::INVALID_INPUT_FILE_PATH:
return Status::NotFound("Invalid broker file path, " + st.message);
return Status::NotFound(
fmt::format("Invalid broker file path, error={}, broker={}", st.message, broker.hostname));
case TBrokerOperationStatusCode::FILE_NOT_FOUND:
return Status::NotFound("Broker file not found, " + st.message);
return Status::NotFound(fmt::format("Broker file not found, error={}, broker={}", st.message, broker.hostname));
case TBrokerOperationStatusCode::TARGET_STORAGE_SERVICE_ERROR:
return Status::InternalError("Broker storage service error, " + st.message);
return Status::InternalError(
fmt::format("Broker storage service error, error={}, broker={}", st.message, broker.hostname));
case TBrokerOperationStatusCode::OPERATION_NOT_SUPPORTED:
return Status::NotSupported("Broker operation not supported, " + st.message);
return Status::NotSupported(
fmt::format("Broker operation not supported, error={}, broker={}", st.message, broker.hostname));
}
return Status::InternalError("Unknown broker error, " + st.message);
return Status::InternalError(fmt::format("Unknown broker error, error={}, broker={}", st.message, broker.hostname));
}

template <typename Method, typename Request, typename Response>
Expand All @@ -90,8 +95,8 @@ static Status call_method(const TNetworkAddress& broker, Method method, const Re
Status status;
BrokerServiceConnection conn(client_cache(), broker, timeout_ms, &status);
if (!status.ok()) {
LOG(WARNING) << "Fail to get broker client: " << status;
return status;
LOG(WARNING) << "Fail to get broker client, error=" << status << ", broker=" << broker.hostname;
return status.clone_and_append(fmt::format("broker={}", broker.hostname));
}
client = conn.get();
#else
Expand All @@ -110,10 +115,12 @@ static Status call_method(const TNetworkAddress& broker, Method method, const Re
if (retry_count-- > 0) {
std::this_thread::sleep_for(std::chrono::seconds(1));
} else {
return Status::ThriftRpcError(e.what());
return Status::ThriftRpcError(
fmt::format("Fail to call broker, error={}, broker={}", e.what(), broker.hostname));
}
} catch (apache::thrift::TException& e) {
return Status::ThriftRpcError(e.what());
return Status::ThriftRpcError(
fmt::format("Fail to call broker, error={}, broker={}", e.what(), broker.hostname));
}
}
}
Expand All @@ -136,7 +143,7 @@ static Status broker_pread(void* buff, const TNetworkAddress& broker, const TBro
if (response.opStatus.statusCode == TBrokerOperationStatusCode::END_OF_FILE) {
break;
} else if (response.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
return to_status(response.opStatus);
return to_status(response.opStatus, broker);
} else if (response.data.empty()) {
break;
}
Expand All @@ -155,7 +162,7 @@ static void broker_close_reader(const TNetworkAddress& broker, const TBrokerFD&
request.__set_fd(fd);

Status st = call_method(broker, &BrokerServiceClient::closeReader, request, &response);
LOG_IF(WARNING, !st.ok()) << "Fail to close broker reader, " << st.to_string();
LOG_IF(WARNING, !st.ok()) << "Fail to close broker reader, error=" << st.to_string();
}

static Status broker_close_writer(const TNetworkAddress& broker, const TBrokerFD& fd, int timeout_ms) {
Expand All @@ -167,12 +174,12 @@ static Status broker_close_writer(const TNetworkAddress& broker, const TBrokerFD

Status st = call_method(broker, &BrokerServiceClient::closeWriter, request, &response, 1, timeout_ms);
if (!st.ok()) {
LOG(WARNING) << "Fail to close broker writer: " << st;
LOG(WARNING) << "Fail to close broker writer, error=" << st;
return st;
}
if (response.statusCode != TBrokerOperationStatusCode::OK) {
LOG(WARNING) << "Fail to close broker writer: " << response.message;
return to_status(response);
LOG(WARNING) << "Fail to close broker writer, error=" << response.message << ", broker=" << broker.hostname;
return to_status(response, broker);
}
return Status::OK();
}
Expand Down Expand Up @@ -226,12 +233,13 @@ class BrokerWritableFile : public WritableFile {

Status st = call_method(_broker, &BrokerServiceClient::pwrite, request, &response, 0, _timeout_ms);
if (!st.ok()) {
LOG(WARNING) << "Fail to append " << _path << ": " << st;
LOG(WARNING) << "Fail to append " << _path << ", error=" << st;
return st;
}
if (response.statusCode != TBrokerOperationStatusCode::OK) {
LOG(WARNING) << "Fail to append " << _path << ": " << response.message;
return to_status(response);
LOG(WARNING) << "Fail to append " << _path << ", error=" << response.message
<< ", broker=" << _broker.hostname;
return to_status(response, _broker);
}
_offset += data.size;
return Status::OK();
Expand Down Expand Up @@ -289,12 +297,13 @@ StatusOr<std::unique_ptr<SequentialFile>> BrokerFileSystem::new_sequential_file(

Status st = call_method(_broker_addr, &BrokerServiceClient::openReader, request, &response);
if (!st.ok()) {
LOG(WARNING) << "Fail to open " << path << ": " << st;
LOG(WARNING) << "Fail to open " << path << ", error=" << st;
return st;
}
if (response.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
LOG(WARNING) << "Fail to open " << path << ": " << response.opStatus.message;
return to_status(response.opStatus);
LOG(WARNING) << "Fail to open " << path << ", error=" << response.opStatus.message
<< ", broker=" << _broker_addr.hostname;
return to_status(response.opStatus, _broker_addr);
}

// Get file size.
Expand All @@ -317,12 +326,13 @@ StatusOr<std::unique_ptr<RandomAccessFile>> BrokerFileSystem::new_random_access_

Status st = call_method(_broker_addr, &BrokerServiceClient::openReader, request, &response);
if (!st.ok()) {
LOG(WARNING) << "Fail to open " << path << ": " << st;
LOG(WARNING) << "Fail to open " << path << ", error=" << st;
return st;
}
if (response.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
LOG(WARNING) << "Fail to open " << path << ": " << response.opStatus.message;
return to_status(response.opStatus);
LOG(WARNING) << "Fail to open " << path << ", error=" << response.opStatus.message
<< ", broker=" << _broker_addr.hostname;
return to_status(response.opStatus, _broker_addr);
}

// Get file size.
Expand Down Expand Up @@ -354,8 +364,7 @@ StatusOr<std::unique_ptr<WritableFile>> BrokerFileSystem::new_writable_file(cons
fmt::format("Cannot open an already exists file through broker, path={}", path));
}
} else {
auto msg = strings::Substitute("Unsupported open mode $0", opts.mode);
return Status::NotSupported(msg);
return Status::NotSupported(fmt::format("Unsupported open mode {}", opts.mode));
}

TBrokerOpenWriterRequest request;
Expand All @@ -369,13 +378,14 @@ StatusOr<std::unique_ptr<WritableFile>> BrokerFileSystem::new_writable_file(cons

Status st = call_method(_broker_addr, &BrokerServiceClient::openWriter, request, &response, 1, _timeout_ms);
if (!st.ok()) {
LOG(WARNING) << "Fail to open " << path << ": " << st;
LOG(WARNING) << "Fail to open " << path << ", error=" << st;
return st;
}

if (response.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
LOG(WARNING) << "Fail to open " << path << ": " << response.opStatus.message;
return to_status(response.opStatus);
LOG(WARNING) << "Fail to open " << path << ", error=" << response.opStatus.message
<< ", broker=" << _broker_addr.hostname;
return to_status(response.opStatus, _broker_addr);
}

return std::make_unique<BrokerWritableFile>(_broker_addr, path, response.fd, 0, _timeout_ms);
Expand All @@ -393,7 +403,7 @@ Status BrokerFileSystem::_path_exists(const std::string& path) {
request.__set_version(TBrokerVersion::VERSION_ONE);
RETURN_IF_ERROR(call_method(_broker_addr, &BrokerServiceClient::checkPathExist, request, &response));
if (response.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
return to_status(response.opStatus);
return to_status(response.opStatus, _broker_addr);
}
return response.isPathExist ? Status::OK() : Status::NotFound(path);
}
Expand Down Expand Up @@ -430,14 +440,14 @@ Status BrokerFileSystem::_delete_file(const std::string& path) {

Status st = call_method(_broker_addr, &BrokerServiceClient::deletePath, request, &response);
if (!st.ok()) {
LOG(WARNING) << "Fail to delete " << path << ": " << st.message();
LOG(WARNING) << "Fail to delete " << path << ", error=" << st.message() << ", broker=" << _broker_addr.hostname;
return st;
}
st = to_status(response);
st = to_status(response, _broker_addr);
if (st.ok()) {
LOG(INFO) << "Deleted " << path;
} else {
LOG(WARNING) << "Fail to delete " << path << ": " << st.message();
LOG(WARNING) << "Fail to delete " << path << ", error=" << st.message();
}
return st;
}
Expand Down Expand Up @@ -508,12 +518,12 @@ Status BrokerFileSystem::_list_file(const std::string& path, TBrokerFileStatus*
return Status::NotFound(path);
} else if (response.opStatus.statusCode == TBrokerOperationStatusCode::OK) {
if (response.files.size() != 1) {
return Status::InternalError(strings::Substitute("unexpected file list size=$0", response.files.size()));
return Status::InternalError(fmt::format("unexpected file list size={}", response.files.size()));
}
swap(*stat, response.files[0]);
return Status::OK();
} else {
return to_status(response.opStatus);
return to_status(response.opStatus, _broker_addr);
}
}

Expand Down
15 changes: 12 additions & 3 deletions be/test/fs/fs_broker_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class MockBrokerServer {
response.__set_statusCode(TBrokerOperationStatusCode::OK);
} else {
response.__set_statusCode(TBrokerOperationStatusCode::FILE_NOT_FOUND);
response.__set_message("File not exist");
}
}

Expand All @@ -95,6 +96,7 @@ class MockBrokerServer {
_readers[fd.low] = std::move(res).value();
} else if (res.status().is_not_found()) {
response.opStatus.__set_statusCode(TBrokerOperationStatusCode::FILE_NOT_FOUND);
response.opStatus.__set_message("File not exist");
} else {
// I don't know how real broker handle this case, just return an non-OK status here.
response.opStatus.__set_statusCode(TBrokerOperationStatusCode::INVALID_ARGUMENT);
Expand Down Expand Up @@ -326,8 +328,13 @@ TEST_F(EnvBrokerTest, test_open_non_exist_file) {

// Check the specific (mocked) error code is meaningless, because
// I don't know what it is.
ASSERT_FALSE(_fs.new_sequential_file(url).ok());
ASSERT_FALSE(_fs.new_random_access_file(url).ok());
auto st = _fs.new_sequential_file(url);
ASSERT_TRUE(st.status().is_not_found());
ASSERT_EQ("Broker file not found, error=File not exist, broker=127.0.0.1", st.status().message());

auto st2 = _fs.new_random_access_file(url);
ASSERT_TRUE(st2.status().is_not_found());
ASSERT_EQ("Broker file not found, error=File not exist, broker=127.0.0.1", st2.status().message());
}

// NOLINTNEXTLINE
Expand Down Expand Up @@ -407,7 +414,9 @@ TEST_F(EnvBrokerTest, test_write_exist_file) {
TEST_F(EnvBrokerTest, test_delete_file) {
const std::string path = "/tmp/1.txt";

ASSERT_FALSE(_fs.delete_file(path).ok());
auto st = _fs.delete_file(path);
ASSERT_TRUE(st.is_not_found());
ASSERT_EQ("Broker file not found, error=File not exist, broker=127.0.0.1", st.message());

ASSERT_OK(_fs_mem->create_file(path));
ASSERT_OK(_fs_mem->append_file(path, "file content"));
Expand Down

0 comments on commit 5708243

Please sign in to comment.