Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] [Enhancement] Fix nullptr and support Iceberg null padding (backport #49212) #49525

Merged
merged 1 commit into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion be/src/formats/parquet/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1095,7 +1095,6 @@ void ColumnReader::get_subfield_pos_with_pruned_type(const ParquetField& field,
if (parquet_field_it == field_id_2_pos.end()) {
// A newly added struct subfield does not exist in the original parquet file, so we put a nullptr
// column reader in children_reader; we will append a default value for this subfield later.
LOG(INFO) << "Struct subfield name: " + format_subfield_name + " not found in ParquetField.";
pos[i] = -1;
iceberg_schema_subfield[i] = nullptr;
continue;
Expand Down
4 changes: 4 additions & 0 deletions be/src/formats/parquet/group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,10 @@ Status GroupReader::_create_column_reader(const GroupReaderParam::Column& column
RETURN_IF_ERROR(ColumnReader::create(_column_reader_opts, schema_node, column.slot_type(),
column.t_iceberg_schema_field, &column_reader));
}
if (column_reader == nullptr) {
// This shouldn't happen, but guard against a null reader anyway.
return Status::InternalError("No valid column reader.");
}

if (column.slot_type().is_complex_type()) {
// For complex type columns, we need parse def & rep levels.
Expand Down
61 changes: 44 additions & 17 deletions be/src/formats/parquet/meta_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,32 +118,59 @@ void IcebergMetaHelper::_init_field_mapping() {
}
}

bool IcebergMetaHelper::_is_valid_type(const ParquetField* parquet_field,
const TIcebergSchemaField* field_schema) const {
bool IcebergMetaHelper::_is_valid_type(const ParquetField* parquet_field, const TIcebergSchemaField* field_schema,
const TypeDescriptor* type_descriptor) const {
// Only check complex types for now.
// If a complex type has no valid subfield, we treat the whole type as invalid.
if (!parquet_field->type.is_complex_type()) {
return true;
}

bool has_valid_child = false;

std::unordered_map<int32_t, const TIcebergSchemaField*> field_id_2_iceberg_schema{};
for (const auto& field : field_schema->children) {
field_id_2_iceberg_schema.emplace(field.field_id, &field);
if (parquet_field->type.type != type_descriptor->type) {
// complex type mismatched
return false;
}

// start to check struct type
for (const auto& child_parquet_field : parquet_field->children) {
auto it = field_id_2_iceberg_schema.find(child_parquet_field.field_id);
if (it == field_id_2_iceberg_schema.end()) {
continue;
bool has_valid_child = false;

if (parquet_field->type.is_array_type() || parquet_field->type.is_map_type()) {
for (size_t idx = 0; idx < parquet_field->children.size(); idx++) {
if (_is_valid_type(&parquet_field->children[idx], &field_schema->children[idx],
&type_descriptor->children[idx])) {
has_valid_child = true;
break;
}
}
} else if (parquet_field->type.is_struct_type()) {
std::unordered_map<int32_t, const TIcebergSchemaField*> field_id_2_iceberg_schema{};
std::unordered_map<int32_t, const TypeDescriptor*> field_id_2_type{};
for (const auto& field : field_schema->children) {
field_id_2_iceberg_schema.emplace(field.field_id, &field);
for (size_t i = 0; i < type_descriptor->field_names.size(); i++) {
if (type_descriptor->field_names[i] == field.name) {
field_id_2_type.emplace(field.field_id, &type_descriptor->children[i]);
break;
}
}
}

// start to check struct type
for (const auto& child_parquet_field : parquet_field->children) {
auto it = field_id_2_iceberg_schema.find(child_parquet_field.field_id);
if (it == field_id_2_iceberg_schema.end()) {
continue;
}

// For a complex type, recursively check its children.
if (_is_valid_type(&child_parquet_field, it->second)) {
has_valid_child = true;
break;
auto it_td = field_id_2_type.find(child_parquet_field.field_id);
if (it_td == field_id_2_type.end()) {
continue;
}

// For a complex type, recursively check its children.
if (_is_valid_type(&child_parquet_field, it->second, it_td->second)) {
has_valid_child = true;
break;
}
}
}

Expand Down Expand Up @@ -186,7 +213,7 @@ void IcebergMetaHelper::prepare_read_columns(const std::vector<HdfsScannerContex

const ParquetField* parquet_field = _file_metadata->schema().get_stored_column_by_field_id(field_id);
// check if the type is invalid
if (!_is_valid_type(parquet_field, iceberg_it->second)) {
if (!_is_valid_type(parquet_field, iceberg_it->second, &materialized_column.slot_desc->type())) {
continue;
}

Expand Down
3 changes: 2 additions & 1 deletion be/src/formats/parquet/meta_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ class IcebergMetaHelper : public MetaHelper {

private:
void _init_field_mapping();
bool _is_valid_type(const ParquetField* parquet_field, const TIcebergSchemaField* field_schema) const;
bool _is_valid_type(const ParquetField* parquet_field, const TIcebergSchemaField* field_schema,
const TypeDescriptor* type_descriptor) const;
const TIcebergSchema* _t_iceberg_schema = nullptr;
// field name has already been formatted
std::unordered_map<std::string, const TIcebergSchemaField*> _field_name_2_iceberg_field;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,76 @@ TEST_F(IcebergSchemaEvolutionTest, TestStructAddSubfield) {
EXPECT_EQ("[1, {a:2,b:3,c:4,d:NULL}]", chunk->debug_row(0));
}

TEST_F(IcebergSchemaEvolutionTest, TestStructEvolutionPadNull) {
    // Open the parquet file used by the struct-subfield-evolution tests.
    auto input_file = _create_file(add_struct_subfield_file_path);
    auto reader = std::make_shared<FileReader>(config::vector_chunk_size, input_file.get(),
                                               std::filesystem::file_size(add_struct_subfield_file_path), 0);

    // --------------init context---------------
    auto ctx = _create_scan_context();

    // Build the iceberg schema: "id" (field 1) plus struct "col" (field 2) whose
    // requested children are "a" (3) and "d" (6).
    TIcebergSchemaField id_field{};
    id_field.__set_field_id(1);
    id_field.__set_name("id");

    TIcebergSchemaField col_field{};
    col_field.__set_field_id(2);
    col_field.__set_name("col");

    TIcebergSchemaField subfield_a{};
    subfield_a.__set_field_id(3);
    subfield_a.__set_name("a");

    TIcebergSchemaField subfield_b{};
    subfield_b.__set_field_id(4);
    subfield_b.__set_name("b");

    TIcebergSchemaField subfield_c{};
    subfield_c.__set_field_id(5);
    subfield_c.__set_name("c");

    TIcebergSchemaField subfield_d{};
    subfield_d.__set_field_id(6);
    subfield_d.__set_name("d");

    col_field.__set_children(std::vector<TIcebergSchemaField>{subfield_a, subfield_d});

    TIcebergSchema schema = TIcebergSchema{};
    schema.__set_fields(std::vector<TIcebergSchemaField>{id_field, col_field});
    ctx->iceberg_schema = &schema;

    // The slot only requests struct subfield "d" (INT), which the file does not contain.
    TypeDescriptor col_type = TypeDescriptor::from_logical_type(LogicalType::TYPE_STRUCT);
    col_type.children.emplace_back(TypeDescriptor::from_logical_type(LogicalType::TYPE_INT));
    col_type.field_names.emplace_back("d");

    Utils::SlotDesc slot_descs[] = {{"col", col_type}, {""}};
    ctx->tuple_desc = Utils::create_tuple_descriptor(_runtime_state, &_pool, slot_descs);
    Utils::make_column_info_vector(ctx->tuple_desc, &ctx->materialized_columns);
    ctx->scan_range = _create_scan_range(add_struct_subfield_file_path);
    // --------------finish init context---------------

    Status st = reader->init(ctx);
    if (!st.ok()) {
        std::cout << st.message() << std::endl;
    }
    ASSERT_TRUE(st.ok());

    EXPECT_EQ(reader->_row_group_readers.size(), 1);

    auto chunk = std::make_shared<Chunk>();
    chunk->append_column(ColumnHelper::create_column(col_type, true), chunk->num_columns());

    st = reader->get_next(&chunk);
    ASSERT_TRUE(st.ok());
    ASSERT_EQ(1, chunk->num_rows());

    // None of the requested subfields exist in the file, so the whole struct
    // value is padded with NULL.
    EXPECT_EQ("[NULL]", chunk->debug_row(0));
}

TEST_F(IcebergSchemaEvolutionTest, TestStructDropSubfield) {
auto file = _create_file(add_struct_subfield_file_path);
auto file_reader = std::make_shared<FileReader>(config::vector_chunk_size, file.get(),
Expand Down
Loading