Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add retries to cagg httpSinkActor #124

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ protected HttpPostSink(final Builder<?, ?> builder) {
builder._maximumConcurrency,
builder._maximumQueueSize,
builder._spreadPeriod,
builder._metricsFactory));
builder._metricsFactory,
builder._maxRetries));
}

private final URI _uri;
Expand Down Expand Up @@ -244,6 +245,18 @@ public B setSpreadPeriod(final Period value) {
return self();
}

/**
 * Sets the upper bound on how many times an HTTP request is retried. Optional.
 * Cannot be null. Default is 0.
 *
 * @param value the maximum number of retries for each HTTP request.
 * @return this builder
 */
public B setMaxRetries(final Integer value) {
    this._maxRetries = value;
    return self();
}

/**
* Sets the maximum pending queue size. Optional. Cannot be null.
* Default is 25000. Minimum is 1.
Expand Down Expand Up @@ -281,5 +294,7 @@ protected Builder(final Function<B, S> targetConstructor) {
@JacksonInject
@NotNull
private MetricsFactory _metricsFactory;
@NotNull
private Integer _maxRetries = 0;
}
}
47 changes: 42 additions & 5 deletions src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class HttpSinkActor extends AbstractActor {
* @param maximumQueueSize Maximum number of pending requests.
* @param spreadPeriod Maximum time to delay sending new aggregates to spread load.
* @param metricsFactory metrics factory to record metrics.
* @param maxRetries Maximum number of retries for the http requests.
* @return A new Props
*/
public static Props props(
Expand All @@ -69,8 +70,17 @@ public static Props props(
final int maximumConcurrency,
final int maximumQueueSize,
final Period spreadPeriod,
final MetricsFactory metricsFactory) {
return Props.create(HttpSinkActor.class, client, sink, maximumConcurrency, maximumQueueSize, spreadPeriod, metricsFactory);
final MetricsFactory metricsFactory,
final int maxRetries) {
return Props.create(
HttpSinkActor.class,
client,
sink,
maximumConcurrency,
maximumQueueSize,
spreadPeriod,
metricsFactory,
maxRetries);
}

/**
Expand All @@ -82,19 +92,22 @@ public static Props props(
* @param maximumQueueSize Maximum number of pending requests.
* @param spreadPeriod Maximum time to delay sending new aggregates to spread load.
* @param metricsFactory metrics factory to record metrics.
* @param maxRetries Maximum number of retries for the http requests.
*/
public HttpSinkActor(
final AsyncHttpClient client,
final HttpPostSink sink,
final int maximumConcurrency,
final int maximumQueueSize,
final Period spreadPeriod,
final MetricsFactory metricsFactory) {
final MetricsFactory metricsFactory,
final int maxRetries) {
_client = client;
_sink = sink;
_maximumConcurrency = maximumConcurrency;
_pendingRequests = EvictingQueue.create(maximumQueueSize);
_metricsFactory = metricsFactory;
_maxRetries = maxRetries;
if (Period.ZERO.equals(spreadPeriod)) {
_spreadingDelayMillis = 0;
} else {
Expand All @@ -106,6 +119,8 @@ public HttpSinkActor(
_inQueueLatencyName = "sinks/http_post/" + _sink.getMetricSafeName() + "/queue_time";
_requestSuccessName = "sinks/http_post/" + _sink.getMetricSafeName() + "/success";
_responseStatusName = "sinks/http_post/" + _sink.getMetricSafeName() + "/status";
_httpSinkAttemptsName = "sinks/http_post/" + _sink.getMetricSafeName() + "/attempts";
_samplesDroppedName = "sinks/http_post/" + _sink.getMetricSafeName() + "/samples_dropped";
}

/**
Expand Down Expand Up @@ -276,9 +291,8 @@ private void fireNextRequest() {
final Request request = requestEntry.getRequest();
_inflightRequestsCount++;

final CompletableFuture<Response> promise = new CompletableFuture<>();
metrics.startTimer(_requestLatencyName);
_client.executeRequest(request, new ResponseAsyncCompletionHandler(promise));
final CompletableFuture<Response> promise = sendHttpRequest(request, 0);
final CompletionStage<Object> responsePromise = promise
.handle((result, err) -> {
metrics.stopTimer(_requestLatencyName);
Expand All @@ -295,9 +309,11 @@ private void fireNextRequest() {
returnValue = new PostSuccess(result);
} else {
returnValue = new PostRejected(request, result);
metrics.incrementCounter(_samplesDroppedName);
}
} else {
returnValue = new PostFailure(request, err);
metrics.incrementCounter(_samplesDroppedName);
}
metrics.incrementCounter(_requestSuccessName, (returnValue instanceof PostSuccess) ? 1 : 0);
metrics.close();
Expand All @@ -306,6 +322,24 @@ private void fireNextRequest() {
PatternsCS.pipe(responsePromise, context().dispatcher()).to(self());
}

private CompletableFuture<Response> sendHttpRequest(
final Request request,
final int attempt) {
final CompletableFuture<Response> promise = new CompletableFuture<>();
_client.executeRequest(request, new ResponseAsyncCompletionHandler(promise));
promise.handle((result, err) -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're probably going to need a bit of a backoff here. I would recommend using an exponential backoff strategy

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least adding some support for backoff would be nice. But I'm also okay with adding that in a second step.

if (err == null && ACCEPTED_STATUS_CODES.contains(result.getStatusCode())) {
try (Metrics metrics = _metricsFactory.create()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Creating a new Metrics object for each POST is going to be quite expensive

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's not ideal; unfortunately, there is no end-to-end scoped instrumentation in CAGG. Not to mention the event loop here is async and so decoupled. If CAGG supports periodic metrics, we could use that, but this is not unreasonable (there's an MF in the Router so for each incoming request; Ref).

metrics.incrementCounter(_httpSinkAttemptsName, attempt);
}
return promise;
} else {
return attempt < _maxRetries ? sendHttpRequest(request, attempt + 1) : promise;
}
});
return promise;
}

@Override
public void postStop() throws Exception {
super.postStop();
Expand All @@ -325,12 +359,15 @@ public void postStop() throws Exception {
private final HttpPostSink _sink;
private final int _spreadingDelayMillis;
private final MetricsFactory _metricsFactory;
private final int _maxRetries;

private final String _evictedRequestsName;
private final String _requestLatencyName;
private final String _inQueueLatencyName;
private final String _requestSuccessName;
private final String _responseStatusName;
private final String _httpSinkAttemptsName;
private final String _samplesDroppedName;

private static final Logger LOGGER = LoggerFactory.getLogger(HttpPostSink.class);
private static final Logger EVICTED_LOGGER = LoggerFactory.getRateLimitLogger(HttpPostSink.class, Duration.ofSeconds(30));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public void setUp() {
.setName("kairosdb_sink_test")
.setActorSystem(getSystem())
.setUri(URI.create("http://localhost:" + _wireMockServer.port() + PATH))
.setMetricsFactory(_mockMetricsFactory);
.setMetricsFactory(_mockMetricsFactory)
.setMaxRetries(5);
Mockito.doReturn(_mockMetrics).when(_mockMetricsFactory).create();
}

Expand Down Expand Up @@ -134,7 +135,8 @@ public void testPost() throws InterruptedException, IOException {
Assert.assertEquals(expected, actual);

// Verify that metrics has been recorded.
Mockito.verify(_mockMetricsFactory, Mockito.times(1)).create();
Mockito.verify(_mockMetricsFactory, Mockito.times(2)).create();
Mockito.verify(_mockMetrics, Mockito.times(1)).incrementCounter("sinks/http_post/kairosdb_sink_test/attempts", 0);
Mockito.verify(_mockMetrics, Mockito.times(1)).incrementCounter("sinks/http_post/kairosdb_sink_test/success", 1);
Mockito.verify(_mockMetrics, Mockito.times(1)).incrementCounter("sinks/http_post/kairosdb_sink_test/status/2xx", 1);
Mockito.verify(_mockMetrics, Mockito.times(1)).setTimer(
Expand All @@ -143,7 +145,7 @@ public void testPost() throws InterruptedException, IOException {
Mockito.any());
Mockito.verify(_mockMetrics, Mockito.times(1)).startTimer("sinks/http_post/kairosdb_sink_test/request_latency");
Mockito.verify(_mockMetrics, Mockito.times(1)).stopTimer("sinks/http_post/kairosdb_sink_test/request_latency");
Mockito.verify(_mockMetrics, Mockito.times(1)).close();
Mockito.verify(_mockMetrics, Mockito.times(2)).close();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like you're missing a test case for the feature you're trying to add. Can you please add a test showing the retry on a failed POST?


private KairosDbSink.Builder _kairosDbSinkBuilder;
Expand Down