diff --git a/be/src/vec/exec/format/table/iceberg/arrow_schema_util.cpp b/be/src/vec/exec/format/table/iceberg/arrow_schema_util.cpp new file mode 100644 index 00000000000000..35a4d51b7f1959 --- /dev/null +++ b/be/src/vec/exec/format/table/iceberg/arrow_schema_util.cpp @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/format/table/iceberg/arrow_schema_util.h" + +#include +#include + +namespace doris { +namespace iceberg { + +const char* ArrowSchemaUtil::PARQUET_FIELD_ID = "PARQUET:field_id"; +const char* ArrowSchemaUtil::ORIGINAL_TYPE = "originalType"; +const char* ArrowSchemaUtil::MAP_TYPE_VALUE = "mapType"; + +Status ArrowSchemaUtil::convert(const Schema* schema, const std::string& timezone, + std::vector>& fields) { + for (const auto& column : schema->columns()) { + std::shared_ptr arrow_field; + RETURN_IF_ERROR(convert_to(column, &arrow_field, timezone)); + fields.push_back(arrow_field); + } + return Status::OK(); +} + +Status ArrowSchemaUtil::convert_to(const iceberg::NestedField& field, + std::shared_ptr* arrow_field, + const std::string& timezone) { + std::shared_ptr arrow_type; + std::unordered_map metadata; + metadata[PARQUET_FIELD_ID] = std::to_string(field.field_id()); + + switch (field.field_type()->type_id()) { + case iceberg::TypeID::BOOLEAN: + arrow_type = arrow::boolean(); + break; + + case iceberg::TypeID::INTEGER: + arrow_type = arrow::int32(); + break; + + case iceberg::TypeID::LONG: + arrow_type = arrow::int64(); + break; + + case iceberg::TypeID::FLOAT: + arrow_type = arrow::float32(); + break; + + case iceberg::TypeID::DOUBLE: + arrow_type = arrow::float64(); + break; + + case iceberg::TypeID::DATE: + arrow_type = arrow::date32(); + break; + + case iceberg::TypeID::TIMESTAMP: { + arrow_type = std::make_shared(arrow::TimeUnit::MICRO, timezone); + break; + } + + case iceberg::TypeID::BINARY: + case iceberg::TypeID::STRING: + case iceberg::TypeID::UUID: + case iceberg::TypeID::FIXED: + arrow_type = arrow::utf8(); + break; + + case iceberg::TypeID::DECIMAL: { + auto dt = dynamic_cast(field.field_type()); + arrow_type = arrow::decimal(dt->get_precision(), dt->get_scale()); + break; + } + + case iceberg::TypeID::STRUCT: { + std::vector> element_fields; + StructType* st = field.field_type()->as_struct_type(); + for (const auto& column : st->fields()) { + std::shared_ptr element_field; + RETURN_IF_ERROR(convert_to(column, &element_field, timezone)); + element_fields.push_back(element_field); + } + arrow_type = arrow::struct_(element_fields); + break; + } + + case iceberg::TypeID::LIST: { + std::shared_ptr item_field; + ListType* list_type = field.field_type()->as_list_type(); + RETURN_IF_ERROR(convert_to(list_type->element_field(), &item_field, timezone)); + arrow_type = arrow::list(item_field); + break; + } + + case iceberg::TypeID::MAP: { + std::shared_ptr key_field; + std::shared_ptr value_field; + MapType* map_type = field.field_type()->as_map_type(); + RETURN_IF_ERROR(convert_to(map_type->key_field(), &key_field, timezone)); + RETURN_IF_ERROR(convert_to(map_type->value_field(), &value_field, timezone)); + metadata[ORIGINAL_TYPE] = MAP_TYPE_VALUE; + arrow_type = std::make_shared(key_field, value_field); + break; + } + + case iceberg::TypeID::TIME: + default: + return Status::InternalError("Unsupported field type:" + field.field_type()->to_string()); + } + + std::shared_ptr schema_metadata = + std::make_shared(metadata); + *arrow_field = + arrow::field(field.field_name(), arrow_type, field.is_optional(), schema_metadata); + return Status::OK(); +} + +} // namespace iceberg +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/exec/format/table/iceberg/arrow_schema_util.h b/be/src/vec/exec/format/table/iceberg/arrow_schema_util.h new file mode 100644 index 00000000000000..20b7dbc627cc44 --- /dev/null +++ b/be/src/vec/exec/format/table/iceberg/arrow_schema_util.h @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include + +#include "vec/exec/format/table/iceberg/schema.h" + +namespace doris { +namespace iceberg { + +class ArrowSchemaUtil { +public: + static Status convert(const Schema* schema, const std::string& timezone, + std::vector>& fields); + +private: + static const char* PARQUET_FIELD_ID; + static const char* ORIGINAL_TYPE; + static const char* MAP_TYPE_VALUE; + + static Status convert_to(const iceberg::NestedField& field, + std::shared_ptr* arrow_field, + const std::string& timezone); +}; + +} // namespace iceberg +} // namespace doris diff --git a/be/src/vec/exec/format/table/iceberg/types.cpp b/be/src/vec/exec/format/table/iceberg/types.cpp index b56a231979ace1..bf643655ab8810 100644 --- a/be/src/vec/exec/format/table/iceberg/types.cpp +++ b/be/src/vec/exec/format/table/iceberg/types.cpp @@ -25,8 +25,9 @@ namespace iceberg { std::unique_ptr MapType::of_optional(int key_id, int value_id, std::unique_ptr key_type, std::unique_ptr value_type) { + // key is always required auto key_field = - std::make_unique(true, key_id, "key", std::move(key_type), std::nullopt); + std::make_unique(false, key_id, "key", std::move(key_type), std::nullopt); auto value_field = std::make_unique(true, value_id, "value", std::move(value_type), std::nullopt); return std::unique_ptr(new MapType(std::move(key_field), std::move(value_field))); diff --git a/be/src/vec/exec/format/table/iceberg/types.h b/be/src/vec/exec/format/table/iceberg/types.h index f5262b36f55cd3..91a2f705df0d0b 100644 --- a/be/src/vec/exec/format/table/iceberg/types.h +++ b/be/src/vec/exec/format/table/iceberg/types.h @@ -265,6 +265,10 @@ class DecimalType : public PrimitiveType { ss << "decimal(" << precision << ", " << scale << ")"; return ss.str(); } + + int get_precision() const { return precision; } + + int get_scale() const { return scale; } }; class BinaryType : public PrimitiveType { diff --git a/be/src/vec/runtime/vparquet_transformer.cpp b/be/src/vec/runtime/vparquet_transformer.cpp index ab288537313973..5409dc0abf3466 100644 --- a/be/src/vec/runtime/vparquet_transformer.cpp +++ b/be/src/vec/runtime/vparquet_transformer.cpp @@ -65,6 +65,7 @@ #include "vec/core/types.h" #include "vec/data_types/data_type_decimal.h" #include "vec/data_types/data_type_nullable.h" +#include "vec/exec/format/table/iceberg/arrow_schema_util.h" #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" #include "vec/functions/function_helpers.h" @@ -202,21 +203,20 @@ void ParquetBuildHelper::build_version(parquet::WriterProperties::Builder& build } } -VParquetTransformer::VParquetTransformer(RuntimeState* state, doris::io::FileWriter* file_writer, - const VExprContextSPtrs& output_vexpr_ctxs, - std::vector column_names, - TParquetCompressionType::type compression_type, - bool parquet_disable_dictionary, - TParquetVersion::type parquet_version, - bool output_object_data, - const std::string* iceberg_schema_json) +VParquetTransformer::VParquetTransformer( + RuntimeState* state, doris::io::FileWriter* file_writer, + const VExprContextSPtrs& output_vexpr_ctxs, std::vector column_names, + TParquetCompressionType::type compression_type, bool parquet_disable_dictionary, + TParquetVersion::type parquet_version, bool output_object_data, + const std::string* iceberg_schema_json, const iceberg::Schema* iceberg_schema) : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data), _column_names(std::move(column_names)), _parquet_schemas(nullptr), _compression_type(compression_type), _parquet_disable_dictionary(parquet_disable_dictionary), _parquet_version(parquet_version), - _iceberg_schema_json(iceberg_schema_json) { + _iceberg_schema_json(iceberg_schema_json), + _iceberg_schema(iceberg_schema) { _outstream = std::shared_ptr(new ParquetOutputStream(file_writer)); } @@ -234,6 +234,7 @@ VParquetTransformer::VParquetTransformer(RuntimeState* state, doris::io::FileWri _parquet_disable_dictionary(parquet_disable_dictionary), _parquet_version(parquet_version), _iceberg_schema_json(iceberg_schema_json) { + _iceberg_schema = nullptr; _outstream = std::shared_ptr(new ParquetOutputStream(file_writer)); } @@ -265,21 +266,27 @@ Status VParquetTransformer::_parse_properties() { Status VParquetTransformer::_parse_schema() { std::vector> fields; - for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) { - std::shared_ptr type; - RETURN_IF_ERROR(convert_to_arrow_type(_output_vexpr_ctxs[i]->root()->type(), &type, - _state->timezone())); - if (_parquet_schemas != nullptr) { - std::shared_ptr field = - arrow::field(_parquet_schemas->operator[](i).schema_column_name, type, - _output_vexpr_ctxs[i]->root()->is_nullable()); - fields.emplace_back(field); - } else { - std::shared_ptr field = arrow::field( - _column_names[i], type, _output_vexpr_ctxs[i]->root()->is_nullable()); - fields.emplace_back(field); + if (_iceberg_schema != nullptr) { + RETURN_IF_ERROR( + iceberg::ArrowSchemaUtil::convert(_iceberg_schema, _state->timezone(), fields)); + } else { + for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) { + std::shared_ptr type; + RETURN_IF_ERROR(convert_to_arrow_type(_output_vexpr_ctxs[i]->root()->type(), &type, + _state->timezone())); + if (_parquet_schemas != nullptr) { + std::shared_ptr field = + arrow::field(_parquet_schemas->operator[](i).schema_column_name, type, + _output_vexpr_ctxs[i]->root()->is_nullable()); + fields.emplace_back(field); + } else { + std::shared_ptr field = arrow::field( + _column_names[i], type, _output_vexpr_ctxs[i]->root()->is_nullable()); + fields.emplace_back(field); + } } } + if (_iceberg_schema_json != nullptr) { std::shared_ptr schema_metadata = arrow::KeyValueMetadata::Make({"iceberg.schema"}, {*_iceberg_schema_json}); diff --git a/be/src/vec/runtime/vparquet_transformer.h b/be/src/vec/runtime/vparquet_transformer.h index 9fdbb271373212..ecc4a8ddeac4bc 100644 --- a/be/src/vec/runtime/vparquet_transformer.h +++ b/be/src/vec/runtime/vparquet_transformer.h @@ -27,6 +27,7 @@ #include #include +#include "vec/exec/format/table/iceberg/schema.h" #include "vfile_format_transformer.h" namespace doris { @@ -95,7 +96,8 @@ class VParquetTransformer final : public VFileFormatTransformer { std::vector column_names, TParquetCompressionType::type compression_type, bool parquet_disable_dictionary, TParquetVersion::type parquet_version, - bool output_object_data, const std::string* iceberg_schema_json = nullptr); + bool output_object_data, const std::string* iceberg_schema_json = nullptr, + const iceberg::Schema* iceberg_schema = nullptr); VParquetTransformer(RuntimeState* state, doris::io::FileWriter* file_writer, const VExprContextSPtrs& output_vexpr_ctxs, @@ -132,6 +134,7 @@ class VParquetTransformer final : public VFileFormatTransformer { const TParquetVersion::type _parquet_version; const std::string* _iceberg_schema_json; uint64_t _write_size = 0; + const iceberg::Schema* _iceberg_schema; }; } // namespace doris::vectorized diff --git a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp index 924adf68145a7a..23ee389a8b7d10 100644 --- a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp +++ b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp @@ -84,7 +84,7 @@ Status VIcebergPartitionWriter::open(RuntimeState* state, RuntimeProfile* profil _file_format_transformer.reset(new VParquetTransformer( state, _file_writer.get(), _write_output_expr_ctxs, _write_column_names, parquet_compression_type, parquet_disable_dictionary, TParquetVersion::PARQUET_1_0, - false, _iceberg_schema_json)); + false, _iceberg_schema_json, &_schema)); return _file_format_transformer->open(); } case TFileFormatType::FORMAT_ORC: { diff --git a/be/test/vec/exec/format/table/iceberg/arrow_schema_util_test.cpp b/be/test/vec/exec/format/table/iceberg/arrow_schema_util_test.cpp new file mode 100644 index 00000000000000..b5f61c9d2e3cdf --- /dev/null +++ b/be/test/vec/exec/format/table/iceberg/arrow_schema_util_test.cpp @@ -0,0 +1,304 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/format/table/iceberg/arrow_schema_util.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "io/fs/local_file_system.h" +#include "vec/exec/format/table/iceberg/schema.h" +#include "vec/exec/format/table/iceberg/schema_parser.h" + +namespace doris { +namespace iceberg { + +class ArrowSchemaUtilTest : public testing::Test { +public: + ArrowSchemaUtilTest() = default; + virtual ~ArrowSchemaUtilTest() = default; +}; + +const std::string_view pfid = "PARQUET:field_id"; + +TEST(ArrowSchemaUtilTest, test_simple_field) { + std::vector nested_fields; + nested_fields.reserve(2); + NestedField field1(false, 1, "field1", std::make_unique(), std::nullopt); + NestedField field2(false, 2, "field2", std::make_unique(), std::nullopt); + nested_fields.emplace_back(std::move(field1)); + nested_fields.emplace_back(std::move(field2)); + + Schema schema(1, std::move(nested_fields)); + + std::vector> fields; + Status st; + st = ArrowSchemaUtil::convert(&schema, "utc", fields); + EXPECT_TRUE(st.ok()); + EXPECT_EQ(2, fields.size()); + EXPECT_EQ("field1", fields[0]->name()); + EXPECT_EQ("field2", fields[1]->name()); + EXPECT_TRUE(fields[0]->HasMetadata()); + EXPECT_TRUE(fields[1]->HasMetadata()); + EXPECT_EQ("1", fields[0]->metadata()->Get(pfid).ValueUnsafe()); + EXPECT_EQ("2", fields[1]->metadata()->Get(pfid).ValueUnsafe()); +} + +TEST(ArrowSchemaUtilTest, test_stuct_field) { + // struct_json comes from : + // Schema schema = new Schema( + // Types.NestedField.optional( + // 21, "st_col", Types.StructType.of( + // Types.NestedField.optional(32, "st_col_c1", Types.IntegerType.get()), + // Types.NestedField.optional(43, "st_col_c2", Types.StringType.get()) + // ) + // ) + // ); + // StringWriter writer = new StringWriter(); + // JsonGenerator generator = JsonUtil.factory().createGenerator(writer); + // SchemaParser.toJson(schema.asStruct(), generator); + // generator.flush(); + // System.out.println(writer.toString()); + + const std::string struct_json = R"({ + "type": "struct", + "fields": [ + { + "id": 21, + "name": "st_col", + "required": false, + "type": { + "type": "struct", + "fields": [ + { + "id": 32, + "name": "st_col_c1", + "required": false, + "type": "int" + }, + { + "id": 43, + "name": "st_col_c2", + "required": false, + "type": "string" + } + ] + } + } + ] + })"; + std::unique_ptr schema = SchemaParser::from_json(struct_json); + + std::vector> fields; + Status st; + st = ArrowSchemaUtil::convert(schema.get(), "utc", fields); + EXPECT_TRUE(st.ok()); + EXPECT_EQ(1, fields.size()); + EXPECT_EQ("st_col", fields[0]->name()); + EXPECT_EQ("21", fields[0]->metadata()->Get(pfid).ValueUnsafe()); + + arrow::StructType* arrow_struct = dynamic_cast(fields[0]->type().get()); + auto map_fields = arrow_struct->fields(); + EXPECT_EQ(2, arrow_struct->fields().size()); + EXPECT_EQ("st_col_c1", map_fields.at(0).get()->name()); + EXPECT_EQ("st_col_c2", map_fields.at(1).get()->name()); + EXPECT_EQ("32", map_fields.at(0).get()->metadata()->Get(pfid).ValueUnsafe()); + EXPECT_EQ("43", map_fields.at(1).get()->metadata()->Get(pfid).ValueUnsafe()); +} + +TEST(ArrowSchemaUtilTest, test_map_field) { + // map_json comes from : + // Schema schema = new Schema( + // Types.NestedField.optional( + // 21, "map_col", Types.MapType.ofOptional( + // 32, 43, Types.IntegerType.get(), Types.StringType.get() + // ) + // ) + // ); + // StringWriter writer = new StringWriter(); + // JsonGenerator generator = JsonUtil.factory().createGenerator(writer); + // SchemaParser.toJson(schema.asStruct(), generator); + // generator.flush(); + // System.out.println(writer.toString()); + + const std::string map_json = R"({ + "type": "struct", + "fields": [ + { + "id": 21, + "name": "map_col", + "required": false, + "type": { + "type": "map", + "key-id": 32, + "key": "int", + "value-id": 43, + "value": "string", + "value-required": false + } + } + ] + })"; + std::unique_ptr schema = SchemaParser::from_json(map_json); + + std::vector> fields; + Status st; + st = ArrowSchemaUtil::convert(schema.get(), "utc", fields); + EXPECT_TRUE(st.ok()); + EXPECT_EQ(1, fields.size()); + EXPECT_EQ("map_col", fields[0]->name()); + EXPECT_EQ("21", fields[0]->metadata()->Get(pfid).ValueUnsafe()); + + arrow::MapType* arrow_map = dynamic_cast(fields[0]->type().get()); + auto map_fields = arrow_map->fields(); + EXPECT_EQ(1, arrow_map->fields().size()); + EXPECT_EQ("key", arrow_map->key_field()->name()); + EXPECT_EQ("value", arrow_map->item_field()->name()); + EXPECT_EQ("32", arrow_map->key_field()->metadata()->Get(pfid).ValueUnsafe()); + EXPECT_EQ("43", arrow_map->item_field()->metadata()->Get(pfid).ValueUnsafe()); +} + +TEST(ArrowSchemaUtilTest, test_list_field) { + // list_json comes from : + // Schema schema = new Schema( + // Types.NestedField.optional( + // 21, "list_col", Types.ListType.ofOptional( + // 32, Types.IntegerType.get()))); + // StringWriter writer = new StringWriter(); + // JsonGenerator generator = JsonUtil.factory().createGenerator(writer); + // SchemaParser.toJson(schema.asStruct(), generator); + // generator.flush(); + // System.out.println(writer.toString()); + + const std::string list_json = R"({ + "type": "struct", + "fields": [ + { + "id": 21, + "name": "list_col", + "required": false, + "type": { + "type": "list", + "element-id": 32, + "element": "int", + "element-required": false + } + } + ] + })"; + std::unique_ptr schema = SchemaParser::from_json(list_json); + + std::vector> fields; + Status st; + st = ArrowSchemaUtil::convert(schema.get(), "utc", fields); + EXPECT_TRUE(st.ok()); + EXPECT_EQ(1, fields.size()); + EXPECT_EQ("list_col", fields[0]->name()); + EXPECT_EQ("21", fields[0]->metadata()->Get(pfid).ValueUnsafe()); + + arrow::ListType* arrow_list = dynamic_cast(fields[0]->type().get()); + auto map_fields = arrow_list->fields(); + EXPECT_EQ(1, arrow_list->fields().size()); + EXPECT_EQ("element", arrow_list->value_field()->name()); + EXPECT_EQ("32", arrow_list->value_field()->metadata()->Get(pfid).ValueUnsafe()); +} + +TEST(ArrowSchemaUtilTest, test_parquet_filed_id) { + std::string test_dir = "ut_dir/test_parquet_filed_id"; + Status st; + st = io::global_local_filesystem()->delete_directory(test_dir); + ASSERT_TRUE(st.ok()) << st; + st = io::global_local_filesystem()->create_directory(test_dir); + ASSERT_TRUE(st.ok()) << st; + + std::shared_ptr id_array; + std::shared_ptr name_array; + + arrow::Int32Builder id_builder; + ASSERT_TRUE(id_builder.Append(1).ok()); + ASSERT_TRUE(id_builder.Append(2).ok()); + ASSERT_TRUE(id_builder.Append(3).ok()); + auto&& result_id = id_builder.Finish(); + ASSERT_TRUE(result_id.ok()); + id_array = std::move(result_id).ValueUnsafe(); + + arrow::StringBuilder name_builder; + ASSERT_TRUE(name_builder.Append("Alice").ok()); + ASSERT_TRUE(name_builder.Append("Bob").ok()); + ASSERT_TRUE(name_builder.Append("Charlie").ok()); + auto&& result_name = name_builder.Finish(); + ASSERT_TRUE(result_name.ok()); + name_array = std::move(result_name).ValueUnsafe(); + + // 定义表的 Schema + std::vector nested_fields; + nested_fields.reserve(2); + NestedField field1(false, 17, "field_1", std::make_unique(), std::nullopt); + NestedField field2(false, 36, "field_2", std::make_unique(), std::nullopt); + nested_fields.emplace_back(std::move(field1)); + nested_fields.emplace_back(std::move(field2)); + + Schema schema(1, std::move(nested_fields)); + + std::vector> fields; + st = ArrowSchemaUtil::convert(&schema, "utc", fields); + auto arrow_schema = arrow::schema(fields); + + // create arrow table + auto table = arrow::Table::Make(arrow_schema, {id_array, name_array}); + + std::string file_path = test_dir + "/f1.parquet"; + std::shared_ptr outfile; + auto&& result_file = arrow::io::FileOutputStream::Open(file_path); + ASSERT_TRUE(result_file.ok()); + outfile = std::move(result_file).ValueUnsafe(); + + // arrow table to parquet file + PARQUET_THROW_NOT_OK( + parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, 1024)); + + // open parquet with parquet's API + std::unique_ptr parquet_reader = + parquet::ParquetFileReader::OpenFile(file_path, false); + + // get MessageType + std::shared_ptr file_metadata = parquet_reader->metadata(); + auto schema_descriptor = file_metadata->schema(); + const parquet::schema::Node& root = *schema_descriptor->group_node(); + const auto& group_node = static_cast(root); + + EXPECT_EQ(2, group_node.field_count()); + auto filed1 = group_node.field(0); + auto filed2 = group_node.field(1); + EXPECT_EQ("field_1", filed1->name()); + EXPECT_EQ(17, filed1->field_id()); + EXPECT_EQ("field_2", filed2->name()); + EXPECT_EQ(36, filed2->field_id()); + + st = io::global_local_filesystem()->delete_directory(test_dir); + EXPECT_TRUE(st.ok()) << st; +} + +} // namespace iceberg +} // namespace doris diff --git a/be/test/vec/exec/format/table/iceberg/schema_parser_test.cpp b/be/test/vec/exec/format/table/iceberg/schema_parser_test.cpp index 4c3f58cdd10491..f464525a7f99bc 100644 --- a/be/test/vec/exec/format/table/iceberg/schema_parser_test.cpp +++ b/be/test/vec/exec/format/table/iceberg/schema_parser_test.cpp @@ -78,6 +78,15 @@ const std::string valid_map_json = R"({ "value-required": true })"; +const std::string valid_map_json2 = R"({ + "type": "map", + "key-id": 4, + "key": "string", + "value-id": 5, + "value": "int", + "value-required": false +})"; + const std::string nested_list_json = R"({ "type": "list", "element-id": 6, @@ -209,6 +218,21 @@ TEST(SchemaParserTest, parse_valid_map) { SchemaParser::_type_from_json(rapidjson::Document().Parse(valid_map_json.c_str())); ASSERT_NE(type, nullptr); EXPECT_EQ(type->to_string(), "map"); + EXPECT_TRUE(type->is_map_type()); + MapType* mt = type->as_map_type(); + EXPECT_TRUE(mt->field(4)->is_required()); + EXPECT_TRUE(mt->field(5)->is_required()); +} + +TEST(SchemaParserTest, parse_valid_map2) { + std::unique_ptr type = + SchemaParser::_type_from_json(rapidjson::Document().Parse(valid_map_json2.c_str())); + ASSERT_NE(type, nullptr); + EXPECT_EQ(type->to_string(), "map"); + EXPECT_TRUE(type->is_map_type()); + MapType* mt = type->as_map_type(); + EXPECT_TRUE(mt->field(4)->is_required()); + EXPECT_TRUE(mt->field(5)->is_optional()); } TEST(SchemaParserTest, parse_nested_list) {