Skip to content

Commit

Permalink
Error handling changes for clarity and debugging ease. (#1161)
Browse files Browse the repository at this point in the history
* Error handling changes for clarity and debugging ease.
Fix GCP storage interface NPE issue.

* Changes from review
  • Loading branch information
davidsloan committed Apr 23, 2024
1 parent 75b2aed commit 36f67ea
Show file tree
Hide file tree
Showing 17 changed files with 266 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -278,4 +278,11 @@ class AwsS3StorageInterface(connectorTaskId: ConnectorTaskId, s3Client: S3Client
lastModified
.map(lmValue => S3FileMetadata(fileName, lmValue))
.orElse(getMetadata(bucket, fileName).map(oMeta => S3FileMetadata(fileName, oMeta.lastModified)).toOption)

/**
* Gets the system name for use in log messages.
*
* @return
*/
override def system(): String = "S3"
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,4 +220,11 @@ class DatalakeStorageInterface(connectorTaskId: ConnectorTaskId, client: DataLak
}.toEither.leftMap(FileDeleteError(_, file))
}.sequence
} yield ()

/**
* Gets the system name for use in log messages.
*
* @return
*/
override def system(): String = "Azure Datalake"
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ object FormatWriter {
outputStream <- Try(new BuildLocalOutputStream(toBufferedOutputStream(path), topicPartition))
writer <- Try(FormatWriter(formatSelection, outputStream))
} yield writer
}.toEither.leftMap(ex => NonFatalCloudSinkError(ex.getMessage, ex))
}.toEither.leftMap(ex => new NonFatalCloudSinkError(ex.getMessage, ex.some))

def apply(
formatInfo: FormatSelection,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import io.lenses.streamreactor.connect.cloud.common.utils.MapUtils
import io.lenses.streamreactor.connect.cloud.common.utils.TimestampUtils
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.{ TopicPartition => KafkaTopicPartition }
import org.apache.kafka.connect.errors.ConnectException
import org.apache.kafka.connect.sink.SinkRecord
import org.apache.kafka.connect.sink.SinkTask

Expand Down Expand Up @@ -88,7 +89,7 @@ abstract class CloudSinkTask[MD <: FileMetadata, C <: CloudSinkConfig, CT](
if (error.rollBack()) {
rollback(error.topicPartitions())
}
throw new IllegalStateException(error.message(), error.exception())
throw new ConnectException(error.message(), error.exception().orNull)
case Right(_) =>
}

Expand Down Expand Up @@ -252,7 +253,7 @@ abstract class CloudSinkTask[MD <: FileMetadata, C <: CloudSinkConfig, CT](

def convertPropsToConfig(connectorTaskId: ConnectorTaskId, props: Map[String, String]): Either[Throwable, C]

def createWriterMan(props: Map[String, String]): Either[Throwable, WriterManager[MD]] =
private def createWriterMan(props: Map[String, String]): Either[Throwable, WriterManager[MD]] =
for {
config <- convertPropsToConfig(connectorTaskId, props)
s3Client <- createClient(config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
*/
package io.lenses.streamreactor.connect.cloud.common.sink

import cats.implicits.catsSyntaxOptionId
import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition

trait SinkError {
def exception(): Throwable
def exception(): Option[Throwable]

def message(): String

Expand All @@ -28,7 +29,7 @@ trait SinkError {
}

// Cannot be retried, must be cleaned up
case class FatalCloudSinkError(message: String, exception: Throwable, topicPartition: TopicPartition)
case class FatalCloudSinkError(message: String, exception: Option[Throwable], topicPartition: TopicPartition)
extends SinkError {

override def rollBack(): Boolean = true
Expand All @@ -39,12 +40,12 @@ case class FatalCloudSinkError(message: String, exception: Throwable, topicParti
case object FatalCloudSinkError {

def apply(message: String, topicPartition: TopicPartition): FatalCloudSinkError =
FatalCloudSinkError(message, new IllegalStateException(message), topicPartition)
FatalCloudSinkError(message, Option.empty, topicPartition)

}

// Can be retried
case class NonFatalCloudSinkError(message: String, exception: Throwable) extends SinkError {
case class NonFatalCloudSinkError(message: String, exception: Option[Throwable]) extends SinkError {

override def rollBack(): Boolean = false

Expand All @@ -53,10 +54,10 @@ case class NonFatalCloudSinkError(message: String, exception: Throwable) extends

case object NonFatalCloudSinkError {
def apply(message: String): NonFatalCloudSinkError =
NonFatalCloudSinkError(message, new IllegalStateException(message))
NonFatalCloudSinkError(message, Option.empty)

def apply(exception: Throwable): NonFatalCloudSinkError =
NonFatalCloudSinkError(exception.getMessage, exception)
NonFatalCloudSinkError(exception.getMessage, exception.some)
}

case object BatchCloudSinkError {
Expand All @@ -76,11 +77,10 @@ case class BatchCloudSinkError(
nonFatal: Set[NonFatalCloudSinkError],
) extends SinkError {

override def exception(): Throwable =
override def exception(): Option[Throwable] =
fatal.++(nonFatal)
.headOption
.map(_.exception)
.getOrElse(new IllegalStateException("No exception found in BatchCloudSinkError"))
.flatMap { ex: SinkError => ex.exception() }

override def message(): String =
"fatal:\n" + fatal.map(_.message).mkString("\n") + "\n\nnonFatal:\n" + nonFatal.map(_.message).mkString(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.lenses.streamreactor.connect.cloud.common.sink.naming

import cats.implicits.catsSyntaxEitherId
import cats.implicits.catsSyntaxOptionId
import cats.implicits.toTraverseOps
import io.lenses.streamreactor.connect.cloud.common.config.FormatSelection
import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail
Expand Down Expand Up @@ -94,7 +95,7 @@ class CloudKeyNamer(
.toFile
createFileAndParents(file)
file
}.toEither.left.map(ex => FatalCloudSinkError(ex.getMessage, ex, topicPartition))
}.toEither.left.map(ex => new FatalCloudSinkError(ex.getMessage, ex.some, topicPartition))

private def buildPartitionPrefix(partitionValues: Map[PartitionField, String]): String =
partitionSelection.partitions.map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
*/
package io.lenses.streamreactor.connect.cloud.common.sink.naming

import cats.implicits.toTraverseOps
import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
import io.lenses.streamreactor.connect.cloud.common.model.Offset
import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition
import io.lenses.streamreactor.connect.cloud.common.model.TopicPartitionOffset

import scala.util.Try

Expand All @@ -34,10 +37,26 @@ object IndexFilenames {
def indexForTopicPartition(topic: String, partition: Int)(implicit connectorTaskId: ConnectorTaskId): String =
f".indexes/${connectorTaskId.name}/$topic/$partition%05d/"

/**
* Parses the filename of the index file, converting it to a TopicPartitionOffset
*
* @param maybeIndex option of the index filename
* @return either an error, or a TopicPartitionOffset
*/
def indexToOffset(
topicPartition: TopicPartition,
maybeIndex: Option[String],
): Either[Throwable, Option[TopicPartitionOffset]] =
maybeIndex.map(index =>
for {
offset <- IndexFilenames.offsetFromIndex(index)
} yield topicPartition.withOffset(offset),
).sequence

/**
* Parses an index filename and returns an offset
*/
def offsetFromIndex(indexFilename: String): Either[Throwable, Offset] = {
private def offsetFromIndex(indexFilename: String): Either[Throwable, Offset] = {
val lastIndex = indexFilename.lastIndexOf("/")
val (_, last) = indexFilename.splitAt(lastIndex + 1)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ import io.lenses.streamreactor.connect.cloud.common.sink.FatalCloudSinkError
import io.lenses.streamreactor.connect.cloud.common.sink.NonFatalCloudSinkError
import io.lenses.streamreactor.connect.cloud.common.sink.SinkError
import io.lenses.streamreactor.connect.cloud.common.sink.naming.IndexFilenames
import io.lenses.streamreactor.connect.cloud.common.storage.FileDeleteError
import io.lenses.streamreactor.connect.cloud.common.storage.FileLoadError
import io.lenses.streamreactor.connect.cloud.common.storage.FileMetadata
import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface
import io.lenses.streamreactor.connect.cloud.common.sink.naming.IndexFilenames.indexToOffset
import io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManagerErrors.corruptStorageState
import io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManagerErrors.fileDeleteError
import io.lenses.streamreactor.connect.cloud.common.storage._

class IndexManager[SM <: FileMetadata](
maxIndexes: Int,
Expand All @@ -55,9 +55,9 @@ class IndexManager[SM <: FileMetadata](
indexFileLocation.some,
)
.leftMap { e =>
val logLine = s"Couldn't retrieve listing for (${mostRecentIndexFile}})"
val logLine = s"Couldn't retrieve listing for ($mostRecentIndexFile})"
logger.error("[{}] {}", connectorTaskId.show, logLine, e.exception)
new NonFatalCloudSinkError(logLine, e.exception)
new NonFatalCloudSinkError(logLine, e.exception.some)
}
.flatMap {
case None => 0.asRight
Expand Down Expand Up @@ -127,13 +127,13 @@ class IndexManager[SM <: FileMetadata](
}

/**
* Seeks the filesystem to find the latyest offsets for a topic/partition.
* Seeks the filesystem to find the latest offsets for a topic/partition.
*
* @param topicPartition the TopicPartition for which to retrieve the offsets
* @param bucket the configured bucket
* @return either a SinkError or an option to a TopicPartitionOffset with the seek result.
*/
def seek(
def initialSeek(
topicPartition: TopicPartition,
bucket: String,
): Either[SinkError, Option[TopicPartitionOffset]] = {
Expand All @@ -144,7 +144,7 @@ class IndexManager[SM <: FileMetadata](
)
.leftMap { e =>
logger.error("Error retrieving listing", e.exception)
new NonFatalCloudSinkError("Couldn't retrieve listing", e.exception)
new NonFatalCloudSinkError("Couldn't retrieve listing", Option(e.exception))
}
.flatMap {
case None => Option.empty[TopicPartitionOffset].asRight[SinkError]
Expand All @@ -164,48 +164,33 @@ class IndexManager[SM <: FileMetadata](
topicPartition: TopicPartition,
bucket: String,
indexes: Seq[String],
) = {

/**
* Parses the filename of the index file, converting it to a TopicPartitionOffset
*
* @param maybeIndex option of the index filename
* @return either an error, or a TopicPartitionOffset
*/
def indexToOffset(maybeIndex: Option[String]): Either[Throwable, Option[TopicPartitionOffset]] =
maybeIndex match {
case Some(index) =>
for {
offset <- IndexFilenames.offsetFromIndex(index)
} yield Some(topicPartition.withOffset(offset))
case None => Option.empty[TopicPartitionOffset].asRight
}
): Either[NonFatalCloudSinkError, Option[TopicPartitionOffset]] = {
for {
validIndex <- scanIndexes(bucket, indexes)
indexesToDelete = indexes.filterNot(validIndex.contains)
_ <- storageInterface.deleteFiles(bucket, indexesToDelete)
offset <- indexToOffset(topicPartition, validIndex).leftMap(FileNameParseError(_, s"$validIndex"))
} yield {
logger.info("[{}] Seeked offset {} for TP {}", connectorTaskId.show, offset, topicPartition)
offset
}
}.leftMap(e => handleSeekAndCleanErrors(e))

{
for {
validIndex <- scanIndexes(bucket, indexes)
indexesToDelete = indexes.filterNot(validIndex.contains)
_ <- storageInterface.deleteFiles(bucket, indexesToDelete)
offset <- indexToOffset(validIndex)
} yield {
logger.info("[{}] Seeked offset {} for TP {}", connectorTaskId.show, offset, topicPartition)
offset
}
}.leftMap {
def handleSeekAndCleanErrors(uploadError: UploadError): NonFatalCloudSinkError =
uploadError match {
case err: FileLoadError =>
val logLine = s"File load error while seeking: ${err.message()}"
logger.error(s"[{}] {}", connectorTaskId.show, logLine, err.exception)
new NonFatalCloudSinkError(logLine, err.exception)
NonFatalCloudSinkError(corruptStorageState(storageInterface.system()))
case err: FileDeleteError =>
val logLine = s"File delete error while seeking: ${err.message()}"
logger.error(s"[{}] {}", connectorTaskId.show, logLine, err.exception)
new NonFatalCloudSinkError(logLine, err.exception)
case err: Throwable =>
val logLine = s"Error while seeking: ${err.getMessage}"
NonFatalCloudSinkError(fileDeleteError(storageInterface.system()))
case err: FileNameParseError =>
val logLine = s"Error while seeking: ${err.message()}"
logger.error(s"[{}] {}", connectorTaskId.show, logLine, err)
new NonFatalCloudSinkError(logLine, err)
new NonFatalCloudSinkError(logLine, err.exception.some)
}
}

/**
* Given a bucket and a list of files, attempts to load them to establish the most recent valid index
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.cloud.common.sink.seek

object IndexManagerErrors {

def corruptStorageState(system: String): String =
s"""
|The $system storage state is corrupted. The connector state is out of sync
|with the data. This could happen if the connector has been recreated and the data was deleted.
|Delete the connector's .index subfolder as well and restart the connector.""".stripMargin

def fileDeleteError(system: String): String =
s"""
|There was an issue deleting old index files from the indexes directory. This could happen if
|you have not granted the connector role appropriate delete permissions via the $system
|permissions model.""".stripMargin

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class Writer[SM <: FileMetadata](
writingState.formatWriter.write(messageDetail) match {
case Left(err: Throwable) =>
logger.error(err.getMessage)
NonFatalCloudSinkError(err.getMessage, err).asLeft
NonFatalCloudSinkError(err.getMessage, err.some).asLeft
case Right(_) =>
writeState = writingState.updateOffset(messageDetail.offset, messageDetail.value.schema())
().asRight
Expand Down Expand Up @@ -118,7 +118,7 @@ class Writer[SM <: FileMetadata](
case _: ZeroByteFileError => ()
}
.leftMap {
case UploadFailedError(exception, _) => NonFatalCloudSinkError(exception.getMessage, exception)
case UploadFailedError(exception, _) => NonFatalCloudSinkError(exception.getMessage, exception.some)
}
_ <- indexManager.clean(finalFileName.bucket, indexFileName, topicPartition)
stateReset <- Try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class WriterManager[SM <: FileMetadata](
logger.debug(s"[{}] seekOffsetsForTopicPartition {}", connectorTaskId.show, topicPartition)
for {
bucketAndPrefix <- bucketAndPrefixFn(topicPartition)
offset <- indexManager.seek(topicPartition, bucketAndPrefix.bucket)
offset <- indexManager.initialSeek(topicPartition, bucketAndPrefix.bucket)
} yield offset
}

Expand All @@ -168,7 +168,7 @@ class WriterManager[SM <: FileMetadata](
shouldSkip = writer.shouldSkip(topicPartitionOffset.offset)
resultIfNotSkipped <- if (!shouldSkip) {
transformerF(messageDetail).leftMap(ex =>
FatalCloudSinkError(ex.getMessage, ex, topicPartitionOffset.toTopicPartition),
new FatalCloudSinkError(ex.getMessage, ex.some, topicPartitionOffset.toTopicPartition),
).flatMap { transformed =>
writeAndCommit(topicPartitionOffset, transformed, writer)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ import java.time.Instant

trait StorageInterface[SM <: FileMetadata] extends ResultProcessors {

/**
* Gets the system name for use in log messages.
* @return
*/
def system(): String

def uploadFile(source: UploadableFile, bucket: String, path: String): Either[UploadError, Unit]

def close(): Unit
Expand Down
Loading

0 comments on commit 36f67ea

Please sign in to comment.