diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java index 063832b1..7fbcb489 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java @@ -160,7 +160,8 @@ protected HttpPostSink(final Builder builder) { builder._maximumConcurrency, builder._maximumQueueSize, builder._spreadPeriod, - builder._metricsFactory)); + builder._metricsFactory, + builder._maxRetries)); } private final URI _uri; @@ -244,6 +245,18 @@ public B setSpreadPeriod(final Period value) { return self(); } + /** + * Sets the maximum number of retries of the http requests. Optional. Cannot be null. + * Default is 0. + * + * @param value the maximum number of retries of the http requests. + * @return this builder + */ + public B setMaxRetries(final Integer value) { + _maxRetries = value; + return self(); + } + /** * Sets the maximum pending queue size. Optional Cannot be null. * Default is 25000. Minimum is 1. @@ -281,5 +294,7 @@ protected Builder(final Function targetConstructor) { @JacksonInject @NotNull private MetricsFactory _metricsFactory; + @NotNull + private Integer _maxRetries = 0; } } diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java index 5fd3c0d7..9776328f 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java @@ -61,6 +61,7 @@ public class HttpSinkActor extends AbstractActor { * @param maximumQueueSize Maximum number of pending requests. * @param spreadPeriod Maximum time to delay sending new aggregates to spread load. * @param metricsFactory metrics factory to record metrics. + * @param maxRetries Maximum number of retries for the http requests. * @return A new Props */ public static Props props( @@ -69,8 +70,17 @@ public static Props props( final int maximumConcurrency, final int maximumQueueSize, final Period spreadPeriod, - final MetricsFactory metricsFactory) { - return Props.create(HttpSinkActor.class, client, sink, maximumConcurrency, maximumQueueSize, spreadPeriod, metricsFactory); + final MetricsFactory metricsFactory, + final int maxRetries) { + return Props.create( + HttpSinkActor.class, + client, + sink, + maximumConcurrency, + maximumQueueSize, + spreadPeriod, + metricsFactory, + maxRetries); } /** @@ -82,6 +92,7 @@ public static Props props( * @param maximumQueueSize Maximum number of pending requests. * @param spreadPeriod Maximum time to delay sending new aggregates to spread load. * @param metricsFactory metrics factory to record metrics. + * @param maxRetries Maximum number of retries for the http requests. */ public HttpSinkActor( final AsyncHttpClient client, @@ -89,12 +100,14 @@ public HttpSinkActor( final int maximumConcurrency, final int maximumQueueSize, final Period spreadPeriod, - final MetricsFactory metricsFactory) { + final MetricsFactory metricsFactory, + final int maxRetries) { _client = client; _sink = sink; _maximumConcurrency = maximumConcurrency; _pendingRequests = EvictingQueue.create(maximumQueueSize); _metricsFactory = metricsFactory; + _maxRetries = maxRetries; if (Period.ZERO.equals(spreadPeriod)) { _spreadingDelayMillis = 0; } else { @@ -106,6 +119,8 @@ public HttpSinkActor( _inQueueLatencyName = "sinks/http_post/" + _sink.getMetricSafeName() + "/queue_time"; _requestSuccessName = "sinks/http_post/" + _sink.getMetricSafeName() + "/success"; _responseStatusName = "sinks/http_post/" + _sink.getMetricSafeName() + "/status"; + _httpSinkAttemptsName = "sinks/http_post/" + _sink.getMetricSafeName() + "/attempts"; + _samplesDroppedName = "sinks/http_post/" + _sink.getMetricSafeName() + "/samples_dropped"; } /** @@ -276,10 +291,9 @@ private void fireNextRequest() { final Request request = requestEntry.getRequest(); _inflightRequestsCount++; - final CompletableFuture promise = new CompletableFuture<>(); metrics.startTimer(_requestLatencyName); - _client.executeRequest(request, new ResponseAsyncCompletionHandler(promise)); - final CompletionStage responsePromise = promise + final HttpResponse httpResponse = sendHttpRequest(request, 0); + final CompletionStage responsePromise = httpResponse.getResponsePromise() .handle((result, err) -> { metrics.stopTimer(_requestLatencyName); final Object returnValue; @@ -292,12 +306,15 @@ private void fireNextRequest() { responseStatusClass == i ? 1 : 0); } if (ACCEPTED_STATUS_CODES.contains(responseStatusCode)) { + metrics.incrementCounter(_httpSinkAttemptsName, httpResponse.getAttempt()); returnValue = new PostSuccess(result); } else { returnValue = new PostRejected(request, result); + metrics.incrementCounter(_samplesDroppedName); } } else { returnValue = new PostFailure(request, err); + metrics.incrementCounter(_samplesDroppedName); } metrics.incrementCounter(_requestSuccessName, (returnValue instanceof PostSuccess) ? 1 : 0); metrics.close(); @@ -306,6 +323,31 @@ private void fireNextRequest() { PatternsCS.pipe(responsePromise, context().dispatcher()).to(self()); } + private HttpResponse sendHttpRequest( + final Request request, + final int attempt) { + try { + Thread.sleep(Double.valueOf(Math.pow(2, attempt) - 1).longValue() * 1000); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + final CompletableFuture promise = new CompletableFuture<>(); + _client.executeRequest(request, new ResponseAsyncCompletionHandler(promise)); + return new HttpResponse( + promise.thenCompose(result -> { + if (ACCEPTED_STATUS_CODES.contains(result.getStatusCode())) { + return CompletableFuture.completedFuture(result); + } else { + return attempt < _maxRetries + ? + sendHttpRequest(request, attempt + 1).getResponsePromise() + : + CompletableFuture.completedFuture(result); + } + }), + attempt); + } + @Override public void postStop() throws Exception { super.postStop(); @@ -325,12 +367,15 @@ public void postStop() throws Exception { private final HttpPostSink _sink; private final int _spreadingDelayMillis; private final MetricsFactory _metricsFactory; + private final int _maxRetries; private final String _evictedRequestsName; private final String _requestLatencyName; private final String _inQueueLatencyName; private final String _requestSuccessName; private final String _responseStatusName; + private final String _httpSinkAttemptsName; + private final String _samplesDroppedName; private static final Logger LOGGER = LoggerFactory.getLogger(HttpPostSink.class); private static final Logger EVICTED_LOGGER = LoggerFactory.getRateLimitLogger(HttpPostSink.class, Duration.ofSeconds(30)); @@ -465,4 +510,22 @@ public long getEnterTime() { private final Request _request; private final long _enterTime; } + + private static final class HttpResponse { + private HttpResponse(final CompletableFuture responsePromise, final int attempt) { + _responsePromise = responsePromise; + _attempt = attempt; + } + + public CompletableFuture getResponsePromise() { + return _responsePromise; + } + + public int getAttempt() { + return _attempt; + } + + private final CompletableFuture _responsePromise; + private final int _attempt; + } } diff --git a/src/test/java/com/arpnetworking/tsdcore/sinks/KairosDbSinkTest.java b/src/test/java/com/arpnetworking/tsdcore/sinks/KairosDbSinkTest.java index 9e23d82f..c36dd034 100644 --- a/src/test/java/com/arpnetworking/tsdcore/sinks/KairosDbSinkTest.java +++ b/src/test/java/com/arpnetworking/tsdcore/sinks/KairosDbSinkTest.java @@ -19,6 +19,7 @@ import com.arpnetworking.commons.jackson.databind.ObjectMapperFactory; import com.arpnetworking.metrics.Metrics; import com.arpnetworking.metrics.MetricsFactory; +import com.arpnetworking.test.TestBeanFactory; import com.arpnetworking.tsdcore.model.AggregatedData; import com.arpnetworking.tsdcore.model.Condition; import com.arpnetworking.tsdcore.model.FQDSN; @@ -75,7 +76,7 @@ public void tearDown() { } @Test - public void testPost() throws InterruptedException, IOException { + public void testPostSuccess() throws InterruptedException, IOException { // Fake a successful post to KairosDb _wireMock.register(WireMock.post(WireMock.urlEqualTo(PATH)) .willReturn(WireMock.aResponse() @@ -135,6 +136,7 @@ public void testPost() throws InterruptedException, IOException { // Verify that metrics has been recorded. Mockito.verify(_mockMetricsFactory, Mockito.times(1)).create(); + Mockito.verify(_mockMetrics, Mockito.times(1)).incrementCounter("sinks/http_post/kairosdb_sink_test/attempts", 0); Mockito.verify(_mockMetrics, Mockito.times(1)).incrementCounter("sinks/http_post/kairosdb_sink_test/success", 1); Mockito.verify(_mockMetrics, Mockito.times(1)).incrementCounter("sinks/http_post/kairosdb_sink_test/status/2xx", 1); Mockito.verify(_mockMetrics, Mockito.times(1)).setTimer( @@ -146,6 +148,27 @@ public void testPost() throws InterruptedException, IOException { Mockito.verify(_mockMetrics, Mockito.times(1)).close(); } + @Test + public void testPostFailure() throws InterruptedException, IOException { + // Fake a successful post to KairosDb + _wireMock.register(WireMock.post(WireMock.urlEqualTo(PATH)) + .willReturn(WireMock.aResponse() + .withStatus(404))); + _kairosDbSinkBuilder.setMaxRetries(1).build().recordAggregateData(TestBeanFactory.createPeriodicData()); + Thread.sleep(3000); + + Mockito.verify(_mockMetricsFactory, Mockito.times(1)).create(); + Mockito.verify(_mockMetrics, Mockito.times(1)).incrementCounter("sinks/http_post/kairosdb_sink_test/status/4xx", 1); + Mockito.verify(_mockMetrics, Mockito.times(1)).setTimer( + Mockito.matches("sinks/http_post/kairosdb_sink_test/queue_time"), + Mockito.anyLong(), + Mockito.any()); + Mockito.verify(_mockMetrics, Mockito.times(1)).startTimer("sinks/http_post/kairosdb_sink_test/request_latency"); + Mockito.verify(_mockMetrics, Mockito.times(1)).stopTimer("sinks/http_post/kairosdb_sink_test/request_latency"); + Mockito.verify(_mockMetrics, Mockito.times(1)).incrementCounter("sinks/http_post/kairosdb_sink_test/samples_dropped"); + Mockito.verify(_mockMetrics, Mockito.times(1)).close(); + } + private KairosDbSink.Builder _kairosDbSinkBuilder; private WireMockServer _wireMockServer; private WireMock _wireMock;