BigQuery JSON column: encode as Jackson JsonNode (#5523)
Co-authored-by: Michel Davit <[email protected]>
turb and RustedBones authored Nov 18, 2024
1 parent cd95610 commit 49f43b0
Showing 6 changed files with 127 additions and 122 deletions.
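In short: `Json` gains a companion object that bridges to Jackson's `JsonNode` (see types/package.scala below), and the TableRow converter now embeds the parsed node instead of a raw string. A minimal round-trip sketch, assuming scio-google-cloud-platform and jackson-databind on the classpath (the object name JsonNodeRoundTrip is illustrative only):

import com.fasterxml.jackson.databind.ObjectMapper
import com.spotify.scio.bigquery.types.Json

object JsonNodeRoundTrip {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper()
    val node = mapper.readTree("""{"name":"Alice","age":30}""")

    // New companion overload: wrap a Jackson JsonNode as a BigQuery Json value
    val json = Json(node)
    // ...and parse it back to a JsonNode, as the TableRow converter now does
    val back = Json.parse(json)

    assert(back == node) // JsonNode equality is structural
  }
}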
5 changes: 5 additions & 0 deletions build.sbt
@@ -424,6 +424,10 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
),
ProblemFilters.exclude[DirectAbstractMethodProblem](
"org.apache.beam.sdk.coders.Coder.getCoderArguments"
),
// added BQ Json object
ProblemFilters.exclude[MissingTypesProblem](
"com.spotify.scio.bigquery.types.package$Json$"
)
)

@@ -962,6 +966,7 @@ lazy val `scio-google-cloud-platform` = project
libraryDependencies ++= Seq(
// compile
"com.esotericsoftware" % "kryo-shaded" % kryoVersion,
"com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion,
"com.google.api" % "gax" % gcpBom.key.value,
"com.google.api" % "gax-grpc" % gcpBom.key.value,
"com.google.api-client" % "google-api-client" % gcpBom.key.value,
TypedBigQueryIT.scala
@@ -18,14 +18,14 @@
package com.spotify.scio.bigquery

import com.google.protobuf.ByteString
import com.spotify.scio._
import com.spotify.scio.avro._
import com.spotify.scio.coders.Coder
import com.spotify.scio.bigquery.BigQueryTypedTable.Format
import com.spotify.scio.bigquery.client.BigQuery
import com.spotify.scio.bigquery.types.{BigNumeric, Geography, Json}
import com.spotify.scio.testing._
import magnolify.scalacheck.auto._
import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.avro.generic.GenericRecord
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.joda.time.{Instant, LocalDate, LocalDateTime, LocalTime}
import org.joda.time.format.DateTimeFormat
@@ -42,24 +42,53 @@ object TypedBigQueryIT {
long: Long,
float: Float,
double: Double,
numeric: BigDecimal,
string: String,
byteString: ByteString,
timestamp: Instant,
date: LocalDate,
time: LocalTime,
datetime: LocalDateTime
datetime: LocalDateTime,
geography: Geography,
json: Json,
bigNumeric: BigNumeric
)

// Workaround for millis rounding error
val epochGen: Gen[Long] = Gen.chooseNum[Long](0L, 1000000000000L).map(x => x / 1000 * 1000)
def arbBigDecimal(precision: Int, scale: Int): Arbitrary[BigDecimal] = Arbitrary {
val max = BigInt(10).pow(precision) - 1
Gen.choose(-max, max).map(BigDecimal(_, scale))
}

implicit val arbNumeric: Arbitrary[BigDecimal] =
arbBigDecimal(Numeric.MaxNumericPrecision, Numeric.MaxNumericScale)
implicit val arbString: Arbitrary[String] = Arbitrary(Gen.alphaStr)
implicit val arbByteString: Arbitrary[ByteString] = Arbitrary(
Gen.alphaStr.map(ByteString.copyFromUtf8)
)
// Workaround for millis rounding error
val epochGen: Gen[Long] = Gen.chooseNum[Long](0L, 1000000000000L).map(x => x / 1000 * 1000)
implicit val arbInstant: Arbitrary[Instant] = Arbitrary(epochGen.map(new Instant(_)))
implicit val arbDate: Arbitrary[LocalDate] = Arbitrary(epochGen.map(new LocalDate(_)))
implicit val arbTime: Arbitrary[LocalTime] = Arbitrary(epochGen.map(new LocalTime(_)))
implicit val arbDatetime: Arbitrary[LocalDateTime] = Arbitrary(epochGen.map(new LocalDateTime(_)))
implicit val arbGeography: Arbitrary[Geography] = Arbitrary(
for {
x <- Gen.numChar
y <- Gen.numChar
} yield Geography(s"POINT($x $y)")
)
implicit val arbJson: Arbitrary[Json] = Arbitrary(
for {
key <- Gen.alphaStr
value <- Gen.alphaStr
} yield Json(s"""{"$key":"$value"}""")
)

implicit val arbBigNumeric: Arbitrary[BigNumeric] = Arbitrary {
// Precision: 76.76 (the 77th digit is partial)
arbBigDecimal(BigNumeric.MaxNumericPrecision - 1, BigNumeric.MaxNumericScale).arbitrary
.map(BigNumeric.apply)
}

private val recordGen =
implicitly[Arbitrary[Record]].arbitrary
@@ -71,9 +100,9 @@
s"data-integration-test:bigquery_avro_it.$name${now}_${Random.nextInt(Int.MaxValue)}"
Table.Spec(spec)
}
private val typedTable = table("records")
private val tableRowTable = table("records_tablerow")
private val avroTable = table("records_avro")
private val avroLogicalTypeTable = table("records_avro_logical_type")

private val records = Gen.listOfN(100, recordGen).sample.get
private val options = PipelineOptionsFactory
@@ -87,122 +116,57 @@
class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
import TypedBigQueryIT._

override protected def beforeAll(): Unit = {
val sc = ScioContext(options)
sc.parallelize(records).saveAsTypedBigQueryTable(tableRowTable)

sc.run()
()
}

override protected def afterAll(): Unit = {
BigQuery.defaultInstance().tables.delete(tableRowTable.ref)
BigQuery.defaultInstance().tables.delete(avroTable.ref)
BigQuery.defaultInstance().tables.delete(avroLogicalTypeTable.ref)
}

"TypedBigQuery" should "read records" in {
val sc = ScioContext(options)
sc.typedBigQuery[Record](tableRowTable) should containInAnyOrder(records)
sc.run()
val bq = BigQuery.defaultInstance()
bq.tables.delete(typedTable.ref)
bq.tables.delete(tableRowTable.ref)
bq.tables.delete(avroTable.ref)
}

it should "convert to avro format" in {
val sc = ScioContext(options)
implicit val coder = avroGenericRecordCoder(Record.avroSchema)
sc.typedBigQuery[Record](tableRowTable)
.map(Record.toAvro)
.map(Record.fromAvro) should containInAnyOrder(
records
)
sc.run()
}
"TypedBigQuery" should "handle records as TableRow" in {
runWithRealContext(options) { sc =>
sc.parallelize(records)
.saveAsTypedBigQueryTable(typedTable, createDisposition = CREATE_IF_NEEDED)
}.waitUntilFinish()

"BigQueryTypedTable" should "read TableRow records" in {
val sc = ScioContext(options)
sc
.bigQueryTable(tableRowTable)
.map(Record.fromTableRow) should containInAnyOrder(records)
sc.run()
runWithRealContext(options) { sc =>
val data = sc.typedBigQuery[Record](typedTable)
data should containInAnyOrder(records)
}
}

it should "read GenericRecord recors" in {
val sc = ScioContext(options)
implicit val coder = avroGenericRecordCoder(Record.avroSchema)
sc
.bigQueryTable(tableRowTable, Format.GenericRecord)
.map(Record.fromAvro) should containInAnyOrder(records)
sc.run()
"BigQueryTypedTable" should "handle records as TableRow format" in {
runWithRealContext(options) { sc =>
sc.parallelize(records)
.map(Record.toTableRow)
.saveAsBigQueryTable(
tableRowTable,
schema = Record.schema,
createDisposition = CREATE_IF_NEEDED
)
}.waitUntilFinish()

runWithRealContext(options) { sc =>
val data = sc.bigQueryTable(tableRowTable).map(Record.fromTableRow)
data should containInAnyOrder(records)
}
}

it should "write GenericRecord records" in {
val sc = ScioContext(options)
implicit val coder = avroGenericRecordCoder(Record.avroSchema)
val schema =
BigQueryUtil.parseSchema("""
|{
| "fields": [
| {"mode": "NULLABLE", "name": "bool", "type": "BOOLEAN"},
| {"mode": "NULLABLE", "name": "int", "type": "INTEGER"},
| {"mode": "NULLABLE", "name": "long", "type": "INTEGER"},
| {"mode": "NULLABLE", "name": "float", "type": "FLOAT"},
| {"mode": "NULLABLE", "name": "double", "type": "FLOAT"},
| {"mode": "NULLABLE", "name": "string", "type": "STRING"},
| {"mode": "NULLABLE", "name": "byteString", "type": "BYTES"},
| {"mode": "NULLABLE", "name": "timestamp", "type": "INTEGER"},
| {"mode": "NULLABLE", "name": "date", "type": "STRING"},
| {"mode": "NULLABLE", "name": "time", "type": "STRING"},
| {"mode": "NULLABLE", "name": "datetime", "type": "STRING"}
| ]
|}
""".stripMargin)
val tap = sc
.bigQueryTable(tableRowTable, Format.GenericRecord)
.saveAsBigQueryTable(avroTable, schema = schema, createDisposition = CREATE_IF_NEEDED)

val result = sc.run().waitUntilDone()
result.tap(tap).map(Record.fromAvro).value.toSet shouldBe records.toSet
it should "handle records as avro format" in {
implicit val coder: Coder[GenericRecord] = avroGenericRecordCoder(Record.avroSchema)
runWithRealContext(options) { sc =>
sc.parallelize(records)
.map(Record.toAvro)
.saveAsBigQueryTable(
avroTable,
schema = Record.schema, // This is a bad API. an avro schema should be expected
createDisposition = CREATE_IF_NEEDED
)
}.waitUntilFinish()

runWithRealContext(options) { sc =>
val data = sc.bigQueryTable(avroTable, Format.GenericRecord).map(Record.fromAvro)
data should containInAnyOrder(records)
}
}

it should "write GenericRecord records with logical types" in {
val sc = ScioContext(options)
// format: off
val schema: Schema = SchemaBuilder
.record("Record")
.namespace("com.spotify.scio.bigquery")
.fields()
.name("date").`type`(LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT))).withDefault(0)
.name("time").`type`(LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG))).withDefault(0L)
.name("datetime").`type`().stringType().stringDefault("")
.endRecord()
// format: on

implicit val coder = avroGenericRecordCoder(schema)
val ltRecords: Seq[GenericRecord] =
Seq(
new GenericRecordBuilder(schema)
.set("date", 10)
.set("time", 1000L)
.set("datetime", "2020-08-03 11:11:11")
.build()
)

val tableSchema =
BigQueryUtil.parseSchema("""
|{
| "fields": [
| {"mode": "REQUIRED", "name": "date", "type": "DATE"},
| {"mode": "REQUIRED", "name": "time", "type": "TIME"},
| {"mode": "REQUIRED", "name": "datetime", "type": "STRING"}
| ]
|}
""".stripMargin)
val tap = sc
.parallelize(ltRecords)
.saveAsBigQueryTable(avroLogicalTypeTable, tableSchema, createDisposition = CREATE_IF_NEEDED)

val result = sc.run().waitUntilDone()
result.tap(tap).value.toList.size shouldBe 1
}

}
ConverterProvider.scala
@@ -412,9 +412,11 @@ private[types] object ConverterProvider {
case t if t =:= typeOf[Geography] =>
q"$tree.wkt"
case t if t =:= typeOf[Json] =>
q"$tree.wkt"
// for TableRow/json, use JSON to prevent escaping
q"_root_.com.spotify.scio.bigquery.types.Json.parse($tree)"
case t if t =:= typeOf[BigNumeric] =>
q"$tree.wkt"
// for TableRow/json, use string to avoid precision loss (like numeric)
q"$tree.wkt.toString"

case t if isCaseClass(c)(t) => // nested records
val fn = TermName("r" + t.typeSymbol.name)
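The two converter changes above affect how values land in a TableRow: JSON columns are stored as a parsed JsonNode so that serializing the row emits nested JSON rather than an escaped string, and BIGNUMERIC columns go through a string to avoid floating-point precision loss. A sketch of the escaping difference using plain Jackson (the object name EscapingSketch is illustrative only):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode

object EscapingSketch {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper()
    val node = mapper.readTree("""{"name":"Alice"}""")

    // Embedding the parsed node keeps nested JSON structured...
    val structured = mapper.createObjectNode().set[ObjectNode]("a", node)
    println(mapper.writeValueAsString(structured)) // {"a":{"name":"Alice"}}

    // ...embedding the raw String escapes it on serialization
    val escaped = mapper.createObjectNode().put("a", node.toString)
    println(mapper.writeValueAsString(escaped)) // {"a":"{\"name\":\"Alice\"}"}
  }
}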
types/package.scala
@@ -17,6 +17,7 @@

package com.spotify.scio.bigquery

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.spotify.scio.coders.Coder
import org.apache.avro.Conversions.DecimalConversion
import org.apache.avro.LogicalTypes
@@ -53,14 +54,20 @@ package object types {
case class Geography(wkt: String)

/**
* Case class to serve as raw type for Json instances to distinguish them from Strings.
* Case class to serve as raw type for Json instances.
*
* See also https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#json_type
*
* @param wkt
* Well Known Text formatted string that BigQuery displays for Json
*/
case class Json(wkt: String)
object Json {
private lazy val mapper = new ObjectMapper()

def apply(node: JsonNode): Json = Json(mapper.writeValueAsString(node))
def parse(json: Json): JsonNode = mapper.readTree(json.wkt)
}

/**
* Case class to serve as BigNumeric type to distinguish them from Numeric.
ConverterProviderSpec.scala
@@ -51,6 +51,12 @@ final class ConverterProviderSpec
.retryUntil(_.precision <= Numeric.MaxNumericPrecision)
.map(Numeric.apply)
}
implicit val arbJson: Arbitrary[Json] = Arbitrary(
for {
key <- Gen.alphaStr
value <- Gen.alphaStr
} yield Json(s"""{"$key":"$value"}""")
)
implicit val eqByteArrays: Eq[Array[Byte]] = Eq.instance[Array[Byte]](_.toList == _.toList)
implicit val eqByteString: Eq[ByteString] = Eq.instance[ByteString](_ == _)
implicit val eqInstant: Eq[Instant] = Eq.instance[Instant](_ == _)
ConverterProviderTest.scala
@@ -17,6 +17,7 @@

package com.spotify.scio.bigquery.types

import com.fasterxml.jackson.databind.node.{JsonNodeFactory, ObjectNode}
import com.spotify.scio.bigquery._
import org.scalatest.matchers.should.Matchers
import org.scalatest.flatspec.AnyFlatSpec
@@ -47,9 +48,26 @@ class ConverterProviderTest extends AnyFlatSpec with Matchers {
}

it should "handle required json type" in {
val wkt = "{\"name\": \"Alice\", \"age\": 30}"
RequiredJson.fromTableRow(TableRow("a" -> wkt)) shouldBe RequiredJson(Json(wkt))
BigQueryType.toTableRow[RequiredJson](RequiredJson(Json(wkt))) shouldBe TableRow("a" -> wkt)
val wkt = """{"name":"Alice","age":30}"""
val jsNodeFactory = new JsonNodeFactory(false)
val jackson = jsNodeFactory
.objectNode()
.set[ObjectNode]("name", jsNodeFactory.textNode("Alice"))
.set[ObjectNode]("age", jsNodeFactory.numberNode(30))

RequiredJson.fromTableRow(TableRow("a" -> jackson)) shouldBe RequiredJson(Json(wkt))
BigQueryType.toTableRow[RequiredJson](RequiredJson(Json(wkt))) shouldBe TableRow("a" -> jackson)
}

it should "handle required big numeric type" in {
val bigNumeric = "12.34567890123456789012345678901234567890"
val wkt = BigDecimal(bigNumeric)
RequiredBigNumeric.fromTableRow(TableRow("a" -> bigNumeric)) shouldBe RequiredBigNumeric(
BigNumeric(wkt)
)
BigQueryType.toTableRow(RequiredBigNumeric(BigNumeric(wkt))) shouldBe TableRow(
"a" -> bigNumeric
)
}

it should "handle case classes with methods" in {
@@ -66,6 +84,9 @@ object ConverterProviderTest {
@BigQueryType.toTable
case class RequiredJson(a: Json)

@BigQueryType.toTable
case class RequiredBigNumeric(a: BigNumeric)

@BigQueryType.toTable
case class Required(a: String)

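A side note on the new BigNumeric test above: a sketch of why the converter writes a string rather than a floating-point value (the literal is taken from the test; the object name BigNumericPrecision is illustrative only):

object BigNumericPrecision {
  def main(args: Array[String]): Unit = {
    val bigNumeric = "12.34567890123456789012345678901234567890"

    // A Double cannot hold 40 significant digits; trailing digits are lost...
    assert(BigDecimal(bigNumeric).toDouble.toString != bigNumeric)

    // ...while the String round trip through BigDecimal preserves them all
    assert(BigDecimal(bigNumeric).toString == bigNumeric)
  }
}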
