Skip to content

Commit

Permalink
Basic wiring for failed request queue
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Nied <[email protected]>
  • Loading branch information
peternied committed Aug 12, 2024
1 parent e349381 commit f508f36
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 45 deletions.
3 changes: 1 addition & 2 deletions RFS/src/main/java/com/rfs/common/DocumentReindexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ public Mono<Void> reindex(
IDocumentMigrationContexts.IDocumentReindexContext context
) {

return documentStream.map(BulkDocSection::new) // Convert each Document to part of a bulk
// operation
return documentStream.map(BulkDocSection::new)
.buffer(numDocsPerBulkRequest) // Collect until you hit the batch size
.doOnNext(bulk -> logger.info("{} documents in current bulk request", bulk.size()))
.flatMap(
Expand Down
83 changes: 45 additions & 38 deletions RFS/src/main/java/com/rfs/common/OpenSearchClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,13 @@
import java.io.IOException;
import java.net.HttpURLConnection;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.migrations.parsing.BulkResponseParser;

import com.fasterxml.jackson.databind.DeserializationFeature;
Expand All @@ -23,11 +20,15 @@
import com.rfs.common.http.HttpResponse;
import com.rfs.tracing.IRfsContexts;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;

@Slf4j
public class OpenSearchClient {
private static final Logger logger = LogManager.getLogger(OpenSearchClient.class);
private static final Logger failedRequestsLogger = LoggerFactory.getLogger("FailedRequestsLogger");
private static final ObjectMapper objectMapper = new ObjectMapper();

static {
Expand Down Expand Up @@ -123,7 +124,7 @@ private Optional<ObjectNode> createObjectIdempotent(
return Mono.error(new OperationFailed(errorMessage, resp));
}
})
.doOnError(e -> logger.error(e.getMessage()))
.doOnError(e -> log.error(e.getMessage()))
.retryWhen(checkIfItemExistsRetryStrategy)
.block();

Expand All @@ -150,7 +151,7 @@ private Optional<ObjectNode> createObjectIdempotent(
return Mono.error(new OperationFailed(errorMessage, resp));
}
})
.doOnError(e -> logger.error(e.getMessage()))
.doOnError(e -> log.error(e.getMessage()))
.retryWhen(createItemExistsRetryStrategy)
.block();

Expand Down Expand Up @@ -186,7 +187,7 @@ public void registerSnapshotRepo(
return Mono.error(new OperationFailed(errorMessage, resp));
}
})
.doOnError(e -> logger.error(e.getMessage()))
.doOnError(e -> log.error(e.getMessage()))
.retryWhen(snapshotRetryStrategy)
.block();
}
Expand Down Expand Up @@ -216,7 +217,7 @@ public void createSnapshot(
return Mono.error(new OperationFailed(errorMessage, resp));
}
})
.doOnError(e -> logger.error(e.getMessage()))
.doOnError(e -> log.error(e.getMessage()))
.retryWhen(snapshotRetryStrategy)
.block();
}
Expand Down Expand Up @@ -244,7 +245,7 @@ public Optional<ObjectNode> getSnapshotStatus(
return Mono.error(new OperationFailed(errorMessage, resp));
}
})
.doOnError(e -> logger.error(e.getMessage()))
.doOnError(e -> log.error(e.getMessage()))
.retryWhen(snapshotRetryStrategy)
.block();

Expand All @@ -269,46 +270,52 @@ public Optional<ObjectNode> getSnapshotStatus(
}

/**
 * Retry/backoff policy for bulk indexing requests: exponential backoff with up to
 * 10 attempts, starting at 2 seconds and capped at 60 seconds between attempts.
 *
 * Package-private (not private) so tests can stub it with a faster policy — the
 * test file overrides it via {@code doReturn(...).when(client).getBulkRetryStrategy()}.
 */
Retry getBulkRetryStrategy() {
    return Retry.backoff(10, Duration.ofSeconds(2)).maxBackoff(Duration.ofSeconds(60));
}

public Mono<BulkResponse> sendBulkRequest(String indexName, List<BulkDocSection> docs, IRfsContexts.IRequestContext context) {
String targetPath = indexName + "/_bulk";
var maxAttempts = 6;

Map<String, BulkDocSection> docsMap = docs.stream().collect(Collectors.toMap(d -> d.getDocId(), d -> d));
for (int i = 0; i < maxAttempts; i++) {

logger.warn("Creating bulk body with " + docsMap.keySet());
return Mono.defer(() -> {
log.atTrace().setMessage("Creating bulk body with document ids {}").addArgument(docsMap.keySet());
if (docsMap.isEmpty()) {
return Mono.empty();
}
var body = BulkDocSection.convertToBulkRequestBody(docsMap.values());
try {
var outerResponse = client.postAsync(targetPath, body, context)
.flatMap(response -> {
var resp = new BulkResponse(response.statusCode, response.statusText, response.headers, response.body);
if (!resp.hasBadStatusCode() && !resp.hasFailedOperations()) {
return Mono.just(resp);
}
// Remove all successful documents for the next bulk request attempt
resp.getSuccessfulDocs().forEach(docsMap::remove);

logger.error(resp.getFailureMessage());
return Mono.error(new OperationFailed(resp.getFailureMessage(), resp));
}).blockOptional();

if (outerResponse.isPresent()) {
return Mono.just(outerResponse.get());
}
} catch (OperationFailed of) {
if (i + 1 < maxAttempts) {
continue;
return client.postAsync(targetPath, body, context)
.flatMap(response -> {
var resp = new BulkResponse(response.statusCode, response.statusText, response.headers, response.body);
if (!resp.hasBadStatusCode() && !resp.hasFailedOperations()) {
return Mono.just(resp);
}
// Remove all successful documents for the next bulk request attempt
var successfulDocs = resp.getSuccessfulDocs();
successfulDocs.forEach(docsMap::remove);
log.info("After bulk request on index '{}', {} more documents have succeed, {} remain", indexName, successfulDocs.size(), docsMap.size());

log.error(resp.getFailureMessage());
return Mono.error(new OperationFailed(resp.getFailureMessage(), resp));
});
})
.retryWhen(getBulkRetryStrategy())
.doOnError(error -> {
if (!docsMap.isEmpty()) {
final String response;
if (error instanceof OperationFailed) {
response = ((OperationFailed)error).response.body;
} else {
response = error.getMessage();
}
throw of;

failedRequestsLogger.atInfo()
.setMessage("Bulk request failed for {} documents, request followed by response.\n{},\n{}")
.addArgument(docsMap.size())
.addArgument(BulkDocSection.convertToBulkRequestBody(docsMap.values()))
.addArgument(response)
.log();
}
}
return Mono.empty();
});
}

public HttpResponse refresh(IRfsContexts.IRequestContext context) {
Expand Down Expand Up @@ -340,7 +347,7 @@ public List<String> getSuccessfulDocs() {
try {
return BulkResponseParser.findSuccessDocs(body);
} catch (IOException ioe) {
logger.warn("Unable to process bulk request for success", ioe);
log.warn("Unable to process bulk request for success", ioe);
return List.of();
}
}
Expand Down
6 changes: 1 addition & 5 deletions RFS/src/test/java/com/rfs/common/OpenSearchClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,11 @@
import org.junit.jupiter.api.Test;

import com.rfs.common.DocumentReindexer.BulkDocSection;
import com.rfs.common.OpenSearchClient.OperationFailed;
import com.rfs.common.http.HttpResponse;
import com.rfs.tracing.IRfsContexts;
import com.rfs.tracing.IRfsContexts.ICheckedIdempotentPutRequestContext;
import com.rfs.tracing.IRfsContexts.IRequestContext;

import lombok.SneakyThrows;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import static org.hamcrest.CoreMatchers.containsString;
Expand All @@ -27,7 +24,6 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -232,7 +228,7 @@ void testBulkRequest() {

var bulkDocs = List.of(dockSection, dockSection2);
var openSearchClient = spy(new OpenSearchClient(restClient));
doReturn(Retry.fixedDelay(2, Duration.ofMillis(10)).maxAttempts(2)).when(openSearchClient).getBulkRetryStrategy();
doReturn(Retry.fixedDelay(4, Duration.ofMillis(10))).when(openSearchClient).getBulkRetryStrategy();

var responseMono = openSearchClient.sendBulkRequest("myIndex", bulkDocs, mock(IRfsContexts.IRequestContext.class));

Expand Down

0 comments on commit f508f36

Please sign in to comment.