Skip to content

Commit

Permalink
Error handling improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
davidsloan committed Feb 9, 2024
1 parent 783c1af commit d58199f
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class HttpSinkTaskIT
none,
none,
).some,
none,
).toJson

val path = "/awesome/endpoint"
Expand Down Expand Up @@ -110,6 +111,7 @@ class HttpSinkTaskIT
none,
none,
).some,
none,
).toJson

val path = "/awesome/endpoint/.*"
Expand Down Expand Up @@ -162,6 +164,7 @@ class HttpSinkTaskIT
none,
none,
).some,
none,
).toJson

val path = "/awesome/endpoint/.*"
Expand Down Expand Up @@ -214,6 +217,7 @@ class HttpSinkTaskIT
none,
none,
).some,
none,
).toJson

val path = "/awesome/endpoint/.*"
Expand Down Expand Up @@ -274,6 +278,7 @@ class HttpSinkTaskIT
none,
none,
).some,
none,
).toJson

val path = "/awesome/endpoint"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,9 @@ class HttpSinkTask extends SinkTask with LazyLogging {
}
}
.recoverWith {
case error: IllegalArgumentException =>
IO.raiseError[Unit](error)
case e =>
IO.raiseError[Unit](new RuntimeException("Unexpected error occurred", e))
// errors at this point simply need to be thrown
IO.raiseError[Unit](new RuntimeException("Unexpected error occurred during sink start", e))
}.unsafeRunSync()
}

Expand Down Expand Up @@ -127,6 +126,8 @@ class HttpSinkTask extends SinkTask with LazyLogging {
eitherRendered match {
case Left(ex) =>
logger.error(s"[$sinkName] Template Rendering Failure", ex)
IO.raiseError(ex)
// rendering errors can not be recovered from as configuration should be amended

case Right(renderedRecs) =>
logger.trace(s"[$sinkName] Rendered successful: $renderedRecs")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@ package io.lenses.streamreactor.connect.http.sink
import cats.effect.IO
import cats.effect.Ref
import cats.effect.unsafe.implicits.global
import cats.implicits.catsSyntaxOptionId
import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition
import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitPolicy
import io.lenses.streamreactor.connect.cloud.common.sink.commit.Count
import io.lenses.streamreactor.connect.http.sink.OffsetMergeUtils.createCommitContextForEvaluation
import io.lenses.streamreactor.connect.http.sink.OffsetMergeUtils.updateCommitContextPostCommit
import io.lenses.streamreactor.connect.http.sink.client.HttpRequestSender
import io.lenses.streamreactor.connect.http.sink.model.HttpCommitContext
import io.lenses.streamreactor.connect.http.sink.commit.HttpCommitContext
import io.lenses.streamreactor.connect.http.sink.tpl.RenderedRecord
import io.lenses.streamreactor.connect.http.sink.tpl.TemplateType
import org.apache.kafka.clients.consumer.OffsetAndMetadata
Expand All @@ -39,7 +38,8 @@ class HttpWriter(
sender: HttpRequestSender,
template: TemplateType,
recordsQueueRef: Ref[IO, Queue[RenderedRecord]],
commitContextRef: Ref[IO, Option[HttpCommitContext]],
commitContextRef: Ref[IO, HttpCommitContext],
errorThreshold: Int,
) extends LazyLogging {
private val maybeBatchSize: Option[Int] = commitPolicy.conditions.collectFirst {
case Count(maxCount) => maxCount.toInt
Expand Down Expand Up @@ -74,45 +74,65 @@ class HttpWriter(
refSet <- recordsQueueRef.set(dequeueN(recordsQueue, takeHowMany))
} yield refSet
case _ =>
IO(logger.info("No match "))
IO(logger.trace(s"[$sinkName] Empty record queue"))
}
_ <- resetErrorsInCommitContext()
} yield res
}.onError {
e =>
logger.error("Error", e)
IO.raiseError(e)
for {
uniqueError: Option[Throwable] <- addErrorToCommitContext(e)
res <- if (uniqueError.nonEmpty) {
IO(logger.error("Error in HttpWriter", e)) *> IO.raiseError(e)
} else {
IO(logger.error("Error in HttpWriter but not reached threshold so ignoring", e)) *> IO.unit
}
} yield res
}

/*private def takeNRecords(recordsQueue: Queue[RenderedRecord], takeHowMany: Int) = {
val records = mutable.Seq[RenderedRecord]()
var finished = false
while(records.size < takeHowMany && !finished) {
val record = recordsQueue.take(1).toList.head
def preCommit(
initialOffsetAndMetaMap: Map[TopicPartition, OffsetAndMetadata],
): IO[Map[TopicPartition, OffsetAndMetadata]] =
commitContextRef.get.map {
case HttpCommitContext(_, committedOffsets, _, _, _, _, _) =>
committedOffsets.flatMap {
case (tp, offset) =>
for {
initialOffsetAndMeta <- initialOffsetAndMetaMap.get(tp)

val tpo = record.topicPartitionOffset
} yield tp -> new OffsetAndMetadata(offset.value,
initialOffsetAndMeta.leaderEpoch(),
initialOffsetAndMeta.metadata(),
)
}
case _ => initialOffsetAndMetaMap
}.orElse(IO(Map.empty[TopicPartition, OffsetAndMetadata]))

if (offsetAlreadySeen(tpo.toTopicPartition, tpo.offset, commitContextRef.get).unsafeRunSync()) {
logger.info("skipping")
} else {
logger.info("not skipping")
qCount = qCount + 1
}
private def addErrorToCommitContext(e: Throwable): IO[Option[Throwable]] = {
val updatedCC = commitContextRef.getAndUpdate {
commitContext => commitContext.addError(e)
}
val batch = recordsQueue.takeWhile { record =>
qCount < takeHowMany
val maxError = updatedCC.map(cc =>
cc
.errors
.maxByOption { case (_, errSeq) => errSeq.size }
.filter { case (_, errSeq) => errSeq.size > errorThreshold }
.flatMap(_._2.headOption),
)
maxError
}

}
batch
}*/
private def resetErrorsInCommitContext(): IO[Unit] =
commitContextRef.getAndUpdate {
commitContext => commitContext.resetErrors
} *> IO.unit

private def updateCommitContextIfFlush(
maybeCC: Option[HttpCommitContext],
batch: Queue[RenderedRecord],
): IO[(Option[HttpCommitContext], Unit)] =
cc: HttpCommitContext,
batch: Queue[RenderedRecord],
): IO[(HttpCommitContext, Unit)] =
for {
flushEvalCommitContext: HttpCommitContext <- IO.pure(createCommitContextForEvaluation(sinkName, batch, maybeCC))
flushEvalCommitContext: HttpCommitContext <- IO.pure(createCommitContextForEvaluation(batch, cc))
_ <- IO.delay(logger.trace(s"[$sinkName] Updating sink context to: $flushEvalCommitContext"))
shouldFlush: Boolean <- IO.pure(commitPolicy.shouldFlush(flushEvalCommitContext))
_ <- IO.delay(logger.trace(s"[$sinkName] Should flush? $shouldFlush"))
Expand All @@ -125,9 +145,9 @@ class HttpWriter(
} yield {
(
if (shouldFlush) {
updateCommitContextPostCommit(currentCommitContext = flushEvalCommitContext).some
updateCommitContextPostCommit(currentCommitContext = flushEvalCommitContext)
} else {
maybeCC
cc
},
(),
)
Expand All @@ -137,8 +157,8 @@ class HttpWriter(
logger.trace(s"[$sinkName] modifyCommitContext for batch of ${batch.size}")

commitContextRef.modify {
maybeCC: Option[HttpCommitContext] =>
updateCommitContextIfFlush(maybeCC, batch).unsafeRunSync()
cc: HttpCommitContext =>
updateCommitContextIfFlush(cc, batch).unsafeRunSync()
}
}

Expand All @@ -153,22 +173,4 @@ class HttpWriter(
sent <- sender.sendHttpRequest(processed)
} yield sent

def preCommit(
initialOffsetAndMetaMap: Map[TopicPartition, OffsetAndMetadata],
): IO[Map[TopicPartition, OffsetAndMetadata]] =
commitContextRef.get.map {
case Some(HttpCommitContext(_, committedOffsets, _, _, _, _)) =>
committedOffsets.flatMap {
case (tp, offset) =>
for {
initialOffsetAndMeta <- initialOffsetAndMetaMap.get(tp)

} yield tp -> new OffsetAndMetadata(offset.value,
initialOffsetAndMeta.leaderEpoch(),
initialOffsetAndMeta.metadata(),
)
}
case _ => initialOffsetAndMetaMap
}.orElse(IO(Map.empty[TopicPartition, OffsetAndMetadata]))

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import io.lenses.streamreactor.connect.cloud.common.model.Topic
import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition
import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitPolicy
import io.lenses.streamreactor.connect.http.sink.client.HttpRequestSender
import io.lenses.streamreactor.connect.http.sink.commit.HttpCommitContext
import io.lenses.streamreactor.connect.http.sink.commit.HttpCommitPolicy
import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfig
import io.lenses.streamreactor.connect.http.sink.model.HttpCommitContext
import io.lenses.streamreactor.connect.http.sink.tpl.RenderedRecord
import io.lenses.streamreactor.connect.http.sink.tpl.TemplateType
import org.apache.kafka.clients.consumer.OffsetAndMetadata
Expand All @@ -43,11 +43,14 @@ import java.util.concurrent.TimeUnit
import scala.collection.immutable.Queue

object HttpWriterManager {

val DefaultErrorThreshold = 5

def apply(sinkName: String, config: HttpSinkConfig, template: TemplateType): HttpWriterManager = {

// in certain circumstances we want a customised http client.
val clientCreate = config match {
case HttpSinkConfig(_, _, _, _, _, Some(ssl), _) =>
case HttpSinkConfig(_, _, _, _, _, Some(ssl), _, _) =>
val sslContext = SSLConfigContext(ssl) // TODO: wrap for error handling
val httpClient = HttpClient.newBuilder().sslContext(sslContext).build()
JdkHttpClient[IO](httpClient)
Expand All @@ -69,6 +72,7 @@ object HttpWriterManager {
config.batch.map(_.toCommitPolicy).getOrElse(HttpCommitPolicy.Default),
cResRel,
Ref.unsafe(Map[Topic, HttpWriter]()),
config.errorThreshold.getOrElse(DefaultErrorThreshold),
)
}

Expand All @@ -81,6 +85,7 @@ class HttpWriterManager(
commitPolicy: CommitPolicy,
val close: IO[Unit],
writersRef: Ref[IO, Map[Topic, HttpWriter]],
errorThreshold: Int,
) extends LazyLogging {
private val scheduler: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()

Expand All @@ -91,7 +96,8 @@ class HttpWriterManager(
sender = httpRequestSender,
template = template,
Ref.unsafe[IO, Queue[RenderedRecord]](Queue()),
Ref.unsafe[IO, Option[HttpCommitContext]](Option.empty[HttpCommitContext]),
Ref.unsafe[IO, HttpCommitContext](HttpCommitContext.default(sinkName)),
errorThreshold,
)

def getWriter(topic: Topic): IO[HttpWriter] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,25 @@ package io.lenses.streamreactor.connect.http.sink
import cats.implicits.catsSyntaxOptionId
import io.lenses.streamreactor.connect.cloud.common.model.Offset
import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition
import io.lenses.streamreactor.connect.http.sink.model.HttpCommitContext
import io.lenses.streamreactor.connect.http.sink.commit.HttpCommitContext
import io.lenses.streamreactor.connect.http.sink.tpl.RenderedRecord

object OffsetMergeUtils {

def createCommitContextForEvaluation(
sinkName: String,
batch: Seq[RenderedRecord],
currentCommitContext: Option[HttpCommitContext],
currentCommitContext: HttpCommitContext,
): HttpCommitContext = {
val count = batch.size.toLong
val fileSize = batch.map(_.recordRendered.length).sum.toLong
val highestOffsets = maxOffsets(batch)
currentCommitContext match {
case Some(httpCommitContext @ HttpCommitContext(_, httpCommittedOffsets, _, _, _, _)) =>
case httpCommitContext @ HttpCommitContext(_, httpCommittedOffsets, _, _, _, _, _) =>
httpCommitContext.copy(
committedOffsets = mergeOffsets(httpCommittedOffsets, highestOffsets),
count = count,
fileSize = fileSize,
)
case None =>
HttpCommitContext(
sinkName = sinkName,
committedOffsets = highestOffsets,
count = count,
fileSize = fileSize,
createdTimestamp = System.currentTimeMillis(),
lastFlushedTimestamp = Option.empty,
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.http.sink.model
package io.lenses.streamreactor.connect.http.sink.commit

import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.connect.cloud.common.model.Offset
import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition
import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitContext
import io.lenses.streamreactor.connect.cloud.common.sink.commit.ConditionCommitResult

object HttpCommitContext {
def default(sinkName: String): HttpCommitContext =
HttpCommitContext(
sinkName,
Map.empty,
0L,
0L,
System.currentTimeMillis(),
Option.empty,
Map.empty,
)
}

/**
* @param tpo the [[TopicPartitionOffset]] of the last record written
* @param count the number of records written thus far to the file
Expand All @@ -33,6 +46,7 @@ case class HttpCommitContext(
fileSize: Long,
createdTimestamp: Long,
lastFlushedTimestamp: Option[Long],
errors: Map[String, Seq[Throwable]],
) extends CommitContext
with LazyLogging {
override def logFlush(flushing: Boolean, result: Seq[ConditionCommitResult]): Unit = {
Expand All @@ -46,4 +60,15 @@ case class HttpCommitContext(
logLine,
)
}

def resetErrors: HttpCommitContext = copy(errors = Map.empty)

def addError(ex: Throwable): HttpCommitContext =
copy(
errors = {
val mapKey = ex.getClass.getSimpleName
val mapValue = errors.get(mapKey).fold(Seq(ex))(_ :+ ex)
errors + (mapKey -> mapValue)
},
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,13 @@ case class HttpSinkConfig(
headers: Seq[(String, String)], // tokenised
sslConfig: Option[SSLConfig],
batch: Option[BatchConfiguration],
errorThreshold: Option[Int],
) {
def toJson: String = {
val decoded: HttpSinkConfig = this
decoded.asJson(HttpSinkConfig.encoder).noSpaces
decoded
.asJson(HttpSinkConfig.encoder)
.noSpaces
}

}
Loading

0 comments on commit d58199f

Please sign in to comment.