Skip to content

Commit

Permalink
[fix](iceberg)Bring field_id with parquet files And fix map type's ke…
Browse files Browse the repository at this point in the history
…y optional (#44470)

### What problem does this PR solve?

1. Column IDs are required to be stored as [field
IDs](http://github.com/apache/parquet-format/blob/40699d05bd24181de6b1457babbee2c16dce3803/src/main/thrift/parquet.thrift#L459)
on the parquet schema.
ref: https://iceberg.apache.org/spec/?h=field+id#parquet
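Therefore, field IDs must be written into the Parquet schema when writing Iceberg data files.
2. For `MapType`, the key is always required (never optional), so the key field is now created as required.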
  • Loading branch information
wuwenchi authored Dec 1, 2024
1 parent 97adac1 commit 9ca8048
Show file tree
Hide file tree
Showing 9 changed files with 547 additions and 25 deletions.
134 changes: 134 additions & 0 deletions be/src/vec/exec/format/table/iceberg/arrow_schema_util.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "vec/exec/format/table/iceberg/arrow_schema_util.h"

#include <arrow/type.h>
#include <arrow/util/key_value_metadata.h>

namespace doris {
namespace iceberg {

// Metadata key under which convert_to() stores the iceberg field id of every converted field.
const char* ArrowSchemaUtil::PARQUET_FIELD_ID = "PARQUET:field_id";
// Metadata key that convert_to() adds only to fields converted from an iceberg MAP type.
const char* ArrowSchemaUtil::ORIGINAL_TYPE = "originalType";
// Metadata value stored under ORIGINAL_TYPE for fields converted from an iceberg MAP type.
const char* ArrowSchemaUtil::MAP_TYPE_VALUE = "mapType";

// Translates each column returned by schema->columns() into an arrow field and
// appends the results to `fields` in column order.
//
// Stops at the first column that fails to convert and returns that error; fields
// appended before the failure are left in `fields`.
Status ArrowSchemaUtil::convert(const Schema* schema, const std::string& timezone,
                                std::vector<std::shared_ptr<arrow::Field>>& fields) {
    for (const auto& iceberg_column : schema->columns()) {
        std::shared_ptr<arrow::Field> converted;
        RETURN_IF_ERROR(convert_to(iceberg_column, &converted, timezone));
        fields.emplace_back(converted);
    }
    return Status::OK();
}

// Converts a single iceberg NestedField into an arrow::Field.
//
// The iceberg field id is always attached to the resulting field as the
// PARQUET_FIELD_ID metadata entry. STRUCT, LIST and MAP types are converted
// recursively through this same function, so nested fields carry their own
// field-id metadata as well. MAP fields additionally get the
// ORIGINAL_TYPE -> MAP_TYPE_VALUE metadata entry.
//
// @param field        iceberg field to convert.
// @param arrow_field  output; assigned only when the conversion succeeds.
// @param timezone     timezone attached to TIMESTAMP columns.
// @return Status::OK() on success; InternalError for TIME, for any type id not
//         handled below, or when a DECIMAL type id is not backed by a DecimalType.
//         Errors from nested conversions are propagated unchanged.
Status ArrowSchemaUtil::convert_to(const iceberg::NestedField& field,
                                   std::shared_ptr<arrow::Field>* arrow_field,
                                   const std::string& timezone) {
    std::shared_ptr<arrow::DataType> arrow_type;
    std::unordered_map<std::string, std::string> metadata;
    metadata[PARQUET_FIELD_ID] = std::to_string(field.field_id());

    switch (field.field_type()->type_id()) {
    case iceberg::TypeID::BOOLEAN:
        arrow_type = arrow::boolean();
        break;

    case iceberg::TypeID::INTEGER:
        arrow_type = arrow::int32();
        break;

    case iceberg::TypeID::LONG:
        arrow_type = arrow::int64();
        break;

    case iceberg::TypeID::FLOAT:
        arrow_type = arrow::float32();
        break;

    case iceberg::TypeID::DOUBLE:
        arrow_type = arrow::float64();
        break;

    case iceberg::TypeID::DATE:
        arrow_type = arrow::date32();
        break;

    case iceberg::TypeID::TIMESTAMP: {
        // Timestamps are written with microsecond precision and the given timezone.
        arrow_type = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO, timezone);
        break;
    }

    // All of these iceberg types are written as arrow utf8 strings.
    case iceberg::TypeID::BINARY:
    case iceberg::TypeID::STRING:
    case iceberg::TypeID::UUID:
    case iceberg::TypeID::FIXED:
        arrow_type = arrow::utf8();
        break;

    case iceberg::TypeID::DECIMAL: {
        // dynamic_cast yields nullptr if the type object is not actually a
        // DecimalType; report that instead of dereferencing a null pointer.
        auto* dt = dynamic_cast<DecimalType*>(field.field_type());
        if (dt == nullptr) {
            return Status::InternalError("Field type is not a decimal type:" +
                                         field.field_type()->to_string());
        }
        arrow_type = arrow::decimal(dt->get_precision(), dt->get_scale());
        break;
    }

    case iceberg::TypeID::STRUCT: {
        std::vector<std::shared_ptr<arrow::Field>> element_fields;
        StructType* st = field.field_type()->as_struct_type();
        for (const auto& column : st->fields()) {
            std::shared_ptr<arrow::Field> element_field;
            RETURN_IF_ERROR(convert_to(column, &element_field, timezone));
            element_fields.push_back(element_field);
        }
        arrow_type = arrow::struct_(element_fields);
        break;
    }

    case iceberg::TypeID::LIST: {
        std::shared_ptr<arrow::Field> item_field;
        ListType* list_type = field.field_type()->as_list_type();
        RETURN_IF_ERROR(convert_to(list_type->element_field(), &item_field, timezone));
        arrow_type = arrow::list(item_field);
        break;
    }

    case iceberg::TypeID::MAP: {
        std::shared_ptr<arrow::Field> key_field;
        std::shared_ptr<arrow::Field> value_field;
        MapType* map_type = field.field_type()->as_map_type();
        RETURN_IF_ERROR(convert_to(map_type->key_field(), &key_field, timezone));
        RETURN_IF_ERROR(convert_to(map_type->value_field(), &value_field, timezone));
        // Tag the field so it can be recognised as having come from an iceberg map.
        metadata[ORIGINAL_TYPE] = MAP_TYPE_VALUE;
        arrow_type = std::make_shared<arrow::MapType>(key_field, value_field);
        break;
    }

    // TIME is listed explicitly to document that it is intentionally unsupported.
    case iceberg::TypeID::TIME:
    default:
        return Status::InternalError("Unsupported field type:" + field.field_type()->to_string());
    }

    std::shared_ptr<arrow::KeyValueMetadata> schema_metadata =
            std::make_shared<arrow::KeyValueMetadata>(metadata);
    *arrow_field =
            arrow::field(field.field_name(), arrow_type, field.is_optional(), schema_metadata);
    return Status::OK();
}

} // namespace iceberg
} // namespace doris
45 changes: 45 additions & 0 deletions be/src/vec/exec/format/table/iceberg/arrow_schema_util.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <arrow/type.h>

#include <shared_mutex>

#include "vec/exec/format/table/iceberg/schema.h"

namespace doris {
namespace iceberg {

// Static helpers that translate an iceberg Schema into arrow fields, attaching
// the iceberg field id of every (nested) field as arrow field metadata.
class ArrowSchemaUtil {
public:
// Converts every column of `schema` into an arrow field and appends the
// results to `fields` in column order. `timezone` is applied to TIMESTAMP
// columns. Returns the first conversion error encountered, if any.
static Status convert(const Schema* schema, const std::string& timezone,
std::vector<std::shared_ptr<arrow::Field>>& fields);

private:
// Metadata key holding the iceberg field id of a converted field.
static const char* PARQUET_FIELD_ID;
// Metadata key set only on fields converted from an iceberg MAP type.
static const char* ORIGINAL_TYPE;
// Metadata value stored under ORIGINAL_TYPE for MAP fields.
static const char* MAP_TYPE_VALUE;

// Converts a single iceberg field (recursing into struct/list/map children)
// and stores the result in `*arrow_field`. Returns an error status for
// unsupported field types.
static Status convert_to(const iceberg::NestedField& field,
std::shared_ptr<arrow::Field>* arrow_field,
const std::string& timezone);
};

} // namespace iceberg
} // namespace doris
3 changes: 2 additions & 1 deletion be/src/vec/exec/format/table/iceberg/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ namespace iceberg {
std::unique_ptr<MapType> MapType::of_optional(int key_id, int value_id,
std::unique_ptr<Type> key_type,
std::unique_ptr<Type> value_type) {
// key is always required
auto key_field =
std::make_unique<NestedField>(true, key_id, "key", std::move(key_type), std::nullopt);
std::make_unique<NestedField>(false, key_id, "key", std::move(key_type), std::nullopt);
auto value_field = std::make_unique<NestedField>(true, value_id, "value", std::move(value_type),
std::nullopt);
return std::unique_ptr<MapType>(new MapType(std::move(key_field), std::move(value_field)));
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/exec/format/table/iceberg/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ class DecimalType : public PrimitiveType {
ss << "decimal(" << precision << ", " << scale << ")";
return ss.str();
}

// Returns the precision of this decimal type.
int get_precision() const { return precision; }

// Returns the scale of this decimal type.
int get_scale() const { return scale; }
};

class BinaryType : public PrimitiveType {
Expand Down
51 changes: 29 additions & 22 deletions be/src/vec/runtime/vparquet_transformer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
#include "vec/core/types.h"
#include "vec/data_types/data_type_decimal.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/exec/format/table/iceberg/arrow_schema_util.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/functions/function_helpers.h"
Expand Down Expand Up @@ -202,21 +203,20 @@ void ParquetBuildHelper::build_version(parquet::WriterProperties::Builder& build
}
}

VParquetTransformer::VParquetTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
const VExprContextSPtrs& output_vexpr_ctxs,
std::vector<std::string> column_names,
TParquetCompressionType::type compression_type,
bool parquet_disable_dictionary,
TParquetVersion::type parquet_version,
bool output_object_data,
const std::string* iceberg_schema_json)
VParquetTransformer::VParquetTransformer(
RuntimeState* state, doris::io::FileWriter* file_writer,
const VExprContextSPtrs& output_vexpr_ctxs, std::vector<std::string> column_names,
TParquetCompressionType::type compression_type, bool parquet_disable_dictionary,
TParquetVersion::type parquet_version, bool output_object_data,
const std::string* iceberg_schema_json, const iceberg::Schema* iceberg_schema)
: VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
_column_names(std::move(column_names)),
_parquet_schemas(nullptr),
_compression_type(compression_type),
_parquet_disable_dictionary(parquet_disable_dictionary),
_parquet_version(parquet_version),
_iceberg_schema_json(iceberg_schema_json) {
_iceberg_schema_json(iceberg_schema_json),
_iceberg_schema(iceberg_schema) {
_outstream = std::shared_ptr<ParquetOutputStream>(new ParquetOutputStream(file_writer));
}

Expand All @@ -234,6 +234,7 @@ VParquetTransformer::VParquetTransformer(RuntimeState* state, doris::io::FileWri
_parquet_disable_dictionary(parquet_disable_dictionary),
_parquet_version(parquet_version),
_iceberg_schema_json(iceberg_schema_json) {
_iceberg_schema = nullptr;
_outstream = std::shared_ptr<ParquetOutputStream>(new ParquetOutputStream(file_writer));
}

Expand Down Expand Up @@ -265,21 +266,27 @@ Status VParquetTransformer::_parse_properties() {

Status VParquetTransformer::_parse_schema() {
std::vector<std::shared_ptr<arrow::Field>> fields;
for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
std::shared_ptr<arrow::DataType> type;
RETURN_IF_ERROR(convert_to_arrow_type(_output_vexpr_ctxs[i]->root()->type(), &type,
_state->timezone()));
if (_parquet_schemas != nullptr) {
std::shared_ptr<arrow::Field> field =
arrow::field(_parquet_schemas->operator[](i).schema_column_name, type,
_output_vexpr_ctxs[i]->root()->is_nullable());
fields.emplace_back(field);
} else {
std::shared_ptr<arrow::Field> field = arrow::field(
_column_names[i], type, _output_vexpr_ctxs[i]->root()->is_nullable());
fields.emplace_back(field);
if (_iceberg_schema != nullptr) {
RETURN_IF_ERROR(
iceberg::ArrowSchemaUtil::convert(_iceberg_schema, _state->timezone(), fields));
} else {
for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
std::shared_ptr<arrow::DataType> type;
RETURN_IF_ERROR(convert_to_arrow_type(_output_vexpr_ctxs[i]->root()->type(), &type,
_state->timezone()));
if (_parquet_schemas != nullptr) {
std::shared_ptr<arrow::Field> field =
arrow::field(_parquet_schemas->operator[](i).schema_column_name, type,
_output_vexpr_ctxs[i]->root()->is_nullable());
fields.emplace_back(field);
} else {
std::shared_ptr<arrow::Field> field = arrow::field(
_column_names[i], type, _output_vexpr_ctxs[i]->root()->is_nullable());
fields.emplace_back(field);
}
}
}

if (_iceberg_schema_json != nullptr) {
std::shared_ptr<arrow::KeyValueMetadata> schema_metadata =
arrow::KeyValueMetadata::Make({"iceberg.schema"}, {*_iceberg_schema_json});
Expand Down
5 changes: 4 additions & 1 deletion be/src/vec/runtime/vparquet_transformer.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <parquet/types.h>
#include <stdint.h>

#include "vec/exec/format/table/iceberg/schema.h"
#include "vfile_format_transformer.h"

namespace doris {
Expand Down Expand Up @@ -95,7 +96,8 @@ class VParquetTransformer final : public VFileFormatTransformer {
std::vector<std::string> column_names,
TParquetCompressionType::type compression_type,
bool parquet_disable_dictionary, TParquetVersion::type parquet_version,
bool output_object_data, const std::string* iceberg_schema_json = nullptr);
bool output_object_data, const std::string* iceberg_schema_json = nullptr,
const iceberg::Schema* iceberg_schema = nullptr);

VParquetTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
const VExprContextSPtrs& output_vexpr_ctxs,
Expand Down Expand Up @@ -132,6 +134,7 @@ class VParquetTransformer final : public VFileFormatTransformer {
const TParquetVersion::type _parquet_version;
const std::string* _iceberg_schema_json;
uint64_t _write_size = 0;
const iceberg::Schema* _iceberg_schema;
};

} // namespace doris::vectorized
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ Status VIcebergPartitionWriter::open(RuntimeState* state, RuntimeProfile* profil
_file_format_transformer.reset(new VParquetTransformer(
state, _file_writer.get(), _write_output_expr_ctxs, _write_column_names,
parquet_compression_type, parquet_disable_dictionary, TParquetVersion::PARQUET_1_0,
false, _iceberg_schema_json));
false, _iceberg_schema_json, &_schema));
return _file_format_transformer->open();
}
case TFileFormatType::FORMAT_ORC: {
Expand Down
Loading

0 comments on commit 9ca8048

Please sign in to comment.