Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add retries to cagg httpSinkActor #124

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ protected HttpPostSink(final Builder<?, ?> builder) {
builder._maximumConcurrency,
builder._maximumQueueSize,
builder._spreadPeriod,
builder._metricsFactory));
builder._metricsFactory,
builder._maxRetries));
}

private final URI _uri;
Expand Down Expand Up @@ -244,6 +245,18 @@ public B setSpreadPeriod(final Period value) {
return self();
}

/**
 * Configures the maximum number of times an http request may be retried.
 * Optional. Cannot be null. Default is 0.
 *
 * @param value the maximum number of retries per http request.
 * @return this builder
 */
public B setMaxRetries(final Integer value) {
    this._maxRetries = value;
    return self();
}

/**
* Sets the maximum pending queue size. Optional. Cannot be null.
* Default is 25000. Minimum is 1.
Expand Down Expand Up @@ -281,5 +294,7 @@ protected Builder(final Function<B, S> targetConstructor) {
@JacksonInject
@NotNull
private MetricsFactory _metricsFactory;
@NotNull
private Integer _maxRetries = 0;
}
}
75 changes: 69 additions & 6 deletions src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class HttpSinkActor extends AbstractActor {
* @param maximumQueueSize Maximum number of pending requests.
* @param spreadPeriod Maximum time to delay sending new aggregates to spread load.
* @param metricsFactory metrics factory to record metrics.
* @param maxRetries Maximum number of retries for the http requests.
* @return A new Props
*/
public static Props props(
Expand All @@ -69,8 +70,17 @@ public static Props props(
final int maximumConcurrency,
final int maximumQueueSize,
final Period spreadPeriod,
final MetricsFactory metricsFactory) {
return Props.create(HttpSinkActor.class, client, sink, maximumConcurrency, maximumQueueSize, spreadPeriod, metricsFactory);
final MetricsFactory metricsFactory,
final int maxRetries) {
return Props.create(
HttpSinkActor.class,
client,
sink,
maximumConcurrency,
maximumQueueSize,
spreadPeriod,
metricsFactory,
maxRetries);
}

/**
Expand All @@ -82,19 +92,22 @@ public static Props props(
* @param maximumQueueSize Maximum number of pending requests.
* @param spreadPeriod Maximum time to delay sending new aggregates to spread load.
* @param metricsFactory metrics factory to record metrics.
* @param maxRetries Maximum number of retries for the http requests.
*/
public HttpSinkActor(
final AsyncHttpClient client,
final HttpPostSink sink,
final int maximumConcurrency,
final int maximumQueueSize,
final Period spreadPeriod,
final MetricsFactory metricsFactory) {
final MetricsFactory metricsFactory,
final int maxRetries) {
_client = client;
_sink = sink;
_maximumConcurrency = maximumConcurrency;
_pendingRequests = EvictingQueue.create(maximumQueueSize);
_metricsFactory = metricsFactory;
_maxRetries = maxRetries;
if (Period.ZERO.equals(spreadPeriod)) {
_spreadingDelayMillis = 0;
} else {
Expand All @@ -106,6 +119,8 @@ public HttpSinkActor(
_inQueueLatencyName = "sinks/http_post/" + _sink.getMetricSafeName() + "/queue_time";
_requestSuccessName = "sinks/http_post/" + _sink.getMetricSafeName() + "/success";
_responseStatusName = "sinks/http_post/" + _sink.getMetricSafeName() + "/status";
_httpSinkAttemptsName = "sinks/http_post/" + _sink.getMetricSafeName() + "/attempts";
_samplesDroppedName = "sinks/http_post/" + _sink.getMetricSafeName() + "/samples_dropped";
}

/**
Expand Down Expand Up @@ -276,10 +291,9 @@ private void fireNextRequest() {
final Request request = requestEntry.getRequest();
_inflightRequestsCount++;

final CompletableFuture<Response> promise = new CompletableFuture<>();
metrics.startTimer(_requestLatencyName);
_client.executeRequest(request, new ResponseAsyncCompletionHandler(promise));
final CompletionStage<Object> responsePromise = promise
final HttpResponse httpResponse = sendHttpRequest(request, 0);
final CompletionStage<Object> responsePromise = httpResponse.getResponsePromise()
.handle((result, err) -> {
metrics.stopTimer(_requestLatencyName);
final Object returnValue;
Expand All @@ -292,12 +306,15 @@ private void fireNextRequest() {
responseStatusClass == i ? 1 : 0);
}
if (ACCEPTED_STATUS_CODES.contains(responseStatusCode)) {
metrics.incrementCounter(_httpSinkAttemptsName, httpResponse.getAttempt());
returnValue = new PostSuccess(result);
} else {
returnValue = new PostRejected(request, result);
metrics.incrementCounter(_samplesDroppedName);
}
} else {
returnValue = new PostFailure(request, err);
metrics.incrementCounter(_samplesDroppedName);
}
metrics.incrementCounter(_requestSuccessName, (returnValue instanceof PostSuccess) ? 1 : 0);
metrics.close();
Expand All @@ -306,6 +323,31 @@ private void fireNextRequest() {
PatternsCS.pipe(responsePromise, context().dispatcher()).to(self());
}

private HttpResponse sendHttpRequest(
Copy link
Member

@wittekm wittekm Jun 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a base case

    private HttpResponse sendHttpRequest(
            final Request request
    ) { 
      return this.sendHttpRequest(request, 0)
    }

final Request request,
final int attempt) {
try {
Thread.sleep(Double.valueOf(Math.pow(2, attempt) - 1).longValue() * 1000);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can't do a thread sleep here. It can block too much. The rule is basically not to block in an async method.

Also, this isn't proper exponential backoff. Please take a look at https://en.m.wikipedia.org/wiki/Exponential_backoff

Specifically the part about randomization.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By saying "block too much", do you mean 1) it can block for longer than the time set, or 2) it occupies too many resources because it is a busy wait?
I found a solution here using ScheduledExecutorService which can solve 1).
And we can probably use wait() for a non busy wait if the problem is 2).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to understand how this code is executing. This code is in an Akka actor, which means it is running on a thread pool shared with other actors. There is a limited number of threads. In an actor, you should not do anything that blocks the thread (in most cases; there can be exceptions if you are very careful and configure things explicitly). But this also gives us access to Akka timers, where you can schedule execution in a non-blocking way. Please take a look at the following documentation.

https://doc.akka.io/docs/akka/current/actors.html#introduction

https://doc.akka.io/docs/akka/current/dispatchers.html

https://doc.akka.io/docs/akka/current/actors.html#actors-timers

} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
final CompletableFuture<Response> promise = new CompletableFuture<>();
_client.executeRequest(request, new ResponseAsyncCompletionHandler(promise));
return new HttpResponse(
promise.thenCompose(result -> {
if (ACCEPTED_STATUS_CODES.contains(result.getStatusCode())) {
return CompletableFuture.completedFuture(result);
} else {
return attempt < _maxRetries
?
sendHttpRequest(request, attempt + 1).getResponsePromise()
:
CompletableFuture.completedFuture(result);
}
}),
attempt);
}

@Override
public void postStop() throws Exception {
super.postStop();
Expand All @@ -325,12 +367,15 @@ public void postStop() throws Exception {
private final HttpPostSink _sink;
private final int _spreadingDelayMillis;
private final MetricsFactory _metricsFactory;
private final int _maxRetries;

private final String _evictedRequestsName;
private final String _requestLatencyName;
private final String _inQueueLatencyName;
private final String _requestSuccessName;
private final String _responseStatusName;
private final String _httpSinkAttemptsName;
private final String _samplesDroppedName;

private static final Logger LOGGER = LoggerFactory.getLogger(HttpPostSink.class);
private static final Logger EVICTED_LOGGER = LoggerFactory.getRateLimitLogger(HttpPostSink.class, Duration.ofSeconds(30));
Expand Down Expand Up @@ -465,4 +510,22 @@ public long getEnterTime() {
private final Request _request;
private final long _enterTime;
}

/**
 * Immutable holder pairing the future of an http response with the attempt
 * number supplied when the holder was constructed.
 */
private static final class HttpResponse {
    /**
     * Creates a new holder.
     *
     * @param promise future that yields the http response.
     * @param attemptNumber attempt number to associate with the future.
     */
    private HttpResponse(final CompletableFuture<Response> promise, final int attemptNumber) {
        _attempt = attemptNumber;
        _responsePromise = promise;
    }

    /**
     * Retrieves the attempt number given at construction.
     *
     * @return the attempt number.
     */
    public int getAttempt() {
        return _attempt;
    }

    /**
     * Retrieves the future of the http response.
     *
     * @return the response future given at construction.
     */
    public CompletableFuture<Response> getResponsePromise() {
        return _responsePromise;
    }

    private final int _attempt;
    private final CompletableFuture<Response> _responsePromise;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.arpnetworking.commons.jackson.databind.ObjectMapperFactory;
import com.arpnetworking.metrics.Metrics;
import com.arpnetworking.metrics.MetricsFactory;
import com.arpnetworking.test.TestBeanFactory;
import com.arpnetworking.tsdcore.model.AggregatedData;
import com.arpnetworking.tsdcore.model.Condition;
import com.arpnetworking.tsdcore.model.FQDSN;
Expand Down Expand Up @@ -75,7 +76,7 @@ public void tearDown() {
}

@Test
public void testPost() throws InterruptedException, IOException {
public void testPostSuccess() throws InterruptedException, IOException {
// Fake a successful post to KairosDb
_wireMock.register(WireMock.post(WireMock.urlEqualTo(PATH))
.willReturn(WireMock.aResponse()
Expand Down Expand Up @@ -135,6 +136,7 @@ public void testPost() throws InterruptedException, IOException {

// Verify that metrics has been recorded.
Mockito.verify(_mockMetricsFactory, Mockito.times(1)).create();
Mockito.verify(_mockMetrics, Mockito.times(1)).incrementCounter("sinks/http_post/kairosdb_sink_test/attempts", 0);
Mockito.verify(_mockMetrics, Mockito.times(1)).incrementCounter("sinks/http_post/kairosdb_sink_test/success", 1);
Mockito.verify(_mockMetrics, Mockito.times(1)).incrementCounter("sinks/http_post/kairosdb_sink_test/status/2xx", 1);
Mockito.verify(_mockMetrics, Mockito.times(1)).setTimer(
Expand All @@ -146,6 +148,27 @@ public void testPost() throws InterruptedException, IOException {
Mockito.verify(_mockMetrics, Mockito.times(1)).close();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like you're missing a test case for the feature you're trying to add. Can you please add a test showing the retry on a failed POST?


@Test
public void testPostFailure() throws InterruptedException, IOException {
// Fake a failed post to KairosDb (the server responds with 404)
_wireMock.register(WireMock.post(WireMock.urlEqualTo(PATH))
.willReturn(WireMock.aResponse()
.withStatus(404)));
_kairosDbSinkBuilder.setMaxRetries(1).build().recordAggregateData(TestBeanFactory.createPeriodicData());
Thread.sleep(3000);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I recognize there's already a Thread.sleep(1000) in this test case, but - why 3000?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I added exponential backoff for the retry, which means that, after the initial HTTP request, the thread will wait 1000 milliseconds before sending the next attempt. So if we use 1000 here, this test will fail.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sleeps are a really bad way to deal with this and will cause brittle tests. Waiting for an async completion with a timeout is probably a better way to go.


Mockito.verify(_mockMetricsFactory, Mockito.times(1)).create();
Mockito.verify(_mockMetrics, Mockito.times(1)).incrementCounter("sinks/http_post/kairosdb_sink_test/status/4xx", 1);
Mockito.verify(_mockMetrics, Mockito.times(1)).setTimer(
Mockito.matches("sinks/http_post/kairosdb_sink_test/queue_time"),
Mockito.anyLong(),
Mockito.any());
Mockito.verify(_mockMetrics, Mockito.times(1)).startTimer("sinks/http_post/kairosdb_sink_test/request_latency");
Mockito.verify(_mockMetrics, Mockito.times(1)).stopTimer("sinks/http_post/kairosdb_sink_test/request_latency");
Mockito.verify(_mockMetrics, Mockito.times(1)).incrementCounter("sinks/http_post/kairosdb_sink_test/samples_dropped");
Mockito.verify(_mockMetrics, Mockito.times(1)).close();
}

private KairosDbSink.Builder _kairosDbSinkBuilder;
private WireMockServer _wireMockServer;
private WireMock _wireMock;
Expand Down