Skip to content

Commit

Permalink
Add retry policy for uploading requests to agent (#7824)
Browse files Browse the repository at this point in the history
Add a mechanism to retry on network failure or 500 response code.
The retry policy stores each call made to OkHttp to track the number of
retries made, along with the maximum number of retries allowed per failure.
No retries are allowed for snapshots; 10 are allowed for probe statuses & symdb
  • Loading branch information
jpbempel authored Oct 23, 2024
1 parent 48a5921 commit 893bd2d
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,10 @@ private static DebuggerSink createDebuggerSink(Config config, ProbeStatusSink pr
String tags = getDefaultTagsMergedWithGlobalTags(config);
SnapshotSink snapshotSink =
new SnapshotSink(
config, tags, new BatchUploader(config, config.getFinalDebuggerSnapshotUrl()));
config,
tags,
new BatchUploader(
config, config.getFinalDebuggerSnapshotUrl(), SnapshotSink.RETRY_POLICY));
SymbolSink symbolSink = new SymbolSink(config);
return new DebuggerSink(
config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,10 @@ public DebuggerTransformer(Config config, Configuration configuration) {
DebuggerMetrics.getInstance(config),
new ProbeStatusSink(config, config.getFinalDebuggerSnapshotUrl(), false),
new SnapshotSink(
config, "", new BatchUploader(config, config.getFinalDebuggerSnapshotUrl())),
config,
"",
new BatchUploader(
config, config.getFinalDebuggerSnapshotUrl(), SnapshotSink.RETRY_POLICY)),
new SymbolSink(config)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ public DebuggerSink(Config config, ProbeStatusSink probeStatusSink) {
DebuggerMetrics.getInstance(config),
probeStatusSink,
new SnapshotSink(
config, null, new BatchUploader(config, config.getFinalDebuggerSnapshotUrl())),
config,
null,
new BatchUploader(
config, config.getFinalDebuggerSnapshotUrl(), SnapshotSink.RETRY_POLICY)),
new SymbolSink(config));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class ProbeStatusSink {
private static final JsonAdapter<ProbeStatus> PROBE_STATUS_ADAPTER =
MoshiHelper.createMoshiProbeStatus().adapter(ProbeStatus.class);
private static final int MINUTES_BETWEEN_ERROR_LOG = 5;
public static final BatchUploader.RetryPolicy RETRY_POLICY = new BatchUploader.RetryPolicy(10);

private final BatchUploader diagnosticUploader;
private final Builder messageBuilder;
Expand All @@ -46,7 +47,7 @@ public class ProbeStatusSink {
private final boolean useMultiPart;

public ProbeStatusSink(Config config, String diagnosticsEndpoint, boolean useMultiPart) {
this(config, new BatchUploader(config, diagnosticsEndpoint), useMultiPart);
this(config, new BatchUploader(config, diagnosticsEndpoint, RETRY_POLICY), useMultiPart);
}

ProbeStatusSink(Config config, BatchUploader diagnosticUploader, boolean useMultiPart) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class SnapshotSink {
private static final int HIGH_RATE_25_PERCENT_CAPACITY = HIGH_RATE_CAPACITY / 4;
private static final int HIGH_RATE_75_PERCENT_CAPACITY = HIGH_RATE_CAPACITY * 3 / 4;
static final long HIGH_RATE_STEP_SIZE = 10;
public static final BatchUploader.RetryPolicy RETRY_POLICY = new BatchUploader.RetryPolicy(0);

private final BlockingQueue<Snapshot> lowRateSnapshots =
new ArrayBlockingQueue<>(LOW_RATE_CAPACITY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class SymbolSink {

private static final Logger LOGGER = LoggerFactory.getLogger(SymbolSink.class);
static final int CAPACITY = 1024;
public static final BatchUploader.RetryPolicy RETRY_POLICY = new BatchUploader.RetryPolicy(10);
private static final JsonAdapter<ServiceVersion> SERVICE_VERSION_ADAPTER =
MoshiHelper.createMoshiSymbol().adapter(ServiceVersion.class);
private static final String EVENT_FORMAT =
Expand All @@ -38,7 +39,7 @@ public class SymbolSink {
private final Stats stats = new Stats();

public SymbolSink(Config config) {
this(config, new BatchUploader(config, config.getFinalDebuggerSymDBUrl()));
this(config, new BatchUploader(config, config.getFinalDebuggerSymDBUrl(), RETRY_POLICY));
}

SymbolSink(Config config, BatchUploader symbolUploader) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import datadog.trace.util.AgentThreadFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Phaser;
import java.util.concurrent.SynchronousQueue;
Expand Down Expand Up @@ -56,7 +58,16 @@ public String getFileName() {
}
}

private static final Logger log = LoggerFactory.getLogger(BatchUploader.class);
/**
 * Retry configuration for uploads together with the bookkeeping needed to apply it.
 *
 * <p>{@link #maxFailures} is the number of failures after which an upload is no longer retried,
 * and {@link #failures} maps each tracked {@code Call} to the failure count recorded when it was
 * enqueued.
 */
public static class RetryPolicy {
  /** Maximum number of failures for which a retry is still attempted. */
  public final int maxFailures;

  /** Failure count associated with each call that is currently tracked. */
  public final ConcurrentMap<Call, Integer> failures;

  /**
   * Creates a policy with an empty failure map.
   *
   * @param maxFailures maximum number of failures for which a retry is still attempted
   */
  public RetryPolicy(int maxFailures) {
    this.failures = new ConcurrentHashMap<>();
    this.maxFailures = maxFailures;
  }
}

private static final Logger LOGGER = LoggerFactory.getLogger(BatchUploader.class);
private static final int MINUTES_BETWEEN_ERROR_LOG = 5;
private static final MediaType APPLICATION_JSON = MediaType.parse("application/json");
private static final String HEADER_DD_CONTAINER_ID = "Datadog-Container-ID";
Expand All @@ -76,18 +87,28 @@ public String getFileName() {
private final DebuggerMetrics debuggerMetrics;
private final boolean instrumentTheWorld;
private final RatelimitedLogger ratelimitedLogger;
private final RetryPolicy retryPolicy;

private final Phaser inflightRequests = new Phaser(1);

public BatchUploader(Config config, String endpoint) {
this(config, endpoint, new RatelimitedLogger(log, MINUTES_BETWEEN_ERROR_LOG, TimeUnit.MINUTES));
public BatchUploader(Config config, String endpoint, RetryPolicy retryPolicy) {
this(
config,
endpoint,
new RatelimitedLogger(LOGGER, MINUTES_BETWEEN_ERROR_LOG, TimeUnit.MINUTES),
retryPolicy);
}

BatchUploader(Config config, String endpoint, RatelimitedLogger ratelimitedLogger) {
BatchUploader(
Config config,
String endpoint,
RatelimitedLogger ratelimitedLogger,
RetryPolicy retryPolicy) {
this(
config,
endpoint,
ratelimitedLogger,
retryPolicy,
ContainerInfo.get().containerId,
ContainerInfo.getEntityId());
}
Expand All @@ -97,17 +118,17 @@ public BatchUploader(Config config, String endpoint) {
Config config,
String endpoint,
RatelimitedLogger ratelimitedLogger,
RetryPolicy retryPolicy,
String containerId,
String entityId) {
instrumentTheWorld = config.isDebuggerInstrumentTheWorld();
if (endpoint == null || endpoint.length() == 0) {
throw new IllegalArgumentException("Endpoint url is empty");
}
urlBase = HttpUrl.get(endpoint);
log.debug("Started BatchUploader with target url {}", urlBase);
LOGGER.debug("Started BatchUploader with target url {}", urlBase);
apiKey = config.getApiKey();
this.ratelimitedLogger = ratelimitedLogger;
responseCallback = new ResponseCallback(ratelimitedLogger, inflightRequests);
// This is the same thing OkHttp Dispatcher is doing except thread naming and daemonization
okHttpExecutorService =
new ThreadPoolExecutor(
Expand All @@ -117,6 +138,7 @@ public BatchUploader(Config config, String endpoint) {
TimeUnit.SECONDS,
new SynchronousQueue<>(),
new AgentThreadFactory(DEBUGGER_HTTP_DISPATCHER));
this.retryPolicy = retryPolicy;
this.containerId = containerId;
this.entityId = entityId;
Duration requestTimeout = Duration.ofSeconds(config.getDebuggerUploadTimeout());
Expand All @@ -132,6 +154,8 @@ public BatchUploader(Config config, String endpoint) {
null, /* proxyUsername */
null, /* proxyPassword */
requestTimeout.toMillis());
responseCallback =
new ResponseCallback(ratelimitedLogger, inflightRequests, client, retryPolicy);
debuggerMetrics = DebuggerMetrics.getInstance(config);
}

Expand Down Expand Up @@ -195,6 +219,10 @@ public HttpUrl getUrl() {
return urlBase;
}

/**
 * Returns the retry policy this uploader was constructed with.
 *
 * @return the retry policy passed to the constructor
 */
RetryPolicy getRetryPolicy() {
  return this.retryPolicy;
}

private void makeUploadRequest(byte[] json, String tags) {
int contentLength = json.length;
// use RequestBody.create(MediaType, byte[]) to avoid changing Content-Type to
Expand All @@ -205,8 +233,8 @@ private void makeUploadRequest(byte[] json, String tags) {

private void buildAndSendRequest(RequestBody body, int contentLength, String tags) {
debuggerMetrics.histogram("batch.uploader.request.size", contentLength);
if (log.isDebugEnabled()) {
log.debug("Uploading batch data size={} bytes", contentLength);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Uploading batch data size={} bytes", contentLength);
}
HttpUrl.Builder builder = urlBase.newBuilder();
if (tags != null && !tags.isEmpty()) {
Expand All @@ -215,17 +243,17 @@ private void buildAndSendRequest(RequestBody body, int contentLength, String tag
Request.Builder requestBuilder = new Request.Builder().url(builder.build()).post(body);
if (apiKey != null) {
if (apiKey.isEmpty()) {
log.debug("API key is empty");
LOGGER.debug("API key is empty");
}
if (apiKey.length() != 32) {
log.debug(
LOGGER.debug(
"API key length is incorrect (truncated?) expected=32 actual={} API key={}...",
apiKey.length(),
apiKey.substring(0, Math.min(apiKey.length(), 6)));
}
requestBuilder.addHeader(HEADER_DD_API_KEY, apiKey);
} else {
log.debug("API key is null");
LOGGER.debug("API key is null");
}
if (containerId != null) {
requestBuilder.addHeader(HEADER_DD_CONTAINER_ID, containerId);
Expand All @@ -234,24 +262,23 @@ private void buildAndSendRequest(RequestBody body, int contentLength, String tag
requestBuilder.addHeader(HEADER_DD_ENTITY_ID, entityId);
}
Request request = requestBuilder.build();
log.debug("Sending request: {} CT: {}", request, request.body().contentType());
client.newCall(request).enqueue(responseCallback);
inflightRequests.register();
LOGGER.debug("Sending request: {} CT: {}", request, request.body().contentType());
enqueueCall(client, request, responseCallback, retryPolicy, 0, inflightRequests);
}

public void shutdown() {
try {
inflightRequests.awaitAdvanceInterruptibly(inflightRequests.arrive(), 10, TimeUnit.SECONDS);
} catch (TimeoutException | InterruptedException ignored) {
log.warn("Not all upload requests have been handled");
LOGGER.warn("Not all upload requests have been handled");
}
okHttpExecutorService.shutdownNow();
try {
okHttpExecutorService.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS);
} catch (final InterruptedException e) {
// Note: this should only happen in main thread right before exiting, so eating up interrupted
// state should be fine.
log.warn("Wait for executor shutdown interrupted");
LOGGER.warn("Wait for executor shutdown interrupted");
}
client.connectionPool().evictAll();
}
Expand All @@ -260,28 +287,68 @@ private boolean canEnqueueMoreRequests() {
return client.dispatcher().queuedCallsCount() < MAX_ENQUEUED_REQUESTS;
}

/**
 * Creates a call for {@code request}, records its failure count in the retry policy and enqueues
 * it for asynchronous execution, accounting for it in {@code inflightRequests}.
 *
 * @param client HTTP client used to create the call
 * @param request request to send
 * @param responseCallback callback notified when the call completes or fails
 * @param retryPolicy policy whose {@code failures} map tracks the failure count per call
 * @param failureCount number of failures already recorded for this request (0 on first attempt)
 * @param inflightRequests phaser on which one party is registered for the enqueued call
 */
private static void enqueueCall(
    OkHttpClient client,
    Request request,
    Callback responseCallback,
    RetryPolicy retryPolicy,
    int failureCount,
    Phaser inflightRequests) {
  Call call = client.newCall(request);
  retryPolicy.failures.put(call, failureCount);
  // Register BEFORE enqueueing: the callback calls arriveAndDeregister() and may run before
  // enqueue() returns. Registering afterwards lets the callback deregister a party that was never
  // added, which can drop the phaser to zero parties and terminate it.
  inflightRequests.register();
  try {
    call.enqueue(responseCallback);
  } catch (RuntimeException e) {
    // The call was never handed to the dispatcher: undo the bookkeeping done above so the
    // phaser stays balanced and the failure map does not keep a stale entry.
    inflightRequests.arriveAndDeregister();
    retryPolicy.failures.remove(call);
    throw e;
  }
}

private static final class ResponseCallback implements Callback {

private final RatelimitedLogger ratelimitedLogger;
private final Phaser inflightRequests;
private final OkHttpClient client;
private final RetryPolicy retryPolicy;

public ResponseCallback(final RatelimitedLogger ratelimitedLogger, Phaser inflightRequests) {
public ResponseCallback(
final RatelimitedLogger ratelimitedLogger,
Phaser inflightRequests,
OkHttpClient client,
RetryPolicy retryPolicy) {
this.ratelimitedLogger = ratelimitedLogger;
this.inflightRequests = inflightRequests;
this.client = client;
this.retryPolicy = retryPolicy;
}

@Override
public void onFailure(final Call call, final IOException e) {
public void onFailure(Call call, IOException e) {
inflightRequests.arriveAndDeregister();
ratelimitedLogger.warn("Failed to upload batch to {}", call.request().url(), e);
handleRetry(call, retryPolicy.maxFailures);
}

/**
 * Re-enqueues the request of a failed call if its failure count is still within the allowed
 * maximum; otherwise logs a permanent failure.
 *
 * <p>The call is removed from the retry policy's failure map in every case. A call that is not
 * present in the map is ignored.
 *
 * @param call the call that failed
 * @param maxFailures maximum number of failures for which a retry is still attempted
 */
private void handleRetry(Call call, int maxFailures) {
  Integer failure = retryPolicy.failures.remove(call);
  if (failure == null) {
    // Call is not tracked by the retry policy: nothing to retry.
    return;
  }
  int failureCount = failure + 1;
  if (failureCount <= maxFailures) {
    LOGGER.debug(
        "Retrying upload to {}, {}/{}", call.request().url(), failureCount, maxFailures);
    enqueueCall(client, call.request(), this, retryPolicy, failureCount, inflightRequests);
    return;
  }
  // failureCount includes the initial attempt, so it is the real number of attempts made;
  // maxFailures would under-report by one (e.g. "0 attempts" when no retry is allowed).
  LOGGER.warn(
      "Failed permanently to upload batch to {} after {} attempts",
      call.request().url(),
      failureCount);
}

@Override
public void onResponse(final Call call, final Response response) {
public void onResponse(Call call, Response response) {
try {
inflightRequests.arriveAndDeregister();
if (response.isSuccessful()) {
log.debug("Upload done");
LOGGER.debug("Upload done");
retryPolicy.failures.remove(call);
} else {
ResponseBody body = response.body();
// Retrieve body content for detailed error messages
Expand All @@ -301,6 +368,11 @@ public void onResponse(final Call call, final Response response) {
response.message(),
response.code());
}
if (response.code() >= 500 || response.code() == 408 || response.code() == 429) {
handleRetry(call, retryPolicy.maxFailures);
} else {
retryPolicy.failures.remove(call);
}
}
} finally {
response.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,10 @@ public void skipSnapshot() {
DebuggerMetrics debuggerMetrics = spy(DebuggerMetrics.getInstance(config));
SnapshotSink snapshotSink =
new SnapshotSink(
config, "", new BatchUploader(config, config.getFinalDebuggerSnapshotUrl()));
config,
"",
new BatchUploader(
config, config.getFinalDebuggerSnapshotUrl(), SnapshotSink.RETRY_POLICY));
SymbolSink symbolSink = new SymbolSink(config);
DebuggerSink sink =
new DebuggerSink(config, "", debuggerMetrics, probeStatusSink, snapshotSink, symbolSink);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ static class SymbolUploaderMock extends BatchUploader {
final List<MultiPartContent> multiPartContents = new ArrayList<>();

public SymbolUploaderMock() {
super(Config.get(), "http://localhost");
super(Config.get(), "http://localhost", SymbolSink.RETRY_POLICY);
}

@Override
Expand Down
Loading

0 comments on commit 893bd2d

Please sign in to comment.