From d49e6572dbd901df249cfe3aea0346057cfd15f4 Mon Sep 17 00:00:00 2001 From: Chase Engelbrecht Date: Sun, 11 Feb 2024 23:47:15 -0800 Subject: [PATCH] Add no-retry logic and metric counter Signed-off-by: Chase Engelbrecht --- .../plugins/sink/opensearch/BulkRetryStrategy.java | 14 +++++++++++++- .../sink/opensearch/BulkRetryStrategyTests.java | 9 +++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java index 2c629e32a6..d0851d9a02 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java @@ -44,6 +44,7 @@ public final class BulkRetryStrategy { public static final String BULK_REQUEST_TIMEOUT_ERRORS = "bulkRequestTimeoutErrors"; public static final String BULK_REQUEST_SERVER_ERRORS = "bulkRequestServerErrors"; public static final String DOCUMENTS_VERSION_CONFLICT_ERRORS = "documentsVersionConflictErrors"; + public static final String DOCUMENT_FAILED_DEPENDENCY_ERRORS = "documentFailedDependencyErrors"; static final long INITIAL_DELAY_MS = 50; static final long MAXIMUM_DELAY_MS = Duration.ofMinutes(10).toMillis(); static final String VERSION_CONFLICT_EXCEPTION_TYPE = "version_conflict_engine_exception"; @@ -52,7 +53,8 @@ public final class BulkRetryStrategy { Arrays.asList( RestStatus.BAD_REQUEST.getStatus(), RestStatus.NOT_FOUND.getStatus(), - RestStatus.CONFLICT.getStatus() + RestStatus.CONFLICT.getStatus(), + RestStatus.FAILED_DEPENDENCY.getStatus() )); private static final Set BAD_REQUEST_ERRORS = new HashSet<>( @@ -119,6 +121,7 @@ public final class BulkRetryStrategy { private final Counter bulkRequestTimeoutErrors; private final Counter bulkRequestServerErrors; private final Counter documentsVersionConflictErrors; + private final Counter documentFailedDependencyErrorsCounter; private static final Logger LOG = LoggerFactory.getLogger(BulkRetryStrategy.class); static class BulkOperationRequestResponse { @@ -164,6 +167,7 @@ public BulkRetryStrategy(final RequestFunction createBulkReq LOG.debug("Received version conflict from OpenSearch: {}", bulkItemResponse.error().reason()); bulkOperation.releaseEventHandle(true); } else { + if (RestStatus.FAILED_DEPENDENCY.getStatus() == bulkItemResponse.status()) { + documentFailedDependencyErrorsCounter.increment(); + } + nonRetryableFailures.add(FailedBulkOperation.builder() .withBulkOperation(bulkOperation) .withBulkResponseItem(bulkItemResponse) @@ -355,6 +363,10 @@ private void handleFailures(final AccumulatingBulkRequest