Skip to content

Commit

Permalink
[BugFix] [Enhancement] Fix nullptr and support Iceberg null padding
Browse files Browse the repository at this point in the history
PR #48151 introduced a regression, where what would previously return an
Error now caused a nullptr dereference and crashed the entire CN.
This change removes the crashing and, for Iceberg, also adds support for
padding evolved fields with null values, as per Iceberg spec.

Signed-off-by: Samrose Ahmed <[email protected]>
  • Loading branch information
Samrose-Ahmed committed Jul 31, 2024
1 parent 9efc6f0 commit 76ca9a0
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 2 deletions.
1 change: 1 addition & 0 deletions be/src/formats/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ add_library(Formats STATIC
parquet/column_reader.cpp
parquet/scalar_column_reader.cpp
parquet/complex_column_reader.cpp
parquet/default_column_reader.h
parquet/encoding.cpp
parquet/level_codec.cpp
parquet/page_reader.cpp
Expand Down
2 changes: 0 additions & 2 deletions be/src/formats/parquet/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ void ColumnReader::get_subfield_pos_with_pruned_type(const ParquetField& field,
if (iceberg_it == subfield_name_2_field_schema.end()) {
// This suitition should not be happened, means table's struct subfield not existed in iceberg schema
// Below code is defensive
DCHECK(false) << "Struct subfield name: " + format_subfield_name + " not found in iceberg schema.";
pos[i] = -1;
iceberg_schema_subfield[i] = nullptr;
continue;
Expand All @@ -200,7 +199,6 @@ void ColumnReader::get_subfield_pos_with_pruned_type(const ParquetField& field,
if (parquet_field_it == field_id_2_pos.end()) {
// Means newly added struct subfield not existed in original parquet file, we put nullptr
// column reader in children_reader, we will append default value for this subfield later.
LOG(INFO) << "Struct subfield name: " + format_subfield_name + " not found in ParquetField.";
pos[i] = -1;
iceberg_schema_subfield[i] = nullptr;
continue;
Expand Down
40 changes: 40 additions & 0 deletions be/src/formats/parquet/default_column_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "formats/parquet/column_reader.h"

namespace starrocks::parquet {

// Used to pad with nulls for schema evolved struct columns, as per Iceberg spec.
class DefaultColumnReader final : public ColumnReader {
public:
explicit DefaultColumnReader() {}
~DefaultColumnReader() override = default;

Status read_range(const Range<uint64_t>& range, const Filter* filter, ColumnPtr& dst) override {
dst->append_default(range.span_size());
return Status::OK();
}

void get_levels(level_t** def_levels, level_t** rep_levels, size_t* num_levels) override {}
void set_need_parse_levels(bool need_parse_levels) override { }
void select_offset_index(const SparseRange<uint64_t>& range, const uint64_t rg_first_row) override {}
void collect_column_io_range(std::vector<io::SharedBufferedInputStream::IORange>* ranges, int64_t* end_offset,
ColumnIOType type, bool active) override {}

};

} // namespace starrocks::parquet
9 changes: 9 additions & 0 deletions be/src/formats/parquet/group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "formats/parquet/page_index_reader.h"
#include "formats/parquet/schema.h"
#include "gutil/strings/substitute.h"
#include "default_column_reader.h"
#include "runtime/types.h"
#include "simd/simd.h"
#include "storage/chunk_helper.h"
Expand Down Expand Up @@ -274,6 +275,14 @@ Status GroupReader::_create_column_reader(const GroupReaderParam::Column& column
RETURN_IF_ERROR(ColumnReader::create(_column_reader_opts, schema_node, column.slot_type(),
column.t_iceberg_schema_field, &column_reader));
}
if (column_reader == nullptr) {
if (column.t_iceberg_schema_field == nullptr) {
return Status::InternalError("No valid column reader.");
}
// pad with nulls for Iceberg to support schema evolution
std::unique_ptr<DefaultColumnReader> reader(new DefaultColumnReader());
column_reader = std::move(reader);
}

if (column.slot_type().is_complex_type()) {
// For complex type columns, we need parse def & rep levels.
Expand Down

0 comments on commit 76ca9a0

Please sign in to comment.