diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index 10fb493bc9d2b..428d9e9c7a720 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -712,6 +712,16 @@ if (ENABLE_FAULT_INJECTION) message(STATUS "enable fault injection") endif() +option(ASSERT_STATUS_CHECKED "build with assert status checked" OFF) +if (ASSERT_STATUS_CHECKED) + set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DSTARROCKS_ASSERT_STATUS_CHECKED") +endif() + +if (BUILD_FORMAT_LIB) + set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -fPIC -DBUILD_FORMAT_LIB") + message(STATUS "enable build format library") +endif() + # For CMAKE_BUILD_TYPE=Debug # -ggdb: Enable gdb debugging # Debug information is stored as dwarf2 to be as compatible as possible @@ -832,7 +842,6 @@ set(STARROCKS_DEPENDENCIES ${STARROCKS_DEPENDENCIES} set(STARROCKS_DEPENDENCIES ${STARROCKS_DEPENDENCIES} - hdfs jvm ) set(JAVA_HOME ${THIRDPARTY_DIR}/open_jdk/) @@ -955,29 +964,34 @@ if (${WITH_TENANN} STREQUAL "ON") endif() # Add all external dependencies. They should come after the starrocks libs. 
-set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} ${STARROCKS_DEPENDENCIES}) +set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} + ${STARROCKS_LINK_LIBS} + ${STARROCKS_DEPENDENCIES} + hdfs +) set(BUILD_FOR_SANITIZE "OFF") + # Add sanitize static link flags or jemalloc if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG" OR "${CMAKE_BUILD_TYPE}" STREQUAL "RELEASE") message("use jemalloc") - set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} jemalloc) + set(STARROCKS_MEMORY_DEPENDENCIES_LIBS jemalloc) elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "ASAN") if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang") - set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} -static-libsan) + set(STARROCKS_MEMORY_DEPENDENCIES_LIBS -static-libsan) else() - set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} -static-libasan) + set(STARROCKS_MEMORY_DEPENDENCIES_LIBS -static-libasan) endif() set(BUILD_FOR_SANITIZE "ON") elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "LSAN") - set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} -static-liblsan) - set(BUILD_FOR_SANITIZE "ON") + set(STARROCKS_MEMORY_DEPENDENCIES_LIBS -static-liblsan) + set(BUILD_FOR_SANITIZE "ON") elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN") message("use jemalloc") - set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} -static-libubsan jemalloc) + set(STARROCKS_MEMORY_DEPENDENCIES_LIBS -static-libubsan jemalloc) set(BUILD_FOR_SANITIZE "ON") elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "TSAN") - set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} -static-libtsan) + set(STARROCKS_MEMORY_DEPENDENCIES_LIBS -static-libtsan) set(BUILD_FOR_SANITIZE "ON") else() message(FATAL_ERROR "Unknown build type: ${CMAKE_BUILD_TYPE}") @@ -1029,9 +1043,15 @@ if (${MAKE_TEST} STREQUAL "ON") add_definitions(-DBE_TEST) add_definitions(-DFIU_ENABLE) else() - # output *.a, *.so, *.dylib to output/tmp - set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${OUTPUT_DIR}/tmp/${CMAKE_BUILD_TYPE}) - set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${OUTPUT_DIR}/tmp/${CMAKE_BUILD_TYPE}) + if (BUILD_FORMAT_LIB) + # output *.a, *.so, *.dylib to 
output/format-lib-tmp + set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${OUTPUT_DIR}/format-lib-tmp/${CMAKE_BUILD_TYPE}) + set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${OUTPUT_DIR}/format-lib-tmp/${CMAKE_BUILD_TYPE}) + else() + # output *.a, *.so, *.dylib to output/tmp + set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${OUTPUT_DIR}/tmp/${CMAKE_BUILD_TYPE}) + set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${OUTPUT_DIR}/tmp/${CMAKE_BUILD_TYPE}) + endif () # output *.exe to output/lib set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${OUTPUT_DIR}/lib) endif () @@ -1101,8 +1121,13 @@ add_subdirectory(${SRC_DIR}/tools) add_subdirectory(${SRC_DIR}/util) add_subdirectory(${SRC_DIR}/script) +if (BUILD_FORMAT_LIB) + message(STATUS "Build starrocks format lib") + add_subdirectory(${SRC_DIR}/starrocks_format) +endif() + if (WITH_BENCH STREQUAL "ON") - message(STATUS "Build binary with bench") + message(STATUS "Build binary with bench") add_subdirectory(${SRC_DIR}/bench) endif() diff --git a/be/src/common/config.h b/be/src/common/config.h index 1f466ace3b2bd..5707637048dc3 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -858,6 +858,11 @@ CONF_Int64(object_storage_request_timeout_ms, "-1"); // if this parameter is 0, use object_storage_request_timeout_ms instead. CONF_Int64(object_storage_rename_file_request_timeout_ms, "30000"); +// Retry strategy for read operation. 
The following two parameters are the default value of Aws +// DefaultRetryStrategy +CONF_Int64(object_storage_max_retries, "10"); +CONF_Int64(object_storage_retry_scale_factor, "25"); + CONF_Strings(fallback_to_hadoop_fs_list, ""); CONF_Strings(s3_compatible_fs_list, "s3n://, s3a://, s3://, oss://, cos://, cosn://, obs://, ks3://, tos://"); CONF_mBool(s3_use_list_objects_v1, "false"); diff --git a/be/src/fs/CMakeLists.txt b/be/src/fs/CMakeLists.txt index 0029f191774fc..7115d61dd9356 100644 --- a/be/src/fs/CMakeLists.txt +++ b/be/src/fs/CMakeLists.txt @@ -32,9 +32,12 @@ set(EXEC_FILES s3/poco_http_client.cpp s3/poco_common.cpp fs_util.cpp - fs_starlet.cpp ) +if (NOT BUILD_FORMAT_LIB) + list(APPEND EXEC_FILES fs_starlet.cpp) +endif() + add_library(FileSystem STATIC ${EXEC_FILES} ) diff --git a/be/src/fs/fs.cpp b/be/src/fs/fs.cpp index cebd0e1718a76..53f647f238007 100644 --- a/be/src/fs/fs.cpp +++ b/be/src/fs/fs.cpp @@ -22,7 +22,7 @@ #include "fs/fs_util.h" #include "fs/hdfs/fs_hdfs.h" #include "runtime/file_result_writer.h" -#ifdef USE_STAROS +#if defined(USE_STAROS) && !defined(BUILD_FORMAT_LIB) #include "fs/fs_starlet.h" #endif @@ -52,7 +52,7 @@ std::unique_ptr RandomAccessFile::from(std::unique_ptr tls_fs_posix; static thread_local std::shared_ptr tls_fs_s3; static thread_local std::shared_ptr tls_fs_hdfs; -#ifdef USE_STAROS +#if defined(USE_STAROS) && !defined(BUILD_FORMAT_LIB) static thread_local std::shared_ptr tls_fs_starlet; #endif @@ -77,7 +77,7 @@ inline std::shared_ptr get_tls_fs_s3() { return tls_fs_s3; } -#ifdef USE_STAROS +#if defined(USE_STAROS) && !defined(BUILD_FORMAT_LIB) inline std::shared_ptr get_tls_fs_starlet() { if (tls_fs_starlet == nullptr) { tls_fs_starlet.reset(new_fs_starlet().release()); @@ -86,7 +86,16 @@ inline std::shared_ptr get_tls_fs_starlet() { } #endif -StatusOr> FileSystem::CreateUniqueFromString(std::string_view uri, FSOptions options) { +StatusOr> FileSystem::Create(std::string_view uri, const FSOptions& options) { + if 
(!options._fs_options.empty()) { + return FileSystem::CreateUniqueFromString(uri, options); + } else { + return FileSystem::CreateSharedFromString(uri); + } +} + +StatusOr> FileSystem::CreateUniqueFromString(std::string_view uri, + const FSOptions& options) { if (fs::is_fallback_to_hadoop_fs(uri)) { return new_fs_hdfs(options); } @@ -101,7 +110,7 @@ StatusOr> FileSystem::CreateUniqueFromString(std::st // Now Azure storage and Google Cloud Storage both are using LibHdfs, we can use cpp sdk instead in the future. return new_fs_hdfs(options); } -#ifdef USE_STAROS +#if defined(USE_STAROS) && !defined(BUILD_FORMAT_LIB) if (is_starlet_uri(uri)) { return new_fs_starlet(); } @@ -121,7 +130,7 @@ StatusOr> FileSystem::CreateSharedFromString(std::st if (fs::is_s3_uri(uri)) { return get_tls_fs_s3(); } -#ifdef USE_STAROS +#if defined(USE_STAROS) && !defined(BUILD_FORMAT_LIB) if (is_starlet_uri(uri)) { return get_tls_fs_starlet(); } diff --git a/be/src/fs/fs.h b/be/src/fs/fs.h index 8a2656b3d21d8..b795a426f22e3 100644 --- a/be/src/fs/fs.h +++ b/be/src/fs/fs.h @@ -33,6 +33,7 @@ struct ResultFileOptions; class TUploadReq; class TDownloadReq; struct WritableFileOptions; +class FileSystem; struct SpaceInfo { // Total size of the filesystem, in bytes @@ -47,13 +48,15 @@ struct FSOptions { private: FSOptions(const TBrokerScanRangeParams* scan_range_params, const TExportSink* export_sink, const ResultFileOptions* result_file_options, const TUploadReq* upload, const TDownloadReq* download, - const TCloudConfiguration* cloud_configuration) + const TCloudConfiguration* cloud_configuration, + const std::unordered_map& fs_options = {}) : scan_range_params(scan_range_params), export_sink(export_sink), result_file_options(result_file_options), upload(upload), download(download), - cloud_configuration(cloud_configuration) {} + cloud_configuration(cloud_configuration), + _fs_options(fs_options) {} public: FSOptions() : FSOptions(nullptr, nullptr, nullptr, nullptr, nullptr, nullptr) {} @@ -73,6 
+76,9 @@ struct FSOptions { FSOptions(const TCloudConfiguration* cloud_configuration) : FSOptions(nullptr, nullptr, nullptr, nullptr, nullptr, cloud_configuration) {} + FSOptions(const std::unordered_map& fs_options) + : FSOptions(nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, fs_options) {} + const THdfsProperties* hdfs_properties() const; const TBrokerScanRangeParams* scan_range_params; @@ -81,6 +87,17 @@ struct FSOptions { const TUploadReq* upload; const TDownloadReq* download; const TCloudConfiguration* cloud_configuration; + const std::unordered_map _fs_options; + + static constexpr const char* FS_S3_ENDPOINT = "fs.s3a.endpoint"; + static constexpr const char* FS_S3_ENDPOINT_REGION = "fs.s3a.endpoint.region"; + static constexpr const char* FS_S3_ACCESS_KEY = "fs.s3a.access.key"; + static constexpr const char* FS_S3_SECRET_KEY = "fs.s3a.secret.key"; + static constexpr const char* FS_S3_PATH_STYLE_ACCESS = "fs.s3a.path.style.access"; + static constexpr const char* FS_S3_CONNECTION_SSL_ENABLED = "fs.s3a.connection.ssl.enabled"; + static constexpr const char* FS_S3_READ_AHEAD_RANGE = "fs.s3a.readahead.range"; + static constexpr const char* FS_S3_RETRY_LIMIT = "fs.s3a.retry.limit"; + static constexpr const char* FS_S3_RETRY_INTERVAL = "fs.s3a.retry.interval"; }; struct SequentialFileOptions { @@ -112,6 +129,7 @@ struct FileInfo { std::string path; std::optional size; std::string encryption_meta; + std::shared_ptr fs; }; struct FileWriteStat { @@ -138,8 +156,10 @@ class FileSystem { FileSystem() = default; virtual ~FileSystem() = default; + static StatusOr> Create(std::string_view uri, const FSOptions& options); + static StatusOr> CreateUniqueFromString(std::string_view uri, - FSOptions options = FSOptions()); + const FSOptions& options = FSOptions()); static StatusOr> CreateSharedFromString(std::string_view uri); diff --git a/be/src/fs/fs_s3.cpp b/be/src/fs/fs_s3.cpp index 2e30576131a0f..07ca45c7b58a8 100644 --- a/be/src/fs/fs_s3.cpp +++ 
b/be/src/fs/fs_s3.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -74,6 +75,8 @@ class S3ClientFactory { using ClientConfiguration = Aws::Client::ClientConfiguration; using S3Client = Aws::S3::S3Client; using S3ClientPtr = std::shared_ptr; + using ClientConfigurationPtr = std::shared_ptr; + using AWSCloudConfigurationPtr = std::shared_ptr; static S3ClientFactory& instance() { static S3ClientFactory obj; @@ -125,11 +128,14 @@ class S3ClientFactory { class ClientCacheKey { public: - ClientConfiguration config; - AWSCloudConfiguration aws_cloud_configuration; + ClientConfigurationPtr config; + AWSCloudConfigurationPtr aws_cloud_configuration; bool operator==(const ClientCacheKey& rhs) const { - return config == rhs.config && aws_cloud_configuration == rhs.aws_cloud_configuration; + if (config && rhs.config && aws_cloud_configuration && rhs.aws_cloud_configuration) { + return *config == *(rhs.config) && *aws_cloud_configuration == *(rhs.aws_cloud_configuration); + } + return !config && !rhs.config && !aws_cloud_configuration && !rhs.aws_cloud_configuration; } }; @@ -138,9 +144,10 @@ class S3ClientFactory { // Only use for UT bool _find_client_cache_keys_by_config_TEST(const Aws::Client::ClientConfiguration& config, AWSCloudConfiguration* cloud_config = nullptr) { + auto aws_config = cloud_config == nullptr ? AWSCloudConfiguration{} : *cloud_config; for (size_t i = 0; i < _items; i++) { - if (_client_cache_keys[i] == - ClientCacheKey{config, cloud_config == nullptr ? 
AWSCloudConfiguration{} : *cloud_config}) + if (_client_cache_keys[i] == ClientCacheKey{std::make_shared(config), + std::make_shared(aws_config)}) return true; } return false; @@ -197,6 +204,12 @@ void S3ClientFactory::close() { } } +// clang-format: off +static const std::vector retryable_errors = { + // tos qps limit ExceptionName + "ExceedAccountQPSLimit", "ExceedAccountRateLimit", "ExceedBucketQPSLimit", "ExceedBucketRateLimit"}; +// clang-format: on + S3ClientFactory::S3ClientPtr S3ClientFactory::new_client(const TCloudConfiguration& t_cloud_configuration, S3ClientFactory::OperationType operation_type) { const AWSCloudConfiguration aws_cloud_configuration = CloudConfigurationFactory::create_aws(t_cloud_configuration); @@ -230,7 +243,9 @@ S3ClientFactory::S3ClientPtr S3ClientFactory::new_client(const TCloudConfigurati config.requestTimeoutMs = config::object_storage_request_timeout_ms; } - ClientCacheKey client_cache_key{config, aws_cloud_configuration}; + auto client_conf = std::make_shared(config); + auto aws_config = std::make_shared(aws_cloud_configuration); + ClientCacheKey client_cache_key{client_conf, aws_config}; { // Duplicate code for cache s3 client std::lock_guard l(_lock); @@ -241,6 +256,8 @@ S3ClientFactory::S3ClientPtr S3ClientFactory::new_client(const TCloudConfigurati auto credential_provider = _get_aws_credentials_provider(aws_cloud_credential); + config.retryStrategy = std::make_shared( + retryable_errors, config::object_storage_max_retries, config::object_storage_retry_scale_factor); S3ClientPtr client = std::make_shared( credential_provider, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, !path_style_access); @@ -261,8 +278,8 @@ S3ClientFactory::S3ClientPtr S3ClientFactory::new_client(const TCloudConfigurati S3ClientFactory::S3ClientPtr S3ClientFactory::new_client(const ClientConfiguration& config, const FSOptions& opts) { std::lock_guard l(_lock); - - ClientCacheKey client_cache_key{config, AWSCloudConfiguration{}}; + 
auto client_conf = std::make_shared(config); + ClientCacheKey client_cache_key{client_conf, std::make_shared()}; for (size_t i = 0; i < _items; i++) { if (_client_cache_keys[i] == client_cache_key) return _clients[i]; } @@ -280,9 +297,20 @@ S3ClientFactory::S3ClientPtr S3ClientFactory::new_client(const ClientConfigurati secret_access_key = hdfs_properties->secret_key; } } else { - access_key_id = config::object_storage_access_key_id; - secret_access_key = config::object_storage_secret_access_key; + // resolve path style + auto itr = opts._fs_options.find(FSOptions::FS_S3_PATH_STYLE_ACCESS); + if (itr != opts._fs_options.end()) { + path_style_access = itr->second.compare("true") == 0; + } + + // resolve ak,sk + itr = opts._fs_options.find(FSOptions::FS_S3_ACCESS_KEY); + access_key_id = itr != opts._fs_options.end() ? itr->second : config::object_storage_access_key_id; + + itr = opts._fs_options.find(FSOptions::FS_S3_SECRET_KEY); + secret_access_key = itr != opts._fs_options.end() ? itr->second : config::object_storage_secret_access_key; } + if (!access_key_id.empty() && !secret_access_key.empty()) { auto credentials = std::make_shared(access_key_id, secret_access_key); client = std::make_shared(credentials, config, @@ -343,19 +371,48 @@ static std::shared_ptr new_s3client( config.maxConnections = config::object_storage_max_connection; } } else { - if (!uri.endpoint().empty()) { - config.endpointOverride = uri.endpoint(); + // resolve endpoint + auto itr = opts._fs_options.find(FSOptions::FS_S3_ENDPOINT); + auto ssl_itr = opts._fs_options.find(FSOptions::FS_S3_CONNECTION_SSL_ENABLED); + + if (itr != opts._fs_options.end() && !itr->second.empty()) { + config.endpointOverride = itr->second; } else if (!config::object_storage_endpoint.empty()) { config.endpointOverride = config::object_storage_endpoint; + } else if (ssl_itr != opts._fs_options.end()) { + config.scheme = itr->second.compare("true") == 0 ? 
Aws::Http::Scheme::HTTPS : Aws::Http::Scheme::HTTP; } else if (config::object_storage_endpoint_use_https) { config.scheme = Aws::Http::Scheme::HTTPS; } else { config.scheme = Aws::Http::Scheme::HTTP; } - if (!config::object_storage_region.empty()) { + + // resolve region + itr = opts._fs_options.find(FSOptions::FS_S3_ENDPOINT_REGION); + if (itr != opts._fs_options.end() && !itr->second.empty()) { + config.region = itr->second; + } else if (!config::object_storage_region.empty()) { config.region = config::object_storage_region; } config.maxConnections = config::object_storage_max_connection; + + // resolve retry + int64_t s3client_max_retries = config::object_storage_max_retries; + itr = opts._fs_options.find(FSOptions::FS_S3_RETRY_LIMIT); + if (itr != opts._fs_options.end() && !itr->second.empty()) { + s3client_max_retries = std::stoi(itr->second); + } + int64_t s3client_retry_scale_factor = config::object_storage_retry_scale_factor; + itr = opts._fs_options.find(FSOptions::FS_S3_RETRY_INTERVAL); + if (itr != opts._fs_options.end() && !itr->second.empty()) { + s3client_retry_scale_factor = std::stoi(itr->second); + } + config.retryStrategy = std::make_shared( + retryable_errors, s3client_max_retries, s3client_retry_scale_factor); + } + + if (!uri.endpoint().empty()) { + config.endpointOverride = uri.endpoint(); } if (config::object_storage_connect_timeout_ms > 0) { config.connectTimeoutMs = config::object_storage_connect_timeout_ms; @@ -370,11 +427,11 @@ static std::shared_ptr new_s3client( } return S3ClientFactory::instance().new_client(config, opts); -} +} // namespace starrocks class S3FileSystem : public FileSystem { public: - S3FileSystem(const FSOptions& options) : _options(options) {} + S3FileSystem(const FSOptions& options) : _options(std::move(options)) {} ~S3FileSystem() override = default; S3FileSystem(const S3FileSystem&) = delete; @@ -458,10 +515,23 @@ class S3FileSystem : public FileSystem { Status delete_dir_v1(const std::string& dirname); Status 
delete_dir_recursive_v1(const std::string& dirname); -private: FSOptions _options; }; +static int64_t read_ahead_size_from_options(const FSOptions& options) { + int64_t read_ahead_size = 64 * 1024; // default value is 64KB + auto itr = options._fs_options.find(FSOptions::FS_S3_READ_AHEAD_RANGE); + if (itr != options._fs_options.end() && !itr->second.empty()) { + try { + read_ahead_size = std::stoi(itr->second); + } catch (std::logic_error const&) { + LOG_EVERY_N(WARNING, 10) << " Can not convert config " << FSOptions::FS_S3_READ_AHEAD_RANGE + << "'s value to int : " << itr->second; + } + } + return read_ahead_size; +} + StatusOr> S3FileSystem::new_random_access_file(const RandomAccessFileOptions& opts, const std::string& path) { S3URI uri; @@ -469,7 +539,9 @@ StatusOr> S3FileSystem::new_random_access_file return Status::InvalidArgument(fmt::format("Invalid S3 URI: {}", path)); } auto client = new_s3client(uri, _options); - auto input_stream = std::make_unique(std::move(client), uri.bucket(), uri.key()); + auto read_ahead_size = read_ahead_size_from_options(_options); + auto input_stream = + std::make_unique(std::move(client), uri.bucket(), uri.key(), read_ahead_size); return RandomAccessFile::from(std::move(input_stream), path, false, opts.encryption_info); } @@ -480,7 +552,9 @@ StatusOr> S3FileSystem::new_random_access_file return Status::InvalidArgument(fmt::format("Invalid S3 URI: {}", file_info.path)); } auto client = new_s3client(uri, _options); - auto input_stream = std::make_unique(std::move(client), uri.bucket(), uri.key()); + auto read_ahead_size = read_ahead_size_from_options(_options); + auto input_stream = + std::make_unique(std::move(client), uri.bucket(), uri.key(), read_ahead_size); if (file_info.size.has_value()) { input_stream->set_size(file_info.size.value()); } @@ -494,7 +568,10 @@ StatusOr> S3FileSystem::new_sequential_file(cons return Status::InvalidArgument(fmt::format("Invalid S3 URI: {}", path)); } auto client = new_s3client(uri, _options); 
- auto input_stream = std::make_unique(std::move(client), uri.bucket(), uri.key()); + auto read_ahead_size = read_ahead_size_from_options(_options); + auto input_stream = + std::make_unique(std::move(client), uri.bucket(), uri.key(), read_ahead_size); + return SequentialFile::from(std::move(input_stream), path, opts.encryption_info); } diff --git a/be/src/fs/hdfs/fs_hdfs.cpp b/be/src/fs/hdfs/fs_hdfs.cpp index 171bdd8f7e725..c448a322fc7a4 100644 --- a/be/src/fs/hdfs/fs_hdfs.cpp +++ b/be/src/fs/hdfs/fs_hdfs.cpp @@ -37,7 +37,7 @@ namespace starrocks { class GetHdfsFileReadOnlyHandle { public: - GetHdfsFileReadOnlyHandle(const FSOptions options, std::string path, int buffer_size) + GetHdfsFileReadOnlyHandle(const FSOptions& options, std::string path, int buffer_size) : _options(std::move(options)), _path(std::move(path)), _buffer_size(buffer_size) {} StatusOr getOrCreateFS() { @@ -384,7 +384,7 @@ Status HDFSWritableFile::close() { class HdfsFileSystem : public FileSystem { public: - HdfsFileSystem(const FSOptions& options) : _options(options) {} + HdfsFileSystem(const FSOptions& options) : _options(std::move(options)) {} ~HdfsFileSystem() override = default; HdfsFileSystem(const HdfsFileSystem&) = delete; diff --git a/be/src/gutil/CMakeLists.txt b/be/src/gutil/CMakeLists.txt index b15d66b1af7ea..fc9af87d5782e 100644 --- a/be/src/gutil/CMakeLists.txt +++ b/be/src/gutil/CMakeLists.txt @@ -56,6 +56,8 @@ if ("${CMAKE_BUILD_TARGET_ARCH}" STREQUAL "x86" OR "${CMAKE_BUILD_TARGET_ARCH}" set(SOURCE_FILES ${SOURCE_FILES} atomicops-internals-x86.cc) endif() +SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fPIC") + add_library(Gutil STATIC ${SOURCE_FILES}) set_target_properties(Gutil PROPERTIES COMPILE_FLAGS "-funsigned-char -Wno-deprecated -Wno-char-subscripts") diff --git a/be/src/io/s3_input_stream.cpp b/be/src/io/s3_input_stream.cpp index 6b938973f41aa..a32bcb857fc04 100644 --- a/be/src/io/s3_input_stream.cpp +++ b/be/src/io/s3_input_stream.cpp @@ -42,28 +42,79 @@ StatusOr 
S3InputStream::read(void* out, int64_t count) { if (_offset >= _size) { return 0; } + count = std::min(count, _size - _offset); + + // prefetch case: + // case1: pretech is disable: _read_ahead_size = -1 -> direct read from s3 + // case2: read size greater than _read_ahead_size -> direct read from s3 + // case3: read range is in buffer -> copy from buffer + // case4: read start is in buffer, end is outof buffer -> copy part data from buffer, load data from s3 to buffer, copy remain from buffer + // case5: read start is greater than buffer end -> load data from s3 to buffer, copy from buffer + // case6: read start is lower than buffer start -> load data from s3 to buffer, copy from buffer + if (count > _read_ahead_size) { + auto real_length = std::min(_offset + count, _size) - _offset; + + // https://www.rfc-editor.org/rfc/rfc9110.html#name-range + auto range = fmt::format("bytes={}-{}", _offset, _offset + real_length - 1); + Aws::S3::Model::GetObjectRequest request; + request.SetBucket(_bucket); + request.SetKey(_object); + request.SetRange(std::move(range)); + request.SetResponseStreamFactory([out, real_length]() { + return Aws::New(AWS_ALLOCATE_TAG, reinterpret_cast(out), real_length); + }); - auto real_length = std::min(_offset + count, _size) - _offset; - - // https://www.rfc-editor.org/rfc/rfc9110.html#name-range - auto range = fmt::format("bytes={}-{}", _offset, _offset + real_length - 1); - Aws::S3::Model::GetObjectRequest request; - request.SetBucket(_bucket); - request.SetKey(_object); - request.SetRange(std::move(range)); - request.SetResponseStreamFactory([out, real_length]() { - return Aws::New(AWS_ALLOCATE_TAG, reinterpret_cast(out), real_length); - }); - - Aws::S3::Model::GetObjectOutcome outcome = _s3client->GetObject(request); - if (outcome.IsSuccess()) { - if (UNLIKELY(outcome.GetResult().GetContentLength() != real_length)) { - return Status::InternalError("The response length is different from request length for io stream!"); + 
Aws::S3::Model::GetObjectOutcome outcome = _s3client->GetObject(request); + if (outcome.IsSuccess()) { + if (UNLIKELY(outcome.GetResult().GetContentLength() != real_length)) { + return Status::InternalError("The response length is different from request length for io stream!"); + } + _offset += real_length; + return real_length; + } else { + return make_error_status(outcome.GetError()); } - _offset += real_length; - return real_length; } else { - return make_error_status(outcome.GetError()); + int64_t remain_to_read_length = count; + int64_t copy_length = 0; + if (_offset >= _buffer_start_offset && _offset < _buffer_start_offset + _buffer_data_length) { + // case 3: read range is in buffer, copy from buffer and remain_to_read_length will be zero. + // case 4: read start is in buffer, end is outof buffer, + // copy partial from buffer and remain_to_read_length will be > 0. + copy_length = std::min(count, _buffer_data_length - (_offset - _buffer_start_offset)); + memcpy(static_cast(out), _read_buffer.get() + (_offset - _buffer_start_offset), copy_length); + remain_to_read_length = remain_to_read_length - copy_length; + } + if (remain_to_read_length > 0) { + // case 4,5,6, load data from s3 + // case 4: load from s3 to buffer from offset: _buffer_start_offset + _buffer_data_length + // case 5,6: load from s3 to buffer from offset: _offset + int64_t read_start_offset = _offset; + if (_offset >= _buffer_start_offset && _offset < _buffer_start_offset + _buffer_data_length) { + read_start_offset = _buffer_start_offset + _buffer_data_length; + } + int64_t read_end_offset = std::min(read_start_offset + _read_ahead_size, _size); + auto range = fmt::format("bytes={}-{}", read_start_offset, read_end_offset); + Aws::S3::Model::GetObjectRequest request; + request.SetBucket(_bucket); + request.SetKey(_object); + request.SetRange(std::move(range)); + + Aws::S3::Model::GetObjectOutcome outcome = _s3client->GetObject(request); + if (outcome.IsSuccess()) { + Aws::IOStream& body = 
outcome.GetResult().GetBody(); + int64_t read_length = read_end_offset - read_start_offset; + body.read(reinterpret_cast(_read_buffer.get()), read_length); + _buffer_start_offset = read_start_offset; + _buffer_data_length = body.gcount(); + } else { + return make_error_status(outcome.GetError()); + } + memcpy(static_cast(out) + copy_length, _read_buffer.get(), remain_to_read_length); + copy_length += remain_to_read_length; + } + _offset += copy_length; + return copy_length; } } diff --git a/be/src/io/s3_input_stream.h b/be/src/io/s3_input_stream.h index 19398c1658f03..7cf04960c1350 100644 --- a/be/src/io/s3_input_stream.h +++ b/be/src/io/s3_input_stream.h @@ -27,8 +27,14 @@ namespace starrocks::io { class S3InputStream final : public SeekableInputStream { public: - explicit S3InputStream(std::shared_ptr client, std::string bucket, std::string object) - : _s3client(std::move(client)), _bucket(std::move(bucket)), _object(std::move(object)) {} + explicit S3InputStream(std::shared_ptr client, std::string bucket, std::string object, + int64_t read_ahead_size = 5 * 1024 * 1024) + : _s3client(std::move(client)), _bucket(std::move(bucket)), _object(std::move(object)) { + _read_ahead_size = read_ahead_size; + if (_read_ahead_size > 0) { + _read_buffer = std::make_unique(_read_ahead_size); + } + } ~S3InputStream() override = default; @@ -52,12 +58,20 @@ class S3InputStream final : public SeekableInputStream { StatusOr read_all() override; + // only for UT + int64_t get_read_ahead_size() const { return _read_ahead_size; } + private: std::shared_ptr _s3client; std::string _bucket; std::string _object; int64_t _offset{0}; int64_t _size{-1}; + int64_t _read_ahead_size{-1}; + // _read_buffer start offset, indicate buffer[0]'s offset in s3 file. 
+ int64_t _buffer_start_offset{-1}; + int64_t _buffer_data_length{-1}; + std::unique_ptr _read_buffer; }; } // namespace starrocks::io diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp index 89541e2ab849e..d4f0615d97d1a 100644 --- a/be/src/runtime/descriptors.cpp +++ b/be/src/runtime/descriptors.cpp @@ -772,7 +772,9 @@ Status DescriptorTbl::create(RuntimeState* state, ObjectPool* pool, const TDescr for (const auto& tdesc : thrift_tbl.slotDescriptors) { SlotDescriptor* slot_d = pool->add(new SlotDescriptor(tdesc)); (*tbl)->_slot_desc_map[tdesc.id] = slot_d; - + if (!slot_d->col_name().empty()) { + (*tbl)->_slot_with_column_name_map[tdesc.id] = slot_d; + } // link to parent auto entry = (*tbl)->_tuple_desc_map.find(tdesc.parent); @@ -815,6 +817,17 @@ SlotDescriptor* DescriptorTbl::get_slot_descriptor(SlotId id) const { } } +SlotDescriptor* DescriptorTbl::get_slot_descriptor_with_column(SlotId id) const { + // TODO: is there some boost function to do exactly this? 
+ auto i = _slot_with_column_name_map.find(id); + + if (i == _slot_with_column_name_map.end()) { + return nullptr; + } else { + return i->second; + } +} + // return all registered tuple descriptors void DescriptorTbl::get_tuple_descs(std::vector* descs) const { descs->clear(); diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index 1a5a45737b3b8..16e775491ce4d 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -488,6 +488,7 @@ class DescriptorTbl { TableDescriptor* get_table_descriptor(TableId id) const; TupleDescriptor* get_tuple_descriptor(TupleId id) const; SlotDescriptor* get_slot_descriptor(SlotId id) const; + SlotDescriptor* get_slot_descriptor_with_column(SlotId id) const; // return all registered tuple descriptors void get_tuple_descs(std::vector* descs) const; @@ -502,6 +503,7 @@ class DescriptorTbl { TableDescriptorMap _tbl_desc_map; TupleDescriptorMap _tuple_desc_map; SlotDescriptorMap _slot_desc_map; + SlotDescriptorMap _slot_with_column_name_map; DescriptorTbl() = default; }; diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index a918c4942e8cc..a4b07cfec9714 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -507,8 +507,8 @@ Status ExecEnv::init(const std::vector& store_paths, bool as_cn) { exit(-1); } -#if defined(USE_STAROS) && !defined(BE_TEST) - _lake_location_provider = new lake::StarletLocationProvider(); +#if defined(USE_STAROS) && !defined(BE_TEST) && !defined(BUILD_FORMAT_LIB) + _lake_location_provider = std::make_shared(); _lake_update_manager = new lake::UpdateManager(_lake_location_provider, GlobalEnv::GetInstance()->update_mem_tracker()); _lake_tablet_manager = @@ -524,7 +524,7 @@ Status ExecEnv::init(const std::vector& store_paths, bool as_cn) { } #elif defined(BE_TEST) - _lake_location_provider = new lake::FixedLocationProvider(_store_paths.front().path); + _lake_location_provider = std::make_shared(_store_paths.front().path); 
_lake_update_manager = new lake::UpdateManager(_lake_location_provider, GlobalEnv::GetInstance()->update_mem_tracker()); _lake_tablet_manager = @@ -701,7 +701,6 @@ void ExecEnv::destroy() { SAFE_DELETE(_stream_mgr); SAFE_DELETE(_external_scan_context_mgr); SAFE_DELETE(_lake_tablet_manager); - SAFE_DELETE(_lake_location_provider); SAFE_DELETE(_lake_update_manager); SAFE_DELETE(_lake_replication_txn_manager); SAFE_DELETE(_cache_mgr); diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 3cf3cb546c895..bac7816cc44e6 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -314,7 +314,7 @@ class ExecEnv { lake::TabletManager* lake_tablet_manager() const { return _lake_tablet_manager; } - lake::LocationProvider* lake_location_provider() const { return _lake_location_provider; } + std::shared_ptr lake_location_provider() const { return _lake_location_provider; } lake::UpdateManager* lake_update_manager() const { return _lake_update_manager; } @@ -389,7 +389,7 @@ class ExecEnv { ProfileReportWorker* _profile_report_worker = nullptr; lake::TabletManager* _lake_tablet_manager = nullptr; - lake::LocationProvider* _lake_location_provider = nullptr; + std::shared_ptr _lake_location_provider; lake::UpdateManager* _lake_update_manager = nullptr; lake::ReplicationTxnManager* _lake_replication_txn_manager = nullptr; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 5beac56553b93..d513cca1a07f3 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -875,7 +875,7 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params, c LOG(WARNING) << "tuple descriptor is null. 
id: " << slot_ref.tuple_id; return Status::InvalidArgument("tuple descriptor is null"); } - auto* slot_desc = desc_tbl->get_slot_descriptor(slot_ref.slot_id); + auto* slot_desc = desc_tbl->get_slot_descriptor_with_column(slot_ref.slot_id); if (slot_desc == nullptr) { LOG(WARNING) << "slot descriptor is null. id: " << slot_ref.slot_id; return Status::InvalidArgument("slot descriptor is null"); diff --git a/be/src/runtime/time_types.h b/be/src/runtime/time_types.h index 8640ab3899c41..faa01305c29c6 100644 --- a/be/src/runtime/time_types.h +++ b/be/src/runtime/time_types.h @@ -239,7 +239,7 @@ class timestamp { // MIN_DATE | 0 static const Timestamp MIN_TIMESTAMP = (1892325482100162560LL); - // seconds from 1970.01.01 + // seconds since julian date epoch to 1970.01.01 static const Timestamp UNIX_EPOCH_SECONDS = (210866803200LL); }; diff --git a/be/src/script/CMakeLists.txt b/be/src/script/CMakeLists.txt index adcf24f0fb443..bd289dea5244c 100644 --- a/be/src/script/CMakeLists.txt +++ b/be/src/script/CMakeLists.txt @@ -21,6 +21,8 @@ set(EXEC_FILES ../thirdparty/wren/wren.c ) +SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fPIC") + add_library(Script STATIC ${EXEC_FILES}) target_include_directories(Script PRIVATE ${SRC_DIR}/script ${SRC_DIR}/thirdparty/wren ${SRC_DIR}/thirdparty/wrenbind17) diff --git a/be/src/service/CMakeLists.txt b/be/src/service/CMakeLists.txt index 8641fad08e796..bedb1eca2853a 100644 --- a/be/src/service/CMakeLists.txt +++ b/be/src/service/CMakeLists.txt @@ -32,7 +32,7 @@ add_library(Service ) # only build starrocks_be when TEST is off -if (NOT ${MAKE_TEST} STREQUAL "ON") +if ((NOT ${MAKE_TEST} STREQUAL "ON") AND (NOT BUILD_FORMAT_LIB)) add_executable(starrocks_be starrocks_main.cpp ) diff --git a/be/src/starrocks_format/CMakeLists.txt b/be/src/starrocks_format/CMakeLists.txt new file mode 100644 index 0000000000000..605d78d083318 --- /dev/null +++ b/be/src/starrocks_format/CMakeLists.txt @@ -0,0 +1,82 @@ +# Copyright 2021-present StarRocks, Inc. 
All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +set(CMAKE_VERBOSE_MAKEFILE ON) + +set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/starrocks_format") + +# where to put generated binaries +set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/starrocks_format") + +# Set starrocks libraries +set(STARROCKS_LIBS + ${WL_START_GROUP} + Common + Column + Connector + Exec + Exprs + FileSystem + Formats + Gutil + IO + Serde + Storage + Rowset + Runtime + Types + Util + Script + StarRocksGen + Webserver + TestUtil + BlockCache + ${WL_END_GROUP} +) + +add_library(hdfs_so SHARED IMPORTED GLOBAL) +set_target_properties(hdfs_so PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/hadoop/lib/native/libhdfs.so) + +set(STARROCKS_THIRDPARTY_DEPENDENCIES + ${STARROCKS_DEPENDENCIES} + ${STARROCKS_STD_DEPENDENCIES} + ${STARROCKS_MEMORY_DEPENDENCIES_LIBS} + ${STARROCKS_DYNAMIC_DEPENDENCIES_LIBS} +) + +# only build starrocks_be when TEST is off +if (NOT ${MAKE_TEST} STREQUAL "ON") + + add_library(starrocks_format SHARED + starrocks_lib.cpp + ) + + # This permits libraries loaded by dlopen to link to the symbols in the program. 
+ target_link_libraries(starrocks_format + # -Wl,--whole-archive + ${STARROCKS_LIBS} + # -Wl,--no-whole-archive + ${STARROCKS_DEPENDENCIES} + ${STARROCKS_STD_DEPENDENCIES} + ${STARROCKS_DYNAMIC_DEPENDENCIES_LIBS} + hdfs_so + ) + + install(DIRECTORY DESTINATION ${OUTPUT_DIR}/format-lib/) + + install(TARGETS starrocks_format + DESTINATION ${OUTPUT_DIR}/format-lib/) + +endif() diff --git a/be/src/starrocks_format/starrocks_lib.cpp b/be/src/starrocks_format/starrocks_lib.cpp new file mode 100644 index 0000000000000..d6a61b3ce995a --- /dev/null +++ b/be/src/starrocks_format/starrocks_lib.cpp @@ -0,0 +1,84 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include +#include + +#include +#include + +#include "common/config.h" +#include "fs/fs_s3.h" +#include "runtime/time_types.h" +#include "storage/lake/fixed_location_provider.h" +#include "storage/lake/tablet_manager.h" +#include "storage/olap_define.h" +#include "util/timezone_utils.h" + +namespace starrocks::lake { + +static bool _starrocks_format_inited = false; +Aws::SDKOptions aws_sdk_options; + +lake::TabletManager* _lake_tablet_manager = nullptr; + +void starrocks_format_initialize(void) { + setenv("STARROCKS_HOME", "./", 0); + setenv("UDF_RUNTIME_DIR", "./", 0); + + if (!_starrocks_format_inited) { + fprintf(stderr, "starrocks format module start to initialize\n"); + // load config file + std::string conffile = std::filesystem::current_path(); + conffile += "/starrocks.conf"; + const char* config_file_path = conffile.c_str(); + std::ifstream ifs(config_file_path); + if (!ifs.good()) { + config_file_path = nullptr; + } + if (!starrocks::config::init(config_file_path)) { + LOG(WARNING) << "read config file:" << config_file_path << " failed!"; + return; + } + + Aws::InitAPI(aws_sdk_options); + + date::init_date_cache(); + + TimezoneUtils::init_time_zones(); + + auto lake_location_provider = std::make_shared(""); + _lake_tablet_manager = new lake::TabletManager(lake_location_provider, config::lake_metadata_cache_limit); + LOG(INFO) << "starrocks format module has been initialized successfully"; + _starrocks_format_inited = true; + } else { + LOG(INFO) << "starrocks format module has already been initialized"; + } +} + +void starrocks_format_shutdown(void) { + if (_starrocks_format_inited) { + LOG(INFO) << "starrocks format module start to deinitialize"; + Aws::ShutdownAPI(aws_sdk_options); + SAFE_DELETE(_lake_tablet_manager); + // SAFE_DELETE(_lake_update_manager); + LOG(INFO) << "starrocks format module has been deinitialized successfully"; + } else { + LOG(INFO) << "starrocks format module has already been deinitialized"; + } +} + +} // 
namespace starrocks::lake diff --git a/be/src/starrocks_format/starrocks_lib.h b/be/src/starrocks_format/starrocks_lib.h new file mode 100644 index 0000000000000..63fd320acb9df --- /dev/null +++ b/be/src/starrocks_format/starrocks_lib.h @@ -0,0 +1,27 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "storage/lake/tablet_manager.h" + +namespace starrocks::lake { + +extern lake::TabletManager* _lake_tablet_manager; + +void starrocks_format_initialize(void); + +void starrocks_format_shutdown(void); + +} // namespace starrocks::lake diff --git a/be/src/storage/CMakeLists.txt b/be/src/storage/CMakeLists.txt index ee184bbdb0601..e202ce20db71d 100644 --- a/be/src/storage/CMakeLists.txt +++ b/be/src/storage/CMakeLists.txt @@ -19,7 +19,7 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/storage") add_subdirectory(rowset) -add_library(Storage STATIC +set(STORAGE_FILES aggregate_type.cpp base_tablet.cpp decimal12.cpp @@ -224,6 +224,7 @@ add_library(Storage STATIC sstable/iterator.cpp sstable/options.cpp sstable/merger.cpp + lake/lake_delvec_loader.cpp binlog_util.cpp binlog_file_writer.cpp binlog_file_reader.cpp @@ -234,6 +235,7 @@ add_library(Storage STATIC lake/txn_log_applier.cpp lake/lake_local_persistent_index.cpp persistent_index_compaction_manager.cpp + lake/local_pk_index_manager.cpp persistent_index_tablet_loader.cpp lake/lake_local_persistent_index_tablet_loader.cpp 
lake/lake_persistent_index.cpp @@ -263,3 +265,11 @@ add_library(Storage STATIC index/vector/tenann/del_id_filter.cpp index/vector/tenann/tenann_index_builder.cpp index/vector/tenann/tenann_index_utils.cpp) + +if (NOT BUILD_FORMAT_LIB) + list(APPEND STORAGE_FILES lake/starlet_location_provider.cpp) +endif() + +add_library(Storage STATIC + ${STORAGE_FILES} +) diff --git a/be/src/storage/lake/general_tablet_writer.cpp b/be/src/storage/lake/general_tablet_writer.cpp index 7551490153f4a..014da57b199b9 100644 --- a/be/src/storage/lake/general_tablet_writer.cpp +++ b/be/src/storage/lake/general_tablet_writer.cpp @@ -69,7 +69,13 @@ void HorizontalGeneralTabletWriter::close() { std::vector full_paths_to_delete; full_paths_to_delete.reserve(_files.size()); for (const auto& f : _files) { - full_paths_to_delete.emplace_back(_tablet_mgr->segment_location(_tablet_id, f.path)); + std::string path; + if (_location_provider) { + path = _location_provider->segment_location(_tablet_id, f.path); + } else { + path = _tablet_mgr->segment_location(_tablet_id, f.path); + } + full_paths_to_delete.emplace_back(path); } delete_files_async(std::move(full_paths_to_delete)); } @@ -87,7 +93,12 @@ Status HorizontalGeneralTabletWriter::reset_segment_writer() { wopts.encryption_info = pair.info; opts.encryption_meta = std::move(pair.encryption_meta); } - ASSIGN_OR_RETURN(auto of, fs::new_writable_file(wopts, _tablet_mgr->segment_location(_tablet_id, name))); + std::unique_ptr of; + if (_location_provider && _fs) { + ASSIGN_OR_RETURN(of, _fs->new_writable_file(wopts, _location_provider->segment_location(_tablet_id, name))); + } else { + ASSIGN_OR_RETURN(of, fs::new_writable_file(wopts, _tablet_mgr->segment_location(_tablet_id, name))); + } auto w = std::make_unique(std::move(of), _seg_id++, _schema, opts); RETURN_IF_ERROR(w->init()); _seg_writer = std::move(w); @@ -252,7 +263,13 @@ void VerticalGeneralTabletWriter::close() { std::vector full_paths_to_delete; 
full_paths_to_delete.reserve(_files.size()); for (const auto& f : _files) { - full_paths_to_delete.emplace_back(_tablet_mgr->segment_location(_tablet_id, f.path)); + std::string path; + if (_location_provider) { + path = _location_provider->segment_location(_tablet_id, f.path); + } else { + path = _tablet_mgr->segment_location(_tablet_id, f.path); + } + full_paths_to_delete.emplace_back(path); } delete_files_async(std::move(full_paths_to_delete)); } @@ -271,7 +288,12 @@ StatusOr> VerticalGeneralTabletWriter::create_seg wopts.encryption_info = pair.info; opts.encryption_meta = std::move(pair.encryption_meta); } - ASSIGN_OR_RETURN(auto of, fs::new_writable_file(wopts, _tablet_mgr->segment_location(_tablet_id, name))); + std::unique_ptr of; + if (_location_provider && _fs) { + ASSIGN_OR_RETURN(of, _fs->new_writable_file(wopts, _location_provider->segment_location(_tablet_id, name))); + } else { + ASSIGN_OR_RETURN(of, fs::new_writable_file(wopts, _tablet_mgr->segment_location(_tablet_id, name))); + } auto w = std::make_shared(std::move(of), _seg_id++, _schema, opts); RETURN_IF_ERROR(w->init(column_indexes, is_key)); return w; diff --git a/be/src/storage/lake/horizontal_compaction_task.cpp b/be/src/storage/lake/horizontal_compaction_task.cpp index 7a2fa16dc8d78..48f6b37b7549f 100644 --- a/be/src/storage/lake/horizontal_compaction_task.cpp +++ b/be/src/storage/lake/horizontal_compaction_task.cpp @@ -145,9 +145,9 @@ StatusOr HorizontalCompactionTask::calculate_chunk_size() { total_num_rows += rowset->num_rows(); total_input_segs += rowset->is_overlapped() ? 
rowset->num_segments() : 1; LakeIOOptions lake_io_opts{.fill_data_cache = false, - .buffer_size = config::lake_compaction_stream_buffer_size_bytes}; - auto fill_meta_cache = false; - ASSIGN_OR_RETURN(auto segments, rowset->segments(lake_io_opts, fill_meta_cache)); + .buffer_size = config::lake_compaction_stream_buffer_size_bytes, + .fill_metadata_cache = false}; + ASSIGN_OR_RETURN(auto segments, rowset->segments(lake_io_opts)); for (auto& segment : segments) { for (size_t i = 0; i < segment->num_columns(); ++i) { auto uid = _tablet_schema->column(i).unique_id(); diff --git a/be/src/storage/lake/lake_delvec_loader.cpp b/be/src/storage/lake/lake_delvec_loader.cpp new file mode 100644 index 0000000000000..a55026b6b65be --- /dev/null +++ b/be/src/storage/lake/lake_delvec_loader.cpp @@ -0,0 +1,47 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "storage/lake/lake_delvec_loader.h" + +#include "common/logging.h" +#include "common/status.h" +#include "storage/lake/location_provider.h" + +namespace starrocks::lake { + +Status LakeDelvecLoader::load(const TabletSegmentId& tsid, int64_t version, DelVectorPtr* pdelvec) { + if (_pk_builder != nullptr) { + // 1. 
find in meta builder first + auto found = _pk_builder->find_delvec(tsid, pdelvec); + if (!found.ok()) { + return found.status(); + } + if (*found) { + return Status::OK(); + } + } + return load_from_file(tsid, version, pdelvec); +} + +Status LakeDelvecLoader::load_from_file(const TabletSegmentId& tsid, int64_t version, DelVectorPtr* pdelvec) { + (*pdelvec).reset(new DelVector()); + // 2. find in delvec file + std::string filepath = _tablet_manager->tablet_metadata_location(tsid.tablet_id, version); + ASSIGN_OR_RETURN(auto metadata, _tablet_manager->get_tablet_metadata(_lake_io_opts.fs, filepath, _fill_cache)); + RETURN_IF_ERROR( + lake::get_del_vec(_tablet_manager, *metadata, tsid.segment_id, _fill_cache, _lake_io_opts, pdelvec->get())); + return Status::OK(); +} + +} // namespace starrocks::lake diff --git a/be/src/storage/lake/lake_delvec_loader.h b/be/src/storage/lake/lake_delvec_loader.h new file mode 100644 index 0000000000000..b6d2c386cd652 --- /dev/null +++ b/be/src/storage/lake/lake_delvec_loader.h @@ -0,0 +1,44 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+#pragma once + +#include "common/statusor.h" +#include "storage/del_vector.h" +#include "storage/lake/meta_file.h" +#include "storage/lake/tablet.h" +#include "storage/lake/update_manager.h" +#include "storage/lake/versioned_tablet.h" +#include "storage/olap_common.h" + +namespace starrocks::lake { + +class LakeDelvecLoader : public DelvecLoader { +public: + LakeDelvecLoader(TabletManager* tablet_manager, const MetaFileBuilder* pk_builder, bool fill_cache, + LakeIOOptions lake_io_opts) + : _tablet_manager(tablet_manager), + _pk_builder(pk_builder), + _fill_cache(fill_cache), + _lake_io_opts(std::move(lake_io_opts)) {} + Status load(const TabletSegmentId& tsid, int64_t version, DelVectorPtr* pdelvec); + Status load_from_file(const TabletSegmentId& tsid, int64_t version, DelVectorPtr* pdelvec); + +private: + TabletManager* _tablet_manager; + const MetaFileBuilder* _pk_builder = nullptr; + bool _fill_cache = false; + LakeIOOptions _lake_io_opts; +}; + +} // namespace starrocks::lake \ No newline at end of file diff --git a/be/src/storage/lake/lake_primary_key_compaction_conflict_resolver.cpp b/be/src/storage/lake/lake_primary_key_compaction_conflict_resolver.cpp index 691638dd26ee4..75986588cd788 100644 --- a/be/src/storage/lake/lake_primary_key_compaction_conflict_resolver.cpp +++ b/be/src/storage/lake/lake_primary_key_compaction_conflict_resolver.cpp @@ -17,6 +17,7 @@ #include "runtime/exec_env.h" #include "storage/chunk_helper.h" #include "storage/lake/filenames.h" +#include "storage/lake/lake_delvec_loader.h" #include "storage/lake/rowset.h" #include "storage/lake/types_fwd.h" #include "storage/lake/update_manager.h" @@ -47,7 +48,10 @@ Status LakePrimaryKeyCompactionConflictResolver::segment_iterator( ASSIGN_OR_RETURN(auto segment_iters, _rowset->get_each_segment_iterator(pkey_schema, false, &stats)); RETURN_ERROR_IF_FALSE(segment_iters.size() == _rowset->num_segments()); // init delvec loader - auto delvec_loader = std::make_unique(_update_manager, _builder, 
false /* fill cache */); + SegmentReadOptions seg_options; + + auto delvec_loader = + std::make_unique(_tablet_mgr, _builder, false /* fill cache */, seg_options.lake_io_opts); // init params CompactConflictResolveParams params; params.tablet_id = _rowset->tablet_id(); diff --git a/be/src/storage/lake/lake_primary_key_compaction_conflict_resolver.h b/be/src/storage/lake/lake_primary_key_compaction_conflict_resolver.h index 46b44f2560bea..22740e039db55 100644 --- a/be/src/storage/lake/lake_primary_key_compaction_conflict_resolver.h +++ b/be/src/storage/lake/lake_primary_key_compaction_conflict_resolver.h @@ -16,6 +16,7 @@ #include "storage/lake/tablet_metadata.h" #include "storage/primary_key_compaction_conflict_resolver.h" +#include "storage/tablet_manager.h" namespace starrocks::lake { @@ -27,13 +28,13 @@ class LakePrimaryIndex; class LakePrimaryKeyCompactionConflictResolver : public PrimaryKeyCompactionConflictResolver { public: explicit LakePrimaryKeyCompactionConflictResolver(const TabletMetadata* metadata, Rowset* rowset, - UpdateManager* update_manager, MetaFileBuilder* builder, + TabletManager* tablet_mgr, MetaFileBuilder* builder, LakePrimaryIndex* index, int64_t txn_id, int64_t base_version, std::map* segment_id_to_add_dels, std::vector>* delvecs) : _metadata(metadata), _rowset(rowset), - _update_manager(update_manager), + _tablet_mgr(tablet_mgr), _builder(builder), _index(index), _txn_id(txn_id), @@ -53,7 +54,7 @@ class LakePrimaryKeyCompactionConflictResolver : public PrimaryKeyCompactionConf // input const TabletMetadata* _metadata = nullptr; Rowset* _rowset = nullptr; - UpdateManager* _update_manager = nullptr; + TabletManager* _tablet_mgr = nullptr; MetaFileBuilder* _builder = nullptr; LakePrimaryIndex* _index = nullptr; int64_t _txn_id = 0; diff --git a/be/src/storage/lake/meta_file.cpp b/be/src/storage/lake/meta_file.cpp index 47ba4af60739d..4fdae0b774246 100644 --- a/be/src/storage/lake/meta_file.cpp +++ b/be/src/storage/lake/meta_file.cpp @@ 
-519,7 +519,7 @@ void MetaFileBuilder::finalize_sstable_meta(const PersistentIndexSstableMetaPB& } Status get_del_vec(TabletManager* tablet_mgr, const TabletMetadata& metadata, uint32_t segment_id, bool fill_cache, - DelVector* delvec) { + const LakeIOOptions& lake_io_opts, DelVector* delvec) { // find delvec by segment id auto iter = metadata.delvec_meta().delvecs().find(segment_id); if (iter != metadata.delvec_meta().delvecs().end()) { @@ -542,9 +542,16 @@ Status get_del_vec(TabletManager* tablet_mgr, const TabletMetadata& metadata, ui return Status::InternalError("Can't find delvec file name"); } const auto& delvec_name = iter2->second.name(); - RandomAccessFileOptions opts{.skip_fill_local_cache = !fill_cache}; - ASSIGN_OR_RETURN(auto rf, - fs::new_random_access_file(opts, tablet_mgr->delvec_location(metadata.id(), delvec_name))); + RandomAccessFileOptions opts{.skip_fill_local_cache = !lake_io_opts.fill_data_cache}; + std::unique_ptr rf; + if (lake_io_opts.fs && lake_io_opts.location_provider) { + ASSIGN_OR_RETURN( + rf, lake_io_opts.fs->new_random_access_file( + opts, lake_io_opts.location_provider->delvec_location(metadata.id(), delvec_name))); + } else { + ASSIGN_OR_RETURN(rf, + fs::new_random_access_file(opts, tablet_mgr->delvec_location(metadata.id(), delvec_name))); + } RETURN_IF_ERROR(rf->read_at_fully(iter->second.offset(), buf.data(), iter->second.size())); // parse delvec RETURN_IF_ERROR(delvec->load(iter->second.version(), buf.data(), iter->second.size())); diff --git a/be/src/storage/lake/meta_file.h b/be/src/storage/lake/meta_file.h index 5afef49dae4a6..4861c65ea0ae4 100644 --- a/be/src/storage/lake/meta_file.h +++ b/be/src/storage/lake/meta_file.h @@ -89,7 +89,7 @@ class MetaFileBuilder { }; Status get_del_vec(TabletManager* tablet_mgr, const TabletMetadata& metadata, uint32_t segment_id, bool fill_cache, - DelVector* delvec); + const LakeIOOptions& lake_io_opts, DelVector* delvec); bool is_primary_key(TabletMetadata* metadata); bool 
is_primary_key(const TabletMetadata& metadata); diff --git a/be/src/storage/lake/pk_tablet_writer.cpp b/be/src/storage/lake/pk_tablet_writer.cpp index 84414122ddc11..0f14537a5d3db 100644 --- a/be/src/storage/lake/pk_tablet_writer.cpp +++ b/be/src/storage/lake/pk_tablet_writer.cpp @@ -62,7 +62,12 @@ Status HorizontalPkTabletWriter::flush_del_file(const Column& deletes) { wopts.encryption_info = pair.info; encryption_meta.swap(pair.encryption_meta); } - ASSIGN_OR_RETURN(auto of, fs::new_writable_file(wopts, _tablet_mgr->del_location(_tablet_id, name))); + std::unique_ptr of; + if (_location_provider && _fs) { + ASSIGN_OR_RETURN(of, _fs->new_writable_file(_location_provider->del_location(_tablet_id, name))); + } else { + ASSIGN_OR_RETURN(of, fs::new_writable_file(wopts, _tablet_mgr->del_location(_tablet_id, name))); + } size_t sz = serde::ColumnArraySerde::max_serialized_size(deletes); std::vector content(sz); if (serde::ColumnArraySerde::serialize(deletes, content.data()) == nullptr) { diff --git a/be/src/storage/lake/rowset.cpp b/be/src/storage/lake/rowset.cpp index 70893a1c8969e..c451d0037b0d8 100644 --- a/be/src/storage/lake/rowset.cpp +++ b/be/src/storage/lake/rowset.cpp @@ -19,6 +19,7 @@ #include "storage/chunk_helper.h" #include "storage/delete_predicates.h" #include "storage/lake/column_mode_partial_update_handler.h" +#include "storage/lake/lake_delvec_loader.h" #include "storage/lake/tablet.h" #include "storage/lake/tablet_writer.h" #include "storage/lake/update_manager.h" @@ -96,9 +97,11 @@ Status Rowset::add_partial_compaction_segments_info(TxnLogPB_OpCompaction* op_co DCHECK(partial_segments_compaction()); std::vector segments; - std::pair, std::vector> not_used_segments; LakeIOOptions lake_io_opts{.fill_data_cache = true}; - RETURN_IF_ERROR(load_segments(&segments, lake_io_opts, true /* fill_metadata_cache */, ¬_used_segments)); + SegmentReadOptions seg_options; + seg_options.lake_io_opts = lake_io_opts; + std::pair, std::vector> not_used_segments; + 
RETURN_IF_ERROR(load_segments(&segments, seg_options, ¬_used_segments)); bool clear_file_size_info = false; bool clear_encryption_meta = (metadata().segments_size() != metadata().segment_encryption_metas_size()); @@ -169,9 +172,13 @@ Status Rowset::add_partial_compaction_segments_info(TxnLogPB_OpCompaction* op_co } StatusOr> Rowset::read(const Schema& schema, const RowsetReadOptions& options) { - auto root_loc = _tablet_mgr->tablet_root_location(tablet_id()); SegmentReadOptions seg_options; - ASSIGN_OR_RETURN(seg_options.fs, FileSystem::CreateSharedFromString(root_loc)); + if (options.lake_io_opts.fs) { + seg_options.fs = options.lake_io_opts.fs; + } else { + auto root_loc = _tablet_mgr->tablet_root_location(tablet_id()); + ASSIGN_OR_RETURN(seg_options.fs, FileSystem::CreateSharedFromString(root_loc)); + } seg_options.stats = options.stats; seg_options.ranges = options.ranges; seg_options.pred_tree = options.pred_tree; @@ -190,8 +197,8 @@ StatusOr> Rowset::read(const Schema& schema, const seg_options.has_preaggregation = options.has_preaggregation; if (options.is_primary_keys) { seg_options.is_primary_keys = true; - seg_options.delvec_loader = std::make_shared(_tablet_mgr->update_mgr(), nullptr, - seg_options.lake_io_opts.fill_data_cache); + seg_options.delvec_loader = std::make_shared( + _tablet_mgr, nullptr, seg_options.lake_io_opts.fill_data_cache, seg_options.lake_io_opts); seg_options.dcg_loader = std::make_shared(_tablet_metadata); seg_options.version = options.version; seg_options.tablet_id = tablet_id(); @@ -204,6 +211,7 @@ StatusOr> Rowset::read(const Schema& schema, const if (options.short_key_ranges_option != nullptr) { // logical split. 
seg_options.short_key_ranges = options.short_key_ranges_option->short_key_ranges; } + seg_options.reader_type = options.reader_type; std::unique_ptr segment_schema_guard; auto* segment_schema = const_cast(&schema); @@ -230,7 +238,7 @@ StatusOr> Rowset::read(const Schema& schema, const } std::vector segments; - RETURN_IF_ERROR(load_segments(&segments, options.lake_io_opts.fill_data_cache, options.lake_io_opts.buffer_size)); + RETURN_IF_ERROR(load_segments(&segments, seg_options, nullptr)); for (auto& seg_ptr : segments) { if (seg_ptr->num_rows() == 0) { continue; @@ -326,9 +334,11 @@ StatusOr> Rowset::get_each_segment_iterator_with_d SegmentReadOptions seg_options; ASSIGN_OR_RETURN(seg_options.fs, FileSystem::CreateSharedFromString(root_loc)); seg_options.stats = stats; + seg_options.lake_io_opts.fs = seg_options.fs; + seg_options.lake_io_opts.location_provider = _tablet_mgr->location_provider(); seg_options.is_primary_keys = true; seg_options.delvec_loader = - std::make_shared(_tablet_mgr->update_mgr(), builder, true /*fill cache*/); + std::make_shared(_tablet_mgr, builder, true /*fill cache*/, seg_options.lake_io_opts); seg_options.dcg_loader = std::make_shared(_tablet_metadata); seg_options.version = version; seg_options.tablet_id = tablet_id(); @@ -366,25 +376,29 @@ std::vector Rowset::get_segments() { } StatusOr> Rowset::segments(bool fill_cache) { - LakeIOOptions lake_io_opts{.fill_data_cache = fill_cache}; - return segments(lake_io_opts, fill_cache); + LakeIOOptions lake_io_opts{.fill_data_cache = fill_cache, .fill_metadata_cache = fill_cache}; + return segments(lake_io_opts); } -StatusOr> Rowset::segments(const LakeIOOptions& lake_io_opts, bool fill_metadata_cache) { +StatusOr> Rowset::segments(const LakeIOOptions& lake_io_opts) { std::vector segments; - RETURN_IF_ERROR(load_segments(&segments, lake_io_opts, fill_metadata_cache, nullptr)); + SegmentReadOptions seg_options; + seg_options.lake_io_opts = lake_io_opts; + RETURN_IF_ERROR(load_segments(&segments, 
seg_options, nullptr)); return segments; } Status Rowset::load_segments(std::vector* segments, bool fill_cache, int64_t buffer_size) { - LakeIOOptions lake_io_opts{.fill_data_cache = fill_cache, .buffer_size = buffer_size}; - return load_segments(segments, lake_io_opts, fill_cache, nullptr); + SegmentReadOptions seg_options; + seg_options.lake_io_opts.fill_data_cache = fill_cache; + seg_options.lake_io_opts.fill_metadata_cache = fill_cache; + seg_options.lake_io_opts.buffer_size = buffer_size; + return load_segments(segments, seg_options, nullptr); } -Status Rowset::load_segments(std::vector* segments, const LakeIOOptions& lake_io_opts, - bool fill_metadata_cache, +Status Rowset::load_segments(std::vector* segments, SegmentReadOptions& seg_options, std::pair, std::vector>* not_used_segments) { -#ifndef BE_TEST +#if !defined BE_TEST && !defined(BUILD_FORMAT_LIB) RETURN_IF_ERROR(tls_thread_status.mem_tracker()->check_mem_limit("LoadSegments")); #endif @@ -418,8 +432,14 @@ Status Rowset::load_segments(std::vector* segments, const LakeIOOpti }; for (const auto& seg_name : metadata().segments()) { - auto segment_path = _tablet_mgr->segment_location(tablet_id(), seg_name); - auto segment_info = FileInfo{.path = segment_path}; + std::string segment_path; + auto lake_io_opts = seg_options.lake_io_opts; + if (lake_io_opts.location_provider) { + segment_path = lake_io_opts.location_provider->segment_location(tablet_id(), seg_name); + } else { + segment_path = _tablet_mgr->segment_location(tablet_id(), seg_name); + } + auto segment_info = FileInfo{.path = segment_path, .fs = seg_options.fs}; if (LIKELY(has_segment_size)) { segment_info.size = files_to_size.Get(index); } @@ -437,8 +457,8 @@ Status Rowset::load_segments(std::vector* segments, const LakeIOOpti if (_parallel_load) { auto task = std::make_shared, std::string>()>>([=]() { - auto result = _tablet_mgr->load_segment(segment_info, seg_id, lake_io_opts, fill_metadata_cache, - _tablet_schema); + auto result = 
_tablet_mgr->load_segment(segment_info, seg_id, lake_io_opts, + lake_io_opts.fill_metadata_cache, _tablet_schema); return std::make_pair(std::move(result), seg_name); }); @@ -449,7 +469,7 @@ Status Rowset::load_segments(std::vector* segments, const LakeIOOpti LOG(WARNING) << "sumbit_func failed: " << st.code_as_string() << ", try to load segment serially, seg_id: " << seg_id; auto segment_or = _tablet_mgr->load_segment(segment_info, seg_id, &footer_size_hint, lake_io_opts, - fill_metadata_cache, _tablet_schema); + lake_io_opts.fill_metadata_cache, _tablet_schema); if (auto status = check_status(segment_or, seg_name); !status.ok()) { return status; } @@ -458,7 +478,7 @@ Status Rowset::load_segments(std::vector* segments, const LakeIOOpti segment_futures.push_back(task->get_future()); } else { auto segment_or = _tablet_mgr->load_segment(segment_info, seg_id++, &footer_size_hint, lake_io_opts, - fill_metadata_cache, _tablet_schema); + lake_io_opts.fill_metadata_cache, _tablet_schema); if (auto status = check_status(segment_or, seg_name); !status.ok()) { return status; } diff --git a/be/src/storage/lake/rowset.h b/be/src/storage/lake/rowset.h index e53f421919248..9efd649f1db2d 100644 --- a/be/src/storage/lake/rowset.h +++ b/be/src/storage/lake/rowset.h @@ -108,13 +108,13 @@ class Rowset : public BaseRowset { StatusOr> segments(bool fill_cache); - StatusOr> segments(const LakeIOOptions& lake_io_opts, bool fill_metadata_cache); + [[nodiscard]] StatusOr> segments(const LakeIOOptions& lake_io_opts); // `fill_cache` controls `fill_data_cache` and `fill_meta_cache` Status load_segments(std::vector* segments, bool fill_cache, int64_t buffer_size = -1); - Status load_segments(std::vector* segments, const LakeIOOptions& lake_io_opts, bool fill_metadata_cache, - std::pair, std::vector>* not_used_segments); + [[nodiscard]] Status load_segments(std::vector* segments, SegmentReadOptions& seg_options, + std::pair, std::vector>* not_used_segments); int64_t tablet_id() const { return 
_tablet_id; } diff --git a/be/src/storage/lake/tablet.cpp b/be/src/storage/lake/tablet.cpp index 49efa315cf8a3..c58022de22082 100644 --- a/be/src/storage/lake/tablet.cpp +++ b/be/src/storage/lake/tablet.cpp @@ -20,6 +20,7 @@ #include "runtime/exec_env.h" #include "storage/lake/filenames.h" #include "storage/lake/general_tablet_writer.h" +#include "storage/lake/location_provider.h" #include "storage/lake/metacache.h" #include "storage/lake/metadata_iterator.h" #include "storage/lake/pk_tablet_writer.h" @@ -108,7 +109,13 @@ const std::shared_ptr Tablet::tablet_schema() const { } StatusOr> Tablet::get_schema() { - return _mgr->get_tablet_schema(_id, &_version_hint); + if (_tablet_schema) { + return _tablet_schema; + } else if (_tablet_metadata) { + return std::make_shared(_tablet_metadata->schema()); + } else { + return _mgr->get_tablet_schema(_id, &_version_hint); + } } StatusOr> Tablet::get_schema_by_id(int64_t schema_id) { @@ -125,15 +132,15 @@ std::vector Tablet::get_rowsets(const TabletMetadataPtr& metadata) { } std::string Tablet::metadata_location(int64_t version) const { - return _mgr->tablet_metadata_location(_id, version); + return _location_provider->tablet_metadata_location(_id, version); } std::string Tablet::metadata_root_location() const { - return _mgr->tablet_metadata_root_location(_id); + return _location_provider->metadata_root_location(_id); } std::string Tablet::txn_log_location(int64_t txn_id) const { - return _mgr->txn_log_location(_id, txn_id); + return _location_provider->txn_log_location(_id, txn_id); } std::string Tablet::txn_slog_location(int64_t txn_id) const { @@ -141,19 +148,19 @@ std::string Tablet::txn_slog_location(int64_t txn_id) const { } std::string Tablet::txn_vlog_location(int64_t version) const { - return _mgr->txn_vlog_location(_id, version); + return _location_provider->txn_vlog_location(_id, version); } std::string Tablet::segment_location(std::string_view segment_name) const { - return _mgr->segment_location(_id, 
segment_name); + return _location_provider->segment_location(_id, segment_name); } std::string Tablet::del_location(std::string_view del_name) const { - return _mgr->del_location(_id, del_name); + return _location_provider->del_location(_id, del_name); } std::string Tablet::delvec_location(std::string_view delvec_name) const { - return _mgr->delvec_location(_id, delvec_name); + return _location_provider->delvec_location(_id, delvec_name); } std::string Tablet::sst_location(std::string_view sst_name) const { @@ -161,7 +168,7 @@ std::string Tablet::sst_location(std::string_view sst_name) const { } std::string Tablet::root_location() const { - return _mgr->tablet_root_location(_id); + return _location_provider->root_location(_id); } Status Tablet::delete_data(int64_t txn_id, const DeletePredicatePB& delete_predicate) { diff --git a/be/src/storage/lake/tablet.h b/be/src/storage/lake/tablet.h index a05164828a187..5841af20c27f5 100644 --- a/be/src/storage/lake/tablet.h +++ b/be/src/storage/lake/tablet.h @@ -19,6 +19,7 @@ #include #include "common/statusor.h" +#include "fs/fs.h" #include "gen_cpp/types.pb.h" #include "storage/base_tablet.h" #include "storage/lake/metadata_iterator.h" @@ -48,7 +49,25 @@ enum WriterType : int; class Tablet : public BaseTablet { public: - explicit Tablet(TabletManager* mgr, int64_t id) : _mgr(mgr), _id(id) {} + explicit Tablet(TabletManager* mgr, int64_t id) : _mgr(mgr), _id(id) { + if (_mgr != nullptr) { + _location_provider = _mgr->location_provider(); + } + } + + explicit Tablet(TabletManager* mgr, int64_t id, std::shared_ptr location_provider, + TabletMetadataPtr tablet_metadata) + : _mgr(mgr), _id(id) { + _location_provider = std::move(location_provider); + _tablet_metadata = tablet_metadata; + } + + explicit Tablet(TabletManager* mgr, int64_t id, std::shared_ptr location_provider, + std::shared_ptr tablet_schema) + : _mgr(mgr), _id(id) { + _location_provider = std::move(location_provider); + _tablet_schema = tablet_schema; + } 
~Tablet() override = default; @@ -137,12 +156,17 @@ class Tablet : public BaseTablet { int64_t data_size(); + const std::shared_ptr& location_provider() const { return _location_provider; } + size_t num_rows() const override; private: TabletManager* _mgr; int64_t _id; int64_t _version_hint = 0; + std::shared_ptr _location_provider; + TabletMetadataPtr _tablet_metadata; + std::shared_ptr _tablet_schema; }; } // namespace starrocks::lake diff --git a/be/src/storage/lake/tablet_manager.cpp b/be/src/storage/lake/tablet_manager.cpp index 59e574a3aa91a..d808d9fa2960d 100644 --- a/be/src/storage/lake/tablet_manager.cpp +++ b/be/src/storage/lake/tablet_manager.cpp @@ -60,14 +60,18 @@ static bvar::LatencyRecorder g_get_txn_log_latency("lake", "get_txn_log"); static bvar::LatencyRecorder g_put_txn_log_latency("lake", "put_txn_log"); static bvar::LatencyRecorder g_del_txn_log_latency("lake", "del_txn_log"); -TabletManager::TabletManager(LocationProvider* location_provider, UpdateManager* update_mgr, int64_t cache_capacity) - : _location_provider(location_provider), +TabletManager::TabletManager(std::shared_ptr location_provider, UpdateManager* update_mgr, + int64_t cache_capacity) + : _location_provider(std::move(location_provider)), _metacache(std::make_unique(cache_capacity)), _compaction_scheduler(std::make_unique(this)), _update_mgr(update_mgr) { _update_mgr->set_tablet_mgr(this); } +TabletManager::TabletManager(std::shared_ptr location_provider, int64_t cache_capacity) + : _location_provider(std::move(location_provider)), _metacache(std::make_unique(cache_capacity)) {} + TabletManager::~TabletManager() = default; std::string TabletManager::tablet_root_location(int64_t tablet_id) const { @@ -231,11 +235,12 @@ Status TabletManager::put_tablet_metadata(const TabletMetadata& metadata) { return put_tablet_metadata(std::move(metadata_ptr)); } -StatusOr TabletManager::load_tablet_metadata(const string& metadata_location, bool fill_cache) { +StatusOr 
TabletManager::load_tablet_metadata(std::shared_ptr fs, + const string& metadata_location, bool fill_cache) { TEST_ERROR_POINT("TabletManager::load_tablet_metadata"); auto t0 = butil::gettimeofday_us(); auto metadata = std::make_shared(); - ProtobufFile file(metadata_location); + ProtobufFile file(metadata_location, std::move(fs)); RETURN_IF_ERROR(file.load(metadata.get(), fill_cache)); g_get_tablet_metadata_latency << (butil::gettimeofday_us() - t0); return std::move(metadata); @@ -259,11 +264,17 @@ StatusOr TabletManager::get_tablet_metadata(int64_t tablet_id } StatusOr TabletManager::get_tablet_metadata(const string& path, bool fill_cache) { + std::shared_ptr fs; + return get_tablet_metadata(fs, path, fill_cache); +} + +StatusOr TabletManager::get_tablet_metadata(std::shared_ptr fs, const string& path, + bool fill_cache) { if (auto ptr = _metacache->lookup_tablet_metadata(path); ptr != nullptr) { TRACE("got cached tablet metadata"); return ptr; } - ASSIGN_OR_RETURN(auto ptr, load_tablet_metadata(path, fill_cache)); + ASSIGN_OR_RETURN(auto ptr, load_tablet_metadata(std::move(fs), path, fill_cache)); if (fill_cache) { _metacache->cache_tablet_metadata(path, ptr); } @@ -494,7 +505,7 @@ StatusOr TabletManager::get_tablet_num_rows(int64_t tablet_id, int64_t return num_rows; } -#ifdef USE_STAROS +#if defined(USE_STAROS) && !defined(BUILD_FORMAT_LIB) bool TabletManager::is_tablet_in_worker(int64_t tablet_id) { bool in_worker = true; if (g_worker != nullptr) { @@ -517,7 +528,7 @@ StatusOr TabletManager::get_tablet_schema(int64_t tablet_id, in RETURN_IF(ptr != nullptr, ptr); // Cache miss, load tablet metadata from remote storage use the hint version -#ifdef USE_STAROS +#if defined(USE_STAROS) && !defined(BUILD_FORMAT_LIB) // TODO: Eliminate the explicit dependency on staros worker // 2. leverage `indexId` to lookup the global_schema from cache and if missing from file. 
if (g_worker != nullptr) { @@ -771,7 +782,12 @@ StatusOr TabletManager::load_segment(const FileInfo& segment_info, i // but in meta cache, segment `a` still has segment id 10, it is not changed. auto segment = metacache()->lookup_segment(segment_info.path); if (segment == nullptr) { - ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(segment_info.path)); + std::shared_ptr fs; + if (segment_info.fs) { + fs = segment_info.fs; + } else { + ASSIGN_OR_RETURN(fs, FileSystem::CreateSharedFromString(segment_info.path)); + } segment = std::make_shared(std::move(fs), segment_info, segment_id, std::move(tablet_schema), this); if (fill_metadata_cache) { // NOTE: the returned segment may be not the same as the parameter passed in diff --git a/be/src/storage/lake/tablet_manager.h b/be/src/storage/lake/tablet_manager.h index 7ffbc01abc440..4746536188274 100644 --- a/be/src/storage/lake/tablet_manager.h +++ b/be/src/storage/lake/tablet_manager.h @@ -57,7 +57,10 @@ class TabletManager { // this TabletManager. // |cache_capacity| is the max number of bytes can be used by the // metadata cache. 
- explicit TabletManager(LocationProvider* location_provider, UpdateManager* update_mgr, int64_t cache_capacity); + explicit TabletManager(std::shared_ptr location_provider, UpdateManager* update_mgr, + int64_t cache_capacity); + + explicit TabletManager(std::shared_ptr location_provider, int64_t cache_capacity); ~TabletManager(); @@ -77,8 +80,9 @@ class TabletManager { StatusOr get_tablet_metadata(int64_t tablet_id, int64_t version, bool fill_cache = true); - // Do not use this function except in a list dir StatusOr get_tablet_metadata(const std::string& path, bool fill_cache = true); + StatusOr get_tablet_metadata(std::shared_ptr fs, const std::string& path, + bool fill_cache = true); TabletMetadataPtr get_latest_cached_tablet_metadata(int64_t tablet_id); @@ -124,13 +128,17 @@ class TabletManager { const TabletMetadata* metadata); #ifdef USE_STAROS +#if !defined(BUILD_FORMAT_LIB) bool is_tablet_in_worker(int64_t tablet_id); +#else + bool is_tablet_in_worker(int64_t tablet_id) { return true; } +#endif #endif // USE_STAROS void prune_metacache(); // TODO: remove this method - LocationProvider* TEST_set_location_provider(LocationProvider* value) { + std::shared_ptr TEST_set_location_provider(std::shared_ptr value) { auto ret = _location_provider; _location_provider = value; return ret; @@ -158,10 +166,9 @@ class TabletManager { std::string delvec_location(int64_t tablet_id, std::string_view delvec_filename) const; + const std::shared_ptr location_provider() { return _location_provider; } std::string sst_location(int64_t tablet_id, std::string_view sst_filename) const; - const LocationProvider* location_provider() const { return _location_provider; } - UpdateManager* update_mgr(); CompactionScheduler* compaction_scheduler() { return _compaction_scheduler.get(); } @@ -209,15 +216,17 @@ class TabletManager { StatusOr load_and_parse_schema_file(const std::string& path); StatusOr get_tablet_schema_by_id(int64_t tablet_id, int64_t schema_id); + StatusOr 
load_tablet_metadata(std::shared_ptr fs, + const std::string& metadata_location, bool fill_cache); Status put_tablet_metadata(const TabletMetadataPtr& metadata, const std::string& metadata_location); StatusOr load_tablet_metadata(const std::string& metadata_location, bool fill_cache); StatusOr load_txn_log(const std::string& txn_log_location, bool fill_cache); StatusOr load_combined_txn_log(const std::string& path, bool fill_cache); - LocationProvider* _location_provider; + std::shared_ptr _location_provider; std::unique_ptr _metacache; std::unique_ptr _compaction_scheduler; - UpdateManager* _update_mgr; + UpdateManager* _update_mgr = nullptr; std::shared_mutex _meta_lock; std::unordered_map _tablet_in_writing_size; diff --git a/be/src/storage/lake/tablet_reader.cpp b/be/src/storage/lake/tablet_reader.cpp index d3582c5544a56..d465d3fe144c4 100644 --- a/be/src/storage/lake/tablet_reader.cpp +++ b/be/src/storage/lake/tablet_reader.cpp @@ -104,7 +104,8 @@ Status TabletReader::prepare() { Status TabletReader::open(const TabletReaderParams& read_params) { if (read_params.reader_type != ReaderType::READER_QUERY && read_params.reader_type != ReaderType::READER_CHECKSUM && - read_params.reader_type != ReaderType::READER_ALTER_TABLE && !is_compaction(read_params.reader_type)) { + read_params.reader_type != ReaderType::READER_ALTER_TABLE && !is_compaction(read_params.reader_type) && + read_params.reader_type != ReaderType::READER_BYPASS_QUERY) { return Status::NotSupported("reader type not supported now"); } RETURN_IF_ERROR(init_compaction_column_paths(read_params)); @@ -311,6 +312,7 @@ Status TabletReader::get_segment_iterators(const TabletReaderParams& params, std rs_opts.is_primary_keys = true; rs_opts.version = _tablet_metadata->version(); } + rs_opts.reader_type = params.reader_type; if (keys_type == PRIMARY_KEYS || keys_type == DUP_KEYS) { rs_opts.asc_hint = _is_asc_hint; diff --git a/be/src/storage/lake/tablet_reader.h b/be/src/storage/lake/tablet_reader.h index 
ff0b5af3a206d..6f45dacaeae89 100644 --- a/be/src/storage/lake/tablet_reader.h +++ b/be/src/storage/lake/tablet_reader.h @@ -18,6 +18,7 @@ #include "runtime/mem_pool.h" #include "storage/chunk_iterator.h" #include "storage/delete_predicates.h" +#include "storage/lake/versioned_tablet.h" #include "storage/tablet_reader_params.h" #include "types_fwd.h" @@ -80,6 +81,8 @@ class TabletReader final : public ChunkIterator { size_t merged_rows() const override { return _collect_iter->merged_rows(); } + void set_tablet(std::shared_ptr tablet) { _tablet = tablet; } + void get_split_tasks(std::vector* split_tasks) { split_tasks->swap(_split_tasks); } protected: @@ -136,6 +139,8 @@ class TabletReader final : public ChunkIterator { bool _is_key = false; RowSourceMaskBuffer* _mask_buffer = nullptr; + std::shared_ptr _tablet; + // used for table internal parallel bool _need_split = false; bool _could_split_physically = false; diff --git a/be/src/storage/lake/tablet_writer.h b/be/src/storage/lake/tablet_writer.h index e4a208f854a48..aa7163c33910f 100644 --- a/be/src/storage/lake/tablet_writer.h +++ b/be/src/storage/lake/tablet_writer.h @@ -21,6 +21,7 @@ #include "fs/fs.h" // FileInfo #include "gen_cpp/data.pb.h" #include "gen_cpp/lake_types.pb.h" +#include "storage/lake/location_provider.h" #include "storage/tablet_schema.h" namespace starrocks { @@ -128,6 +129,11 @@ class TabletWriter { // allow to set custom tablet schema for writer, used in partial update void set_tablet_schema(TabletSchemaCSPtr schema) { _schema = std::move(schema); } + void set_fs(const std::shared_ptr fs) { _fs = std::move(fs); } + void set_location_provider(const std::shared_ptr location_provider) { + _location_provider = std::move(location_provider); + } + const OlapWriterStatistics& stats() const { return _stats; } protected: @@ -141,6 +147,8 @@ class TabletWriter { int64_t _data_size = 0; uint32_t _seg_id = 0; bool _finished = false; + std::shared_ptr _fs; + std::shared_ptr _location_provider; 
OlapWriterStatistics _stats; bool _is_compaction = false; diff --git a/be/src/storage/lake/update_manager.cpp b/be/src/storage/lake/update_manager.cpp index 6f3c0572c4053..6d7ae513b4a9c 100644 --- a/be/src/storage/lake/update_manager.cpp +++ b/be/src/storage/lake/update_manager.cpp @@ -38,11 +38,11 @@ namespace starrocks::lake { -UpdateManager::UpdateManager(LocationProvider* location_provider, MemTracker* mem_tracker) +UpdateManager::UpdateManager(std::shared_ptr location_provider, MemTracker* mem_tracker) : _index_cache(std::numeric_limits::max()), _update_state_cache(std::numeric_limits::max()), _compaction_cache(std::numeric_limits::max()), - _location_provider(location_provider), + _location_provider(std::move(location_provider)), _pk_index_shards(config::pk_index_map_shard_size) { _update_mem_tracker = mem_tracker; const int64_t update_mem_limit = _update_mem_tracker->limit(); @@ -73,10 +73,6 @@ inline std::string cache_key(uint32_t tablet_id, int64_t txn_id) { return strings::Substitute("$0_$1", tablet_id, txn_id); } -Status LakeDelvecLoader::load(const TabletSegmentId& tsid, int64_t version, DelVectorPtr* pdelvec) { - return _update_mgr->get_del_vec(tsid, version, _pk_builder, _fill_cache, pdelvec); -} - PersistentIndexBlockCache::PersistentIndexBlockCache(MemTracker* mem_tracker, int64_t cache_limit) : _cache(new_lru_cache(cache_limit)) { _mem_tracker = std::make_unique(cache_limit, "lake_persistent_index_block_cache", mem_tracker); @@ -621,8 +617,11 @@ Status UpdateManager::get_del_vec(const TabletSegmentId& tsid, int64_t version, // get delvec in meta file Status UpdateManager::get_del_vec_in_meta(const TabletSegmentId& tsid, int64_t meta_ver, bool fill_cache, DelVector* delvec) { - ASSIGN_OR_RETURN(auto metadata, _tablet_mgr->get_tablet_metadata(tsid.tablet_id, meta_ver, fill_cache)); - RETURN_IF_ERROR(lake::get_del_vec(_tablet_mgr, *metadata, tsid.segment_id, fill_cache, delvec)); + std::string filepath = 
_tablet_mgr->tablet_metadata_location(tsid.tablet_id, meta_ver); + LakeIOOptions lake_io_opts; + lake_io_opts.fill_data_cache = fill_cache; + ASSIGN_OR_RETURN(auto metadata, _tablet_mgr->get_tablet_metadata(filepath, fill_cache)); + RETURN_IF_ERROR(lake::get_del_vec(_tablet_mgr, *metadata, tsid.segment_id, fill_cache, lake_io_opts, delvec)); return Status::OK(); } @@ -725,8 +724,9 @@ Status UpdateManager::light_publish_primary_compaction(const TxnLogPB_OpCompacti *std::max_element(op_compaction.input_rowsets().begin(), op_compaction.input_rowsets().end()); // 2. update primary index, and generate delete info. - auto resolver = std::make_unique( - &metadata, &output_rowset, this, builder, &index, txn_id, base_version, &segment_id_to_add_dels, &delvecs); + auto resolver = std::make_unique(&metadata, &output_rowset, _tablet_mgr, + builder, &index, txn_id, base_version, + &segment_id_to_add_dels, &delvecs); RETURN_IF_ERROR(resolver->execute()); _index_cache.update_object_size(index_entry, index.memory_usage()); // 3. 
update TabletMeta and write to meta file diff --git a/be/src/storage/lake/update_manager.h b/be/src/storage/lake/update_manager.h index 57338405cc2c3..e7d895bd63f74 100644 --- a/be/src/storage/lake/update_manager.h +++ b/be/src/storage/lake/update_manager.h @@ -42,18 +42,6 @@ class UpdateManager; struct AutoIncrementPartialUpdateState; using IndexEntry = DynamicCache::Entry; -class LakeDelvecLoader : public DelvecLoader { -public: - LakeDelvecLoader(UpdateManager* update_mgr, const MetaFileBuilder* pk_builder, bool fill_cache) - : _update_mgr(update_mgr), _pk_builder(pk_builder), _fill_cache(fill_cache) {} - Status load(const TabletSegmentId& tsid, int64_t version, DelVectorPtr* pdelvec); - -private: - UpdateManager* _update_mgr = nullptr; - const MetaFileBuilder* _pk_builder = nullptr; - bool _fill_cache = false; -}; - class PersistentIndexBlockCache { public: explicit PersistentIndexBlockCache(MemTracker* mem_tracker, int64_t cache_limit); @@ -90,7 +78,7 @@ class RssidFileInfoContainer { class UpdateManager { public: - UpdateManager(LocationProvider* location_provider, MemTracker* mem_tracker); + UpdateManager(std::shared_ptr location_provider, MemTracker* mem_tracker); ~UpdateManager(); void set_tablet_mgr(TabletManager* tablet_mgr) { _tablet_mgr = tablet_mgr; } void set_cache_expire_ms(int64_t expire_ms) { _cache_expire_ms = expire_ms; } @@ -251,7 +239,7 @@ class UpdateManager { // compaction cache DynamicCache _compaction_cache; std::atomic _last_clear_expired_cache_millis = 0; - LocationProvider* _location_provider = nullptr; + std::shared_ptr _location_provider; TabletManager* _tablet_mgr = nullptr; // memory checkers diff --git a/be/src/storage/lake/vertical_compaction_task.cpp b/be/src/storage/lake/vertical_compaction_task.cpp index 97c76deeabeaf..1f531bb1db282 100644 --- a/be/src/storage/lake/vertical_compaction_task.cpp +++ b/be/src/storage/lake/vertical_compaction_task.cpp @@ -118,9 +118,9 @@ StatusOr 
VerticalCompactionTask::calculate_chunk_size_for_column_group( // test case: 4k columns, 150 segments, 60w rows // compaction task cost: 272s (fill metadata cache) vs 2400s (not fill metadata cache) LakeIOOptions lake_io_opts{.fill_data_cache = config::lake_enable_vertical_compaction_fill_data_cache, - .buffer_size = config::lake_compaction_stream_buffer_size_bytes}; - auto fill_meta_cache = true; - ASSIGN_OR_RETURN(auto segments, rowset->segments(lake_io_opts, fill_meta_cache)); + .buffer_size = config::lake_compaction_stream_buffer_size_bytes, + .fill_metadata_cache = true}; + ASSIGN_OR_RETURN(auto segments, rowset->segments(lake_io_opts)); for (auto& segment : segments) { for (auto column_index : column_group) { auto uid = _tablet_schema->column(column_index).unique_id(); diff --git a/be/src/storage/olap_common.h b/be/src/storage/olap_common.h index 877e1b979f49c..cef97d8063657 100644 --- a/be/src/storage/olap_common.h +++ b/be/src/storage/olap_common.h @@ -149,10 +149,11 @@ enum ReaderType { READER_BASE_COMPACTION = 2, READER_CUMULATIVE_COMPACTION = 3, READER_CHECKSUM = 4, + READER_BYPASS_QUERY = 5, }; inline bool is_query(ReaderType reader_type) { - return reader_type == READER_QUERY; + return reader_type == READER_QUERY || reader_type == READER_BYPASS_QUERY; } inline bool is_compaction(ReaderType reader_type) { diff --git a/be/src/storage/options.h b/be/src/storage/options.h index 42f7fb85dfef1..3dc2a5a9680a3 100644 --- a/be/src/storage/options.h +++ b/be/src/storage/options.h @@ -38,6 +38,8 @@ #include #include +#include "fs/fs.h" +#include "storage/lake/location_provider.h" #include "storage/olap_define.h" #include "util/uid_util.h" @@ -77,6 +79,10 @@ struct LakeIOOptions { bool fill_data_cache = false; // Specify different buffer size for different read scenarios int64_t buffer_size = -1; + bool fill_metadata_cache = false; + bool use_page_cache = false; + std::shared_ptr fs; + std::shared_ptr location_provider; }; } // namespace starrocks diff --git 
a/be/src/storage/protobuf_file.cpp b/be/src/storage/protobuf_file.cpp index fdd883c339c91..9ec8fa5b576f0 100644 --- a/be/src/storage/protobuf_file.cpp +++ b/be/src/storage/protobuf_file.cpp @@ -55,7 +55,12 @@ Status ProtobufFileWithHeader::save(const ::google::protobuf::Message& message, header.version = OLAP_DATA_VERSION_APPLIED; header.magic_number = OLAP_FIX_HEADER_MAGIC_NUMBER; - ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(_path)); + std::shared_ptr fs; + if (_fs) { + fs = _fs; + } else { + ASSIGN_OR_RETURN(fs, FileSystem::CreateSharedFromString(_path)); + } WritableFileOptions opts{.sync_on_close = sync, .mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE}; ASSIGN_OR_RETURN(auto output_file, fs->new_writable_file(opts, _path)); RETURN_IF_ERROR(output_file->append(Slice((const char*)(&header), sizeof(header)))); @@ -67,7 +72,12 @@ Status ProtobufFileWithHeader::save(const ::google::protobuf::Message& message, Status ProtobufFileWithHeader::load(::google::protobuf::Message* message, bool fill_cache) { SequentialFileOptions opts{.skip_fill_local_cache = !fill_cache}; - ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(_path)); + std::shared_ptr fs; + if (_fs) { + fs = _fs; + } else { + ASSIGN_OR_RETURN(fs, FileSystem::CreateSharedFromString(_path)); + } ASSIGN_OR_RETURN(auto input_file, fs->new_sequential_file(opts, _path)); FixedFileHeader header; @@ -140,7 +150,12 @@ Status ProtobufFile::save(const ::google::protobuf::Message& message, bool sync) return Status::InternalError( fmt::format("failed to serialize protobuf to string, maybe the protobuf is too large. 
path={}", _path)); } - ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(_path)); + std::shared_ptr fs; + if (_fs) { + fs = _fs; + } else { + ASSIGN_OR_RETURN(fs, FileSystem::CreateSharedFromString(_path)); + } WritableFileOptions opts{.sync_on_close = sync, .mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE}; ASSIGN_OR_RETURN(auto output_file, fs->new_writable_file(opts, _path)); RETURN_IF_ERROR(output_file->append(serialized_message)); @@ -150,7 +165,12 @@ Status ProtobufFile::save(const ::google::protobuf::Message& message, bool sync) Status ProtobufFile::load(::google::protobuf::Message* message, bool fill_cache) { RandomAccessFileOptions opts{.skip_fill_local_cache = !fill_cache}; - ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(_path)); + std::shared_ptr fs; + if (_fs) { + fs = _fs; + } else { + ASSIGN_OR_RETURN(fs, FileSystem::CreateSharedFromString(_path)); + } ASSIGN_OR_RETURN(auto input_file, fs->new_random_access_file(opts, _path)); ASSIGN_OR_RETURN(auto serialized_string, input_file->read_all()); if (bool parsed = message->ParseFromString(serialized_string); !parsed) { diff --git a/be/src/storage/protobuf_file.h b/be/src/storage/protobuf_file.h index c946b4994c633..248ffa167702c 100644 --- a/be/src/storage/protobuf_file.h +++ b/be/src/storage/protobuf_file.h @@ -31,6 +31,8 @@ class ProtobufFile { public: explicit ProtobufFile(std::string path) : _path(std::move(path)) {} + explicit ProtobufFile(std::string path, std::shared_ptr fs) : _path(std::move(path)), _fs(fs) {} + DISALLOW_COPY_AND_MOVE(ProtobufFile); Status save(const ::google::protobuf::Message& message, bool sync = true); @@ -39,12 +41,16 @@ class ProtobufFile { private: std::string _path; + std::shared_ptr _fs; }; class ProtobufFileWithHeader { public: explicit ProtobufFileWithHeader(std::string path) : _path(std::move(path)) {} + explicit ProtobufFileWithHeader(std::string path, std::shared_ptr fs) + : _path(std::move(path)), _fs(fs) {} + 
DISALLOW_COPY_AND_MOVE(ProtobufFileWithHeader); Status save(const ::google::protobuf::Message& message, bool sync = true); @@ -55,6 +61,7 @@ class ProtobufFileWithHeader { private: std::string _path; + std::shared_ptr _fs; }; } // namespace starrocks diff --git a/be/src/storage/rowset/indexed_column_reader.cpp b/be/src/storage/rowset/indexed_column_reader.cpp index 0da739aa17f38..fb5641a1fc69c 100644 --- a/be/src/storage/rowset/indexed_column_reader.cpp +++ b/be/src/storage/rowset/indexed_column_reader.cpp @@ -106,9 +106,9 @@ Status IndexedColumnReader::new_iterator(const IndexReadOptions& opts, std::uniq } /////////////////////////////////////////////////////////////////////////////// -IndexedColumnIterator::IndexedColumnIterator(const IndexedColumnReader* reader, const IndexReadOptions& opts) +IndexedColumnIterator::IndexedColumnIterator(const IndexedColumnReader* reader, IndexReadOptions opts) : _reader(reader), - _opts(opts), + _opts(std::move(opts)), _ordinal_iter(&reader->_ordinal_index_reader), _value_iter(&reader->_value_index_reader) {} diff --git a/be/src/storage/rowset/indexed_column_reader.h b/be/src/storage/rowset/indexed_column_reader.h index dc159bcb8a538..a158bf4eedd20 100644 --- a/be/src/storage/rowset/indexed_column_reader.h +++ b/be/src/storage/rowset/indexed_column_reader.h @@ -86,7 +86,7 @@ class IndexedColumnIterator { Status next_batch(size_t* n, Column* column); private: - IndexedColumnIterator(const IndexedColumnReader* reader, const IndexReadOptions& opts); + IndexedColumnIterator(const IndexedColumnReader* reader, IndexReadOptions opts); Status _read_data_page(const PagePointer& pp); diff --git a/be/src/storage/rowset/rowset.h b/be/src/storage/rowset/rowset.h index 4b3f8108c2ffd..72259565248c7 100644 --- a/be/src/storage/rowset/rowset.h +++ b/be/src/storage/rowset/rowset.h @@ -50,6 +50,7 @@ #include "storage/rowset/base_rowset.h" #include "storage/rowset/rowset_meta.h" #include "storage/rowset/segment.h" +#include 
"storage/rowset/segment_options.h" namespace starrocks { diff --git a/be/src/storage/rowset/scalar_column_iterator.cpp b/be/src/storage/rowset/scalar_column_iterator.cpp index 481e4ca1f20a4..9f0e6e7a83c81 100644 --- a/be/src/storage/rowset/scalar_column_iterator.cpp +++ b/be/src/storage/rowset/scalar_column_iterator.cpp @@ -52,7 +52,7 @@ Status ScalarColumnIterator::init(const ColumnIteratorOptions& opts) { _opts = opts; IndexReadOptions index_opts; - index_opts.use_page_cache = !opts.temporary_data && + index_opts.use_page_cache = !opts.temporary_data && opts.use_page_cache && (config::enable_ordinal_index_memory_page_cache || !config::disable_storage_page_cache); index_opts.kept_in_memory = !opts.temporary_data && config::enable_ordinal_index_memory_page_cache; index_opts.lake_io_opts = opts.lake_io_opts; @@ -382,7 +382,7 @@ Status ScalarColumnIterator::get_row_ranges_by_zone_map(const std::vectornew_random_access_file(file_opts, _segment_file_info)); PageReadOptions opts; - opts.use_page_cache = !config::disable_storage_page_cache; + opts.use_page_cache = lake_io_opts.use_page_cache; opts.read_file = read_file.get(); opts.page_pointer = _short_key_index_page; opts.codec = nullptr; // short key index page uses NO_COMPRESSION for now diff --git a/be/src/storage/rowset/segment_iterator.cpp b/be/src/storage/rowset/segment_iterator.cpp index dcecb09436aef..3a576e17a90e9 100644 --- a/be/src/storage/rowset/segment_iterator.cpp +++ b/be/src/storage/rowset/segment_iterator.cpp @@ -2084,8 +2084,9 @@ Status SegmentIterator::_apply_bitmap_index() { } IndexReadOptions opts; - opts.use_page_cache = !_opts.temporary_data && (config::enable_bitmap_index_memory_page_cache || - !config::disable_storage_page_cache); + opts.use_page_cache = + !_opts.temporary_data && _opts.use_page_cache && + (config::enable_bitmap_index_memory_page_cache || !config::disable_storage_page_cache); opts.kept_in_memory = !_opts.temporary_data && config::enable_bitmap_index_memory_page_cache; 
opts.lake_io_opts = _opts.lake_io_opts; opts.read_file = _column_files[cid].get(); diff --git a/be/src/storage/tablet_reader_params.h b/be/src/storage/tablet_reader_params.h index ff4b263c524d4..2b99df93e8178 100644 --- a/be/src/storage/tablet_reader_params.h +++ b/be/src/storage/tablet_reader_params.h @@ -62,7 +62,7 @@ struct TabletReaderParams { bool use_page_cache = false; // Options only applies to cloud-native table r/w IO - LakeIOOptions lake_io_opts{.fill_data_cache = true}; + LakeIOOptions lake_io_opts{.fill_data_cache = true, .fill_metadata_cache = true}; RangeStartOperation range = RangeStartOperation::GT; RangeEndOperation end_range = RangeEndOperation::LT; diff --git a/be/src/types/timestamp_value.cpp b/be/src/types/timestamp_value.cpp index e4a638e1cbe61..3d7b40deb3379 100644 --- a/be/src/types/timestamp_value.cpp +++ b/be/src/types/timestamp_value.cpp @@ -827,6 +827,7 @@ void TimestampValue::trunc_to_quarter() { _timestamp = timestamp::from_datetime(year, month_to_quarter[month], 1, 0, 0, 0, 0); } +// return seconds since epoch. int64_t TimestampValue::to_unix_second() const { int64_t result = timestamp::to_julian(_timestamp); result *= SECS_PER_DAY; @@ -835,6 +836,16 @@ int64_t TimestampValue::to_unix_second() const { return result; } +// return milliseconds since epoch. 
+int64_t TimestampValue::to_unixtime() const { + int64_t result = timestamp::to_julian(_timestamp); + result *= SECS_PER_DAY; + result -= timestamp::UNIX_EPOCH_SECONDS; + result *= 1000L; + result += timestamp::to_time(_timestamp) / USECS_PER_MILLIS; + return result; +} + bool TimestampValue::from_unixtime(int64_t second, const std::string& timezone) { cctz::time_zone ctz; if (!TimezoneUtils::find_cctz_time_zone(timezone, ctz)) { diff --git a/be/src/types/timestamp_value.h b/be/src/types/timestamp_value.h index 67509c0725dd0..6c37a6bae6f96 100644 --- a/be/src/types/timestamp_value.h +++ b/be/src/types/timestamp_value.h @@ -120,6 +120,7 @@ class TimestampValue { bool from_string(const char* date_str, size_t len); int64_t to_unix_second() const; + int64_t to_unixtime() const; bool from_unixtime(int64_t second, const std::string& timezone); void from_unixtime(int64_t second, const cctz::time_zone& ctz); diff --git a/be/test/column/timestamp_value_test.cpp b/be/test/column/timestamp_value_test.cpp index 4e3f0444e0d1d..33716329dbb7a 100644 --- a/be/test/column/timestamp_value_test.cpp +++ b/be/test/column/timestamp_value_test.cpp @@ -114,4 +114,10 @@ TEST(TimestampValueTest, cast) { ASSERT_EQ("2004-02-29", ((DateValue)v).to_string()); } +TEST(TimestampValueTest, unixTime) { + auto v = TimestampValue::create(2004, 2, 29, 23, 30, 30); + std::cout << "unix time " << v.to_unixtime() << std::endl; + ASSERT_EQ(1078097430000, v.to_unixtime()); +} + } // namespace starrocks diff --git a/be/test/exec/lake_meta_scanner_test.cpp b/be/test/exec/lake_meta_scanner_test.cpp index bfebf0f2f41ec..e4ca941085737 100644 --- a/be/test/exec/lake_meta_scanner_test.cpp +++ b/be/test/exec/lake_meta_scanner_test.cpp @@ -50,9 +50,9 @@ class LakeMetaScannerTest : public ::testing::Test { public: LakeMetaScannerTest() : _tablet_id(next_id()) { // setup TabletManager - _location_provider = std::make_unique(kRootLocation); + _location_provider = std::make_shared(kRootLocation); _tablet_mgr = 
ExecEnv::GetInstance()->lake_tablet_manager(); - _backup_location_provider = _tablet_mgr->TEST_set_location_provider(_location_provider.get()); + _backup_location_provider = _tablet_mgr->TEST_set_location_provider(_location_provider); FileSystem::Default()->create_dir_recursive(lake::join_path(kRootLocation, lake::kSegmentDirectoryName)); FileSystem::Default()->create_dir_recursive(lake::join_path(kRootLocation, lake::kMetadataDirectoryName)); FileSystem::Default()->create_dir_recursive(lake::join_path(kRootLocation, lake::kTxnLogDirectoryName)); @@ -113,8 +113,8 @@ class LakeMetaScannerTest : public ::testing::Test { public: constexpr static const char* const kRootLocation = "./LakeMetaScannerTest"; lake::TabletManager* _tablet_mgr; - std::unique_ptr _location_provider; - lake::LocationProvider* _backup_location_provider; + std::shared_ptr _location_provider; + std::shared_ptr _backup_location_provider; int64_t _tablet_id; ObjectPool _pool; diff --git a/be/test/fs/fs_s3_test.cpp b/be/test/fs/fs_s3_test.cpp index 4cb3bbc86a60c..9f8c2768b1251 100644 --- a/be/test/fs/fs_s3_test.cpp +++ b/be/test/fs/fs_s3_test.cpp @@ -101,6 +101,40 @@ TEST_F(S3FileSystemTest, test_write_and_read) { EXPECT_ERROR(rf->read_at(0, buf, sizeof(buf))); } +TEST_F(S3FileSystemTest, test_write_and_read_with_options) { + auto uri = S3Path("/dir/test-object.png"); + auto fs_opts = FSOptions( + {{FSOptions::FS_S3_ENDPOINT, config::object_storage_endpoint}, + {FSOptions::FS_S3_ENDPOINT_REGION, config::object_storage_region}, + {FSOptions::FS_S3_PATH_STYLE_ACCESS, std::to_string(config::object_storage_endpoint_path_style_access)}, + {FSOptions::FS_S3_ACCESS_KEY, config::object_storage_access_key_id}, + {FSOptions::FS_S3_SECRET_KEY, config::object_storage_secret_access_key}, + {FSOptions::FS_S3_CONNECTION_SSL_ENABLED, std::to_string(config::object_storage_endpoint_use_https)}, + {FSOptions::FS_S3_READ_AHEAD_RANGE, std::to_string(64 * 1024)}, + {FSOptions::FS_S3_RETRY_LIMIT, 
std::to_string(config::object_storage_max_retries)}, + {FSOptions::FS_S3_RETRY_INTERVAL, std::to_string(config::object_storage_retry_scale_factor)}}); + ASSERT_TRUE(nullptr == fs_opts.hdfs_properties()); + ASSIGN_OR_ABORT(auto fs, FileSystem::CreateUniqueFromString(uri, fs_opts)); + ASSIGN_OR_ABORT(auto wf, fs->new_writable_file(uri)); + EXPECT_OK(wf->append("hello")); + EXPECT_OK(wf->append(" world!")); + EXPECT_OK(wf->sync()); + EXPECT_OK(wf->close()); + EXPECT_EQ(sizeof("hello world!"), wf->size() + 1); + + char buf[1024]; + ASSIGN_OR_ABORT(auto rf, fs->new_random_access_file(uri)); + ASSIGN_OR_ABORT(auto nr, rf->read_at(0, buf, sizeof(buf))); + EXPECT_EQ("hello world!", std::string_view(buf, nr)); + + ASSIGN_OR_ABORT(nr, rf->read_at(3, buf, sizeof(buf))); + EXPECT_EQ("lo world!", std::string_view(buf, nr)); + + EXPECT_OK(fs->delete_file(uri)); + ASSIGN_OR_ABORT(rf, fs->new_random_access_file(uri)); + EXPECT_ERROR(rf->read_at(0, buf, sizeof(buf))); +} + TEST_F(S3FileSystemTest, test_root_directory) { ASSIGN_OR_ABORT(auto fs, FileSystem::CreateUniqueFromString("s3://")); bool created = false; diff --git a/be/test/fs/fs_test.cpp b/be/test/fs/fs_test.cpp index 7f775dd07dd2f..d5d7fc35db839 100644 --- a/be/test/fs/fs_test.cpp +++ b/be/test/fs/fs_test.cpp @@ -61,6 +61,13 @@ TEST(FileSystemTest, test_good_construction) { ASSIGN_OR_ABORT(auto fs, FileSystem::CreateSharedFromString("unknown2://")); ASSERT_EQ(fs->type(), FileSystem::S3); } + + { + std::unordered_map params = {{"fs.s3a.readahead.range", "100"}}; + std::unique_ptr fs_options = std::make_unique(params); + ASSIGN_OR_ABORT(auto fs, FileSystem::Create("unknown2://", *fs_options)); + ASSERT_EQ(fs->type(), FileSystem::S3); + } } } // namespace starrocks diff --git a/be/test/io/s3_input_stream_test.cpp b/be/test/io/s3_input_stream_test.cpp index 493a8ab4c42e0..2f2f38d936956 100644 --- a/be/test/io/s3_input_stream_test.cpp +++ b/be/test/io/s3_input_stream_test.cpp @@ -55,6 +55,8 @@ class S3InputStreamTest : public 
testing::Test { std::unique_ptr new_random_access_file(); + std::unique_ptr new_random_access_file_prefetch(int64_t read_ahead_size); + protected: inline static const char* s_bucket_name = nullptr; }; @@ -104,6 +106,10 @@ std::unique_ptr S3InputStreamTest::new_random_access_file() { return std::make_unique(g_s3client, s_bucket_name, kObjectName); } +std::unique_ptr S3InputStreamTest::new_random_access_file_prefetch(int64_t read_ahead_size) { + return std::make_unique(g_s3client, s_bucket_name, kObjectName, read_ahead_size); +} + void S3InputStreamTest::put_object(const std::string& object_content) { std::shared_ptr stream = Aws::MakeShared("", object_content); @@ -204,4 +210,11 @@ TEST_F(S3InputStreamTest, test_read_all) { EXPECT_EQ(kObjectContent, s); } +TEST_F(S3InputStreamTest, test_prefetch) { + auto f = new_random_access_file_prefetch(2); + char buf[6]; + + ASSIGN_OR_ABORT(auto r, f->read(buf, sizeof(buf))); + ASSERT_EQ("012345", std::string_view(buf, r)); +} } // namespace starrocks::io diff --git a/be/test/io/s3_output_stream_test.cpp b/be/test/io/s3_output_stream_test.cpp index 0fc0d16038ece..f619ca5763bdc 100644 --- a/be/test/io/s3_output_stream_test.cpp +++ b/be/test/io/s3_output_stream_test.cpp @@ -137,7 +137,7 @@ TEST_F(S3OutputStreamTest, test_multipart_upload) { const char* kObjectName = "test_multipart_upload"; delete_object(kObjectName); S3OutputStream os(g_s3client, kBucketName, kObjectName, 12, /*5MB=*/5 * 1024 * 1024); - S3InputStream is(g_s3client, kBucketName, kObjectName); + S3InputStream is(g_s3client, kBucketName, kObjectName, /*5MB=*/5 * 1024 * 1024); std::string s1("first line of multipart upload\n"); std::string s2("second line of multipart upload\n"); diff --git a/be/test/runtime/lake_tablets_channel_test.cpp b/be/test/runtime/lake_tablets_channel_test.cpp index 0e3ccf10c8da4..239155e524766 100644 --- a/be/test/runtime/lake_tablets_channel_test.cpp +++ b/be/test/runtime/lake_tablets_channel_test.cpp @@ -59,9 +59,8 @@ class 
LakeTabletsChannelTestBase : public testing::Test { _mem_tracker = std::make_unique(1024 * 1024); _root_profile = std::make_unique("LoadChannel"); _location_provider = std::make_unique(_test_directory); - _update_manager = std::make_unique(_location_provider.get(), _mem_tracker.get()); - _tablet_manager = - std::make_unique(_location_provider.get(), _update_manager.get(), 1024 * 1024); + _update_manager = std::make_unique(_location_provider, _mem_tracker.get()); + _tablet_manager = std::make_unique(_location_provider, _update_manager.get(), 1024 * 1024); _load_channel_mgr = std::make_unique(); @@ -247,11 +246,10 @@ class LakeTabletsChannelTestBase : public testing::Test { int64_t _schema_id; std::unique_ptr _mem_tracker; std::unique_ptr _root_profile; - std::unique_ptr _location_provider; + std::shared_ptr _location_provider; std::unique_ptr _update_manager; std::unique_ptr _tablet_manager; std::unique_ptr _load_channel_mgr; - lake::LocationProvider* _backup_location_provider; std::shared_ptr _tablet_schema; std::shared_ptr _schema; std::shared_ptr _schema_param; diff --git a/be/test/runtime/load_channel_test.cpp b/be/test/runtime/load_channel_test.cpp index ab6e15a2d2a99..4bb3fa4d72cf8 100644 --- a/be/test/runtime/load_channel_test.cpp +++ b/be/test/runtime/load_channel_test.cpp @@ -54,9 +54,8 @@ class LoadChannelTestForLakeTablet : public testing::Test { _schema_id = next_id(); _mem_tracker = std::make_unique(1024 * 1024); _location_provider = std::make_unique(kTestGroupPath); - _update_manager = std::make_unique(_location_provider.get(), _mem_tracker.get()); - _tablet_manager = - std::make_unique(_location_provider.get(), _update_manager.get(), 1024 * 1024); + _update_manager = std::make_unique(_location_provider, _mem_tracker.get()); + _tablet_manager = std::make_unique(_location_provider, _update_manager.get(), 1024 * 1024); _load_channel_mgr = std::make_unique(); @@ -238,7 +237,7 @@ class LoadChannelTestForLakeTablet : public testing::Test { int64_t 
_schema_id; std::unique_ptr _mem_tracker; - std::unique_ptr _location_provider; + std::shared_ptr _location_provider; std::unique_ptr _update_manager; std::unique_ptr _tablet_manager; std::unique_ptr _load_channel_mgr; diff --git a/be/test/service/lake_service_test.cpp b/be/test/service/lake_service_test.cpp index ebc3d5bc0ad14..38d87d9d5a6c4 100644 --- a/be/test/service/lake_service_test.cpp +++ b/be/test/service/lake_service_test.cpp @@ -46,7 +46,7 @@ class LakeServiceTest : public testing::Test { LakeServiceTest() : _tablet_id(next_id()), _partition_id(next_id()), - _location_provider(new lake::FixedLocationProvider(kRootLocation)), + _location_provider(std::make_shared(kRootLocation)), _tablet_mgr(ExecEnv::GetInstance()->lake_tablet_manager()), _lake_service(ExecEnv::GetInstance(), ExecEnv::GetInstance()->lake_tablet_manager()) { _backup_location_provider = _tablet_mgr->TEST_set_location_provider(_location_provider); @@ -58,7 +58,6 @@ class LakeServiceTest : public testing::Test { ~LakeServiceTest() override { CHECK_OK(fs::remove_all(kRootLocation)); (void)_tablet_mgr->TEST_set_location_provider(_backup_location_provider); - delete _location_provider; } void create_tablet() { @@ -102,9 +101,9 @@ class LakeServiceTest : public testing::Test { constexpr static const char* const kRootLocation = "./lake_service_test"; int64_t _tablet_id; int64_t _partition_id; - lake::LocationProvider* _location_provider; + std::shared_ptr _location_provider; lake::TabletManager* _tablet_mgr; - lake::LocationProvider* _backup_location_provider; + std::shared_ptr _backup_location_provider; LakeServiceImpl _lake_service; }; diff --git a/be/test/storage/lake/compaction_policy_test.cpp b/be/test/storage/lake/compaction_policy_test.cpp index 6964906a1ef38..0d6ae6732c6c0 100644 --- a/be/test/storage/lake/compaction_policy_test.cpp +++ b/be/test/storage/lake/compaction_policy_test.cpp @@ -17,7 +17,9 @@ #include #include "fs/fs_util.h" +#include "runtime/exec_env.h" #include 
"storage/lake/join_path.h" +#include "storage/lake/tablet.h" #include "storage/lake/tablet_manager.h" #include "test_util.h" #include "testutil/assert.h" diff --git a/be/test/storage/lake/meta_file_test.cpp b/be/test/storage/lake/meta_file_test.cpp index 1ae12ad31a5de..2dea165654ec0 100644 --- a/be/test/storage/lake/meta_file_test.cpp +++ b/be/test/storage/lake/meta_file_test.cpp @@ -61,16 +61,15 @@ class MetaFileTest : public ::testing::Test { _location_provider = std::make_unique(kTestDir); _mem_tracker = std::make_unique(1024 * 1024); - _update_manager = std::make_unique(_location_provider.get(), _mem_tracker.get()); - _tablet_manager = - std::make_unique(_location_provider.get(), _update_manager.get(), 1638400000); + _update_manager = std::make_unique(_location_provider, _mem_tracker.get()); + _tablet_manager = std::make_unique(_location_provider, _update_manager.get(), 1638400000); } void TearDown() { (void)FileSystem::Default()->delete_dir_recursive(kTestDir); } protected: constexpr static const char* const kTestDir = "./lake_meta_test"; - std::unique_ptr _location_provider; + std::shared_ptr _location_provider; std::unique_ptr _tablet_manager; std::unique_ptr _mem_tracker; std::unique_ptr _update_manager; @@ -124,8 +123,9 @@ TEST_F(MetaFileTest, test_delvec_rw) { // 3. read delvec DelVector after_delvec; + LakeIOOptions lake_io_opts; ASSIGN_OR_ABORT(auto metadata2, _tablet_manager->get_tablet_metadata(tablet_id, version)); - EXPECT_TRUE(get_del_vec(_tablet_manager.get(), *metadata2, segment_id, true, &after_delvec).ok()); + EXPECT_TRUE(get_del_vec(_tablet_manager.get(), *metadata2, segment_id, true, lake_io_opts, &after_delvec).ok()); EXPECT_EQ(before_delvec, after_delvec.save()); // 4. read meta @@ -225,8 +225,9 @@ TEST_F(MetaFileTest, test_delvec_read_loop) { // 3. 
read delvec DelVector after_delvec; + LakeIOOptions lake_io_opts; ASSIGN_OR_ABORT(auto meta, _tablet_manager->get_tablet_metadata(tablet_id, version)); - EXPECT_TRUE(get_del_vec(_tablet_manager.get(), *meta, segment_id, false, &after_delvec).ok()); + EXPECT_TRUE(get_del_vec(_tablet_manager.get(), *meta, segment_id, false, lake_io_opts, &after_delvec).ok()); EXPECT_EQ(before_delvec, after_delvec.save()); }; for (uint32_t segment_id = 1000; segment_id < 1200; segment_id++) { diff --git a/be/test/storage/lake/replication_txn_manager_test.cpp b/be/test/storage/lake/replication_txn_manager_test.cpp index 3a0f7dedc8118..e46806051a275 100644 --- a/be/test/storage/lake/replication_txn_manager_test.cpp +++ b/be/test/storage/lake/replication_txn_manager_test.cpp @@ -58,13 +58,13 @@ class LakeReplicationTxnManagerTest : public testing::TestWithParam paths; CHECK_OK(starrocks::parse_conf_store_paths(starrocks::config::storage_root_path, &paths)); _test_dir = paths[0].path + "/lake"; - _location_provider = std::make_unique(_test_dir); + _location_provider = std::make_shared(_test_dir); CHECK_OK(FileSystem::Default()->create_dir_recursive(_location_provider->metadata_root_location(1))); CHECK_OK(FileSystem::Default()->create_dir_recursive(_location_provider->txn_log_root_location(1))); CHECK_OK(FileSystem::Default()->create_dir_recursive(_location_provider->segment_root_location(1))); _mem_tracker = std::make_unique(1024 * 1024); - _update_manager = std::make_unique(_location_provider.get(), _mem_tracker.get()); - _tablet_manager = std::make_unique(_location_provider.get(), _update_manager.get(), 16384); + _update_manager = std::make_unique(_location_provider, _mem_tracker.get()); + _tablet_manager = std::make_unique(_location_provider, _update_manager.get(), 16384); _replication_txn_manager = std::make_unique(_tablet_manager.get()); ASSERT_TRUE(_tablet_manager->create_tablet(get_create_tablet_req(_tablet_id, _version, _schema_hash)).ok()); @@ -209,7 +209,7 @@ class 
LakeReplicationTxnManagerTest : public testing::TestWithParam _tablet_manager; std::string _test_dir; - std::unique_ptr _location_provider; + std::shared_ptr _location_provider; std::unique_ptr _mem_tracker; std::unique_ptr _update_manager; std::unique_ptr _replication_txn_manager; diff --git a/be/test/storage/lake/rowset_test.cpp b/be/test/storage/lake/rowset_test.cpp index 0390249d50bb6..faf5a28bf1d77 100644 --- a/be/test/storage/lake/rowset_test.cpp +++ b/be/test/storage/lake/rowset_test.cpp @@ -132,8 +132,8 @@ TEST_F(LakeRowsetTest, test_load_segments) { } // fill data cache: false, fill metadata cache: true - LakeIOOptions lake_io_opts{.fill_data_cache = false}; - ASSIGN_OR_ABORT(auto segments2, rowset->segments(lake_io_opts, true)); + LakeIOOptions lake_io_opts{.fill_data_cache = false, .fill_metadata_cache = true}; + ASSIGN_OR_ABORT(auto segments2, rowset->segments(lake_io_opts)); ASSERT_EQ(2, segments2.size()); for (const auto& seg : segments2) { auto segment = cache->lookup_segment(seg->file_name()); diff --git a/be/test/storage/lake/schema_change_test.cpp b/be/test/storage/lake/schema_change_test.cpp index 205380064602f..a9a0a27ce88f8 100644 --- a/be/test/storage/lake/schema_change_test.cpp +++ b/be/test/storage/lake/schema_change_test.cpp @@ -64,8 +64,8 @@ class SchemaChangeTest : public testing::Test, public testing::WithParamInterfac SchemaChangeTest(const std::string& test_dir) { _mem_tracker = std::make_unique(1024 * 1024); _location_provider = std::make_unique(test_dir); - _update_manager = std::make_unique(_location_provider.get(), _mem_tracker.get()); - _tablet_manager = std::make_unique(_location_provider.get(), _update_manager.get(), 1024 * 1024); + _update_manager = std::make_unique(_location_provider, _mem_tracker.get()); + _tablet_manager = std::make_unique(_location_provider, _update_manager.get(), 1024 * 1024); } protected: @@ -105,7 +105,7 @@ class SchemaChangeTest : public testing::Test, public testing::WithParamInterfac } std::unique_ptr 
_mem_tracker; - std::unique_ptr _location_provider; + std::shared_ptr _location_provider; std::unique_ptr _update_manager; std::unique_ptr _tablet_manager; diff --git a/be/test/storage/lake/tablet_manager_test.cpp b/be/test/storage/lake/tablet_manager_test.cpp index b1924a7b0986d..35f5aa5f035e5 100644 --- a/be/test/storage/lake/tablet_manager_test.cpp +++ b/be/test/storage/lake/tablet_manager_test.cpp @@ -54,7 +54,7 @@ class LakeTabletManagerTest : public testing::Test { std::vector paths; CHECK_OK(starrocks::parse_conf_store_paths(starrocks::config::storage_root_path, &paths)); _test_dir = paths[0].path + "/lake"; - _location_provider = new lake::FixedLocationProvider(_test_dir); + _location_provider = std::make_shared(_test_dir); CHECK_OK(FileSystem::Default()->create_dir_recursive(_location_provider->metadata_root_location(1))); CHECK_OK(FileSystem::Default()->create_dir_recursive(_location_provider->txn_log_root_location(1))); CHECK_OK(FileSystem::Default()->create_dir_recursive(_location_provider->segment_root_location(1))); @@ -65,13 +65,12 @@ class LakeTabletManagerTest : public testing::Test { void TearDown() override { delete _tablet_manager; - delete _location_provider; - (void)FileSystem::Default()->delete_dir_recursive(_test_dir); + FileSystem::Default()->delete_dir_recursive(_test_dir); } starrocks::lake::TabletManager* _tablet_manager{nullptr}; std::string _test_dir; - lake::LocationProvider* _location_provider{nullptr}; + std::shared_ptr _location_provider{nullptr}; std::unique_ptr _mem_tracker; std::unique_ptr _update_manager; }; @@ -611,8 +610,8 @@ TCreateTabletReq build_create_tablet_request(int64_t tablet_id, int64_t index_id TEST_F(LakeTabletManagerTest, test_multi_partition_schema_file) { const static int kNumPartition = 4; const static int64_t kIndexId = 123454321; - auto lp = std::make_unique(_test_dir, kNumPartition); - _tablet_manager->TEST_set_location_provider(lp.get()); + auto lp = std::make_shared(_test_dir, kNumPartition); + 
_tablet_manager->TEST_set_location_provider(lp); for (int i = 0; i < 10; i++) { auto req = build_create_tablet_request(next_id(), kIndexId); ASSERT_OK(_tablet_manager->create_tablet(req)); diff --git a/be/test/storage/lake/tablet_writer_test.cpp b/be/test/storage/lake/tablet_writer_test.cpp index e0232cf3fc6db..d79203f3065ab 100644 --- a/be/test/storage/lake/tablet_writer_test.cpp +++ b/be/test/storage/lake/tablet_writer_test.cpp @@ -24,6 +24,7 @@ #include "common/logging.h" #include "fs/fs_util.h" #include "storage/chunk_helper.h" +#include "storage/lake/starlet_location_provider.h" #include "storage/lake/versioned_tablet.h" #include "storage/rowset/segment.h" #include "storage/rowset/segment_options.h" @@ -334,6 +335,25 @@ TEST_P(LakeTabletWriterTest, test_vertical_write_close_without_finish) { writer->close(); } +#ifdef USE_STAROS + +TEST_P(LakeTabletWriterTest, test_write_sdk) { + auto provider = std::make_shared(); + auto location = provider->root_location(12345); + + Tablet tablet(_tablet_mgr.get(), next_id(), provider, _tablet_metadata); + auto meta_location = tablet.metadata_location(0); + auto column_size = tablet.get_schema()->get()->num_columns(); + auto txn_log_location = tablet.txn_log_location(0); + auto txn_vlog_location = tablet.txn_vlog_location(0); + auto test_segment_location = tablet.segment_location("test_segment"); + auto root_location = tablet.root_location(); + ASSIGN_OR_ABORT(auto writer, tablet.new_writer(kVertical, next_id(), 1)); + writer->close(); +} + +#endif // USE_STAROS + INSTANTIATE_TEST_SUITE_P(LakeTabletWriterTest, LakeTabletWriterTest, ::testing::Values(DUP_KEYS, AGG_KEYS, UNIQUE_KEYS, PRIMARY_KEYS)); diff --git a/be/test/storage/lake/test_util.h b/be/test/storage/lake/test_util.h index bef83ea4bf9df..7d8f0d21dfcbb 100644 --- a/be/test/storage/lake/test_util.h +++ b/be/test/storage/lake/test_util.h @@ -71,9 +71,9 @@ class TestBase : public ::testing::Test { : _test_dir(std::move(test_dir)), _parent_tracker(std::make_unique(-1)), 
_mem_tracker(std::make_unique(1024 * 1024, "", _parent_tracker.get())), - _lp(std::make_unique(_test_dir)), - _update_mgr(std::make_unique(_lp.get(), _mem_tracker.get())), - _tablet_mgr(std::make_unique(_lp.get(), _update_mgr.get(), cache_limit)) {} + _lp(std::make_shared(_test_dir)), + _update_mgr(std::make_unique(_lp, _mem_tracker.get())), + _tablet_mgr(std::make_unique(_lp, _update_mgr.get(), cache_limit)) {} void remove_test_dir_or_die() { ASSERT_OK(fs::remove_all(_test_dir)); } @@ -103,7 +103,7 @@ class TestBase : public ::testing::Test { std::string _test_dir; std::unique_ptr _parent_tracker; std::unique_ptr _mem_tracker; - std::unique_ptr _lp; + std::shared_ptr _lp; std::unique_ptr _update_mgr; std::unique_ptr _tablet_mgr; }; diff --git a/build.sh b/build.sh index 2b81d81bfe30c..8ef90c67d4cac 100755 --- a/build.sh +++ b/build.sh @@ -80,6 +80,7 @@ usage() { Usage: $0 Optional options: --be build Backend + --format-lib build StarRocks format library, only with shared-data mode cluster --fe build Frontend and Spark Dpp application --spark-dpp build Spark DPP application --hive-udf build Hive UDF @@ -106,6 +107,7 @@ Usage: $0 Eg. 
$0 build all $0 --be build Backend without clean + $0 --format-lib build StarRocks format library without clean $0 --fe --clean clean and build Frontend and Spark Dpp application $0 --fe --be --clean clean and build Frontend, Spark Dpp application and Backend $0 --spark-dpp build Spark DPP application alone @@ -119,6 +121,7 @@ OPTS=$(getopt \ -n $0 \ -o 'hj:' \ -l 'be' \ + -l 'format-lib' \ -l 'fe' \ -l 'spark-dpp' \ -l 'hive-udf' \ @@ -145,6 +148,7 @@ fi eval set -- "$OPTS" BUILD_BE= +BUILD_FORMAT_LIB= BUILD_FE= BUILD_SPARK_DPP= BUILD_HIVE_UDF= @@ -163,6 +167,7 @@ MSG="" MSG_FE="Frontend" MSG_DPP="Spark Dpp application" MSG_BE="Backend" +MSG_FORMAT_LIB="Format Lib" if [[ -z ${USE_AVX2} ]]; then USE_AVX2=ON fi @@ -218,6 +223,7 @@ if [ $# == 1 ] ; then BUILD_FE=1 BUILD_SPARK_DPP=1 BUILD_HIVE_UDF=1 + BUILD_FORMAT_LIB=0 CLEAN=0 RUN_UT=0 elif [[ $OPTS =~ "-j " ]] && [ $# == 3 ]; then @@ -226,11 +232,13 @@ elif [[ $OPTS =~ "-j " ]] && [ $# == 3 ]; then BUILD_FE=1 BUILD_SPARK_DPP=1 BUILD_HIVE_UDF=1 + BUILD_FORMAT_LIB=0 CLEAN=0 RUN_UT=0 PARALLEL=$2 else BUILD_BE=0 + BUILD_FORMAT_LIB=0 BUILD_FE=0 BUILD_SPARK_DPP=0 BUILD_HIVE_UDF=0 @@ -239,6 +247,7 @@ else while true; do case "$1" in --be) BUILD_BE=1 ; shift ;; + --format-lib) BUILD_FORMAT_LIB=1 ; shift ;; --fe) BUILD_FE=1 ; shift ;; --spark-dpp) BUILD_SPARK_DPP=1 ; shift ;; --hive-udf) BUILD_HIVE_UDF=1 ; shift ;; @@ -268,13 +277,22 @@ if [[ ${HELP} -eq 1 ]]; then exit fi -if [ ${CLEAN} -eq 1 ] && [ ${BUILD_BE} -eq 0 ] && [ ${BUILD_FE} -eq 0 ] && [ ${BUILD_SPARK_DPP} -eq 0 ] && [ ${BUILD_HIVE_UDF} -eq 0 ]; then - echo "--clean can not be specified without --fe or --be or --spark-dpp or --hive-udf" +if [ ${CLEAN} -eq 1 ] && [ ${BUILD_BE} -eq 0 ] && [ ${BUILD_FORMAT_LIB} -eq 0 ] && [ ${BUILD_FE} -eq 0 ] && [ ${BUILD_SPARK_DPP} -eq 0 ] && [ ${BUILD_HIVE_UDF} -eq 0 ]; then + echo "--clean can not be specified without --fe or --be or --format-lib or --spark-dpp or --hive-udf" exit 1 fi +if [ ${BUILD_BE} -eq 1 ] && [ 
${BUILD_FORMAT_LIB} -eq 1 ]; then + echo "--format-lib can not be specified with --be" + exit 1 +fi +if [ ${BUILD_FORMAT_LIB} -eq 1 ]; then + echo "do not build java extendsions when build format-lib." + BUILD_JAVA_EXT=OFF +fi echo "Get params: BUILD_BE -- $BUILD_BE + BUILD_FORMAT_LIB -- $BUILD_FORMAT_LIB BE_CMAKE_TYPE -- $BUILD_TYPE BUILD_FE -- $BUILD_FE BUILD_SPARK_DPP -- $BUILD_SPARK_DPP @@ -341,11 +359,15 @@ else fi # Clean and build Backend -if [ ${BUILD_BE} -eq 1 ] ; then +if [ ${BUILD_BE} -eq 1 ] || [ ${BUILD_FORMAT_LIB} -eq 1 ] ; then if ! ${CMAKE_CMD} --version; then echo "Error: cmake is not found" exit 1 fi + # When build starrocks format lib, USE_STAROS must be ON + if [ ${BUILD_FORMAT_LIB} -eq 1 ] ; then + USE_STAROS=ON + fi CMAKE_BUILD_TYPE=$BUILD_TYPE echo "Build Backend: ${CMAKE_BUILD_TYPE}" @@ -353,6 +375,9 @@ if [ ${BUILD_BE} -eq 1 ] ; then if [ "${WITH_GCOV}" = "ON" ]; then CMAKE_BUILD_DIR=${STARROCKS_HOME}/be/build_${CMAKE_BUILD_TYPE}_gcov fi + if [ ${BUILD_FORMAT_LIB} -eq 1 ] ; then + CMAKE_BUILD_DIR=${STARROCKS_HOME}/be/build_${CMAKE_BUILD_TYPE}_format-lib + fi if [ ${CLEAN} -eq 1 ]; then rm -rf $CMAKE_BUILD_DIR @@ -400,9 +425,12 @@ if [ ${BUILD_BE} -eq 1 ] ; then -DWITH_STARCACHE=${WITH_STARCACHE} \ -DUSE_STAROS=${USE_STAROS} \ -DENABLE_FAULT_INJECTION=${ENABLE_FAULT_INJECTION} \ + -DCMAKE_EXPORT_COMPILE_COMMANDS=ON .. \ + -DBUILD_BE=${BUILD_BE} \ -DWITH_TENANN=${WITH_TENANN} \ -DSTARROCKS_JIT_ENABLE=${ENABLE_JIT} \ - -DCMAKE_EXPORT_COMPILE_COMMANDS=ON .. + -DCMAKE_EXPORT_COMPILE_COMMANDS=ON .. 
\ + -DBUILD_FORMAT_LIB=${BUILD_FORMAT_LIB} time ${BUILD_SYSTEM} -j${PARALLEL} if [ "${WITH_CLANG_TIDY}" == "ON" ];then @@ -492,6 +520,27 @@ if [ ${BUILD_FE} -eq 1 -o ${BUILD_SPARK_DPP} -eq 1 ]; then fi fi +if [ ${BUILD_FORMAT_LIB} -eq 1 ]; then + rm -rf ${STARROCKS_OUTPUT}/format-lib/* + mkdir -p ${STARROCKS_OUTPUT}/format-lib + cp -r ${STARROCKS_HOME}/be/output/format-lib/* ${STARROCKS_OUTPUT}/format-lib/ + # format $BUILD_TYPE to lower case + ibuildtype=`echo ${BUILD_TYPE} | tr 'A-Z' 'a-z'` + if [ "${ibuildtype}" == "release" ] ; then + pushd ${STARROCKS_OUTPUT}/format-lib/ &>/dev/null + FORMAT_LIB=libstarrocks_format.so + FORMAT_LIB_DEBUGINFO=libstarrocks_format.debuginfo + echo "Split $FORMAT_LIB debug symbol to $FORMAT_LIB_DEBUGINFO ..." + # strip be binary + # if eu-strip is available, can replace following three lines into `eu-strip -g -f starrocks_be.debuginfo starrocks_be` + objcopy --only-keep-debug $FORMAT_LIB $FORMAT_LIB_DEBUGINFO + strip --strip-debug $FORMAT_LIB + objcopy --add-gnu-debuglink=$FORMAT_LIB_DEBUGINFO $FORMAT_LIB + popd &>/dev/null + fi + MSG="${MSG} √ ${MSG_FORMAT_LIB}" +fi + if [ ${BUILD_BE} -eq 1 ]; then rm -rf ${STARROCKS_OUTPUT}/be/lib/* mkdir -p ${STARROCKS_OUTPUT}/be/lib/jni-packages diff --git a/thirdparty/build-thirdparty.sh b/thirdparty/build-thirdparty.sh index 1f18db03dd72d..abf911ff76f23 100755 --- a/thirdparty/build-thirdparty.sh +++ b/thirdparty/build-thirdparty.sh @@ -416,7 +416,7 @@ build_simdjson() { #ref: https://github.com/simdjson/simdjson/blob/master/HACKING.md mkdir -p $BUILD_DIR cd $BUILD_DIR - $CMAKE_CMD -G "${CMAKE_GENERATOR}" -DCMAKE_CXX_FLAGS="-O3" -DCMAKE_C_FLAGS="-O3" -DCMAKE_POSITION_INDEPENDENT_CODE=True -DSIMDJSON_AVX512_ALLOWED=OFF .. + $CMAKE_CMD -G "${CMAKE_GENERATOR}" -DCMAKE_CXX_FLAGS="-O3 -fPIC" -DCMAKE_C_FLAGS="-O3 -fPIC" -DCMAKE_POSITION_INDEPENDENT_CODE=True -DSIMDJSON_AVX512_ALLOWED=OFF .. $CMAKE_CMD --build . 
mkdir -p $TP_INSTALL_DIR/lib @@ -528,7 +528,6 @@ build_lzo2() { build_bzip() { check_if_source_exist $BZIP_SOURCE cd $TP_SOURCE_DIR/$BZIP_SOURCE - make -j$PARALLEL install PREFIX=$TP_INSTALL_DIR }