From c272cd51de2d52df5fb73c6d8eae1e0aed0aa725 Mon Sep 17 00:00:00 2001 From: David Sloan <33483659+davidsloan@users.noreply.github.com> Date: Fri, 29 Nov 2024 14:32:22 +0000 Subject: [PATCH] Post-Processing for Datalake Cloud Source Connectors (#161) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After processing files from the S3/GCP Storage source, this enables the feature of deleting or moving the files after they've been committed. # New KCQL Configuration Options for Datalake Cloud Connectors The following configuration options introduce post-processing capabilities for the AWS S3, GCP Storage, and (coming soon) Azure Datalake Gen 2 **source connectors**. These options allow the connector to manage source files after they are successfully processed, either by deleting the file or moving it to a new location in cloud storage. In Kafka Connect, post-processing is triggered when the framework calls the `commitRecord` method after a source record is successfully processed. The configured action then determines how the source file is handled. If no `post.process.action` is configured, **no post-processing will occur**, and the file will remain in its original location. --- ## KCQL Configuration Options ### 1. `post.process.action` - **Description**: Defines the action to perform on a file after it has been processed. - **Options**: - `DELETE` – Removes the file after processing. - `MOVE` – Relocates the file to a new location after processing. ### 2. `post.process.action.bucket` - **Description**: Specifies the target bucket for files when using the `MOVE` action. - **Applicability**: Only applies to the `MOVE` action. - **Notes**: This field is **mandatory** when `post.process.action` is set to `MOVE`. ### 3. `post.process.action.prefix` - **Description**: Specifies a new prefix to replace the existing one for the file’s location when using the `MOVE` action. The file's path will remain unchanged except for the prefix. - **Applicability**: Only applies to the `MOVE` action. - **Notes**: This field is **mandatory** when `post.process.action` is set to `MOVE`. --- ## Key Use Cases - **DELETE**: Automatically removes source files to free up storage space and prevent redundant data from remaining in the bucket. - **MOVE**: Organizes processed source files by relocating them to a different bucket or prefix, which is useful for archiving, categorizing, or preparing files for other workflows. --- ## Examples ### Example 1: Deleting Files After Processing To configure the source connector to delete files after processing, use the following KCQL: ```kcql INSERT INTO `my-bucket` SELECT * FROM `my-topic` PROPERTIES ( 'post.process.action'=`DELETE` ) ``` ### Example 2: Moving Files After Processing To configure the source connector to move files to a different bucket and prefix, use the following KCQL: ```kcql INSERT INTO `my-bucket:archive/` SELECT * FROM `my-topic` PROPERTIES ( 'post.process.action'=`MOVE`, 'post.process.action.bucket'=`archive-bucket`, 'post.process.action.prefix'=`archive/` ) ``` In this example: * The file is moved to `archive-bucket`. * The prefix `archive/` is applied to the file’s path while keeping the rest of the path unchanged. ## Important Considerations * Both `post.process.action.bucket` and `post.process.action.prefix` are mandatory when using the `MOVE` action. * For the `DELETE` action, no additional configuration is required. * If no `post.process.action` is configured, no post-processing will be applied, and the file will remain in its original location. * * Configuration for Burn-After-Reading * Implementing actions and storage interfaces. Needs add tests. The file move logic needs testing where it resolves the path - is this even the best configuration? * Storage interface tests * Address comment from review referencing this page on moving items in GCP: https://cloud.google.com/storage/docs/samples/storage-move-file * * Adding temporary logging, fixing a bug with the Map equality not enabling prefixes to map to each other * Fix Move action * Fix prefix replace behaviour * Changes to ensure error handling approach is correct * Review fixes - remove S3 references * Avoid variable shadowing * Avoid variable shadowing * add documentation * CopyObjectResponse --- .../connect/aws/s3/source/S3SourceTask.scala | 10 +- .../s3/storage/AwsS3StorageInterface.scala | 30 +++ .../s3/source/reader/ReaderManagerTest.scala | 12 + .../reader/S3PartitionDiscoveryTest.scala | 76 ++++-- .../state/ReaderManagerBuilderTest.scala | 8 +- .../storage/AwsS3StorageInterfaceTest.scala | 105 +++++++++ .../storage/DatalakeStorageInterface.scala | 10 + .../DatalakeStorageInterfaceTest.scala | 53 +++++ .../config/kcqlprops/PropsKeyEnum.scala | 7 + .../formats/reader/CloudStreamReader.scala | 4 +- .../converters/BytesOutputRowConverter.scala | 4 +- .../converters/SchemaAndValueConverter.scala | 4 +- .../SchemaAndValueEnvelopeConverter.scala | 4 +- .../SchemalessEnvelopeConverter.scala | 4 +- .../reader/converters/TextConverter.scala | 4 +- .../common/model/location/CloudLocation.scala | 6 + .../cloud/common/source/CloudSourceTask.scala | 64 +++-- .../cloud/common/source/CommitWatermark.scala | 20 ++ .../cloud/common/source/ContextBoolean.scala | 33 +++ .../common/source/ContextConstants.scala | 2 + .../common/source/SourceContextReader.scala | 24 +- .../cloud/common/source/SourceWatermark.scala | 90 ++++++- .../config/CloudSourceBucketOptions.scala | 4 + .../source/config/PostProcessAction.scala | 92 ++++++++ .../kcqlprops/CloudSourcePropsSchema.scala | 23 +- .../kcqlprops/PostProcessActionEnum.scala | 29 +++ .../source/reader/PartitionDiscovery.scala | 7 +- .../common/source/reader/ReaderManager.scala | 27 ++- .../source/state/CloudSourceTaskState.scala | 29 ++- .../source/state/ReaderManagerBuilder.scala | 15 +- .../common/storage/StorageInterface.scala | 2 + .../cloud/common/storage/UploadError.scala | 4 + ...elegateIteratorCloudStreamReaderTest.scala | 4 +- .../BytesOutputRowConverterTest.scala | 12 +- .../SchemaAndValueConverterTest.scala | 4 +- .../SchemaAndValueEnvelopeConverterTest.scala | 14 +- .../SchemalessEnvelopeConverterTest.scala | 17 +- .../reader/converters/TextConverterTest.scala | 7 +- .../common/source/SourceWatermarkTest.scala | 2 + .../source/config/PostProcessActionTest.scala | 112 +++++++++ .../reader/PartitionDiscoveryTest.scala | 24 +- .../storage/source/GCPStorageSourceTask.scala | 10 +- .../storage/GCPStorageStorageInterface.scala | 55 ++++- .../GCPStorageStorageInterfaceTest.scala | 221 ++++++++++++++++++ 44 files changed, 1139 insertions(+), 149 deletions(-) create mode 100644 kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/storage/AwsS3StorageInterfaceTest.scala create mode 100644 kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/CommitWatermark.scala create mode 100644 kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/ContextBoolean.scala create mode 100644 kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/PostProcessAction.scala create mode 100644 kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/kcqlprops/PostProcessActionEnum.scala create mode 100644 kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/source/config/PostProcessActionTest.scala diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/S3SourceTask.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/S3SourceTask.scala index 3d8111cbb3..8611c4fe21 100644 --- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/S3SourceTask.scala +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/S3SourceTask.scala @@ -33,7 +33,7 @@ class S3SourceTask S3FileMetadata, S3SourceConfig, S3Client, - ] + ]("/aws-s3-source-ascii.txt") with LazyLogging { val validator: CloudLocationValidator = S3LocationValidator @@ -41,10 +41,10 @@ class S3SourceTask override def createStorageInterface( connectorTaskId: ConnectorTaskId, config: S3SourceConfig, - s3Client: S3Client, + client: S3Client, ): AwsS3StorageInterface = new AwsS3StorageInterface(connectorTaskId = connectorTaskId, - s3Client = s3Client, + s3Client = client, batchDelete = config.batchDelete, extensionFilter = config.extensionFilter, ) @@ -59,6 +59,6 @@ class S3SourceTask override def connectorPrefix: String = CONNECTOR_PREFIX - override def createDirectoryLister(connectorTaskId: ConnectorTaskId, s3Client: S3Client): DirectoryLister = - new AwsS3DirectoryLister(connectorTaskId, s3Client) + override def createDirectoryLister(connectorTaskId: ConnectorTaskId, client: S3Client): DirectoryLister = + new AwsS3DirectoryLister(connectorTaskId, client) } diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/storage/AwsS3StorageInterface.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/storage/AwsS3StorageInterface.scala index 881023d754..5a2f0b7b10 100644 --- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/storage/AwsS3StorageInterface.scala +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/storage/AwsS3StorageInterface.scala @@ -25,6 +25,7 @@ import io.lenses.streamreactor.connect.cloud.common.storage.FileCreateError import io.lenses.streamreactor.connect.cloud.common.storage.FileDeleteError import io.lenses.streamreactor.connect.cloud.common.storage.FileListError import io.lenses.streamreactor.connect.cloud.common.storage.FileLoadError +import io.lenses.streamreactor.connect.cloud.common.storage.FileMoveError import io.lenses.streamreactor.connect.cloud.common.storage.ListOfKeysResponse import io.lenses.streamreactor.connect.cloud.common.storage.ListOfMetadataResponse import io.lenses.streamreactor.connect.cloud.common.storage.ListResponse @@ -295,4 +296,33 @@ class AwsS3StorageInterface( * @return */ override def system(): String = "S3" + + override def mvFile( + oldBucket: String, + oldPath: String, + newBucket: String, + newPath: String, + ): Either[FileMoveError, Unit] = { + val headObjectRequest = HeadObjectRequest.builder().bucket(oldBucket).key(oldPath).build() + Try(s3Client.headObject(headObjectRequest)) match { + case Failure(ex: NoSuchKeyException) => + logger.warn("Object ({}/{}) doesn't exist to move", oldBucket, oldPath, ex) + ().asRight + case Failure(ex) => + logger.error("Object ({}/{}) could not be retrieved", ex) + FileMoveError(ex, oldPath, newPath).asLeft + case Success(_) => + Try { + s3Client.copyObject( + CopyObjectRequest.builder().sourceKey(oldPath).destinationKey(newPath).sourceBucket( + oldBucket, + ).destinationBucket( + newBucket, + ).build(), + ) + s3Client.deleteObject(DeleteObjectRequest.builder().bucket(oldBucket).key(oldPath).build()) + }.toEither.leftMap(FileMoveError(_, oldPath, newPath)).void + } + } + } diff --git a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/source/reader/ReaderManagerTest.scala b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/source/reader/ReaderManagerTest.scala index f546175a8e..7bd3f5b01e 100644 --- a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/source/reader/ReaderManagerTest.scala +++ b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/source/reader/ReaderManagerTest.scala @@ -25,9 +25,11 @@ import io.lenses.streamreactor.connect.aws.s3.model.location.S3LocationValidator import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator +import io.lenses.streamreactor.connect.cloud.common.source.config.PostProcessAction import io.lenses.streamreactor.connect.cloud.common.source.files.SourceFileQueue import io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager import io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader +import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface import org.apache.kafka.connect.data.Schema import org.apache.kafka.connect.source.SourceRecord import org.mockito.MockitoSugar @@ -46,12 +48,16 @@ class ReaderManagerTest extends AnyFlatSpec with MockitoSugar with Matchers with private val bucketAndPrefix = CloudLocation("test", "ing".some) private val firstFileBucketAndPath = bucketAndPrefix.withPath("test:ing/topic/9/0.json") private val firstFileBucketAndPathAndLine = firstFileBucketAndPath.atLine(0).withTimestamp(Instant.now) + private val noPostProcessAction = Option.empty[PostProcessAction] + private val storageInterface = mock[StorageInterface[_]] "poll" should "be empty when no results found" in { val fileQueueProcessor: SourceFileQueue = mock[SourceFileQueue] var locationFnCalls = 0 val target = new ReaderManager( + bucketAndPrefix, + bucketAndPrefix, recordsLimit, fileQueueProcessor, _ => @@ -61,6 +67,8 @@ class ReaderManagerTest extends AnyFlatSpec with MockitoSugar with Matchers with }, connectorTaskId, Ref[IO].of(Option.empty[ResultReader]).unsafeRunSync(), + storageInterface, + noPostProcessAction, ) when(fileQueueProcessor.next()).thenReturn(None.asRight) @@ -100,6 +108,8 @@ class ReaderManagerTest extends AnyFlatSpec with MockitoSugar with Matchers with ).thenReturn(firstFileBucketAndPath) val target = new ReaderManager( + bucketAndPrefix, + bucketAndPrefix, recordsLimit, fileQueueProcessor, location => { @@ -108,6 +118,8 @@ class ReaderManagerTest extends AnyFlatSpec with MockitoSugar with Matchers with }, connectorTaskId, Ref[IO].of(Option.empty[ResultReader]).unsafeRunSync(), + storageInterface, + noPostProcessAction, ) target.poll().unsafeRunSync() should be(pollResults) diff --git a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/source/reader/S3PartitionDiscoveryTest.scala b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/source/reader/S3PartitionDiscoveryTest.scala index b9d18e4bdf..ef2842c7a4 100644 --- a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/source/reader/S3PartitionDiscoveryTest.scala +++ b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/source/reader/S3PartitionDiscoveryTest.scala @@ -29,6 +29,7 @@ import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator import io.lenses.streamreactor.connect.cloud.common.source.config.PartitionSearcherOptions import io.lenses.streamreactor.connect.cloud.common.source.config.PartitionSearcherOptions.ExcludeIndexes +import io.lenses.streamreactor.connect.cloud.common.source.config.PostProcessAction import io.lenses.streamreactor.connect.cloud.common.source.distribution.CloudPartitionSearcher import io.lenses.streamreactor.connect.cloud.common.source.distribution.PartitionSearcherResponse import io.lenses.streamreactor.connect.cloud.common.source.files.SourceFileQueue @@ -36,6 +37,7 @@ import io.lenses.streamreactor.connect.cloud.common.source.reader.PartitionDisco import io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager import io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManagerState import io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader +import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface import org.mockito.MockitoSugar import org.scalatest.flatspec.AnyFlatSpecLike import org.scalatest.matchers.should.Matchers @@ -44,9 +46,12 @@ import scala.concurrent.duration.DurationInt class S3PartitionDiscoveryTest extends AnyFlatSpecLike with Matchers with MockitoSugar { private implicit val cloudLocationValidator: CloudLocationValidator = S3LocationValidator - private val connectorTaskId: ConnectorTaskId = ConnectorTaskId("sinkName", 1, 1) + private val root = CloudLocation("bucket", "path".some) + private val connectorTaskId: ConnectorTaskId = ConnectorTaskId("sinkName", 1, 1) private val fFilesLimit: CloudLocation => Either[Throwable, Int] = _ => 1000.asRight + private val storageInterface = mock[StorageInterface[_]] + private val noPostProcessAction = Option.empty[PostProcessAction] "PartitionDiscovery" should "discover all partitions" in { val fileQueueProcessor: SourceFileQueue = mock[SourceFileQueue] @@ -78,12 +83,19 @@ class S3PartitionDiscoveryTest extends AnyFlatSpecLike with Matchers with Mockit connectorTaskId, ).find, (_, _) => - IO(new ReaderManager(limit, - fileQueueProcessor, - _ => Left(new RuntimeException()), - connectorTaskId, - readerRef, - )), + IO( + new ReaderManager( + root, + root, + limit, + fileQueueProcessor, + _ => Left(new RuntimeException()), + connectorTaskId, + readerRef, + storageInterface, + noPostProcessAction, + ), + ), state, cancelledRef, ).start @@ -146,12 +158,18 @@ class S3PartitionDiscoveryTest extends AnyFlatSpecLike with Matchers with Mockit connectorTaskId, ).find, (_, _) => - IO(new ReaderManager(limit, - fileQueueProcessor, - _ => Left(new RuntimeException()), - connectorTaskId, - readerRef, - )), + IO( + new ReaderManager(root, + root, + limit, + fileQueueProcessor, + _ => Left(new RuntimeException()), + connectorTaskId, + readerRef, + storageInterface, + noPostProcessAction, + ), + ), state, cancelledRef, ).start @@ -206,12 +224,18 @@ class S3PartitionDiscoveryTest extends AnyFlatSpecLike with Matchers with Mockit connectorTaskId, ).find, (_, _) => - IO(new ReaderManager(limit, - fileQueueProcessor, - _ => Left(new RuntimeException()), - connectorTaskId, - readerRef, - )), + IO( + new ReaderManager(root, + root, + limit, + fileQueueProcessor, + _ => Left(new RuntimeException()), + connectorTaskId, + readerRef, + storageInterface, + noPostProcessAction, + ), + ), state, cancelledRef, ).start @@ -271,7 +295,19 @@ class S3PartitionDiscoveryTest extends AnyFlatSpecLike with Matchers with Mockit ( _, _, - ) => IO(new ReaderManager(limit, fileQueueProcessor, _ => Left(new RuntimeException()), taskId, readerRef)), + ) => + IO( + new ReaderManager(root, + root, + limit, + fileQueueProcessor, + _ => Left(new RuntimeException()), + taskId, + readerRef, + storageInterface, + noPostProcessAction, + ), + ), state, cancelledRef, ).start diff --git a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/source/state/ReaderManagerBuilderTest.scala b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/source/state/ReaderManagerBuilderTest.scala index 4ecb5e8aba..d60a2d8605 100644 --- a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/source/state/ReaderManagerBuilderTest.scala +++ b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/source/state/ReaderManagerBuilderTest.scala @@ -55,10 +55,12 @@ class ReaderManagerBuilderTest extends AsyncFlatSpec with AsyncIOSpec with Match None, OrderingType.LastModified, false, + Option.empty, ) - val taskId = ConnectorTaskId("test", 3, 1) - ReaderManagerBuilder(root, path, si, taskId, contextF, _ => Some(sbo)) - .asserting(_ => rootValue shouldBe Some(root.copy(prefix = Some(path)))) + val taskId = ConnectorTaskId("test", 3, 1) + val pathLocation = root.withPath(path) + ReaderManagerBuilder(root, pathLocation, si, taskId, contextF, _ => Some(sbo)) + .asserting(_ => rootValue shouldBe Some(pathLocation)) } } diff --git a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/storage/AwsS3StorageInterfaceTest.scala b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/storage/AwsS3StorageInterfaceTest.scala new file mode 100644 index 0000000000..00374f2c83 --- /dev/null +++ b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/storage/AwsS3StorageInterfaceTest.scala @@ -0,0 +1,105 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.aws.s3.storage + +import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId +import io.lenses.streamreactor.connect.cloud.common.storage.FileMoveError +import org.mockito.ArgumentMatchersSugar +import org.mockito.MockitoSugar +import org.scalatest.EitherValues +import org.scalatest.flatspec.AnyFlatSpecLike +import org.scalatest.matchers.should.Matchers +import software.amazon.awssdk.services.s3.S3Client +import software.amazon.awssdk.services.s3.model.CopyObjectRequest +import software.amazon.awssdk.services.s3.model.CopyObjectResponse +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest +import software.amazon.awssdk.services.s3.model.DeleteObjectResponse +import software.amazon.awssdk.services.s3.model.HeadObjectRequest +import software.amazon.awssdk.services.s3.model.HeadObjectResponse +import software.amazon.awssdk.services.s3.model.NoSuchKeyException + +class AwsS3StorageInterfaceTest + extends AnyFlatSpecLike + with Matchers + with MockitoSugar + with ArgumentMatchersSugar + with EitherValues { + + "mvFile" should "move a file from one bucket to another successfully" in { + val s3Client = mock[S3Client] + val storageInterface = new AwsS3StorageInterface(mock[ConnectorTaskId], s3Client, batchDelete = false, None) + + val copyObjectResponse: CopyObjectResponse = CopyObjectResponse.builder().build() + when(s3Client.copyObject(any[CopyObjectRequest])).thenReturn(copyObjectResponse) + val deleteObjectResponse: DeleteObjectResponse = DeleteObjectResponse.builder().build() + when(s3Client.deleteObject(any[DeleteObjectRequest])).thenReturn(deleteObjectResponse) + + val result = storageInterface.mvFile("oldBucket", "oldPath", "newBucket", "newPath") + + result shouldBe Right(()) + verify(s3Client).copyObject(any[CopyObjectRequest]) + verify(s3Client).deleteObject(any[DeleteObjectRequest]) + } + + it should "return a FileMoveError if copyObject fails" in { + val s3Client = mock[S3Client] + val storageInterface = new AwsS3StorageInterface(mock[ConnectorTaskId], s3Client, batchDelete = false, None) + + when(s3Client.copyObject(any[CopyObjectRequest])).thenThrow(new RuntimeException("Copy failed")) + + val result = storageInterface.mvFile("oldBucket", "oldPath", "newBucket", "newPath") + + result.isLeft shouldBe true + result.left.value shouldBe a[FileMoveError] + verify(s3Client).copyObject(any[CopyObjectRequest]) + verify(s3Client, never).deleteObject(any[DeleteObjectRequest]) + } + + it should "return a FileMoveError if deleteObject fails" in { + val s3Client = mock[S3Client] + val storageInterface = new AwsS3StorageInterface(mock[ConnectorTaskId], s3Client, batchDelete = false, None) + + val headObjectResponse: HeadObjectResponse = HeadObjectResponse.builder().build() + when(s3Client.headObject(any[HeadObjectRequest])).thenReturn(headObjectResponse) + val copyObjectResponse: CopyObjectResponse = CopyObjectResponse.builder().build() + when(s3Client.copyObject(any[CopyObjectRequest])).thenReturn(copyObjectResponse) + when(s3Client.deleteObject(any[DeleteObjectRequest])).thenThrow(new RuntimeException("Delete failed")) + + val result = storageInterface.mvFile("oldBucket", "oldPath", "newBucket", "newPath") + + result.isLeft shouldBe true + result.left.value shouldBe a[FileMoveError] + verify(s3Client).copyObject(any[CopyObjectRequest]) + verify(s3Client).deleteObject(any[DeleteObjectRequest]) + } + + it should "pass if no source object exists" in { + val s3Client = mock[S3Client] + val storageInterface = new AwsS3StorageInterface(mock[ConnectorTaskId], s3Client, batchDelete = false, None) + + when(s3Client.headObject(any[HeadObjectRequest])).thenThrow(NoSuchKeyException.builder().build()) + val copyObjectResponse: CopyObjectResponse = CopyObjectResponse.builder().build() + when(s3Client.copyObject(any[CopyObjectRequest])).thenReturn(copyObjectResponse) + when(s3Client.deleteObject(any[DeleteObjectRequest])).thenThrow(new RuntimeException("Delete failed")) + + val result = storageInterface.mvFile("oldBucket", "oldPath", "newBucket", "newPath") + + result.isRight shouldBe true + verify(s3Client).headObject(any[HeadObjectRequest]) + verify(s3Client, never).copyObject(any[CopyObjectRequest]) + verify(s3Client, never).deleteObject(any[DeleteObjectRequest]) + } +} diff --git a/kafka-connect-azure-datalake/src/main/scala/io/lenses/streamreactor/connect/datalake/storage/DatalakeStorageInterface.scala b/kafka-connect-azure-datalake/src/main/scala/io/lenses/streamreactor/connect/datalake/storage/DatalakeStorageInterface.scala index bbfa007bb0..f5094400c2 100644 --- a/kafka-connect-azure-datalake/src/main/scala/io/lenses/streamreactor/connect/datalake/storage/DatalakeStorageInterface.scala +++ b/kafka-connect-azure-datalake/src/main/scala/io/lenses/streamreactor/connect/datalake/storage/DatalakeStorageInterface.scala @@ -227,4 +227,14 @@ class DatalakeStorageInterface(connectorTaskId: ConnectorTaskId, client: DataLak * @return */ override def system(): String = "Azure Datalake" + + override def mvFile( + oldBucket: String, + oldPath: String, + newBucket: String, + newPath: String, + ): Either[FileMoveError, Unit] = + Try(client.getFileSystemClient(oldBucket).getFileClient(oldPath).rename(newBucket, newPath)).toEither.leftMap( + FileMoveError(_, oldPath, newPath), + ).void } diff --git a/kafka-connect-azure-datalake/src/test/scala/io/lenses/streamreactor/connect/datalake/storage/DatalakeStorageInterfaceTest.scala b/kafka-connect-azure-datalake/src/test/scala/io/lenses/streamreactor/connect/datalake/storage/DatalakeStorageInterfaceTest.scala index 81f3904e3b..589565337d 100644 --- a/kafka-connect-azure-datalake/src/test/scala/io/lenses/streamreactor/connect/datalake/storage/DatalakeStorageInterfaceTest.scala +++ b/kafka-connect-azure-datalake/src/test/scala/io/lenses/streamreactor/connect/datalake/storage/DatalakeStorageInterfaceTest.scala @@ -20,6 +20,7 @@ import cats.implicits.none import com.azure.storage.file.datalake.DataLakeFileClient import com.azure.storage.file.datalake.DataLakeFileSystemClient import com.azure.storage.file.datalake.DataLakeServiceClient +import com.azure.storage.file.datalake.models.DataLakeStorageException import com.azure.storage.file.datalake.models.ListPathsOptions import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId import io.lenses.streamreactor.connect.cloud.common.model.UploadableFile @@ -29,6 +30,7 @@ import io.lenses.streamreactor.connect.cloud.common.storage.FileCreateError import io.lenses.streamreactor.connect.cloud.common.storage.FileDeleteError import io.lenses.streamreactor.connect.cloud.common.storage.FileListError import io.lenses.streamreactor.connect.cloud.common.storage.FileLoadError +import io.lenses.streamreactor.connect.cloud.common.storage.FileMoveError import io.lenses.streamreactor.connect.cloud.common.storage.ListOfKeysResponse import io.lenses.streamreactor.connect.cloud.common.storage.NonExistingFileError import io.lenses.streamreactor.connect.cloud.common.storage.UploadFailedError @@ -436,4 +438,55 @@ class DatalakeStorageInterfaceTest result.left.value should be(a[FileDeleteError]) } + "mvFile" should "move a file from one bucket to another successfully" in { + val oldBucket = "oldBucket" + val oldPath = "oldPath" + val newBucket = "newBucket" + val newPath = "newPath" + + val fileClient = mock[DataLakeFileClient] + when(client.getFileSystemClient(oldBucket).getFileClient(oldPath)).thenReturn(fileClient) + when(fileClient.rename(newBucket, newPath)).thenReturn(fileClient) + + val result = storageInterface.mvFile(oldBucket, oldPath, newBucket, newPath) + + result should be(Right(())) + verify(fileClient).rename(newBucket, newPath) + } + + "mvFile" should "return a FileMoveError if rename fails" in { + val oldBucket = "oldBucket" + val oldPath = "oldPath" + val newBucket = "newBucket" + val newPath = "newPath" + + val fileClient = mock[DataLakeFileClient] + when(client.getFileSystemClient(oldBucket).getFileClient(oldPath)).thenReturn(fileClient) + when(fileClient.rename(newBucket, newPath)).thenThrow(new DataLakeStorageException("Rename failed", null, null)) + + val result = storageInterface.mvFile(oldBucket, oldPath, newBucket, newPath) + + result.isLeft should be(true) + result.left.value should be(a[FileMoveError]) + verify(fileClient).rename(newBucket, newPath) + } + + "mvFile" should "return a FileMoveError if the old file does not exist" in { + val oldBucket = "oldBucket" + val oldPath = "nonExistingPath" + val newBucket = "newBucket" + val newPath = "newPath" + + when(client.getFileSystemClient(oldBucket).getFileClient(oldPath)).thenThrow(new DataLakeStorageException( + "File not found", + null, + null, + )) + + val result = storageInterface.mvFile(oldBucket, oldPath, newBucket, newPath) + + result.isLeft should be(true) + result.left.value should be(a[FileMoveError]) + } + } diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/kcqlprops/PropsKeyEnum.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/kcqlprops/PropsKeyEnum.scala index 9ae5918772..07e536711a 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/kcqlprops/PropsKeyEnum.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/kcqlprops/PropsKeyEnum.scala @@ -61,4 +61,11 @@ object PropsKeyEnum extends Enum[PropsKeyEntry] { case object FlushInterval extends PropsKeyEntry("flush.interval") + // enum - copy, move, delete, tag, execute lambda trigger + case object PostProcessAction extends PropsKeyEntry("post.process.action") + + case object PostProcessActionBucket extends PropsKeyEntry("post.process.action.bucket") + + case object PostProcessActionPrefix extends PropsKeyEntry("post.process.action.prefix") + } diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/CloudStreamReader.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/CloudStreamReader.scala index 97090086e8..b50073177e 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/CloudStreamReader.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/CloudStreamReader.scala @@ -28,7 +28,7 @@ trait CloudStreamReader extends AutoCloseable with Iterator[SourceRecord] { trait CloudDataIterator[T] extends Iterator[T] with AutoCloseable trait Converter[T] { - def convert(t: T, index: Long): SourceRecord + def convert(t: T, index: Long, lastLine: Boolean): SourceRecord } class DelegateIteratorCloudStreamReader[T]( @@ -52,7 +52,7 @@ class DelegateIteratorCloudStreamReader[T]( override def next(): SourceRecord = { val data = iterator.next() recordIndex = recordIndex + 1 - converter.convert(data, recordIndex) + converter.convert(data, recordIndex, !hasNext) } override def close(): Unit = iterator.close() diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/BytesOutputRowConverter.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/BytesOutputRowConverter.scala index 8c6763ec0f..f1d40009b2 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/BytesOutputRowConverter.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/BytesOutputRowConverter.scala @@ -32,10 +32,10 @@ class BytesOutputRowConverter( location: CloudLocation, lastModified: Instant, ) extends Converter[BytesOutputRow] { - override def convert(row: BytesOutputRow, index: Long): SourceRecord = + override def convert(row: BytesOutputRow, index: Long, lastLine: Boolean): SourceRecord = new SourceRecord( watermarkPartition, - SourceWatermark.offset(location, index, lastModified), + SourceWatermark.offset(location, index, lastModified, lastLine), topic.value, partition, Schema.BYTES_SCHEMA, diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/SchemaAndValueConverter.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/SchemaAndValueConverter.scala index 44fe1caf15..351d487529 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/SchemaAndValueConverter.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/SchemaAndValueConverter.scala @@ -31,10 +31,10 @@ class SchemaAndValueConverter( location: CloudLocation, lastModified: Instant, ) extends Converter[SchemaAndValue] { - override def convert(schemaAndValue: SchemaAndValue, index: Long): SourceRecord = + override def convert(schemaAndValue: SchemaAndValue, index: Long, lastLine: Boolean): SourceRecord = new SourceRecord( watermarkPartition, - SourceWatermark.offset(location, index, lastModified), + SourceWatermark.offset(location, index, lastModified, lastLine), topic.value, partition, null, diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/SchemaAndValueEnvelopeConverter.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/SchemaAndValueEnvelopeConverter.scala index 99da68bfd9..9585ac905c 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/SchemaAndValueEnvelopeConverter.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/SchemaAndValueEnvelopeConverter.scala @@ -68,7 +68,7 @@ class SchemaAndValueEnvelopeConverter( lastModified: Instant, instantF: () => Instant = () => Instant.now(), ) extends Converter[SchemaAndValue] { - override def convert(schemaAndValue: SchemaAndValue, index: Long): SourceRecord = { + override def convert(schemaAndValue: SchemaAndValue, index: Long, lastLine: Boolean): SourceRecord = { if (schemaAndValue.schema().`type`() != Schema.Type.STRUCT) { throw new RuntimeException( s"Invalid schema type [${schemaAndValue.schema().`type`()}]. Expected [${Schema.Type.STRUCT}]", @@ -101,7 +101,7 @@ class SchemaAndValueEnvelopeConverter( new SourceRecord( watermarkPartition, - SourceWatermark.offset(location, index, lastModified), + SourceWatermark.offset(location, index, lastModified, lastLine), topic.value, partition, keySchema.orNull, diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/SchemalessEnvelopeConverter.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/SchemalessEnvelopeConverter.scala index c0812a0155..08343c205f 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/SchemalessEnvelopeConverter.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/SchemalessEnvelopeConverter.scala @@ -67,7 +67,7 @@ class SchemalessEnvelopeConverter( lastModified: Instant, instantF: () => Instant = () => Instant.now(), ) extends Converter[String] { - override def convert(envelope: String, index: Long): SourceRecord = + override def convert(envelope: String, index: Long, lastLine: Boolean): SourceRecord = //parse the json and then extract key,value, headers and metadata parse(envelope) match { case Left(value) => throw new RuntimeException(s"Failed to parse envelope [$envelope].", value) @@ -86,7 +86,7 @@ class SchemalessEnvelopeConverter( val sourceRecord = new SourceRecord( watermarkPartition, - SourceWatermark.offset(location, index, lastModified), + SourceWatermark.offset(location, index, lastModified, lastLine), topic.value, metadata.flatMap(j => j("partition").get.asNumber.flatMap(_.toInt).map(Integer.valueOf)).getOrElse(partition), key.map { _ => diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/TextConverter.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/TextConverter.scala index 6a08dc5820..cee577fb89 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/TextConverter.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/TextConverter.scala @@ -32,10 +32,10 @@ class TextConverter( location: CloudLocation, lastModified: Instant, ) extends Converter[String] { - override def convert(value: String, index: Long): SourceRecord = + override def convert(value: String, index: Long, lastLine: Boolean): SourceRecord = new SourceRecord( watermarkPartition, - SourceWatermark.offset(location, index, lastModified), + SourceWatermark.offset(location, index, lastModified, lastLine), topic.value, partition, null, diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/model/location/CloudLocation.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/model/location/CloudLocation.scala index 6471d3ea7c..7741c15442 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/model/location/CloudLocation.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/model/location/CloudLocation.scala @@ -20,6 +20,7 @@ import cats.data.Validated import cats.implicits.catsSyntaxEitherId import cats.implicits.catsSyntaxOptionId import cats.implicits.none +import io.lenses.streamreactor.connect.cloud.common.source.state.CloudLocationKey import java.time.Instant @@ -34,6 +35,8 @@ case class CloudLocation( val cloudLocationValidator: CloudLocationValidator, ) { + def toKey = CloudLocationKey(bucket, prefix) + def fromRoot(root: String): CloudLocation = copy(prefix = root.some) @@ -49,6 +52,9 @@ case class CloudLocation( def withPath(path: String): CloudLocation = copy(path = path.some) + def withPrefix(prefix: String): CloudLocation = + copy(prefix = prefix.some) + def pathOrUnknown: String = path.getOrElse("(Unavailable)") def prefixOrDefault(): String = prefix.getOrElse("") diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/CloudSourceTask.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/CloudSourceTask.scala index a8112c1d32..832ca69f20 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/CloudSourceTask.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/CloudSourceTask.scala @@ -29,6 +29,7 @@ import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskIdCreator import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator +import io.lenses.streamreactor.connect.cloud.common.source.SourceWatermark.readOffsetWatermark import io.lenses.streamreactor.connect.cloud.common.source.distribution.CloudPartitionSearcher import io.lenses.streamreactor.connect.cloud.common.source.distribution.PartitionSearcher import io.lenses.streamreactor.connect.cloud.common.source.reader.PartitionDiscovery @@ -40,14 +41,16 @@ import io.lenses.streamreactor.connect.cloud.common.storage.DirectoryLister import io.lenses.streamreactor.connect.cloud.common.storage.FileMetadata import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface import io.lenses.streamreactor.connect.cloud.common.utils.MapUtils +import org.apache.kafka.clients.producer.RecordMetadata import org.apache.kafka.connect.source.SourceRecord import org.apache.kafka.connect.source.SourceTask import java.util import java.util.Collections import scala.jdk.CollectionConverters._ -abstract class CloudSourceTask[MD <: FileMetadata, C <: CloudSourceConfig[MD], CT] - extends SourceTask +abstract class CloudSourceTask[MD <: FileMetadata, C <: CloudSourceConfig[MD], CT]( + sinkAsciiArtResource: String, +) extends SourceTask with LazyLogging with WithConnectorPrefix with JarManifestProvided { @@ -58,7 +61,7 @@ abstract class CloudSourceTask[MD <: FileMetadata, C <: CloudSourceConfig[MD], C SourceContextReader.getCurrentOffset(() => context) @volatile - private var s3SourceTaskState: Option[CloudSourceTaskState] = None + private var cloudSourceTaskState: Option[CloudSourceTaskState] = None @volatile private var cancelledRef: Option[Ref[IO, Boolean]] = None @@ -72,9 +75,9 @@ abstract class CloudSourceTask[MD <: FileMetadata, C <: CloudSourceConfig[MD], C */ override def start(props: util.Map[String, String]): Unit = { - printAsciiHeader(manifest, "/aws-s3-source-ascii.txt") + printAsciiHeader(manifest, sinkAsciiArtResource) - logger.debug(s"Received call to S3SourceTask.start with ${props.size()} properties") + logger.debug(s"Received call to CloudSourceTask.start with ${props.size()} properties") val contextProperties: Map[String, String] = Option(context).flatMap(c => Option(c.configs()).map(_.asScala.toMap)).getOrElse(Map.empty) @@ -83,23 +86,23 @@ abstract class CloudSourceTask[MD <: FileMetadata, C <: CloudSourceConfig[MD], C result <- make(validator, connectorPrefix, mergedProperties, contextOffsetFn) fiber <- result.partitionDiscoveryLoop.start } yield { - s3SourceTaskState = result.some + cloudSourceTaskState = result.some cancelledRef = result.cancelledRef.some partitionDiscoveryLoop = fiber.some }).unsafeRunSync() } override def stop(): Unit = { - logger.info(s"Stopping S3 source task") - (s3SourceTaskState, cancelledRef, partitionDiscoveryLoop) match { + logger.info(s"Stopping cloud source task") + (cloudSourceTaskState, cancelledRef, partitionDiscoveryLoop) match { case (Some(state), Some(signal), Some(fiber)) => stopInternal(state, signal, fiber) case _ => logger.info("There is no state to stop.") } - logger.info(s"Stopped S3 source task") + logger.info(s"Stopped cloud source task") } override def poll(): util.List[SourceRecord] = - s3SourceTaskState.fold(Collections.emptyList[SourceRecord]()) { state => + cloudSourceTaskState.fold(Collections.emptyList[SourceRecord]()) { state => state.poll().unsafeRunSync().asJava } @@ -113,7 +116,7 @@ abstract class CloudSourceTask[MD <: FileMetadata, C <: CloudSourceConfig[MD], C } yield ()).unsafeRunSync() cancelledRef = None partitionDiscoveryLoop = None - s3SourceTaskState = None + cloudSourceTaskState = None } def createClient(config: C): Either[Throwable, CT] @@ -127,15 +130,15 @@ abstract class CloudSourceTask[MD <: FileMetadata, C <: CloudSourceConfig[MD], C for { connectorTaskId <- IO.fromEither(new ConnectorTaskIdCreator(connectorPrefix).fromProps(props)) config <- IO.fromEither(convertPropsToConfig(connectorTaskId, props)) - s3Client <- IO.fromEither(createClient(config)) - storageInterface: StorageInterface[MD] <- IO.delay(createStorageInterface(connectorTaskId, config, s3Client)) + client <- IO.fromEither(createClient(config)) + storageInterface: StorageInterface[MD] <- IO.delay(createStorageInterface(connectorTaskId, config, client)) - directoryLister <- IO.delay(createDirectoryLister(connectorTaskId, s3Client)) + directoryLister <- IO.delay(createDirectoryLister(connectorTaskId, client)) partitionSearcher <- IO.delay(createPartitionSearcher(directoryLister, connectorTaskId, config)) readerManagerState <- Ref[IO].of(ReaderManagerState(Seq.empty, Seq.empty)) cancelledRef <- Ref[IO].of(false) } yield { - val readerManagerCreateFn: (CloudLocation, String) => IO[ReaderManager] = (root, path) => { + val readerManagerCreateFn: (CloudLocation, CloudLocation) => IO[ReaderManager] = (root, path) => { ReaderManagerBuilder( root, path, @@ -152,14 +155,18 @@ abstract class CloudSourceTask[MD <: FileMetadata, C <: CloudSourceConfig[MD], C readerManagerState, cancelledRef, ) - CloudSourceTaskState(readerManagerState.get.map(_.readerManagers), cancelledRef, partitionDiscoveryLoop) + CloudSourceTaskState( + readerManagerState.get.map(_.readerManagers.map(rm => rm.path.toKey -> rm).toMap), + cancelledRef, + partitionDiscoveryLoop, + ) } - def createStorageInterface(connectorTaskId: ConnectorTaskId, config: C, s3Client: CT): StorageInterface[MD] + def createStorageInterface(connectorTaskId: ConnectorTaskId, config: C, client: CT): StorageInterface[MD] def convertPropsToConfig(connectorTaskId: ConnectorTaskId, props: Map[String, String]): Either[Throwable, C] - def createDirectoryLister(connectorTaskId: ConnectorTaskId, s3Client: CT): DirectoryLister + def createDirectoryLister(connectorTaskId: ConnectorTaskId, client: CT): DirectoryLister def getFilesLimit(config: C): CloudLocation => Either[Throwable, Int] = { cloudLocation => @@ -180,4 +187,25 @@ abstract class CloudSourceTask[MD <: FileMetadata, C <: CloudSourceConfig[MD], C config.partitionSearcher, connectorTaskId, ) + + override def commitRecord(record: SourceRecord, metadata: RecordMetadata): Unit = { + val _ = for { + sourcePartition <- SourceWatermark.partitionMapToSourceRoot( + record.sourcePartition().asScala.toMap, + )(validator) + + offsetWatermark <- readOffsetWatermark(sourcePartition, record.sourceOffset().asScala.toMap) + } yield { + if (offsetWatermark.isLastLine) { + logger.info( + "CommitRecord - sourcePartition: {}, offsetWatermark: {}, isLastLine: {}", + sourcePartition, + offsetWatermark, + offsetWatermark.isLastLine, + cloudSourceTaskState, + ) + cloudSourceTaskState.foreach(_.commitRecord(sourcePartition, offsetWatermark).unsafeRunSync()) + } + } + } } diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/CommitWatermark.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/CommitWatermark.scala new file mode 100644 index 0000000000..6c2204b835 --- /dev/null +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/CommitWatermark.scala @@ -0,0 +1,20 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.cloud.common.source + +import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation + +case class CommitWatermark(cloudLocation: CloudLocation, isLastLine: Boolean) diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/ContextBoolean.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/ContextBoolean.scala new file mode 100644 index 0000000000..f2c00e4400 --- /dev/null +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/ContextBoolean.scala @@ -0,0 +1,33 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.cloud.common.source + +import cats.implicits.catsSyntaxEitherId + +object ContextBoolean { + + private val trueString = "t" + private val falseString = "f" + + def stringToBoolean(original: String): Either[Throwable, Boolean] = + original match { + case `trueString` => true.asRight + case `falseString` => false.asRight + case _ => new IllegalStateException(s"Invalid Boolean value: $original").asLeft + } + + def booleanToString(original: Boolean): String = if (original) trueString else falseString +} diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/ContextConstants.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/ContextConstants.scala index d6b0ae6a62..1e1625c3e9 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/ContextConstants.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/ContextConstants.scala @@ -26,4 +26,6 @@ object ContextConstants { val LineKey = "line" val TimeStampKey = "ts" + + val LastLine = "last" } diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/SourceContextReader.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/SourceContextReader.scala index c4e9e82ef3..da781107fb 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/SourceContextReader.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/SourceContextReader.scala @@ -15,15 +15,10 @@ */ package io.lenses.streamreactor.connect.cloud.common.source -import cats.implicits.catsSyntaxOptionId -import ContextConstants.LineKey -import ContextConstants.PathKey -import ContextConstants.TimeStampKey -import SourceWatermark.partition import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation +import io.lenses.streamreactor.connect.cloud.common.source.SourceWatermark.partition import org.apache.kafka.connect.source.SourceTaskContext -import java.time.Instant import scala.jdk.CollectionConverters.MapHasAsScala import scala.util.Try @@ -35,19 +30,10 @@ object SourceContextReader { ): Option[CloudLocation] = { val key = partition(sourceRoot) for { - offsetMap <- Try(context().offsetStorageReader.offset(key).asScala).toOption.filterNot(_ == null) - path <- offsetMap.get(PathKey).collect { case value: String => value } - line <- offsetMap.get(LineKey).collect { case value: String if value forall Character.isDigit => value.toInt } - ts = offsetMap.get(TimeStampKey).collect { - case value: String if value forall Character.isDigit => Instant.ofEpochMilli(value.toLong) - } - } yield { - sourceRoot.copy( - path = path.some, - line = line.some, - timestamp = ts, - )(sourceRoot.cloudLocationValidator) - } + offsetMap <- Try(context().offsetStorageReader.offset(key).asScala).toOption.filterNot(_ == null) + locationWithOffset <- SourceWatermark.mapToOffset(sourceRoot, offsetMap.toMap) + } yield locationWithOffset + } } diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/SourceWatermark.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/SourceWatermark.scala index a3174a7ede..dd7fe88c4c 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/SourceWatermark.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/SourceWatermark.scala @@ -15,12 +15,15 @@ */ package io.lenses.streamreactor.connect.cloud.common.source -import ContextConstants.ContainerKey -import ContextConstants.LineKey -import ContextConstants.PathKey -import ContextConstants.PrefixKey -import ContextConstants.TimeStampKey +import cats.implicits.catsSyntaxOptionId import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation +import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator +import io.lenses.streamreactor.connect.cloud.common.source.ContextConstants.ContainerKey +import io.lenses.streamreactor.connect.cloud.common.source.ContextConstants.LastLine +import io.lenses.streamreactor.connect.cloud.common.source.ContextConstants.LineKey +import io.lenses.streamreactor.connect.cloud.common.source.ContextConstants.PathKey +import io.lenses.streamreactor.connect.cloud.common.source.ContextConstants.PrefixKey +import io.lenses.streamreactor.connect.cloud.common.source.ContextConstants.TimeStampKey import java.time.Instant import scala.jdk.CollectionConverters.MapHasAsJava @@ -45,11 +48,86 @@ object SourceWatermark { * @param lastModified The last modified time of the file processed * @return A map of offset information */ - def offset(bucketAndPath: CloudLocation, offset: Long, lastModified: Instant): java.util.Map[String, String] = + def offset( + bucketAndPath: CloudLocation, + offset: Long, + lastModified: Instant, + lastLine: Boolean, + ): java.util.Map[String, String] = Map( PathKey -> bucketAndPath.pathOrUnknown, LineKey -> offset.toString, TimeStampKey -> lastModified.toEpochMilli.toString, + LastLine -> ContextBoolean.booleanToString(lastLine), ).asJava + /** + * Converts a partition map to a CloudLocation object. + * + * @param partitionMap A map containing partition information. + * @param cloudLocationValidator An implicit CloudLocationValidator. + * @return An Option containing the CloudLocation object if the conversion is successful, None otherwise. + */ + def partitionMapToSourceRoot( + partitionMap: Map[String, _], + )( + implicit + cloudLocationValidator: CloudLocationValidator, + ): Option[CloudLocation] = + for { + bucket <- partitionMap.get(ContainerKey).collect { + case value: String => value + } + prefix = partitionMap.get(PrefixKey).collect { + case value: String => value + } + } yield CloudLocation(bucket, prefix) + + /** + * Reads the offset watermark from the given source root and offset map. + * + * @param sourceRoot The root CloudLocation object. + * @param offsetMap A map containing offset information. + * @return An Option containing the CommitWatermark if the conversion is successful, None otherwise. + */ + def readOffsetWatermark( + sourceRoot: CloudLocation, + offsetMap: Map[String, _], + ): Option[CommitWatermark] = + for { + cloudLocation <- mapToOffset(sourceRoot, offsetMap) + lastLineBool <- offsetMap.get(LastLine).collect { + case value: String => ContextBoolean.stringToBoolean(value).toOption + }.flatten + } yield { + CommitWatermark( + cloudLocation, + lastLineBool, + ) + } + + /** + * Converts a map of offset information to a CloudLocation object. + * + * @param sourceRoot The root CloudLocation object. + * @param offsetMap A map containing offset information. + * @return An Option containing the updated CloudLocation object if the conversion is successful, None otherwise. + */ + def mapToOffset( + sourceRoot: CloudLocation, + offsetMap: Map[String, _], + ): Option[CloudLocation] = + for { + path <- offsetMap.get(PathKey).collect { case value: String => value } + line <- offsetMap.get(LineKey).collect { case value: String if value forall Character.isDigit => value.toInt } + ts = offsetMap.get(TimeStampKey).collect { + case value: String if value forall Character.isDigit => Instant.ofEpochMilli(value.toLong) + } + } yield { + sourceRoot.copy( + path = path.some, + line = line.some, + timestamp = ts, + )(sourceRoot.cloudLocationValidator) + } } diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/CloudSourceBucketOptions.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/CloudSourceBucketOptions.scala index f547a31b6b..edf94bcd1d 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/CloudSourceBucketOptions.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/CloudSourceBucketOptions.scala @@ -48,6 +48,8 @@ object CloudSourceBucketOptions { //extract the envelope. of not present default to false hasEnvelope <- config.extractEnvelope(sourceProps) + postProcessAction <- PostProcessAction(source.prefix, sourceProps) + } yield CloudSourceBucketOptions[M]( source, kcql.getTarget, @@ -57,6 +59,7 @@ object CloudSourceBucketOptions { partitionExtractor = partitionExtractor, orderingType = config.extractOrderingType, hasEnvelope = hasEnvelope.getOrElse(false), + postProcessAction = postProcessAction, ) }.toSeq.traverse(identity) @@ -71,6 +74,7 @@ case class CloudSourceBucketOptions[M <: FileMetadata]( partitionExtractor: Option[PartitionExtractor], orderingType: OrderingType, hasEnvelope: Boolean, + postProcessAction: Option[PostProcessAction], ) { def createBatchListerFn( storageInterface: StorageInterface[M], diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/PostProcessAction.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/PostProcessAction.scala new file mode 100644 index 0000000000..043484b6e5 --- /dev/null +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/PostProcessAction.scala @@ -0,0 +1,92 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.cloud.common.source.config +import cats.effect.IO +import cats.implicits.catsSyntaxEitherId +import cats.implicits.toBifunctorOps +import cats.implicits.toTraverseOps +import com.typesafe.scalalogging.LazyLogging +import io.lenses.streamreactor.connect.cloud.common.config.kcqlprops.PropsKeyEntry +import io.lenses.streamreactor.connect.cloud.common.config.kcqlprops.PropsKeyEnum +import io.lenses.streamreactor.connect.cloud.common.config.kcqlprops.PropsKeyEnum.PostProcessActionBucket +import io.lenses.streamreactor.connect.cloud.common.config.kcqlprops.PropsKeyEnum.PostProcessActionPrefix +import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation +import io.lenses.streamreactor.connect.cloud.common.source.config.kcqlprops.PostProcessActionEntry +import io.lenses.streamreactor.connect.cloud.common.source.config.kcqlprops.PostProcessActionEnum +import io.lenses.streamreactor.connect.cloud.common.source.config.kcqlprops.PostProcessActionEnum.Delete +import io.lenses.streamreactor.connect.cloud.common.source.config.kcqlprops.PostProcessActionEnum.Move +import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface +import io.lenses.streamreactor.connect.config.kcqlprops.KcqlProperties + +trait PostProcessAction { + def run( + cloudLocation: CloudLocation, + storageInterface: StorageInterface[_], + ): IO[Unit] +} + +object PostProcessAction { + def apply( + prefix: Option[String], + kcqlProperties: KcqlProperties[PropsKeyEntry, PropsKeyEnum.type], + ): Either[Throwable, Option[PostProcessAction]] = + kcqlProperties.getEnumValue[PostProcessActionEntry, PostProcessActionEnum.type](PostProcessActionEnum, + PropsKeyEnum.PostProcessAction, + ) + .map { + case Delete => + new DeletePostProcessAction().asRight + case Move => { + for { + destBucket <- kcqlProperties.getString(PostProcessActionBucket) + destPrefix <- kcqlProperties.getString(PostProcessActionPrefix) + } yield MovePostProcessAction(prefix, destBucket, destPrefix) + } + .toRight(new IllegalArgumentException("A bucket and a path must be specified for moving files to.")) + } + .sequence +} + +class DeletePostProcessAction extends PostProcessAction with LazyLogging { + def run( + cloudLocation: CloudLocation, + storageInterface: StorageInterface[_], + ): IO[Unit] = + for { + _ <- IO.delay(logger.debug("Running delete for {}", cloudLocation)) + path <- IO.fromOption(cloudLocation.path)( + new IllegalArgumentException("Cannot delete without a path, this is probably a logic error"), + ) + del <- IO.fromEither(storageInterface.deleteFiles(cloudLocation.bucket, Seq(path)).leftMap(_.exception)) + } yield del +} + +case class MovePostProcessAction(originalPrefix: Option[String], newBucket: String, newPrefix: String) + extends PostProcessAction { + override def run( + cloudLocation: CloudLocation, + storageInterface: StorageInterface[_], + ): IO[Unit] = + for { + path <- IO.fromOption(cloudLocation.path)( + new IllegalArgumentException("Cannot move without a path, this is probably a logic error"), + ) + newPath = originalPrefix.map(o => path.replace(o, newPrefix)).getOrElse(path) + mov <- IO.fromEither( + storageInterface.mvFile(cloudLocation.bucket, path, newBucket, newPath).leftMap(_.exception), + ) + } yield mov +} diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/kcqlprops/CloudSourcePropsSchema.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/kcqlprops/CloudSourcePropsSchema.scala index 7db5dc7721..e8a3cf0a5b 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/kcqlprops/CloudSourcePropsSchema.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/kcqlprops/CloudSourcePropsSchema.scala @@ -26,16 +26,19 @@ import scala.jdk.CollectionConverters.MapHasAsScala object CloudSourcePropsSchema { private[source] val keys = Map[PropsKeyEntry, PropsSchema]( - ReadTextMode -> EnumPropsSchema(ReadTextModeEnum), - ReadRegex -> StringPropsSchema, - ReadStartTag -> StringPropsSchema, - ReadEndTag -> StringPropsSchema, - ReadStartLine -> StringPropsSchema, - ReadEndLine -> StringPropsSchema, - ReadLastEndLineMissing -> BooleanPropsSchema, - BufferSize -> IntPropsSchema, - ReadTrimLine -> BooleanPropsSchema, - StoreEnvelope -> BooleanPropsSchema, + ReadTextMode -> EnumPropsSchema(ReadTextModeEnum), + ReadRegex -> StringPropsSchema, + ReadStartTag -> StringPropsSchema, + ReadEndTag -> StringPropsSchema, + ReadStartLine -> StringPropsSchema, + ReadEndLine -> StringPropsSchema, + ReadLastEndLineMissing -> BooleanPropsSchema, + BufferSize -> IntPropsSchema, + ReadTrimLine -> BooleanPropsSchema, + StoreEnvelope -> BooleanPropsSchema, + PostProcessAction -> EnumPropsSchema(PostProcessActionEnum), + PostProcessActionBucket -> StringPropsSchema, + PostProcessActionPrefix -> StringPropsSchema, ) val schema = KcqlPropsSchema(PropsKeyEnum, keys) diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/kcqlprops/PostProcessActionEnum.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/kcqlprops/PostProcessActionEnum.scala new file mode 100644 index 0000000000..51c2d715cf --- /dev/null +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/kcqlprops/PostProcessActionEnum.scala @@ -0,0 +1,29 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.cloud.common.source.config.kcqlprops + +import enumeratum.Enum +import enumeratum.EnumEntry + +sealed trait PostProcessActionEntry extends EnumEntry {} + +object PostProcessActionEnum extends Enum[PostProcessActionEntry] { + + override val values = findValues + + case object Delete extends PostProcessActionEntry + case object Move extends PostProcessActionEntry +} diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/reader/PartitionDiscovery.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/reader/PartitionDiscovery.scala index b4338e7d2e..45f7bbd8f8 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/reader/PartitionDiscovery.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/reader/PartitionDiscovery.scala @@ -34,7 +34,7 @@ object PartitionDiscovery extends LazyLogging { connectorTaskId: ConnectorTaskId, settings: PartitionSearcherOptions, partitionSearcher: PartitionSearcherF, - readerManagerCreateFn: (CloudLocation, String) => IO[ReaderManager], + readerManagerCreateFn: (CloudLocation, CloudLocation) => IO[ReaderManager], readerManagerState: Ref[IO, ReaderManagerState], cancelledRef: Ref[IO, Boolean], ): IO[Unit] = { @@ -42,7 +42,10 @@ object PartitionDiscovery extends LazyLogging { _ <- IO(logger.info(s"[${connectorTaskId.show}] Starting the partition discovery task.")) oldState <- readerManagerState.get newParts <- partitionSearcher(oldState.partitionResponses) - tuples = newParts.flatMap(part => part.results.map(part.root -> _)) + // FIXME: + // strictly the prefix should always stay as the prefix configured in the config - however it seems that in parts + // of the code `prefix` actually means `directory path`. This should be investigated and fixed + tuples = newParts.flatMap(part => part.results.map(part.root -> part.root.withPrefix(_))) newReaderManagers <- tuples .map { case (location, path) => diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/reader/ReaderManager.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/reader/ReaderManager.scala index c2c4d106b3..62e6698a02 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/reader/ReaderManager.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/reader/ReaderManager.scala @@ -22,7 +22,10 @@ import cats.implicits.toShow import com.typesafe.scalalogging.LazyLogging import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation +import io.lenses.streamreactor.connect.cloud.common.source.CommitWatermark +import io.lenses.streamreactor.connect.cloud.common.source.config.PostProcessAction import io.lenses.streamreactor.connect.cloud.common.source.files.SourceFileQueue +import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface import org.apache.kafka.connect.source.SourceRecord import scala.util.Try @@ -31,11 +34,15 @@ import scala.util.Try * Given a sourceBucketOptions, manages readers for all of the files */ class ReaderManager( - recordsLimit: Int, - fileSource: SourceFileQueue, - readerBuilderF: CloudLocation => Either[Throwable, ResultReader], - connectorTaskId: ConnectorTaskId, - readerRef: Ref[IO, Option[ResultReader]], + val root: CloudLocation, + val path: CloudLocation, + recordsLimit: Int, + fileSource: SourceFileQueue, + readerBuilderF: CloudLocation => Either[Throwable, ResultReader], + connectorTaskId: ConnectorTaskId, + readerRef: Ref[IO, Option[ResultReader]], + storageInterface: StorageInterface[_], + maybePostProcessAction: Option[PostProcessAction], ) extends LazyLogging { def poll(): IO[Vector[SourceRecord]] = { @@ -124,4 +131,14 @@ class ReaderManager( _ <- closeAndLog(currentState) } yield () + def postProcess(commitWatermark: CommitWatermark): IO[Unit] = + maybePostProcessAction match { + case Some(action) => + logger.info("PostProcess for {}", commitWatermark) + action.run(commitWatermark.cloudLocation, storageInterface) + case None => + logger.info("No PostProcess for {}", commitWatermark) + IO.unit + } + } diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/state/CloudSourceTaskState.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/state/CloudSourceTaskState.scala index 9d1f1ffc58..ef6ece0a21 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/state/CloudSourceTaskState.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/state/CloudSourceTaskState.scala @@ -18,22 +18,41 @@ package io.lenses.streamreactor.connect.cloud.common.source.state import cats.effect.IO import cats.effect.kernel.Ref import cats.implicits.toTraverseOps +import com.typesafe.scalalogging.LazyLogging +import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation +import io.lenses.streamreactor.connect.cloud.common.source.CommitWatermark import io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager import org.apache.kafka.connect.source.SourceRecord +case class CloudLocationKey(bucket: String, prefix: Option[String]) case class CloudSourceTaskState( - latestReaderManagers: IO[Seq[ReaderManager]], + latestReaderManagers: IO[Map[CloudLocationKey, ReaderManager]], cancelledRef: Ref[IO, Boolean], partitionDiscoveryLoop: IO[Unit], -) { - +) extends LazyLogging { def close(): IO[Unit] = - latestReaderManagers.flatMap(_.traverse(_.close())).attempt.void + latestReaderManagers + .flatMap(_.values.toList.traverse(_.close())) + .attempt + .void def poll(): IO[Seq[SourceRecord]] = for { readers <- latestReaderManagers - pollResults <- readers.map(_.poll()).traverse(identity) + pollResults <- readers.view.values.map(_.poll()).toList.traverse(identity) sourceRecords = pollResults.flatten } yield sourceRecords + + def commitRecord(cloudLocation: CloudLocation, commitWatermark: CommitWatermark): IO[Unit] = + for { + rms <- latestReaderManagers + fin <- rms.get(cloudLocation.toKey).traverse(_.postProcess(commitWatermark)) + _ <- IO.delay( + logger.info(s"CloudSourceTaskState.commitRecord with cloudLocation: {}, readerManagers: {}, fin: {}", + cloudLocation, + rms, + fin, + ), + ) + } yield () } diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/state/ReaderManagerBuilder.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/state/ReaderManagerBuilder.scala index b31a4c3ee5..3f5d04f39d 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/state/ReaderManagerBuilder.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/state/ReaderManagerBuilder.scala @@ -34,7 +34,7 @@ import org.apache.kafka.connect.errors.ConnectException object ReaderManagerBuilder { def apply[M <: FileMetadata]( root: CloudLocation, - path: String, + path: CloudLocation, storageInterface: StorageInterface[M], connectorTaskId: ConnectorTaskId, contextOffsetFn: CloudLocation => Option[CloudLocation], @@ -49,11 +49,10 @@ object ReaderManagerBuilder { new ConnectException(s"No root found for path:$path"), ), ) - ref <- Ref[IO].of(Option.empty[ResultReader]) - adaptedRoot = root.copy(prefix = Some(path)) - adaptedSbo = sbo.copy[M](sourceBucketAndPrefix = adaptedRoot) - listingFn = adaptedSbo.createBatchListerFn(storageInterface) - source = contextOffsetFn(adaptedRoot).fold { + ref <- Ref[IO].of(Option.empty[ResultReader]) + adaptedSbo = sbo.copy[M](sourceBucketAndPrefix = path) + listingFn = adaptedSbo.createBatchListerFn(storageInterface) + source = contextOffsetFn(path).fold { new CloudSourceFileQueue[M](connectorTaskId, listingFn) } { location => CloudSourceFileQueue.from[M]( @@ -64,6 +63,8 @@ object ReaderManagerBuilder { ) } } yield new ReaderManager( + root, + path, sbo.recordsLimit, source, ResultReader.create(sbo.format, @@ -75,6 +76,8 @@ object ReaderManagerBuilder { ), connectorTaskId, ref, + storageInterface, + sbo.postProcessAction, ) } diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/storage/StorageInterface.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/storage/StorageInterface.scala index 34c973a03e..d54c38cee6 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/storage/StorageInterface.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/storage/StorageInterface.scala @@ -68,4 +68,6 @@ trait StorageInterface[SM <: FileMetadata] extends ResultProcessors { def writeStringToFile(bucket: String, path: String, data: UploadableString): Either[UploadError, Unit] def deleteFiles(bucket: String, files: Seq[String]): Either[FileDeleteError, Unit] + + def mvFile(oldBucket: String, oldPath: String, newBucket: String, newPath: String): Either[FileMoveError, Unit] } diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/storage/UploadError.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/storage/UploadError.scala index 750ccc08e2..bef058d252 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/storage/UploadError.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/storage/UploadError.scala @@ -37,6 +37,10 @@ case class EmptyContentsStringError(data: String) extends UploadError { override def message() = s"attempt to upload empty string (${data})" } +case class FileMoveError(exception: Throwable, oldName: String, newName: String) extends UploadError { + override def message() = s"error moving file from ($oldName) to ($newName) ${exception.getMessage}" +} + case class FileCreateError(exception: Throwable, data: String) extends UploadError { override def message() = s"error writing file (${data}) ${exception.getMessage}" } diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/DelegateIteratorCloudStreamReaderTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/DelegateIteratorCloudStreamReaderTest.scala index f312f40906..e1effc4953 100644 --- a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/DelegateIteratorCloudStreamReaderTest.scala +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/DelegateIteratorCloudStreamReaderTest.scala @@ -45,7 +45,7 @@ class DelegateIteratorCloudStreamReaderTest extends AnyFunSuite with Matchers { override def close(): Unit = {} } val converter = new Converter[Int] { - override def convert(t: Int, index: Long): SourceRecord = + override def convert(t: Int, index: Long, lastLine: Boolean): SourceRecord = new SourceRecord( Map("a" -> "1").asJava, Map("line" -> index.toString, "path" -> "a/b/c.txt", "ts" -> "1000").asJava, @@ -73,7 +73,7 @@ class DelegateIteratorCloudStreamReaderTest extends AnyFunSuite with Matchers { override def close(): Unit = {} } val converter = new Converter[Int] { - override def convert(t: Int, index: Long): SourceRecord = + override def convert(t: Int, index: Long, lastLine: Boolean): SourceRecord = new SourceRecord( Map("a" -> "1").asJava, Map("line" -> index.toString, "path" -> "a/b/c.txt", "ts" -> "1000").asJava, diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/BytesOutputRowConverterTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/BytesOutputRowConverterTest.scala index cd6b2d5cc6..c3e53068b2 100644 --- a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/BytesOutputRowConverterTest.scala +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/BytesOutputRowConverterTest.scala @@ -44,18 +44,18 @@ class BytesOutputRowConverterTest extends AnyFunSuite with Matchers { 1, location, lastModified, - ).convert( - BytesOutputRow( - valueBytes, - ), - 2, + ).convert(BytesOutputRow( + valueBytes, + ), + 2, + lastLine = true, ) actual.key() shouldBe null actual.value() shouldBe "value".getBytes actual.topic() shouldBe "topic1" actual.kafkaPartition() shouldBe 1 actual.sourcePartition() shouldBe Map("a" -> "1").asJava - actual.sourceOffset() shouldBe Map("line" -> "2", "path" -> "a/b/c.txt", "ts" -> "10001").asJava + actual.sourceOffset() shouldBe Map("line" -> "2", "path" -> "a/b/c.txt", "ts" -> "10001", "last" -> "t").asJava actual.keySchema().`type`() shouldBe Schema.BYTES_SCHEMA.`type`() actual.valueSchema().`type`() shouldBe Schema.BYTES_SCHEMA.`type`() } diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/SchemaAndValueConverterTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/SchemaAndValueConverterTest.scala index daecb1690e..9e6837ac39 100644 --- a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/SchemaAndValueConverterTest.scala +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/SchemaAndValueConverterTest.scala @@ -44,12 +44,12 @@ class SchemaAndValueConverterTest extends AnyFunSuite with Matchers { 1, location, lastModified, - ).convert(value, 1) + ).convert(value, 1, lastLine = false) actual.valueSchema().`type`() shouldBe Schema.OPTIONAL_STRING_SCHEMA.`type`() actual.value() shouldBe "lore ipsum" actual.kafkaPartition() shouldBe 1 actual.sourcePartition() shouldBe Map("a" -> "1").asJava - actual.sourceOffset() shouldBe Map("line" -> "1", "path" -> "a/b/c.txt", "ts" -> "1000").asJava + actual.sourceOffset() shouldBe Map("line" -> "1", "path" -> "a/b/c.txt", "ts" -> "1000", "last" -> "f").asJava actual.key() shouldBe null actual.keySchema() shouldBe null actual.headers().size() shouldBe 0 diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/SchemaAndValueEnvelopeConverterTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/SchemaAndValueEnvelopeConverterTest.scala index 46f3f04623..c67866ce7c 100644 --- a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/SchemaAndValueEnvelopeConverterTest.scala +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/SchemaAndValueEnvelopeConverterTest.scala @@ -88,7 +88,7 @@ class SchemaAndValueEnvelopeConverterTest extends AnyFunSuite with Matchers { val schemaAndValueEnveloper = new SchemaAndValue(allEnvelopeFieldsSchema, envelope) - val sourceRecord = createConverter().convert(schemaAndValueEnveloper, 0L) + val sourceRecord = createConverter().convert(schemaAndValueEnveloper, 0L, lastLine = true) assertOffsets(sourceRecord) sourceRecord.topic() shouldBe TargetTopic sourceRecord.kafkaPartition() shouldBe Partition @@ -122,7 +122,7 @@ class SchemaAndValueEnvelopeConverterTest extends AnyFunSuite with Matchers { val schemaAndValueEnveloper = new SchemaAndValue(envelopeSchema, envelope) - val sourceRecord = createConverter().convert(schemaAndValueEnveloper, 0L) + val sourceRecord = createConverter().convert(schemaAndValueEnveloper, 0L, lastLine = true) assertOffsets(sourceRecord) sourceRecord.topic() shouldBe TargetTopic sourceRecord.kafkaPartition() shouldBe Partition @@ -151,7 +151,7 @@ class SchemaAndValueEnvelopeConverterTest extends AnyFunSuite with Matchers { envelope.put("metadata", createMetadata()) val schemaAndValueEnveloper = new SchemaAndValue(envelopeSchema, envelope) - val sourceRecord = createConverter().convert(schemaAndValueEnveloper, 0L) + val sourceRecord = createConverter().convert(schemaAndValueEnveloper, 0L, lastLine = true) assertOffsets(sourceRecord) sourceRecord.topic() shouldBe TargetTopic sourceRecord.kafkaPartition() shouldBe Partition @@ -182,7 +182,8 @@ class SchemaAndValueEnvelopeConverterTest extends AnyFunSuite with Matchers { val schemaAndValueEnveloper = new SchemaAndValue(envelopeSchema, envelope) - val sourceRecord = createConverter(() => LastModifiedTimestamp).convert(schemaAndValueEnveloper, 0L) + val sourceRecord = + createConverter(() => LastModifiedTimestamp).convert(schemaAndValueEnveloper, 0L, lastLine = true) assertOffsets(sourceRecord) sourceRecord.topic() shouldBe TargetTopic sourceRecord.kafkaPartition() shouldBe TargetPartition @@ -210,7 +211,7 @@ class SchemaAndValueEnvelopeConverterTest extends AnyFunSuite with Matchers { val schemaAndValueEnveloper = new SchemaAndValue(envelopeSchema, envelope) - val sourceRecord = createConverter().convert(schemaAndValueEnveloper, 0L) + val sourceRecord = createConverter().convert(schemaAndValueEnveloper, 0L, lastLine = true) assertOffsets(sourceRecord) sourceRecord.topic() shouldBe TargetTopic sourceRecord.kafkaPartition() shouldBe Partition @@ -224,7 +225,7 @@ class SchemaAndValueEnvelopeConverterTest extends AnyFunSuite with Matchers { test("non-Struct input returns an exception") { val schemaAndValueEnveloper = new SchemaAndValue(Schema.STRING_SCHEMA, "lorem ipsum") assertThrows[RuntimeException] { - createConverter().convert(schemaAndValueEnveloper, 0L) + createConverter().convert(schemaAndValueEnveloper, 0L, lastLine = true) } } private def createConverter(instantF: () => Instant = () => Instant.now()): SchemaAndValueEnvelopeConverter = @@ -240,6 +241,7 @@ class SchemaAndValueEnvelopeConverterTest extends AnyFunSuite with Matchers { sourceRecord.sourceOffset().asScala shouldBe Map("path" -> "/a/b/c.avro", "line" -> "0", "ts" -> LastModifiedTimestamp.toEpochMilli.toString, + "last" -> "t", ) } private def assertHeaders(record: SourceRecord): Assertion = diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/SchemalessEnvelopeConverterTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/SchemalessEnvelopeConverterTest.scala index 5730ac7d5a..cc618d1337 100644 --- a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/SchemalessEnvelopeConverterTest.scala +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/SchemalessEnvelopeConverterTest.scala @@ -61,7 +61,7 @@ class SchemalessEnvelopeConverterTest extends AnyFunSuite with Matchers { "metadata" -> createMetadata(), ) - val actual = createConverter().convert(json.noSpaces, 0) + val actual = createConverter().convert(json.noSpaces, 0, lastLine = false) assertOffsets(actual) actual.topic() shouldBe TargetTopic actual.kafkaPartition() shouldBe Partition @@ -83,7 +83,7 @@ class SchemalessEnvelopeConverterTest extends AnyFunSuite with Matchers { "metadata" -> createMetadata(), ) - val actual = createConverter().convert(json.noSpaces, 0) + val actual = createConverter().convert(json.noSpaces, 0, lastLine = false) assertOffsets(actual) actual.topic() shouldBe TargetTopic actual.kafkaPartition() shouldBe Partition @@ -105,7 +105,7 @@ class SchemalessEnvelopeConverterTest extends AnyFunSuite with Matchers { ) val now = Instant.now() - val actual = createConverter(() => now).convert(json.noSpaces, 0) + val actual = createConverter(() => now).convert(json.noSpaces, 0, lastLine = false) assertOffsets(actual) actual.topic() shouldBe TargetTopic actual.kafkaPartition() shouldBe TargetPartition @@ -126,7 +126,7 @@ class SchemalessEnvelopeConverterTest extends AnyFunSuite with Matchers { ) val now = Instant.now() - val actual = createConverter(() => now).convert(json.noSpaces, 0) + val actual = createConverter(() => now).convert(json.noSpaces, 0, lastLine = false) assertOffsets(actual) actual.topic() shouldBe TargetTopic actual.kafkaPartition() shouldBe TargetPartition @@ -147,7 +147,7 @@ class SchemalessEnvelopeConverterTest extends AnyFunSuite with Matchers { ) val now = Instant.now() - val actual = createConverter(() => now).convert(json.noSpaces, 0) + val actual = createConverter(() => now).convert(json.noSpaces, 0, lastLine = false) assertOffsets(actual) actual.topic() shouldBe TargetTopic actual.kafkaPartition() shouldBe TargetPartition @@ -167,7 +167,7 @@ class SchemalessEnvelopeConverterTest extends AnyFunSuite with Matchers { ) val now = Instant.now() - val actual = createConverter(() => now).convert(json.noSpaces, 0) + val actual = createConverter(() => now).convert(json.noSpaces, 0, lastLine = false) assertOffsets(actual) actual.topic() shouldBe TargetTopic actual.kafkaPartition() shouldBe TargetPartition @@ -180,7 +180,7 @@ class SchemalessEnvelopeConverterTest extends AnyFunSuite with Matchers { } test("input not object raises an error") { val json = Json.fromString("not an object") - assertThrows[RuntimeException](createConverter().convert(json.noSpaces, 0)) + assertThrows[RuntimeException](createConverter().convert(json.noSpaces, 0, lastLine = true)) } test("base64 encoded key and value are decoded as arrays") { val json = Json.obj( @@ -191,7 +191,7 @@ class SchemalessEnvelopeConverterTest extends AnyFunSuite with Matchers { "headers" -> createHeaders(), "metadata" -> createMetadata(), ) - val actual = createConverter().convert(json.noSpaces, 0) + val actual = createConverter().convert(json.noSpaces, 0, lastLine = false) assertOffsets(actual) actual.topic() shouldBe TargetTopic actual.kafkaPartition() shouldBe Partition @@ -216,6 +216,7 @@ class SchemalessEnvelopeConverterTest extends AnyFunSuite with Matchers { sourceRecord.sourceOffset().asScala shouldBe Map("path" -> "/a/b/c.avro", "line" -> "0", "ts" -> LastModifiedTimestamp.toEpochMilli.toString, + "last" -> "f", ) } diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/TextConverterTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/TextConverterTest.scala index df901eb143..a2cec13258 100644 --- a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/TextConverterTest.scala +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/reader/converters/TextConverterTest.scala @@ -35,14 +35,17 @@ class TextConverterTest extends AnyFunSuite with Matchers { val lastModified = Instant.ofEpochMilli(1000) val actual = - new TextConverter(Map("a" -> "1").asJava, Topic("topic1"), 1, location, lastModified).convert("value", 1) + new TextConverter(Map("a" -> "1").asJava, Topic("topic1"), 1, location, lastModified).convert("value", + 1, + lastLine = true, + ) actual.valueSchema().`type`() shouldBe Schema.STRING_SCHEMA.`type`() actual.value() shouldBe "value" actual.topic() shouldBe "topic1" actual.kafkaPartition() shouldBe 1 actual.sourcePartition() shouldBe Map("a" -> "1").asJava - actual.sourceOffset() shouldBe Map("line" -> "1", "path" -> "a/b/c.txt", "ts" -> "1000").asJava + actual.sourceOffset() shouldBe Map("line" -> "1", "path" -> "a/b/c.txt", "ts" -> "1000", "last" -> "t").asJava actual.key() shouldBe null actual.keySchema() shouldBe null actual.headers().size() shouldBe 0 diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/source/SourceWatermarkTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/source/SourceWatermarkTest.scala index 7105fc4931..24d6d189fc 100644 --- a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/source/SourceWatermarkTest.scala +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/source/SourceWatermarkTest.scala @@ -48,10 +48,12 @@ class SourceWatermarkTest extends AnyFlatSpec with Matchers { CloudLocation("test-bucket", "test-prefix".some).withPath("test-path"), 100L, nowInst, + lastLine = true, ).asScala.toMap shouldBe Map( "path" -> "test-path", "line" -> "100", "ts" -> nowInst.toEpochMilli.toString, + "last" -> "t", ) } diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/source/config/PostProcessActionTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/source/config/PostProcessActionTest.scala new file mode 100644 index 0000000000..0d2796035a --- /dev/null +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/source/config/PostProcessActionTest.scala @@ -0,0 +1,112 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.cloud.common.source.config + +import io.lenses.streamreactor.connect.cloud.common.config.kcqlprops.PropsKeyEntry +import io.lenses.streamreactor.connect.cloud.common.config.kcqlprops.PropsKeyEnum +import io.lenses.streamreactor.connect.cloud.common.config.kcqlprops.PropsKeyEnum.PostProcessActionBucket +import io.lenses.streamreactor.connect.cloud.common.config.kcqlprops.PropsKeyEnum.PostProcessActionPrefix +import io.lenses.streamreactor.connect.cloud.common.source.config.kcqlprops.PostProcessActionEntry +import io.lenses.streamreactor.connect.cloud.common.source.config.kcqlprops.PostProcessActionEnum +import io.lenses.streamreactor.connect.cloud.common.source.config.kcqlprops.PostProcessActionEnum.Delete +import io.lenses.streamreactor.connect.cloud.common.source.config.kcqlprops.PostProcessActionEnum.Move +import io.lenses.streamreactor.connect.config.kcqlprops.KcqlProperties +import org.mockito.MockitoSugar +import org.scalatest.EitherValues +import org.scalatest.OptionValues +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class PostProcessActionTest extends AnyFlatSpec with Matchers with EitherValues with OptionValues with MockitoSugar { + + "PostProcessAction.apply" should "return DeletePostProcessAction when Delete is specified" in { + val kcqlProperties = mock[KcqlProperties[PropsKeyEntry, PropsKeyEnum.type]] + when( + kcqlProperties.getEnumValue[PostProcessActionEntry, PostProcessActionEnum.type](PostProcessActionEnum, + PropsKeyEnum.PostProcessAction, + ), + ) + .thenReturn(Some(Delete)) + + val result = PostProcessAction(Option.empty, kcqlProperties) + + result.value.value shouldBe a[DeletePostProcessAction] + } + + it should "return MovePostProcessAction when Move is specified and bucket and prefix are provided" in { + val kcqlProperties = mock[KcqlProperties[PropsKeyEntry, PropsKeyEnum.type]] + when( + kcqlProperties.getEnumValue[PostProcessActionEntry, PostProcessActionEnum.type](PostProcessActionEnum, + PropsKeyEnum.PostProcessAction, + ), + ) + .thenReturn(Some(Move)) + when(kcqlProperties.getString(PostProcessActionPrefix)).thenReturn(Some("some/prefix")) + when(kcqlProperties.getString(PostProcessActionBucket)).thenReturn(Some("myNewBucket")) + + val result = PostProcessAction(Option.empty, kcqlProperties) + + result.value.value shouldBe a[MovePostProcessAction] + result.value.value.asInstanceOf[MovePostProcessAction].newPrefix shouldBe "some/prefix" + result.value.value.asInstanceOf[MovePostProcessAction].newBucket shouldBe "myNewBucket" + } + + it should "return an error when Move is specified but no prefix is provided" in { + val kcqlProperties = mock[KcqlProperties[PropsKeyEntry, PropsKeyEnum.type]] + when( + kcqlProperties.getEnumValue[PostProcessActionEntry, PostProcessActionEnum.type](PostProcessActionEnum, + PropsKeyEnum.PostProcessAction, + ), + ) + .thenReturn(Some(Move)) + when(kcqlProperties.getString(PostProcessActionPrefix)).thenReturn(None) + when(kcqlProperties.getString(PostProcessActionBucket)).thenReturn(Some("myNewBucket")) + + val result = PostProcessAction(Option.empty, kcqlProperties) + + result.left.value shouldBe an[IllegalArgumentException] + } + + it should "return an error when Move is specified but no bucket is provided" in { + val kcqlProperties = mock[KcqlProperties[PropsKeyEntry, PropsKeyEnum.type]] + when( + kcqlProperties.getEnumValue[PostProcessActionEntry, PostProcessActionEnum.type](PostProcessActionEnum, + PropsKeyEnum.PostProcessAction, + ), + ) + .thenReturn(Some(Move)) + when(kcqlProperties.getString(PostProcessActionPrefix)).thenReturn(Some("my/prefix")) + when(kcqlProperties.getString(PostProcessActionBucket)).thenReturn(None) + + val result = PostProcessAction(Option.empty, kcqlProperties) + + result.left.value shouldBe an[IllegalArgumentException] + } + + it should "return None when no PostProcessAction is specified" in { + val kcqlProperties = mock[KcqlProperties[PropsKeyEntry, PropsKeyEnum.type]] + when( + kcqlProperties.getEnumValue[PostProcessActionEntry, PostProcessActionEnum.type](PostProcessActionEnum, + PropsKeyEnum.PostProcessAction, + ), + ) + .thenReturn(None) + + val result = PostProcessAction(Option.empty, kcqlProperties) + + result.value shouldBe None + } +} diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/source/reader/PartitionDiscoveryTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/source/reader/PartitionDiscoveryTest.scala index c1a5d70c62..08a3b1e90a 100644 --- a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/source/reader/PartitionDiscoveryTest.scala +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/source/reader/PartitionDiscoveryTest.scala @@ -18,13 +18,16 @@ package io.lenses.streamreactor.connect.cloud.common.source.reader import cats.effect.IO import cats.effect.kernel.Ref import cats.effect.unsafe.implicits.global +import cats.implicits.catsSyntaxOptionId import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator import io.lenses.streamreactor.connect.cloud.common.source.config.PartitionSearcherOptions import io.lenses.streamreactor.connect.cloud.common.source.config.PartitionSearcherOptions.ExcludeIndexes +import io.lenses.streamreactor.connect.cloud.common.source.config.PostProcessAction import io.lenses.streamreactor.connect.cloud.common.source.distribution.PartitionSearcherResponse import io.lenses.streamreactor.connect.cloud.common.source.files.SourceFileQueue +import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface import io.lenses.streamreactor.connect.cloud.common.utils.SampleData import org.mockito.MockitoSugar import org.scalatest.flatspec.AnyFlatSpecLike @@ -35,6 +38,9 @@ import scala.concurrent.duration.DurationInt class PartitionDiscoveryTest extends AnyFlatSpecLike with Matchers with MockitoSugar { private implicit val cloudLocationValidator: CloudLocationValidator = SampleData.cloudLocationValidator private val connectorTaskId: ConnectorTaskId = ConnectorTaskId("sinkName", 1, 1) + private val root: CloudLocation = CloudLocation("myBucket", "myPrefix".some) + private val storageInterface: StorageInterface[_] = mock[StorageInterface[_]] + private val noPostProcessAction = Option.empty[PostProcessAction] "PartitionDiscovery" should "handle failure on PartitionSearcher and resume" in { val fileQueueProcessor: SourceFileQueue = mock[SourceFileQueue] val limit = 10 @@ -77,12 +83,18 @@ class PartitionDiscoveryTest extends AnyFlatSpecLike with Matchers with MockitoS options, searcherMock.find, (_, _) => - IO(new ReaderManager(limit, - fileQueueProcessor, - _ => Left(new RuntimeException()), - connectorTaskId, - readerRef, - )), + IO( + new ReaderManager(root, + root, + limit, + fileQueueProcessor, + _ => Left(new RuntimeException()), + connectorTaskId, + readerRef, + storageInterface, + noPostProcessAction, + ), + ), state, cancelledRef, ).start diff --git a/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/source/GCPStorageSourceTask.scala b/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/source/GCPStorageSourceTask.scala index 698c878b35..a9a956cae7 100644 --- a/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/source/GCPStorageSourceTask.scala +++ b/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/source/GCPStorageSourceTask.scala @@ -34,7 +34,7 @@ class GCPStorageSourceTask GCPStorageFileMetadata, GCPStorageSourceConfig, Storage, - ] + ]("/gcpstorage-sink-ascii.txt") with LazyLogging { val validator: CloudLocationValidator = GCPStorageLocationValidator @@ -42,10 +42,10 @@ class GCPStorageSourceTask override def createStorageInterface( connectorTaskId: ConnectorTaskId, config: GCPStorageSourceConfig, - storage: Storage, + client: Storage, ): GCPStorageStorageInterface = new GCPStorageStorageInterface(connectorTaskId, - storage = storage, + storage = client, avoidReumableUpload = false, extensionFilter = config.extensionFilter, ) @@ -60,6 +60,6 @@ class GCPStorageSourceTask override def connectorPrefix: String = CONNECTOR_PREFIX - override def createDirectoryLister(connectorTaskId: ConnectorTaskId, s3Client: Storage): DirectoryLister = - new GCPStorageDirectoryLister(connectorTaskId, s3Client) + override def createDirectoryLister(connectorTaskId: ConnectorTaskId, client: Storage): DirectoryLister = + new GCPStorageDirectoryLister(connectorTaskId, client) } diff --git a/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/storage/GCPStorageStorageInterface.scala b/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/storage/GCPStorageStorageInterface.scala index 707773d7cc..54e7fedb08 100644 --- a/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/storage/GCPStorageStorageInterface.scala +++ b/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/storage/GCPStorageStorageInterface.scala @@ -34,6 +34,7 @@ import io.lenses.streamreactor.connect.cloud.common.storage.FileCreateError import io.lenses.streamreactor.connect.cloud.common.storage.FileDeleteError import io.lenses.streamreactor.connect.cloud.common.storage.FileListError import io.lenses.streamreactor.connect.cloud.common.storage.FileLoadError +import io.lenses.streamreactor.connect.cloud.common.storage.FileMoveError import io.lenses.streamreactor.connect.cloud.common.storage.ListOfKeysResponse import io.lenses.streamreactor.connect.cloud.common.storage.ListOfMetadataResponse import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface @@ -45,8 +46,9 @@ import java.nio.channels.Channels import java.nio.file.Files import java.time.Instant import scala.annotation.tailrec -import scala.collection.immutable.Seq import scala.jdk.CollectionConverters.IterableHasAsScala +import scala.util.Failure +import scala.util.Success import scala.util.Try class GCPStorageStorageInterface( @@ -312,4 +314,55 @@ class GCPStorageStorageInterface( * @return */ override def system(): String = "GCP Storage" + + override def mvFile( + oldBucket: String, + oldPath: String, + newBucket: String, + newPath: String, + ): Either[FileMoveError, Unit] = { + + val sourceBlobId = BlobId.of(oldBucket, oldPath) + val sourceBlob = Try(storage.get(sourceBlobId)).map(Option(_)).map(_.filter(_.exists())) + sourceBlob match { + case Success(None) => + logger.warn("Object ({}/{}) doesn't exist to move", oldBucket, oldPath) + ().asRight + case Failure(ex) => + logger.error("Object ({}/{}) could not be retrieved", ex) + FileMoveError(ex, oldPath, newPath).asLeft + case Success(Some(_: Blob)) => + Try { + val destinationBlobId = BlobId.of(newBucket, newPath) + val destinationBlob = Option(storage.get(newBucket, newPath)) + val precondition: Storage.BlobTargetOption = decideMovePrecondition(destinationBlob) + + storage.copy( + Storage.CopyRequest.newBuilder() + .setSource(sourceBlobId) + .setTarget(destinationBlobId, precondition) + .build(), + ) + + // Delete the original blob to complete the move operation + storage.delete(sourceBlobId) + }.toEither.leftMap(FileMoveError(_, oldPath, newPath)).void + } + } + + /** + * Decides the precondition for moving a blob based on the existence of the destination blob. + * + * @param destinationBlob An optional Blob object representing the destination blob. + * @return A Storage.BlobTargetOption indicating the precondition for the move operation. + * If the destination blob does not exist, returns Storage.BlobTargetOption.doesNotExist(). + * If the destination blob exists, returns Storage.BlobTargetOption.generationMatch() with the blob's generation. + */ + private def decideMovePrecondition(destinationBlob: Option[Blob]): Storage.BlobTargetOption = + destinationBlob match { + case None => + Storage.BlobTargetOption.doesNotExist() + case Some(blob) => + Storage.BlobTargetOption.generationMatch(blob.getGeneration) + } } diff --git a/kafka-connect-gcp-storage/src/test/scala/io/lenses/streamreactor/connect/gcp/storage/GCPStorageStorageInterfaceTest.scala b/kafka-connect-gcp-storage/src/test/scala/io/lenses/streamreactor/connect/gcp/storage/GCPStorageStorageInterfaceTest.scala index 7915fe9fa5..5ba0956ea9 100644 --- a/kafka-connect-gcp-storage/src/test/scala/io/lenses/streamreactor/connect/gcp/storage/GCPStorageStorageInterfaceTest.scala +++ b/kafka-connect-gcp-storage/src/test/scala/io/lenses/streamreactor/connect/gcp/storage/GCPStorageStorageInterfaceTest.scala @@ -47,7 +47,10 @@ import io.lenses.streamreactor.connect.gcp.storage.storage.GCPStorageFileMetadat import io.lenses.streamreactor.connect.gcp.storage.storage.GCPStorageStorageInterface import SamplePages.emptyPage import SamplePages.pages +import com.google.cloud.storage.CopyWriter +import com.google.cloud.storage.Storage.CopyRequest import org.mockito.Answers +import org.mockito.ArgumentCaptor import org.mockito.ArgumentMatchersSugar import org.mockito.MockitoSugar import org.scalatest.BeforeAndAfter @@ -490,6 +493,224 @@ class GCPStorageStorageInterfaceTest verify(client).close() } + "mvFile" should "move a file from one bucket to another successfully" in { + val oldBucket = "oldBucket" + val oldPath = "oldPath" + val newBucket = "newBucket" + val newPath = "newPath" + + val sourceBlobId = BlobId.of(oldBucket, oldPath) + val destinationBlobId = BlobInfo.newBuilder(BlobId.of(newBucket, newPath)).build() + + mockBlobExistence(sourceBlobId) + + when(client.copy(any[Storage.CopyRequest])).thenReturn(mock[CopyWriter]) + when(client.delete(sourceBlobId)).thenReturn(true) + val result: Either[FileMoveError, Unit] = storageInterface.mvFile(oldBucket, oldPath, newBucket, newPath) + result.value should be(()) + + val copyRequestCaptor: ArgumentCaptor[Storage.CopyRequest] = ArgumentCaptor.forClass(classOf[Storage.CopyRequest]) + verify(client).copy(copyRequestCaptor.capture()) + val capturedCopyRequest = copyRequestCaptor.getValue + capturedCopyRequest.getSource should be(sourceBlobId) + capturedCopyRequest.getTarget should be(destinationBlobId) + + verify(client).delete(sourceBlobId) + } + + private def mockBlobExistence(sourceBlobId: BlobId) = { + val blob = mock[Blob] + when(blob.exists()).thenReturn(true) + when(client.get(sourceBlobId)).thenReturn(blob) + () + } + private def mockBlobNonExistence1(sourceBlobId: BlobId) = { + val blob = mock[Blob] + when(blob.exists()).thenReturn(false) + when(client.get(sourceBlobId)).thenReturn(blob) + () + } + private def mockBlobNonExistence2(sourceBlobId: BlobId) = { + val blob = null + when(client.get(sourceBlobId)).thenReturn(blob) + () + } + + "mvFile" should "return a FileMoveError if copy fails" in { + val oldBucket = "oldBucket" + val oldPath = "oldPath" + val newBucket = "newBucket" + val newPath = "newPath" + + val sourceBlobId = BlobId.of(oldBucket, oldPath) + val destinationBlobId = BlobInfo.newBuilder(BlobId.of(newBucket, newPath)).build() + + mockBlobExistence(sourceBlobId) + + when(client.copy(any[Storage.CopyRequest])).thenThrow(new RuntimeException("Copy failed")) + + val result = storageInterface.mvFile(oldBucket, oldPath, newBucket, newPath) + + result.isLeft should be(true) + result.left.value should be(a[FileMoveError]) + + val copyRequestCaptor: ArgumentCaptor[Storage.CopyRequest] = ArgumentCaptor.forClass(classOf[Storage.CopyRequest]) + verify(client).copy(copyRequestCaptor.capture()) + val capturedCopyRequest = copyRequestCaptor.getValue + capturedCopyRequest.getSource should be(sourceBlobId) + capturedCopyRequest.getTarget should be(destinationBlobId) + + verify(client, never).delete(sourceBlobId) + } + "mvFile" should "return a FileMoveError if delete fails" in { + val oldBucket = "oldBucket" + val oldPath = "oldPath" + val newBucket = "newBucket" + val newPath = "newPath" + + val sourceBlobId = BlobId.of(oldBucket, oldPath) + val destinationBlobId = BlobInfo.newBuilder(BlobId.of(newBucket, newPath)).build() + + mockBlobExistence(sourceBlobId) + + when(client.copy(any[Storage.CopyRequest])).thenReturn(mock[CopyWriter]) + when(client.delete(sourceBlobId)).thenThrow(new RuntimeException("Delete failed")) + + val result = storageInterface.mvFile(oldBucket, oldPath, newBucket, newPath) + + result.isLeft should be(true) + result.left.value should be(a[FileMoveError]) + + val copyRequestCaptor: ArgumentCaptor[Storage.CopyRequest] = ArgumentCaptor.forClass(classOf[Storage.CopyRequest]) + verify(client).copy(copyRequestCaptor.capture()) + val capturedCopyRequest = copyRequestCaptor.getValue + capturedCopyRequest.getSource should be(sourceBlobId) + capturedCopyRequest.getTarget should be(destinationBlobId) + + verify(client).delete(sourceBlobId) + } + + "mvFile" should "use doesNotExist precondition if the target file does not exist" in { + val oldBucket = "oldBucket" + val oldPath = "oldPath" + val newBucket = "newBucket" + val newPath = "newPath" + + val sourceBlobId = BlobId.of(oldBucket, oldPath) + + mockBlobExistence(sourceBlobId) + + when(client.get(newBucket, newPath)).thenReturn(null) // Target does not exist + when(client.copy(any[Storage.CopyRequest])).thenReturn(mock[CopyWriter]) + when(client.delete(sourceBlobId)).thenReturn(true) + + val result = storageInterface.mvFile(oldBucket, oldPath, newBucket, newPath) + result.value should be(()) + + val copyRequestCaptor: ArgumentCaptor[Storage.CopyRequest] = ArgumentCaptor.forClass(classOf[Storage.CopyRequest]) + verify(client).copy(copyRequestCaptor.capture()) + val capturedCopyRequest = copyRequestCaptor.getValue + capturedCopyRequest.getTargetOptions should contain(Storage.BlobTargetOption.doesNotExist()) + + verify(client).delete(sourceBlobId) + } + + "mvFile" should "use generationMatch precondition if the target file exists" in { + val oldBucket = "oldBucket" + val oldPath = "oldPath" + val newBucket = "newBucket" + val newPath = "newPath" + + val sourceBlobId = BlobId.of(oldBucket, oldPath) + + mockBlobExistence(sourceBlobId) + + val destinationBlob = mock[Blob] + when(destinationBlob.getGeneration).thenReturn(123L) + + when(client.get(newBucket, newPath)).thenReturn(destinationBlob) // Target exists + when(client.copy(any[Storage.CopyRequest])).thenReturn(mock[CopyWriter]) + when(client.delete(sourceBlobId)).thenReturn(true) + + val result = storageInterface.mvFile(oldBucket, oldPath, newBucket, newPath) + result.value should be(()) + + val copyRequestCaptor: ArgumentCaptor[Storage.CopyRequest] = ArgumentCaptor.forClass(classOf[Storage.CopyRequest]) + verify(client).copy(copyRequestCaptor.capture()) + val capturedCopyRequest = copyRequestCaptor.getValue + capturedCopyRequest.getTargetOptions should contain(Storage.BlobTargetOption.generationMatch(123L)) + + verify(client).delete(sourceBlobId) + } + "mvFile" should "return FileMoveError if the source file does not exist after check" in { + val oldBucket = "oldBucket" + val oldPath = "oldPath" + val newBucket = "newBucket" + val newPath = "newPath" + + val sourceBlobId = BlobId.of(oldBucket, oldPath) + val destinationBlobId = BlobInfo.newBuilder(BlobId.of(newBucket, newPath)).build() + + mockBlobExistence(sourceBlobId) + + // Simulate a failure during the copy operation (e.g., source file does not exist) + when(client.copy(any[Storage.CopyRequest])).thenThrow(new RuntimeException("Source file does not exist")) + + val result = storageInterface.mvFile(oldBucket, oldPath, newBucket, newPath) + + // Verify that the result is a FileMoveError + result.left.value should be(a[FileMoveError]) + result.left.value.message() should include("Source file does not exist") + + // Verify that the copy was attempted + val copyRequestCaptor: ArgumentCaptor[Storage.CopyRequest] = ArgumentCaptor.forClass(classOf[Storage.CopyRequest]) + verify(client).copy(copyRequestCaptor.capture()) + val capturedCopyRequest = copyRequestCaptor.getValue + capturedCopyRequest.getSource should be(sourceBlobId) + capturedCopyRequest.getTarget should be(destinationBlobId) + + // Verify that delete was not called, as the copy failed + verify(client, never).delete(sourceBlobId) + } + "mvFile" should "return no error if the source file does not exist initially (1)" in { + val oldBucket = "oldBucket" + val oldPath = "oldPath" + val newBucket = "newBucket" + val newPath = "newPath" + + val sourceBlobId = BlobId.of(oldBucket, oldPath) + + mockBlobNonExistence1(sourceBlobId) + + val result = storageInterface.mvFile(oldBucket, oldPath, newBucket, newPath) + + result.value should be(()) + + verify(client, never).copy(any[CopyRequest]) + verify(client, never).delete(sourceBlobId) + } + + "mvFile" should "return no error if the source file does not exist initially (2)" in { + val oldBucket = "oldBucket" + val oldPath = "oldPath" + val newBucket = "newBucket" + val newPath = "newPath" + + val sourceBlobId = BlobId.of(oldBucket, oldPath) + + mockBlobNonExistence2(sourceBlobId) + + // Simulate a failure during the copy operation (e.g., source file does not exist) + when(client.copy(any[Storage.CopyRequest])).thenThrow(new RuntimeException("Source file does not exist")) + + val result = storageInterface.mvFile(oldBucket, oldPath, newBucket, newPath) + + result.value should be(()) + + verify(client, never).copy(any[CopyRequest]) + verify(client, never).delete(sourceBlobId) + } + private def createTestFile = { val source = File.createTempFile("a-file", "") Files.writeString(source.toPath, "real file content", StandardOpenOption.WRITE)