Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Bypass sdk read and write decoupling #45918

Merged
merged 1 commit into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 38 additions & 13 deletions be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,16 @@ if (ENABLE_FAULT_INJECTION)
message(STATUS "enable fault injection")
endif()

# Opt-in switch (default OFF): when enabled, every translation unit is compiled
# with the STARROCKS_ASSERT_STATUS_CHECKED preprocessor macro defined.
option(ASSERT_STATUS_CHECKED "build with assert status checked" OFF)
if (ASSERT_STATUS_CHECKED)
    string(APPEND CXX_COMMON_FLAGS " -DSTARROCKS_ASSERT_STATUS_CHECKED")
endif ()

# Format-library build: compile position-independent code and define the
# BUILD_FORMAT_LIB macro for all sources.
if (BUILD_FORMAT_LIB)
    message(STATUS "enable build format library")
    string(APPEND CXX_COMMON_FLAGS " -fPIC -DBUILD_FORMAT_LIB")
endif ()

# For CMAKE_BUILD_TYPE=Debug
# -ggdb: Enable gdb debugging
# Debug information is stored as dwarf2 to be as compatible as possible
Expand Down Expand Up @@ -832,7 +842,6 @@ set(STARROCKS_DEPENDENCIES ${STARROCKS_DEPENDENCIES}


set(STARROCKS_DEPENDENCIES ${STARROCKS_DEPENDENCIES}
hdfs
jvm
)
set(JAVA_HOME ${THIRDPARTY_DIR}/open_jdk/)
Expand Down Expand Up @@ -955,29 +964,34 @@ if (${WITH_TENANN} STREQUAL "ON")
endif()

# Add all external dependencies. They should come after the starrocks libs.
set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} ${STARROCKS_DEPENDENCIES})

# Extend the link list with the external dependencies and hdfs.
# NOTE(review): ${STARROCKS_LINK_LIBS} is expanded twice below, so every entry
# already in the list is repeated on the link line. Confirm whether this is
# intentional (e.g. static-library link ordering) before removing it.
set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS}
${STARROCKS_LINK_LIBS}
${STARROCKS_DEPENDENCIES}
hdfs
)
set(BUILD_FOR_SANITIZE "OFF")

# Add sanitize static link flags or jemalloc
if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG" OR "${CMAKE_BUILD_TYPE}" STREQUAL "RELEASE")
message("use jemalloc")
set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} jemalloc)
set(STARROCKS_MEMORY_DEPENDENCIES_LIBS jemalloc)
elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "ASAN")
if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} -static-libsan)
set(STARROCKS_MEMORY_DEPENDENCIES_LIBS -static-libsan)
else()
set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} -static-libasan)
set(STARROCKS_MEMORY_DEPENDENCIES_LIBS -static-libasan)
endif()
set(BUILD_FOR_SANITIZE "ON")
elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "LSAN")
set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} -static-liblsan)
set(BUILD_FOR_SANITIZE "ON")
set(STARROCKS_MEMORY_DEPENDENCIES_LIBS -static-liblsan)
set(BUILD_FOR_SANITIZE "ON")
elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN")
message("use jemalloc")
set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} -static-libubsan jemalloc)
set(STARROCKS_MEMORY_DEPENDENCIES_LIBS -static-libubsan jemalloc)
set(BUILD_FOR_SANITIZE "ON")
elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "TSAN")
set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} -static-libtsan)
set(STARROCKS_MEMORY_DEPENDENCIES_LIBS -static-libtsan)
set(BUILD_FOR_SANITIZE "ON")
else()
message(FATAL_ERROR "Unknown build type: ${CMAKE_BUILD_TYPE}")
Expand Down Expand Up @@ -1029,9 +1043,15 @@ if (${MAKE_TEST} STREQUAL "ON")
add_definitions(-DBE_TEST)
add_definitions(-DFIU_ENABLE)
else()
# output *.a, *.so, *.dylib to output/tmp
set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${OUTPUT_DIR}/tmp/${CMAKE_BUILD_TYPE})
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${OUTPUT_DIR}/tmp/${CMAKE_BUILD_TYPE})
# Choose where static archives and shared libraries are written, keyed by
# whether the format library is being built.
if (NOT BUILD_FORMAT_LIB)
    # regular build: *.a, *.so, *.dylib go to output/tmp
    set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${OUTPUT_DIR}/tmp/${CMAKE_BUILD_TYPE})
    set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${OUTPUT_DIR}/tmp/${CMAKE_BUILD_TYPE})
else ()
    # format-library build: *.a, *.so, *.dylib go to output/format-lib-tmp
    set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${OUTPUT_DIR}/format-lib-tmp/${CMAKE_BUILD_TYPE})
    set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${OUTPUT_DIR}/format-lib-tmp/${CMAKE_BUILD_TYPE})
endif ()
# output *.exe to output/lib
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${OUTPUT_DIR}/lib)
endif ()
Expand Down Expand Up @@ -1101,8 +1121,13 @@ add_subdirectory(${SRC_DIR}/tools)
add_subdirectory(${SRC_DIR}/util)
add_subdirectory(${SRC_DIR}/script)

# Include the starrocks_format sources in the build only when BUILD_FORMAT_LIB is set.
if (BUILD_FORMAT_LIB)
message(STATUS "Build starrocks format lib")
add_subdirectory(${SRC_DIR}/starrocks_format)
endif()

# Build the benchmark sources under ${SRC_DIR}/bench when WITH_BENCH is "ON".
# The status line is printed exactly once (it was previously emitted twice).
if (WITH_BENCH STREQUAL "ON")
    message(STATUS "Build binary with bench")
    add_subdirectory(${SRC_DIR}/bench)
endif()

Expand Down
5 changes: 5 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,11 @@ CONF_Int64(object_storage_request_timeout_ms, "-1");
// if this parameter is 0, use object_storage_request_timeout_ms instead.
CONF_Int64(object_storage_rename_file_request_timeout_ms, "30000");

// Retry strategy for read operations. The defaults of the following two parameters
// match the defaults of the AWS SDK DefaultRetryStrategy.
CONF_Int64(object_storage_max_retries, "10");
CONF_Int64(object_storage_retry_scale_factor, "25");

CONF_Strings(fallback_to_hadoop_fs_list, "");
CONF_Strings(s3_compatible_fs_list, "s3n://, s3a://, s3://, oss://, cos://, cosn://, obs://, ks3://, tos://");
CONF_mBool(s3_use_list_objects_v1, "false");
Expand Down
5 changes: 4 additions & 1 deletion be/src/fs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@ set(EXEC_FILES
s3/poco_http_client.cpp
s3/poco_common.cpp
fs_util.cpp
fs_starlet.cpp
)

# fs_starlet.cpp is compiled into the library only when the format library
# is NOT being built.
if (NOT BUILD_FORMAT_LIB)
    list(APPEND EXEC_FILES
        fs_starlet.cpp
    )
endif ()

# Static library holding every source collected in EXEC_FILES.
add_library(FileSystem STATIC ${EXEC_FILES})
21 changes: 15 additions & 6 deletions be/src/fs/fs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
#include "fs/fs_util.h"
#include "fs/hdfs/fs_hdfs.h"
#include "runtime/file_result_writer.h"
#ifdef USE_STAROS
#if defined(USE_STAROS) && !defined(BUILD_FORMAT_LIB)
Copy link
Contributor

@kevincai kevincai Jul 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean the format_lib must be built separately? Otherwise, the BE binary will not be fully functional.

#include "fs/fs_starlet.h"
#endif

Expand Down Expand Up @@ -52,7 +52,7 @@ std::unique_ptr<RandomAccessFile> RandomAccessFile::from(std::unique_ptr<io::See
static thread_local std::shared_ptr<FileSystem> tls_fs_posix;
static thread_local std::shared_ptr<FileSystem> tls_fs_s3;
static thread_local std::shared_ptr<FileSystem> tls_fs_hdfs;
#ifdef USE_STAROS
#if defined(USE_STAROS) && !defined(BUILD_FORMAT_LIB)
static thread_local std::shared_ptr<FileSystem> tls_fs_starlet;
#endif

Expand All @@ -77,7 +77,7 @@ inline std::shared_ptr<FileSystem> get_tls_fs_s3() {
return tls_fs_s3;
}

#ifdef USE_STAROS
#if defined(USE_STAROS) && !defined(BUILD_FORMAT_LIB)
inline std::shared_ptr<FileSystem> get_tls_fs_starlet() {
if (tls_fs_starlet == nullptr) {
tls_fs_starlet.reset(new_fs_starlet().release());
Expand All @@ -86,7 +86,16 @@ inline std::shared_ptr<FileSystem> get_tls_fs_starlet() {
}
#endif

StatusOr<std::unique_ptr<FileSystem>> FileSystem::CreateUniqueFromString(std::string_view uri, FSOptions options) {
// Returns a FileSystem for `uri`.
// With no entries in `options._fs_options`, this delegates to CreateSharedFromString(uri).
// Otherwise a new instance is built by CreateUniqueFromString(uri, options) and returned
// through the shared_ptr-based StatusOr.
StatusOr<std::shared_ptr<FileSystem>> FileSystem::Create(std::string_view uri, const FSOptions& options) {
    if (options._fs_options.empty()) {
        return FileSystem::CreateSharedFromString(uri);
    }
    return FileSystem::CreateUniqueFromString(uri, options);
}

StatusOr<std::unique_ptr<FileSystem>> FileSystem::CreateUniqueFromString(std::string_view uri,
const FSOptions& options) {
if (fs::is_fallback_to_hadoop_fs(uri)) {
return new_fs_hdfs(options);
}
Expand All @@ -101,7 +110,7 @@ StatusOr<std::unique_ptr<FileSystem>> FileSystem::CreateUniqueFromString(std::st
// Now Azure storage and Google Cloud Storage both are using LibHdfs, we can use cpp sdk instead in the future.
return new_fs_hdfs(options);
}
#ifdef USE_STAROS
#if defined(USE_STAROS) && !defined(BUILD_FORMAT_LIB)
if (is_starlet_uri(uri)) {
return new_fs_starlet();
}
Expand All @@ -121,7 +130,7 @@ StatusOr<std::shared_ptr<FileSystem>> FileSystem::CreateSharedFromString(std::st
if (fs::is_s3_uri(uri)) {
return get_tls_fs_s3();
}
#ifdef USE_STAROS
#if defined(USE_STAROS) && !defined(BUILD_FORMAT_LIB)
if (is_starlet_uri(uri)) {
return get_tls_fs_starlet();
}
Expand Down
26 changes: 23 additions & 3 deletions be/src/fs/fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ struct ResultFileOptions;
class TUploadReq;
class TDownloadReq;
struct WritableFileOptions;
class FileSystem;

struct SpaceInfo {
// Total size of the filesystem, in bytes
Expand All @@ -47,13 +48,15 @@ struct FSOptions {
private:
FSOptions(const TBrokerScanRangeParams* scan_range_params, const TExportSink* export_sink,
const ResultFileOptions* result_file_options, const TUploadReq* upload, const TDownloadReq* download,
const TCloudConfiguration* cloud_configuration)
const TCloudConfiguration* cloud_configuration,
const std::unordered_map<std::string, std::string>& fs_options = {})
: scan_range_params(scan_range_params),
export_sink(export_sink),
result_file_options(result_file_options),
upload(upload),
download(download),
cloud_configuration(cloud_configuration) {}
cloud_configuration(cloud_configuration),
_fs_options(fs_options) {}

public:
FSOptions() : FSOptions(nullptr, nullptr, nullptr, nullptr, nullptr, nullptr) {}
Expand All @@ -73,6 +76,9 @@ struct FSOptions {
FSOptions(const TCloudConfiguration* cloud_configuration)
: FSOptions(nullptr, nullptr, nullptr, nullptr, nullptr, cloud_configuration) {}

FSOptions(const std::unordered_map<std::string, std::string>& fs_options)
: FSOptions(nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, fs_options) {}

const THdfsProperties* hdfs_properties() const;

const TBrokerScanRangeParams* scan_range_params;
Expand All @@ -81,6 +87,17 @@ struct FSOptions {
const TUploadReq* upload;
const TDownloadReq* download;
const TCloudConfiguration* cloud_configuration;
const std::unordered_map<std::string, std::string> _fs_options;

// String constants for S3-related option names (all use the "fs.s3a." prefix).
// NOTE(review): presumably these are the keys looked up in _fs_options — confirm
// against the S3 filesystem implementation.
static constexpr const char* FS_S3_ENDPOINT = "fs.s3a.endpoint";
static constexpr const char* FS_S3_ENDPOINT_REGION = "fs.s3a.endpoint.region";
static constexpr const char* FS_S3_ACCESS_KEY = "fs.s3a.access.key";
static constexpr const char* FS_S3_SECRET_KEY = "fs.s3a.secret.key";
static constexpr const char* FS_S3_PATH_STYLE_ACCESS = "fs.s3a.path.style.access";
static constexpr const char* FS_S3_CONNECTION_SSL_ENABLED = "fs.s3a.connection.ssl.enabled";
static constexpr const char* FS_S3_READ_AHEAD_RANGE = "fs.s3a.readahead.range";
static constexpr const char* FS_S3_RETRY_LIMIT = "fs.s3a.retry.limit";
static constexpr const char* FS_S3_RETRY_INTERVAL = "fs.s3a.retry.interval";
};

struct SequentialFileOptions {
Expand Down Expand Up @@ -112,6 +129,7 @@ struct FileInfo {
std::string path;
std::optional<int64_t> size;
std::string encryption_meta;
std::shared_ptr<FileSystem> fs;
};

struct FileWriteStat {
Expand All @@ -138,8 +156,10 @@ class FileSystem {
FileSystem() = default;
virtual ~FileSystem() = default;

static StatusOr<std::shared_ptr<FileSystem>> Create(std::string_view uri, const FSOptions& options);

static StatusOr<std::unique_ptr<FileSystem>> CreateUniqueFromString(std::string_view uri,
FSOptions options = FSOptions());
const FSOptions& options = FSOptions());

static StatusOr<std::shared_ptr<FileSystem>> CreateSharedFromString(std::string_view uri);

Expand Down
Loading
Loading