From 158c9bf230797dbd39f23b621f607c4f22c297fc Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Thu, 28 Nov 2024 10:32:37 +0100 Subject: [PATCH] Stable behavior --- .../scio/bigquery/TypedBigQueryIT.scala | 3 +- .../spotify/scio/bigquery/BigQueryIO.scala | 66 +++++++++++++------ .../spotify/scio/bigquery/BigQueryTypes.scala | 6 ++ .../bigquery/syntax/SCollectionSyntax.scala | 2 +- .../bigquery/syntax/ScioContextSyntax.scala | 3 +- .../bigquery/types/ConverterProvider.scala | 4 +- .../scio/bigquery/types/TypeProvider.scala | 2 +- 7 files changed, 60 insertions(+), 26 deletions(-) diff --git a/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala b/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala index aa04fa5744..a7d15091b5 100644 --- a/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala +++ b/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala @@ -184,7 +184,8 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll { waitForTable(avroTable) runWithRealContext(options) { sc => - val data = sc.bigQueryTable(avroTable, Format.GenericRecord).map(Record.fromAvro) + val data = + sc.bigQueryTable(avroTable, Format.GenericRecordWithLogicalTypes).map(Record.fromAvro) data should containInAnyOrder(records) } } diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala index 0498e9a76d..dc257cd06a 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala @@ -328,7 +328,11 @@ object BigQueryTypedTable { /** Defines the format in which BigQuery can be read and written to. */ sealed abstract class Format[F] object Format { - case object GenericRecord extends Format[GenericRecord] + sealed abstract private[bigquery] class AvroFormat(val useLogicalTypes: Boolean) + extends Format[GenericRecord] + + case object GenericRecord extends AvroFormat(false) + case object GenericRecordWithLogicalTypes extends AvroFormat(true) case object TableRow extends Format[TableRow] } @@ -389,25 +393,31 @@ object BigQueryTypedTable { )(coders.tableRowCoder) private[this] def genericRecord( - table: Table + table: Table, + useLogicalTypes: Boolean )(implicit c: Coder[GenericRecord]): BigQueryTypedTable[GenericRecord] = BigQueryTypedTable( - _.getRecord(), - identity[GenericRecord], - (genericRecord: GenericRecord, _: TableSchema) => genericRecord, - table + beam.BigQueryIO + .read(_.getRecord) + .pipe(r => if (useLogicalTypes) r.useAvroLogicalTypes() else r), + beam.BigQueryIO + .write[GenericRecord]() + .withAvroFormatFunction(_.getElement) + .pipe(r => if (useLogicalTypes) r.useAvroLogicalTypes() else r), + table, + (genericRecord: GenericRecord, _: TableSchema) => genericRecord ) /** * Creates a new instance of [[BigQueryTypedTable]] based on the supplied [[Format]]. * * NOTE: LogicalType support when using `Format.GenericRecord` has some caveats: Reading: Bigquery - * types DATE, TIME, DATIME will be read as STRING. Writing: Supports LogicalTypes only for DATE - * and TIME. DATETIME is not yet supported. https://issuetracker.google.com/issues/140681683 + * types DATE, TIME, DATEIME will be read as STRING. Use `Format.GenericRecordWithLogicalTypes` + * for avro `date`, `timestamp-micros` and `local-timestamp-micros` (avro 1.10+) */ def apply[F: Coder](table: Table, format: Format[F]): BigQueryTypedTable[F] = format match { - case Format.GenericRecord => genericRecord(table) + case f: Format.AvroFormat => genericRecord(table, f.useLogicalTypes) case Format.TableRow => tableRow(table) } @@ -437,16 +447,11 @@ object BigQueryTypedTable { ): BigQueryTypedTable[T] = { val rFn = ClosureCleaner.clean(readerFn) val wFn = ClosureCleaner.clean(writerFn) - val reader = beam.BigQueryIO - .read(rFn(_)) - .useAvroLogicalTypes() + val reader = beam.BigQueryIO.read(rFn(_)) val writer = beam.BigQueryIO .write[T]() .useAvroLogicalTypes() .withAvroFormatFunction(input => wFn(input.getElement())) - .withAvroSchemaFactory { ts => - BigQueryUtils.toGenericAvroSchema("root", ts.getFields(), true) - } BigQueryTypedTable(reader, writer, table, fn) } @@ -740,12 +745,31 @@ object BigQueryTyped { override type ReadP = Unit override type WriteP = Table.WriteParam[T] - private val underlying = BigQueryTypedTable[T]( - (i: SchemaAndRecord) => BigQueryType[T].fromAvro(i.getRecord), - BigQueryType[T].toTableRow, - BigQueryType[T].fromTableRow, - table - ) + private val underlying = { + val readFn = Functions.serializableFn[SchemaAndRecord, T] { x => + BigQueryType[T].fromAvro(x.getRecord) + } + val writeFn = Functions.serializableFn[AvroWriteRequest[T], GenericRecord] { x => + BigQueryType[T].toAvro(x.getElement) + } + val schemaFactory = Functions.serializableFn[TableSchema, org.apache.avro.Schema] { _ => + BigQueryType[T].avroSchema + } + val parseFn = (r: GenericRecord, _: TableSchema) => BigQueryType[T].fromAvro(r) + + BigQueryTypedTable[T]( + beam.BigQueryIO + .read(readFn) + .useAvroLogicalTypes(), + beam.BigQueryIO + .write[T]() + .withAvroFormatFunction(writeFn) + .withAvroSchemaFactory(schemaFactory) + .useAvroLogicalTypes(), + table, + parseFn + ) + } override def testId: String = s"BigQueryIO(${table.spec})" diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala index 43f2ef748d..460ff56d47 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala @@ -257,6 +257,12 @@ object DateTime { /** Convert BigQuery `DATETIME` string to `LocalDateTime`. */ def parse(datetime: String): LocalDateTime = Parser.parseLocalDateTime(datetime) + + // For BigQueryType macros only, do not use directly + // read + def format(datetime: LocalDateTime): String = apply(datetime) + // write + def micros(datetime: LocalDateTime): Long = datetime.getMillisOfDay * 1000 } /** Scala wrapper for [[com.google.api.services.bigquery.model.TimePartitioning]]. */ diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/SCollectionSyntax.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/SCollectionSyntax.scala index ff2a6ee079..2e7cd64b4e 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/SCollectionSyntax.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/SCollectionSyntax.scala @@ -203,7 +203,7 @@ final class SCollectionGenericRecordOps[T <: GenericRecord](private val self: SC self .covary[GenericRecord] .write( - BigQueryTypedTable(table, Format.GenericRecord)( + BigQueryTypedTable(table, Format.GenericRecordWithLogicalTypes)( self.coder.asInstanceOf[Coder[GenericRecord]] ) )(param) diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/ScioContextSyntax.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/ScioContextSyntax.scala index fa82cffe93..80d39668cd 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/ScioContextSyntax.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/ScioContextSyntax.scala @@ -79,7 +79,8 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { * Reading records as GenericRecord **should** offer better performance over TableRow records. * * Note: When using `Format.GenericRecord` Bigquery types DATE, TIME and DATETIME are read as - * STRING. + * STRING. Use `Format.GenericRecordWithLogicalTypes` for avro `date`, `timestamp-micros` and + * `local-timestamp-micros` (avro 1.10+) */ def bigQueryTable[F: Coder](table: Table, format: Format[F]): SCollection[F] = self.read(BigQueryTypedTable(table, format)) diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala index 2a3ce84045..08e656660b 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala @@ -193,7 +193,9 @@ private[types] object ConverterProvider { case t if t =:= typeOf[LocalTime] => q"_root_.com.spotify.scio.bigquery.Time.micros($tree)" case t if t =:= typeOf[LocalDateTime] => - q"_root_.com.spotify.scio.bigquery.DateTime($tree)" + // LocalDateTime is read as avro string + // on write we should use `local-timestamp-micros` + q"_root_.com.spotify.scio.bigquery.DateTime.format($tree)" // different than nested record match below, even though thore are case classes case t if t =:= typeOf[Geography] => diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/TypeProvider.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/TypeProvider.scala index 24370aed6b..48626d69ba 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/TypeProvider.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/TypeProvider.scala @@ -334,7 +334,7 @@ private[types] object TypeProvider { q"override def schema: ${p(c, GModel)}.TableSchema = ${p(c, SUtil)}.parseSchema(${schema.toString})" } val defAvroSchema = - q"override def avroSchema: org.apache.avro.Schema = ${p(c, BigQueryUtils)}.toGenericAvroSchema(${cName.toString}, this.schema.getFields)" + q"override def avroSchema: org.apache.avro.Schema = ${p(c, BigQueryUtils)}.toGenericAvroSchema(this.schema, true)" val defToPrettyString = q"override def toPrettyString(indent: Int = 0): String = ${p(c, s"$SBQ.types.SchemaUtil")}.toPrettyString(this.schema, ${cName.toString}, indent)"