doc: Update doc regarding avro version and logical-type support (#5435)
RustedBones authored Jul 24, 2024
1 parent d97641c commit ab6eef6
Showing 2 changed files with 35 additions and 122 deletions.
87 changes: 7 additions & 80 deletions site/src/main/paradox/extras/Sort-Merge-Bucket.md
@@ -97,99 +97,26 @@ be manually read by a downstream user.

## Avro String keys

If you're using `AvroSortedBucketIO`, be aware of how Avro String fields are decoded. Configuration
errors can result in the following runtime exception:
As of **Scio 0.14.0**, Avro `CharSequence` fields are backed by `String` instead of the default `Utf8`.
With previous versions, you may encounter the following exception when using Avro `CharSequence` keys:

```bash
Cause: java.lang.ClassCastException: class org.apache.avro.util.Utf8 cannot be cast to class java.lang.String
[info] at org.apache.beam.sdk.coders.StringUtf8Coder.encode(StringUtf8Coder.java:37)
[info] at org.apache.beam.sdk.extensions.smb.BucketMetadata.encodeKeyBytes(BucketMetadata.java:222)
```

### SpecificRecords

Scio 0.10.4 specifically has a bug in the default String decoding behavior for `SpecificRecords`: by default,
they're decoded at runtime into `org.apache.avro.util.Utf8` objects, rather than `java.lang.String`s
(the generated getter/setter signatures use `CharSequence` as an umbrella type). This bug has been fixed in
Scio 0.11+. If you cannot upgrade, you can mitigate this by ensuring your `SpecificRecord` schema has the property
`java-class: java.lang.String` set in the key field. This can be done either in the avsc/avdl schema or in
Java/Scala code:

```scala
val mySchema: org.apache.avro.Schema = ???
mySchema
.getField("keyField")
.schema()
.addProp(
org.apache.avro.specific.SpecificData.CLASS_PROP,
"java.lang.String".asInstanceOf[Object]
)
```

Note: If you're using [sbt-avro](https://github.com/sbt/sbt-avro#examples) for schema generation, you can
just set the SBT property `avroStringType := "String"` instead.

### GenericRecords

For GenericRecords, `org.apache.avro.util.Utf8` decoding has always been the default. If you're reading
Avro GenericRecords in your SMB join, set the `avro.java.string: String` property in the Schema of the key field.

```scala
val mySchema: org.apache.avro.Schema = ???
mySchema
.getField("keyField")
.schema()
.addProp(
org.apache.avro.generic.GenericData.STRING_PROP,
"String".asInstanceOf[Object]
)
```
To avoid it, either recompile your Avro schema using the `String` type,
or add the `GenericData.StringType.String` property to your Avro schema with [setStringType](https://avro.apache.org/docs/1.11.1/api/java/org/apache/avro/generic/GenericData.html#setStringType-org.apache.avro.Schema-org.apache.avro.generic.GenericData.StringType-).
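
As a minimal sketch of the `setStringType` option: the record schema and the field name `keyField` below are hypothetical, chosen only for illustration, and the call is applied to the schema of the key field rather than to the enclosing record.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData

// Hypothetical schema, for illustration only: a record with one string key field.
val schemaJson: String =
  """{
    |  "type": "record",
    |  "name": "Example",
    |  "fields": [{"name": "keyField", "type": "string"}]
    |}""".stripMargin

val mySchema: Schema = new Schema.Parser().parse(schemaJson)
val keySchema: Schema = mySchema.getField("keyField").schema()

// Adds the "avro.java.string" = "String" property to the key field's schema,
// so that the field is decoded as java.lang.String rather than Utf8.
GenericData.setStringType(keySchema, GenericData.StringType.String)
```

The property has to be present on the schema that is actually used to read the records, so a sketch like this would need to run before that schema is handed to the reader.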

## Parquet

SMB supports Parquet reads and writes in both Avro and case class formats.

As of **Scio 0.14.0** and above, Scio supports logical types in parquet-avro out of the box.

Earlier versions of Scio require you to manually supply a _logical type supplier_ in your Parquet `Configuration` parameter:

```scala mdoc:fail:silent
import org.apache.avro.specific.SpecificRecordBase

import org.apache.beam.sdk.extensions.smb.{AvroLogicalTypeSupplier, ParquetAvroSortedBucketIO}
import org.apache.beam.sdk.values.TupleTag
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.avro.{AvroDataSupplier, AvroReadSupport, AvroWriteSupport}
import com.spotify.scio.avro.TestRecord

// Reads
val readConf = new Configuration()
readConf.setClass(AvroReadSupport.AVRO_DATA_SUPPLIER, classOf[AvroLogicalTypeSupplier], classOf[AvroDataSupplier])

ParquetAvroSortedBucketIO
.read[TestRecord](new TupleTag[TestRecord], classOf[TestRecord])
.withConfiguration(readConf)

// Writes
val writeConf = new Configuration()
writeConf.setClass(AvroWriteSupport.AVRO_DATA_SUPPLIER, classOf[AvroLogicalTypeSupplier], classOf[AvroDataSupplier])

ParquetAvroSortedBucketIO
.write(classOf[String], "myKeyField", classOf[TestRecord])
.withConfiguration(writeConf)

// Transforms
val transformConf = new Configuration()
transformConf.setClass(AvroReadSupport.AVRO_DATA_SUPPLIER, classOf[AvroLogicalTypeSupplier], classOf[AvroDataSupplier])
transformConf.setClass(AvroWriteSupport.AVRO_DATA_SUPPLIER, classOf[AvroLogicalTypeSupplier], classOf[AvroDataSupplier])

ParquetAvroSortedBucketIO
.transformOutput(classOf[String], "myKeyField", classOf[TestRecord])
.withConfiguration(transformConf)
```
As of **Scio 0.14.0**, Scio supports logical types for specific records in parquet-avro out of the box.

Note that if you're using a non-default Avro version (i.e. Avro 1.11), you'll have to supply a custom logical type supplier
using Avro 1.11 classes. See @ref:[Logical Types in Parquet](../io/Parquet.md#logical-types) for more information.
When using generic records, you have to manually supply a _data supplier_ in your Parquet `Configuration` parameter.
See @ref:[Logical Types in Parquet](../io/Parquet.md#logical-types) for more information.

## Tuning parameters for SMB transforms

70 changes: 28 additions & 42 deletions site/src/main/paradox/io/Parquet.md
@@ -86,7 +86,7 @@ object ParquetJob {

### Write Avro to Parquet files

Both Avro [generic](https://avro.apache.org/docs/1.8.1/api/java/org/apache/avro/generic/GenericData.Record.html) and [specific](https://avro.apache.org/docs/1.8.2/api/java/org/apache/avro/specific/package-summary.html) records are supported when writing.
Both Avro [generic](https://avro.apache.org/docs/current/api/java/org/apache/avro/generic/GenericData.Record.html) and [specific](https://avro.apache.org/docs/current/api/java/org/apache/avro/specific/package-summary.html) records are supported when writing.

Avro specific record types carry their own schema,
so Scio can figure out the schema by itself:
@@ -117,66 +117,52 @@ def result = input.saveAsParquetAvroFile("gs://path-to-data/lake/output", schema

### Logical Types

As of **Scio 0.14.0** and above, Scio supports logical types in parquet-avro out of the box.
As of **Scio 0.14.0**, Scio supports logical types for specific records in parquet-avro out of the box.

If you're on an earlier version of Scio and your Avro schema contains a logical type, you'll need to supply an additional Configuration parameter for your reads and writes.
When using generic records, you'll need to supply an additional `Configuration` parameter to use logical types:
`AvroReadSupport.AVRO_DATA_SUPPLIER` for reads or `AvroWriteSupport.AVRO_DATA_SUPPLIER` for writes.

If you're using the default version of Avro (1.8), you can use Scio's pre-built logical type conversions:

```scala mdoc:fail:silent
```scala mdoc:compile-only
import com.spotify.scio._
import com.spotify.scio.values.SCollection
import com.spotify.scio.avro._
import com.spotify.scio.coders.Coder
import com.spotify.scio.parquet.avro._
import com.spotify.scio.avro.TestRecord
import com.spotify.scio.parquet.ParquetConfiguration
import com.spotify.scio.values.SCollection
import org.apache.avro.Conversions
import org.apache.avro.generic.GenericRecord
import org.apache.avro.data.TimeConversions
import org.apache.avro.generic.GenericData
import org.apache.parquet.avro.{AvroDataSupplier, AvroReadSupport, AvroWriteSupport}

val sc: ScioContext = ???
val data: SCollection[TestRecord] = sc.parallelize(List[TestRecord]())
implicit val coder: Coder[GenericRecord] = ???
val data: SCollection[GenericRecord] = ???

// Reads
import com.spotify.scio.parquet.ParquetConfiguration
class AvroLogicalTypeSupplier extends AvroDataSupplier {
override def get(): GenericData = {
val data = GenericData.get()

import org.apache.parquet.avro.AvroReadSupport
// Add conversions as needed
data.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion())

data
}
}

// Reads
sc.parquetAvroFile(
"somePath",
conf = ParquetConfiguration.of(AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier])
conf = ParquetConfiguration.of(AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[AvroLogicalTypeSupplier])
)

// Writes
import org.apache.parquet.avro.AvroWriteSupport

data.saveAsParquetAvroFile(
"somePath",
conf = ParquetConfiguration.of(AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier])
conf = ParquetConfiguration.of(AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[AvroLogicalTypeSupplier])
)
```
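
To illustrate what registering a conversion on the data model does, here is a small standalone sketch using only Avro classes. It assumes an Avro version in which `TimeConversions.TimestampMillisConversion` is based on `java.time.Instant`, and it deliberately uses a fresh `new GenericData()` instead of the shared `GenericData.get()` instance so that the sketch has no global side effects. The names `model`, `tsConversion` and `tsSchema` are illustrative.

```scala
import java.time.Instant
import org.apache.avro.{LogicalTypes, Schema}
import org.apache.avro.data.TimeConversions
import org.apache.avro.generic.GenericData

// Illustration only: a fresh GenericData model, so the shared
// GenericData.get() singleton is not modified by this sketch.
val model = new GenericData()
val tsConversion = new TimeConversions.TimestampMillisConversion()
model.addLogicalTypeConversion(tsConversion)

// A long schema annotated with the timestamp-millis logical type.
val tsSchema: Schema =
  LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))

// Once registered, the model can look the conversion up by its converted class.
val registered = model.getConversionByClass(classOf[Instant])

// The conversion maps the stored long (epoch milliseconds) to an Instant and back.
val instant: Instant =
  tsConversion.fromLong(java.lang.Long.valueOf(0L), tsSchema, tsSchema.getLogicalType)
val millis: java.lang.Long =
  tsConversion.toLong(instant, tsSchema, tsSchema.getLogicalType)
```

Without a registered conversion, a generic record would typically expose such a field as a plain `Long`, which is why the data supplier is needed for generic records.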

(If you're using `scio-smb`, you can use the provided class `org.apache.beam.sdk.extensions.smb.AvroLogicalTypeSupplier` instead.)

If you're using Avro 1.11, you'll have to create your own logical type supplier class, as Scio's `LogicalTypeSupplier` uses
classes present in Avro 1.8 but not 1.11. A sample Avro 1.11 logical-type supplier might look like:

```scala
import org.apache.avro.Conversions;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.specific.SpecificData;
import org.apache.parquet.avro.AvroDataSupplier;

case class AvroLogicalTypeSupplier() extends AvroDataSupplier {
override def get(): GenericData = {
val specificData = SpecificData.get()

// Add conversions as needed
specificData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion())

specificData
}
}
```

Then, you'll have to specify your logical type supplier class in your `Configuration` as outlined above.

## Case classes

Scio uses [magnolify-parquet](https://github.com/spotify/magnolify/blob/master/docs/parquet.md) to derive Parquet reader and writer for case classes at compile time, similar to how @ref:[coders](../internals/Coders.md) work. See this [mapping table](https://github.com/spotify/magnolify/blob/master/docs/mapping.md) for how Scala and Parquet types map; enum type mapping is also specifically [documented](https://github.com/spotify/magnolify/blob/main/docs/enums.md).