diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/storage/AwsS3StorageInterface.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/storage/AwsS3StorageInterface.scala
index 67bafcadfa..c9981c5a41 100644
--- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/storage/AwsS3StorageInterface.scala
+++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/storage/AwsS3StorageInterface.scala
@@ -278,4 +278,11 @@ class AwsS3StorageInterface(connectorTaskId: ConnectorTaskId, s3Client: S3Client
     lastModified
       .map(lmValue => S3FileMetadata(fileName, lmValue))
       .orElse(getMetadata(bucket, fileName).map(oMeta => S3FileMetadata(fileName, oMeta.lastModified)).toOption)
+
+  /**
+    * Gets the system name for use in log messages.
+    *
+    * @return
+    */
+  override def system(): String = "S3"
 }
diff --git a/kafka-connect-azure-datalake/src/main/scala/io/lenses/streamreactor/connect/datalake/storage/DatalakeStorageInterface.scala b/kafka-connect-azure-datalake/src/main/scala/io/lenses/streamreactor/connect/datalake/storage/DatalakeStorageInterface.scala
index 04dc6d67a5..bbfa007bb0 100644
--- a/kafka-connect-azure-datalake/src/main/scala/io/lenses/streamreactor/connect/datalake/storage/DatalakeStorageInterface.scala
+++ b/kafka-connect-azure-datalake/src/main/scala/io/lenses/streamreactor/connect/datalake/storage/DatalakeStorageInterface.scala
@@ -220,4 +220,11 @@ class DatalakeStorageInterface(connectorTaskId: ConnectorTaskId, client: DataLak
         }.toEither.leftMap(FileDeleteError(_, file))
       }.sequence
     } yield ()
+
+  /**
+    * Gets the system name for use in log messages.
+    *
+    * @return
+    */
+  override def system(): String = "Azure Datalake"
 }
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/FormatWriter.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/FormatWriter.scala
index f80000103c..2697a53119 100644
--- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/FormatWriter.scala
+++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/FormatWriter.scala
@@ -44,7 +44,7 @@ object FormatWriter {
       outputStream <- Try(new BuildLocalOutputStream(toBufferedOutputStream(path), topicPartition))
       writer <- Try(FormatWriter(formatSelection, outputStream))
     } yield writer
-  }.toEither.leftMap(ex => NonFatalCloudSinkError(ex.getMessage, ex))
+  }.toEither.leftMap(ex => new NonFatalCloudSinkError(ex.getMessage, ex.some))
 
   def apply(
     formatInfo: FormatSelection,
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/CloudSinkTask.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/CloudSinkTask.scala
index f0d13f1d2c..68dbcf8a62 100644
--- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/CloudSinkTask.scala
+++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/CloudSinkTask.scala
@@ -38,6 +38,7 @@ import io.lenses.streamreactor.connect.cloud.common.utils.MapUtils
 import io.lenses.streamreactor.connect.cloud.common.utils.TimestampUtils
 import org.apache.kafka.clients.consumer.OffsetAndMetadata
 import org.apache.kafka.common.{ TopicPartition => KafkaTopicPartition }
+import org.apache.kafka.connect.errors.ConnectException
 import org.apache.kafka.connect.sink.SinkRecord
 import org.apache.kafka.connect.sink.SinkTask
@@ -88,7 +89,7 @@ abstract class CloudSinkTask[MD <: FileMetadata, C <: CloudSinkConfig, CT](
         if (error.rollBack()) {
           rollback(error.topicPartitions())
         }
-        throw new IllegalStateException(error.message(), error.exception())
+        throw new ConnectException(error.message(), error.exception().orNull)
       case Right(_) =>
     }
@@ -252,7 +253,7 @@ abstract class CloudSinkTask[MD <: FileMetadata, C <: CloudSinkConfig, CT](
 
   def convertPropsToConfig(connectorTaskId: ConnectorTaskId, props: Map[String, String]): Either[Throwable, C]
 
-  def createWriterMan(props: Map[String, String]): Either[Throwable, WriterManager[MD]] =
+  private def createWriterMan(props: Map[String, String]): Either[Throwable, WriterManager[MD]] =
     for {
       config <- convertPropsToConfig(connectorTaskId, props)
       s3Client <- createClient(config)
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/Exceptions.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/Exceptions.scala
index 05e0e045f2..a94d5ad0b5 100644
--- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/Exceptions.scala
+++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/Exceptions.scala
@@ -15,10 +15,11 @@
  */
 package io.lenses.streamreactor.connect.cloud.common.sink
 
+import cats.implicits.catsSyntaxOptionId
 import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition
 
 trait SinkError {
-  def exception(): Throwable
+  def exception(): Option[Throwable]
 
   def message(): String
 
@@ -28,7 +29,7 @@ trait SinkError {
 }
 
 // Cannot be retried, must be cleaned up
-case class FatalCloudSinkError(message: String, exception: Throwable, topicPartition: TopicPartition)
+case class FatalCloudSinkError(message: String, exception: Option[Throwable], topicPartition: TopicPartition)
     extends SinkError {
 
   override def rollBack(): Boolean = true
@@ -39,12 +40,12 @@ case class FatalCloudSinkError(message: String, exception: Throwable, topicParti
 case object FatalCloudSinkError {
 
   def apply(message: String, topicPartition: TopicPartition): FatalCloudSinkError =
-    FatalCloudSinkError(message, new IllegalStateException(message), topicPartition)
+    FatalCloudSinkError(message, Option.empty, topicPartition)
 
 }
 
 // Can be retried
-case class NonFatalCloudSinkError(message: String, exception: Throwable) extends SinkError {
+case class NonFatalCloudSinkError(message: String, exception: Option[Throwable]) extends SinkError {
 
   override def rollBack(): Boolean = false
 
@@ -53,10 +54,10 @@ case class NonFatalCloudSinkError(message: String, exception: Throwable) extends
 case object NonFatalCloudSinkError {
 
   def apply(message: String): NonFatalCloudSinkError =
-    NonFatalCloudSinkError(message, new IllegalStateException(message))
+    NonFatalCloudSinkError(message, Option.empty)
 
   def apply(exception: Throwable): NonFatalCloudSinkError =
-    NonFatalCloudSinkError(exception.getMessage, exception)
+    NonFatalCloudSinkError(exception.getMessage, exception.some)
 }
 
 case object BatchCloudSinkError {
@@ -76,11 +77,10 @@ case class BatchCloudSinkError(
   nonFatal: Set[NonFatalCloudSinkError],
 ) extends SinkError {
 
-  override def exception(): Throwable =
+  override def exception(): Option[Throwable] =
     fatal.++(nonFatal)
       .headOption
-      .map(_.exception)
-      .getOrElse(new IllegalStateException("No exception found in BatchCloudSinkError"))
+      .flatMap { ex: SinkError => ex.exception() }
 
   override def message(): String =
     "fatal:\n" + fatal.map(_.message).mkString("\n") + "\n\nnonFatal:\n" + nonFatal.map(_.message).mkString(
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/naming/CloudKeyNamer.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/naming/CloudKeyNamer.scala
index 401fdc8c0f..50b6467446 100644
--- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/naming/CloudKeyNamer.scala
+++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/naming/CloudKeyNamer.scala
@@ -16,6 +16,7 @@ package io.lenses.streamreactor.connect.cloud.common.sink.naming
 
 import cats.implicits.catsSyntaxEitherId
+import cats.implicits.catsSyntaxOptionId
 import cats.implicits.toTraverseOps
 import io.lenses.streamreactor.connect.cloud.common.config.FormatSelection
 import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail
@@ -94,7 +95,7 @@ class CloudKeyNamer(
           .toFile
         createFileAndParents(file)
         file
-      }.toEither.left.map(ex => FatalCloudSinkError(ex.getMessage, ex, topicPartition))
+      }.toEither.left.map(ex => new FatalCloudSinkError(ex.getMessage, ex.some, topicPartition))
 
   private def buildPartitionPrefix(partitionValues: Map[PartitionField, String]): String =
     partitionSelection.partitions.map {
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/naming/IndexFilenames.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/naming/IndexFilenames.scala
index c71fd84256..2f9c26460d 100644
--- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/naming/IndexFilenames.scala
+++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/naming/IndexFilenames.scala
@@ -15,8 +15,11 @@
  */
 package io.lenses.streamreactor.connect.cloud.common.sink.naming
 
+import cats.implicits.toTraverseOps
 import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
 import io.lenses.streamreactor.connect.cloud.common.model.Offset
+import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition
+import io.lenses.streamreactor.connect.cloud.common.model.TopicPartitionOffset
 
 import scala.util.Try
 
@@ -34,10 +37,26 @@ object IndexFilenames {
   def indexForTopicPartition(topic: String, partition: Int)(implicit connectorTaskId: ConnectorTaskId): String =
     f".indexes/${connectorTaskId.name}/$topic/$partition%05d/"
 
+  /**
+    * Parses the filename of the index file, converting it to a TopicPartitionOffset
+    *
+    * @param maybeIndex option of the index filename
+    * @return either an error, or a TopicPartitionOffset
+    */
+  def indexToOffset(
+    topicPartition: TopicPartition,
+    maybeIndex: Option[String],
+  ): Either[Throwable, Option[TopicPartitionOffset]] =
+    maybeIndex.map(index =>
+      for {
+        offset <- IndexFilenames.offsetFromIndex(index)
+      } yield topicPartition.withOffset(offset),
+    ).sequence
+
   /**
    * Parses an index filename and returns an offset
    */
-  def offsetFromIndex(indexFilename: String): Either[Throwable, Offset] = {
+  private def offsetFromIndex(indexFilename: String): Either[Throwable, Offset] = {
     val lastIndex = indexFilename.lastIndexOf("/")
     val (_, last) = indexFilename.splitAt(lastIndex + 1)
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/seek/IndexManager.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/seek/IndexManager.scala
index 6b3dcd2ffb..83ff6bedf3 100644
--- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/seek/IndexManager.scala
+++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/seek/IndexManager.scala
@@ -25,10 +25,10 @@ import io.lenses.streamreactor.connect.cloud.common.sink.FatalCloudSinkError
 import io.lenses.streamreactor.connect.cloud.common.sink.NonFatalCloudSinkError
 import io.lenses.streamreactor.connect.cloud.common.sink.SinkError
 import io.lenses.streamreactor.connect.cloud.common.sink.naming.IndexFilenames
-import io.lenses.streamreactor.connect.cloud.common.storage.FileDeleteError
-import io.lenses.streamreactor.connect.cloud.common.storage.FileLoadError
-import io.lenses.streamreactor.connect.cloud.common.storage.FileMetadata
-import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface
+import io.lenses.streamreactor.connect.cloud.common.sink.naming.IndexFilenames.indexToOffset
+import io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManagerErrors.corruptStorageState
+import io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManagerErrors.fileDeleteError
+import io.lenses.streamreactor.connect.cloud.common.storage._
 
 class IndexManager[SM <: FileMetadata](
   maxIndexes: Int,
@@ -55,9 +55,9 @@ class IndexManager[SM <: FileMetadata](
         indexFileLocation.some,
       )
       .leftMap { e =>
-        val logLine = s"Couldn't retrieve listing for (${mostRecentIndexFile}})"
+        val logLine = s"Couldn't retrieve listing for ($mostRecentIndexFile)"
         logger.error("[{}] {}", connectorTaskId.show, logLine, e.exception)
-        new NonFatalCloudSinkError(logLine, e.exception)
+        new NonFatalCloudSinkError(logLine, e.exception.some)
       }
       .flatMap {
         case None => 0.asRight
@@ -127,13 +127,13 @@ class IndexManager[SM <: FileMetadata](
   }
 
   /**
-    * Seeks the filesystem to find the latyest offsets for a topic/partition.
+    * Seeks the filesystem to find the latest offsets for a topic/partition.
     *
     * @param topicPartition the TopicPartition for which to retrieve the offsets
     * @param bucket the configured bucket
     * @return either a SinkError or an option to a TopicPartitionOffset with the seek result.
    */
-  def seek(
+  def initialSeek(
     topicPartition: TopicPartition,
     bucket: String,
   ): Either[SinkError, Option[TopicPartitionOffset]] = {
@@ -144,7 +144,7 @@ class IndexManager[SM <: FileMetadata](
       )
       .leftMap { e =>
         logger.error("Error retrieving listing", e.exception)
-        new NonFatalCloudSinkError("Couldn't retrieve listing", e.exception)
+        new NonFatalCloudSinkError("Couldn't retrieve listing", Option(e.exception))
       }
       .flatMap {
         case None => Option.empty[TopicPartitionOffset].asRight[SinkError]
@@ -164,48 +164,33 @@
     topicPartition: TopicPartition,
     bucket: String,
     indexes: Seq[String],
-  ) = {
-
-    /**
-      * Parses the filename of the index file, converting it to a TopicPartitionOffset
-      *
-      * @param maybeIndex option of the index filename
-      * @return either an error, or a TopicPartitionOffset
-      */
-    def indexToOffset(maybeIndex: Option[String]): Either[Throwable, Option[TopicPartitionOffset]] =
-      maybeIndex match {
-        case Some(index) =>
-          for {
-            offset <- IndexFilenames.offsetFromIndex(index)
-          } yield Some(topicPartition.withOffset(offset))
-        case None => Option.empty[TopicPartitionOffset].asRight
-      }
+  ): Either[NonFatalCloudSinkError, Option[TopicPartitionOffset]] = {
+    for {
+      validIndex <- scanIndexes(bucket, indexes)
+      indexesToDelete = indexes.filterNot(validIndex.contains)
+      _ <- storageInterface.deleteFiles(bucket, indexesToDelete)
+      offset <- indexToOffset(topicPartition, validIndex).leftMap(FileNameParseError(_, s"$validIndex"))
+    } yield {
+      logger.info("[{}] Seeked offset {} for TP {}", connectorTaskId.show, offset, topicPartition)
+      offset
+    }
+  }.leftMap(e => handleSeekAndCleanErrors(e))
 
-    {
-      for {
-        validIndex <- scanIndexes(bucket, indexes)
-        indexesToDelete = indexes.filterNot(validIndex.contains)
-        _ <- storageInterface.deleteFiles(bucket, indexesToDelete)
-        offset <- indexToOffset(validIndex)
-      } yield {
-        logger.info("[{}] Seeked offset {} for TP {}", connectorTaskId.show, offset, topicPartition)
-        offset
-      }
-    }.leftMap {
+  def handleSeekAndCleanErrors(uploadError: UploadError): NonFatalCloudSinkError =
+    uploadError match {
       case err: FileLoadError =>
         val logLine = s"File load error while seeking: ${err.message()}"
         logger.error(s"[{}] {}", connectorTaskId.show, logLine, err.exception)
-        new NonFatalCloudSinkError(logLine, err.exception)
+        NonFatalCloudSinkError(corruptStorageState(storageInterface.system()))
      case err: FileDeleteError =>
        val logLine = s"File delete error while seeking: ${err.message()}"
        logger.error(s"[{}] {}", connectorTaskId.show, logLine, err.exception)
-        new NonFatalCloudSinkError(logLine, err.exception)
-      case err: Throwable =>
-        val logLine = s"Error while seeking: ${err.getMessage}"
+        NonFatalCloudSinkError(fileDeleteError(storageInterface.system()))
+      case err: FileNameParseError =>
+        val logLine = s"Error while seeking: ${err.message()}"
        logger.error(s"[{}] {}", connectorTaskId.show, logLine, err)
-        new NonFatalCloudSinkError(logLine, err)
+        new NonFatalCloudSinkError(logLine, err.exception.some)
     }
-  }
 
   /**
    * Given a bucket and a list of files, attempts to load them to establish the most recent valid index
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/seek/IndexManagerErrors.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/seek/IndexManagerErrors.scala
new file mode 100644
index 0000000000..335aae4b84
--- /dev/null
+++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/seek/IndexManagerErrors.scala
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.cloud.common.sink.seek
+
+object IndexManagerErrors {
+
+  private def errorWrapper(error: String): String =
+    s"""
+       |===============================================================================================
+       |$error
+       |===============================================================================================
+       |""".stripMargin.linesIterator.filter(_.nonEmpty).mkString(System.lineSeparator())
+
+  def corruptStorageState(system: String): String =
+    errorWrapper(
+      s"""
+         |The $system storage state is corrupted. The connector state is out of sync
+         |with the data. This could happen if the connector has been recreated and the data was deleted.
+         |Delete the connector's .indexes subfolder as well and restart the connector.
+         |""".stripMargin,
+    )
+
+  def fileDeleteError(system: String): String =
+    errorWrapper(
+      s"""
+         |There was an issue deleting old index files from the indexes directory. This could happen if
+         |you have not granted the connector role appropriate delete permissions via the $system
+         |permissions model.
+         |""".stripMargin,
+    )
+
+}
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/Writer.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/Writer.scala
index 83ae3663c6..124bb4a052 100644
--- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/Writer.scala
+++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/Writer.scala
@@ -59,7 +59,7 @@ class Writer[SM <: FileMetadata](
       writingState.formatWriter.write(messageDetail) match {
         case Left(err: Throwable) =>
           logger.error(err.getMessage)
-          NonFatalCloudSinkError(err.getMessage, err).asLeft
+          NonFatalCloudSinkError(err.getMessage, err.some).asLeft
         case Right(_) =>
           writeState = writingState.updateOffset(messageDetail.offset, messageDetail.value.schema())
           ().asRight
@@ -118,7 +118,7 @@ class Writer[SM <: FileMetadata](
                      case _: ZeroByteFileError => ()
                    }
                    .leftMap {
-                     case UploadFailedError(exception, _) => NonFatalCloudSinkError(exception.getMessage, exception)
+                     case UploadFailedError(exception, _) => NonFatalCloudSinkError(exception.getMessage, exception.some)
                    }
      _ <- indexManager.clean(finalFileName.bucket, indexFileName, topicPartition)
      stateReset <- Try {
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/WriterManager.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/WriterManager.scala
index c7dde9512b..9889b34a61 100644
--- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/WriterManager.scala
+++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/WriterManager.scala
@@ -148,7 +148,7 @@ class WriterManager[SM <: FileMetadata](
     logger.debug(s"[{}] seekOffsetsForTopicPartition {}", connectorTaskId.show, topicPartition)
     for {
       bucketAndPrefix <- bucketAndPrefixFn(topicPartition)
-      offset <- indexManager.seek(topicPartition, bucketAndPrefix.bucket)
+      offset <- indexManager.initialSeek(topicPartition, bucketAndPrefix.bucket)
     } yield offset
   }
 
@@ -168,7 +168,7 @@ class WriterManager[SM <: FileMetadata](
       shouldSkip = writer.shouldSkip(topicPartitionOffset.offset)
       resultIfNotSkipped <- if (!shouldSkip) {
         transformerF(messageDetail).leftMap(ex =>
-          FatalCloudSinkError(ex.getMessage, ex, topicPartitionOffset.toTopicPartition),
+          new FatalCloudSinkError(ex.getMessage, ex.some, topicPartitionOffset.toTopicPartition),
         ).flatMap { transformed =>
           writeAndCommit(topicPartitionOffset, transformed, writer)
         }
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/storage/StorageInterface.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/storage/StorageInterface.scala
index a15a1718b8..34c973a03e 100644
--- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/storage/StorageInterface.scala
+++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/storage/StorageInterface.scala
@@ -24,6 +24,12 @@ import java.time.Instant
 
 trait StorageInterface[SM <: FileMetadata] extends ResultProcessors {
 
+  /**
+    * Gets the system name for use in log messages.
+    * @return
+    */
+  def system(): String
+
   def uploadFile(source: UploadableFile, bucket: String, path: String): Either[UploadError, Unit]
 
   def close(): Unit
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/storage/UploadError.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/storage/UploadError.scala
index caf9c6c7a7..750ccc08e2 100644
--- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/storage/UploadError.scala
+++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/storage/UploadError.scala
@@ -42,7 +42,7 @@ case class FileCreateError(exception: Throwable, data: String) extends UploadErr
 }
 
 case class FileDeleteError(exception: Throwable, fileName: String) extends UploadError {
-  override def message() = s"error deleting file (${fileName}) ${exception.getMessage}"
+  override def message() = s"error deleting file ($fileName) ${exception.getMessage}"
 }
 
 case class FileLoadError(exception: Throwable, fileName: String) extends UploadError {
@@ -51,6 +51,11 @@ case class FileLoadError(exception: Throwable, fileName: String) extends UploadE
 
   def toException = new RuntimeException(message(), exception)
 }
+case class FileNameParseError(exception: Throwable, fileName: String) extends UploadError {
+  override def message() = s"error parsing file name ($fileName) ${exception.getMessage}"
+
+  def toException = new RuntimeException(message(), exception)
+}
 case class FileListError(exception: Throwable, bucket: String, path: Option[String]) extends UploadError {
   override def message() = s"error listing files (${path}) ${exception.getMessage}"
 }
diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/seek/IndexManagerErrorsTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/seek/IndexManagerErrorsTest.scala
new file mode 100644
index 0000000000..2e0f8ebc4e
--- /dev/null
+++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/seek/IndexManagerErrorsTest.scala
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.cloud.common.sink.seek
+
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class IndexManagerErrorsTest extends AnyFlatSpec with Matchers {
+
+  "corruptStorageState" should "return the correct error message" in {
+    val system = "TestSystem"
+    val expectedError =
+      """===============================================================================================
+        |The TestSystem storage state is corrupted. The connector state is out of sync
+        |with the data. This could happen if the connector has been recreated and the data was deleted.
+        |Delete the connector's .indexes subfolder as well and restart the connector.
+ |===============================================================================================""".stripMargin + + val result = IndexManagerErrors.corruptStorageState(system) + result shouldBe expectedError + } + + "fileDeleteError" should "return the correct error message" in { + val system = "TestSystem" + val expectedError = + """=============================================================================================== + |There was an issue deleting old index files from the indexes directory. This could happen if + |you have not granted the connector role appropriate delete permissions via the TestSystem + |permissions model. + |===============================================================================================""".stripMargin + + val result = IndexManagerErrors.fileDeleteError(system) + result shouldBe expectedError + } +} diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/seek/IndexManagerTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/seek/IndexManagerTest.scala index a9dee66358..f8459d4b80 100644 --- a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/seek/IndexManagerTest.scala +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/seek/IndexManagerTest.scala @@ -34,6 +34,8 @@ import org.scalatest.BeforeAndAfter import org.scalatest.EitherValues import org.scalatest.OptionValues +import java.io.FileNotFoundException +import java.io.IOException import java.time.Instant class IndexManagerTest extends AnyFlatSpec with MockitoSugar with EitherValues with OptionValues with BeforeAndAfter { @@ -52,6 +54,10 @@ class IndexManagerTest extends AnyFlatSpec with MockitoSugar with EitherValues w private val indexManager = new IndexManager(maxIndexes) + before { + when(storageInterface.system()).thenReturn("TestaCloud") + } + after { reset(storageInterface) } @@ -124,7 +130,7 @@ class IndexManagerTest extends AnyFlatSpec with MockitoSugar with EitherValues w } "clean" should "return error when too many indexes have accumulated" in { - setUpTooManyIndexes + setUpTooManyIndexes() val capturedEx = indexManager.clean(bucketName, indexPath, topicPartition).left.value capturedEx shouldBe a[FatalCloudSinkError] @@ -138,7 +144,7 @@ class IndexManagerTest extends AnyFlatSpec with MockitoSugar with EitherValues w cleanInOrder.verifyNoMoreInteractions() } - private def setUpTooManyIndexes = { + private def setUpTooManyIndexes() = { val tenIndexes = Range(0, 9).map(x => f".indexes/sinkName/myTopic/00005/000000000000000000$x%020d").toList when( storageInterface.listKeysRecursive( @@ -223,9 +229,9 @@ class IndexManagerTest extends AnyFlatSpec with MockitoSugar with EitherValues w cleanInOrder.verifyNoMoreInteractions() } - "seek" should "correctly seek files" in { + "initial seek" should "correctly seek files" in { - val existingIndexes = setUpExistingIndexes + val existingIndexes = setUpExistingIndexes() when( storageInterface.listKeysRecursive( any[String], @@ -240,7 +246,7 @@ class IndexManagerTest extends AnyFlatSpec with MockitoSugar with EitherValues w ).some.asRight, ) when(storageInterface.deleteFiles(eqTo(bucketName), any[List[String]])).thenReturn(().asRight) - val seekRes = indexManager.seek(topicPartition, bucketName) + val seekRes = indexManager.initialSeek(topicPartition, bucketName) seekRes.value should be(Some(topicPartition.withOffset(Offset(70)))) val seekInOrder = 
@@ -270,15 +276,15 @@
     seekInOrder.verifyNoMoreInteractions()
   }
 
-  "seek" should "sulk when too many index files have accumulated" in {
+  "initial seek" should "sulk when too many index files have accumulated" in {
 
-    setUpTooManyIndexes
+    setUpTooManyIndexes()
 
     val target = "testString"
     when(storageInterface.getBlobAsString(any[String], any[String])).thenReturn(target.asRight)
     when(storageInterface.pathExists(any[String], any[String])).thenReturn(true.asRight)
     when(storageInterface.deleteFiles(eqTo(bucketName), any[List[String]])).thenReturn(().asRight)
-    val seekRes = indexManager.seek(topicPartition, bucketName)
+    val seekRes = indexManager.initialSeek(topicPartition, bucketName)
     val capturedEx = seekRes.left.value
     capturedEx shouldBe a[FatalCloudSinkError]
     capturedEx.message() should startWith("Too many index files have accumulated")
@@ -293,7 +299,7 @@
 
   "scanIndexes" should "identify most recent index" in {
 
-    val existingIndexes = setUpExistingIndexes
+    val existingIndexes = setUpExistingIndexes()
 
     indexManager.scanIndexes(bucketName, existingIndexes.map(_._1).toList) should be(
       Right(Some(".indexes/sinkName/myTopic/00005/00000000000000000070")),
@@ -313,7 +319,7 @@
     scannedInOrder.verifyNoMoreInteractions()
   }
 
-  private def setUpExistingIndexes = {
+  private def setUpExistingIndexes() = {
     // of the 3 indexes:
     // * 50, the file exists but has been superceded by a new one. DELETE
     // * 70, the file exists and is the latest index. File exists. KEEP
@@ -349,4 +355,23 @@
     )
     scannedInOrder.verifyNoMoreInteractions()
   }
+
+  "handleSeekAndCleanErrors" should "handle FileLoadError correctly" in {
+    val fileLoadError = FileLoadError(new FileNotFoundException("what"), "noFile.txt")
+    val result = indexManager.handleSeekAndCleanErrors(fileLoadError)
+    result shouldBe a[NonFatalCloudSinkError]
+    result.message should include("The TestaCloud storage state is corrupted.")
+  }
+
+  "handleSeekAndCleanErrors" should "handle FileDeleteError correctly" in {
+    val fileDeleteError = FileDeleteError(new IOException("need input"), "noinput.txt")
+    val result = indexManager.handleSeekAndCleanErrors(fileDeleteError)
+    result shouldBe a[NonFatalCloudSinkError]
+  }
+
+  "handleSeekAndCleanErrors" should "handle FileNameParseError correctly" in {
+    val fileNameParseError = FileNameParseError(new FileNotFoundException("Invalid file name"), "nofile.txt")
+    val result = indexManager.handleSeekAndCleanErrors(fileNameParseError)
+    result shouldBe a[NonFatalCloudSinkError]
+  }
 }
diff --git a/kafka-connect-gcp-storage/src/it/scala/io/lenses/streamreactor/connect/gcp/storage/storage/GCPStorageStorageInterfaceTest.scala b/kafka-connect-gcp-storage/src/it/scala/io/lenses/streamreactor/connect/gcp/storage/storage/GCPStorageStorageInterfaceTest.scala
new file mode 100644
index 0000000000..732ece33b7
--- /dev/null
+++ b/kafka-connect-gcp-storage/src/it/scala/io/lenses/streamreactor/connect/gcp/storage/storage/GCPStorageStorageInterfaceTest.scala
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.gcp.storage.storage
+
+import io.lenses.streamreactor.connect.cloud.common.model.UploadableString
+import io.lenses.streamreactor.connect.gcp.storage.utils.GCPProxyContainerTest
+import org.scalatest.EitherValues
+import org.scalatest.matchers.should.Matchers
+
+class GCPStorageStorageInterfaceTest extends GCPProxyContainerTest with Matchers with EitherValues {
+
+  "pathExists" should "return false if path not found" in {
+    val result = storageInterface.pathExists(BucketName, path = "/my/made/up/path")
+    result.value should be(false)
+  }
+
+  "pathExists" should "return true if path exists" in {
+    storageInterface.writeStringToFile(BucketName,
+                                       "/my/real/path",
+                                       UploadableString("Wouldn't you like to be a pepper too?"),
+    )
+
+    val result = storageInterface.pathExists(BucketName, path = "/my/real/path")
+    result.value should be(true)
+  }
+
+}
diff --git a/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/storage/GCPStorageStorageInterface.scala b/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/storage/GCPStorageStorageInterface.scala
index f1eae757cf..341cdc9ccc 100644
--- a/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/storage/GCPStorageStorageInterface.scala
+++ b/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/storage/GCPStorageStorageInterface.scala
@@ -85,37 +85,44 @@ class GCPStorageStorageInterface(connectorTaskId: ConnectorTaskId, storage: Stor
 
   override def close(): Unit = Try(storage.close()).getOrElse(())
 
-  private def usingBlob[X](bucket: String, path: String)(f: Blob => X): Either[FileLoadError, X] =
+  private def usingBlob[X](bucket: String, path: String)(f: Option[Blob] => X): Either[FileLoadError, X] =
     Try {
-      val blob = storage.get(BlobId.of(bucket, path))
-      f(blob)
+      val blob     = storage.get(BlobId.of(bucket, path))
+      val optiBlob = Option(blob)
+      f(optiBlob)
     }.toEither.leftMap {
       FileLoadError(_, path)
     }
 
   override def pathExists(bucket: String, path: String): Either[FileLoadError, Boolean] =
     usingBlob[Boolean](bucket, path) {
-      blob =>
-        blob.exists().booleanValue()
+      maybeBlob =>
+        maybeBlob.nonEmpty && maybeBlob.exists(blob => blob.exists())
     }
 
   override def getBlob(bucket: String, path: String): Either[FileLoadError, InputStream] =
     usingBlob[InputStream](bucket, path) {
-      blob =>
+      case Some(blob) =>
        val reader = blob.reader()
        Channels.newInputStream(reader)
+      case None =>
+        throw new IllegalStateException("No/null blob found (file doesn't exist?)")
     }
 
   override def getBlobAsString(bucket: String, path: String): Either[FileLoadError, String] =
     usingBlob[String](bucket, path) {
-      blob =>
+      case Some(blob) =>
        new String(blob.getContent())
+      case None =>
+        throw new IllegalStateException("No/null blob found (file doesn't exist?)")
     }
 
   override def getMetadata(bucket: String, path: String): Either[FileLoadError, ObjectMetadata] =
     usingBlob[ObjectMetadata](bucket, path) {
-      blob =>
+      case Some(blob) =>
        ObjectMetadata(blob.getSize, blob.getCreateTimeOffsetDateTime.toInstant)
+      case None =>
+        throw new IllegalStateException("No/null blob found (file doesn't exist?)")
     }
 
   override def writeStringToFile(bucket: String, path: String, data: UploadableString): Either[UploadError, Unit] = {
@@ -270,4 +277,10 @@ class GCPStorageStorageInterface(connectorTaskId: ConnectorTaskId, storage: Stor
       .map(lmValue => GCPStorageFileMetadata(fileName, lmValue))
       .orElse(getMetadata(bucket, fileName).map(oMeta => GCPStorageFileMetadata(fileName, oMeta.lastModified)).toOption)
 
+  /**
+    * Gets the system name for use in log messages.
+    *
+    * @return
+    */
+  override def system(): String = "GCP Storage"
 }