From 90d6c39d64aeeea72f649d114789b7375a53a61e Mon Sep 17 00:00:00 2001 From: David Sloan Date: Fri, 9 Feb 2024 09:54:33 +0000 Subject: [PATCH] Error handling improvements --- .../connect/http/sink/HttpSinkTaskIT.scala | 5 + .../connect/http/sink/HttpSinkTask.scala | 7 +- .../connect/http/sink/HttpWriter.scala | 104 +++++++++--------- .../connect/http/sink/HttpWriterManager.scala | 12 +- .../connect/http/sink/OffsetMergeUtils.scala | 16 +-- .../{model => commit}/HttpCommitContext.scala | 27 ++++- .../http/sink/config/HttpSinkConfig.scala | 5 +- .../http/sink/HttpSinkConfigTest.scala | 8 +- .../connect/http/sink/HttpWriterTest.scala | 44 ++++++-- .../http/sink/OffsetMergeUtilsTest.scala | 10 +- 10 files changed, 147 insertions(+), 91 deletions(-) rename kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/{model => commit}/HttpCommitContext.scala (75%) diff --git a/kafka-connect-http/src/it/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTaskIT.scala b/kafka-connect-http/src/it/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTaskIT.scala index dcb4efafa0..dfdad20f47 100644 --- a/kafka-connect-http/src/it/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTaskIT.scala +++ b/kafka-connect-http/src/it/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTaskIT.scala @@ -71,6 +71,7 @@ class HttpSinkTaskIT none, none, ).some, + none, ).toJson val path = "/awesome/endpoint" @@ -110,6 +111,7 @@ class HttpSinkTaskIT none, none, ).some, + none, ).toJson val path = "/awesome/endpoint/.*" @@ -162,6 +164,7 @@ class HttpSinkTaskIT none, none, ).some, + none, ).toJson val path = "/awesome/endpoint/.*" @@ -214,6 +217,7 @@ class HttpSinkTaskIT none, none, ).some, + none, ).toJson val path = "/awesome/endpoint/.*" @@ -274,6 +278,7 @@ class HttpSinkTaskIT none, none, ).some, + none, ).toJson val path = "/awesome/endpoint" diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala index e05881053a..b807ef7b6a 100644 --- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala @@ -83,10 +83,9 @@ class HttpSinkTask extends SinkTask with LazyLogging { } } .recoverWith { - case error: IllegalArgumentException => - IO.raiseError[Unit](error) case e => - IO.raiseError[Unit](new RuntimeException("Unexpected error occurred", e)) + // errors at this point simply need to be thrown + IO.raiseError[Unit](new RuntimeException("Unexpected error occurred during sink start", e)) }.unsafeRunSync() } @@ -127,6 +126,8 @@ class HttpSinkTask extends SinkTask with LazyLogging { eitherRendered match { case Left(ex) => logger.error(s"[$sinkName] Template Rendering Failure", ex) + IO.raiseError(ex) + // rendering errors can not be recovered from as configuration should be amended case Right(renderedRecs) => logger.trace(s"[$sinkName] Rendered successful: $renderedRecs") diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriter.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriter.scala index d3c9a5a3b5..a34c25ff6d 100644 --- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriter.scala +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriter.scala @@ -18,7 
+18,6 @@ package io.lenses.streamreactor.connect.http.sink import cats.effect.IO import cats.effect.Ref import cats.effect.unsafe.implicits.global -import cats.implicits.catsSyntaxOptionId import com.typesafe.scalalogging.LazyLogging import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitPolicy @@ -26,7 +25,7 @@ import io.lenses.streamreactor.connect.cloud.common.sink.commit.Count import io.lenses.streamreactor.connect.http.sink.OffsetMergeUtils.createCommitContextForEvaluation import io.lenses.streamreactor.connect.http.sink.OffsetMergeUtils.updateCommitContextPostCommit import io.lenses.streamreactor.connect.http.sink.client.HttpRequestSender -import io.lenses.streamreactor.connect.http.sink.model.HttpCommitContext +import io.lenses.streamreactor.connect.http.sink.commit.HttpCommitContext import io.lenses.streamreactor.connect.http.sink.tpl.RenderedRecord import io.lenses.streamreactor.connect.http.sink.tpl.TemplateType import org.apache.kafka.clients.consumer.OffsetAndMetadata @@ -39,7 +38,8 @@ class HttpWriter( sender: HttpRequestSender, template: TemplateType, recordsQueueRef: Ref[IO, Queue[RenderedRecord]], - commitContextRef: Ref[IO, Option[HttpCommitContext]], + commitContextRef: Ref[IO, HttpCommitContext], + errorThreshold: Int, ) extends LazyLogging { private val maybeBatchSize: Option[Int] = commitPolicy.conditions.collectFirst { case Count(maxCount) => maxCount.toInt @@ -74,45 +74,65 @@ class HttpWriter( refSet <- recordsQueueRef.set(dequeueN(recordsQueue, takeHowMany)) } yield refSet case _ => - IO(logger.info("No match ")) + IO(logger.trace(s"[$sinkName] Empty record queue")) } + _ <- resetErrorsInCommitContext() } yield res }.onError { e => - logger.error("Error", e) - IO.raiseError(e) + for { + uniqueError: Option[Throwable] <- addErrorToCommitContext(e) + res <- if (uniqueError.nonEmpty) { + IO(logger.error("Error in HttpWriter", e)) *> IO.raiseError(e) + } else { + IO(logger.error("Error in HttpWriter but not reached threshold so ignoring", e)) *> IO.unit + } + } yield res } - /*private def takeNRecords(recordsQueue: Queue[RenderedRecord], takeHowMany: Int) = { - val records = mutable.Seq[RenderedRecord]() - - var finished = false - while(records.size < takeHowMany && !finished) { - val record = recordsQueue.take(1).toList.head + def preCommit( + initialOffsetAndMetaMap: Map[TopicPartition, OffsetAndMetadata], + ): IO[Map[TopicPartition, OffsetAndMetadata]] = + commitContextRef.get.map { + case HttpCommitContext(_, committedOffsets, _, _, _, _, _) => + committedOffsets.flatMap { + case (tp, offset) => + for { + initialOffsetAndMeta <- initialOffsetAndMetaMap.get(tp) - val tpo = record.topicPartitionOffset + } yield tp -> new OffsetAndMetadata(offset.value, + initialOffsetAndMeta.leaderEpoch(), + initialOffsetAndMeta.metadata(), + ) + } + case _ => initialOffsetAndMetaMap + }.orElse(IO(Map.empty[TopicPartition, OffsetAndMetadata])) - if (offsetAlreadySeen(tpo.toTopicPartition, tpo.offset, commitContextRef.get).unsafeRunSync()) { - logger.info("skipping") - } else { - logger.info("not skipping") - qCount = qCount + 1 - } + private def addErrorToCommitContext(e: Throwable): IO[Option[Throwable]] = { + val updatedCC = commitContextRef.getAndUpdate { + commitContext => commitContext.addError(e) } - val batch = recordsQueue.takeWhile { record => - - qCount < takeHowMany + val maxError = updatedCC.map(cc => + cc + .errors + .maxByOption { case (_, errSeq) => errSeq.size } + .filter { case (_, 
errSeq) => errSeq.size > errorThreshold } + .flatMap(_._2.headOption), + ) + maxError + } - } - batch - }*/ + private def resetErrorsInCommitContext(): IO[Unit] = + commitContextRef.getAndUpdate { + commitContext => commitContext.resetErrors + } *> IO.unit private def updateCommitContextIfFlush( - maybeCC: Option[HttpCommitContext], - batch: Queue[RenderedRecord], - ): IO[(Option[HttpCommitContext], Unit)] = + cc: HttpCommitContext, + batch: Queue[RenderedRecord], + ): IO[(HttpCommitContext, Unit)] = for { - flushEvalCommitContext: HttpCommitContext <- IO.pure(createCommitContextForEvaluation(sinkName, batch, maybeCC)) + flushEvalCommitContext: HttpCommitContext <- IO.pure(createCommitContextForEvaluation(batch, cc)) _ <- IO.delay(logger.trace(s"[$sinkName] Updating sink context to: $flushEvalCommitContext")) shouldFlush: Boolean <- IO.pure(commitPolicy.shouldFlush(flushEvalCommitContext)) _ <- IO.delay(logger.trace(s"[$sinkName] Should flush? $shouldFlush")) @@ -125,9 +145,9 @@ class HttpWriter( } yield { ( if (shouldFlush) { - updateCommitContextPostCommit(currentCommitContext = flushEvalCommitContext).some + updateCommitContextPostCommit(currentCommitContext = flushEvalCommitContext) } else { - maybeCC + cc }, (), ) @@ -137,8 +157,8 @@ class HttpWriter( logger.trace(s"[$sinkName] modifyCommitContext for batch of ${batch.size}") commitContextRef.modify { - maybeCC: Option[HttpCommitContext] => - updateCommitContextIfFlush(maybeCC, batch).unsafeRunSync() + cc: HttpCommitContext => + updateCommitContextIfFlush(cc, batch).unsafeRunSync() } } @@ -153,22 +173,4 @@ class HttpWriter( sent <- sender.sendHttpRequest(processed) } yield sent - def preCommit( - initialOffsetAndMetaMap: Map[TopicPartition, OffsetAndMetadata], - ): IO[Map[TopicPartition, OffsetAndMetadata]] = - commitContextRef.get.map { - case Some(HttpCommitContext(_, committedOffsets, _, _, _, _)) => - committedOffsets.flatMap { - case (tp, offset) => - for { - initialOffsetAndMeta <- initialOffsetAndMetaMap.get(tp) - - } yield tp -> new OffsetAndMetadata(offset.value, - initialOffsetAndMeta.leaderEpoch(), - initialOffsetAndMeta.metadata(), - ) - } - case _ => initialOffsetAndMetaMap - }.orElse(IO(Map.empty[TopicPartition, OffsetAndMetadata])) - } diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterManager.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterManager.scala index ea20573e9a..b07455b982 100644 --- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterManager.scala +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterManager.scala @@ -28,9 +28,9 @@ import io.lenses.streamreactor.connect.cloud.common.model.Topic import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitPolicy import io.lenses.streamreactor.connect.http.sink.client.HttpRequestSender +import io.lenses.streamreactor.connect.http.sink.commit.HttpCommitContext import io.lenses.streamreactor.connect.http.sink.commit.HttpCommitPolicy import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfig -import io.lenses.streamreactor.connect.http.sink.model.HttpCommitContext import io.lenses.streamreactor.connect.http.sink.tpl.RenderedRecord import io.lenses.streamreactor.connect.http.sink.tpl.TemplateType import org.apache.kafka.clients.consumer.OffsetAndMetadata @@ -43,11 +43,14 @@ import 
java.util.concurrent.TimeUnit import scala.collection.immutable.Queue object HttpWriterManager { + + val DefaultErrorThreshold = 5 + def apply(sinkName: String, config: HttpSinkConfig, template: TemplateType): HttpWriterManager = { // in certain circumstances we want a customised http client. val clientCreate = config match { - case HttpSinkConfig(_, _, _, _, _, Some(ssl), _) => + case HttpSinkConfig(_, _, _, _, _, Some(ssl), _, _) => val sslContext = SSLConfigContext(ssl) // TODO: wrap for error handling val httpClient = HttpClient.newBuilder().sslContext(sslContext).build() JdkHttpClient[IO](httpClient) @@ -69,6 +72,7 @@ object HttpWriterManager { config.batch.map(_.toCommitPolicy).getOrElse(HttpCommitPolicy.Default), cResRel, Ref.unsafe(Map[Topic, HttpWriter]()), + config.errorThreshold.getOrElse(DefaultErrorThreshold), ) } @@ -81,6 +85,7 @@ class HttpWriterManager( commitPolicy: CommitPolicy, val close: IO[Unit], writersRef: Ref[IO, Map[Topic, HttpWriter]], + errorThreshold: Int, ) extends LazyLogging { private val scheduler: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor() @@ -91,7 +96,8 @@ class HttpWriterManager( sender = httpRequestSender, template = template, Ref.unsafe[IO, Queue[RenderedRecord]](Queue()), - Ref.unsafe[IO, Option[HttpCommitContext]](Option.empty[HttpCommitContext]), + Ref.unsafe[IO, HttpCommitContext](HttpCommitContext.default(sinkName)), + errorThreshold, ) def getWriter(topic: Topic): IO[HttpWriter] = { diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/OffsetMergeUtils.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/OffsetMergeUtils.scala index 4beb7fa411..2246c4bea2 100644 --- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/OffsetMergeUtils.scala +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/OffsetMergeUtils.scala @@ -18,35 +18,25 @@ package io.lenses.streamreactor.connect.http.sink import cats.implicits.catsSyntaxOptionId import io.lenses.streamreactor.connect.cloud.common.model.Offset import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition -import io.lenses.streamreactor.connect.http.sink.model.HttpCommitContext +import io.lenses.streamreactor.connect.http.sink.commit.HttpCommitContext import io.lenses.streamreactor.connect.http.sink.tpl.RenderedRecord object OffsetMergeUtils { def createCommitContextForEvaluation( - sinkName: String, batch: Seq[RenderedRecord], - currentCommitContext: Option[HttpCommitContext], + currentCommitContext: HttpCommitContext, ): HttpCommitContext = { val count = batch.size.toLong val fileSize = batch.map(_.recordRendered.length).sum.toLong val highestOffsets = maxOffsets(batch) currentCommitContext match { - case Some(httpCommitContext @ HttpCommitContext(_, httpCommittedOffsets, _, _, _, _)) => + case httpCommitContext @ HttpCommitContext(_, httpCommittedOffsets, _, _, _, _, _) => httpCommitContext.copy( committedOffsets = mergeOffsets(httpCommittedOffsets, highestOffsets), count = count, fileSize = fileSize, ) - case None => - HttpCommitContext( - sinkName = sinkName, - committedOffsets = highestOffsets, - count = count, - fileSize = fileSize, - createdTimestamp = System.currentTimeMillis(), - lastFlushedTimestamp = Option.empty, - ) } } diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/model/HttpCommitContext.scala 
b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/commit/HttpCommitContext.scala similarity index 75% rename from kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/model/HttpCommitContext.scala rename to kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/commit/HttpCommitContext.scala index 495511bfe0..47ef3d5f9e 100644 --- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/model/HttpCommitContext.scala +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/commit/HttpCommitContext.scala @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.lenses.streamreactor.connect.http.sink.model +package io.lenses.streamreactor.connect.http.sink.commit import com.typesafe.scalalogging.LazyLogging import io.lenses.streamreactor.connect.cloud.common.model.Offset @@ -21,6 +21,19 @@ import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitContext import io.lenses.streamreactor.connect.cloud.common.sink.commit.ConditionCommitResult +object HttpCommitContext { + def default(sinkName: String): HttpCommitContext = + HttpCommitContext( + sinkName, + Map.empty, + 0L, + 0L, + System.currentTimeMillis(), + Option.empty, + Map.empty, + ) +} + /** * @param tpo the [[TopicPartitionOffset]] of the last record written * @param count the number of records written thus far to the file @@ -33,6 +46,7 @@ case class HttpCommitContext( fileSize: Long, createdTimestamp: Long, lastFlushedTimestamp: Option[Long], + errors: Map[String, Seq[Throwable]], ) extends CommitContext with LazyLogging { override def logFlush(flushing: Boolean, result: Seq[ConditionCommitResult]): Unit = { @@ -46,4 +60,15 @@ case class HttpCommitContext( logLine, ) } + + def resetErrors: HttpCommitContext = copy(errors = Map.empty) + + def addError(ex: Throwable): HttpCommitContext = + copy( + errors = { + val mapKey = ex.getClass.getSimpleName + val mapValue = errors.get(mapKey).fold(Seq(ex))(_ :+ ex) + errors + (mapKey -> mapValue) + }, + ) } diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala index 676ade2339..84fee0bedb 100644 --- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala @@ -74,10 +74,13 @@ case class HttpSinkConfig( headers: Seq[(String, String)], // tokenised sslConfig: Option[SSLConfig], batch: Option[BatchConfiguration], + errorThreshold: Option[Int], ) { def toJson: String = { val decoded: HttpSinkConfig = this - decoded.asJson(HttpSinkConfig.encoder).noSpaces + decoded + .asJson(HttpSinkConfig.encoder) + .noSpaces } } diff --git a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConfigTest.scala b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConfigTest.scala index d2a6003247..0ef16fa0e7 100644 --- a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConfigTest.scala +++ b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConfigTest.scala @@ -15,6 +15,7 @@ */ package 
io.lenses.streamreactor.connect.http.sink +import cats.implicits.none import io.lenses.streamreactor.connect.http.sink.client.BasicAuthentication import io.lenses.streamreactor.connect.http.sink.client.HttpMethod.Put import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfig @@ -30,10 +31,11 @@ class HttpSinkConfigTest extends AnyFunSuiteLike with Matchers { "http://myaddress.example.com", "\nDave\nJason\nHooray for Kafka Connect!\n", Seq("something" -> "somethingelse"), - Option.empty, - Option.empty, + none, + none, + none, ).toJson should be( - """{"authentication":{"username":"user","password":"pass","type":"BasicAuthentication"},"method":"Put","endpoint":"http://myaddress.example.com","content":"\nDave\nJason\nHooray for Kafka Connect!\n","headers":[["something","somethingelse"]],"sslConfig":null,"batch":null}""", + """{"authentication":{"username":"user","password":"pass","type":"BasicAuthentication"},"method":"Put","endpoint":"http://myaddress.example.com","content":"\nDave\nJason\nHooray for Kafka Connect!\n","headers":[["something","somethingelse"]],"sslConfig":null,"batch":null,"errorThreshold":null}""", ) } diff --git a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterTest.scala b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterTest.scala index 8382f8a404..373887919c 100644 --- a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterTest.scala +++ b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterTest.scala @@ -23,7 +23,7 @@ import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitPolicy import io.lenses.streamreactor.connect.cloud.common.sink.commit.Count import io.lenses.streamreactor.connect.http.sink.client.HttpRequestSender -import io.lenses.streamreactor.connect.http.sink.model.HttpCommitContext +import io.lenses.streamreactor.connect.http.sink.commit.HttpCommitContext import io.lenses.streamreactor.connect.http.sink.tpl.ProcessedTemplate import io.lenses.streamreactor.connect.http.sink.tpl.RenderedRecord import io.lenses.streamreactor.connect.http.sink.tpl.TemplateType @@ -38,7 +38,8 @@ class HttpWriterTest extends AsyncIOSpec with AsyncFunSuiteLike with Matchers wi private val sinkName = "MySinkName" - private val topicPartition: TopicPartition = Topic("myTopic").withPartition(1) + private val topicPartition: TopicPartition = Topic("myTopic").withPartition(1) + private val defaultContext: HttpCommitContext = HttpCommitContext.default("My Sink") test("add method should add records to the queue") { val commitPolicy = CommitPolicy(Count(2L)) for { recordsQueueRef <- Ref.of[IO, Queue[RenderedRecord]](Queue.empty) - commitContextRef <- Ref.of[IO, Option[HttpCommitContext]](None) - httpWriter = new HttpWriter(sinkName, commitPolicy, senderMock, templateMock, recordsQueueRef, commitContextRef) + commitContextRef <- Ref.of[IO, HttpCommitContext](HttpCommitContext.default("My Sink")) + httpWriter = new HttpWriter(sinkName, + commitPolicy, + senderMock, + templateMock, + recordsQueueRef, + commitContextRef, + 5, + ) recordsToAdd = Seq( RenderedRecord(topicPartition.atOffset(100), "record1", Seq.empty, None), RenderedRecord(topicPartition.atOffset(101), "record2", Seq.empty, None), @@ -77,16 +85,23 @@ class HttpWriterTest extends AsyncIOSpec 
with AsyncFunSuiteLike with Matchers wi { for { recordsQueueRef <- Ref.of[IO, Queue[RenderedRecord]](Queue.empty) - commitContextRef <- Ref.of[IO, Option[HttpCommitContext]](None) + commitContextRef <- Ref.of[IO, HttpCommitContext](defaultContext) - httpWriter = new HttpWriter(sinkName, commitPolicy, senderMock, templateMock, recordsQueueRef, commitContextRef) + httpWriter = new HttpWriter(sinkName, + commitPolicy, + senderMock, + templateMock, + recordsQueueRef, + commitContextRef, + 5, + ) _ <- httpWriter.add(recordsToAdd) _ <- httpWriter.process() updatedContext <- commitContextRef.get updatedQueue <- recordsQueueRef.get } yield { - updatedContext shouldBe a[Some[_]] + updatedContext should not be defaultContext updatedQueue shouldBe empty } } @@ -101,15 +116,22 @@ class HttpWriterTest extends AsyncIOSpec with AsyncFunSuiteLike with Matchers wi for { recordsQueueRef <- Ref.of[IO, Queue[RenderedRecord]](Queue.empty) - commitContextRef <- Ref.of[IO, Option[HttpCommitContext]](None) - - httpWriter = new HttpWriter(sinkName, commitPolicy, senderMock, templateMock, recordsQueueRef, commitContextRef) + commitContextRef <- Ref.of[IO, HttpCommitContext](defaultContext) + + httpWriter = new HttpWriter(sinkName, + commitPolicy, + senderMock, + templateMock, + recordsQueueRef, + commitContextRef, + 5, + ) _ <- httpWriter.process() updatedContext <- commitContextRef.get updatedQueue <- recordsQueueRef.get } yield { - updatedContext shouldBe None + updatedContext shouldBe defaultContext updatedQueue shouldBe empty } } diff --git a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/OffsetMergeUtilsTest.scala b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/OffsetMergeUtilsTest.scala index 2105cf2bc3..044e19bc52 100644 --- a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/OffsetMergeUtilsTest.scala +++ b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/OffsetMergeUtilsTest.scala @@ -15,11 +15,10 @@ */ package io.lenses.streamreactor.connect.http.sink -import cats.implicits.catsSyntaxOptionId import io.lenses.streamreactor.connect.cloud.common.model.Offset import io.lenses.streamreactor.connect.cloud.common.model.Topic import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition -import io.lenses.streamreactor.connect.http.sink.model.HttpCommitContext +import io.lenses.streamreactor.connect.http.sink.commit.HttpCommitContext import io.lenses.streamreactor.connect.http.sink.tpl.RenderedRecord import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers._ @@ -42,9 +41,10 @@ class OffsetMergeUtilsTest extends AnyFunSuite { 10L, System.currentTimeMillis(), None, + Map.empty, ) - val result = OffsetMergeUtils.createCommitContextForEvaluation(sinkName, batch, currentCommitContext.some) + val result = OffsetMergeUtils.createCommitContextForEvaluation(batch, currentCommitContext) result.committedOffsets shouldBe Map( Topic("topic1").withPartition(0) -> Offset(100), @@ -59,9 +59,9 @@ class OffsetMergeUtilsTest extends AnyFunSuite { RenderedRecord(Topic("topic1").withPartition(0).withOffset(Offset(100)), "record1", Seq.empty, None), RenderedRecord(Topic("topic2").withPartition(0).withOffset(Offset(50)), "record2", Seq.empty, None), ) - val currentCommitContext = None + val currentCommitContext = HttpCommitContext.default("My Sink") - val result = OffsetMergeUtils.createCommitContextForEvaluation(sinkName, batch, currentCommitContext) + val result = 
OffsetMergeUtils.createCommitContextForEvaluation(batch, currentCommitContext) result.committedOffsets shouldBe Map( Topic("topic1").withPartition(0) -> Offset(100),
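
Reviewer note on the error-threshold mechanism introduced above: HttpCommitContext now accumulates failures per exception class (addError), a successful write clears them (resetErrors), and HttpWriter only re-raises once a single class of error has been seen more often than the configured errorThreshold (HttpWriterManager.DefaultErrorThreshold = 5 when the config leaves it unset). Below is a minimal, self-contained sketch of that behaviour; SimpleCommitContext and ErrorThresholdDemo are hypothetical stand-ins for illustration, not types from this patch.

// Minimal sketch, assuming the behaviour described above; not the connector's actual classes.
final case class SimpleCommitContext(errors: Map[String, Seq[Throwable]] = Map.empty) {

  // Mirrors HttpCommitContext.addError: group errors by exception class name.
  def addError(ex: Throwable): SimpleCommitContext = {
    val key = ex.getClass.getSimpleName
    copy(errors = errors.updated(key, errors.getOrElse(key, Seq.empty[Throwable]) :+ ex))
  }

  // Mirrors HttpCommitContext.resetErrors: a successful flush clears the counters.
  def resetErrors: SimpleCommitContext = copy(errors = Map.empty)

  // Mirrors the HttpWriter check: surface an error only once one class exceeds the threshold.
  def thresholdExceeded(errorThreshold: Int): Option[Throwable] =
    errors.values.maxByOption(_.size).filter(_.size > errorThreshold).flatMap(_.headOption)
}

object ErrorThresholdDemo extends App {
  val threshold = 2
  val afterThreeFailures =
    (1 to 3).foldLeft(SimpleCommitContext())((cc, i) => cc.addError(new RuntimeException(s"failure $i")))

  println(afterThreeFailures.thresholdExceeded(threshold))             // Some(java.lang.RuntimeException: failure 1)
  println(afterThreeFailures.resetErrors.thresholdExceeded(threshold)) // None
}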
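
Similarly, the relocated preCommit above translates the offsets recorded in the commit context back into the map Kafka Connect expects, reusing the leader epoch and metadata from the offsets the framework passed in. A simplified sketch under the same caveat — Tp and OffsetMeta are illustrative stand-ins for TopicPartition and OffsetAndMetadata, not the Kafka client types:

// Sketch only: stand-in types, same flatMap shape as the patch's preCommit.
object PreCommitSketch {
  final case class Tp(topic: String, partition: Int)
  final case class OffsetMeta(offset: Long, leaderEpoch: Option[Int], metadata: String)

  def preCommit(
    committed: Map[Tp, Long],       // offsets the writer has actually flushed
    initial:   Map[Tp, OffsetMeta], // offsets handed to the task by the framework
  ): Map[Tp, OffsetMeta] =
    committed.flatMap {
      case (tp, flushedOffset) =>
        // keep the framework-supplied leader epoch and metadata, swap in the flushed offset
        initial.get(tp).map(init => tp -> init.copy(offset = flushedOffset))
    }
}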