From 4e7f91837e3db731ce20e53fbcf5c6cf43baf3b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=2E=20Veyri=C3=A9?= Date: Thu, 4 Jul 2024 13:22:06 +0200 Subject: [PATCH] Support BigQuery JSON column type (#5416) Co-authored-by: Michel Davit --- .../scio/bigquery/PopulateTestData.scala | 25 ++++++++++++----- .../bigquery/BigQueryAvroUtilsWrapper.java | 1 + .../spotify/scio/bigquery/StorageUtil.scala | 1 + .../bigquery/types/ConverterProvider.scala | 14 +++++++--- .../scio/bigquery/types/SchemaProvider.scala | 1 + .../scio/bigquery/types/TypeProvider.scala | 27 +++++++------------ .../spotify/scio/bigquery/types/package.scala | 10 +++++++ .../types/ConverterProviderSpec.scala | 1 + .../types/ConverterProviderTest.scala | 9 +++++++ .../bigquery/types/SchemaProviderTest.scala | 3 ++- .../spotify/scio/bigquery/types/Schemas.scala | 9 ++++--- .../bigquery/types/TypeProviderTest.scala | 15 +++++++++++ 12 files changed, 86 insertions(+), 30 deletions(-) diff --git a/integration/src/main/scala/com/spotify/scio/bigquery/PopulateTestData.scala b/integration/src/main/scala/com/spotify/scio/bigquery/PopulateTestData.scala index 1141404a3e..49879371b2 100644 --- a/integration/src/main/scala/com/spotify/scio/bigquery/PopulateTestData.scala +++ b/integration/src/main/scala/com/spotify/scio/bigquery/PopulateTestData.scala @@ -21,6 +21,7 @@ import com.google.api.client.googleapis.json.GoogleJsonResponseException import com.google.api.services.bigquery.model.{Dataset, DatasetReference} import com.google.protobuf.ByteString import com.spotify.scio.bigquery.client.BigQuery +import com.spotify.scio.bigquery.types.{Geography, Json} import org.joda.time._ import org.joda.time.format.DateTimeFormat import org.slf4j.LoggerFactory @@ -50,7 +51,9 @@ object PopulateTestData { timestamp: Instant, date: LocalDate, time: LocalTime, - datetime: LocalDateTime + datetime: LocalDateTime, + geography: Geography, + json: Json ) @BigQueryType.toTable @@ -64,7 +67,9 @@ object PopulateTestData { timestamp: Option[Instant], date: Option[LocalDate], time: Option[LocalTime], - datetime: Option[LocalDateTime] + datetime: Option[LocalDateTime], + geography: Option[Geography], + json: Option[Json] ) @BigQueryType.toTable @@ -78,7 +83,9 @@ object PopulateTestData { timestamp: List[Instant], date: List[LocalDate], time: List[LocalTime], - datetime: List[LocalDateTime] + datetime: List[LocalDateTime], + geography: List[Geography], + json: List[Json] ) case class Record(int: Long, string: String) @@ -200,7 +207,9 @@ object PopulateTestData { t.plus(Duration.millis(i.toLong)), dt.toLocalDate.plusDays(i), dt.toLocalTime.plusMillis(i), - dt.toLocalDateTime.plusMillis(i) + dt.toLocalDateTime.plusMillis(i), + Geography(s"POINT($i $i)"), + Json(s"""{"value": $i}""") ) } @@ -217,7 +226,9 @@ object PopulateTestData { Some(t.plus(Duration.millis(i.toLong))), Some(dt.toLocalDate.plusDays(i)), Some(dt.toLocalTime.plusMillis(i)), - Some(dt.toLocalDateTime.plusMillis(i)) + Some(dt.toLocalDateTime.plusMillis(i)), + Some(Geography(s"POINT($i $i)")), + Some(Json(s"""{"value": $i}""")) ) } @@ -234,7 +245,9 @@ object PopulateTestData { List(t.plus(Duration.millis(i.toLong))), List(dt.toLocalDate.plusDays(i)), List(dt.toLocalTime.plusMillis(i)), - List(dt.toLocalDateTime.plusMillis(i)) + List(dt.toLocalDateTime.plusMillis(i)), + List(Geography(s"POINT($i $i)")), + List(Json(s"""{"value": $i}""")) ) } } diff --git a/scio-google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/gcp/bigquery/BigQueryAvroUtilsWrapper.java b/scio-google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/gcp/bigquery/BigQueryAvroUtilsWrapper.java index 40bda08815..a1af5d9c08 100644 --- a/scio-google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/gcp/bigquery/BigQueryAvroUtilsWrapper.java +++ b/scio-google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/gcp/bigquery/BigQueryAvroUtilsWrapper.java @@ -47,6 +47,7 @@ public class BigQueryAvroUtilsWrapper { ImmutableMap.builder() .put("STRING", Type.STRING) .put("GEOGRAPHY", Type.STRING) + .put("JSON", Type.STRING) .put("BYTES", Type.BYTES) .put("INTEGER", Type.LONG) .put("FLOAT", Type.DOUBLE) diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/StorageUtil.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/StorageUtil.scala index 068db3804b..d21a737ea4 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/StorageUtil.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/StorageUtil.scala @@ -106,6 +106,7 @@ object StorageUtil { case null => "STRING" case t if t.getName == "datetime" => "DATETIME" case t if t.getName == "geography" => "GEOGRAPHY" + case t if t.getName == "json" => "JSON" case t => throw new IllegalStateException(s"Unsupported logical type: $t") } diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala index 57c22697af..243c0a1e57 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala @@ -100,6 +100,8 @@ private[types] object ConverterProvider { q"_root_.com.spotify.scio.bigquery.DateTime.parse($tree.toString)" case t if t =:= typeOf[Geography] => q"_root_.com.spotify.scio.bigquery.types.Geography($tree.toString)" + case t if t =:= typeOf[Json] => + q"_root_.com.spotify.scio.bigquery.types.Json($tree.toString)" case t if isCaseClass(c)(t) => val fn = TermName("r" + t.typeSymbol.name) @@ -187,8 +189,10 @@ private[types] object ConverterProvider { case t if t =:= typeOf[LocalDateTime] => q"_root_.com.spotify.scio.bigquery.DateTime($tree)" + // different than nested record match below, even though thore are case classes case t if t =:= typeOf[Geography] => - // different than nested record match below, even though this is a case class + q"$tree.wkt" + case t if t =:= typeOf[Json] => q"$tree.wkt" case t if isCaseClass(c)(t) => // nested records @@ -289,9 +293,11 @@ private[types] object ConverterProvider { case t if t =:= typeOf[LocalDateTime] => q"_root_.com.spotify.scio.bigquery.DateTime.parse($s)" + // different than nested record match below, even though those are case classes case t if t =:= typeOf[Geography] => - // different than nested record match below, even though this is a case class q"_root_.com.spotify.scio.bigquery.types.Geography($s)" + case t if t =:= typeOf[Json] => + q"_root_.com.spotify.scio.bigquery.types.Json($s)" case t if isCaseClass(c)(t) => // nested records val fn = TermName("r" + t.typeSymbol.name) @@ -392,8 +398,10 @@ private[types] object ConverterProvider { case t if t =:= typeOf[LocalDateTime] => q"_root_.com.spotify.scio.bigquery.DateTime($tree)" + // different than nested record match below, even though those are case classes case t if t =:= typeOf[Geography] => - // different than nested record match below, even though this is a case class + q"$tree.wkt" + case t if t =:= typeOf[Json] => q"$tree.wkt" case t if isCaseClass(c)(t) => // nested records diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala index d774acc67b..8370d11ca0 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala @@ -88,6 +88,7 @@ private[types] object SchemaProvider { case t if t =:= typeOf[LocalTime] => ("TIME", Iterable.empty) case t if t =:= typeOf[LocalDateTime] => ("DATETIME", Iterable.empty) case t if t =:= typeOf[Geography] => ("GEOGRAPHY", Iterable.empty) + case t if t =:= typeOf[Json] => ("JSON", Iterable.empty) case t if isCaseClass(t) => ("RECORD", toFields(t)) case _ => throw new RuntimeException(s"Unsupported type: $tpe") diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/TypeProvider.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/TypeProvider.scala index 8f66c316df..f1d865ede8 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/TypeProvider.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/TypeProvider.scala @@ -204,23 +204,14 @@ private[types] object TypeProvider { tq"${TypeName(s"Function${fields.size}")}[..${fields.map(_.children.head)}, $cName]" val traits = (if (fields.size <= 22) Seq(fnTrait) else Seq()) ++ defTblDesc .map(_ => tq"${p(c, SType)}.HasTableDescription") - val taggedFields = fields.map { - case q"$m val $n: _root_.com.spotify.scio.bigquery.types.Geography = $rhs" => - provider.initializeToTable(c)(m, n, tq"_root_.java.lang.String") - c.universe.ValDef( - c.universe.Modifiers(m.flags, m.privateWithin, m.annotations), - n, - tq"_root_.java.lang.String @${typeOf[BigQueryTag]}", - q"{$rhs}.wkt" - ) - case ValDef(m, n, tpt, rhs) => - provider.initializeToTable(c)(m, n, tpt) - c.universe.ValDef( - c.universe.Modifiers(m.flags, m.privateWithin, m.annotations), - n, - tq"$tpt @${typeOf[BigQueryTag]}", - rhs - ) + val taggedFields = fields.map { case ValDef(m, n, tpt, rhs) => + provider.initializeToTable(c)(m, n, tpt) + c.universe.ValDef( + c.universe.Modifiers(m.flags, m.privateWithin, m.annotations), + n, + tq"$tpt @${typeOf[BigQueryTag]}", + rhs + ) } val caseClassTree = q"""${caseClass(c)(mods, cName, taggedFields, body)}""" @@ -288,6 +279,8 @@ private[types] object TypeProvider { case "DATETIME" => (tq"_root_.org.joda.time.LocalDateTime", Nil) case "GEOGRAPHY" => (tq"_root_.com.spotify.scio.bigquery.types.Geography", Nil) + case "JSON" => + (tq"_root_.com.spotify.scio.bigquery.types.Json", Nil) case "RECORD" | "STRUCT" => val name = NameProvider.getUniqueName(tfs.getName) val (fields, records) = toFields(tfs.getFields) diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala index 3dd1dd26b1..a286227051 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala @@ -45,4 +45,14 @@ package object types { * Well Known Text formatted string that BigQuery displays for Geography */ case class Geography(wkt: String) + + /** + * Case class to serve as raw type for Json instances to distinguish them from Strings. + * + * See also https://cloud.google.com/bigquery/docs/json-data + * + * @param wkt + * Well Known Text formatted string that BigQuery displays for Json + */ + case class Json(wkt: String) } diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala index 67b309dca2..5b1d38e921 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala @@ -104,6 +104,7 @@ final class ConverterProviderSpec o.datetimeF.isDefined shouldBe r.containsKey("datetimeF") o.bigDecimalF.isDefined shouldBe r.containsKey("bigDecimalF") o.geographyF.isDefined shouldBe r.containsKey("geographyF") + o.jsonF.isDefined shouldBe r.containsKey("jsonF") } } diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderTest.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderTest.scala index 4bcf1d9d8c..591c89641b 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderTest.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderTest.scala @@ -46,6 +46,12 @@ class ConverterProviderTest extends AnyFlatSpec with Matchers { BigQueryType.toTableRow[RequiredGeo](RequiredGeo(Geography(wkt))) shouldBe TableRow("a" -> wkt) } + it should "handle required json type" in { + val wkt = "{\"name\": \"Alice\", \"age\": 30}" + RequiredJson.fromTableRow(TableRow("a" -> wkt)) shouldBe RequiredJson(Json(wkt)) + BigQueryType.toTableRow[RequiredJson](RequiredJson(Json(wkt))) shouldBe TableRow("a" -> wkt) + } + it should "handle case classes with methods" in { RequiredWithMethod.fromTableRow(TableRow("a" -> "")) shouldBe RequiredWithMethod("") BigQueryType.toTableRow[RequiredWithMethod](RequiredWithMethod("")) shouldBe TableRow("a" -> "") @@ -57,6 +63,9 @@ object ConverterProviderTest { @BigQueryType.toTable case class RequiredGeo(a: Geography) + @BigQueryType.toTable + case class RequiredJson(a: Json) + @BigQueryType.toTable case class Required(a: String) diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaProviderTest.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaProviderTest.scala index fa17494516..907a2d5610 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaProviderTest.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaProviderTest.scala @@ -41,7 +41,8 @@ class SchemaProviderTest extends AnyFlatSpec with Matchers { | {"mode": "$mode", "name": "timeF", "type": "TIME"}, | {"mode": "$mode", "name": "datetimeF", "type": "DATETIME"}, | {"mode": "$mode", "name": "bigDecimalF", "type": "NUMERIC"}, - | {"mode": "$mode", "name": "geographyF", "type": "GEOGRAPHY"} + | {"mode": "$mode", "name": "geographyF", "type": "GEOGRAPHY"}, + | {"mode": "$mode", "name": "jsonF", "type": "JSON"} |] |""".stripMargin diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/Schemas.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/Schemas.scala index 77e4605b12..754e80ca3f 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/Schemas.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/Schemas.scala @@ -36,7 +36,8 @@ object Schemas { timeF: LocalTime, datetimeF: LocalDateTime, bigDecimalF: BigDecimal, - geographyF: Geography + geographyF: Geography, + jsonF: Json ) case class Optional( boolF: Option[Boolean], @@ -52,7 +53,8 @@ object Schemas { timeF: Option[LocalTime], datetimeF: Option[LocalDateTime], bigDecimalF: Option[BigDecimal], - geographyF: Option[Geography] + geographyF: Option[Geography], + jsonF: Option[Json] ) case class Repeated( boolF: List[Boolean], @@ -68,7 +70,8 @@ object Schemas { timeF: List[LocalTime], datetimeF: List[LocalDateTime], bigDecimalF: List[BigDecimal], - geographyF: List[Geography] + geographyF: List[Geography], + jsonF: List[Json] ) // records diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/TypeProviderTest.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/TypeProviderTest.scala index b13e5b3a6b..861580f7d5 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/TypeProviderTest.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/TypeProviderTest.scala @@ -619,4 +619,19 @@ class TypeProviderTest extends AnyFlatSpec with Matchers { cc.f1 shouldBe Geography(wkt) GeoRecordTo(Geography(wkt)) } + + @BigQueryType.fromSchema(""" + |{"fields": [{"mode": "REQUIRED", "name": "f1", "type": "JSON"}]} + """.stripMargin) + class JsonRecordFrom + + @BigQueryType.toTable + case class JsonRecordTo(f1: Json) + + it should "support JSON type" in { + val wkt = "{\"name\": \"Alice\", \"age\": 30}" + val cc = JsonRecordFrom(Json(wkt)) + cc.f1 shouldBe Json(wkt) + JsonRecordTo(Json(wkt)) + } }