refactor(backend): Refactor of ReindexThread (#30109) #30110

Draft · wants to merge 6 commits into base: main
@@ -2,21 +2,21 @@


import com.dotmarketing.beans.Host;
-import com.dotmarketing.business.CacheLocator;
-import com.google.common.collect.ImmutableList;
import com.dotmarketing.business.APILocator;
-import com.dotmarketing.exception.DotDataException;

import com.dotmarketing.util.Logger;
import com.liferay.util.StringPool;

import io.vavr.control.Try;

-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
@@ -32,75 +32,123 @@ public class BulkProcessorListener implements BulkProcessor.Listener {

final Map<String, ReindexEntry> workingRecords;

-final static List<String> RESERVED_IDS = List.of(Host.SYSTEM_HOST);
+static final List<String> RESERVED_IDS = List.of(Host.SYSTEM_HOST);

private long contentletsIndexed;

+AtomicInteger totalWorkingRecords = new AtomicInteger(0);
+AtomicInteger totalResponses = new AtomicInteger(0);
+AtomicInteger successCount = new AtomicInteger(0);
+AtomicInteger failureCount = new AtomicInteger(0);

+BlockingQueue<ReindexResult> queue = new LinkedBlockingQueue<>();

BulkProcessorListener () {
-this.workingRecords = new HashMap<>();
+this.workingRecords = new ConcurrentHashMap<>();
}

public long getContentletsIndexed(){
return contentletsIndexed;
}

+public int getTotalResponses(){
+return totalResponses.get();
+}

+public int getSuccessCount(){
+return successCount.get();
+}
+public int getFailureCount(){
+return failureCount.get();
+}
+public Map<String, ReindexEntry> getWorkingRecords(){
+return workingRecords;
+}

+public BlockingQueue<ReindexResult> getQueue(){
+return queue;
+}

+public void addWorkingRecord(Map<String,ReindexEntry> entries) {
+for ( Map.Entry<String,ReindexEntry> entrySet : entries.entrySet()) {
+ReindexEntry previousEntry = workingRecords.put(entrySet.getKey(), entrySet.getValue());
+if (previousEntry == null) {
+totalWorkingRecords.incrementAndGet();
+} else {
+Logger.warn(this.getClass(), "ReindexEntry already exists for id: " + entrySet.getKey());
+}
+}

+}

@Override
public void beforeBulk(final long executionId, final BulkRequest request) {

String serverId=APILocator.getServerAPI().readServerId();
-List<String> servers = Try.of(()->APILocator.getServerAPI().getReindexingServers()).getOrElse(ImmutableList.of(APILocator.getServerAPI().readServerId()));
+List<String> servers = Try.of(()->APILocator.getServerAPI().getReindexingServers()).getOrElse(List.of(APILocator.getServerAPI().readServerId()));
Logger.info(this.getClass(), "-----------");
Logger.info(this.getClass(), "Reindexing Server # : " + (servers.indexOf(serverId)+1) + " of " + servers.size());
Logger.info(this.getClass(), "Total Indexed : " + contentletsIndexed);
-Logger.info(this.getClass(), "ReindexEntries found : " + workingRecords.size());
+Logger.info(this.getClass(), "ReindexEntries found : " + totalWorkingRecords.get());
Logger.info(this.getClass(), "BulkRequests created : " + request.numberOfActions());

contentletsIndexed += request.numberOfActions();
final Optional<String> duration = APILocator.getContentletIndexAPI().reindexTimeElapsed();
-if (duration.isPresent()) {
-Logger.info(this, "Full Reindex Elapsed : " + duration.get() + "");
-}
+duration.ifPresent(s -> Logger.info(this, "Full Reindex Elapsed : " + s));
Logger.info(this.getClass(), "-----------");
}

@Override
public void afterBulk(final long executionId, final BulkRequest request, final BulkResponse response) {
Logger.debug(this.getClass(), "Bulk process completed");
-final List<ReindexEntry> successful = new ArrayList<>();
-float totalResponses=0;

for (BulkItemResponse bulkItemResponse : response) {
-DocWriteResponse itemResponse = bulkItemResponse.getResponse();
-totalResponses++;
-String id;
-if (bulkItemResponse.isFailed() || itemResponse == null) {
+totalResponses.incrementAndGet();
+String id = getIdFromResponse(bulkItemResponse);

-final String reservedId = getMatchingReservedIdIfAny(bulkItemResponse.getFailure().getId());
+ReindexEntry idx = workingRecords.remove(id);
+if (idx == null) continue;

-id = reservedId!=null ? reservedId: bulkItemResponse.getFailure().getId().substring(0,
-bulkItemResponse.getFailure().getId().indexOf(StringPool.UNDERLINE));
+if (bulkItemResponse.isFailed() || bulkItemResponse.getResponse() == null) {
+addErrorToQueue(idx, bulkItemResponse.getFailure().getMessage());
} else {
-final String reservedId = getMatchingReservedIdIfAny(itemResponse.getId());

-id = reservedId!=null ? reservedId: itemResponse.getId()
-.substring(0, itemResponse.getId().indexOf(StringPool.UNDERLINE));
+addSuccessToQueue(idx);
}
}

-ReindexEntry idx = workingRecords.get(id);
-if (idx == null) {
-continue;
+if (shouldRebuildBulkProcessor()) {
+ReindexThread.rebuildBulkIndexer();
}
}

+private String getIdFromResponse(BulkItemResponse bulkItemResponse) {
+String id;
+if (bulkItemResponse.isFailed() || bulkItemResponse.getResponse() == null) {
+id = getMatchingReservedIdIfAny(bulkItemResponse.getFailure().getId());
+if (id == null) {
+id = bulkItemResponse.getFailure().getId().split(StringPool.UNDERLINE)[0];
+}
-if (bulkItemResponse.isFailed() || itemResponse == null) {
-handleFailure(idx,
-"bulk index failure:" + bulkItemResponse.getFailure().getMessage());
-} else {
-successful.add(idx);
+} else {
+id = getMatchingReservedIdIfAny(bulkItemResponse.getResponse().getId());
+if (id == null) {
+id = bulkItemResponse.getResponse().getId().split(StringPool.UNDERLINE)[0];
+}
}
-handleSuccess(successful);
-// 50% failure rate forces a rebuild of the BulkProcessor
-if(totalResponses==0 || (successful.size() / totalResponses < .5)) {
-ReindexThread.rebuildBulkIndexer();
}
+return id;
}

+private boolean shouldRebuildBulkProcessor() {
+return totalResponses.get() == 0 || ((double) successCount.get() / totalResponses.get() < 0.5);
+}

+private void addSuccessToQueue(ReindexEntry idx) {
+successCount.incrementAndGet();
+queue.add(new ReindexResult(idx));
+}

+private void addErrorToQueue(ReindexEntry idx, String errorMessage) {
+failureCount.incrementAndGet();
+queue.add(new ReindexResult(idx, errorMessage));
+}

static String getMatchingReservedIdIfAny(String id) {
@@ -120,26 +168,23 @@ static String getMatchingReservedIdIfAny(String id) {
public void afterBulk(final long executionId, final BulkRequest request, final Throwable failure) {
Logger.error(ReindexThread.class, "Bulk process failed entirely:" + failure.getMessage(),
failure);
-workingRecords.values().forEach(idx -> handleFailure(idx, failure.getMessage()));
+workingRecords.values().forEach(idx -> addErrorToQueue(idx, failure.getMessage()));
}

-private void handleSuccess(final List<ReindexEntry> successful) {

-try {
-if (!successful.isEmpty()) {
-APILocator.getReindexQueueAPI().deleteReindexEntry(successful);
-CacheLocator.getESQueryCache().clearCache();
-}
-} catch (DotDataException e) {
-Logger.warnAndDebug(this.getClass(), "unable to delete indexjournal:" + e.getMessage(), e);
-}
-}

-private void handleFailure(final ReindexEntry idx, final String cause) {
-try {
-APILocator.getReindexQueueAPI().markAsFailed(idx, cause);
-} catch (DotDataException e) {
-Logger.warnAndDebug(this.getClass(), "unable to reque indexjournal:" + idx, e);
+public static class ReindexResult {
+String error;
+ReindexEntry entry;
+boolean success;
+public ReindexResult(ReindexEntry entry, String error) {
+this.entry = entry;
+this.error = error;
+success = false;
+}
+public ReindexResult(ReindexEntry entry) {
+this.entry = entry;
+success = true;
+}
+}
}
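
Note on the new design: the diff removes handleSuccess/handleFailure from the listener and instead publishes each bulk outcome to a BlockingQueue of ReindexResult, so some consumer (presumably ReindexThread, whose side of the change is not shown in this file) now has to drain the queue and do the journal bookkeeping. The sketch below is not part of this PR; it is a minimal illustration of such a consumer, assuming it lives in the same package as BulkProcessorListener (ReindexResult's fields are package-private) and using only the APIs already visible in the removed handleSuccess/handleFailure code. The class name ReindexResultConsumer, the drainResults() method, and the one-second poll timeout are all hypothetical.

// Hedged sketch, not part of this PR. Package declaration omitted; it is assumed to be the
// same package as BulkProcessorListener so the package-private ReindexResult fields are visible.
import com.dotmarketing.business.APILocator;
import com.dotmarketing.business.CacheLocator;
import com.dotmarketing.common.reindex.ReindexEntry; // assumed package for ReindexEntry
import com.dotmarketing.exception.DotDataException;
import com.dotmarketing.util.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class ReindexResultConsumer {

    private final BlockingQueue<BulkProcessorListener.ReindexResult> queue;

    public ReindexResultConsumer(final BulkProcessorListener listener) {
        this.queue = listener.getQueue();
    }

    // Drains whatever results are queued, deletes the successful entries from the reindex
    // journal in one batch, and marks each failed entry individually, mirroring what the
    // removed handleSuccess/handleFailure methods used to do inside the bulk callback.
    public void drainResults() throws InterruptedException {

        // Wait briefly for the first result, then grab the rest without blocking.
        final BulkProcessorListener.ReindexResult first = queue.poll(1, TimeUnit.SECONDS);
        if (first == null) {
            return; // nothing arrived within the (illustrative) one-second window
        }
        final List<BulkProcessorListener.ReindexResult> batch = new ArrayList<>();
        batch.add(first);
        queue.drainTo(batch);

        final List<ReindexEntry> successful = new ArrayList<>();
        for (final BulkProcessorListener.ReindexResult result : batch) {
            if (result.success) {
                successful.add(result.entry);
            } else {
                try {
                    APILocator.getReindexQueueAPI().markAsFailed(result.entry, result.error);
                } catch (DotDataException e) {
                    Logger.warnAndDebug(ReindexResultConsumer.class,
                            "unable to requeue index journal entry: " + result.entry, e);
                }
            }
        }

        try {
            if (!successful.isEmpty()) {
                APILocator.getReindexQueueAPI().deleteReindexEntry(successful);
                CacheLocator.getESQueryCache().clearCache();
            }
        } catch (DotDataException e) {
            Logger.warnAndDebug(ReindexResultConsumer.class,
                    "unable to delete index journal entries: " + e.getMessage(), e);
        }
    }
}

Draining into a batch keeps the one-delete-per-bulk behavior of the old handleSuccess while moving the database work off the Elasticsearch bulk-callback thread, which appears to be the intent of the refactor.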