diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java index 00172701cc..cb9f17115f 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java @@ -216,16 +216,7 @@ private void startProcessingObject(final long waitTimeMillis) { partitionKeys.remove(objectToProcess.get().getPartitionKey()); }, ACKNOWLEDGEMENT_SET_TIMEOUT); - acknowledgementSet.addProgressCheck( - (ratio) -> { - try { - sourceCoordinator.renewPartitionOwnership(objectToProcess.get().getPartitionKey()); - } catch (final PartitionUpdateException | PartitionNotOwnedException | PartitionNotFoundException e) { - LOG.debug("Failed to update partition ownership for {} in the acknowledgment progress check", objectToProcess.get().getPartitionKey()); - partitionOwnershipUpdateFailures.increment(); - } - }, - CHECKPOINT_OWNERSHIP_INTERVAL); + addProgressCheck(acknowledgementSet, objectToProcess.get()); } @@ -360,6 +351,7 @@ private void processObjectsForFolderPartition(final List obje activeAcknowledgmentSetId = acknowledgmentSetId; acknowledgementSet = createAcknowledgmentSetForFolderPartition(folderPartition, acknowledgmentSetId); + addProgressCheck(acknowledgementSet, folderPartition); objectsToDeleteForAcknowledgmentSets.put(acknowledgmentSetId, new HashSet<>()); @@ -412,4 +404,17 @@ private AcknowledgementSet createAcknowledgmentSetForFolderPartition(final Sourc } }, ACKNOWLEDGEMENT_SET_TIMEOUT); } + + private void addProgressCheck(final AcknowledgementSet acknowledgementSet, final SourcePartition objectToProcess) { + acknowledgementSet.addProgressCheck( + (ratio) -> { + try { + sourceCoordinator.renewPartitionOwnership(objectToProcess.getPartitionKey()); + } catch (final PartitionUpdateException | PartitionNotOwnedException | PartitionNotFoundException e) { + LOG.debug("Failed to update partition ownership for {} in the acknowledgment progress check", objectToProcess.getPartitionKey()); + partitionOwnershipUpdateFailures.increment(); + } + }, + CHECKPOINT_OWNERSHIP_INTERVAL); + } } diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerTest.java index 3e71510bf4..0a81795647 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerTest.java @@ -514,6 +514,9 @@ void processing_with_folder_partition_processes_objects_in_folder_and_deletes_th .thenReturn(acknowledgementSet1) .thenReturn(acknowledgementSet2); + doNothing().when(acknowledgementSet1).addProgressCheck(any(Consumer.class), eq(CHECKPOINT_OWNERSHIP_INTERVAL)); + doNothing().when(acknowledgementSet2).addProgressCheck(any(Consumer.class), eq(CHECKPOINT_OWNERSHIP_INTERVAL)); + doNothing().when(s3ObjectDeleteWorker).deleteS3Object(any(DeleteObjectRequest.class)); doNothing().when(s3ObjectHandler).parseS3Object(any(S3ObjectReference.class), any(AcknowledgementSet.class), eq(sourceCoordinator), eq(partitionKey)); @@ -589,6 +592,7 @@ void processing_with_folder_partition_processes_objects_in_folder_until_max_obje when(acknowledgementSetManager.create(any(Consumer.class), any(Duration.class))) .thenReturn(acknowledgementSet1); + doNothing().when(acknowledgementSet1).addProgressCheck(any(Consumer.class), eq(CHECKPOINT_OWNERSHIP_INTERVAL)); doNothing().when(s3ObjectDeleteWorker).deleteS3Object(any(DeleteObjectRequest.class)); doNothing().when(s3ObjectHandler).parseS3Object(any(S3ObjectReference.class), any(AcknowledgementSet.class), eq(sourceCoordinator), eq(partitionKey)); @@ -601,6 +605,13 @@ void processing_with_folder_partition_processes_objects_in_folder_until_max_obje final ArgumentCaptor consumerArgumentCaptor = ArgumentCaptor.forClass(Consumer.class); verify(acknowledgementSetManager, times(1)).create(consumerArgumentCaptor.capture(), any(Duration.class)); + final ArgumentCaptor progressCheckArgumentCaptor = ArgumentCaptor.forClass(Consumer.class); + verify(acknowledgementSet1).addProgressCheck(progressCheckArgumentCaptor.capture(), eq(CHECKPOINT_OWNERSHIP_INTERVAL)); + + final Consumer progressCheckConsumer = progressCheckArgumentCaptor.getValue(); + progressCheckConsumer.accept(mock(ProgressCheck.class)); + verify(sourceCoordinator).renewPartitionOwnership(partitionKey); + final InOrder inOrder = inOrder(sourceCoordinator, acknowledgementSet1, s3ObjectDeleteWorker);