S3/GCS/Azure Source: Enhanced Data Reload Strategy for Specific Timeframes (#1187)

* First commit

* Adapted the S3 integration tests to account for the new object key name, which includes the earliest record timestamp.

Refactored a few parameter/field names around the object key name.

* The object key format version only applies when envelope storage is used, as it is the only storage that guarantees the record timestamp is preserved.

Sink tests have been updated to reflect the new object key values.

* Fixes an endless loop in a test in case of test failure.

Avoids Avro invalid-sync errors caused by concurrent tests writing to the same file.

* Expands the object key value to contain the min and max record timestamps within the file.

This change reduces the complexity of the initial seek when a load from a specific point in time is requested (a sketch of the resulting key format follows below).

* Removes an obsolete comment.

Calls deleteOnExit for temp files/folders.
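
A minimal illustrative sketch of the new object key naming, inferred from the updated test expectations (e.g. 2_99_100.avro, i.e. last offset, earliest record timestamp, latest record timestamp); the helper name below is hypothetical and is not the actual OffsetFileNamerV1 implementation:

  // Hypothetical helper: builds a key in the <offset>_<minTimestamp>_<maxTimestamp>.<extension>
  // shape that the updated tests expect; the real OffsetFileNamerV1 may differ in detail.
  def v1StyleFileName(offset: Long, earliestTs: Long, latestTs: Long, extension: String): String =
    s"${offset}_${earliestTs}_${latestTs}.$extension"

  // Example: v1StyleFileName(2L, 99L, 100L, "avro") == "2_99_100.avro"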

---------

Co-authored-by: stheppi <[email protected]>
stheppi and stheppi authored Apr 30, 2024
1 parent ae57b61 commit 9b1722f
Showing 39 changed files with 1,056 additions and 526 deletions.
@@ -48,8 +48,10 @@ import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.LeftPadP
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.NoOpPaddingStrategy
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingService
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingStrategy
import io.lenses.streamreactor.connect.cloud.common.sink.naming.OffsetFileNamer
import io.lenses.streamreactor.connect.cloud.common.sink.naming.CloudKeyNamer
import io.lenses.streamreactor.connect.cloud.common.sink.naming.FileNamer
import io.lenses.streamreactor.connect.cloud.common.sink.naming.OffsetFileNamerV0
import io.lenses.streamreactor.connect.cloud.common.sink.naming.OffsetFileNamerV1
import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.UsersSchemaDecimal
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.connect.data.Decimal
@@ -60,6 +62,7 @@ import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.nio.ByteBuffer
import java.time.Instant

class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyContainerTest {

@@ -73,7 +76,7 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont

private implicit val cloudLocationValidator: CloudLocationValidator = S3LocationValidator
private val bucketAndPrefix = CloudLocation(BucketName, PathPrefix.some)
private def avroConfig = S3SinkConfig(
private def avroConfig(fileNamer: FileNamer) = S3SinkConfig(
S3ConnectionConfig(
None,
Some(s3Container.identity.identity),
@@ -89,10 +92,7 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
keyNamer = new CloudKeyNamer(
AvroFormatSelection,
defaultPartitionSelection(Values),
new OffsetFileNamer(
identity[String],
AvroFormatSelection.extension,
),
fileNamer,
new PaddingService(Map[String, PaddingStrategy](
"partition" -> NoOpPaddingStrategy,
"offset" -> LeftPadPaddingStrategy(12, 0),
@@ -108,7 +108,81 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
)

"avro sink" should "write 2 records to avro format in s3" in {
val sink = writerManagerCreator.from(avroConfig)
val sink = writerManagerCreator.from(avroConfig(new OffsetFileNamerV1(
identity[String],
AvroFormatSelection.extension,
)))
firstUsers.zipWithIndex.foreach {
case (struct: Struct, index: Int) =>
val writeRes = sink.write(
TopicPartitionOffset(Topic(TopicName), 1, Offset((index + 1).toLong)),
MessageDetail(
NullSinkData(None),
StructSinkData(struct),
Map.empty[String, SinkData],
Some(Instant.ofEpochMilli(index.toLong + 101)),
Topic(TopicName),
1,
Offset((index + 1).toLong),
),
)
writeRes.isRight should be(true)
}

sink.close()

val keys = listBucketPath(BucketName, "streamReactorBackups/myTopic/1/")
keys.size should be(1)

val byteArray = remoteFileAsBytes(BucketName, "streamReactorBackups/myTopic/1/2_101_102.avro")
val genericRecords: List[GenericRecord] = avroFormatReader.read(byteArray)
genericRecords.size should be(2)

genericRecords(0).get("name").toString should be("sam")
genericRecords(1).get("name").toString should be("laura")

}

"avro sink" should "write multiple files and keeping the earliest timestamp" in {
val sink = writerManagerCreator.from(avroConfig(new OffsetFileNamerV1(
identity[String],
AvroFormatSelection.extension,
)))
firstUsers.zip(List(0 -> 100, 1 -> 99, 2 -> 101, 3 -> 102)).foreach {
case (struct: Struct, (index: Int, timestamp: Int)) =>
val writeRes = sink.write(
TopicPartitionOffset(Topic(TopicName), 1, Offset((index + 1).toLong)),
MessageDetail(
NullSinkData(None),
StructSinkData(struct),
Map.empty[String, SinkData],
Some(Instant.ofEpochMilli(timestamp.toLong)),
Topic(TopicName),
1,
Offset((index + 1).toLong),
),
)
writeRes.isRight should be(true)
}

sink.close()

listBucketPath(BucketName, "streamReactorBackups/myTopic/1/").size should be(1)

val byteArray = remoteFileAsBytes(BucketName, "streamReactorBackups/myTopic/1/2_99_100.avro")
val genericRecords: List[GenericRecord] = avroFormatReader.read(byteArray)
genericRecords.size should be(2)

genericRecords(0).get("name").toString should be("sam")
genericRecords(1).get("name").toString should be("laura")

}

"avro sink" should "write 2 records to avro format in s3 using v0 namer" in {
val sink = writerManagerCreator.from(avroConfig(new OffsetFileNamerV0(
identity[String],
AvroFormatSelection.extension,
)))
firstUsers.zipWithIndex.foreach {
case (struct: Struct, index: Int) =>
val writeRes = sink.write(
@@ -139,7 +213,10 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
}

"avro sink" should "write BigDecimal" in {
val sink = writerManagerCreator.from(avroConfig)
val sink = writerManagerCreator.from(avroConfig(new OffsetFileNamerV1(
identity[String],
AvroFormatSelection.extension,
)))
val usersWithDecimal1 =
new Struct(UsersSchemaDecimal)
.put("name", "sam")
@@ -154,7 +231,7 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
NullSinkData(None),
StructSinkData(usersWithDecimal1),
Map.empty,
None,
Some(Instant.ofEpochMilli(10L)),
Topic(TopicName),
1,
Offset(1L),
@@ -178,7 +255,7 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
NullSinkData(None),
StructSinkData(usersWithDecimal2),
Map.empty,
None,
Some(Instant.ofEpochMilli(10L)),
Topic(TopicName),
1,
Offset(2L),
@@ -189,7 +266,7 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont

listBucketPath(BucketName, "streamReactorBackups/myTopic/1/").size should be(1)

val byteArray = remoteFileAsBytes(BucketName, "streamReactorBackups/myTopic/1/2.avro")
val byteArray = remoteFileAsBytes(BucketName, "streamReactorBackups/myTopic/1/2_10_10.avro")
val genericRecords: List[GenericRecord] = avroFormatReader.read(byteArray)
genericRecords.size should be(2)

@@ -215,43 +292,48 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
new Struct(secondSchema).put("name", "coco").put("designation", null).put("salary", 395.44),
)

val sink = writerManagerCreator.from(avroConfig)
val sink = writerManagerCreator.from(avroConfig(new OffsetFileNamerV1(
identity[String],
AvroFormatSelection.extension,
)))
firstUsers.concat(usersWithNewSchema).zipWithIndex.foreach {
case (user, index) =>
sink.write(
TopicPartitionOffset(Topic(TopicName), 1, Offset((index + 1).toLong)),
MessageDetail(NullSinkData(None),
StructSinkData(user),
Map.empty[String, SinkData],
None,
Topic(TopicName),
1,
Offset((index + 1).toLong),
MessageDetail(
NullSinkData(None),
StructSinkData(user),
Map.empty[String, SinkData],
Some(Instant.ofEpochMilli(index.toLong)),
Topic(TopicName),
1,
Offset((index + 1).toLong),
),
)
}
sink.close()

listBucketPath(BucketName, "streamReactorBackups/myTopic/1/").size should be(3)
val keys = listBucketPath(BucketName, "streamReactorBackups/myTopic/1/")
keys.size should be(3)

// records 1 and 2
val genericRecords1: List[GenericRecord] = avroFormatReader.read(
remoteFileAsBytes(BucketName, "streamReactorBackups/myTopic/1/2.avro"),
remoteFileAsBytes(BucketName, "streamReactorBackups/myTopic/1/2_0_1.avro"),
)
genericRecords1.size should be(2)
genericRecords1(0).get("name").toString should be("sam")
genericRecords1(1).get("name").toString should be("laura")

// record 3 only - next schema is different so ending the file
val genericRecords2: List[GenericRecord] = avroFormatReader.read(
remoteFileAsBytes(BucketName, "streamReactorBackups/myTopic/1/3.avro"),
remoteFileAsBytes(BucketName, "streamReactorBackups/myTopic/1/3_2_2.avro"),
)
genericRecords2.size should be(1)
genericRecords2(0).get("name").toString should be("tom")

// records 4 and 5 - written with the new schema
val genericRecords3: List[GenericRecord] = avroFormatReader.read(
remoteFileAsBytes(BucketName, "streamReactorBackups/myTopic/1/5.avro"),
remoteFileAsBytes(BucketName, "streamReactorBackups/myTopic/1/5_3_4.avro"),
)
genericRecords3.size should be(2)
genericRecords3(0).get("name").toString should be("bobo")