diff --git a/dotCMS/src/main/java/com/dotmarketing/common/reindex/BulkProcessorListener.java b/dotCMS/src/main/java/com/dotmarketing/common/reindex/BulkProcessorListener.java index f16e1ff25ff8..002272c1c68e 100644 --- a/dotCMS/src/main/java/com/dotmarketing/common/reindex/BulkProcessorListener.java +++ b/dotCMS/src/main/java/com/dotmarketing/common/reindex/BulkProcessorListener.java @@ -2,21 +2,21 @@ import com.dotmarketing.beans.Host; -import com.dotmarketing.business.CacheLocator; import com.google.common.collect.ImmutableList; import com.dotmarketing.business.APILocator; -import com.dotmarketing.exception.DotDataException; import com.dotmarketing.util.Logger; import com.liferay.util.StringPool; import io.vavr.control.Try; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkProcessor; @@ -32,75 +32,123 @@ public class BulkProcessorListener implements BulkProcessor.Listener { final Map workingRecords; - final static List RESERVED_IDS = List.of(Host.SYSTEM_HOST); + static final List RESERVED_IDS = List.of(Host.SYSTEM_HOST); private long contentletsIndexed; + AtomicInteger totalWorkingRecords = new AtomicInteger(0); + AtomicInteger totalResponses = new AtomicInteger(0); + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger failureCount = new AtomicInteger(0); + + BlockingQueue queue = new LinkedBlockingQueue<>(); + BulkProcessorListener () { - this.workingRecords = new HashMap<>(); + this.workingRecords = new ConcurrentHashMap<>(); } public long getContentletsIndexed(){ return contentletsIndexed; } + public int getTotalResponses(){ + return totalResponses.get(); + } + + public int getSuccessCount(){ + return successCount.get(); + } + public int getFailureCount(){ + return failureCount.get(); + } + public Map getWorkingRecords(){ + return workingRecords; + } + + public BlockingQueue getQueue(){ + return queue; + } + + public void addWorkingRecord(Map entries) { + for ( Map.Entry entrySet : entries.entrySet()) { + ReindexEntry previousEntry = workingRecords.put(entrySet.getKey(), entrySet.getValue()); + if (previousEntry == null) { + totalWorkingRecords.incrementAndGet(); + } else { + Logger.warn(this.getClass(), "ReindexEntry already exists for id: " + entrySet.getKey()); + } + } + + } + @Override public void beforeBulk(final long executionId, final BulkRequest request) { - + String serverId=APILocator.getServerAPI().readServerId(); - List servers = Try.of(()->APILocator.getServerAPI().getReindexingServers()).getOrElse(ImmutableList.of(APILocator.getServerAPI().readServerId())); + List servers = Try.of(()->APILocator.getServerAPI().getReindexingServers()).getOrElse(List.of(APILocator.getServerAPI().readServerId())); Logger.info(this.getClass(), "-----------"); Logger.info(this.getClass(), "Reindexing Server # : " + (servers.indexOf(serverId)+1) + " of " + servers.size()); Logger.info(this.getClass(), "Total Indexed : " + contentletsIndexed); - Logger.info(this.getClass(), "ReindexEntries found : " + workingRecords.size()); + Logger.info(this.getClass(), "ReindexEntries found : " + totalWorkingRecords.get()); Logger.info(this.getClass(), "BulkRequests created : " + request.numberOfActions()); contentletsIndexed += request.numberOfActions(); final Optional duration = APILocator.getContentletIndexAPI().reindexTimeElapsed(); - if (duration.isPresent()) { - Logger.info(this, "Full Reindex Elapsed : " + duration.get() + ""); - } + duration.ifPresent(s -> Logger.info(this, "Full Reindex Elapsed : " + s)); Logger.info(this.getClass(), "-----------"); } @Override public void afterBulk(final long executionId, final BulkRequest request, final BulkResponse response) { Logger.debug(this.getClass(), "Bulk process completed"); - final List successful = new ArrayList<>(); - float totalResponses=0; + for (BulkItemResponse bulkItemResponse : response) { - DocWriteResponse itemResponse = bulkItemResponse.getResponse(); - totalResponses++; - String id; - if (bulkItemResponse.isFailed() || itemResponse == null) { + totalResponses.incrementAndGet(); + String id = getIdFromResponse(bulkItemResponse); - final String reservedId = getMatchingReservedIdIfAny(bulkItemResponse.getFailure().getId()); + ReindexEntry idx = workingRecords.remove(id); + if (idx == null) continue; - id = reservedId!=null ? reservedId: bulkItemResponse.getFailure().getId().substring(0, - bulkItemResponse.getFailure().getId().indexOf(StringPool.UNDERLINE)); + if (bulkItemResponse.isFailed() || bulkItemResponse.getResponse() == null) { + addErrorToQueue(idx, bulkItemResponse.getFailure().getMessage()); } else { - final String reservedId = getMatchingReservedIdIfAny(itemResponse.getId()); - - id = reservedId!=null ? reservedId: itemResponse.getId() - .substring(0, itemResponse.getId().indexOf(StringPool.UNDERLINE)); + addSuccessToQueue(idx); } + } - ReindexEntry idx = workingRecords.get(id); - if (idx == null) { - continue; + if (shouldRebuildBulkProcessor()) { + ReindexThread.rebuildBulkIndexer(); + } + } + + private String getIdFromResponse(BulkItemResponse bulkItemResponse) { + String id; + if (bulkItemResponse.isFailed() || bulkItemResponse.getResponse() == null) { + id = getMatchingReservedIdIfAny(bulkItemResponse.getFailure().getId()); + if (id == null) { + id = bulkItemResponse.getFailure().getId().split(StringPool.UNDERLINE)[0]; } - if (bulkItemResponse.isFailed() || itemResponse == null) { - handleFailure(idx, - "bulk index failure:" + bulkItemResponse.getFailure().getMessage()); - } else { - successful.add(idx); + } else { + id = getMatchingReservedIdIfAny(bulkItemResponse.getResponse().getId()); + if (id == null) { + id = bulkItemResponse.getResponse().getId().split(StringPool.UNDERLINE)[0]; } } - handleSuccess(successful); - // 50% failure rate forces a rebuild of the BulkProcessor - if(totalResponses==0 || (successful.size() / totalResponses < .5)) { - ReindexThread.rebuildBulkIndexer(); - } + return id; + } + + private boolean shouldRebuildBulkProcessor() { + return totalResponses.get() == 0 || ((double) successCount.get() / totalResponses.get() < 0.5); + } + + private void addSuccessToQueue(ReindexEntry idx) { + successCount.incrementAndGet(); + queue.add(new ReindexResult(idx)); + } + + private void addErrorToQueue(ReindexEntry idx, String errorMessage) { + failureCount.incrementAndGet(); + queue.add(new ReindexResult(idx, errorMessage)); } static String getMatchingReservedIdIfAny(String id) { @@ -120,26 +168,23 @@ static String getMatchingReservedIdIfAny(String id) { public void afterBulk(final long executionId, final BulkRequest request, final Throwable failure) { Logger.error(ReindexThread.class, "Bulk process failed entirely:" + failure.getMessage(), failure); - workingRecords.values().forEach(idx -> handleFailure(idx, failure.getMessage())); + workingRecords.values().forEach(idx -> addErrorToQueue(idx, failure.getMessage())); } - private void handleSuccess(final List successful) { - try { - if (!successful.isEmpty()) { - APILocator.getReindexQueueAPI().deleteReindexEntry(successful); - CacheLocator.getESQueryCache().clearCache(); - } - } catch (DotDataException e) { - Logger.warnAndDebug(this.getClass(), "unable to delete indexjournal:" + e.getMessage(), e); - } - } - private void handleFailure(final ReindexEntry idx, final String cause) { - try { - APILocator.getReindexQueueAPI().markAsFailed(idx, cause); - } catch (DotDataException e) { - Logger.warnAndDebug(this.getClass(), "unable to reque indexjournal:" + idx, e); + public static class ReindexResult { + String error; + ReindexEntry entry; + boolean success; + public ReindexResult(ReindexEntry entry, String error) { + this.entry = entry; + this.error = error; + success = false; + } + public ReindexResult(ReindexEntry entry) { + this.entry = entry; + success = true; } } } diff --git a/dotCMS/src/main/java/com/dotmarketing/common/reindex/ReindexThread.java b/dotCMS/src/main/java/com/dotmarketing/common/reindex/ReindexThread.java index c5992c26fe98..77d37ecaa57b 100644 --- a/dotCMS/src/main/java/com/dotmarketing/common/reindex/ReindexThread.java +++ b/dotCMS/src/main/java/com/dotmarketing/common/reindex/ReindexThread.java @@ -1,9 +1,11 @@ package com.dotmarketing.common.reindex; +import com.dotmarketing.common.reindex.BulkProcessorListener.ReindexResult; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; import com.dotcms.api.system.event.Visibility; import com.dotcms.business.SystemCache; -import com.dotcms.concurrent.DotConcurrentFactory; -import com.dotcms.concurrent.DotSubmitter; import com.dotcms.content.elasticsearch.business.ContentletIndexAPI; import com.dotcms.content.elasticsearch.business.ElasticReadOnlyCommand; import com.dotcms.content.elasticsearch.util.ESReindexationProcessStatus; @@ -11,27 +13,32 @@ import com.dotcms.notifications.bean.NotificationType; import com.dotcms.notifications.business.NotificationAPI; import com.dotcms.util.I18NMessage; -import com.dotmarketing.business.*; +import com.dotmarketing.business.APILocator; +import com.dotmarketing.business.CacheLocator; +import com.dotmarketing.business.Role; +import com.dotmarketing.business.RoleAPI; +import com.dotmarketing.business.UserAPI; import com.dotmarketing.db.DbConnectionFactory; import com.dotmarketing.db.HibernateUtil; import com.dotmarketing.exception.DotDataException; import com.dotmarketing.util.Config; import com.dotmarketing.util.Logger; + import com.dotmarketing.util.ThreadUtils; import com.google.common.annotations.VisibleForTesting; -import com.liferay.portal.language.LanguageException; import com.liferay.portal.model.User; import io.vavr.Lazy; -import org.apache.felix.framework.OSGISystem; -import org.elasticsearch.action.bulk.BulkProcessor; - -import java.sql.SQLException; import java.time.Duration; import java.util.Map; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; + import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicLong; +import org.elasticsearch.action.bulk.BulkProcessor; /** * This thread is in charge of re-indexing the contenlet information placed in the @@ -69,9 +76,15 @@ */ public class ReindexThread { - private enum ThreadState { - STOPPED, PAUSED, RUNNING; - } + + + private final AtomicBoolean running = new AtomicBoolean(false); + private final AtomicBoolean paused = new AtomicBoolean(false); + private final ScheduledExecutorService executor; + + AtomicBoolean shutdownRequested = new AtomicBoolean(false); + + private CompletableFuture currentTask = CompletableFuture.completedFuture(null); private final ContentletIndexAPI indexAPI; private final ReindexQueueAPI queueApi; @@ -79,19 +92,15 @@ private enum ThreadState { private final RoleAPI roleAPI; private final UserAPI userAPI; - private static ReindexThread instance; + private static final ReindexThread INSTANCE = new ReindexThread(); - private final long SLEEP = Config.getLongProperty("REINDEX_THREAD_SLEEP", 250); - private final int SLEEP_ON_ERROR = Config.getIntProperty("REINDEX_THREAD_SLEEP_ON_ERROR", 500); - private long contentletsIndexed = 0; + private final long sleep = Config.getLongProperty("REINDEX_THREAD_SLEEP", 250); + private final int sleepOnError = Config.getIntProperty("REINDEX_THREAD_SLEEP_ON_ERROR", 500); + private final AtomicLong contentletsIndexed = new AtomicLong(0); // bulk up to this many requests public static final int ELASTICSEARCH_BULK_ACTIONS = Config.getIntProperty("REINDEX_THREAD_ELASTICSEARCH_BULK_ACTIONS", 10); - // How often should the bulk request processor should flush its request - default 3 seconds - public static final int ELASTICSEARCH_BULK_FLUSH_INTERVAL = - Config.getIntProperty("REINDEX_THREAD_ELASTICSEARCH_BULK_FLUSH_INTERVAL_MS", 3000); - // Setting this to number > 0 makes each bulk request asynchronous, // If set to 0 the bulk requests will be performed synchronously public static final int ELASTICSEARCH_CONCURRENT_REQUESTS = @@ -116,13 +125,10 @@ private enum ThreadState { "WAIT_BEFORE_PAUSE_SECONDS", 0); - private AtomicReference state = new AtomicReference<>(ThreadState.STOPPED); - + private static final String REINDEX_THREAD_PAUSED = "REINDEX_THREAD_PAUSED"; + private static final Lazy cache = Lazy.of(() -> CacheLocator.getSystemCache()); - private final static String REINDEX_THREAD_PAUSED = "REINDEX_THREAD_PAUSED"; - private final static Lazy cache = Lazy.of(() -> CacheLocator.getSystemCache()); - - private final static AtomicBoolean rebuildBulkIndexer = new AtomicBoolean(false); + private static final AtomicBoolean rebuildBulkIndexer = new AtomicBoolean(false); public static void rebuildBulkIndexer() { Logger.warn(ReindexThread.class, "--- ReindexThread BulkProcessor needs to be Rebuilt"); @@ -145,52 +151,177 @@ private ReindexThread() { this.userAPI = userAPI; this.roleAPI = roleAPI; this.indexAPI = indexAPI; - instance = this; + this.executor = Executors.newSingleThreadScheduledExecutor(); + } + + public static ReindexThread getInstance() { + return INSTANCE; + } + + + public void start() { + if (running.compareAndSet(false, true)) { + + currentTask = CompletableFuture.runAsync(this::run, executor) + .exceptionally(ex -> { + Logger.error(ReindexThread.class, "ReindexThread Task failed: " + ex.getMessage()); + return null; + }) + .thenRun(() -> running.set(false)); + } + } + public void stop() { + if (running.compareAndSet(true, false)) { + shutdownRequested.set(true); + paused.set(false); + try { + currentTask.get(60, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + Logger.error(ReindexThread.class, "Error stopping ReindexThread", e); + } + } } - private final Runnable ReindexThreadRunnable = () -> { + public void setPaused() { + if (paused.compareAndSet(false, true)) { + Logger.debug(ReindexThread.class, "--- ReindexThread - Paused"); + cache.get().put(REINDEX_THREAD_PAUSED, System.currentTimeMillis() + Duration + .ofMinutes(Config.getIntProperty("REINDEX_THREAD_PAUSE_IN_MINUTES", 10)) + .toMillis()); + } + } + + public void unpauseInt() { + if (!Config.getBooleanProperty("ALLOW_MANUAL_REINDEX_UNPAUSE", false)) { + Logger.debug(ReindexThread.class, "--- Adding unpause commit listener"); + HibernateUtil.addCommitListener("unpauseIndex", this::unpauseImpl); + } else { + unpauseImpl(); + } + } + + private void unpauseImpl() { + if (paused.compareAndSet(true, false)) { + Logger.debug(ReindexThread.class, "--- ReindexThread - Unpaused"); + cache.get().remove(REINDEX_THREAD_PAUSED); + } + if (!running.get()) { + start(); + } + } + + + private void run() { Logger.info(this.getClass(), "--- ReindexThread is starting, background indexing will begin"); - - while (state.get() != ThreadState.STOPPED) { + BulkProcessorContext context = new BulkProcessorContext(null, null); + while (running.get() && !shutdownRequested.get() && !Thread.currentThread().isInterrupted()) { try { - runReindexLoop(); + runReindexLoop(context); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } catch (Exception e) { - Logger.error(this.getClass(), e.getMessage(), e); + Logger.error(this, "ReindexThread Exception", e); } } Logger.warn(this.getClass(), "--- ReindexThread is stopping, background indexing will not take place"); - }; + } @VisibleForTesting long totalESPuts() { - return contentletsIndexed; + return contentletsIndexed.get(); } - private BulkProcessor closeBulkProcessor(final BulkProcessor bulkProcessor) - throws InterruptedException { - if (bulkProcessor != null) { - bulkProcessor.awaitClose(BULK_PROCESSOR_AWAIT_TIMEOUT, TimeUnit.SECONDS); + private synchronized void closeBulkProcessor(final BulkProcessorContext context) { + if (context.getBulkProcessor() != null) { + try { + boolean closed = context.getBulkProcessor().awaitClose(BULK_PROCESSOR_AWAIT_TIMEOUT, TimeUnit.SECONDS); + if (!closed) { + Logger.warn(this.getClass(), "BulkProcessor did not close within the timeout period."); + } + boolean isDone = handleResults(context.getBulkProcessorListener()); + if (!isDone) { + Logger.warn(this.getClass(), "BulkProcessor did not finish processing all records."); + } + } catch (InterruptedException e) { + Logger.error(this.getClass(), "Interrupted while waiting for BulkProcessor to close", e); + Thread.currentThread().interrupt(); // Restore the interrupted status + } catch (Exception e) { + Logger.error(this.getClass(), "Error closing BulkProcessor", e); + } + + // Perform any cleanup if necessary for BulkProcessorListener + context.setBulkProcessorListener(null); + context.setBulkProcessor(null); } + rebuildBulkIndexer.set(false); - return null; + } + + private boolean handleResults(BulkProcessorListener bulkProcessorListener) { + + List results = new ArrayList<>(); + bulkProcessorListener.getQueue().drainTo(results,250); + while (!results.isEmpty()) { + int failureCount = 0; + List success = new ArrayList<>(); + for (ReindexResult result : results) { + if (result.success) { + success.add(result.entry); + } else { + failureCount++; + handleFailure(result.entry, result.error); + } + } + handleSuccess(success); + Logger.info( + this.getClass(), + "Completed " + + results.size() + + " reindex requests with " + + failureCount + + " failures"); + results.clear(); + bulkProcessorListener.getQueue().drainTo(results,250); + } + return bulkProcessorListener.getWorkingRecords().isEmpty(); + } + private void handleSuccess(final List successful) { + + try { + if (!successful.isEmpty()) { + APILocator.getReindexQueueAPI().deleteReindexEntry(successful); + CacheLocator.getESQueryCache().clearCache(); + } + } catch (DotDataException e) { + Logger.warnAndDebug(this.getClass(), "unable to delete indexjournal:" + e.getMessage(), e); + } + } + + private void handleFailure(final ReindexEntry idx, final String cause) { + try { + APILocator.getReindexQueueAPI().markAsFailed(idx, cause); + } catch (DotDataException e) { + Logger.warnAndDebug(this.getClass(), "unable to reque indexjournal:" + idx, e); + } } - private BulkProcessor finalizeReIndex(BulkProcessor bulkProcessor) - throws InterruptedException, LanguageException, DotDataException, SQLException { - bulkProcessor = closeBulkProcessor(bulkProcessor); + private BulkProcessorContext finalizeReIndex(BulkProcessorContext context) throws DotDataException { + closeBulkProcessor(context); switchOverIfNeeded(); if (!indexAPI.isInFullReindex()) { ReindexThread.pause(); } - return bulkProcessor; - + return context; } @@ -201,56 +332,64 @@ private BulkProcessor finalizeReIndex(BulkProcessor bulkProcessor) * be sent to the user via the Notifications API to take care of the problem as soon as * possible. */ - private void runReindexLoop() { - BulkProcessor bulkProcessor = null; - BulkProcessorListener bulkProcessorListener = null; - while (state.get() != ThreadState.STOPPED) { - try { + private void runReindexLoop(BulkProcessorContext context) throws InterruptedException { + try { + processReindexing(context); + } catch (Exception ex) { + Logger.error(this, "ReindexThread Exception", ex); + closeBulkProcessor(context); + context.setBulkProcessor(null); + context.setBulkProcessorListener(null); + ThreadUtils.sleep(sleepOnError); + } finally { + DbConnectionFactory.closeSilently(); + } - final Map workingRecords = queueApi.findContentToReindex(); + while (paused.get()) { + handlePausedState(context); + } + } - if (workingRecords.isEmpty()) { - bulkProcessor = finalizeReIndex(bulkProcessor); - } + private void handlePausedState(BulkProcessorContext context) throws InterruptedException { + if (context.getBulkProcessorListener() != null && !context.getBulkProcessorListener().getWorkingRecords().isEmpty()) { + if (handleResults(context.getBulkProcessorListener())) { + Logger.info(this, "All running records processed, pausing"); + } else { + TimeUnit.MILLISECONDS.sleep(Math.min(sleep, 2000)); + } + } else { + TimeUnit.MILLISECONDS.sleep(sleep); + } + + Long restartTime = (Long) cache.get().get(REINDEX_THREAD_PAUSED); + if (restartTime == null || restartTime < System.currentTimeMillis()) { + unpauseImpl(); + } + } - if (!workingRecords.isEmpty() && !ElasticReadOnlyCommand.getInstance() - .isIndexOrClusterReadOnly()) { - Logger.debug(this, - "Found " + workingRecords + " index items to process"); - - if (bulkProcessor == null || rebuildBulkIndexer.get()) { - closeBulkProcessor(bulkProcessor); - bulkProcessorListener = new BulkProcessorListener(); - bulkProcessor = indexAPI.createBulkProcessor(bulkProcessorListener); - } - bulkProcessorListener.workingRecords.putAll(workingRecords); - indexAPI.appendToBulkProcessor(bulkProcessor, workingRecords.values()); - contentletsIndexed += bulkProcessorListener.getContentletsIndexed(); - // otherwise, reindex normally + private void processReindexing(BulkProcessorContext context) throws DotDataException { + final Map workingRecords = queueApi.findContentToReindex(); - } - } catch (Throwable ex) { - Logger.error(this, "ReindexThread Exception", ex); - ThreadUtils.sleep(SLEEP_ON_ERROR); - } finally { - DbConnectionFactory.closeSilently(); - } - while (state.get() == ThreadState.PAUSED) { - ThreadUtils.sleep(SLEEP); - //Logs every 60 minutes - Logger.infoEvery(ReindexThread.class, "--- ReindexThread Paused", - Config.getIntProperty("REINDEX_THREAD_PAUSE_IN_MINUTES", 60) * 60000); - Long restartTime = (Long) cache.get().get(REINDEX_THREAD_PAUSED); - if (restartTime == null || restartTime < System.currentTimeMillis()) { - state.set(ThreadState.RUNNING); - } + if (workingRecords.isEmpty()) { + finalizeReIndex(context); + return; + } + + if (!isClusterReadOnly() && !workingRecords.isEmpty()) { + try { + processRecords(workingRecords, context); + } catch (Exception e) { + Logger.error(this, "Error processing records", e); + closeBulkProcessor(context); + throw e; // Rethrow to handle in runReindexLoop } } + // Handle any results from the BulkProcessor + if (context.getBulkProcessorListener()!=null) + handleResults(context.getBulkProcessorListener()); } - - private boolean switchOverIfNeeded() - throws LanguageException, DotDataException, SQLException, InterruptedException { + private boolean switchOverIfNeeded() throws DotDataException { if (ESReindexationProcessStatus.inFullReindexation() && queueApi.recordsInQueue() == 0) { // The re-indexation process has finished successfully if (indexAPI.reindexSwitchover(false)) { @@ -262,93 +401,65 @@ private boolean switchOverIfNeeded() return false; } - /** - * Tells the thread to start processing. Starts the thread - */ + public static void startThread() { - unpause(); + getInstance().unpauseInt(); } - private void state(ThreadState state) { - getInstance().state.set(state); + public static void unpause() + { + getInstance().unpauseInt(); } /** * Tells the thread to stop processing. Doesn't shut down the thread. */ public static void stopThread() { - getInstance().state(ThreadState.STOPPED); - + getInstance().stop(); } - /** - * This instance is intended to already be started. It will try to restart the thread if - * instance is null. - */ - public static ReindexThread getInstance() { - if (instance == null) { - synchronized (ReindexThread.class) { - if (instance == null) { - return new ReindexThread().instance; - } - } + public static void pause() { + getInstance().setPaused(); + } - } - return instance; + public static boolean isWorking() { + return getInstance().isRunning(); } - public static void pause() { - Logger.debug(ReindexThread.class, "--- ReindexThread - Paused"); - cache.get().put(REINDEX_THREAD_PAUSED, System.currentTimeMillis() + Duration - .ofMinutes(Config.getIntProperty("REINDEX_THREAD_PAUSE_IN_MINUTES", 10)) - .toMillis()); - getInstance().state(ThreadState.PAUSED); + private boolean isRunning() { + return running.get(); } - public static void unpause() { - if (!Config.getBooleanProperty("ALLOW_MANUAL_REINDEX_UNPAUSE", false)) { - Logger.debug(ReindexThread.class, "--- Adding unpause commit listener"); - HibernateUtil.addCommitListener("unpauseIndex", ReindexThread::unpauseImpl); - } else { - unpauseImpl(); - } + private boolean isClusterReadOnly() { + return ElasticReadOnlyCommand.getInstance().isIndexOrClusterReadOnly(); } - private static void unpauseImpl() { + private void processRecords(Map workingRecords, BulkProcessorContext context) + throws DotDataException { + Logger.debug(this, "Found " + workingRecords.size() + " index items to process"); - ThreadState state = getInstance().state.get(); - if (state == ThreadState.PAUSED) { - Logger.info(ReindexThread.class, "--- Unpausing reindex thread "); - cache.get().remove(REINDEX_THREAD_PAUSED); - getInstance().state(ThreadState.RUNNING); - } else if (state == ThreadState.STOPPED) { - Logger.info(ReindexThread.class, "--- Recreating ReindexThread from stopped"); - OSGISystem.getInstance().initializeFramework(); - Logger.infoEvery(ReindexThread.class, "--- ReindexThread Running", 60000); - cache.get().remove(REINDEX_THREAD_PAUSED); + if (context.getBulkProcessor() == null || rebuildBulkIndexer.get()) { + closeBulkProcessor(context); + context.setBulkProcessorListener(new BulkProcessorListener()); - final DotSubmitter submitter = DotConcurrentFactory.getInstance() - .getSubmitter("ReindexThreadSubmitter", - new DotConcurrentFactory.SubmitterConfigBuilder() - .poolSize(1) - .maxPoolSize(1) - .queueCapacity(2) - .rejectedExecutionHandler( - new ThreadPoolExecutor.DiscardOldestPolicy()) - .build() - ); - getInstance().state(ThreadState.RUNNING); - submitter.submit(getInstance().ReindexThreadRunnable); + try { + BulkProcessor newBulkProcessor = indexAPI.createBulkProcessor(context.getBulkProcessorListener()); + context.setBulkProcessor(newBulkProcessor); + context.getBulkProcessorListener().addWorkingRecord(workingRecords); + indexAPI.appendToBulkProcessor(newBulkProcessor, workingRecords.values()); + contentletsIndexed.addAndGet(context.getBulkProcessorListener().getContentletsIndexed()); + } catch (Exception e) { + Logger.error(this, "Error creating or using new BulkProcessor", e); + throw new DotDataException("Error creating or using new BulkProcessor", e); + } + } else { + context.getBulkProcessorListener().workingRecords.putAll(workingRecords); + indexAPI.appendToBulkProcessor(context.getBulkProcessor(), workingRecords.values()); + contentletsIndexed.addAndGet(context.getBulkProcessorListener().getContentletsIndexed()); } - } - public static boolean isWorking() { - return getInstance().state.get() == ThreadState.RUNNING; - } - - /** * Generates a new notification displayed at the top left side of the back-end page in dotCMS. * This utility method allows you to send reports to the user regarding the operations performed @@ -361,11 +472,11 @@ public static boolean isWorking() { * properties file. Otherwise, the message key will be returned. * @param error - true if we want to send an error notification * @throws DotDataException The notification could not be posted to the system. - * @throws LanguageException The language properties could not be retrieved. + * @throws DotDataException The language properties could not be retrieved. */ protected void sendNotification(final String key, final Object[] msgParams, final String defaultMsg, boolean error) - throws DotDataException, LanguageException { + throws DotDataException { NotificationLevel notificationLevel = error ? NotificationLevel.ERROR : NotificationLevel.INFO; @@ -381,4 +492,30 @@ protected void sendNotification(final String key, final Object[] msgParams, systemUser.getUserId(), systemUser.getLocale()); } -} + + private static class BulkProcessorContext { + private BulkProcessor bulkProcessor; + private BulkProcessorListener bulkProcessorListener; + + public BulkProcessorContext(BulkProcessor bulkProcessor, BulkProcessorListener bulkProcessorListener) { + this.bulkProcessor = bulkProcessor; + this.bulkProcessorListener = bulkProcessorListener; + } + + public BulkProcessor getBulkProcessor() { + return bulkProcessor; + } + + public void setBulkProcessor(BulkProcessor bulkProcessor) { + this.bulkProcessor = bulkProcessor; + } + + public BulkProcessorListener getBulkProcessorListener() { + return bulkProcessorListener; + } + + public void setBulkProcessorListener(BulkProcessorListener bulkProcessorListener) { + this.bulkProcessorListener = bulkProcessorListener; + } + } +} \ No newline at end of file