Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat](iceberg)Supports using rest type catalog to read tables in unity catalog for 2.1 (#43525) #45217

Merged
merged 3 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions be/src/io/fs/file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,58 +25,70 @@ namespace io {

Status FileSystem::create_file(const Path& file, FileWriterPtr* writer,
const FileWriterOptions* opts) {
auto path = absolute_path(file);
Path path;
RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(create_file_impl(path, writer, opts));
}

Status FileSystem::open_file(const Path& file, FileReaderSPtr* reader,
const FileReaderOptions* opts) {
auto path = absolute_path(file);
Path path;
RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(open_file_impl(path, reader, opts));
}

Status FileSystem::create_directory(const Path& dir, bool failed_if_exists) {
auto path = absolute_path(dir);
Path path;
RETURN_IF_ERROR(absolute_path(dir, path));
FILESYSTEM_M(create_directory_impl(path, failed_if_exists));
}

Status FileSystem::delete_file(const Path& file) {
auto path = absolute_path(file);
Path path;
RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(delete_file_impl(path));
}

Status FileSystem::delete_directory(const Path& dir) {
auto path = absolute_path(dir);
Path path;
RETURN_IF_ERROR(absolute_path(dir, path));
FILESYSTEM_M(delete_directory_impl(path));
}

Status FileSystem::batch_delete(const std::vector<Path>& files) {
std::vector<Path> abs_files;
for (auto& file : files) {
abs_files.push_back(absolute_path(file));
Path abs_file;
RETURN_IF_ERROR(absolute_path(file, abs_file));
abs_files.push_back(abs_file);
}
FILESYSTEM_M(batch_delete_impl(abs_files));
}

Status FileSystem::exists(const Path& path, bool* res) const {
auto fs_path = absolute_path(path);
Path fs_path;
RETURN_IF_ERROR(absolute_path(path, fs_path));
FILESYSTEM_M(exists_impl(fs_path, res));
}

Status FileSystem::file_size(const Path& file, int64_t* file_size) const {
auto path = absolute_path(file);
Path path;
RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(file_size_impl(path, file_size));
}

Status FileSystem::list(const Path& dir, bool only_file, std::vector<FileInfo>* files,
bool* exists) {
auto path = absolute_path(dir);
Path path;
RETURN_IF_ERROR(absolute_path(dir, path));
FILESYSTEM_M(list_impl(path, only_file, files, exists));
}

Status FileSystem::rename(const Path& orig_name, const Path& new_name) {
auto orig_path = absolute_path(orig_name);
auto new_path = absolute_path(new_name);
Path orig_path;
RETURN_IF_ERROR(absolute_path(orig_name, orig_path));
Path new_path;
RETURN_IF_ERROR(absolute_path(new_name, new_path));
FILESYSTEM_M(rename_impl(orig_path, new_path));
}

Expand Down
8 changes: 5 additions & 3 deletions be/src/io/fs/file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,13 @@ class FileSystem : public std::enable_shared_from_this<FileSystem> {
/// rename file from orig_name to new_name
virtual Status rename_impl(const Path& orig_name, const Path& new_name) = 0;

virtual Path absolute_path(const Path& path) const {
virtual Status absolute_path(const Path& path, Path& abs_path) const {
if (path.is_absolute()) {
return path;
abs_path = path;
} else {
abs_path = _root_path / path;
}
return _root_path / path;
return Status::OK();
}

FileSystem(Path&& root_path, std::string&& id, FileSystemType type)
Expand Down
82 changes: 71 additions & 11 deletions be/src/io/fs/local_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ Status LocalFileSystem::delete_directory_impl(const Path& dir) {
}

Status LocalFileSystem::delete_directory_or_file(const Path& path) {
auto the_path = absolute_path(path);
Path the_path;
RETURN_IF_ERROR(absolute_path(path, the_path));
FILESYSTEM_M(delete_directory_or_file_impl(the_path));
}

Expand Down Expand Up @@ -248,8 +249,10 @@ Status LocalFileSystem::rename_impl(const Path& orig_name, const Path& new_name)
}

Status LocalFileSystem::link_file(const Path& src, const Path& dest) {
auto src_file = absolute_path(src);
auto dest_file = absolute_path(dest);
Path src_file;
RETURN_IF_ERROR(absolute_path(src, src_file));
Path dest_file;
RETURN_IF_ERROR(absolute_path(dest, dest_file));
FILESYSTEM_M(link_file_impl(src_file, dest_file));
}

Expand All @@ -272,7 +275,8 @@ Status LocalFileSystem::canonicalize(const Path& path, std::string* real_path) {
}

Status LocalFileSystem::is_directory(const Path& path, bool* res) {
auto tmp_path = absolute_path(path);
Path tmp_path;
RETURN_IF_ERROR(absolute_path(path, tmp_path));
std::error_code ec;
*res = std::filesystem::is_directory(tmp_path, ec);
if (ec) {
Expand All @@ -282,7 +286,8 @@ Status LocalFileSystem::is_directory(const Path& path, bool* res) {
}

Status LocalFileSystem::md5sum(const Path& file, std::string* md5sum) {
auto path = absolute_path(file);
Path path;
RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(md5sum_impl(path, md5sum));
}

Expand Down Expand Up @@ -318,8 +323,9 @@ Status LocalFileSystem::md5sum_impl(const Path& file, std::string* md5sum) {

Status LocalFileSystem::iterate_directory(const std::string& dir,
const std::function<bool(const FileInfo& file)>& cb) {
auto path = absolute_path(dir);
FILESYSTEM_M(iterate_directory_impl(dir, cb));
Path path;
RETURN_IF_ERROR(absolute_path(dir, path));
FILESYSTEM_M(iterate_directory_impl(path, cb));
}

Status LocalFileSystem::iterate_directory_impl(
Expand All @@ -336,7 +342,8 @@ Status LocalFileSystem::iterate_directory_impl(
}

Status LocalFileSystem::get_space_info(const Path& dir, size_t* capacity, size_t* available) {
auto path = absolute_path(dir);
Path path;
RETURN_IF_ERROR(absolute_path(dir, path));
FILESYSTEM_M(get_space_info_impl(path, capacity, available));
}

Expand All @@ -353,8 +360,10 @@ Status LocalFileSystem::get_space_info_impl(const Path& path, size_t* capacity,
}

Status LocalFileSystem::copy_path(const Path& src, const Path& dest) {
auto src_path = absolute_path(src);
auto dest_path = absolute_path(dest);
Path src_path;
RETURN_IF_ERROR(absolute_path(src, src_path));
Path dest_path;
RETURN_IF_ERROR(absolute_path(dest, dest_path));
FILESYSTEM_M(copy_path_impl(src_path, dest_path));
}

Expand Down Expand Up @@ -455,7 +464,8 @@ Status LocalFileSystem::_glob(const std::string& pattern, std::vector<std::strin
}

Status LocalFileSystem::permission(const Path& file, std::filesystem::perms prms) {
auto path = absolute_path(file);
Path path;
RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(permission_impl(path, prms));
}

Expand All @@ -468,5 +478,55 @@ Status LocalFileSystem::permission_impl(const Path& file, std::filesystem::perms
return Status::OK();
}

/// Strips an optional `file` URI scheme and authority from a local path.
///
/// Accepted inputs and the value written to `abs_path`:
///   abc/def                      -> abc/def   (no leading '/', returned unchanged)
///   /abc/def                     -> /abc/def
///   file:/abc/def                -> /abc/def
///   file://<authority>/abc/def   -> /abc/def
///
/// @param input_path_str  path or `file:` URI to convert.
/// @param abs_path        receives the path component of the input.
/// @return Status::OK() on success; Status::InternalError when the scheme is
///         not `file`, or when an authority is present but no path follows it.
Status LocalFileSystem::convert_to_abs_path(const Path& input_path_str, Path& abs_path) {
    const std::string raw = input_path_str;
    const size_t first_slash = raw.find('/');

    // A leading '/' means there is no scheme or authority to strip.
    if (first_slash == 0) {
        abs_path = input_path_str;
        return Status::OK();
    }

    // Offset inside `raw` at which the path component begins.
    size_t path_begin = 0;

    // A ':' that appears before any '/' terminates a URI scheme.
    const size_t colon_pos = raw.find(':');
    const bool has_scheme = colon_pos != std::string::npos &&
                            (first_slash == std::string::npos || colon_pos < first_slash);
    if (has_scheme) {
        if (raw.compare(0, colon_pos, "file") != 0) {
            return Status::InternalError(
                    "Only supports `file` type scheme, like 'file:///path', 'file:/path'.");
        }
        path_begin = colon_pos + 1;
    }

    // "//" followed by at least one more character introduces an authority.
    // Example: raw = "file://authority/abc/def" gives path_begin = 5 here.
    const bool has_authority =
            raw.compare(path_begin, 2, "//") == 0 && raw.length() - path_begin > 2;
    if (has_authority) {
        // In the example above the slash that ends the authority is at index 16.
        const size_t authority_end = raw.find('/', path_begin + 2);
        if (authority_end == std::string::npos) {
            return Status::InternalError(
                    "This input string only has authority, but has no path information");
        }
        // Skip the authority; the path starts at the slash that follows it.
        path_begin = authority_end;
    }

    // Everything from path_begin onwards is the path.
    abs_path = raw.substr(path_begin);
    return Status::OK();
}

} // namespace io
} // namespace doris
6 changes: 6 additions & 0 deletions be/src/io/fs/local_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <string>
#include <vector>

#include "common/exception.h"
#include "common/status.h"
#include "io/fs/file_system.h"
#include "io/fs/path.h"
Expand All @@ -33,6 +34,7 @@ namespace doris::io {
class LocalFileSystem final : public FileSystem {
public:
static std::shared_ptr<LocalFileSystem> create(Path path, std::string id = "");
static Status convert_to_abs_path(const Path& path, Path& abs_path);
~LocalFileSystem() override;

/// hard link dest file to src file
Expand Down Expand Up @@ -98,6 +100,10 @@ class LocalFileSystem final : public FileSystem {
Status copy_path_impl(const Path& src, const Path& dest);
Status permission_impl(const Path& file, std::filesystem::perms prms);

// Resolves `path` by forwarding to the static convert_to_abs_path() helper.
// The converted path is written to `abs_path` and the helper's Status is
// returned unchanged.
Status absolute_path(const Path& path, Path& abs_path) const override {
    return LocalFileSystem::convert_to_abs_path(path, abs_path);
}

private:
// a wrapper for glob(), return file list in "res"
Status _glob(const std::string& pattern, std::vector<std::string>* res);
Expand Down
10 changes: 7 additions & 3 deletions be/src/io/fs/remote_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,25 @@ namespace doris {
namespace io {

Status RemoteFileSystem::upload(const Path& local_file, const Path& dest_file) {
auto dest_path = absolute_path(dest_file);
Path dest_path;
RETURN_IF_ERROR(absolute_path(dest_file, dest_path));
FILESYSTEM_M(upload_impl(local_file, dest_path));
}

Status RemoteFileSystem::batch_upload(const std::vector<Path>& local_files,
const std::vector<Path>& remote_files) {
std::vector<Path> remote_paths;
for (auto& path : remote_files) {
remote_paths.push_back(absolute_path(path));
Path abs_path;
RETURN_IF_ERROR(absolute_path(path, abs_path));
remote_paths.push_back(abs_path);
}
FILESYSTEM_M(batch_upload_impl(local_files, remote_paths));
}

Status RemoteFileSystem::download(const Path& remote_file, const Path& local) {
auto remote_path = absolute_path(remote_file);
Path remote_path;
RETURN_IF_ERROR(absolute_path(remote_file, remote_path));
FILESYSTEM_M(download_impl(remote_path, local));
}

Expand Down
7 changes: 4 additions & 3 deletions be/src/io/fs/s3_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,17 @@ class S3FileSystem final : public RemoteFileSystem {
const std::vector<Path>& remote_files) override;
Status download_impl(const Path& remote_file, const Path& local_file) override;

Path absolute_path(const Path& path) const override {
Status absolute_path(const Path& path, Path& abs_path) const override {
if (path.string().find("://") != std::string::npos) {
// the path is with schema, which means this is a full path like:
// s3://bucket/path/to/file.txt
// so no need to concat with prefix
return path;
abs_path = path;
} else {
// path with no schema
return _root_path / path;
abs_path = _root_path / path;
}
return Status::OK();
}

private:
Expand Down
18 changes: 18 additions & 0 deletions be/src/vec/exec/format/parquet/schema_desc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ Status FieldDescriptor::parse_from_thrift(const std::vector<tparquet::SchemaElem
return Status::InvalidArgument("Duplicated field name: {}", _fields[i].name);
}
_name_to_field.emplace(_fields[i].name, &_fields[i]);
if (_fields[i].field_id != -1) {
_field_id_name_mapping.emplace(_fields[i].field_id, _fields[i].name);
}
}

if (_next_schema_pos != t_schemas.size()) {
Expand All @@ -147,6 +150,14 @@ Status FieldDescriptor::parse_from_thrift(const std::vector<tparquet::SchemaElem
return Status::OK();
}

const doris::Slice FieldDescriptor::get_column_name_from_field_id(int32_t id) const {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: return type 'const doris::Slice' is 'const'-qualified at the top level, which may reduce code readability without improving const correctness [readability-const-return-type]

Suggested change
const doris::Slice FieldDescriptor::get_column_name_from_field_id(int32_t id) const {
doris::Slice FieldDescriptor::get_column_name_from_field_id(int32_t id) const {

be/src/vec/exec/format/parquet/schema_desc.h:136:

-     const doris::Slice get_column_name_from_field_id(int32_t id) const;
+     doris::Slice get_column_name_from_field_id(int32_t id) const;

auto const it = _field_id_name_mapping.find(id);
if (it == _field_id_name_mapping.end()) {
return {};
}
return {it->second.data()};
}

Status FieldDescriptor::parse_node_field(const std::vector<tparquet::SchemaElement>& t_schemas,
size_t curr_pos, FieldSchema* node_field) {
if (curr_pos >= t_schemas.size()) {
Expand All @@ -172,6 +183,7 @@ Status FieldDescriptor::parse_node_field(const std::vector<tparquet::SchemaEleme
node_field->type.add_sub_type(child->type);
node_field->is_nullable = false;
_next_schema_pos = curr_pos + 1;
node_field->field_id = t_schema.__isset.field_id ? t_schema.field_id : -1;
} else {
bool is_optional = is_optional_node(t_schema);
if (is_optional) {
Expand All @@ -194,6 +206,7 @@ void FieldDescriptor::parse_physical_field(const tparquet::SchemaElement& physic
auto type = get_doris_type(physical_schema);
physical_field->type = type.first;
physical_field->is_type_compatibility = type.second;
physical_field->field_id = physical_schema.__isset.field_id ? physical_schema.field_id : -1;
}

std::pair<TypeDescriptor, bool> FieldDescriptor::get_doris_type(
Expand Down Expand Up @@ -465,6 +478,7 @@ Status FieldDescriptor::parse_group_field(const std::vector<tparquet::SchemaElem
group_field->type.type = TYPE_ARRAY;
group_field->type.add_sub_type(struct_field->type);
group_field->is_nullable = false;
group_field->field_id = group_schema.__isset.field_id ? group_schema.field_id : -1;
} else {
RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos, group_field));
}
Expand Down Expand Up @@ -533,6 +547,7 @@ Status FieldDescriptor::parse_list_field(const std::vector<tparquet::SchemaEleme
list_field->type.type = TYPE_ARRAY;
list_field->type.add_sub_type(list_field->children[0].type);
list_field->is_nullable = is_optional;
list_field->field_id = first_level.__isset.field_id ? first_level.field_id : -1;

return Status::OK();
}
Expand Down Expand Up @@ -597,6 +612,7 @@ Status FieldDescriptor::parse_map_field(const std::vector<tparquet::SchemaElemen
map_field->type.add_sub_type(map_kv_field->type.children[0]);
map_field->type.add_sub_type(map_kv_field->type.children[1]);
map_field->is_nullable = is_optional;
map_field->field_id = map_schema.__isset.field_id ? map_schema.field_id : -1;

return Status::OK();
}
Expand All @@ -619,6 +635,8 @@ Status FieldDescriptor::parse_struct_field(const std::vector<tparquet::SchemaEle
struct_field->name = to_lower(struct_schema.name);
struct_field->is_nullable = is_optional;
struct_field->type.type = TYPE_STRUCT;
struct_field->field_id = struct_schema.__isset.field_id ? struct_schema.field_id : -1;

for (int i = 0; i < num_children; ++i) {
struct_field->type.add_sub_type(struct_field->children[i].type,
struct_field->children[i].name);
Expand Down
Loading
Loading