diff --git a/be/src/io/fs/file_system.cpp b/be/src/io/fs/file_system.cpp index 3579a5323d9217..e6b5ef7df1a8f5 100644 --- a/be/src/io/fs/file_system.cpp +++ b/be/src/io/fs/file_system.cpp @@ -25,58 +25,70 @@ namespace io { Status FileSystem::create_file(const Path& file, FileWriterPtr* writer, const FileWriterOptions* opts) { - auto path = absolute_path(file); + Path path; + RETURN_IF_ERROR(absolute_path(file, path)); FILESYSTEM_M(create_file_impl(path, writer, opts)); } Status FileSystem::open_file(const Path& file, FileReaderSPtr* reader, const FileReaderOptions* opts) { - auto path = absolute_path(file); + Path path; + RETURN_IF_ERROR(absolute_path(file, path)); FILESYSTEM_M(open_file_impl(path, reader, opts)); } Status FileSystem::create_directory(const Path& dir, bool failed_if_exists) { - auto path = absolute_path(dir); + Path path; + RETURN_IF_ERROR(absolute_path(dir, path)); FILESYSTEM_M(create_directory_impl(path, failed_if_exists)); } Status FileSystem::delete_file(const Path& file) { - auto path = absolute_path(file); + Path path; + RETURN_IF_ERROR(absolute_path(file, path)); FILESYSTEM_M(delete_file_impl(path)); } Status FileSystem::delete_directory(const Path& dir) { - auto path = absolute_path(dir); + Path path; + RETURN_IF_ERROR(absolute_path(dir, path)); FILESYSTEM_M(delete_directory_impl(path)); } Status FileSystem::batch_delete(const std::vector& files) { std::vector abs_files; for (auto& file : files) { - abs_files.push_back(absolute_path(file)); + Path abs_file; + RETURN_IF_ERROR(absolute_path(file, abs_file)); + abs_files.push_back(abs_file); } FILESYSTEM_M(batch_delete_impl(abs_files)); } Status FileSystem::exists(const Path& path, bool* res) const { - auto fs_path = absolute_path(path); + Path fs_path; + RETURN_IF_ERROR(absolute_path(path, fs_path)); FILESYSTEM_M(exists_impl(fs_path, res)); } Status FileSystem::file_size(const Path& file, int64_t* file_size) const { - auto path = absolute_path(file); + Path path; + RETURN_IF_ERROR(absolute_path(file, path)); FILESYSTEM_M(file_size_impl(path, file_size)); } Status FileSystem::list(const Path& dir, bool only_file, std::vector* files, bool* exists) { - auto path = absolute_path(dir); + Path path; + RETURN_IF_ERROR(absolute_path(dir, path)); FILESYSTEM_M(list_impl(path, only_file, files, exists)); } Status FileSystem::rename(const Path& orig_name, const Path& new_name) { - auto orig_path = absolute_path(orig_name); - auto new_path = absolute_path(new_name); + Path orig_path; + RETURN_IF_ERROR(absolute_path(orig_name, orig_path)); + Path new_path; + RETURN_IF_ERROR(absolute_path(new_name, new_path)); FILESYSTEM_M(rename_impl(orig_path, new_path)); } diff --git a/be/src/io/fs/file_system.h b/be/src/io/fs/file_system.h index a8cdf5f4eb6cb4..dd6a63222a5c9b 100644 --- a/be/src/io/fs/file_system.h +++ b/be/src/io/fs/file_system.h @@ -144,11 +144,13 @@ class FileSystem : public std::enable_shared_from_this { /// rename file from orig_name to new_name virtual Status rename_impl(const Path& orig_name, const Path& new_name) = 0; - virtual Path absolute_path(const Path& path) const { + virtual Status absolute_path(const Path& path, Path& abs_path) const { if (path.is_absolute()) { - return path; + abs_path = path; + } else { + abs_path = _root_path / path; } - return _root_path / path; + return Status::OK(); } FileSystem(Path&& root_path, std::string&& id, FileSystemType type) diff --git a/be/src/io/fs/local_file_system.cpp b/be/src/io/fs/local_file_system.cpp index 2c4b6bb5f8259f..9370c4cae5b536 100644 --- 
a/be/src/io/fs/local_file_system.cpp +++ b/be/src/io/fs/local_file_system.cpp @@ -146,7 +146,8 @@ Status LocalFileSystem::delete_directory_impl(const Path& dir) { } Status LocalFileSystem::delete_directory_or_file(const Path& path) { - auto the_path = absolute_path(path); + Path the_path; + RETURN_IF_ERROR(absolute_path(path, the_path)); FILESYSTEM_M(delete_directory_or_file_impl(the_path)); } @@ -248,8 +249,10 @@ Status LocalFileSystem::rename_impl(const Path& orig_name, const Path& new_name) } Status LocalFileSystem::link_file(const Path& src, const Path& dest) { - auto src_file = absolute_path(src); - auto dest_file = absolute_path(dest); + Path src_file; + RETURN_IF_ERROR(absolute_path(src, src_file)); + Path dest_file; + RETURN_IF_ERROR(absolute_path(dest, dest_file)); FILESYSTEM_M(link_file_impl(src_file, dest_file)); } @@ -272,7 +275,8 @@ Status LocalFileSystem::canonicalize(const Path& path, std::string* real_path) { } Status LocalFileSystem::is_directory(const Path& path, bool* res) { - auto tmp_path = absolute_path(path); + Path tmp_path; + RETURN_IF_ERROR(absolute_path(path, tmp_path)); std::error_code ec; *res = std::filesystem::is_directory(tmp_path, ec); if (ec) { @@ -282,7 +286,8 @@ Status LocalFileSystem::is_directory(const Path& path, bool* res) { } Status LocalFileSystem::md5sum(const Path& file, std::string* md5sum) { - auto path = absolute_path(file); + Path path; + RETURN_IF_ERROR(absolute_path(file, path)); FILESYSTEM_M(md5sum_impl(path, md5sum)); } @@ -318,8 +323,9 @@ Status LocalFileSystem::md5sum_impl(const Path& file, std::string* md5sum) { Status LocalFileSystem::iterate_directory(const std::string& dir, const std::function& cb) { - auto path = absolute_path(dir); - FILESYSTEM_M(iterate_directory_impl(dir, cb)); + Path path; + RETURN_IF_ERROR(absolute_path(dir, path)); + FILESYSTEM_M(iterate_directory_impl(path, cb)); } Status LocalFileSystem::iterate_directory_impl( @@ -336,7 +342,8 @@ Status LocalFileSystem::iterate_directory_impl( } Status LocalFileSystem::get_space_info(const Path& dir, size_t* capacity, size_t* available) { - auto path = absolute_path(dir); + Path path; + RETURN_IF_ERROR(absolute_path(dir, path)); FILESYSTEM_M(get_space_info_impl(path, capacity, available)); } @@ -353,8 +360,10 @@ Status LocalFileSystem::get_space_info_impl(const Path& path, size_t* capacity, } Status LocalFileSystem::copy_path(const Path& src, const Path& dest) { - auto src_path = absolute_path(src); - auto dest_path = absolute_path(dest); + Path src_path; + RETURN_IF_ERROR(absolute_path(src, src_path)); + Path dest_path; + RETURN_IF_ERROR(absolute_path(dest, dest_path)); FILESYSTEM_M(copy_path_impl(src_path, dest_path)); } @@ -455,7 +464,8 @@ Status LocalFileSystem::_glob(const std::string& pattern, std::vector/abc/def will return /abc/def + std::string path_str = input_path_str; + size_t slash = path_str.find('/'); + if (slash == 0) { + abs_path = input_path_str; + return Status::OK(); + } + + // Initialize scheme and authority + std::string scheme; + size_t start = 0; + + // Parse URI scheme + size_t colon = path_str.find(':'); + if (colon != std::string::npos && (slash == std::string::npos || colon < slash)) { + // Has a scheme + scheme = path_str.substr(0, colon); + if (scheme != "file") { + return Status::InternalError( + "Only supports `file` type scheme, like 'file:///path', 'file:/path'."); + } + start = colon + 1; + } + + // Parse URI authority, if any + if (path_str.compare(start, 2, "//") == 0 && path_str.length() - start > 2) { + // Has authority + // such as : 
path_str = "file://authority/abc/def" + // and now : start = 5 + size_t next_slash = path_str.find('/', start + 2); + // now : next_slash = 16 + if (next_slash == std::string::npos) { + return Status::InternalError( + "This input string only has authority, but has no path information"); + } + // We will skip the authority + // now : start = 16 + start = next_slash; + } + + // URI path is the rest of the string + abs_path = path_str.substr(start); + return Status::OK(); +} + } // namespace io } // namespace doris diff --git a/be/src/io/fs/local_file_system.h b/be/src/io/fs/local_file_system.h index 8578b9f5ac2512..d765f2daeeca50 100644 --- a/be/src/io/fs/local_file_system.h +++ b/be/src/io/fs/local_file_system.h @@ -24,6 +24,7 @@ #include #include +#include "common/exception.h" #include "common/status.h" #include "io/fs/file_system.h" #include "io/fs/path.h" @@ -33,6 +34,7 @@ namespace doris::io { class LocalFileSystem final : public FileSystem { public: static std::shared_ptr<LocalFileSystem> create(Path path, std::string id = ""); + static Status convert_to_abs_path(const Path& path, Path& abs_path); ~LocalFileSystem() override; /// hard link dest file to src file @@ -98,6 +100,10 @@ class LocalFileSystem final : public FileSystem { Status copy_path_impl(const Path& src, const Path& dest); Status permission_impl(const Path& file, std::filesystem::perms prms); + Status absolute_path(const Path& path, Path& abs_path) const override { + return convert_to_abs_path(path, abs_path); + } + private: // a wrapper for glob(), return file list in "res" Status _glob(const std::string& pattern, std::vector<std::string>* res); diff --git a/be/src/io/fs/remote_file_system.cpp b/be/src/io/fs/remote_file_system.cpp index dc4830be41e6ea..dcde80b9d216c4 100644 --- a/be/src/io/fs/remote_file_system.cpp +++ b/be/src/io/fs/remote_file_system.cpp @@ -30,7 +30,8 @@ namespace doris { namespace io { Status RemoteFileSystem::upload(const Path& local_file, const Path& dest_file) { - auto dest_path = absolute_path(dest_file); + Path dest_path; + RETURN_IF_ERROR(absolute_path(dest_file, dest_path)); FILESYSTEM_M(upload_impl(local_file, dest_path)); } @@ -38,13 +39,16 @@ Status RemoteFileSystem::batch_upload(const std::vector<Path>& local_files, const std::vector<Path>& remote_files) { std::vector<Path> remote_paths; for (auto& path : remote_files) { - remote_paths.push_back(absolute_path(path)); + Path abs_path; + RETURN_IF_ERROR(absolute_path(path, abs_path)); + remote_paths.push_back(abs_path); } FILESYSTEM_M(batch_upload_impl(local_files, remote_paths)); } Status RemoteFileSystem::download(const Path& remote_file, const Path& local) { - auto remote_path = absolute_path(remote_file); + Path remote_path; + RETURN_IF_ERROR(absolute_path(remote_file, remote_path)); FILESYSTEM_M(download_impl(remote_path, local)); } diff --git a/be/src/io/fs/s3_file_system.h b/be/src/io/fs/s3_file_system.h index 26bf8186a93e35..2346d1b2f43480 100644 --- a/be/src/io/fs/s3_file_system.h +++ b/be/src/io/fs/s3_file_system.h @@ -91,16 +91,17 @@ class S3FileSystem final : public RemoteFileSystem { const std::vector<Path>& remote_files) override; Status download_impl(const Path& remote_file, const Path& local_file) override; - Path absolute_path(const Path& path) const override { + Status absolute_path(const Path& path, Path& abs_path) const override { if (path.string().find("://") != std::string::npos) { // the path is with schema, which means this is a full path like: // s3://bucket/path/to/file.txt // so no need to concat with prefix - return path; + abs_path = path; } else { // path with no
schema - return _root_path / path; + abs_path = _root_path / path; } + return Status::OK(); } private: diff --git a/be/src/vec/exec/format/parquet/schema_desc.cpp b/be/src/vec/exec/format/parquet/schema_desc.cpp index 9097b65718f53d..410e5eb7a1f1c5 100644 --- a/be/src/vec/exec/format/parquet/schema_desc.cpp +++ b/be/src/vec/exec/format/parquet/schema_desc.cpp @@ -137,6 +137,9 @@ Status FieldDescriptor::parse_from_thrift(const std::vectorsecond.data()}; +} + Status FieldDescriptor::parse_node_field(const std::vector& t_schemas, size_t curr_pos, FieldSchema* node_field) { if (curr_pos >= t_schemas.size()) { @@ -172,6 +183,7 @@ Status FieldDescriptor::parse_node_field(const std::vectortype.add_sub_type(child->type); node_field->is_nullable = false; _next_schema_pos = curr_pos + 1; + node_field->field_id = t_schema.__isset.field_id ? t_schema.field_id : -1; } else { bool is_optional = is_optional_node(t_schema); if (is_optional) { @@ -194,6 +206,7 @@ void FieldDescriptor::parse_physical_field(const tparquet::SchemaElement& physic auto type = get_doris_type(physical_schema); physical_field->type = type.first; physical_field->is_type_compatibility = type.second; + physical_field->field_id = physical_schema.__isset.field_id ? physical_schema.field_id : -1; } std::pair FieldDescriptor::get_doris_type( @@ -465,6 +478,7 @@ Status FieldDescriptor::parse_group_field(const std::vectortype.type = TYPE_ARRAY; group_field->type.add_sub_type(struct_field->type); group_field->is_nullable = false; + group_field->field_id = group_schema.__isset.field_id ? group_schema.field_id : -1; } else { RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos, group_field)); } @@ -533,6 +547,7 @@ Status FieldDescriptor::parse_list_field(const std::vectortype.type = TYPE_ARRAY; list_field->type.add_sub_type(list_field->children[0].type); list_field->is_nullable = is_optional; + list_field->field_id = first_level.__isset.field_id ? first_level.field_id : -1; return Status::OK(); } @@ -597,6 +612,7 @@ Status FieldDescriptor::parse_map_field(const std::vectortype.add_sub_type(map_kv_field->type.children[0]); map_field->type.add_sub_type(map_kv_field->type.children[1]); map_field->is_nullable = is_optional; + map_field->field_id = map_schema.__isset.field_id ? map_schema.field_id : -1; return Status::OK(); } @@ -619,6 +635,8 @@ Status FieldDescriptor::parse_struct_field(const std::vectorname = to_lower(struct_schema.name); struct_field->is_nullable = is_optional; struct_field->type.type = TYPE_STRUCT; + struct_field->field_id = struct_schema.__isset.field_id ? 
struct_schema.field_id : -1; + for (int i = 0; i < num_children; ++i) { struct_field->type.add_sub_type(struct_field->children[i].type, struct_field->children[i].name); diff --git a/be/src/vec/exec/format/parquet/schema_desc.h b/be/src/vec/exec/format/parquet/schema_desc.h index ca726ef1b57590..3a139e3c456a2f 100644 --- a/be/src/vec/exec/format/parquet/schema_desc.h +++ b/be/src/vec/exec/format/parquet/schema_desc.h @@ -28,6 +28,7 @@ #include "common/status.h" #include "runtime/types.h" +#include "util/slice.h" namespace doris::vectorized { @@ -56,6 +57,7 @@ struct FieldSchema { ~FieldSchema() = default; FieldSchema(const FieldSchema& fieldSchema) = default; std::string debug_string() const; + int32_t field_id; }; class FieldDescriptor { @@ -68,6 +70,7 @@ class FieldDescriptor { std::unordered_map _name_to_field; // Used in from_thrift, marking the next schema position that should be parsed size_t _next_schema_pos; + std::unordered_map _field_id_name_mapping; void parse_physical_field(const tparquet::SchemaElement& physical_schema, bool is_nullable, FieldSchema* physical_field); @@ -128,6 +131,10 @@ class FieldDescriptor { std::string debug_string() const; int32_t size() const { return _fields.size(); } + + bool has_parquet_field_id() const { return _field_id_name_mapping.size() > 0; } + + const doris::Slice get_column_name_from_field_id(int32_t id) const; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 32391d1f0dad5d..0f204172fc6d8c 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -253,10 +253,8 @@ Status ParquetReader::_open_file() { return Status::OK(); } -// Get iceberg col id to col name map stored in parquet metadata key values. -// This is for iceberg schema evolution. 
-std::vector ParquetReader::get_metadata_key_values() { - return _t_metadata->key_value_metadata; +const FieldDescriptor ParquetReader::get_file_metadata_schema() { + return _file_metadata->schema(); } Status ParquetReader::open() { diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index 1d70f9ab5d5bda..1928ebe6aa3651 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -148,7 +148,7 @@ class ParquetReader : public GenericReader { partition_columns, const std::unordered_map& missing_columns) override; - std::vector get_metadata_key_values(); + const FieldDescriptor get_file_metadata_schema(); void set_table_to_file_col_map(std::unordered_map& map) { _table_col_to_file_col = map; } diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index 295a3a405441b6..8f130ca6002d5d 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -53,6 +53,7 @@ #include "vec/exec/format/format_common.h" #include "vec/exec/format/generic_reader.h" #include "vec/exec/format/orc/vorc_reader.h" +#include "vec/exec/format/parquet/schema_desc.h" #include "vec/exec/format/table/table_format_reader.h" namespace cctz { @@ -546,8 +547,8 @@ Status IcebergParquetReader::init_reader( _col_id_name_map = col_id_name_map; _file_col_names = file_col_names; _colname_to_value_range = colname_to_value_range; - auto parquet_meta_kv = parquet_reader->get_metadata_key_values(); - RETURN_IF_ERROR(_gen_col_name_maps(parquet_meta_kv)); + FieldDescriptor field_desc = parquet_reader->get_file_metadata_schema(); + RETURN_IF_ERROR(_gen_col_name_maps(field_desc)); _gen_file_col_names(); _gen_new_colname_to_value_range(); parquet_reader->set_table_to_file_col_map(_table_col_to_file_col); @@ -672,39 +673,20 @@ Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete * 1. col1_new -> col1 * 2. 
col1 -> col1_new */ -Status IcebergParquetReader::_gen_col_name_maps(std::vector<tparquet::KeyValue> parquet_meta_kv) { - for (int i = 0; i < parquet_meta_kv.size(); ++i) { - tparquet::KeyValue kv = parquet_meta_kv[i]; - if (kv.key == "iceberg.schema") { - _has_iceberg_schema = true; - std::string schema = kv.value; - rapidjson::Document json; - json.Parse(schema.c_str()); - - if (json.HasMember("fields")) { - rapidjson::Value& fields = json["fields"]; - if (fields.IsArray()) { - for (int j = 0; j < fields.Size(); j++) { - rapidjson::Value& e = fields[j]; - rapidjson::Value& id = e["id"]; - rapidjson::Value& name = e["name"]; - std::string name_string = name.GetString(); - transform(name_string.begin(), name_string.end(), name_string.begin(), - ::tolower); - auto iter = _col_id_name_map.find(id.GetInt()); - if (iter != _col_id_name_map.end()) { - _table_col_to_file_col.emplace(iter->second, name_string); - _file_col_to_table_col.emplace(name_string, iter->second); - if (name_string != iter->second) { - _has_schema_change = true; - } - } else { - _has_schema_change = true; - } - } +Status IcebergParquetReader::_gen_col_name_maps(const FieldDescriptor& field_desc) { + if (field_desc.has_parquet_field_id()) { + for (const auto& pair : _col_id_name_map) { + auto name_slice = field_desc.get_column_name_from_field_id(pair.first); + if (name_slice.get_size() == 0) { + _has_schema_change = true; + } else { + auto name_string = name_slice.to_string(); + _table_col_to_file_col.emplace(pair.second, name_string); + _file_col_to_table_col.emplace(name_string, pair.second); + if (name_string != pair.second) { + _has_schema_change = true; } } - break; } } return Status::OK(); diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h index 04f64aad518a9f..2e240f465b6a2c 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.h +++ b/be/src/vec/exec/format/table/iceberg_reader.h @@ -218,7 +218,7 @@ class IcebergParquetReader final : public IcebergTableReader { parquet_reader->set_delete_rows(&_iceberg_delete_rows); } - Status _gen_col_name_maps(std::vector<tparquet::KeyValue> parquet_meta_kv); + Status _gen_col_name_maps(const FieldDescriptor& field_desc); protected: std::unique_ptr _create_equality_reader( diff --git a/be/test/io/fs/local_file_system_test.cpp b/be/test/io/fs/local_file_system_test.cpp index 5cfdb9ac73b9db..e15ba34a2545e4 100644 --- a/be/test/io/fs/local_file_system_test.cpp +++ b/be/test/io/fs/local_file_system_test.cpp @@ -407,4 +407,55 @@ TEST_F(LocalFileSystemTest, TestGlob) { EXPECT_TRUE(io::global_local_filesystem()->delete_directory(path).ok()); } +TEST_F(LocalFileSystemTest, TestConvertToAbsPath) { + io::Path abs_path; + Status st; + + // support path: + st = doris::io::LocalFileSystem::convert_to_abs_path("/abc/def", abs_path); + ASSERT_TRUE(st.ok()); + ASSERT_EQ("/abc/def", abs_path); + + st = doris::io::LocalFileSystem::convert_to_abs_path("file:/def/hij", abs_path); + ASSERT_TRUE(st.ok()); + ASSERT_EQ("/def/hij", abs_path); + + st = doris::io::LocalFileSystem::convert_to_abs_path("file://host:80/hij/abc", abs_path); + ASSERT_TRUE(st.ok()); + ASSERT_EQ("/hij/abc", abs_path); + + st = doris::io::LocalFileSystem::convert_to_abs_path("file://host/abc/def", abs_path); + ASSERT_TRUE(st.ok()); + ASSERT_EQ("/abc/def", abs_path); + + st = doris::io::LocalFileSystem::convert_to_abs_path("file:///def", abs_path); + ASSERT_TRUE(st.ok()); + ASSERT_EQ("/def", abs_path); + + st = doris::io::LocalFileSystem::convert_to_abs_path("file:///", abs_path); + ASSERT_TRUE(st.ok()); + 
ASSERT_EQ("/", abs_path); + + st = doris::io::LocalFileSystem::convert_to_abs_path("file://auth/", abs_path); + ASSERT_TRUE(st.ok()); + ASSERT_EQ("/", abs_path); + + st = doris::io::LocalFileSystem::convert_to_abs_path("abc", abs_path); + ASSERT_TRUE(st.ok()); + ASSERT_EQ("abc", abs_path); + + // not support path: + st = doris::io::LocalFileSystem::convert_to_abs_path("file://auth", abs_path); + ASSERT_TRUE(!st.ok()); + + st = doris::io::LocalFileSystem::convert_to_abs_path("fileee:/abc", abs_path); + ASSERT_TRUE(!st.ok()); + + st = doris::io::LocalFileSystem::convert_to_abs_path("hdfs:///abc", abs_path); + ASSERT_TRUE(!st.ok()); + + st = doris::io::LocalFileSystem::convert_to_abs_path("hdfs:/abc", abs_path); + ASSERT_TRUE(!st.ok()); +} + } // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 5a05baf33364df..ddbf7e5e4e6229 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -38,6 +38,7 @@ import org.apache.doris.common.Version; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.common.security.authentication.PreExecutionAuthenticator; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.es.EsExternalDatabase; import org.apache.doris.datasource.hive.HMSExternalCatalog; @@ -143,6 +144,7 @@ public abstract class ExternalCatalog protected Optional useMetaCache = Optional.empty(); protected MetaCache> metaCache; + protected PreExecutionAuthenticator preExecutionAuthenticator; public ExternalCatalog() { } @@ -913,4 +915,8 @@ public void truncateTable(TruncateTableStmt stmt) throws DdlException { public String getQualifiedName(String dbName) { return String.join(".", name, dbName); } + + public PreExecutionAuthenticator getPreExecutionAuthenticator() { + return preExecutionAuthenticator; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java index 20b9482041df02..85b999f1111047 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java @@ -27,6 +27,7 @@ import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.security.authentication.AuthenticationConfig; import org.apache.doris.common.security.authentication.HadoopAuthenticator; +import org.apache.doris.common.security.authentication.PreExecutionAuthenticator; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.CatalogProperty; import org.apache.doris.datasource.ExternalCatalog; @@ -34,6 +35,7 @@ import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.InitCatalogLog; import org.apache.doris.datasource.SessionContext; +import org.apache.doris.datasource.iceberg.IcebergMetadataOps; import org.apache.doris.datasource.iceberg.IcebergUtils; import org.apache.doris.datasource.jdbc.client.JdbcClientConfig; import org.apache.doris.datasource.operations.ExternalMetadataOperations; @@ -88,7 +90,7 @@ public class HMSExternalCatalog extends ExternalCatalog { private boolean enableHmsEventsIncrementalSync = false; //for "type" = "hms" , but is iceberg table. 
- private HiveCatalog icebergHiveCatalog; + private IcebergMetadataOps icebergMetadataOps; @VisibleForTesting public HMSExternalCatalog() { @@ -168,6 +170,7 @@ public void checkProperties() throws DdlException { @Override protected void initLocalObjectsImpl() { + preExecutionAuthenticator = new PreExecutionAuthenticator(); if (authenticator == null) { AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(getConfiguration()); authenticator = HadoopAuthenticator.getHadoopAuthenticator(config); @@ -199,8 +202,6 @@ protected void initLocalObjectsImpl() { transactionManager = TransactionManagerFactory.createHiveTransactionManager(hiveOps, fileSystemProvider, fileSystemExecutor); metadataOps = hiveOps; - - icebergHiveCatalog = IcebergUtils.createIcebergHiveCatalog(this, getName()); } @Override @@ -337,10 +338,6 @@ public boolean isEnableHmsEventsIncrementalSync() { return enableHmsEventsIncrementalSync; } - public HiveCatalog getIcebergHiveCatalog() { - return icebergHiveCatalog; - } - /** * Enum for meta tables in hive catalog. * eg: tbl$partitions @@ -393,5 +390,14 @@ public TableValuedFunctionRef createFunctionRef(String ctlName, String dbName, S } } } + + public IcebergMetadataOps getIcebergMetadataOps() { + makeSureInitialized(); + if (icebergMetadataOps == null) { + HiveCatalog icebergHiveCatalog = IcebergUtils.createIcebergHiveCatalog(this, getName()); + icebergMetadataOps = ExternalMetadataOperations.newIcebergMetadataOps(this, icebergHiveCatalog); + } + return icebergMetadataOps; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java index d8dfd1c128f162..0fa69825a01ef4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java @@ -40,11 +40,10 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog { public static final String ICEBERG_HADOOP = "hadoop"; public static final String ICEBERG_GLUE = "glue"; public static final String ICEBERG_DLF = "dlf"; + public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name"; protected String icebergCatalogType; protected Catalog catalog; - protected PreExecutionAuthenticator preExecutionAuthenticator; - public IcebergExternalCatalog(long catalogId, String name, String comment) { super(catalogId, name, InitCatalogLog.Type.ICEBERG, comment); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java index c1ac2a79754b79..ad347ca78f2a4f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java @@ -36,8 +36,6 @@ import org.apache.iceberg.SerializableTable; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; -import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.catalog.TableIdentifier; import org.jetbrains.annotations.NotNull; import java.util.HashMap; @@ -104,18 +102,16 @@ private List loadSnapshots(IcebergMetadataCacheKey key) { @NotNull private Table loadTable(IcebergMetadataCacheKey key) { - Catalog icebergCatalog; + IcebergMetadataOps ops; if (key.catalog instanceof HMSExternalCatalog) { - icebergCatalog = ((HMSExternalCatalog) 
key.catalog).getIcebergHiveCatalog(); + ops = ((HMSExternalCatalog) key.catalog).getIcebergMetadataOps(); } else if (key.catalog instanceof IcebergExternalCatalog) { - icebergCatalog = ((IcebergExternalCatalog) key.catalog).getCatalog(); + ops = (IcebergMetadataOps) (((IcebergExternalCatalog) key.catalog).getMetadataOps()); } else { throw new RuntimeException("Only support 'hms' and 'iceberg' type for iceberg table"); } - Table icebergTable = HiveMetaStoreClientHelper.ugiDoAs(((ExternalCatalog) key.catalog).getConfiguration(), - () -> icebergCatalog.loadTable(TableIdentifier.of(key.dbName, key.tableName))); - initIcebergTableFileIO(icebergTable, key.catalog.getProperties()); - return icebergTable; + return HiveMetaStoreClientHelper.ugiDoAs(((ExternalCatalog) key.catalog).getConfiguration(), + () -> ops.loadTable(key.dbName, key.tableName)); } public void invalidateCatalogCache(long catalogId) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java index 500f9728961958..c2070f4bad498a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java @@ -36,6 +36,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; @@ -46,29 +47,39 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; public class IcebergMetadataOps implements ExternalMetadataOps { private static final Logger LOG = LogManager.getLogger(IcebergMetadataOps.class); protected Catalog catalog; - protected IcebergExternalCatalog dorisCatalog; + protected ExternalCatalog dorisCatalog; protected SupportsNamespaces nsCatalog; private PreExecutionAuthenticator preExecutionAuthenticator; + // Generally, there should be only two levels under the catalog, namely ., + // but the REST type catalog is obtained from an external server, + // and the level provided by the external server may be three levels, ..
. + // Therefore, if the external server provides a catalog, + // the catalog needs to be recorded here to ensure semantic consistency. + private Optional externalCatalogName = Optional.empty(); - public IcebergMetadataOps(IcebergExternalCatalog dorisCatalog, Catalog catalog) { + public IcebergMetadataOps(ExternalCatalog dorisCatalog, Catalog catalog) { this.dorisCatalog = dorisCatalog; this.catalog = catalog; nsCatalog = (SupportsNamespaces) catalog; - this.preExecutionAuthenticator = dorisCatalog.preExecutionAuthenticator; - + this.preExecutionAuthenticator = dorisCatalog.getPreExecutionAuthenticator(); + if (dorisCatalog.getProperties().containsKey(IcebergExternalCatalog.EXTERNAL_CATALOG_NAME)) { + externalCatalogName = + Optional.of(dorisCatalog.getProperties().get(IcebergExternalCatalog.EXTERNAL_CATALOG_NAME)); + } } public Catalog getCatalog() { return catalog; } - public IcebergExternalCatalog getExternalCatalog() { + public ExternalCatalog getExternalCatalog() { return dorisCatalog; } @@ -78,17 +89,18 @@ public void close() { @Override public boolean tableExist(String dbName, String tblName) { - return catalog.tableExists(TableIdentifier.of(dbName, tblName)); + return catalog.tableExists(getTableIdentifier(dbName, tblName)); } public boolean databaseExist(String dbName) { - return nsCatalog.namespaceExists(Namespace.of(dbName)); + return nsCatalog.namespaceExists(getNamespace(dbName)); } public List listDatabaseNames() { try { - return preExecutionAuthenticator.execute(() -> nsCatalog.listNamespaces().stream() - .map(Namespace::toString) + return preExecutionAuthenticator.execute(() -> nsCatalog.listNamespaces(getNamespace()) + .stream() + .map(n -> n.level(n.length() - 1)) .collect(Collectors.toList())); } catch (Exception e) { throw new RuntimeException("Failed to list database names, error message is: " + e.getMessage()); @@ -97,7 +109,7 @@ public List listDatabaseNames() { @Override public List listTableNames(String dbName) { - List tableIdentifiers = catalog.listTables(Namespace.of(dbName)); + List tableIdentifiers = catalog.listTables(getNamespace(dbName)); return tableIdentifiers.stream().map(TableIdentifier::name).collect(Collectors.toList()); } @@ -127,12 +139,14 @@ private void performCreateDb(CreateDbStmt stmt) throws DdlException { ErrorReport.reportDdlException(ErrorCode.ERR_DB_CREATE_EXISTS, dbName); } } - String icebergCatalogType = dorisCatalog.getIcebergCatalogType(); - if (!properties.isEmpty() && !IcebergExternalCatalog.ICEBERG_HMS.equals(icebergCatalogType)) { - throw new DdlException( + if (!properties.isEmpty() && dorisCatalog instanceof IcebergExternalCatalog) { + String icebergCatalogType = ((IcebergExternalCatalog) dorisCatalog).getIcebergCatalogType(); + if (!IcebergExternalCatalog.ICEBERG_HMS.equals(icebergCatalogType)) { + throw new DdlException( "Not supported: create database with properties for iceberg catalog type: " + icebergCatalogType); + } } - nsCatalog.createNamespace(Namespace.of(dbName), properties); + nsCatalog.createNamespace(getNamespace(dbName), properties); dorisCatalog.onRefreshCache(true); } @@ -159,7 +173,7 @@ private void preformDropDb(DropDbStmt stmt) throws DdlException { } } SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog; - nsCatalog.dropNamespace(Namespace.of(dbName)); + nsCatalog.dropNamespace(getNamespace(dbName)); dorisCatalog.onRefreshCache(true); } @@ -199,7 +213,7 @@ public boolean performCreateTable(CreateTableStmt stmt) throws UserException { Map properties = stmt.getProperties(); 
properties.put(ExternalCatalog.DORIS_VERSION, ExternalCatalog.DORIS_VERSION_VALUE); PartitionSpec partitionSpec = IcebergUtils.solveIcebergPartitionSpec(stmt.getPartitionDesc(), schema); - catalog.createTable(TableIdentifier.of(dbName, tableName), schema, partitionSpec, properties); + catalog.createTable(getTableIdentifier(dbName, tableName), schema, partitionSpec, properties); db.setUnInitialized(true); return false; } @@ -237,7 +251,7 @@ private void performDropTable(DropTableStmt stmt) throws DdlException { ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE, tableName, dbName); } } - catalog.dropTable(TableIdentifier.of(dbName, tableName), true); + catalog.dropTable(getTableIdentifier(dbName, tableName), true); db.setUnInitialized(true); } @@ -249,4 +263,25 @@ public void truncateTable(String dbName, String tblName, List partitions public PreExecutionAuthenticator getPreExecutionAuthenticator() { return preExecutionAuthenticator; } + + @Override + public Table loadTable(String dbName, String tblName) { + return catalog.loadTable(getTableIdentifier(dbName, tblName)); + } + + private TableIdentifier getTableIdentifier(String dbName, String tblName) { + return externalCatalogName + .map(s -> TableIdentifier.of(s, dbName, tblName)) + .orElseGet(() -> TableIdentifier.of(dbName, tblName)); + } + + private Namespace getNamespace(String dbName) { + return externalCatalogName + .map(s -> Namespace.of(s, dbName)) + .orElseGet(() -> Namespace.of(dbName)); + } + + private Namespace getNamespace() { + return externalCatalogName.map(Namespace::of).orElseGet(() -> Namespace.empty()); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java index 908a4fa9e3f271..b92d2c91f9630e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java @@ -26,7 +26,6 @@ import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.aws.AwsClientProperties; -import org.apache.iceberg.aws.s3.S3FileIO; import org.apache.iceberg.aws.s3.S3FileIOProperties; import java.util.HashMap; @@ -71,7 +70,6 @@ private Map convertToRestCatalogProperties() { Map props = catalogProperty.getProperties(); Map restProperties = new HashMap<>(props); - restProperties.put(CatalogProperties.FILE_IO_IMPL, S3FileIO.class.getName()); restProperties.put(CatalogUtil.ICEBERG_CATALOG_TYPE, CatalogUtil.ICEBERG_CATALOG_TYPE_REST); String restUri = props.getOrDefault(CatalogProperties.URI, ""); restProperties.put(CatalogProperties.URI, restUri); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java index 685915025d665e..d0cca11b0af2bb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java @@ -22,6 +22,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.info.SimpleTableInfo; +import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.iceberg.helper.IcebergWriterHelper; import org.apache.doris.nereids.trees.plans.commands.insert.BaseExternalTableInsertCommandContext; import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext; @@ -140,7 +141,7 @@ public long getUpdateCnt() { private synchronized Table getNativeTable(SimpleTableInfo tableInfo) { Objects.requireNonNull(tableInfo); - IcebergExternalCatalog externalCatalog = ops.getExternalCatalog(); + ExternalCatalog externalCatalog = ops.getExternalCatalog(); return IcebergUtils.getRemoteTable(externalCatalog, tableInfo); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java index 4a2757f918f294..50166fe8305113 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java @@ -17,9 +17,9 @@ package org.apache.doris.datasource.operations; +import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.HiveMetadataOps; -import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergMetadataOps; import org.apache.doris.datasource.jdbc.client.JdbcClientConfig; @@ -34,7 +34,7 @@ public static HiveMetadataOps newHiveMetadataOps(HiveConf hiveConf, JdbcClientCo return new HiveMetadataOps(hiveConf, jdbcClientConfig, catalog); } - public static IcebergMetadataOps newIcebergMetadataOps(IcebergExternalCatalog dorisCatalog, Catalog catalog) { + public static IcebergMetadataOps newIcebergMetadataOps(ExternalCatalog dorisCatalog, Catalog catalog) { return new IcebergMetadataOps(dorisCatalog, catalog); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java index 0333124b35294c..e5ed129c679ffe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java @@ -91,6 +91,10 @@ public interface ExternalMetadataOps { boolean databaseExist(String dbName); + default Object loadTable(String dbName, String tblName) { + throw new UnsupportedOperationException("Load table is not supported."); + } + /** * close the connection, eg, to hms */ diff --git a/regression-test/data/external_table_p0/iceberg/iceberg_read_unitycatalog_table.out b/regression-test/data/external_table_p0/iceberg/iceberg_read_unitycatalog_table.out new file mode 100644 index 00000000000000..42414c3654930c --- /dev/null +++ b/regression-test/data/external_table_p0/iceberg/iceberg_read_unitycatalog_table.out @@ -0,0 +1,40 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !q1 -- +1 nWYHawtqUw 930 +2 uvOzzthsLV 166 +3 WIAehuXWkv 170 +4 wYCSvnJKTo 709 +5 VsslXsUIDZ 993 +6 ZLsACYYTFy 813 +7 BtDDvLeBpK 52 +8 YISVtrPfGr 8 +9 PBPJHDFjjC 45 +10 qbDuUJzJMO 756 +11 EjqqWoaLJn 712 +12 jpZLMdKXpn 847 +13 acpjQXpJCp 649 +14 nOKqHhRwao 133 +15 kxUUZEUoKv 398 + +-- !q2 -- +7 +8 +9 +10 +11 +12 +13 +14 +15 + +-- !q3 -- +nWYHawtqUw 930 +wYCSvnJKTo 709 +VsslXsUIDZ 993 +ZLsACYYTFy 813 +qbDuUJzJMO 756 +EjqqWoaLJn 712 +jpZLMdKXpn 847 +acpjQXpJCp 649 +kxUUZEUoKv 398 + diff --git a/regression-test/suites/external_table_p0/iceberg/iceberg_read_unitycatalog_table.groovy b/regression-test/suites/external_table_p0/iceberg/iceberg_read_unitycatalog_table.groovy new file mode 100644 index 00000000000000..48b8b6559ca82e --- /dev/null +++ b/regression-test/suites/external_table_p0/iceberg/iceberg_read_unitycatalog_table.groovy @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("iceberg_read_unitycatalog_table", "p0,external,doris,external_docker,external_docker_doris") { + + String enabled = context.config.otherConfigs.get("enableIcebergTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable iceberg test.") + return + } + + String catalog_name = "iceberg_read_unitycatalog_table" + String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + sql """drop catalog if exists ${catalog_name}""" + sql """ + CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='rest', + 'uri' = 'http://${externalEnvIp}:${rest_port}', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + logger.info("catalog " + catalog_name + " created") + sql """ use ${catalog_name}.test_db """ + String tb = "unitycatalog_marksheet_uniform" + + qt_q1 """ select * from ${tb} order by c1 """ + qt_q2 """ select c1 from ${tb} where c1 > 6 order by c1 """ + qt_q3 """ select c2, c3 from ${tb} where c3 > 200 order by c1 """ + +} + +/* + +spark-sql: + 1. create table marksheet_uniform (c1 int, c2 string, c3 int); + 2. get parquet file from marksheet_uniform; (ref: https://docs.unitycatalog.io/usage/tables/uniform/) + 3. put parquet file to hdfs: hdfs dfs -put hdfs://xxxxx + 4. 
CALL .system.add_files( + table => '.unitycatalog_db.marksheet_uniform', + source_table => '`parquet`.`hdfs://172.20.32.136:8020/user/doris/preinstalled_data/iceberg_hadoop_warehouse/unitycatalog_db/marksheet_uniform_data/part-00000-5af50cc4-3218-465b-a3a4-eb4fc709421d-c000.snappy.parquet`' + ); +*/ \ No newline at end of file
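For quick reference, here is a minimal standalone sketch of the path-normalization rules that the new LocalFileSystem::convert_to_abs_path enforces, as exercised by TestConvertToAbsPath above. The free function name to_local_abs_path, the use of std::optional for error signaling, and the main() driver are illustrative assumptions; the patch itself reports failures through Status.

// Illustrative sketch only (not part of the patch): mirrors the URI rules
// covered by TestConvertToAbsPath. Absolute and relative local paths pass
// through unchanged; "file:" URIs are accepted and their authority is
// dropped; any other scheme is rejected.
#include <cstddef>
#include <iostream>
#include <optional>
#include <string>

std::optional<std::string> to_local_abs_path(const std::string& in) {
    std::size_t slash = in.find('/');
    if (slash == 0) return in; // already absolute, e.g. /abc/def
    std::size_t start = 0;
    std::size_t colon = in.find(':');
    if (colon != std::string::npos && (slash == std::string::npos || colon < slash)) {
        if (in.substr(0, colon) != "file") return std::nullopt; // only file: is supported
        start = colon + 1;
    }
    if (in.compare(start, 2, "//") == 0 && in.length() - start > 2) {
        std::size_t next_slash = in.find('/', start + 2);
        if (next_slash == std::string::npos) return std::nullopt; // authority but no path
        start = next_slash; // skip the authority ("host", "host:80", ...)
    }
    return in.substr(start); // relative paths such as "abc" are returned as-is
}

int main() {
    std::cout << *to_local_abs_path("file://host:80/hij/abc") << '\n'; // /hij/abc
    std::cout << *to_local_abs_path("file:/def/hij") << '\n';          // /def/hij
    std::cout << *to_local_abs_path("abc") << '\n';                    // abc
    std::cout << to_local_abs_path("hdfs:///abc").has_value() << '\n'; // 0 (rejected)
    return 0;
}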