-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add retries to cagg httpSinkActor #124
base: master
Are you sure you want to change the base?
Changes from all commits
6ac4fc1
36013a9
cf2e798
0639d72
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -61,6 +61,7 @@ public class HttpSinkActor extends AbstractActor { | |
* @param maximumQueueSize Maximum number of pending requests. | ||
* @param spreadPeriod Maximum time to delay sending new aggregates to spread load. | ||
* @param metricsFactory metrics factory to record metrics. | ||
* @param maxRetries Maximum number of retries for the http requests. | ||
* @return A new Props | ||
*/ | ||
public static Props props( | ||
|
@@ -69,8 +70,17 @@ public static Props props( | |
final int maximumConcurrency, | ||
final int maximumQueueSize, | ||
final Period spreadPeriod, | ||
final MetricsFactory metricsFactory) { | ||
return Props.create(HttpSinkActor.class, client, sink, maximumConcurrency, maximumQueueSize, spreadPeriod, metricsFactory); | ||
final MetricsFactory metricsFactory, | ||
final int maxRetries) { | ||
return Props.create( | ||
HttpSinkActor.class, | ||
client, | ||
sink, | ||
maximumConcurrency, | ||
maximumQueueSize, | ||
spreadPeriod, | ||
metricsFactory, | ||
maxRetries); | ||
} | ||
|
||
/** | ||
|
@@ -82,19 +92,22 @@ public static Props props( | |
* @param maximumQueueSize Maximum number of pending requests. | ||
* @param spreadPeriod Maximum time to delay sending new aggregates to spread load. | ||
* @param metricsFactory metrics factory to record metrics. | ||
* @param maxRetries Maximum number of retries for the http requests. | ||
*/ | ||
public HttpSinkActor( | ||
final AsyncHttpClient client, | ||
final HttpPostSink sink, | ||
final int maximumConcurrency, | ||
final int maximumQueueSize, | ||
final Period spreadPeriod, | ||
final MetricsFactory metricsFactory) { | ||
final MetricsFactory metricsFactory, | ||
final int maxRetries) { | ||
_client = client; | ||
_sink = sink; | ||
_maximumConcurrency = maximumConcurrency; | ||
_pendingRequests = EvictingQueue.create(maximumQueueSize); | ||
_metricsFactory = metricsFactory; | ||
_maxRetries = maxRetries; | ||
if (Period.ZERO.equals(spreadPeriod)) { | ||
_spreadingDelayMillis = 0; | ||
} else { | ||
|
@@ -106,6 +119,8 @@ public HttpSinkActor( | |
_inQueueLatencyName = "sinks/http_post/" + _sink.getMetricSafeName() + "/queue_time"; | ||
_requestSuccessName = "sinks/http_post/" + _sink.getMetricSafeName() + "/success"; | ||
_responseStatusName = "sinks/http_post/" + _sink.getMetricSafeName() + "/status"; | ||
_httpSinkAttemptsName = "sinks/http_post/" + _sink.getMetricSafeName() + "/attempts"; | ||
_samplesDroppedName = "sinks/http_post/" + _sink.getMetricSafeName() + "/samples_dropped"; | ||
} | ||
|
||
/** | ||
|
@@ -276,10 +291,9 @@ private void fireNextRequest() { | |
final Request request = requestEntry.getRequest(); | ||
_inflightRequestsCount++; | ||
|
||
final CompletableFuture<Response> promise = new CompletableFuture<>(); | ||
metrics.startTimer(_requestLatencyName); | ||
_client.executeRequest(request, new ResponseAsyncCompletionHandler(promise)); | ||
final CompletionStage<Object> responsePromise = promise | ||
final HttpResponse httpResponse = sendHttpRequest(request, 0); | ||
final CompletionStage<Object> responsePromise = httpResponse.getResponsePromise() | ||
.handle((result, err) -> { | ||
metrics.stopTimer(_requestLatencyName); | ||
final Object returnValue; | ||
|
@@ -292,12 +306,15 @@ private void fireNextRequest() { | |
responseStatusClass == i ? 1 : 0); | ||
} | ||
if (ACCEPTED_STATUS_CODES.contains(responseStatusCode)) { | ||
metrics.incrementCounter(_httpSinkAttemptsName, httpResponse.getAttempt()); | ||
returnValue = new PostSuccess(result); | ||
} else { | ||
returnValue = new PostRejected(request, result); | ||
metrics.incrementCounter(_samplesDroppedName); | ||
} | ||
} else { | ||
returnValue = new PostFailure(request, err); | ||
metrics.incrementCounter(_samplesDroppedName); | ||
} | ||
metrics.incrementCounter(_requestSuccessName, (returnValue instanceof PostSuccess) ? 1 : 0); | ||
metrics.close(); | ||
|
@@ -306,6 +323,31 @@ private void fireNextRequest() { | |
PatternsCS.pipe(responsePromise, context().dispatcher()).to(self()); | ||
} | ||
|
||
private HttpResponse sendHttpRequest( | ||
final Request request, | ||
final int attempt) { | ||
try { | ||
Thread.sleep(Double.valueOf(Math.pow(2, attempt) - 1).longValue() * 1000); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can't do a thread sleep here. It can block too much. The rule is basically not to block in an asyc method. Also, this isn't proper exponential backoff. Please take a look at https://en.m.wikipedia.org/wiki/Exponential_backoff Specifically the part about randomization. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. By saying "block too much", do you mean 1) it can block more than the time set or 2)it occupies too much resources because it is a busy wait? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You need to understand how this code is executing. This code is in an Akka actor, which means it is running on a thread pool shared with other actors. There is a limited amount of threads. In an actor, you should not do anything that blocks the thread (in most cases, there can be exceptions if you are very careful and configure things explicitly). But this also gives us access to Akka timers where you can schedule execution in a non blocking way. Please take a look at the following documentation. https://doc.akka.io/docs/akka/current/actors.html#introduction https://doc.akka.io/docs/akka/current/dispatchers.html https://doc.akka.io/docs/akka/current/actors.html#actors-timers |
||
} catch (final InterruptedException e) { | ||
throw new RuntimeException(e); | ||
} | ||
final CompletableFuture<Response> promise = new CompletableFuture<>(); | ||
_client.executeRequest(request, new ResponseAsyncCompletionHandler(promise)); | ||
return new HttpResponse( | ||
promise.thenCompose(result -> { | ||
if (ACCEPTED_STATUS_CODES.contains(result.getStatusCode())) { | ||
return CompletableFuture.completedFuture(result); | ||
} else { | ||
return attempt < _maxRetries | ||
? | ||
sendHttpRequest(request, attempt + 1).getResponsePromise() | ||
: | ||
CompletableFuture.completedFuture(result); | ||
} | ||
}), | ||
attempt); | ||
} | ||
|
||
@Override | ||
public void postStop() throws Exception { | ||
super.postStop(); | ||
|
@@ -325,12 +367,15 @@ public void postStop() throws Exception { | |
private final HttpPostSink _sink; | ||
private final int _spreadingDelayMillis; | ||
private final MetricsFactory _metricsFactory; | ||
private final int _maxRetries; | ||
|
||
private final String _evictedRequestsName; | ||
private final String _requestLatencyName; | ||
private final String _inQueueLatencyName; | ||
private final String _requestSuccessName; | ||
private final String _responseStatusName; | ||
private final String _httpSinkAttemptsName; | ||
private final String _samplesDroppedName; | ||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(HttpPostSink.class); | ||
private static final Logger EVICTED_LOGGER = LoggerFactory.getRateLimitLogger(HttpPostSink.class, Duration.ofSeconds(30)); | ||
|
@@ -465,4 +510,22 @@ public long getEnterTime() { | |
private final Request _request; | ||
private final long _enterTime; | ||
} | ||
|
||
private static final class HttpResponse { | ||
private HttpResponse(final CompletableFuture<Response> responsePromise, final int attempt) { | ||
_responsePromise = responsePromise; | ||
_attempt = attempt; | ||
} | ||
|
||
public CompletableFuture<Response> getResponsePromise() { | ||
return _responsePromise; | ||
} | ||
|
||
public int getAttempt() { | ||
return _attempt; | ||
} | ||
|
||
private final CompletableFuture<Response> _responsePromise; | ||
private final int _attempt; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ | |
import com.arpnetworking.commons.jackson.databind.ObjectMapperFactory; | ||
import com.arpnetworking.metrics.Metrics; | ||
import com.arpnetworking.metrics.MetricsFactory; | ||
import com.arpnetworking.test.TestBeanFactory; | ||
import com.arpnetworking.tsdcore.model.AggregatedData; | ||
import com.arpnetworking.tsdcore.model.Condition; | ||
import com.arpnetworking.tsdcore.model.FQDSN; | ||
|
@@ -75,7 +76,7 @@ public void tearDown() { | |
} | ||
|
||
@Test | ||
public void testPost() throws InterruptedException, IOException { | ||
public void testPostSuccess() throws InterruptedException, IOException { | ||
// Fake a successful post to KairosDb | ||
_wireMock.register(WireMock.post(WireMock.urlEqualTo(PATH)) | ||
.willReturn(WireMock.aResponse() | ||
|
@@ -135,6 +136,7 @@ public void testPost() throws InterruptedException, IOException { | |
|
||
// Verify that metrics has been recorded. | ||
Mockito.verify(_mockMetricsFactory, Mockito.times(1)).create(); | ||
Mockito.verify(_mockMetrics, Mockito.times(1)).incrementCounter("sinks/http_post/kairosdb_sink_test/attempts", 0); | ||
Mockito.verify(_mockMetrics, Mockito.times(1)).incrementCounter("sinks/http_post/kairosdb_sink_test/success", 1); | ||
Mockito.verify(_mockMetrics, Mockito.times(1)).incrementCounter("sinks/http_post/kairosdb_sink_test/status/2xx", 1); | ||
Mockito.verify(_mockMetrics, Mockito.times(1)).setTimer( | ||
|
@@ -146,6 +148,27 @@ public void testPost() throws InterruptedException, IOException { | |
Mockito.verify(_mockMetrics, Mockito.times(1)).close(); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like you're missing a test case for the feature you're trying to add. Can you please add a test showing the retry on a failed POST? |
||
|
||
@Test | ||
public void testPostFailure() throws InterruptedException, IOException { | ||
// Fake a successful post to KairosDb | ||
_wireMock.register(WireMock.post(WireMock.urlEqualTo(PATH)) | ||
.willReturn(WireMock.aResponse() | ||
.withStatus(404))); | ||
_kairosDbSinkBuilder.setMaxRetries(1).build().recordAggregateData(TestBeanFactory.createPeriodicData()); | ||
Thread.sleep(3000); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (I recognize there's already a Thread.sleep(1000) in this test case, but - why 3000?) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because I added exponential backoff for the retry, which means, after initial HTTPrequest, the thread will wait 1000 milliseconds before sending the next attempt. So if we use 1000 here this test will fail. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sleeps are a really bad way to deal with this and will cause brittle tests. Waiting for an asyc completion with a timeout is probably a better way to go |
||
|
||
Mockito.verify(_mockMetricsFactory, Mockito.times(1)).create(); | ||
Mockito.verify(_mockMetrics, Mockito.times(1)).incrementCounter("sinks/http_post/kairosdb_sink_test/status/4xx", 1); | ||
Mockito.verify(_mockMetrics, Mockito.times(1)).setTimer( | ||
Mockito.matches("sinks/http_post/kairosdb_sink_test/queue_time"), | ||
Mockito.anyLong(), | ||
Mockito.any()); | ||
Mockito.verify(_mockMetrics, Mockito.times(1)).startTimer("sinks/http_post/kairosdb_sink_test/request_latency"); | ||
Mockito.verify(_mockMetrics, Mockito.times(1)).stopTimer("sinks/http_post/kairosdb_sink_test/request_latency"); | ||
Mockito.verify(_mockMetrics, Mockito.times(1)).incrementCounter("sinks/http_post/kairosdb_sink_test/samples_dropped"); | ||
Mockito.verify(_mockMetrics, Mockito.times(1)).close(); | ||
} | ||
|
||
private KairosDbSink.Builder _kairosDbSinkBuilder; | ||
private WireMockServer _wireMockServer; | ||
private WireMock _wireMock; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe add a base case