Enable HTTP Sink request retries #120

Draft: wants to merge 3 commits into base: main
4 changes: 1 addition & 3 deletions README.md
@@ -451,6 +451,7 @@ is set to `'true'`, it will be used as header value as is, without any extra mod
| format | required | Specify what format to use. |
| insert-method | optional | Specify which HTTP method to use in the request. The value should be set either to `POST` or `PUT`. |
| sink.batch.max-size | optional | Maximum number of elements that may be passed in a batch to be written downstream. |
| sink.delivery-guarantee | optional | Defines the delivery semantics for the HTTP sink. Accepted values are 'at-least-once' and 'none' ('none' is effectively the same as 'at-most-once'). The 'exactly-once' semantics is not supported. |
| sink.requests.max-inflight | optional | The maximum number of in-flight requests that may exist at any time. Once the maximum is reached, initiating further requests blocks until some in-flight requests complete. |
| sink.requests.max-buffered | optional | Maximum number of buffered records before applying backpressure. |
| sink.flush-buffer.size | optional | The maximum size of a batch of entries that may be sent to the HTTP endpoint measured in bytes. |
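The builder's validation of `sink.delivery-guarantee` can be sketched in plain Java. This is a self-contained mirror of the connector's check, not the connector itself; the `Guarantee` enum and `parse` helper below are illustrative stand-ins for Flink's `DeliveryGuarantee` and the option parsing:

```java
// Illustrative stand-in for Flink's DeliveryGuarantee enum.
enum Guarantee { AT_LEAST_ONCE, NONE, EXACTLY_ONCE }

final class DeliveryGuaranteeOption {

    // Mirrors the builder's Preconditions.checkArgument: exactly-once is rejected.
    static Guarantee parse(String value) {
        switch (value) {
            case "at-least-once":
                return Guarantee.AT_LEAST_ONCE;
            case "none":
                // 'none' behaves like at-most-once: failed requests are dropped.
                return Guarantee.NONE;
            case "exactly-once":
                throw new IllegalArgumentException(
                    "Only at-least-once and none delivery guarantees are supported.");
            default:
                throw new IllegalArgumentException("Unknown delivery guarantee: " + value);
        }
    }
}
```

Rejecting 'exactly-once' up front matches the design choice in this PR: the sink cannot deduplicate retried HTTP requests, so it can only promise at-least-once or no guarantee.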
@@ -573,9 +574,6 @@ The mapping from Http Json Response to SQL table schema is done via Flink's Json
- Think about Retry Policy for Http Request
- Check other `//TODO`'s.

### HTTP Sink
- Make `HttpSink` retry failed requests. Currently it does not retry them at all and only adds their count to the `numRecordsSendErrors` metric. An approach that is both efficient and correct should be thought through before implementing it.

###
[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/queries/joins/#lookup-join
</br>
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/getindata/connectors/http/HttpSink.java
@@ -3,6 +3,7 @@
import java.util.Properties;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.base.sink.writer.ElementConverter;

import com.getindata.connectors.http.internal.HeaderPreprocessor;
@@ -41,6 +42,7 @@ public class HttpSink<InputT> extends HttpSinkInternal<InputT> {
long maxBatchSizeInBytes,
long maxTimeInBufferMS,
long maxRecordSizeInBytes,
DeliveryGuarantee deliveryGuarantee,
String endpointUrl,
HttpPostRequestCallback<HttpRequest> httpPostRequestCallback,
HeaderPreprocessor headerPreprocessor,
@@ -54,6 +56,7 @@ public class HttpSink<InputT> extends HttpSinkInternal<InputT> {
maxBatchSizeInBytes,
maxTimeInBufferMS,
maxRecordSizeInBytes,
deliveryGuarantee,
endpointUrl,
httpPostRequestCallback,
headerPreprocessor,
16 changes: 16 additions & 0 deletions src/main/java/com/getindata/connectors/http/HttpSinkBuilder.java
@@ -4,8 +4,10 @@
import java.util.Properties;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.util.Preconditions;

import com.getindata.connectors.http.internal.HeaderPreprocessor;
import com.getindata.connectors.http.internal.SinkHttpClient;
@@ -71,6 +73,8 @@ public class HttpSinkBuilder<InputT> extends

private final Properties properties = new Properties();

private DeliveryGuarantee deliveryGuarantee;

// Mandatory field
private String endpointUrl;

@@ -92,6 +96,17 @@ public class HttpSinkBuilder<InputT> extends
this.headerPreprocessor = DEFAULT_HEADER_PREPROCESSOR;
}

/**
* @param deliveryGuarantee HTTP Sink delivery guarantee
* @return {@link HttpSinkBuilder} itself
*/
public HttpSinkBuilder<InputT> setDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) {
Preconditions.checkArgument(deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE,
"Only at-least-once and none delivery guarantees are supported.");
this.deliveryGuarantee = deliveryGuarantee;
return this;
}

/**
* @param endpointUrl the URL of the endpoint
* @return {@link HttpSinkBuilder} itself
@@ -181,6 +196,7 @@ public HttpSink<InputT> build() {
Optional.ofNullable(getMaxBatchSizeInBytes()).orElse(DEFAULT_MAX_BATCH_SIZE_IN_B),
Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS),
Optional.ofNullable(getMaxRecordSizeInBytes()).orElse(DEFAULT_MAX_RECORD_SIZE_IN_B),
Optional.ofNullable(deliveryGuarantee).orElse(DeliveryGuarantee.NONE),
endpointUrl,
httpPostRequestCallback,
headerPreprocessor,
@@ -1,6 +1,7 @@
package com.getindata.connectors.http.internal;

import java.util.List;
import java.util.stream.Collectors;

import lombok.Data;
import lombok.NonNull;
@@ -11,21 +12,36 @@

/**
* Data class holding {@link HttpSinkRequestEntry} instances that {@link SinkHttpClient} attempted
* to write, divided into two lists &mdash; successful and failed ones.
* to write.
*/
@Data
@ToString
public class SinkHttpClientResponse {

/**
* A list of successfully written requests.
* A list of requests along with write status.
*/
@NonNull
private final List<HttpRequest> successfulRequests;
private final List<ResponseItem> requests;

/**
* A list of requests that {@link SinkHttpClient} failed to write.
*/
@NonNull
private final List<HttpRequest> failedRequests;
public List<HttpRequest> getSuccessfulRequests() {
return requests.stream()
.filter(ResponseItem::isSuccessful)
.map(ResponseItem::getRequest)
.collect(Collectors.toList());
}

public List<HttpRequest> getFailedRequests() {
Reviewer comment (Contributor): I did a Google search and found https://stackoverflow.com/questions/47680711/which-http-errors-should-never-trigger-an-automatic-retry. It seems you should only retry if the status code is retriable. Maybe group into successful, retriable, and failed (no retry).

Author reply (Member): Thanks! Good point. I may refactor the code so that there are 3 statuses as you suggested. The change may also affect lookup, not only sink. So as a "side-effect" I may introduce a lookup retry feature :)

return requests.stream()
.filter(r -> !r.isSuccessful())
.map(ResponseItem::getRequest)
.collect(Collectors.toList());
}

@Data
@ToString
public static class ResponseItem {
private final HttpRequest request;
private final boolean successful;
}
}
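The reviewer's suggestion of grouping responses into successful, retriable, and non-retriable could look roughly like this. This is only a sketch of the idea, not part of the PR; the `ResponseStatus` enum name and the particular sets of status codes are assumptions (5xx and 429 are commonly considered safe to retry, while other 4xx responses are not):

```java
// Three-way classification of an HTTP response, as suggested in review.
enum ResponseStatus { SUCCESSFUL, RETRIABLE, FAILED }

final class StatusClassifier {

    static ResponseStatus classify(int httpStatusCode) {
        if (httpStatusCode >= 200 && httpStatusCode < 300) {
            return ResponseStatus.SUCCESSFUL;
        }
        // Throttling and transient server errors are usually worth retrying.
        if (httpStatusCode == 429 || httpStatusCode == 500 || httpStatusCode == 502
                || httpStatusCode == 503 || httpStatusCode == 504) {
            return ResponseStatus.RETRIABLE;
        }
        // Other client errors (e.g. 400, 404) will not succeed on retry.
        return ResponseStatus.FAILED;
    }
}
```

With such a classification, `ResponseItem` could carry a `ResponseStatus` instead of a plain boolean, and only `RETRIABLE` entries would be fed back to the writer.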
@@ -5,6 +5,7 @@
import java.util.Collections;
import java.util.Properties;

import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
@@ -61,6 +62,8 @@ public class HttpSinkInternal<InputT> extends AsyncSinkBase<InputT, HttpSinkRequ

private final String endpointUrl;

private final DeliveryGuarantee deliveryGuarantee;

// having Builder instead of an instance of `SinkHttpClient`
// makes it possible to serialize `HttpSink`
private final SinkHttpClientBuilder sinkHttpClientBuilder;
@@ -79,6 +82,7 @@ protected HttpSinkInternal(
long maxBatchSizeInBytes,
long maxTimeInBufferMS,
long maxRecordSizeInBytes,
DeliveryGuarantee deliveryGuarantee,
String endpointUrl,
HttpPostRequestCallback<HttpRequest> httpPostRequestCallback,
HeaderPreprocessor headerPreprocessor,
@@ -94,9 +98,9 @@ protected HttpSinkInternal(
maxTimeInBufferMS,
maxRecordSizeInBytes
);

Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(endpointUrl),
"The endpoint URL must be set when initializing HTTP Sink.");
this.deliveryGuarantee = deliveryGuarantee;
this.endpointUrl = endpointUrl;
this.httpPostRequestCallback =
Preconditions.checkNotNull(
@@ -132,6 +136,7 @@ public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> cr
getMaxBatchSizeInBytes(),
getMaxTimeInBufferMS(),
getMaxRecordSizeInBytes(),
deliveryGuarantee,
endpointUrl,
sinkHttpClientBuilder.build(
properties,
@@ -159,6 +164,7 @@ public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> re
getMaxBatchSizeInBytes(),
getMaxTimeInBufferMS(),
getMaxRecordSizeInBytes(),
deliveryGuarantee,
endpointUrl,
sinkHttpClientBuilder.build(
properties,
@@ -1,5 +1,6 @@
package com.getindata.connectors.http.internal.sink;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -10,13 +11,19 @@

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy;
import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy;
import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import com.getindata.connectors.http.internal.SinkHttpClient;
import com.getindata.connectors.http.internal.SinkHttpClientResponse;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
import com.getindata.connectors.http.internal.utils.ThreadUtils;

@@ -32,6 +39,9 @@
@Slf4j
public class HttpSinkWriter<InputT> extends AsyncSinkWriter<InputT, HttpSinkRequestEntry> {

private static final int AIMD_RATE_LIMITING_STRATEGY_INCREASE_RATE = 10;
private static final double AIMD_RATE_LIMITING_STRATEGY_DECREASE_FACTOR = 0.99D;

private static final String HTTP_SINK_WRITER_THREAD_POOL_SIZE = "4";

/**
@@ -45,6 +55,8 @@ public class HttpSinkWriter<InputT> extends AsyncSinkWriter<InputT, HttpSinkRequ

private final Counter numRecordsSendErrorsCounter;

private final DeliveryGuarantee deliveryGuarantee;

public HttpSinkWriter(
ElementConverter<InputT, HttpSinkRequestEntry> elementConverter,
Sink.InitContext context,
Expand All @@ -54,13 +66,26 @@ public HttpSinkWriter(
long maxBatchSizeInBytes,
long maxTimeInBufferMS,
long maxRecordSizeInBytes,
DeliveryGuarantee deliveryGuarantee,
String endpointUrl,
SinkHttpClient sinkHttpClient,
Collection<BufferedRequestState<HttpSinkRequestEntry>> bufferedRequestStates,
Properties properties) {

super(elementConverter, context, maxBatchSize, maxInFlightRequests, maxBufferedRequests,
maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes, bufferedRequestStates);
super(
elementConverter,
context,
AsyncSinkWriterConfiguration.builder()
.setMaxBatchSize(maxBatchSize)
.setMaxBatchSizeInBytes(maxBatchSizeInBytes)
.setMaxInFlightRequests(maxInFlightRequests)
.setMaxBufferedRequests(maxBufferedRequests)
.setMaxTimeInBufferMS(maxTimeInBufferMS)
.setMaxRecordSizeInBytes(maxRecordSizeInBytes)
.setRateLimitingStrategy(
buildRateLimitingStrategy(maxInFlightRequests, maxBatchSize))
.build(),
bufferedRequestStates);
this.deliveryGuarantee = deliveryGuarantee;
this.endpointUrl = endpointUrl;
this.sinkHttpClient = sinkHttpClient;

@@ -79,6 +104,19 @@ public HttpSinkWriter(
"http-sink-writer-worker", ThreadUtils.LOGGING_EXCEPTION_HANDLER));
}

private static RateLimitingStrategy buildRateLimitingStrategy(
int maxInFlightRequests, int maxBatchSize) {
return CongestionControlRateLimitingStrategy.builder()
.setMaxInFlightRequests(maxInFlightRequests)
.setInitialMaxInFlightMessages(maxBatchSize)
.setScalingStrategy(
AIMDScalingStrategy.builder(maxBatchSize * maxInFlightRequests)
.setIncreaseRate(AIMD_RATE_LIMITING_STRATEGY_INCREASE_RATE)
.setDecreaseFactor(AIMD_RATE_LIMITING_STRATEGY_DECREASE_FACTOR)
.build())
.build();
}
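The effect of the AIMD constants above (increase rate 10, decrease factor 0.99) can be seen with a tiny standalone update rule. This loop is illustrative only and does not use Flink's `AIMDScalingStrategy`; it just reproduces the additive-increase / multiplicative-decrease shape with the same constants:

```java
final class AimdDemo {

    static final int INCREASE_RATE = 10;        // added on success
    static final double DECREASE_FACTOR = 0.99; // multiplied in on failure

    // On success the allowed rate grows additively (capped); on error it
    // shrinks multiplicatively, but never below 1.
    static int next(int currentRate, boolean success, int rateCap) {
        if (success) {
            return Math.min(currentRate + INCREASE_RATE, rateCap);
        }
        return Math.max(1, (int) Math.round(currentRate * DECREASE_FACTOR));
    }
}
```

A decrease factor as gentle as 0.99 backs off very slowly, so the effective rate stays close to the cap unless errors persist; a smaller factor (e.g. 0.5, as in classic TCP congestion control) would react much more sharply.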

// TODO: Reintroduce retries by adding backoff policy
@Override
protected void submitRequestEntries(
@@ -87,37 +125,61 @@ protected void submitRequestEntries(
var future = sinkHttpClient.putRequests(requestEntries, endpointUrl);
future.whenCompleteAsync((response, err) -> {
if (err != null) {
int failedRequestsNumber = requestEntries.size();
log.error(
"Http Sink fatally failed to write all {} requests",
failedRequestsNumber);
numRecordsSendErrorsCounter.inc(failedRequestsNumber);

// TODO: Make `HttpSinkInternal` retry the failed requests.
// Currently, it does not retry those at all, only adds their count
// to the `numRecordsSendErrors` metric. It is due to the fact we do not have
// a clear image how we want to do it, so it would be both efficient and correct.
//requestResult.accept(requestEntries);
} else if (response.getFailedRequests().size() > 0) {
int failedRequestsNumber = response.getFailedRequests().size();
log.error("Http Sink failed to write and will retry {} requests",
failedRequestsNumber);
numRecordsSendErrorsCounter.inc(failedRequestsNumber);

// TODO: Make `HttpSinkInternal` retry the failed requests. Currently,
// it does not retry those at all, only adds their count to the
// `numRecordsSendErrors` metric. It is due to the fact we do not have
// a clear image how we want to do it, so it would be both efficient and correct.

//requestResult.accept(response.getFailedRequests());
//} else {
//requestResult.accept(Collections.emptyList());
//}
handleFullyFailedRequest(err, requestEntries, requestResult);
} else if (response.getRequests().stream().anyMatch(r -> !r.isSuccessful())) {
handlePartiallyFailedRequest(response, requestEntries, requestResult);
} else {
requestResult.accept(Collections.emptyList());
}
requestResult.accept(Collections.emptyList());
}, sinkWriterThreadPool);
}

private void handleFullyFailedRequest(Throwable err,
List<HttpSinkRequestEntry> requestEntries,
Consumer<List<HttpSinkRequestEntry>> requestResult) {
int failedRequestsNumber = requestEntries.size();
log.error(
"Http Sink fatally failed to write all {} requests",
failedRequestsNumber);
numRecordsSendErrorsCounter.inc(failedRequestsNumber);

if (deliveryGuarantee == DeliveryGuarantee.AT_LEAST_ONCE) {
// Retry all requests.
requestResult.accept(requestEntries);
} else if (deliveryGuarantee == DeliveryGuarantee.NONE) {
// Do not retry failed requests.
requestResult.accept(Collections.emptyList());
}
}

private void handlePartiallyFailedRequest(SinkHttpClientResponse response,
List<HttpSinkRequestEntry> requestEntries,
Consumer<List<HttpSinkRequestEntry>> requestResult) {
long failedRequestsNumber = response.getRequests().stream()
.filter(r -> !r.isSuccessful())
.count();
log.error("Http Sink failed to write and will retry {} requests",
failedRequestsNumber);
numRecordsSendErrorsCounter.inc(failedRequestsNumber);

if (deliveryGuarantee == DeliveryGuarantee.AT_LEAST_ONCE) {
// Assumption: the order of response.requests is the same as requestEntries.
// See com.getindata.connectors.http.internal.sink.httpclient.
// JavaNetSinkHttpClient#putRequests where requests are submitted sequentially and
// then their futures are joined sequentially too.
List<HttpSinkRequestEntry> failedRequestEntries = new ArrayList<>();
Reviewer comment (Contributor): Can we have multiple values for the same key in one list of requestEntries? It seems to me that by introducing a retry mechanism we can interfere with the final entry ordering. That may be the case when the latest entry for a key is saved successfully and then an older one is retried, effectively overwriting the latest value. Do I read that correctly?


for (int i = 0; i < response.getRequests().size(); ++i) {
if (!response.getRequests().get(i).isSuccessful()) {
failedRequestEntries.add(requestEntries.get(i));
}
}
requestResult.accept(failedRequestEntries);
} else if (deliveryGuarantee == DeliveryGuarantee.NONE) {
// Do not retry failed requests.
requestResult.accept(Collections.emptyList());
}
}
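The index-based pairing in `handlePartiallyFailedRequest` relies entirely on the response order matching the request order. Stripped of the Flink types, the core of that pairing is the loop below; `Item` is a minimal stand-in for `ResponseItem`, and the length check is an extra guard not present in the PR:

```java
import java.util.ArrayList;
import java.util.List;

final class FailedEntryMatcher {

    // Minimal stand-in for ResponseItem: just the success flag.
    static final class Item {
        final boolean successful;
        Item(boolean successful) { this.successful = successful; }
    }

    // Pairs each response with the request entry at the same index and keeps
    // only the entries whose response was unsuccessful. This is correct only
    // if responses are reported in request order, as the code above assumes.
    static <T> List<T> failedEntries(List<Item> responses, List<T> entries) {
        if (responses.size() != entries.size()) {
            throw new IllegalArgumentException("Responses and entries must align 1:1.");
        }
        List<T> failed = new ArrayList<>();
        for (int i = 0; i < responses.size(); i++) {
            if (!responses.get(i).successful) {
                failed.add(entries.get(i));
            }
        }
        return failed;
    }
}
```

If the client ever reordered responses (e.g. by completing futures as they finish rather than joining them sequentially), this positional matching would silently retry the wrong entries, which is why the assumption is called out in the comment.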

@Override
protected long getSizeInBytes(HttpSinkRequestEntry s) {
return s.getSizeInBytes();