Flink 1.20: Update Flink to use planned Avro reads (#11386)
jbonofre authored Oct 29, 2024
1 parent 1e3ee1e commit 602c2b2
Showing 6 changed files with 240 additions and 4 deletions.
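In short: the commit replaces FlinkAvroReader, which resolved Avro data against a pre-pruned Avro read schema, with a new FlinkPlannedAvroReader that plans the read from the expected Iceberg schema and the file schema directly. The old reader is deprecated for removal in 1.8.0, the Avro scan path is switched over, and the Avro read tests are updated to the new builder entry point.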
@@ -37,16 +37,28 @@
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

/**
* @deprecated will be removed in 1.8.0; use FlinkPlannedAvroReader instead.
*/
@Deprecated
public class FlinkAvroReader implements DatumReader<RowData>, SupportsRowPosition {

private final Schema readSchema;
private final ValueReader<RowData> reader;
private Schema fileSchema = null;

/**
* @deprecated will be removed in 1.8.0; use FlinkPlannedAvroReader instead.
*/
@Deprecated
public FlinkAvroReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema) {
this(expectedSchema, readSchema, ImmutableMap.of());
}

/**
* @deprecated will be removed in 1.8.0; use FlinkPlannedAvroReader instead.
*/
@Deprecated
@SuppressWarnings("unchecked")
public FlinkAvroReader(
org.apache.iceberg.Schema expectedSchema, Schema readSchema, Map<Integer, ?> constants) {
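For code that constructed the reader directly, the migration implied by the deprecation notice looks roughly like this (a minimal sketch; expectedSchema, readSchema, and idToConstant are illustrative local variables):

// Before (deprecated, removed in 1.8.0): the caller supplies a pre-resolved
// Avro read schema alongside the expected Iceberg schema.
DatumReader<RowData> oldReader = new FlinkAvroReader(expectedSchema, readSchema, idToConstant);

// After: the planned reader takes only the expected Iceberg schema and constants;
// it receives the actual file schema later through setSchema(...).
DatumReader<RowData> newReader = FlinkPlannedAvroReader.create(expectedSchema, idToConstant);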
@@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.data;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.avro.AvroWithPartnerVisitor;
import org.apache.iceberg.avro.SupportsRowPosition;
import org.apache.iceberg.avro.ValueReader;
import org.apache.iceberg.avro.ValueReaders;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;

public class FlinkPlannedAvroReader implements DatumReader<RowData>, SupportsRowPosition {

private final Types.StructType expectedType;
private final Map<Integer, ?> idToConstant;
private ValueReader<RowData> reader;

public static FlinkPlannedAvroReader create(org.apache.iceberg.Schema schema) {
return create(schema, ImmutableMap.of());
}

public static FlinkPlannedAvroReader create(
org.apache.iceberg.Schema schema, Map<Integer, ?> constants) {
return new FlinkPlannedAvroReader(schema, constants);
}

private FlinkPlannedAvroReader(
org.apache.iceberg.Schema expectedSchema, Map<Integer, ?> constants) {
this.expectedType = expectedSchema.asStruct();
this.idToConstant = constants;
}

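  // setSchema is called with the file's Avro schema before reading starts. The
  // visitor pairs file fields with the expected Iceberg schema by field ID and
  // builds a single ValueReader tree (the read plan) that is reused for every
  // record in the file.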
@Override
@SuppressWarnings("unchecked")
public void setSchema(Schema fileSchema) {
this.reader =
(ValueReader<RowData>)
AvroWithPartnerVisitor.visit(
expectedType,
fileSchema,
new ReadBuilder(idToConstant),
AvroWithPartnerVisitor.FieldIDAccessors.get());
}

@Override
public RowData read(RowData reuse, Decoder decoder) throws IOException {
return reader.read(decoder, reuse);
}

@Override
public void setRowPositionSupplier(Supplier<Long> posSupplier) {
if (reader instanceof SupportsRowPosition) {
((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier);
}
}

private static class ReadBuilder extends AvroWithPartnerVisitor<Type, ValueReader<?>> {
private final Map<Integer, ?> idToConstant;

private ReadBuilder(Map<Integer, ?> idToConstant) {
this.idToConstant = idToConstant;
}

@Override
public ValueReader<?> record(Type partner, Schema record, List<ValueReader<?>> fieldReaders) {
if (partner == null) {
return ValueReaders.skipStruct(fieldReaders);
}

Types.StructType expected = partner.asStructType();
List<Pair<Integer, ValueReader<?>>> readPlan =
ValueReaders.buildReadPlan(expected, record, fieldReaders, idToConstant);

// TODO: should this pass expected so that struct.get can reuse containers?
return FlinkValueReaders.struct(readPlan, expected.fields().size());
}

@Override
public ValueReader<?> union(Type partner, Schema union, List<ValueReader<?>> options) {
return ValueReaders.union(options);
}

@Override
public ValueReader<?> array(Type partner, Schema array, ValueReader<?> elementReader) {
return FlinkValueReaders.array(elementReader);
}

@Override
public ValueReader<?> arrayMap(
Type partner, Schema map, ValueReader<?> keyReader, ValueReader<?> valueReader) {
return FlinkValueReaders.arrayMap(keyReader, valueReader);
}

@Override
public ValueReader<?> map(Type partner, Schema map, ValueReader<?> valueReader) {
return FlinkValueReaders.map(FlinkValueReaders.strings(), valueReader);
}

@Override
public ValueReader<?> primitive(Type partner, Schema primitive) {
LogicalType logicalType = primitive.getLogicalType();
if (logicalType != null) {
switch (logicalType.getName()) {
case "date":
// Flink uses the same representation
return ValueReaders.ints();

case "time-micros":
return FlinkValueReaders.timeMicros();

case "timestamp-millis":
return FlinkValueReaders.timestampMills();

case "timestamp-micros":
return FlinkValueReaders.timestampMicros();

case "decimal":
LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
return FlinkValueReaders.decimal(
ValueReaders.decimalBytesReader(primitive),
decimal.getPrecision(),
decimal.getScale());

case "uuid":
return FlinkValueReaders.uuids();

default:
throw new IllegalArgumentException("Unknown logical type: " + logicalType.getName());
}
}

switch (primitive.getType()) {
case NULL:
return ValueReaders.nulls();
case BOOLEAN:
return ValueReaders.booleans();
case INT:
if (partner != null && partner.typeId() == Type.TypeID.LONG) {
return ValueReaders.intsAsLongs();
}
return ValueReaders.ints();
case LONG:
return ValueReaders.longs();
case FLOAT:
if (partner != null && partner.typeId() == Type.TypeID.DOUBLE) {
return ValueReaders.floatsAsDoubles();
}
return ValueReaders.floats();
case DOUBLE:
return ValueReaders.doubles();
case STRING:
return FlinkValueReaders.strings();
case FIXED:
return ValueReaders.fixed(primitive.getFixedSize());
case BYTES:
return ValueReaders.bytes();
case ENUM:
return FlinkValueReaders.enums(primitive.getEnumSymbols());
default:
throw new IllegalArgumentException("Unsupported type: " + primitive);
}
}
}
}
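Wired through Iceberg's Avro read builder (as the updated tests below do), usage looks like this minimal sketch, where schema is the expected org.apache.iceberg.Schema and file is a java.io.File holding Avro data written with it:

try (CloseableIterable<RowData> reader =
    Avro.read(Files.localInput(file))
        .project(schema)
        .createResolvingReader(FlinkPlannedAvroReader::create)
        .build()) {
  for (RowData row : reader) {
    // consume row
  }
}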
@@ -40,6 +40,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;

public class FlinkValueReaders {

@@ -86,6 +87,10 @@ static ValueReader<MapData> map(ValueReader<?> keyReader, ValueReader<?> valueReader) {
return new MapReader(keyReader, valueReader);
}

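  // Planned variant: each pair in readPlan maps a value reader to the row
  // position it fills; numFields sizes the GenericRowData that gets created.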
static ValueReader<RowData> struct(List<Pair<Integer, ValueReader<?>>> readPlan, int numFields) {
return new PlannedStructReader(readPlan, numFields);
}

static ValueReader<RowData> struct(
List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
return new StructReader(readers, struct, idToConstant);
@@ -282,6 +287,33 @@ public MapData read(Decoder decoder, Object reuse) throws IOException {
}
}

private static class PlannedStructReader extends ValueReaders.PlannedStructReader<RowData> {
private final int numFields;

private PlannedStructReader(List<Pair<Integer, ValueReader<?>>> readPlan, int numFields) {
super(readPlan);
this.numFields = numFields;
}

@Override
protected RowData reuseOrCreate(Object reuse) {
if (reuse instanceof GenericRowData && ((GenericRowData) reuse).getArity() == numFields) {
return (RowData) reuse;
}
return new GenericRowData(numFields);
}

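    // RowData exposes only typed positional getters, so there is no generic way
    // to return the previous field value for reuse; returning null makes nested
    // readers build fresh values instead.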
@Override
protected Object get(RowData struct, int pos) {
return null;
}

@Override
protected void set(RowData struct, int pos, Object value) {
((GenericRowData) struct).setField(pos, value);
}
}

private static class StructReader extends ValueReaders.StructReader<RowData> {
private final int numFields;

@@ -35,9 +35,9 @@
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.FlinkSourceFilter;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.flink.data.FlinkAvroReader;
import org.apache.iceberg.flink.data.FlinkOrcReader;
import org.apache.iceberg.flink.data.FlinkParquetReaders;
import org.apache.iceberg.flink.data.FlinkPlannedAvroReader;
import org.apache.iceberg.flink.data.RowDataProjection;
import org.apache.iceberg.flink.data.RowDataUtil;
import org.apache.iceberg.io.CloseableIterable;
@@ -154,7 +154,7 @@ private CloseableIterable<RowData> newAvroIterable(
.reuseContainers()
.project(schema)
.split(task.start(), task.length())
.createReaderFunc(readSchema -> new FlinkAvroReader(schema, readSchema, idToConstant));
.createReaderFunc(readSchema -> FlinkPlannedAvroReader.create(schema, idToConstant));

if (nameMapping != null) {
builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
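With this change the Avro scan path builds its reader from the expected Iceberg schema plus the id-to-constant map; the Avro read schema passed to the lambda is ignored, since FlinkPlannedAvroReader resolves the file schema itself in setSchema. Name mapping is applied exactly as before.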
@@ -91,7 +91,7 @@ private void writeAndValidate(Schema schema, List<Record> expectedRecords, int n
try (CloseableIterable<RowData> reader =
Avro.read(Files.localInput(recordsFile))
.project(schema)
.createReaderFunc(FlinkAvroReader::new)
.createResolvingReader(FlinkPlannedAvroReader::create)
.build()) {
Iterator<Record> expected = expectedRecords.iterator();
Iterator<RowData> rows = reader.iterator();
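The tests move from createReaderFunc, which hands the function an already-resolved Avro read schema, to createResolvingReader, which passes the expected Iceberg schema to the factory (FlinkPlannedAvroReader::create) and leaves file-schema resolution to the reader itself. The same substitution appears in the projection test below.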
@@ -64,7 +64,7 @@ private RowData writeAndRead(String desc, Schema writeSchema, Schema readSchema,
Iterable<RowData> records =
Avro.read(Files.localInput(file))
.project(readSchema)
.createReaderFunc(FlinkAvroReader::new)
.createResolvingReader(FlinkPlannedAvroReader::create)
.build();

return Iterables.getOnlyElement(records);
