diff --git a/be/src/formats/parquet/column_reader.cpp b/be/src/formats/parquet/column_reader.cpp
index 47af4687fc654..47435a3260872 100644
--- a/be/src/formats/parquet/column_reader.cpp
+++ b/be/src/formats/parquet/column_reader.cpp
@@ -200,7 +200,6 @@ void ColumnReader::get_subfield_pos_with_pruned_type(const ParquetField& field,
         if (parquet_field_it == field_id_2_pos.end()) {
             // Means newly added struct subfield not existed in original parquet file, we put nullptr
             // column reader in children_reader, we will append default value for this subfield later.
-            LOG(INFO) << "Struct subfield name: " + format_subfield_name + " not found in ParquetField.";
             pos[i] = -1;
             iceberg_schema_subfield[i] = nullptr;
             continue;
diff --git a/be/src/formats/parquet/group_reader.cpp b/be/src/formats/parquet/group_reader.cpp
index 3d9f18f40dd3a..3df63b0753900 100644
--- a/be/src/formats/parquet/group_reader.cpp
+++ b/be/src/formats/parquet/group_reader.cpp
@@ -274,6 +274,10 @@ Status GroupReader::_create_column_reader(const GroupReaderParam::Column& column
         RETURN_IF_ERROR(ColumnReader::create(_column_reader_opts, schema_node, column.slot_type(),
                                              column.t_iceberg_schema_field, &column_reader));
     }
+    if (column_reader == nullptr) {
+        // This shouldn't happen, but guard against it anyway.
+        return Status::InternalError("No valid column reader.");
+    }
 
     if (column.slot_type().is_complex_type()) {
         // For complex type columns, we need parse def & rep levels.
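The guard added to `GroupReader::_create_column_reader` converts a latent null-pointer dereference into an explicit `Status::InternalError`. Below is a minimal standalone sketch of that fail-fast pattern; `Status` and `ColumnReader` here are simplified stand-ins for illustration, not the StarRocks API:

```cpp
#include <iostream>
#include <memory>
#include <string>

// Simplified stand-ins, just enough to show the shape of the guard.
struct ColumnReader {};

struct Status {
    bool ok;
    std::string msg;
    static Status OK() { return {true, ""}; }
    static Status InternalError(std::string m) { return {false, std::move(m)}; }
};

// A factory that can legitimately return OK yet leave the output empty,
// e.g. when schema evolution leaves no resolvable column to read.
Status create_reader(bool resolvable, std::unique_ptr<ColumnReader>* out) {
    if (resolvable) *out = std::make_unique<ColumnReader>();
    return Status::OK();
}

Status create_column_reader(bool resolvable) {
    std::unique_ptr<ColumnReader> reader;
    Status st = create_reader(resolvable, &reader);
    if (!st.ok) return st;
    if (reader == nullptr) {
        // Fail fast with an attributable error instead of crashing later
        // when the null reader is dereferenced.
        return Status::InternalError("No valid column reader.");
    }
    return Status::OK();
}

int main() {
    std::cout << create_column_reader(false).msg << std::endl; // No valid column reader.
}
```

Returning a status keeps the failure attributable to the scan that produced it rather than surfacing as a crash several calls later.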
diff --git a/be/src/formats/parquet/meta_helper.cpp b/be/src/formats/parquet/meta_helper.cpp
index 19105be7eaded..1562becebb85b 100644
--- a/be/src/formats/parquet/meta_helper.cpp
+++ b/be/src/formats/parquet/meta_helper.cpp
@@ -120,32 +120,59 @@ void IcebergMetaHelper::_init_field_mapping() {
     }
 }
 
-bool IcebergMetaHelper::_is_valid_type(const ParquetField* parquet_field,
-                                       const TIcebergSchemaField* field_schema) const {
+bool IcebergMetaHelper::_is_valid_type(const ParquetField* parquet_field, const TIcebergSchemaField* field_schema,
+                                       const TypeDescriptor* type_descriptor) const {
     // only check for complex type now
     // if complex type has none valid subfield, we will treat this struct type as invalid type.
     if (!parquet_field->type.is_complex_type()) {
         return true;
     }
 
-    bool has_valid_child = false;
-
-    std::unordered_map<int32_t, const TIcebergSchemaField*> field_id_2_iceberg_schema{};
-    for (const auto& field : field_schema->children) {
-        field_id_2_iceberg_schema.emplace(field.field_id, &field);
+    if (parquet_field->type.type != type_descriptor->type) {
+        // complex type mismatched
+        return false;
     }
 
-    // start to check struct type
-    for (const auto& child_parquet_field : parquet_field->children) {
-        auto it = field_id_2_iceberg_schema.find(child_parquet_field.field_id);
-        if (it == field_id_2_iceberg_schema.end()) {
-            continue;
+    bool has_valid_child = false;
+
+    if (parquet_field->type.is_array_type() || parquet_field->type.is_map_type()) {
+        for (size_t idx = 0; idx < parquet_field->children.size(); idx++) {
+            if (_is_valid_type(&parquet_field->children[idx], &field_schema->children[idx],
+                               &type_descriptor->children[idx])) {
+                has_valid_child = true;
+                break;
+            }
        }
+    } else if (parquet_field->type.is_struct_type()) {
+        std::unordered_map<int32_t, const TIcebergSchemaField*> field_id_2_iceberg_schema{};
+        std::unordered_map<int32_t, const TypeDescriptor*> field_id_2_type{};
+        for (const auto& field : field_schema->children) {
+            field_id_2_iceberg_schema.emplace(field.field_id, &field);
+            for (size_t i = 0; i < type_descriptor->field_names.size(); i++) {
+                if (type_descriptor->field_names[i] == field.name) {
+                    field_id_2_type.emplace(field.field_id, &type_descriptor->children[i]);
+                    break;
+                }
+            }
+        }
+
+        // start to check struct type
+        for (const auto& child_parquet_field : parquet_field->children) {
+            auto it = field_id_2_iceberg_schema.find(child_parquet_field.field_id);
+            if (it == field_id_2_iceberg_schema.end()) {
+                continue;
+            }
 
-        // is compelx type, recursive check it's children
-        if (_is_valid_type(&child_parquet_field, it->second)) {
-            has_valid_child = true;
-            break;
+            auto it_td = field_id_2_type.find(child_parquet_field.field_id);
+            if (it_td == field_id_2_type.end()) {
+                continue;
+            }
+
+            // is complex type, recursively check its children
+            if (_is_valid_type(&child_parquet_field, it->second, it_td->second)) {
+                has_valid_child = true;
+                break;
+            }
         }
     }
 
@@ -188,7 +215,7 @@ void IcebergMetaHelper::prepare_read_columns(const std::vector<HdfsScannerContext::ColumnInfo>& materialized_columns,
         const ParquetField* parquet_field = _file_metadata->schema().get_stored_column_by_field_id(field_id);
 
         // check is type is invalid
-        if (!_is_valid_type(parquet_field, iceberg_it->second)) {
+        if (!_is_valid_type(parquet_field, iceberg_it->second, &materialized_column.slot_desc->type())) {
             continue;
         }
diff --git a/be/src/formats/parquet/meta_helper.h b/be/src/formats/parquet/meta_helper.h
index 49eea74b68522..267d7123026e1 100644
--- a/be/src/formats/parquet/meta_helper.h
+++ b/be/src/formats/parquet/meta_helper.h
@@ -134,7 +134,8 @@ class IcebergMetaHelper : public MetaHelper {
 private:
     void _init_field_mapping();
-    bool _is_valid_type(const ParquetField* parquet_field, const TIcebergSchemaField* field_schema) const;
+    bool _is_valid_type(const ParquetField* parquet_field, const TIcebergSchemaField* field_schema,
+                        const TypeDescriptor* type_descriptor) const;
 
     const TIcebergSchema* _t_iceberg_schema = nullptr;
     // field name has already been formatted
     std::unordered_map<std::string, const TIcebergSchemaField*> _field_name_2_iceberg_field;
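The heart of the change is that `_is_valid_type` now validates three schemas at once: the Parquet file's field tree, the evolved Iceberg schema, and the engine-side `TypeDescriptor`. Arrays and maps recurse positionally; structs first map Iceberg field ids to schema fields, bridge each id to the engine child type through the subfield name, and require at least one child that resolves in all three. The self-contained sketch below mirrors that logic; `PField`, `SField`, and `TDesc` are simplified stand-ins for the real classes:

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

// Stand-ins for ParquetField, TIcebergSchemaField, and TypeDescriptor.
// Names and shapes here are assumptions for illustration only.
enum class Kind { INT, ARRAY, MAP, STRUCT };

struct PField {          // field as stored in the parquet file
    Kind kind;
    int32_t field_id;
    std::vector<PField> children;
};

struct SField {          // field in the (evolved) Iceberg schema
    int32_t field_id;
    std::string name;
    std::vector<SField> children;
};

struct TDesc {           // engine-side slot type
    Kind kind;
    std::vector<std::string> field_names; // struct subfield names
    std::vector<TDesc> children;
};

bool is_complex(Kind k) { return k != Kind::INT; }

// Mirrors the patched _is_valid_type: a complex column is readable only if
// its top-level kind matches the slot type and at least one subfield can be
// resolved in the file, the Iceberg schema, and the slot type simultaneously.
bool is_valid_type(const PField& pf, const SField& sf, const TDesc& td) {
    if (!is_complex(pf.kind)) return true;
    if (pf.kind != td.kind) return false; // complex type mismatch

    if (pf.kind == Kind::ARRAY || pf.kind == Kind::MAP) {
        // element / key-value children line up positionally in all three trees
        for (size_t i = 0; i < pf.children.size(); i++) {
            if (is_valid_type(pf.children[i], sf.children[i], td.children[i])) return true;
        }
        return false;
    }

    // struct: resolve children by Iceberg field id, then bridge the id to the
    // engine-side child type through the subfield name
    std::unordered_map<int32_t, const SField*> id2schema;
    std::unordered_map<int32_t, const TDesc*> id2type;
    for (const auto& f : sf.children) {
        id2schema.emplace(f.field_id, &f);
        for (size_t i = 0; i < td.field_names.size(); i++) {
            if (td.field_names[i] == f.name) {
                id2type.emplace(f.field_id, &td.children[i]);
                break;
            }
        }
    }
    for (const auto& child : pf.children) {
        auto it = id2schema.find(child.field_id);
        auto it_td = id2type.find(child.field_id);
        if (it == id2schema.end() || it_td == id2type.end()) continue;
        if (is_valid_type(child, *it->second, *it_td->second)) return true;
    }
    return false;
}

int main() {
    // Shapes from the new test below: the file's struct col holds a(3), b(4),
    // c(5); the query asks only for the newly added d(6) -> no child resolves.
    PField file_col{Kind::STRUCT, 2, {{Kind::INT, 3, {}}, {Kind::INT, 4, {}}, {Kind::INT, 5, {}}}};
    SField schema_col{2, "col", {{3, "a", {}}, {6, "d", {}}}};
    TDesc slot_col{Kind::STRUCT, {"d"}, {{Kind::INT, {}, {}}}};
    std::cout << std::boolalpha << is_valid_type(file_col, schema_col, slot_col) << std::endl; // false
}
```

Run against the shapes used by the new test, the check returns false, which is exactly the condition that routes the column into the NULL-padding path instead of creating a reader for it.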
diff --git a/be/test/formats/parquet/iceberg_schema_evolution_file_reader_test.cpp b/be/test/formats/parquet/iceberg_schema_evolution_file_reader_test.cpp
index ecea53a198930..950ff5da3b29c 100644
--- a/be/test/formats/parquet/iceberg_schema_evolution_file_reader_test.cpp
+++ b/be/test/formats/parquet/iceberg_schema_evolution_file_reader_test.cpp
@@ -197,6 +197,76 @@ TEST_F(IcebergSchemaEvolutionTest, TestStructAddSubfield) {
     EXPECT_EQ("[1, {a:2,b:3,c:4,d:NULL}]", chunk->debug_row(0));
 }
 
+TEST_F(IcebergSchemaEvolutionTest, TestStructEvolutionPadNull) {
+    auto file = _create_file(add_struct_subfield_file_path);
+    auto file_reader = std::make_shared<FileReader>(config::vector_chunk_size, file.get(),
+                                                    std::filesystem::file_size(add_struct_subfield_file_path), 0);
+
+    // --------------init context---------------
+    auto ctx = _create_scan_context();
+    TIcebergSchema schema = TIcebergSchema{};
+
+    TIcebergSchemaField field_id{};
+    field_id.__set_field_id(1);
+    field_id.__set_name("id");
+
+    TIcebergSchemaField field_col{};
+    field_col.__set_field_id(2);
+    field_col.__set_name("col");
+
+    TIcebergSchemaField field_col_a{};
+    field_col_a.__set_field_id(3);
+    field_col_a.__set_name("a");
+
+    TIcebergSchemaField field_col_b{};
+    field_col_b.__set_field_id(4);
+    field_col_b.__set_name("b");
+
+    TIcebergSchemaField field_col_c{};
+    field_col_c.__set_field_id(5);
+    field_col_c.__set_name("c");
+
+    TIcebergSchemaField field_col_d{};
+    field_col_d.__set_field_id(6);
+    field_col_d.__set_name("d");
+
+    std::vector<TIcebergSchemaField> subfields{field_col_a, field_col_d};
+    field_col.__set_children(subfields);
+
+    std::vector<TIcebergSchemaField> fields{field_id, field_col};
+    schema.__set_fields(fields);
+    ctx->iceberg_schema = &schema;
+
+    TypeDescriptor col = TypeDescriptor::from_logical_type(LogicalType::TYPE_STRUCT);
+
+    col.children.emplace_back(TypeDescriptor::from_logical_type(LogicalType::TYPE_INT));
+    col.field_names.emplace_back("d");
+
+    Utils::SlotDesc slot_descs[] = {{"col", col}, {""}};
+
+    ctx->tuple_desc = Utils::create_tuple_descriptor(_runtime_state, &_pool, slot_descs);
+    Utils::make_column_info_vector(ctx->tuple_desc, &ctx->materialized_columns);
+    ctx->scan_range = (_create_scan_range(add_struct_subfield_file_path));
+    // --------------finish init context---------------
+
+    Status status = file_reader->init(ctx);
+    if (!status.ok()) {
+        std::cout << status.message() << std::endl;
+    }
+    ASSERT_TRUE(status.ok());
+
+    EXPECT_EQ(file_reader->_row_group_readers.size(), 1);
+
+    auto chunk = std::make_shared<Chunk>();
+    chunk->append_column(ColumnHelper::create_column(col, true), chunk->num_columns());
+
+    status = file_reader->get_next(&chunk);
+    ASSERT_TRUE(status.ok());
+    ASSERT_EQ(1, chunk->num_rows());
+
+    EXPECT_EQ("[NULL]", chunk->debug_row(0));
+}
+
 TEST_F(IcebergSchemaEvolutionTest, TestStructDropSubfield) {
     auto file = _create_file(add_struct_subfield_file_path);
     auto file_reader = std::make_shared<FileReader>(config::vector_chunk_size, file.get(),
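In the new `TestStructEvolutionPadNull` case above, the file's struct `col` carries only subfields `a`, `b`, `c` (field ids 3-5), while the tuple descriptor asks solely for the newly added `d` (id 6). No subfield survives the three-way check, so the column gets no reader and the scanner pads the slot with NULL for every row, which is why the expected row is `[NULL]`. A toy sketch of that pad-null fallback, using a `std::optional`-based stand-in for the real nullable column class:

```cpp
#include <cstddef>
#include <iostream>
#include <optional>
#include <vector>

// Toy nullable column: a stand-in for the engine's nullable column type.
using NullableIntColumn = std::vector<std::optional<int>>;

// When a slot ends up with no column reader (the struct lost every resolvable
// subfield after evolution), the scan still succeeds: the slot is filled with
// NULL for every row instead of failing the query.
void append_nulls(NullableIntColumn& col, size_t num_rows) {
    col.insert(col.end(), num_rows, std::nullopt);
}

int main() {
    NullableIntColumn col;
    append_nulls(col, 1); // one row, mirroring the single-row test file
    std::cout << (col[0].has_value() ? "value" : "NULL") << std::endl; // NULL
}
```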