[feat](iceberg)Supports using rest type catalog to read tables in unity catalog for 2.1 (#43525) #45217

Merged · 3 commits · Dec 12, 2024
50 changes: 50 additions & 0 deletions be/src/io/fs/local_file_system.cpp
@@ -468,5 +468,55 @@ Status LocalFileSystem::permission_impl(const Path& file, std::filesystem::perms
return Status::OK();
}

Status LocalFileSystem::convert_to_abs_path(const Path& input_path_str, Path& abs_path) {
// Valid paths include:
// 1. abc/def will return abc/def
// 2. /abc/def will return /abc/def
// 3. file:/abc/def will return /abc/def
// 4. file://<authority>/abc/def will return /abc/def
std::string path_str = input_path_str;
size_t slash = path_str.find('/');
if (slash == 0) {
abs_path = input_path_str;
return Status::OK();
}

// Initialize scheme and authority
std::string scheme;
size_t start = 0;

// Parse URI scheme
size_t colon = path_str.find(':');
if (colon != std::string::npos && (slash == std::string::npos || colon < slash)) {
// Has a scheme
scheme = path_str.substr(0, colon);
if (scheme != "file") {
return Status::InternalError(
"Only supports `file` type scheme, like 'file:///path', 'file:/path'.");
}
start = colon + 1;
}

// Parse URI authority, if any
if (path_str.compare(start, 2, "//") == 0 && path_str.length() - start > 2) {
// Has authority
// such as : path_str = "file://authority/abc/def"
// and now : start = 5
size_t next_slash = path_str.find('/', start + 2);
// now : next_slash = 16
if (next_slash == std::string::npos) {
return Status::InternalError(
"This input string only has authority, but has no path information");
}
// We will skip the authority part
// now : start = 16
start = next_slash;
}

// URI path is the rest of the string
abs_path = path_str.substr(start);
return Status::OK();
}

} // namespace io
} // namespace doris
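
For reference, a minimal usage sketch of the new helper (the caller below is hypothetical and only assumes the `convert_to_abs_path` signature added in this file; the expected results mirror the comment at the top of the function):

#include "io/fs/local_file_system.h"

void convert_to_abs_path_examples() {
    doris::io::Path abs_path;
    // Relative and rooted paths pass through unchanged.
    auto st = doris::io::LocalFileSystem::convert_to_abs_path("abc/def", abs_path);        // abs_path == "abc/def"
    st = doris::io::LocalFileSystem::convert_to_abs_path("/abc/def", abs_path);            // abs_path == "/abc/def"
    // "file:" URIs are reduced to their path component, skipping any authority.
    st = doris::io::LocalFileSystem::convert_to_abs_path("file:/abc/def", abs_path);       // abs_path == "/abc/def"
    st = doris::io::LocalFileSystem::convert_to_abs_path("file://host/abc/def", abs_path); // abs_path == "/abc/def"
    // Any other scheme is rejected with Status::InternalError.
    st = doris::io::LocalFileSystem::convert_to_abs_path("hdfs:///abc", abs_path);         // !st.ok()
}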
8 changes: 8 additions & 0 deletions be/src/io/fs/local_file_system.h
@@ -24,6 +24,7 @@
#include <string>
#include <vector>

#include "common/exception.h"
#include "common/status.h"
#include "io/fs/file_system.h"
#include "io/fs/path.h"
@@ -33,6 +34,7 @@ namespace doris::io {
class LocalFileSystem final : public FileSystem {
public:
static std::shared_ptr<LocalFileSystem> create(Path path, std::string id = "");
static Status convert_to_abs_path(const Path& path, Path& abs_path);
~LocalFileSystem() override;

/// hard link dest file to src file
@@ -98,6 +100,12 @@ class LocalFileSystem final : public FileSystem {
Status copy_path_impl(const Path& src, const Path& dest);
Status permission_impl(const Path& file, std::filesystem::perms prms);

Path absolute_path(const Path& path) const override {
Path abs_path;
THROW_IF_ERROR(convert_to_abs_path(path, abs_path));
return abs_path;
}

private:
// a wrapper for glob(), return file list in "res"
Status _glob(const std::string& pattern, std::vector<std::string>* res);
18 changes: 18 additions & 0 deletions be/src/vec/exec/format/parquet/schema_desc.cpp
@@ -137,6 +137,9 @@ Status FieldDescriptor::parse_from_thrift(const std::vector<tparquet::SchemaElem
return Status::InvalidArgument("Duplicated field name: {}", _fields[i].name);
}
_name_to_field.emplace(_fields[i].name, &_fields[i]);
if (_fields[i].field_id != -1) {
_field_id_name_mapping.emplace(_fields[i].field_id, _fields[i].name);
}
}

if (_next_schema_pos != t_schemas.size()) {
@@ -147,6 +150,14 @@
return Status::OK();
}

const doris::Slice FieldDescriptor::get_column_name_from_field_id(int32_t id) const {
clang-tidy review comment (Contributor):

warning: return type 'const doris::Slice' is 'const'-qualified at the top level, which may reduce code readability without improving const correctness [readability-const-return-type]

Suggested change:
-    const doris::Slice FieldDescriptor::get_column_name_from_field_id(int32_t id) const {
+    doris::Slice FieldDescriptor::get_column_name_from_field_id(int32_t id) const {

be/src/vec/exec/format/parquet/schema_desc.h:136:
-     const doris::Slice get_column_name_from_field_id(int32_t id) const;
+     doris::Slice get_column_name_from_field_id(int32_t id) const;

auto const it = _field_id_name_mapping.find(id);
if (it == _field_id_name_mapping.end()) {
return {};
}
return {it->second.data()};
}

Status FieldDescriptor::parse_node_field(const std::vector<tparquet::SchemaElement>& t_schemas,
size_t curr_pos, FieldSchema* node_field) {
if (curr_pos >= t_schemas.size()) {
@@ -172,6 +183,7 @@ Status FieldDescriptor::parse_node_field(const std::vector<tparquet::SchemaEleme
node_field->type.add_sub_type(child->type);
node_field->is_nullable = false;
_next_schema_pos = curr_pos + 1;
node_field->field_id = t_schema.__isset.field_id ? t_schema.field_id : -1;
} else {
bool is_optional = is_optional_node(t_schema);
if (is_optional) {
@@ -194,6 +206,7 @@ void FieldDescriptor::parse_physical_field(const tparquet::SchemaElement& physic
auto type = get_doris_type(physical_schema);
physical_field->type = type.first;
physical_field->is_type_compatibility = type.second;
physical_field->field_id = physical_schema.__isset.field_id ? physical_schema.field_id : -1;
}

std::pair<TypeDescriptor, bool> FieldDescriptor::get_doris_type(
@@ -465,6 +478,7 @@ Status FieldDescriptor::parse_group_field(const std::vector<tparquet::SchemaElem
group_field->type.type = TYPE_ARRAY;
group_field->type.add_sub_type(struct_field->type);
group_field->is_nullable = false;
group_field->field_id = group_schema.__isset.field_id ? group_schema.field_id : -1;
} else {
RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos, group_field));
}
@@ -533,6 +547,7 @@ Status FieldDescriptor::parse_list_field(const std::vector<tparquet::SchemaEleme
list_field->type.type = TYPE_ARRAY;
list_field->type.add_sub_type(list_field->children[0].type);
list_field->is_nullable = is_optional;
list_field->field_id = first_level.__isset.field_id ? first_level.field_id : -1;

return Status::OK();
}
@@ -597,6 +612,7 @@ Status FieldDescriptor::parse_map_field(const std::vector<tparquet::SchemaElemen
map_field->type.add_sub_type(map_kv_field->type.children[0]);
map_field->type.add_sub_type(map_kv_field->type.children[1]);
map_field->is_nullable = is_optional;
map_field->field_id = map_schema.__isset.field_id ? map_schema.field_id : -1;

return Status::OK();
}
@@ -619,6 +635,8 @@ Status FieldDescriptor::parse_struct_field(const std::vector<tparquet::SchemaEle
struct_field->name = to_lower(struct_schema.name);
struct_field->is_nullable = is_optional;
struct_field->type.type = TYPE_STRUCT;
struct_field->field_id = struct_schema.__isset.field_id ? struct_schema.field_id : -1;

for (int i = 0; i < num_children; ++i) {
struct_field->type.add_sub_type(struct_field->children[i].type,
struct_field->children[i].name);
7 changes: 7 additions & 0 deletions be/src/vec/exec/format/parquet/schema_desc.h
@@ -28,6 +28,7 @@

#include "common/status.h"
#include "runtime/types.h"
#include "util/slice.h"

namespace doris::vectorized {

@@ -56,6 +57,7 @@ struct FieldSchema {
~FieldSchema() = default;
FieldSchema(const FieldSchema& fieldSchema) = default;
std::string debug_string() const;
int32_t field_id;
};

class FieldDescriptor {
@@ -68,6 +70,7 @@ class FieldDescriptor {
std::unordered_map<std::string, const FieldSchema*> _name_to_field;
// Used in from_thrift, marking the next schema position that should be parsed
size_t _next_schema_pos;
std::unordered_map<int, std::string> _field_id_name_mapping;

void parse_physical_field(const tparquet::SchemaElement& physical_schema, bool is_nullable,
FieldSchema* physical_field);
@@ -128,6 +131,10 @@ class FieldDescriptor {
std::string debug_string() const;

int32_t size() const { return _fields.size(); }

bool has_parquet_field_id() const { return _field_id_name_mapping.size() > 0; }

const doris::Slice get_column_name_from_field_id(int32_t id) const;
};

} // namespace doris::vectorized
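
A hedged lookup sketch for the two new accessors (the caller below is hypothetical; it relies only on the declarations above and on the empty-Slice convention used by `get_column_name_from_field_id`):

#include <string>

#include "util/slice.h"
#include "vec/exec/format/parquet/schema_desc.h"

// Hypothetical helper: returns true and fills `name` if the parquet file carries
// field ids and the given field id is present in the descriptor's mapping.
bool try_resolve_file_column(const doris::vectorized::FieldDescriptor& desc, int32_t field_id,
                             std::string* name) {
    if (!desc.has_parquet_field_id()) {
        return false; // the file was written without parquet field ids
    }
    doris::Slice slice = desc.get_column_name_from_field_id(field_id);
    if (slice.get_size() == 0) {
        return false; // unknown field id -> empty slice
    }
    *name = slice.to_string();
    return true;
}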
6 changes: 2 additions & 4 deletions be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -253,10 +253,8 @@ Status ParquetReader::_open_file() {
return Status::OK();
}

// Get iceberg col id to col name map stored in parquet metadata key values.
// This is for iceberg schema evolution.
std::vector<tparquet::KeyValue> ParquetReader::get_metadata_key_values() {
return _t_metadata->key_value_metadata;
const FieldDescriptor ParquetReader::get_file_metadata_schema() {
clang-tidy review comment (Contributor):

warning: return type 'const std::doris::vectorized::FieldDescriptor' is 'const'-qualified at the top level, which may reduce code readability without improving const correctness [readability-const-return-type]

Suggested change:
-    const FieldDescriptor ParquetReader::get_file_metadata_schema() {
+    FieldDescriptor ParquetReader::get_file_metadata_schema() {

be/src/vec/exec/format/parquet/vparquet_reader.h:150:
-     const FieldDescriptor get_file_metadata_schema();
+     FieldDescriptor get_file_metadata_schema();

return _file_metadata->schema();
}

Status ParquetReader::open() {
2 changes: 1 addition & 1 deletion be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -148,7 +148,7 @@ class ParquetReader : public GenericReader {
partition_columns,
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) override;

std::vector<tparquet::KeyValue> get_metadata_key_values();
const FieldDescriptor get_file_metadata_schema();
void set_table_to_file_col_map(std::unordered_map<std::string, std::string>& map) {
_table_col_to_file_col = map;
}
48 changes: 15 additions & 33 deletions be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -53,6 +53,7 @@
#include "vec/exec/format/format_common.h"
#include "vec/exec/format/generic_reader.h"
#include "vec/exec/format/orc/vorc_reader.h"
#include "vec/exec/format/parquet/schema_desc.h"
#include "vec/exec/format/table/table_format_reader.h"

namespace cctz {
@@ -546,8 +547,8 @@ Status IcebergParquetReader::init_reader(
_col_id_name_map = col_id_name_map;
_file_col_names = file_col_names;
_colname_to_value_range = colname_to_value_range;
auto parquet_meta_kv = parquet_reader->get_metadata_key_values();
RETURN_IF_ERROR(_gen_col_name_maps(parquet_meta_kv));
FieldDescriptor field_desc = parquet_reader->get_file_metadata_schema();
RETURN_IF_ERROR(_gen_col_name_maps(field_desc));
_gen_file_col_names();
_gen_new_colname_to_value_range();
parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);
@@ -672,39 +673,20 @@ Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete
* 1. col1_new -> col1
* 2. col1 -> col1_new
*/
Status IcebergParquetReader::_gen_col_name_maps(std::vector<tparquet::KeyValue> parquet_meta_kv) {
for (int i = 0; i < parquet_meta_kv.size(); ++i) {
tparquet::KeyValue kv = parquet_meta_kv[i];
if (kv.key == "iceberg.schema") {
_has_iceberg_schema = true;
std::string schema = kv.value;
rapidjson::Document json;
json.Parse(schema.c_str());

if (json.HasMember("fields")) {
rapidjson::Value& fields = json["fields"];
if (fields.IsArray()) {
for (int j = 0; j < fields.Size(); j++) {
rapidjson::Value& e = fields[j];
rapidjson::Value& id = e["id"];
rapidjson::Value& name = e["name"];
std::string name_string = name.GetString();
transform(name_string.begin(), name_string.end(), name_string.begin(),
::tolower);
auto iter = _col_id_name_map.find(id.GetInt());
if (iter != _col_id_name_map.end()) {
_table_col_to_file_col.emplace(iter->second, name_string);
_file_col_to_table_col.emplace(name_string, iter->second);
if (name_string != iter->second) {
_has_schema_change = true;
}
} else {
_has_schema_change = true;
}
}
Status IcebergParquetReader::_gen_col_name_maps(const FieldDescriptor& field_desc) {
if (field_desc.has_parquet_field_id()) {
for (const auto& pair : _col_id_name_map) {
auto name_slice = field_desc.get_column_name_from_field_id(pair.first);
if (name_slice.get_size() == 0) {
_has_schema_change = true;
} else {
auto name_string = name_slice.to_string();
_table_col_to_file_col.emplace(pair.second, name_string);
_file_col_to_table_col.emplace(name_string, pair.second);
if (name_string != pair.second) {
_has_schema_change = true;
}
}
break;
}
}
return Status::OK();
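
The rewritten `_gen_col_name_maps` above derives the table/file column-name mapping from parquet field ids instead of parsing the `iceberg.schema` JSON blob in the file's key-value metadata. A standalone sketch of the same mapping rule, with plain standard-library stand-ins for the reader's members (all names below are illustrative, not part of the PR):

#include <string>
#include <unordered_map>

void map_table_cols_to_file_cols(
        const std::unordered_map<int, std::string>& col_id_name_map,   // iceberg field id -> table column name
        const std::unordered_map<int, std::string>& file_id_name_map,  // parquet field id -> file column name
        std::unordered_map<std::string, std::string>& table_col_to_file_col,
        std::unordered_map<std::string, std::string>& file_col_to_table_col,
        bool& has_schema_change) {
    for (const auto& [field_id, table_col] : col_id_name_map) {
        auto it = file_id_name_map.find(field_id);
        if (it == file_id_name_map.end()) {
            // The table column has no counterpart in this data file: schema evolved.
            has_schema_change = true;
        } else {
            table_col_to_file_col.emplace(table_col, it->second);
            file_col_to_table_col.emplace(it->second, table_col);
            if (it->second != table_col) {
                // Same field id but a different name: the column was renamed.
                has_schema_change = true;
            }
        }
    }
}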
2 changes: 1 addition & 1 deletion be/src/vec/exec/format/table/iceberg_reader.h
@@ -218,7 +218,7 @@ class IcebergParquetReader final : public IcebergTableReader {
parquet_reader->set_delete_rows(&_iceberg_delete_rows);
}

Status _gen_col_name_maps(std::vector<tparquet::KeyValue> parquet_meta_kv);
Status _gen_col_name_maps(const FieldDescriptor& field_desc);

protected:
std::unique_ptr<GenericReader> _create_equality_reader(
51 changes: 51 additions & 0 deletions be/test/io/fs/local_file_system_test.cpp
@@ -407,4 +407,55 @@ TEST_F(LocalFileSystemTest, TestGlob) {
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(path).ok());
}

TEST_F(LocalFileSystemTest, TestConvertToAbsPath) {
io::Path abs_path;
Status st;

// Supported paths:
st = doris::io::LocalFileSystem::convert_to_abs_path("/abc/def", abs_path);
ASSERT_TRUE(st.ok());
ASSERT_EQ("/abc/def", abs_path);

st = doris::io::LocalFileSystem::convert_to_abs_path("file:/def/hij", abs_path);
ASSERT_TRUE(st.ok());
ASSERT_EQ("/def/hij", abs_path);

st = doris::io::LocalFileSystem::convert_to_abs_path("file://host:80/hij/abc", abs_path);
ASSERT_TRUE(st.ok());
ASSERT_EQ("/hij/abc", abs_path);

st = doris::io::LocalFileSystem::convert_to_abs_path("file://host/abc/def", abs_path);
ASSERT_TRUE(st.ok());
ASSERT_EQ("/abc/def", abs_path);

st = doris::io::LocalFileSystem::convert_to_abs_path("file:///def", abs_path);
ASSERT_TRUE(st.ok());
ASSERT_EQ("/def", abs_path);

st = doris::io::LocalFileSystem::convert_to_abs_path("file:///", abs_path);
ASSERT_TRUE(st.ok());
ASSERT_EQ("/", abs_path);

st = doris::io::LocalFileSystem::convert_to_abs_path("file://auth/", abs_path);
ASSERT_TRUE(st.ok());
ASSERT_EQ("/", abs_path);

st = doris::io::LocalFileSystem::convert_to_abs_path("abc", abs_path);
ASSERT_TRUE(st.ok());
ASSERT_EQ("abc", abs_path);

// Unsupported paths:
st = doris::io::LocalFileSystem::convert_to_abs_path("file://auth", abs_path);
ASSERT_TRUE(!st.ok());

st = doris::io::LocalFileSystem::convert_to_abs_path("fileee:/abc", abs_path);
ASSERT_TRUE(!st.ok());

st = doris::io::LocalFileSystem::convert_to_abs_path("hdfs:///abc", abs_path);
ASSERT_TRUE(!st.ok());

st = doris::io::LocalFileSystem::convert_to_abs_path("hdfs:/abc", abs_path);
ASSERT_TRUE(!st.ok());
}

} // namespace doris
@@ -39,6 +39,7 @@
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.Util;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.datasource.es.EsExternalDatabase;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalDatabase;
@@ -143,6 +144,7 @@ public abstract class ExternalCatalog

protected Optional<Boolean> useMetaCache = Optional.empty();
protected MetaCache<ExternalDatabase<? extends ExternalTable>> metaCache;
protected PreExecutionAuthenticator preExecutionAuthenticator;

public ExternalCatalog() {
}
@@ -913,4 +915,8 @@ public void truncateTable(TruncateTableStmt stmt) throws DdlException {
public String getQualifiedName(String dbName) {
return String.join(".", name, dbName);
}

public PreExecutionAuthenticator getPreExecutionAuthenticator() {
return preExecutionAuthenticator;
}
}