Skip to content

Commit

Permalink
LC-171 - Ensure Date support in cloud connectors (#47)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidsloan authored Jul 3, 2024
1 parent 6db51c8 commit bc0ba7f
Show file tree
Hide file tree
Showing 9 changed files with 317 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@
*/
package io.lenses.streamreactor.connect.cloud.common.formats.writer

import cats.implicits.catsSyntaxOptionId
import org.apache.kafka.connect.data.Date
import org.apache.kafka.connect.data.Decimal
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.data.Time
import org.apache.kafka.connect.data.Timestamp

sealed trait SinkData {
def schema(): Option[Schema]
Expand Down Expand Up @@ -88,3 +92,15 @@ case class ByteArraySinkData(value: Array[Byte], schema: Option[Schema] = None)
/** Sink value representing an explicitly null payload (e.g. a tombstone record); only an optional schema is carried. */
case class NullSinkData(schema: Option[Schema] = None) extends SinkData {
  // Deliberate null: downstream converters treat this as "no value" for the given schema.
  override def value: Any = null
}

/** Sink value wrapping a [[java.util.Date]] interpreted as a Kafka Connect `Date` logical type (calendar date, no time-of-day). */
case class DateSinkData(value: java.util.Date) extends SinkData {
  override def schema(): Option[Schema] = Some(Date.SCHEMA)
}

/** Sink value wrapping a [[java.util.Date]] interpreted as a Kafka Connect `Timestamp` logical type (millisecond precision). */
case class TimestampSinkData(value: java.util.Date) extends SinkData {
  // Dropped stray trailing semicolon — semicolons are redundant in Scala.
  override def schema(): Option[Schema] = Timestamp.SCHEMA.some
}

/** Sink value wrapping a [[java.util.Date]] interpreted as a Kafka Connect `Time` logical type (time-of-day only). */
case class TimeSinkData(value: java.util.Date) extends SinkData {
  // Dropped stray trailing semicolon — semicolons are redundant in Scala.
  override def schema(): Option[Schema] = Time.SCHEMA.some
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,24 @@ package io.lenses.streamreactor.connect.cloud.common.sink.conversion
import io.confluent.connect.avro.AvroData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.ArraySinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.ByteArraySinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.DateSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.MapSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.NullSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.PrimitiveSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.SinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.StructSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.TimeSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.TimestampSinkData
import org.apache.avro.Schema
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.data.{ Schema => ConnectSchema }

import java.nio.ByteBuffer
import java.time.LocalDate
import java.time.ZoneId
import java.time.temporal.ChronoUnit
import java.util
import java.util.Date
import scala.jdk.CollectionConverters.CollectionHasAsScala
import scala.jdk.CollectionConverters.IterableHasAsJava
import scala.jdk.CollectionConverters.MapHasAsJava
Expand All @@ -51,10 +58,16 @@ object ToAvroDataConverter {
case ArraySinkData(array, _) => convert(array)
case ByteArraySinkData(array, _) => ByteBuffer.wrap(array)
case primitive: PrimitiveSinkData => primitive.value
case _: NullSinkData => null
case DateSinkData(value) => convertDateToDaysFromEpoch(value)
case TimeSinkData(value) => value.getTime
case TimestampSinkData(value) => value.toInstant.toEpochMilli
case _: NullSinkData => null
case other => throw new IllegalArgumentException(s"Unknown SinkData type, ${other.getClass.getSimpleName}")
}

/** Whole days between the Unix epoch and the given date, resolved in the system default time zone
  * (the representation the Kafka Connect `Date` logical type uses: days since epoch).
  *
  * Removed the unused type parameter `[A <: Any]` — it was never referenced and the only call
  * site passes no type argument — and added an explicit return type.
  */
private def convertDateToDaysFromEpoch(value: Date): Long =
  ChronoUnit.DAYS.between(LocalDate.ofEpochDay(0), LocalDate.ofInstant(value.toInstant, ZoneId.systemDefault()))

private def convertArray(list: java.util.List[_]) = list.asScala.map(convert).asJava
private def convert(value: Any): Any =
value match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import io.lenses.streamreactor.connect.cloud.common.formats.writer.ArraySinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.ByteArraySinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.DateSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.MapSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.NullSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.PrimitiveSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.SinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.StructSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.TimeSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.TimestampSinkData
import io.lenses.streamreactor.connect.cloud.common.model.Topic
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.json.JsonConverter
Expand All @@ -44,9 +47,12 @@ object ToJsonDataConverter {
json.getBytes()
case ArraySinkData(array, schema) =>
converter.fromConnectData(topic.value, schema.orNull, array)
case ByteArraySinkData(_, _) => throw new IllegalStateException("Cannot currently write byte array as json")
case NullSinkData(schema) => converter.fromConnectData(topic.value, schema.orNull, null)
case other => throw new IllegalStateException(s"Unknown SinkData type, ${other.getClass.getSimpleName}")
case dsd @ DateSinkData(value) => converter.fromConnectData(topic.value, dsd.schema().orNull, value)
case tsd @ TimeSinkData(value) => converter.fromConnectData(topic.value, tsd.schema().orNull, value)
case tssd @ TimestampSinkData(value) => converter.fromConnectData(topic.value, tssd.schema().orNull, value)
case ByteArraySinkData(_, _) => throw new IllegalStateException("Cannot currently write byte array as json")
case NullSinkData(schema) => converter.fromConnectData(topic.value, schema.orNull, null)
case other => throw new IllegalStateException(s"Unknown SinkData type, ${other.getClass.getSimpleName}")
}

def convert(data: SinkData): Any = data match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import io.lenses.streamreactor.connect.cloud.common.formats.writer.ArraySinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.BooleanSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.ByteArraySinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.ByteSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.DateSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.DecimalSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.DoubleSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.FloatSinkData
Expand All @@ -30,8 +31,13 @@ import io.lenses.streamreactor.connect.cloud.common.formats.writer.ShortSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.SinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.StringSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.StructSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.TimeSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.TimestampSinkData
import org.apache.kafka.connect.data.Date
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.data.Time
import org.apache.kafka.connect.data.Timestamp
import org.apache.kafka.connect.errors.ConnectException

import java.nio.ByteBuffer
Expand Down Expand Up @@ -61,6 +67,15 @@ object ValueToSinkDataConverter {
DecimalSinkData.from(decimal.bigDecimal, schema.orElse(Some(DecimalSinkData.schemaFor(decimal.bigDecimal))))
case decimal: java.math.BigDecimal =>
DecimalSinkData.from(decimal, schema.orElse(Some(DecimalSinkData.schemaFor(decimal))))
case date: java.util.Date =>
schema match {
case Some(Date.SCHEMA) => DateSinkData(date)
case Some(Time.SCHEMA) => TimeSinkData(date)
case Some(Timestamp.SCHEMA) => TimestampSinkData(date)
case other => throw new ConnectException(
s"java.util.Date found but without a schema to identify type, or with unexpected schema: ${other.orNull}",
)
}
case null => NullSinkData(schema)
case otherVal => throw new ConnectException(s"Unsupported record $otherVal:${otherVal.getClass.getCanonicalName}")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.cloud.common.sink.conversion

import java.time.temporal.ChronoUnit
import java.time.LocalDate
import java.time.ZoneId
import java.util.Calendar
import java.util.Date

/** Helpers for building and deconstructing [[java.util.Date]] values for date/time logical-type handling.
  *
  * All conversions use the system default time zone, mirroring how dates are interpreted
  * elsewhere in the date/time conversion code.
  */
object TimeUtils {

  /** Builds a [[Date]] whose date portion is pinned to the Unix epoch day (1970-01-01, local time)
    * and whose time-of-day fields carry the supplied values — i.e. a pure time-of-day value.
    */
  def dateWithTimeFieldsOnly(hour: Int, minute: Int, second: Int, millis: Int): Date = {
    val cal = Calendar.getInstance()

    // Pin the date portion to the first epoch day (1 January 1970).
    cal.set(Calendar.YEAR, 1970)
    cal.set(Calendar.MONTH, Calendar.JANUARY)
    cal.set(Calendar.DAY_OF_MONTH, 1)

    // Apply the requested wall-clock time.
    cal.set(Calendar.HOUR_OF_DAY, hour)
    cal.set(Calendar.MINUTE, minute)
    cal.set(Calendar.SECOND, second)
    cal.set(Calendar.MILLISECOND, millis)

    cal.getTime
  }

  /** The calendar date of the given [[Date]] in the system default time zone. */
  def toLocalDate(date: Date): LocalDate =
    date.toInstant.atZone(ZoneId.systemDefault).toLocalDate

  /** Whole days elapsed between the Unix epoch and the given date (system default time zone). */
  def daysSinceEpoch(date: Date): Long =
    ChronoUnit.DAYS.between(LocalDate.ofEpochDay(0), toLocalDate(date))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.cloud.common.sink.conversion

import io.lenses.streamreactor.connect.cloud.common.formats.writer.DateSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.TimeSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.TimestampSinkData
import io.lenses.streamreactor.connect.cloud.common.sink.conversion.TimeUtils.dateWithTimeFieldsOnly
import io.lenses.streamreactor.connect.cloud.common.sink.conversion.TimeUtils.daysSinceEpoch
import org.scalatest.funsuite.AnyFunSuiteLike
import org.scalatest.matchers.should.Matchers

import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.Date

/** Verifies Avro conversion of the date/time logical-type [[SinkData]] wrappers. */
class ToAvroDataConverterTest extends AnyFunSuiteLike with Matchers {

  test("should convert date") {
    val midnightToday      = Date.from(Instant.now().truncatedTo(ChronoUnit.DAYS))
    val expectedDays: Long = daysSinceEpoch(midnightToday)
    checkValueAndSchema(ToAvroDataConverter.convertToGenericRecord(DateSinkData(midnightToday)), expectedDays)
  }

  test("should convert time") {
    val timeOnly: Date = dateWithTimeFieldsOnly(12, 30, 45, 450)
    checkValueAndSchema(ToAvroDataConverter.convertToGenericRecord(TimeSinkData(timeOnly)), timeOnly.getTime)
  }

  test("should convert timestamp") {
    val now = Date.from(Instant.now())
    checkValueAndSchema(ToAvroDataConverter.convertToGenericRecord(TimestampSinkData(now)), now.getTime)
  }

  // The logical types should convert to bare numeric values, not Avro record containers.
  private def checkValueAndSchema(converted: Any, expectedValue: Long): Any =
    converted match {
      case asLong: Long => asLong should be(expectedValue)
      case _            => fail("not a non-record container")
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.cloud.common.sink.conversion

import io.lenses.streamreactor.connect.cloud.common.formats.writer.DateSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.TimeSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.TimestampSinkData
import io.lenses.streamreactor.connect.cloud.common.model.Topic
import io.lenses.streamreactor.connect.cloud.common.sink.conversion.TimeUtils.dateWithTimeFieldsOnly
import io.lenses.streamreactor.connect.cloud.common.sink.conversion.TimeUtils.daysSinceEpoch
import org.apache.kafka.connect.json.DecimalFormat
import org.apache.kafka.connect.json.JsonConverter
import org.apache.kafka.connect.json.JsonConverterConfig
import org.scalatest.funsuite.AnyFunSuiteLike
import org.scalatest.matchers.should.Matchers

import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.Date
import scala.jdk.CollectionConverters.MapHasAsJava

/** Verifies JSON serialisation of the date/time logical-type [[SinkData]] wrappers. */
class ToJsonDataConverterTest extends AnyFunSuiteLike with Matchers {

  private val ExampleTopic: Topic = Topic("myTopic")

  // Schemaless JSON converter, emitting decimals as plain numbers.
  private val Converter = new JsonConverter()
  Converter.configure(
    Map("schemas.enable" -> "false", JsonConverterConfig.DECIMAL_FORMAT_CONFIG -> DecimalFormat.NUMERIC.name()).asJava,
    false,
  )

  test("should convert date") {
    val midnightToday      = Date.from(Instant.now().truncatedTo(ChronoUnit.DAYS))
    val expectedDays: Long = daysSinceEpoch(midnightToday)
    val converted =
      ToJsonDataConverter.convertMessageValueToByteArray(Converter, ExampleTopic, DateSinkData(midnightToday))
    checkValueAndSchema(converted, expectedDays.toString)
  }

  test("should convert time") {
    val timeOnly: Date = dateWithTimeFieldsOnly(12, 30, 45, 450)
    val converted =
      ToJsonDataConverter.convertMessageValueToByteArray(Converter, ExampleTopic, TimeSinkData(timeOnly))
    checkValueAndSchema(converted, timeOnly.getTime.toString)
  }

  test("should convert timestamp") {
    val now = Date.from(Instant.now())
    val converted =
      ToJsonDataConverter.convertMessageValueToByteArray(Converter, ExampleTopic, TimestampSinkData(now))
    checkValueAndSchema(converted, now.getTime.toString)
  }

  // The logical types should serialise to raw JSON bytes holding the numeric representation.
  private def checkValueAndSchema(converted: Any, expectedValue: String): Any =
    converted match {
      case bytes: Array[Byte] => new String(bytes) should be(expectedValue)
      case x                  => fail(s"not a byte array ($converted) but a ${x.getClass.getName}")
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@
*/
package io.lenses.streamreactor.connect.cloud.common.sink.conversion

import cats.implicits.catsSyntaxOptionId
import io.lenses.streamreactor.connect.cloud.common.formats.writer._
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.data._
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.apache.kafka.connect.data.Decimal

import scala.jdk.CollectionConverters.MapHasAsJava

Expand Down Expand Up @@ -96,4 +94,40 @@ class ValueToSinkDataConverterTest extends AnyFlatSpec with Matchers {
case _ => fail("Expected DecimalSinkData")
}
}

// A java.util.Date paired with the Connect Date schema should wrap into DateSinkData unchanged.
"convert" should "convert a Date" in {
  val underlying = new java.util.Date()
  val converted  = ValueToSinkDataConverter(underlying, Date.SCHEMA.some)
  converted match {
    case DateSinkData(wrapped) =>
      wrapped should be(underlying)
      converted.schema() should contain(Date.SCHEMA)
    case _ => fail("Didn't contain date")
  }
}

// A java.util.Date paired with the Connect Time schema should wrap into TimeSinkData unchanged.
"convert" should "convert a Time" in {
  val underlying = new java.util.Date()
  val converted  = ValueToSinkDataConverter(underlying, Time.SCHEMA.some)
  converted match {
    case TimeSinkData(wrapped) =>
      wrapped should be(underlying)
      converted.schema() should contain(Time.SCHEMA)
    case _ => fail("Didn't contain time")
  }
}

// A java.util.Date paired with the Connect Timestamp schema should wrap into TimestampSinkData unchanged.
"convert" should "convert a TimeStamp" in {
  val underlying = new java.util.Date()
  val converted  = ValueToSinkDataConverter(underlying, Timestamp.SCHEMA.some)
  converted match {
    case TimestampSinkData(wrapped) =>
      wrapped should be(underlying)
      converted.schema() should contain(Timestamp.SCHEMA)
    case _ => fail("Didn't contain timestamp")
  }
}
}
Loading

0 comments on commit bc0ba7f

Please sign in to comment.