From c66f1d0b0b9c35693398fa23fe908682d214751f Mon Sep 17 00:00:00 2001 From: Jonathan Gamba Date: Thu, 19 Sep 2024 18:32:55 -0600 Subject: [PATCH] #29479 Created integration tests for the PostgresJobQueue --- .../business/api/JobQueueManagerAPIImpl.java | 60 ++- .../business/error/AbstractErrorDetail.java | 53 +-- .../ExponentialBackoffRetryStrategy.java | 13 +- .../jobs/business/error/RetryStrategy.java | 10 +- .../dotcms/jobs/business/job/AbstractJob.java | 4 +- .../jobs/business/queue/DBJobTransformer.java | 28 +- .../dotcms/jobs/business/queue/JobQueue.java | 3 +- .../jobs/business/queue/PostgresJobQueue.java | 80 +++- dotCMS/src/main/resources/postgres.sql | 10 +- .../test/java/com/dotcms/Junit5Suite1.java | 6 +- .../api/JobQueueManagerAPICDITest.java | 3 - .../business/api/JobQueueManagerAPITest.java | 6 +- .../PostgresJobQueueIntegrationTest.java | 439 ++++++++++++++++++ 13 files changed, 613 insertions(+), 102 deletions(-) create mode 100644 dotcms-integration/src/test/java/com/dotcms/jobs/business/queue/PostgresJobQueueIntegrationTest.java diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java index 20d90ae6359d..3b6dfcb25415 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java @@ -29,6 +29,7 @@ import com.dotmarketing.util.Logger; import com.google.common.annotations.VisibleForTesting; import java.time.LocalDateTime; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -560,11 +561,9 @@ private void handleNonRetryableFailedJob(final Job job) throws DotDataException Logger.warn(this, "Job " + job.id() + " has failed and cannot be retried."); try { - jobQueue.removeJob(job.id()); + jobQueue.removeJobFromQueue(job.id()); } catch (JobQueueDataException e) { throw new DotDataException("Error removing failed job", e); - } catch (JobNotFoundException e) { - throw new DoesNotExistException(e); } } @@ -609,8 +608,7 @@ private void processJob(final Job job) throws DotDataException { "Error processing job " + runningJob.id() + ": " + e.getMessage(), e ); handleJobFailure( - runningJob, processor, e, - "Job processing failed", "Job execution" + runningJob, processor, e, e.getMessage(), "Job execution" ); } } else { @@ -681,7 +679,7 @@ private void handleJobFailure(final Job job, final JobProcessor processor, final var errorDetail = ErrorDetail.builder() .message(errorMessage) - .exception(exception) + .stackTrace(stackTrace(exception)) .exceptionClass(exception.getClass().getName()) .processingStage(processingStage) .timestamp(LocalDateTime.now()) @@ -735,8 +733,19 @@ private RetryStrategy retryStrategy(final String queueName) { * @return {@code true} if the job is eligible for retry, {@code false} otherwise. */ private boolean canRetry(final Job job) { + final RetryStrategy retryStrategy = retryStrategy(job.queueName()); - return retryStrategy.shouldRetry(job, job.lastException().orElse(null)); + + Class lastExceptionClass = null; + if (job.lastExceptionClass().isPresent()) { + try { + lastExceptionClass = Class.forName(job.lastExceptionClass().get()); + } catch (ClassNotFoundException e) { + Logger.error(this, "Error loading exception class: " + e.getMessage(), e); + } + } + + return retryStrategy.shouldRetry(job, (Class) lastExceptionClass); } /** @@ -750,6 +759,43 @@ private long nextRetryDelay(final Job job) { return retryStrategy.nextRetryDelay(job); } + /** + * Generates and returns the stack trace of the exception as a string. This is a derived value + * and will be computed only when accessed. + * + * @param exception The exception for which to generate the stack trace. + * @return A string representation of the exception's stack trace, or null if no exception is + * present. + */ + private String stackTrace(final Throwable exception) { + if (exception != null) { + return Arrays.stream(exception.getStackTrace()) + .map(StackTraceElement::toString) + .reduce((a, b) -> a + "\n" + b) + .orElse(""); + } + return null; + } + + /** + * Returns a truncated version of the stack trace. + * + * @param exception The exception for which to generate the truncated stack trace. + * @param maxLines The maximum number of lines to include in the truncated stack trace. + * @return A string containing the truncated stacktrace, or null if no exception is present. + */ + private String truncatedStackTrace(Throwable exception, int maxLines) { + String fullTrace = stackTrace(exception); + if (fullTrace == null) { + return null; + } + String[] lines = fullTrace.split("\n"); + return Arrays.stream(lines) + .limit(maxLines) + .reduce((a, b) -> a + "\n" + b) + .orElse(""); + } + /** * A wrapper class that makes ScheduledExecutorService auto-closeable. This class is designed to * be used with try-with-resources to ensure that the ScheduledExecutorService is properly shut diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/error/AbstractErrorDetail.java b/dotCMS/src/main/java/com/dotcms/jobs/business/error/AbstractErrorDetail.java index d83461487e2f..f1dfa9554684 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/error/AbstractErrorDetail.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/error/AbstractErrorDetail.java @@ -3,7 +3,6 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize; import java.time.LocalDateTime; -import java.util.Arrays; import org.immutables.value.Value; /** @@ -33,6 +32,13 @@ public interface AbstractErrorDetail { */ String exceptionClass(); + /** + * Returns the stack trace of the exception as a string. + * + * @return A string representation of the exception's stack trace. + */ + String stackTrace(); + /** * Returns the timestamp when the error occurred. * @@ -47,49 +53,4 @@ public interface AbstractErrorDetail { */ String processingStage(); - /** - * Returns the original Throwable object that caused the error. - * - * @return The Throwable object, or null if no exception is available. - */ - Throwable exception(); - - /** - * Generates and returns the stack trace of the exception as a string. This is a derived value - * and will be computed only when accessed. - * - * @return A string representation of the exception's stack trace, or null if no exception is - * present. - */ - @Value.Derived - default String stackTrace() { - Throwable ex = exception(); - if (ex != null) { - return Arrays.stream(ex.getStackTrace()) - .map(StackTraceElement::toString) - .reduce((a, b) -> a + "\n" + b) - .orElse(""); - } - return null; - } - - /** - * Returns a truncated version of the stack trace. - * - * @param maxLines The maximum number of lines to include in the truncated stack trace. - * @return A string containing the truncated stacktrace, or null if no exception is present. - */ - @Value.Derived - default String truncatedStackTrace(int maxLines) { - String fullTrace = stackTrace(); - if (fullTrace == null) { - return null; - } - String[] lines = fullTrace.split("\n"); - return Arrays.stream(lines) - .limit(maxLines) - .reduce((a, b) -> a + "\n" + b) - .orElse(""); - } - } \ No newline at end of file diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/error/ExponentialBackoffRetryStrategy.java b/dotCMS/src/main/java/com/dotcms/jobs/business/error/ExponentialBackoffRetryStrategy.java index ebf84e28c540..967f8a1658d8 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/error/ExponentialBackoffRetryStrategy.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/error/ExponentialBackoffRetryStrategy.java @@ -60,12 +60,12 @@ public ExponentialBackoffRetryStrategy(long initialDelay, long maxDelay, double * Determines whether a job should be retried based on the provided job and exception. * * @param job The job in question. - * @param exception The exception that occurred during the execution of the job. + * @param exceptionClass The class of the exception that caused the failure. * @return true if the job should be retried, false otherwise. */ @Override - public boolean shouldRetry(final Job job, final Throwable exception) { - return job.retryCount() < maxRetries && isRetryableException(exception); + public boolean shouldRetry(final Job job, final Class exceptionClass) { + return job.retryCount() < maxRetries && isRetryableException(exceptionClass); } /** @@ -93,14 +93,15 @@ public int maxRetries() { } @Override - public boolean isRetryableException(final Throwable exception) { - if (exception == null) { + public boolean isRetryableException(final Class exceptionClass) { + if (exceptionClass == null) { return false; } if (retryableExceptions.isEmpty()) { return true; // If no specific exceptions are set, all are retryable } - return retryableExceptions.stream().anyMatch(clazz -> clazz.isInstance(exception)); + return retryableExceptions.stream() + .anyMatch(clazz -> clazz.isAssignableFrom(exceptionClass)); } @Override diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/error/RetryStrategy.java b/dotCMS/src/main/java/com/dotcms/jobs/business/error/RetryStrategy.java index 3c5996464718..23f54600de02 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/error/RetryStrategy.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/error/RetryStrategy.java @@ -14,10 +14,10 @@ public interface RetryStrategy { * caused the failure. * * @param job The job that failed and is being considered for retry. - * @param exception The exception that caused the job to fail. + * @param exceptionClass The class of the exception that caused the failure. * @return true if the job should be retried, false otherwise. */ - boolean shouldRetry(Job job, Throwable exception); + boolean shouldRetry(Job job, Class exceptionClass); /** * Calculates the delay before the next retry attempt for a given job. @@ -37,17 +37,17 @@ public interface RetryStrategy { /** * Determines whether a given exception is retryable according to this strategy. * - * @param exception The exception to check. + * @param exceptionClass The class of the exception to check. * @return true if the exception is retryable, false otherwise. */ - boolean isRetryableException(Throwable exception); + boolean isRetryableException(Class exceptionClass); /** * Adds an exception class to the set of retryable exceptions. * * @param exceptionClass The exception class to be considered retryable. */ - void addRetryableException(final Class exceptionClass); + void addRetryableException(Class exceptionClass); /** * Returns an unmodifiable set of the currently registered retryable exceptions. diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java b/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java index 776b304a5714..324182899f67 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java @@ -39,7 +39,7 @@ public interface AbstractJob { Map parameters(); - Optional lastException(); + Optional lastExceptionClass(); @Default default int retryCount() { @@ -78,7 +78,7 @@ default Job markAsFailed(final JobResult result) { return Job.builder().from(this) .state(JobState.FAILED) .result(result) - .lastException(result.errorDetail().get().exception()) + .lastExceptionClass(result.errorDetail().get().exceptionClass()) .build(); } diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/DBJobTransformer.java b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/DBJobTransformer.java index 88cdf849212f..35bbf2c08789 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/DBJobTransformer.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/DBJobTransformer.java @@ -13,6 +13,7 @@ import java.util.Optional; import org.json.JSONException; import org.json.JSONObject; +import org.postgresql.util.PGobject; /** * Utility class for transforming database result sets into Job objects. This class provides static @@ -63,6 +64,19 @@ private static String getString(final Map row, final String colu return (String) row.get(column); } + /** + * Retrieves a JSON string value from the row map. + * + * @param row The row map + * @param column The column name + * @return The JSON string value, or null if not present + */ + private static String getJSONAsString( + final Map row, final String column) { + final var json = (PGobject) row.get(column); + return json != null ? json.getValue() : null; + } + /** * Retrieves an optional string value from the row map. * @@ -94,7 +108,7 @@ private static JobState getJobState(final Map row) { */ private static Map getParameters(final Map row) { - String paramsJson = getString(row, "parameters"); + String paramsJson = getJSONAsString(row, "parameters"); if (!UtilMethods.isSet(paramsJson)) { return new HashMap<>(); } @@ -115,7 +129,7 @@ private static Map getParameters(final Map row) */ private static Optional getJobResult(final Map row) { - String resultJson = getString(row, "result"); + String resultJson = getJSONAsString(row, "result"); if (!UtilMethods.isSet(resultJson)) { return Optional.empty(); } @@ -144,12 +158,17 @@ private static Optional getErrorDetail(final JSONObject resultJsonO } try { + if (resultJsonObject.isNull("errorDetail")) { + return Optional.empty(); + } + JSONObject errorDetailJson = resultJsonObject.getJSONObject("errorDetail"); return Optional.of(ErrorDetail.builder() .message(errorDetailJson.optString("message")) .exceptionClass(errorDetailJson.optString("exceptionClass")) .timestamp(getDateTime(errorDetailJson.opt("timestamp"))) .processingStage(errorDetailJson.optString("processingStage")) + .stackTrace(errorDetailJson.optString("stackTrace")) .build()); } catch (JSONException e) { throw new DotRuntimeException("Error parsing error detail", e); @@ -169,6 +188,9 @@ private static Optional> getMetadata(final JSONObject result } try { + if (resultJsonObject.isNull("metadata")) { + return Optional.empty(); + } return Optional.of(resultJsonObject.getJSONObject("metadata").toMap()); } catch (JSONException e) { throw new DotRuntimeException("Error parsing metadata", e); @@ -220,6 +242,8 @@ private static LocalDateTime getDateTime(final Map row, final St private static LocalDateTime getDateTime(final Object value) { if (value instanceof Timestamp) { return ((Timestamp) value).toLocalDateTime(); + } else if (value instanceof String) { + return LocalDateTime.parse((String) value); } return null; } diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java index ad82826b5ec5..8e9563affd64 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java @@ -138,8 +138,7 @@ List getUpdatedJobsSince(Set jobIds, LocalDateTime since) * * @param jobId The ID of the job to remove. * @throws JobQueueDataException if there's a data storage error while removing the job - * @throws JobNotFoundException if the job with the given ID does not exist */ - void removeJob(String jobId) throws JobQueueDataException, JobNotFoundException; + void removeJobFromQueue(String jobId) throws JobQueueDataException; } \ No newline at end of file diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java index 53ab24b3e248..4f69e0fa157e 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java @@ -13,6 +13,12 @@ import com.dotmarketing.util.Logger; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.guava.GuavaModule; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.github.jonpeterson.jackson.module.versioning.VersioningModule; +import io.vavr.Lazy; import java.sql.Timestamp; import java.time.LocalDateTime; import java.util.ArrayList; @@ -41,7 +47,7 @@ * `SELECT FOR UPDATE SKIP LOCKED` for efficient job queue management in concurrent environments. * *

Note: This class assumes the existence of appropriate database tables (job_queue, job, - * job_detail_history) as defined in the database schema. Ensure that these tables are properly + * job_history) as defined in the database schema. Ensure that these tables are properly * set up before using this class. * * @see JobQueue @@ -54,8 +60,13 @@ public class PostgresJobQueue implements JobQueue { + "(id, queue_name, state, created_at) VALUES (?, ?, ?, ?)"; private static final String CREATE_JOB_QUERY = "INSERT INTO job " - + "(id, queue_name, state, parameters, created_at, updated_at) " - + "VALUES (?, ?, ?, ?::jsonb, ?, ?)"; + + "(id, queue_name, state, parameters, created_at, execution_node, updated_at) " + + "VALUES (?, ?, ?, ?::jsonb, ?, ?, ?)"; + + private static final String CREATE_JOB_HISTORY_QUERY = + "INSERT INTO job_history " + + "(id, job_id, state, execution_node, created_at) " + + "VALUES (?, ?, ?, ?, ?)"; private static final String SELECT_JOB_BY_ID_QUERY = "SELECT * FROM job WHERE id = ?"; @@ -111,10 +122,10 @@ public class PostgresJobQueue implements JobQueue { + "updated_at = ?, started_at = ?, completed_at = ?, execution_node = ?, retry_count = ?, " + "result = ?::jsonb WHERE id = ?"; - private static final String INSERT_INTO_JOB_DETAIL_HISTORY_QUERY = - "INSERT INTO job_detail_history " - + "(id, job_id, state, progress, execution_node, created_at, result) " - + "VALUES (?, ?, ?, ?, ?, ?, ?::jsonb)"; + private static final String INSERT_INTO_JOB_HISTORY_QUERY = + "INSERT INTO job_history " + + "(id, job_id, state, execution_node, created_at, result) " + + "VALUES (?, ?, ?, ?, ?, ?::jsonb)"; private static final String DELETE_JOB_FROM_QUEUE_QUERY = "DELETE FROM job_queue WHERE id = ?"; @@ -125,17 +136,31 @@ public class PostgresJobQueue implements JobQueue { private static final String UPDATE_JOB_PROGRESS = "UPDATE job SET progress = ?, updated_at = ?" + " WHERE id = ?"; + /** + * Jackson mapper configuration and lazy initialized instance. + */ + private final Lazy objectMapper = Lazy.of(() -> { + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.enable(SerializationFeature.INDENT_OUTPUT); + objectMapper.registerModule(new Jdk8Module()); + objectMapper.registerModule(new GuavaModule()); + objectMapper.registerModule(new JavaTimeModule()); + objectMapper.registerModule(new VersioningModule()); + objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); + return objectMapper; + }); + @Override public String createJob(final String queueName, final Map parameters) throws JobQueueException { - try { + final String serverId = APILocator.getServerAPI().readServerId(); - final ObjectMapper objectMapper = new ObjectMapper(); + try { final String jobId = UUID.randomUUID().toString(); - final var parametersJson = objectMapper.writeValueAsString(parameters); + final var parametersJson = objectMapper.get().writeValueAsString(parameters); final var now = Timestamp.valueOf(LocalDateTime.now()); final var jobState = JobState.PENDING.name(); @@ -146,10 +171,11 @@ public String createJob(final String queueName, final Map parame .addParam(jobState) .addParam(parametersJson) .addParam(now) + .addParam(serverId) .addParam(now) .loadResult(); - // Creating the jobqueue entry as well + // Creating the jobqueue entry new DotConnect().setSQL(CREATE_JOB_QUEUE_QUERY) .addParam(jobId) .addParam(queueName) @@ -157,6 +183,15 @@ public String createJob(final String queueName, final Map parame .addParam(now) .loadResult(); + // Creating job_history entry + new DotConnect().setSQL(CREATE_JOB_HISTORY_QUERY) + .addParam(UUID.randomUUID().toString()) + .addParam(jobId) + .addParam(jobState) + .addParam(serverId) + .addParam(now) + .loadResult(); + return jobId; } catch (JsonProcessingException e) { Logger.error(this, "Failed to serialize job parameters", e); @@ -353,7 +388,7 @@ public void updateJobStatus(final Job job) throws JobQueueDataException { dc.addParam(job.retryCount()); dc.addParam(job.result().map(r -> { try { - return new ObjectMapper().writeValueAsString(r); + return objectMapper.get().writeValueAsString(r); } catch (Exception e) { Logger.error(this, "Failed to serialize job result", e); return null; @@ -362,18 +397,17 @@ public void updateJobStatus(final Job job) throws JobQueueDataException { dc.addParam(job.id()); dc.loadResult(); - // Update job_detail_history + // Update job_history DotConnect historyDc = new DotConnect(); - historyDc.setSQL(INSERT_INTO_JOB_DETAIL_HISTORY_QUERY); + historyDc.setSQL(INSERT_INTO_JOB_HISTORY_QUERY); historyDc.addParam(UUID.randomUUID().toString()); historyDc.addParam(job.id()); historyDc.addParam(job.state().name()); - historyDc.addParam(job.progress()); historyDc.addParam(serverId); historyDc.addParam(Timestamp.valueOf(LocalDateTime.now())); historyDc.addParam(job.result().map(r -> { try { - return new ObjectMapper().writeValueAsString(r); + return objectMapper.get().writeValueAsString(r); } catch (Exception e) { Logger.error(this, "Failed to serialize job result for history", e); return null; @@ -385,7 +419,7 @@ public void updateJobStatus(final Job job) throws JobQueueDataException { if (job.state() == JobState.COMPLETED || job.state() == JobState.FAILED || job.state() == JobState.CANCELLED) { - removeJob(job.id()); + removeJobFromQueue(job.id()); } } catch (DotDataException e) { Logger.error(this, "Database error while updating job status", e); @@ -419,16 +453,22 @@ public List getUpdatedJobsSince(final Set jobIds, final LocalDateTi public void putJobBackInQueue(final Job job) throws JobQueueDataException { try { + + final var updatedJob = job.withState(JobState.PENDING); + DotConnect dc = new DotConnect(); dc.setSQL(PUT_JOB_BACK_TO_QUEUE_QUERY); dc.addParam(job.id()); dc.addParam(job.queueName()); - dc.addParam(JobState.PENDING.name()); + dc.addParam(job.state().name()); dc.addParam(0); // Default priority dc.addParam(Timestamp.valueOf(LocalDateTime.now())); - dc.addParam(JobState.PENDING.name()); + dc.addParam(job.state().name()); dc.addParam(0); // Default priority dc.loadResult(); + + // Update the job status + updateJobStatus(updatedJob); } catch (DotDataException e) { Logger.error(this, "Database error while putting job back in queue", e); throw new JobQueueDataException( @@ -495,7 +535,7 @@ public void updateJobProgress(final String jobId, final float progress) } @Override - public void removeJob(final String jobId) throws JobQueueDataException { + public void removeJobFromQueue(final String jobId) throws JobQueueDataException { try { DotConnect dc = new DotConnect(); diff --git a/dotCMS/src/main/resources/postgres.sql b/dotCMS/src/main/resources/postgres.sql index 44779d595c1b..882b237e4fdf 100644 --- a/dotCMS/src/main/resources/postgres.sql +++ b/dotCMS/src/main/resources/postgres.sql @@ -2552,12 +2552,11 @@ CREATE TABLE job ); -- Table for detailed job history -CREATE TABLE job_detail_history +CREATE TABLE job_history ( id VARCHAR(255) PRIMARY KEY, job_id VARCHAR(255) NOT NULL, state VARCHAR(50) NOT NULL, - progress FLOAT, execution_node VARCHAR(255), created_at timestamptz NOT NULL, result JSONB, @@ -2567,7 +2566,8 @@ CREATE TABLE job_detail_history -- Indexes (add an index for the new parameters field in job_queue) CREATE INDEX idx_job_queue_status ON job_queue (state); CREATE INDEX idx_job_queue_priority_created_at ON job_queue (priority DESC, created_at ASC); -CREATE INDEX idx_job_queue_parameters ON job_queue USING GIN (parameters); -CREATE INDEX idx_job_status ON job (status); +CREATE INDEX idx_job_parameters ON job USING GIN (parameters); +CREATE INDEX idx_job_result ON job USING GIN (result); +CREATE INDEX idx_job_status ON job (state); CREATE INDEX idx_job_created_at ON job (created_at); -CREATE INDEX idx_job_detail_history_job_id ON job_detail_history (job_id); +CREATE INDEX idx_job_history_job_id ON job_history (job_id); diff --git a/dotcms-integration/src/test/java/com/dotcms/Junit5Suite1.java b/dotcms-integration/src/test/java/com/dotcms/Junit5Suite1.java index 2f5b5c15335f..9895692f8bdf 100644 --- a/dotcms-integration/src/test/java/com/dotcms/Junit5Suite1.java +++ b/dotcms-integration/src/test/java/com/dotcms/Junit5Suite1.java @@ -1,11 +1,15 @@ package com.dotcms; import com.dotcms.jobs.business.api.JobQueueManagerAPICDITest; +import com.dotcms.jobs.business.queue.PostgresJobQueueIntegrationTest; import org.junit.platform.suite.api.SelectClasses; import org.junit.platform.suite.api.Suite; @Suite -@SelectClasses({JobQueueManagerAPICDITest.class}) +@SelectClasses({ + JobQueueManagerAPICDITest.class, + PostgresJobQueueIntegrationTest.class +}) public class Junit5Suite1 { } diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPICDITest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPICDITest.java index ceaa31b506bb..9eaafb88e8e3 100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPICDITest.java +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPICDITest.java @@ -78,9 +78,6 @@ void test_CDIInjection() { @Test void test_JobQueueManagerAPIFields() { - // There are not JobQueue implementations yet - //Assertions.assertNotNull(impl.getJobQueue(), "JobQueue should be injected"); - assertNotNull(jobQueueManagerAPI.getCircuitBreaker(), "CircuitBreaker should be injected"); assertNotNull(jobQueueManagerAPI.getDefaultRetryStrategy(), diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java index 20f24727f61f..ec4eefba8bb6 100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java @@ -564,7 +564,7 @@ public void test_JobRetry_MaxRetryLimit() throws Exception { // Verify the job was not retried after reaching the max retry limit verify(mockRetryStrategy, times(maxRetries + 1)). shouldRetry(any(), any()); // Retries + final attempt - verify(mockJobQueue, times(1)).removeJob(mockJob.id()); + verify(mockJobQueue, times(1)).removeJobFromQueue(mockJob.id()); // Stop the job queue jobQueueManagerAPI.close(); @@ -692,13 +692,13 @@ public void test_Job_NotRetryable() throws Exception { // Verify the job was not retried verify(mockRetryStrategy, times(1)).shouldRetry(any(), any()); verify(mockJobQueue, never()).putJobBackInQueue(any()); - verify(mockJobQueue, times(1)).removeJob(mockJob.id()); + verify(mockJobQueue, times(1)).removeJobFromQueue(mockJob.id()); // Capture and verify the error details ArgumentCaptor jobResultCaptor = ArgumentCaptor.forClass(JobResult.class); verify(mockJob).markAsFailed(jobResultCaptor.capture()); ErrorDetail capturedErrorDetail = jobResultCaptor.getValue().errorDetail().get(); - assertEquals("Non-retryable error", capturedErrorDetail.exception().getMessage()); + assertEquals("Non-retryable error", capturedErrorDetail.message()); // Stop the job queue jobQueueManagerAPI.close(); diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/queue/PostgresJobQueueIntegrationTest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/queue/PostgresJobQueueIntegrationTest.java new file mode 100644 index 000000000000..d1951656850d --- /dev/null +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/queue/PostgresJobQueueIntegrationTest.java @@ -0,0 +1,439 @@ +package com.dotcms.jobs.business.queue; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import com.dotcms.jobs.business.error.ErrorDetail; +import com.dotcms.jobs.business.job.Job; +import com.dotcms.jobs.business.job.JobPaginatedResult; +import com.dotcms.jobs.business.job.JobResult; +import com.dotcms.jobs.business.job.JobState; +import com.dotcms.jobs.business.queue.error.JobNotFoundException; +import com.dotcms.jobs.business.queue.error.JobQueueException; +import com.dotcms.util.IntegrationTestInitService; +import com.dotmarketing.common.db.DotConnect; +import com.dotmarketing.exception.DotDataException; +import com.dotmarketing.util.Logger; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +/** + * Integration tests for PostgresJobQueue implementation + */ +public class PostgresJobQueueIntegrationTest { + + private static JobQueue jobQueue; + + @BeforeAll + static void setUp() throws Exception { + + //Setting web app environment + IntegrationTestInitService.getInstance().init(); + + jobQueue = new PostgresJobQueue(); + } + + @AfterEach + void cleanUpEach() { + clearJobs(); + } + + /** + * Method to test: createJob and getJob in PostgresJobQueue + * Given Scenario: A job is created with specific parameters + * ExpectedResult: The job can be retrieved and its properties match the input + */ + @Test + void test_createJob_and_getJob() throws JobQueueException { + + String queueName = "testQueue"; + Map parameters = new HashMap<>(); + parameters.put("key", "value"); + + String jobId = jobQueue.createJob(queueName, parameters); + assertNotNull(jobId); + + Job job = jobQueue.getJob(jobId); + assertNotNull(job); + assertEquals(queueName, job.queueName()); + assertEquals(JobState.PENDING, job.state()); + assertEquals(parameters, job.parameters()); + } + + /** + * Method to test: getActiveJobs in PostgresJobQueue + * Given Scenario: Multiple active jobs are created + * ExpectedResult: All active jobs are retrieved correctly + */ + @Test + void test_getActiveJobs() throws JobQueueException { + + String queueName = "testQueue"; + for (int i = 0; i < 5; i++) { + jobQueue.createJob(queueName, new HashMap<>()); + } + + JobPaginatedResult result = jobQueue.getActiveJobs(queueName, 1, 10); + assertEquals(5, result.jobs().size()); + assertEquals(5, result.total()); + } + + /** + * Method to test: getCompletedJobs in PostgresJobQueue + * Given Scenario: Multiple jobs are created and completed + * ExpectedResult: All completed jobs within the given time range are retrieved + */ + @Test + void testGetCompletedJobs() throws JobQueueException { + + String queueName = "testQueue"; + LocalDateTime startDate = LocalDateTime.now().minusDays(1); + LocalDateTime endDate = LocalDateTime.now().plusDays(1); + + // Create and complete some jobs + for (int i = 0; i < 3; i++) { + String jobId = jobQueue.createJob(queueName, new HashMap<>()); + Job job = jobQueue.getJob(jobId); + Job completedJob = job.markAsCompleted(null); + jobQueue.updateJobStatus(completedJob); + } + + JobPaginatedResult result = jobQueue.getCompletedJobs(queueName, startDate, endDate, 1, 10); + assertEquals(3, result.jobs().size()); + assertEquals(3, result.total()); + result.jobs().forEach(job -> assertEquals(JobState.COMPLETED, job.state())); + } + + /** + * Method to test: getFailedJobs in PostgresJobQueue + * Given Scenario: Multiple jobs are created and set to failed state + * ExpectedResult: All failed jobs are retrieved correctly + */ + @Test + void test_getFailedJobs() throws JobQueueException { + + // Create and fail some jobs + for (int i = 0; i < 2; i++) { + String jobId = jobQueue.createJob("testQueue", new HashMap<>()); + Job job = jobQueue.getJob(jobId); + Job failedJob = Job.builder().from(job) + .state(JobState.FAILED) + .build(); + jobQueue.updateJobStatus(failedJob); + } + + JobPaginatedResult result = jobQueue.getFailedJobs(1, 10); + assertEquals(2, result.jobs().size()); + assertEquals(2, result.total()); + result.jobs().forEach(job -> assertEquals(JobState.FAILED, job.state())); + } + + /** + * Method to test: updateJobStatus in PostgresJobQueue + * Given Scenario: A job's status is updated + * ExpectedResult: The job's status is correctly reflected in the database + */ + @Test + void test_updateJobStatus() throws JobQueueException { + + String jobId = jobQueue.createJob("testQueue", new HashMap<>()); + Job job = jobQueue.getJob(jobId); + + Job updatedJob = Job.builder().from(job) + .state(JobState.RUNNING) + .progress(0.5f) + .build(); + + jobQueue.updateJobStatus(updatedJob); + + Job fetchedJob = jobQueue.getJob(jobId); + assertEquals(JobState.RUNNING, fetchedJob.state()); + assertEquals(0.5f, fetchedJob.progress(), 0.001); + } + + /** + * Method to test: nextJob in PostgresJobQueue + * Given Scenario: Multiple threads attempt to get the next job concurrently + * ExpectedResult: Each job is processed exactly once and all jobs are eventually completed + */ + @Test + void test_nextJob() throws Exception { + + final int NUM_JOBS = 10; + final int NUM_THREADS = 5; + String queueName = "testQueue"; + + // Create jobs + Set createdJobIds = new HashSet<>(); + for (int i = 0; i < NUM_JOBS; i++) { + String jobId = jobQueue.createJob(queueName, new HashMap<>()); + createdJobIds.add(jobId); + } + + // Set to keep track of processed job IDs + Set processedJobIds = Collections.synchronizedSet(new HashSet<>()); + + // Create and start threads + List threads = new ArrayList<>(); + for (int i = 0; i < NUM_THREADS; i++) { + Thread thread = new Thread(() -> { + try { + while (true) { + Job nextJob = jobQueue.nextJob(); + if (nextJob == null) { + break; // No more jobs to process + } + // Ensure this job hasn't been processed before + assertTrue(processedJobIds.add(nextJob.id()), + "Job " + nextJob.id() + " was processed more than once"); + assertEquals(JobState.RUNNING, nextJob.state()); + + // Simulate some processing time + Thread.sleep(1000); + + // Mark job as completed + Job completedJob = nextJob.markAsCompleted(null); + jobQueue.updateJobStatus(completedJob); + } + } catch (Exception e) { + fail("Exception in thread: " + e.getMessage()); + } + }); + threads.add(thread); + thread.start(); + } + + // Wait for all threads to complete + for (Thread thread : threads) { + thread.join(); + } + + // Verify all jobs were processed + assertEquals(NUM_JOBS, processedJobIds.size(), "Not all jobs were processed"); + assertEquals(createdJobIds, processedJobIds, "Processed jobs don't match created jobs"); + + // Verify no more jobs are available + assertNull(jobQueue.nextJob(), "There should be no more jobs available"); + + // Verify all jobs are in COMPLETED state + for (String jobId : createdJobIds) { + Job job = jobQueue.getJob(jobId); + assertEquals(JobState.COMPLETED, job.state(), + "Job " + jobId + " is not in COMPLETED state"); + } + } + + /** + * Method to test: getJob in PostgresJobQueue with non-existent ID + * Given Scenario: Attempt to retrieve a job with a non-existent ID + * ExpectedResult: JobNotFoundException is thrown + */ + @Test + void testJobNotFound() { + assertThrows(JobNotFoundException.class, () -> jobQueue.getJob("non-existent-id")); + } + + /** + * Method to test: updateJobProgress in PostgresJobQueue + * Given Scenario: A job's progress is updated multiple times + * ExpectedResult: The job's progress is correctly updated in the database + */ + @Test + void test_updateJobProgress() throws JobQueueException { + + String jobId = jobQueue.createJob("testQueue", new HashMap<>()); + + jobQueue.updateJobProgress(jobId, 0.75f); + + Job updatedJob = jobQueue.getJob(jobId); + assertEquals(0.75f, updatedJob.progress(), 0.001); + + jobQueue.updateJobProgress(jobId, 0.85f); + + updatedJob = jobQueue.getJob(jobId); + assertEquals(0.85f, updatedJob.progress(), 0.001); + } + + /** + * Method to test: getJobs in PostgresJobQueue + * Given Scenario: Jobs with various states are created + * ExpectedResult: All jobs are retrieved correctly with proper pagination + */ + @Test + void test_getJobs() throws JobQueueException { + + // Create a mix of jobs with different states + String queueName = "testQueue"; + for (int i = 0; i < 3; i++) { + jobQueue.createJob(queueName, new HashMap<>()); + } + String runningJobId = jobQueue.createJob(queueName, new HashMap<>()); + Job runningJob = jobQueue.getJob(runningJobId); + jobQueue.updateJobStatus(runningJob.withState(JobState.RUNNING)); + + String completedJobId = jobQueue.createJob(queueName, new HashMap<>()); + Job completedJob = jobQueue.getJob(completedJobId); + jobQueue.updateJobStatus(completedJob.markAsCompleted(null)); + + // Get all jobs + JobPaginatedResult result = jobQueue.getJobs(1, 10); + + assertEquals(5, result.jobs().size()); + assertEquals(5, result.total()); + assertEquals(1, result.page()); + assertEquals(10, result.pageSize()); + + // Verify job states + Map stateCounts = new HashMap<>(); + for (Job job : result.jobs()) { + stateCounts.put(job.state(), stateCounts.getOrDefault(job.state(), 0) + 1); + } + assertEquals(3, stateCounts.getOrDefault(JobState.PENDING, 0)); + assertEquals(1, stateCounts.getOrDefault(JobState.RUNNING, 0)); + assertEquals(1, stateCounts.getOrDefault(JobState.COMPLETED, 0)); + } + + /** + * Method to test: getUpdatedJobsSince in PostgresJobQueue + * Given Scenario: Jobs are created and updated at different times + * ExpectedResult: Only jobs updated after the specified time are retrieved + */ + @Test + void test_getUpdatedJobsSince() throws JobQueueException, InterruptedException { + + String queueName = "testQueue"; + + // Create initial jobs + String job1Id = jobQueue.createJob(queueName, new HashMap<>()); + String job2Id = jobQueue.createJob(queueName, new HashMap<>()); + + Thread.sleep(100); // Ensure some time passes + LocalDateTime checkpointTime = LocalDateTime.now(); + Thread.sleep(100); // Ensure some more time passes + + // Update job1 and create a new job after the checkpoint + Job job1 = jobQueue.getJob(job1Id); + jobQueue.updateJobStatus(job1.withState(JobState.RUNNING)); + String job3Id = jobQueue.createJob(queueName, new HashMap<>()); + + Set jobIdsToCheck = new HashSet<>(Arrays.asList(job1Id, job2Id, job3Id)); + List updatedJobs = jobQueue.getUpdatedJobsSince(jobIdsToCheck, checkpointTime); + + assertEquals(2, updatedJobs.size()); + Set updatedJobIds = updatedJobs.stream().map(Job::id).collect(Collectors.toSet()); + assertTrue(updatedJobIds.contains(job1Id)); + assertTrue(updatedJobIds.contains(job3Id)); + assertFalse(updatedJobIds.contains(job2Id)); + } + + /** + * Method to test: putJobBackInQueue in PostgresJobQueue + * Given Scenario: A failed job is put back into the queue + * ExpectedResult: The job is reset to PENDING state and can be retrieved by nextJob + */ + @Test + void test_putJobBackInQueue() throws JobQueueException { + + String queueName = "testQueue"; + String jobId = jobQueue.createJob(queueName, new HashMap<>()); + + // Simulate job processing + Job job = jobQueue.getJob(jobId); + jobQueue.updateJobStatus(job.withState(JobState.RUNNING)); + + // Simulate job failure + final var jobResult = JobResult.builder() + .errorDetail(ErrorDetail.builder() + .message("Simulated error") + .stackTrace(stackTrace(new RuntimeException("Simulated error"))) + .exceptionClass("java.lang.RuntimeException") + .timestamp(LocalDateTime.now()) + .processingStage("Simulated stage") + .build()) + .build(); + jobQueue.updateJobStatus(job.markAsFailed(jobResult)); + + // Put the job back in the queue + jobQueue.putJobBackInQueue(job); + + // Verify the job is back in PENDING state + Job retrievedJob = jobQueue.getJob(jobId); + assertEquals(JobState.PENDING, retrievedJob.state()); + + // Verify the job can be retrieved by nextJob + Job nextJob = jobQueue.nextJob(); + assertNotNull(nextJob); + assertEquals(jobId, nextJob.id()); + assertEquals(JobState.RUNNING, nextJob.state()); + } + + /** + * Method to test: removeJobFromQueue in PostgresJobQueue + * Given Scenario: A job is removed from the queue + * ExpectedResult: The job cannot be retrieved by nextJob after removal + */ + @Test + void test_removeJobFromQueue() throws JobQueueException { + + String queueName = "testQueue"; + String jobId = jobQueue.createJob(queueName, new HashMap<>()); + + // Verify job exists + Job job = jobQueue.getJob(jobId); + assertNotNull(job); + + // Remove the job + jobQueue.removeJobFromQueue(jobId); + + // Verify job is not returned by nextJob + assertNull(jobQueue.nextJob()); + } + + /** + * Helper method to clear all jobs from the database + */ + private void clearJobs() { + try { + new DotConnect().setSQL("delete from job_history").loadResult(); + new DotConnect().setSQL("delete from job_queue").loadResult(); + new DotConnect().setSQL("delete from job").loadResult(); + } catch (DotDataException e) { + Logger.error(this, "Error clearing jobs", e); + } + } + + /** + * Generates and returns the stack trace of the exception as a string. This is a derived value + * and will be computed only when accessed. + * + * @param exception The exception for which to generate the stack trace. + * @return A string representation of the exception's stack trace, or null if no exception is + * present. + */ + private String stackTrace(final Throwable exception) { + if (exception != null) { + return Arrays.stream(exception.getStackTrace()) + .map(StackTraceElement::toString) + .reduce((a, b) -> a + "\n" + b) + .orElse(""); + } + return null; + } + +} \ No newline at end of file