diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/ParquetExample.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/ParquetExample.scala index 4ea605a82a..5c000a3683 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/ParquetExample.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/ParquetExample.scala @@ -41,7 +41,13 @@ object ParquetExample { * These case classes represent both full and projected field mappings from the [[Account]] Avro * record. */ - case class AccountFull(id: Int, `type`: String, name: Option[String], amount: Double) + case class AccountFull( + id: Int, + `type`: String, + name: Option[String], + amount: Double, + accountStatus: Option[AccountStatus] + ) case class AccountProjection(id: Int, name: Option[String]) /** @@ -108,13 +114,11 @@ object ParquetExample { private def avroSpecificIn(sc: ScioContext, args: Args): ClosedTap[String] = { // Macros for generating column projections and row predicates - val projection = Projection[Account](_.getId, _.getName, _.getAmount) + // account_status is the only field with a default value, so it can be left out of the projection + val projection = Projection[Account](_.getId, _.getType, _.getName, _.getAmount) val predicate = Predicate[Account](x => x.getAmount > 0) sc.parquetAvroFile[Account](args("input"), projection, predicate) - // The result Account records are not complete Avro objects. Only the projected columns are present while the rest are null. - // These objects may fail serialization and it’s recommended that you map them out to tuples or case classes right after reading. 
- .map(x => AccountProjection(x.getId, Some(x.getName.toString))) .saveAsTextFile(args("output")) } @@ -122,7 +126,7 @@ object ParquetExample { val schema = Account.getClassSchema implicit val genericRecordCoder: Coder[GenericRecord] = avroGenericRecordCoder(schema) - val parquetIn = sc.parquetAvroFile[GenericRecord](args("input"), schema) + val parquetIn = sc.parquetAvroGenericRecordFile(args("input"), schema) // Catches a specific bug with encoding GenericRecords read by parquet-avro parquetIn @@ -146,12 +150,19 @@ object ParquetExample { // but close to `parquet.block.size`, i.e. 1 GiB. This guarantees that each file contains 1 row group only and reduces seeks. .saveAsParquetAvroFile(args("output"), numShards = 1, conf = fineTunedParquetWriterConfig) + private[extra] def toScalaFull(account: Account): AccountFull = + AccountFull( + account.getId, + account.getType.toString, + Some(account.getName.toString), + account.getAmount, + Some(account.getAccountStatus) + ) + private def typedOut(sc: ScioContext, args: Args): ClosedTap[AccountFull] = sc.parallelize(fakeData) - .map(x => AccountFull(x.getId, x.getType.toString, Some(x.getName.toString), x.getAmount)) - .saveAsTypedParquetFile( - args("output") - ) + .map(toScalaFull) + .saveAsTypedParquetFile(args("output")) private[extra] def toExample(account: Account): Example = { val amount = Feature diff --git a/scio-examples/src/test/scala/com/spotify/scio/examples/extra/ParquetExampleTest.scala b/scio-examples/src/test/scala/com/spotify/scio/examples/extra/ParquetExampleTest.scala index 554a3a0095..89cc90c21d 100644 --- a/scio-examples/src/test/scala/com/spotify/scio/examples/extra/ParquetExampleTest.scala +++ b/scio-examples/src/test/scala/com/spotify/scio/examples/extra/ParquetExampleTest.scala @@ -46,7 +46,8 @@ class ParquetExampleTest extends PipelineSpec { "ParquetExample" should "work for SpecificRecord input" in { val expected = ParquetExample.fakeData - .map(x => AccountProjection(x.getId, 
Some(x.getName.toString))) + // set the default value on the field left out of the projection + .map(x => Account.newBuilder(x).setAccountStatus(null).build()) .map(_.toString) JobTest[com.spotify.scio.examples.extra.ParquetExample.type] @@ -79,8 +80,7 @@ } it should "work for typed output" in { - val expected = ParquetExample.fakeData - .map(a => AccountFull(a.getId, a.getType.toString, Some(a.getName.toString), a.getAmount)) + val expected = ParquetExample.fakeData.map(ParquetExample.toScalaFull) JobTest[com.spotify.scio.examples.extra.ParquetExample.type] .args("--output=out.parquet", "--method=typedOut")