
[BugFix] [Enhancement] Fix nullptr and support Iceberg null padding (#49212)

Signed-off-by: Samrose Ahmed <[email protected]>
(cherry picked from commit e224d5d)
Samrose-Ahmed authored and mergify[bot] committed Aug 7, 2024
1 parent 1fe1c8f commit 0f1775b
Showing 5 changed files with 120 additions and 19 deletions.
1 change: 0 additions & 1 deletion be/src/formats/parquet/column_reader.cpp
@@ -1095,7 +1095,6 @@ void ColumnReader::get_subfield_pos_with_pruned_type(const ParquetField& field,
     if (parquet_field_it == field_id_2_pos.end()) {
         // Means a newly added struct subfield did not exist in the original parquet file; we put a nullptr
         // column reader in children_reader and will append a default value for this subfield later.
-        LOG(INFO) << "Struct subfield name: " + format_subfield_name + " not found in ParquetField.";
         pos[i] = -1;
         iceberg_schema_subfield[i] = nullptr;
         continue;
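Note: the deleted LOG(INFO) fired once for every newly added subfield on every reader, which floods the BE log when scanning many files after an Iceberg schema change. If the diagnostic were ever wanted back, a throttled variant is one option — a sketch assuming glog-style macros; `report_missing_subfield` is a hypothetical helper, not part of this commit:

```cpp
#include <glog/logging.h>

#include <string>

// Hypothetical helper (not in the commit): keep the diagnostic but emit it
// at most once per 1000 occurrences instead of on every missing subfield.
void report_missing_subfield(const std::string& format_subfield_name) {
    LOG_EVERY_N(INFO, 1000) << "Struct subfield name: " << format_subfield_name
                            << " not found in ParquetField.";
}
```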
4 changes: 4 additions & 0 deletions be/src/formats/parquet/group_reader.cpp
@@ -363,6 +363,10 @@ Status GroupReader::_create_column_reader(const GroupReaderParam::Column& column
         RETURN_IF_ERROR(ColumnReader::create(_column_reader_opts, schema_node, column.slot_type(),
                                              column.t_iceberg_schema_field, &column_reader));
     }
+    if (column_reader == nullptr) {
+        // this shouldn't happen, but guard against it anyway
+        return Status::InternalError("No valid column reader.");
+    }
 
     if (column.slot_type().is_complex_type()) {
         // For complex type columns, we need to parse def & rep levels.
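The added guard turns a latent null-pointer dereference into an explicit InternalError: the surrounding code assumed ColumnReader::create always fills column_reader, and the guard covers the case where it returns OK without doing so. A minimal standalone sketch of the failure mode and the guard, using stand-in Status/ColumnReader types rather than the real StarRocks classes:

```cpp
#include <iostream>
#include <memory>
#include <string>

// Stand-ins for the real classes; illustrative only.
struct Status {
    std::string msg;
    bool ok() const { return msg.empty(); }
    static Status OK() { return {}; }
    static Status InternalError(std::string m) { return {std::move(m)}; }
};

struct ColumnReader {};

// A factory that can report success yet leave the output null,
// e.g. when no schema node matches the requested column.
Status create_reader(bool match, std::unique_ptr<ColumnReader>* out) {
    if (match) *out = std::make_unique<ColumnReader>();
    return Status::OK();  // OK even when *out stays null
}

Status create_and_use(bool match) {
    std::unique_ptr<ColumnReader> reader;
    Status st = create_reader(match, &reader);
    if (!st.ok()) return st;
    if (reader == nullptr) {
        // Without this guard, later code would dereference a null reader.
        return Status::InternalError("No valid column reader.");
    }
    return Status::OK();
}

int main() {
    std::cout << create_and_use(false).msg << std::endl;  // "No valid column reader."
}
```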
61 changes: 44 additions & 17 deletions be/src/formats/parquet/meta_helper.cpp
@@ -118,32 +118,59 @@ void IcebergMetaHelper::_init_field_mapping() {
     }
 }
 
-bool IcebergMetaHelper::_is_valid_type(const ParquetField* parquet_field,
-                                       const TIcebergSchemaField* field_schema) const {
+bool IcebergMetaHelper::_is_valid_type(const ParquetField* parquet_field, const TIcebergSchemaField* field_schema,
+                                       const TypeDescriptor* type_descriptor) const {
     // only check for complex type now
     // if a complex type has no valid subfield, we will treat this struct type as an invalid type.
     if (!parquet_field->type.is_complex_type()) {
         return true;
     }
 
-    bool has_valid_child = false;
-
-    std::unordered_map<int32_t, const TIcebergSchemaField*> field_id_2_iceberg_schema{};
-    for (const auto& field : field_schema->children) {
-        field_id_2_iceberg_schema.emplace(field.field_id, &field);
-    }
-
-    // start to check struct type
-    for (const auto& child_parquet_field : parquet_field->children) {
-        auto it = field_id_2_iceberg_schema.find(child_parquet_field.field_id);
-        if (it == field_id_2_iceberg_schema.end()) {
-            continue;
-        }
-
-        // is compelx type, recursive check it's children
-        if (_is_valid_type(&child_parquet_field, it->second)) {
-            has_valid_child = true;
-            break;
-        }
-    }
+    if (parquet_field->type.type != type_descriptor->type) {
+        // complex type mismatched
+        return false;
+    }
+
+    bool has_valid_child = false;
+
+    if (parquet_field->type.is_array_type() || parquet_field->type.is_map_type()) {
+        for (size_t idx = 0; idx < parquet_field->children.size(); idx++) {
+            if (_is_valid_type(&parquet_field->children[idx], &field_schema->children[idx],
+                               &type_descriptor->children[idx])) {
+                has_valid_child = true;
+                break;
+            }
+        }
+    } else if (parquet_field->type.is_struct_type()) {
+        std::unordered_map<int32_t, const TIcebergSchemaField*> field_id_2_iceberg_schema{};
+        std::unordered_map<int32_t, const TypeDescriptor*> field_id_2_type{};
+        for (const auto& field : field_schema->children) {
+            field_id_2_iceberg_schema.emplace(field.field_id, &field);
+            for (size_t i = 0; i < type_descriptor->field_names.size(); i++) {
+                if (type_descriptor->field_names[i] == field.name) {
+                    field_id_2_type.emplace(field.field_id, &type_descriptor->children[i]);
+                    break;
+                }
+            }
+        }
+
+        // start to check struct type
+        for (const auto& child_parquet_field : parquet_field->children) {
+            auto it = field_id_2_iceberg_schema.find(child_parquet_field.field_id);
+            if (it == field_id_2_iceberg_schema.end()) {
+                continue;
+            }
+
+            auto it_td = field_id_2_type.find(child_parquet_field.field_id);
+            if (it_td == field_id_2_type.end()) {
+                continue;
+            }
+
+            // is complex type, recursively check its children
+            if (_is_valid_type(&child_parquet_field, it->second, it_td->second)) {
+                has_valid_child = true;
+                break;
+            }
+        }
+    }

@@ -186,7 +213,7 @@ void IcebergMetaHelper::prepare_read_columns(const std::vector<HdfsScannerContex
 
     const ParquetField* parquet_field = _file_metadata->schema().get_stored_column_by_field_id(field_id);
     // check if the type is invalid
-    if (!_is_valid_type(parquet_field, iceberg_it->second)) {
+    if (!_is_valid_type(parquet_field, iceberg_it->second, &materialized_column.slot_desc->type())) {
         continue;
     }

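To make the new _is_valid_type recursion concrete: a complex field counts as valid only if its type kind matches the slot's TypeDescriptor and at least one subfield (matched by Iceberg field id) is still present and valid in the Parquet file; otherwise prepare_read_columns skips the reader and the column is later padded with NULLs. Below is a self-contained sketch with a simplified single-tree schema, whereas the real code walks three parallel trees (ParquetField, TIcebergSchemaField, TypeDescriptor):

```cpp
#include <cstdint>
#include <iostream>
#include <unordered_map>
#include <vector>

// Simplified stand-in: one node type plays both the file schema and the
// table schema; the real code compares three parallel schema trees.
enum class Kind { PRIMITIVE, STRUCT };

struct Field {
    Kind kind = Kind::PRIMITIVE;
    int32_t field_id = 0;
    std::vector<Field> children;
};

// A struct field is valid only if at least one child matches by field id and
// is itself valid; otherwise the caller pads the whole column with NULLs.
bool is_valid_type(const Field& file_field, const Field& table_field) {
    if (file_field.kind != Kind::STRUCT) return true;
    if (table_field.kind != Kind::STRUCT) return false;  // kind mismatch

    std::unordered_map<int32_t, const Field*> table_by_id;
    for (const auto& c : table_field.children) table_by_id.emplace(c.field_id, &c);

    for (const auto& child : file_field.children) {
        auto it = table_by_id.find(child.field_id);
        if (it != table_by_id.end() && is_valid_type(child, *it->second)) return true;
    }
    return false;  // no surviving subfield
}

int main() {
    // Mirrors the new test below: the file's struct holds subfields with
    // field ids 3/4/5, but the query asks only for the added id 6.
    Field file{Kind::STRUCT, 2, {{Kind::PRIMITIVE, 3}, {Kind::PRIMITIVE, 4}, {Kind::PRIMITIVE, 5}}};
    Field table{Kind::STRUCT, 2, {{Kind::PRIMITIVE, 6}}};
    std::cout << std::boolalpha << is_valid_type(file, table) << std::endl;  // false -> pad NULL
}
```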
3 changes: 2 additions & 1 deletion be/src/formats/parquet/meta_helper.h
@@ -117,7 +117,8 @@ class IcebergMetaHelper : public MetaHelper {
 
 private:
     void _init_field_mapping();
-    bool _is_valid_type(const ParquetField* parquet_field, const TIcebergSchemaField* field_schema) const;
+    bool _is_valid_type(const ParquetField* parquet_field, const TIcebergSchemaField* field_schema,
+                        const TypeDescriptor* type_descriptor) const;
     const TIcebergSchema* _t_iceberg_schema = nullptr;
     // field name has already been formatted
     std::unordered_map<std::string, const TIcebergSchemaField*> _field_name_2_iceberg_field;
70 changes: 70 additions & 0 deletions be/test/formats/parquet/iceberg_schema_evolution_test.cpp
@@ -192,6 +192,76 @@ TEST_F(IcebergSchemaEvolutionTest, TestStructAddSubfield) {
     EXPECT_EQ("[1, {a:2,b:3,c:4,d:NULL}]", chunk->debug_row(0));
 }
 
+TEST_F(IcebergSchemaEvolutionTest, TestStructEvolutionPadNull) {
+    auto file = _create_file(add_struct_subfield_file_path);
+    auto file_reader = std::make_shared<FileReader>(config::vector_chunk_size, file.get(),
+                                                    std::filesystem::file_size(add_struct_subfield_file_path), 0);
+
+    // --------------init context---------------
+    auto ctx = _create_scan_context();
+    TIcebergSchema schema = TIcebergSchema{};
+
+    TIcebergSchemaField field_id{};
+    field_id.__set_field_id(1);
+    field_id.__set_name("id");
+
+    TIcebergSchemaField field_col{};
+    field_col.__set_field_id(2);
+    field_col.__set_name("col");
+
+    TIcebergSchemaField field_col_a{};
+    field_col_a.__set_field_id(3);
+    field_col_a.__set_name("a");
+
+    TIcebergSchemaField field_col_b{};
+    field_col_b.__set_field_id(4);
+    field_col_b.__set_name("b");
+
+    TIcebergSchemaField field_col_c{};
+    field_col_c.__set_field_id(5);
+    field_col_c.__set_name("c");
+
+    TIcebergSchemaField field_col_d{};
+    field_col_d.__set_field_id(6);
+    field_col_d.__set_name("d");
+
+    std::vector<TIcebergSchemaField> subfields{field_col_a, field_col_d};
+    field_col.__set_children(subfields);
+
+    std::vector<TIcebergSchemaField> fields{field_id, field_col};
+    schema.__set_fields(fields);
+    ctx->iceberg_schema = &schema;
+
+    TypeDescriptor col = TypeDescriptor::from_logical_type(LogicalType::TYPE_STRUCT);
+
+    col.children.emplace_back(TypeDescriptor::from_logical_type(LogicalType::TYPE_INT));
+    col.field_names.emplace_back("d");
+
+    Utils::SlotDesc slot_descs[] = {{"col", col}, {""}};
+
+    ctx->tuple_desc = Utils::create_tuple_descriptor(_runtime_state, &_pool, slot_descs);
+    Utils::make_column_info_vector(ctx->tuple_desc, &ctx->materialized_columns);
+    ctx->scan_range = (_create_scan_range(add_struct_subfield_file_path));
+    // --------------finish init context---------------
+
+    Status status = file_reader->init(ctx);
+    if (!status.ok()) {
+        std::cout << status.message() << std::endl;
+    }
+    ASSERT_TRUE(status.ok());
+
+    EXPECT_EQ(file_reader->_row_group_readers.size(), 1);
+
+    auto chunk = std::make_shared<Chunk>();
+    chunk->append_column(ColumnHelper::create_column(col, true), chunk->num_columns());
+
+    status = file_reader->get_next(&chunk);
+    ASSERT_TRUE(status.ok());
+    ASSERT_EQ(1, chunk->num_rows());
+
+    EXPECT_EQ("[NULL]", chunk->debug_row(0));
+}
+
 TEST_F(IcebergSchemaEvolutionTest, TestStructDropSubfield) {
     auto file = _create_file(add_struct_subfield_file_path);
     auto file_reader = std::make_shared<FileReader>(config::vector_chunk_size, file.get(),
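The new TestStructEvolutionPadNull case requests a struct whose only materialized subfield (`d`, field id 6) does not exist in the file, and asserts that the reader initializes cleanly and returns `[NULL]` for the whole struct instead of crashing. A toy sketch of the padding step itself, with hypothetical names (the engine's nullable column machinery is more involved):

```cpp
#include <cstddef>
#include <iostream>
#include <optional>
#include <vector>

// Hypothetical miniature of null padding: when a struct column has no
// readable subfield in the file, append NULL rows instead of erroring out.
struct StructValue { int d; };
using NullableStruct = std::optional<StructValue>;

void pad_nulls(std::vector<NullableStruct>& column, std::size_t num_rows) {
    column.insert(column.end(), num_rows, std::nullopt);
}

int main() {
    std::vector<NullableStruct> col;
    pad_nulls(col, 1);  // one row in the file, no readable subfield
    std::cout << (col[0].has_value() ? "value" : "NULL") << std::endl;  // prints NULL
}
```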
