Splitting AWS into 2 modules, cloud-common and AWS (#993)
* Splitting AWS into 2 modules, cloud-common and AWS. This is a WIP.
davidsloan authored Oct 2, 2023
1 parent 1d6a561 commit dd85a36
Showing 269 changed files with 2,432 additions and 2,258 deletions.
22 changes: 22 additions & 0 deletions build.sbt
@@ -13,6 +13,7 @@ ThisBuild / scalaVersion := Dependencies.scalaVersion
lazy val subProjects: Seq[Project] = Seq(
`query-language`,
common,
`cloud-common`,
`aws-s3`,
`azure-documentdb`,
cassandra,
@@ -72,8 +73,29 @@ lazy val common = (project in file("kafka-connect-common"))
.configureAssembly()
.configureTests(baseTestDeps)

lazy val `cloud-common` = (project in file("kafka-connect-cloud-common"))
.dependsOn(common)
//.dependsOn(`test-common` % "fun->compile")
.settings(
settings ++
Seq(
name := "kafka-connect-cloud-common",
description := "Kafka Connect compatible connectors to move data between Kafka and popular data stores",
libraryDependencies ++= baseDeps ++ kafkaConnectCloudCommonDeps,
publish / skip := true,
packExcludeJars := Seq(
"scala-.*\\.jar",
"zookeeper-.*\\.jar",
),
),
)
.configureAssembly()
.configureTests(baseTestDeps)
.enablePlugins(PackPlugin)

lazy val `aws-s3` = (project in file("kafka-connect-aws-s3"))
.dependsOn(common)
.dependsOn(`cloud-common` % "compile->compile;test->test")
.dependsOn(`test-common` % "fun->compile")
.settings(
settings ++
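For orientation, here is a minimal sketch of how a further cloud connector module could hook into the new cloud-common module, mirroring the aws-s3 wiring above. The module name gcp-storage and its directory are hypothetical and not part of this commit; settings, baseDeps, baseTestDeps, configureAssembly, configureTests and PackPlugin are the existing helpers already used elsewhere in build.sbt.

// Hypothetical build.sbt fragment, not part of this commit: a new connector
// module reusing cloud-common the same way aws-s3 does above.
lazy val `gcp-storage` = (project in file("kafka-connect-gcp-storage"))
  .dependsOn(common)
  // compile->compile;test->test also exposes cloud-common's test helpers to this module's tests
  .dependsOn(`cloud-common` % "compile->compile;test->test")
  .settings(
    settings ++
      Seq(
        name                := "kafka-connect-gcp-storage",
        libraryDependencies ++= baseDeps,
        publish / skip      := true,
      ),
  )
  .configureAssembly()
  .configureTests(baseTestDeps)
  .enablePlugins(PackPlugin)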
AvroFormatWriterStreamTest.scala
@@ -16,19 +16,23 @@

package io.lenses.streamreactor.connect.aws.s3.formats

import io.lenses.streamreactor.connect.aws.s3.config.AvroFormatSelection
import io.lenses.streamreactor.connect.aws.s3.formats.writer.AvroFormatWriter
import io.lenses.streamreactor.connect.aws.s3.formats.writer.MessageDetail
import io.lenses.streamreactor.connect.aws.s3.formats.writer.NullSinkData
import io.lenses.streamreactor.connect.aws.s3.formats.writer.StructSinkData
import io.lenses.streamreactor.connect.aws.s3.model.CompressionCodecName.UNCOMPRESSED
import io.lenses.streamreactor.connect.aws.s3.model.CompressionCodec
import io.lenses.streamreactor.connect.aws.s3.model.Offset
import io.lenses.streamreactor.connect.aws.s3.model.Topic
import io.lenses.streamreactor.connect.aws.s3.model.location.FileUtils.toBufferedOutputStream
import io.lenses.streamreactor.connect.aws.s3.stream.BuildLocalOutputStream
import io.lenses.streamreactor.connect.aws.s3.utils.ITSampleSchemaAndData._
import io.lenses.streamreactor.connect.aws.s3.utils.ITSampleSchemaAndData.firstUsers
import io.lenses.streamreactor.connect.aws.s3.utils.ITSampleSchemaAndData.users
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.checkRecord
import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.topic
import io.lenses.streamreactor.connect.cloud.common.config.AvroFormatSelection
import io.lenses.streamreactor.connect.cloud.common.formats.AvroFormatReader
import io.lenses.streamreactor.connect.cloud.common.formats.writer
import io.lenses.streamreactor.connect.cloud.common.formats.writer.AvroFormatWriter
import io.lenses.streamreactor.connect.cloud.common.formats.writer.NullSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.StructSinkData
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName.UNCOMPRESSED
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodec
import io.lenses.streamreactor.connect.cloud.common.model.Offset
import io.lenses.streamreactor.connect.cloud.common.model.Topic
import io.lenses.streamreactor.connect.cloud.common.model.location.FileUtils.toBufferedOutputStream
import io.lenses.streamreactor.connect.cloud.common.stream.BuildLocalOutputStream
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

@@ -44,13 +48,13 @@ class AvroFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3ProxyC
val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), Topic("testTopic").withPartition(1))

val avroFormatWriter = new AvroFormatWriter(blobStream)
avroFormatWriter.write(MessageDetail(NullSinkData(None),
StructSinkData(users.head),
Map.empty,
None,
topic,
1,
Offset(1),
avroFormatWriter.write(writer.MessageDetail(NullSinkData(None),
StructSinkData(users.head),
Map.empty,
None,
topic,
1,
Offset(1),
))
avroFormatWriter.complete() should be(Right(()))
val bytes = localFileAsBytes(localFile)
@@ -106,7 +110,7 @@ class AvroFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3ProxyC
val avroFormatWriter = new AvroFormatWriter(blobStream)
firstUsers.foreach(u =>
avroFormatWriter.write(
MessageDetail(NullSinkData(None), StructSinkData(u), Map.empty, None, topic, 1, Offset(2)),
writer.MessageDetail(NullSinkData(None), StructSinkData(u), Map.empty, None, topic, 1, Offset(2)),
) should be(
Right(()),
),
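One side effect of the package move is visible in the call sites above: MessageDetail is now referenced as writer.MessageDetail, because the test imports the relocated formats.writer package itself and qualifies the class through it. Below is a self-contained Scala sketch of that pattern, using placeholder io.example packages rather than the real connector packages.

// Placeholder packages for illustration only; the real class lives in
// io.lenses.streamreactor.connect.cloud.common.formats.writer.
package io.example.cloud.common.formats.writer {
  final case class MessageDetail(payload: String)
}

package io.example.tests {
  // Importing the package `writer` (not the class) brings the name writer into
  // scope, so the moved class can be written as writer.MessageDetail. This keeps
  // call sites unambiguous while older MessageDetail imports may still be around
  // during the migration.
  import io.example.cloud.common.formats.writer

  object QualifiedReferenceDemo {
    def main(args: Array[String]): Unit =
      println(writer.MessageDetail("hello"))
  }
}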
ParquetFormatWriterStreamTest.scala
@@ -16,18 +16,24 @@

package io.lenses.streamreactor.connect.aws.s3.formats

import io.lenses.streamreactor.connect.aws.s3.config.ParquetFormatSelection
import io.lenses.streamreactor.connect.aws.s3.formats.reader.ParquetFormatReader
import io.lenses.streamreactor.connect.aws.s3.formats.writer._
import io.lenses.streamreactor.connect.aws.s3.model.CompressionCodecName.BROTLI
import io.lenses.streamreactor.connect.aws.s3.model.CompressionCodecName.LZ4
import io.lenses.streamreactor.connect.aws.s3.model.CompressionCodecName.LZO
import io.lenses.streamreactor.connect.aws.s3.model.CompressionCodecName.UNCOMPRESSED
import io.lenses.streamreactor.connect.aws.s3.model._
import io.lenses.streamreactor.connect.aws.s3.model.location.FileUtils.toBufferedOutputStream
import io.lenses.streamreactor.connect.aws.s3.stream.BuildLocalOutputStream
import io.lenses.streamreactor.connect.aws.s3.utils.ITSampleSchemaAndData._
import io.lenses.streamreactor.connect.aws.s3.utils.ITSampleSchemaAndData.firstUsers
import io.lenses.streamreactor.connect.aws.s3.utils.ITSampleSchemaAndData.users
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.checkRecord
import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.topic
import io.lenses.streamreactor.connect.cloud.common.config.ParquetFormatSelection
import io.lenses.streamreactor.connect.cloud.common.formats.reader.ParquetFormatReader
import io.lenses.streamreactor.connect.cloud.common.formats.writer
import io.lenses.streamreactor.connect.cloud.common.formats.writer._
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodec
import io.lenses.streamreactor.connect.cloud.common.model.Offset
import io.lenses.streamreactor.connect.cloud.common.model.Topic
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName.BROTLI
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName.LZ4
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName.LZO
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName.UNCOMPRESSED
import io.lenses.streamreactor.connect.cloud.common.model.location.FileUtils.toBufferedOutputStream
import io.lenses.streamreactor.connect.cloud.common.stream.BuildLocalOutputStream
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.scalatest.EitherValues
@@ -50,31 +56,31 @@ class ParquetFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3Pro

val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), Topic("testTopic").withPartition(1))
val parquetFormatWriter = new ParquetFormatWriter(blobStream)
parquetFormatWriter.write(MessageDetail(NullSinkData(None),
StructSinkData(users.head),
Map.empty,
None,
topic,
1,
Offset(1),
parquetFormatWriter.write(writer.MessageDetail(NullSinkData(None),
StructSinkData(users.head),
Map.empty,
None,
topic,
1,
Offset(1),
))
parquetFormatWriter.getPointer should be(21)
parquetFormatWriter.write(MessageDetail(NullSinkData(None),
StructSinkData(users(1)),
Map.empty,
None,
topic,
1,
Offset(2),
parquetFormatWriter.write(writer.MessageDetail(NullSinkData(None),
StructSinkData(users(1)),
Map.empty,
None,
topic,
1,
Offset(2),
))
parquetFormatWriter.getPointer should be(44)
parquetFormatWriter.write(MessageDetail(NullSinkData(None),
StructSinkData(users(2)),
Map.empty,
None,
topic,
1,
Offset(3),
parquetFormatWriter.write(writer.MessageDetail(NullSinkData(None),
StructSinkData(users(2)),
Map.empty,
None,
topic,
1,
Offset(3),
))
parquetFormatWriter.getPointer should be(59)
parquetFormatWriter.complete() should be(Right(()))
@@ -101,7 +107,7 @@ class ParquetFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3Pro
val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), Topic("testTopic").withPartition(1))
val parquetFormatWriter = new ParquetFormatWriter(blobStream)
parquetFormatWriter.write(
MessageDetail(
writer.MessageDetail(
NullSinkData(None),
ArraySinkData(
Seq(
@@ -126,7 +132,7 @@ class ParquetFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3Pro
val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), Topic("testTopic").withPartition(1))
val parquetFormatWriter = new ParquetFormatWriter(blobStream)
parquetFormatWriter.write(
MessageDetail(
writer.MessageDetail(
NullSinkData(None),
MapSinkData(
Map(
@@ -179,13 +185,13 @@ class ParquetFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3Pro

val parquetFormatWriter = new ParquetFormatWriter(blobStream)
firstUsers.foreach(u =>
parquetFormatWriter.write(MessageDetail(NullSinkData(None),
StructSinkData(u),
Map.empty,
None,
topic,
1,
Offset(1),
parquetFormatWriter.write(writer.MessageDetail(NullSinkData(None),
StructSinkData(u),
Map.empty,
None,
topic,
1,
Offset(1),
)) should be(
Right(()),
),
S3AvroWriterManagerTest.scala
@@ -18,27 +18,37 @@ package io.lenses.streamreactor.connect.aws.s3.sink

import cats.implicits.catsSyntaxOptionId
import io.lenses.streamreactor.connect.aws.s3.config._
import io.lenses.streamreactor.connect.aws.s3.formats.AvroFormatReader
import io.lenses.streamreactor.connect.aws.s3.formats.writer.MessageDetail
import io.lenses.streamreactor.connect.aws.s3.formats.writer.NullSinkData
import io.lenses.streamreactor.connect.aws.s3.formats.writer.SinkData
import io.lenses.streamreactor.connect.aws.s3.formats.writer.StructSinkData
import io.lenses.streamreactor.connect.aws.s3.model.CompressionCodecName.UNCOMPRESSED
import io.lenses.streamreactor.connect.aws.s3.model._
import io.lenses.streamreactor.connect.aws.s3.model.location.S3Location
import io.lenses.streamreactor.connect.aws.s3.sink.commit.CommitPolicy
import io.lenses.streamreactor.connect.aws.s3.sink.commit.Count
import io.lenses.streamreactor.connect.aws.s3.sink.config.PartitionSelection.defaultPartitionSelection
import io.lenses.streamreactor.connect.aws.s3.sink.config.LocalStagingArea
import io.lenses.streamreactor.connect.aws.s3.model.location.S3LocationValidator
import io.lenses.streamreactor.connect.aws.s3.sink.config.OffsetSeekerOptions
import io.lenses.streamreactor.connect.aws.s3.sink.config.PartitionDisplay.Values
import io.lenses.streamreactor.connect.aws.s3.sink.config.S3SinkConfig
import io.lenses.streamreactor.connect.aws.s3.sink.config.SinkBucketOptions
import io.lenses.streamreactor.connect.aws.s3.sink.config.padding.PaddingService
import io.lenses.streamreactor.connect.aws.s3.sink.naming.OffsetS3FileNamer
import io.lenses.streamreactor.connect.aws.s3.sink.naming.S3KeyNamer
import io.lenses.streamreactor.connect.aws.s3.utils.ITSampleSchemaAndData._
import io.lenses.streamreactor.connect.aws.s3.utils.ITSampleSchemaAndData.firstUsers
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
import io.lenses.streamreactor.connect.cloud.common.config.AvroFormatSelection
import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
import io.lenses.streamreactor.connect.cloud.common.config.DataStorageSettings
import io.lenses.streamreactor.connect.cloud.common.formats.AvroFormatReader
import io.lenses.streamreactor.connect.cloud.common.formats.writer
import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail
import io.lenses.streamreactor.connect.cloud.common.formats.writer.NullSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.SinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.StructSinkData
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName.UNCOMPRESSED
import io.lenses.streamreactor.connect.cloud.common.model.Offset
import io.lenses.streamreactor.connect.cloud.common.model.Topic
import io.lenses.streamreactor.connect.cloud.common.model.TopicPartitionOffset
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation
import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitPolicy
import io.lenses.streamreactor.connect.cloud.common.sink.commit.Count
import io.lenses.streamreactor.connect.cloud.common.sink.config.LocalStagingArea
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionDisplay.Values
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionSelection.defaultPartitionSelection
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.LeftPadPaddingStrategy
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.NoOpPaddingStrategy
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingService
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingStrategy
import io.lenses.streamreactor.connect.cloud.common.sink.naming.OffsetS3FileNamer
import io.lenses.streamreactor.connect.cloud.common.sink.naming.S3KeyNamer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
@@ -56,7 +66,9 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
private val avroFormatReader = new AvroFormatReader

private implicit val connectorTaskId: ConnectorTaskId = ConnectorTaskId("sinkName", 1, 1)
private val bucketAndPrefix = S3Location(BucketName, PathPrefix.some)

private implicit val cloudLocationValidator = S3LocationValidator
private val bucketAndPrefix = CloudLocation(BucketName, PathPrefix.some)
private def avroConfig = S3SinkConfig(
S3Config(
None,
@@ -93,7 +105,7 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
)

"avro sink" should "write 2 records to avro format in s3" in {
val sink = S3WriterManager.from(avroConfig)
val sink = S3WriterManagerCreator.from(avroConfig)
firstUsers.zipWithIndex.foreach {
case (struct: Struct, index: Int) =>
val writeRes = sink.write(
@@ -137,18 +149,18 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
new Struct(secondSchema).put("name", "coco").put("designation", null).put("salary", 395.44),
)

val sink = S3WriterManager.from(avroConfig)
val sink = S3WriterManagerCreator.from(avroConfig)
firstUsers.concat(usersWithNewSchema).zipWithIndex.foreach {
case (user, index) =>
sink.write(
TopicPartitionOffset(Topic(TopicName), 1, Offset((index + 1).toLong)),
MessageDetail(NullSinkData(None),
StructSinkData(user),
Map.empty[String, SinkData],
None,
Topic(TopicName),
1,
Offset((index + 1).toLong),
writer.MessageDetail(NullSinkData(None),
StructSinkData(user),
Map.empty[String, SinkData],
None,
Topic(TopicName),
1,
Offset((index + 1).toLong),
),
)
}
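The hunk above swaps the S3-specific S3Location for the shared CloudLocation plus an implicit S3LocationValidator: the provider-neutral model now lives in cloud-common while each connector supplies its own validation rules. The following is a simplified, self-contained sketch of that shape; the types and method names below are illustrative stand-ins, not the connector's real signatures.

// Simplified stand-ins for illustration; the real types live under
// io.lenses.streamreactor.connect.cloud.common.model.location and the AWS module.
object CloudLocationSketch extends App {

  // provider-neutral location model shared by every cloud connector
  final case class CloudLocation(bucket: String, prefix: Option[String] = None)

  // provider-specific validation plugged in by the connector module
  trait CloudLocationValidator {
    def validate(location: CloudLocation): Either[String, CloudLocation]
  }

  // an S3-flavoured validator; in the real split this stays in the aws-s3 module
  object S3LocationValidator extends CloudLocationValidator {
    def validate(location: CloudLocation): Either[String, CloudLocation] =
      if (location.bucket.nonEmpty && !location.bucket.contains("_"))
        Right(location)
      else
        Left(s"'${location.bucket}' is not a valid S3 bucket name")
  }

  // test code selects the provider by putting one validator into implicit scope,
  // as S3AvroWriterManagerTest does with its implicit cloudLocationValidator
  implicit val cloudLocationValidator: CloudLocationValidator = S3LocationValidator

  def resolve(bucket: String, prefix: Option[String])(implicit
    validator: CloudLocationValidator,
  ): Either[String, CloudLocation] =
    validator.validate(CloudLocation(bucket, prefix))

  println(resolve("my-bucket", Some("path/prefix"))) // Right(CloudLocation(my-bucket,Some(path/prefix)))
}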