From 7cd30ced0d50b6903fc5b6f66e503237ed1fe9fe Mon Sep 17 00:00:00 2001 From: Jonathan Gamba Date: Wed, 18 Sep 2024 09:34:13 -0600 Subject: [PATCH] #29479 Implement PostgresJobQueue and related error handling classes This commit adds a PostgreSQL-specific implementation of the JobQueue interface, providing detailed methods for job management using a PostgreSQL database. It also introduces new error handling classes like JobQueueException, JobQueueDataException, and renames ProcessorNotFoundException to JobProcessorNotFoundException for clarity. --- .../jobs/business/api/JobQueueManagerAPI.java | 19 +- .../business/api/JobQueueManagerAPIImpl.java | 251 ++++++--- .../error/JobProcessorNotFoundException.java | 17 + .../error/ProcessorNotFoundException.java | 17 - .../dotcms/jobs/business/job/AbstractJob.java | 4 + .../dotcms/jobs/business/queue/JobQueue.java | 56 +- .../jobs/business/queue/JobQueueProducer.java | 22 +- .../jobs/business/queue/PostgresJobQueue.java | 496 ++++++++++++++++++ .../queue/error/JobLockingException.java | 18 + .../queue/error/JobNotFoundException.java | 19 + .../queue/error/JobQueueDataException.java | 29 + .../queue/error/JobQueueException.java | 28 + .../queue/error/JobQueueFullException.java | 18 + dotCMS/src/main/resources/postgres.sql | 48 ++ .../business/api/JobQueueManagerAPITest.java | 21 +- 15 files changed, 928 insertions(+), 135 deletions(-) create mode 100644 dotCMS/src/main/java/com/dotcms/jobs/business/error/JobProcessorNotFoundException.java delete mode 100644 dotCMS/src/main/java/com/dotcms/jobs/business/error/ProcessorNotFoundException.java create mode 100644 dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java create mode 100644 dotCMS/src/main/java/com/dotcms/jobs/business/queue/error/JobLockingException.java create mode 100644 dotCMS/src/main/java/com/dotcms/jobs/business/queue/error/JobNotFoundException.java create mode 100644 dotCMS/src/main/java/com/dotcms/jobs/business/queue/error/JobQueueDataException.java create mode 100644 dotCMS/src/main/java/com/dotcms/jobs/business/queue/error/JobQueueException.java create mode 100644 dotCMS/src/main/java/com/dotcms/jobs/business/queue/error/JobQueueFullException.java diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java index 8bd0c59ff822..7ef130a9df44 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java @@ -1,11 +1,11 @@ package com.dotcms.jobs.business.api; import com.dotcms.jobs.business.error.CircuitBreaker; -import com.dotcms.jobs.business.error.JobCancellationException; -import com.dotcms.jobs.business.error.ProcessorNotFoundException; +import com.dotcms.jobs.business.error.JobProcessorNotFoundException; import com.dotcms.jobs.business.error.RetryStrategy; import com.dotcms.jobs.business.job.Job; import com.dotcms.jobs.business.processor.JobProcessor; +import com.dotmarketing.exception.DotDataException; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -62,18 +62,20 @@ public interface JobQueueManagerAPI extends AutoCloseable { * @param queueName The name of the queue * @param parameters The parameters for the job * @return The ID of the created job - * @throws ProcessorNotFoundException if no processor is registered for the specified queue + * @throws JobProcessorNotFoundException if no processor is registered for the specified queue + * @throws DotDataException if there's an error creating the job */ String createJob(String queueName, Map parameters) - throws ProcessorNotFoundException; + throws JobProcessorNotFoundException, DotDataException; /** * Retrieves a job by its ID. * * @param jobId The ID of the job * @return The Job object, or null if not found + * @throws DotDataException if there's an error fetching the job */ - Job getJob(String jobId); + Job getJob(String jobId) throws DotDataException; /** * Retrieves a list of jobs. @@ -81,16 +83,17 @@ String createJob(String queueName, Map parameters) * @param page The page number * @param pageSize The number of jobs per page * @return A list of Job objects + * @throws DotDataException if there's an error fetching the jobs */ - List getJobs(int page, int pageSize); + List getJobs(int page, int pageSize) throws DotDataException; /** * Cancels a job. * * @param jobId The ID of the job to cancel - * @throws JobCancellationException if the job cannot be cancelled + * @throws DotDataException if there's an error cancelling the job */ - void cancelJob(String jobId) throws JobCancellationException; + void cancelJob(String jobId) throws DotDataException; /** * Registers a watcher for a specific job. diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java index 838b29c762c8..59417bad2e77 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java @@ -1,5 +1,7 @@ package com.dotcms.jobs.business.api; +import com.dotcms.business.CloseDBIfOpened; +import com.dotcms.business.WrapInTransaction; import com.dotcms.jobs.business.api.events.JobCancelledEvent; import com.dotcms.jobs.business.api.events.JobCompletedEvent; import com.dotcms.jobs.business.api.events.JobCreatedEvent; @@ -9,8 +11,7 @@ import com.dotcms.jobs.business.api.events.RealTimeJobMonitor; import com.dotcms.jobs.business.error.CircuitBreaker; import com.dotcms.jobs.business.error.ErrorDetail; -import com.dotcms.jobs.business.error.JobCancellationException; -import com.dotcms.jobs.business.error.ProcessorNotFoundException; +import com.dotcms.jobs.business.error.JobProcessorNotFoundException; import com.dotcms.jobs.business.error.RetryStrategy; import com.dotcms.jobs.business.job.Job; import com.dotcms.jobs.business.job.JobResult; @@ -18,6 +19,12 @@ import com.dotcms.jobs.business.processor.JobProcessor; import com.dotcms.jobs.business.processor.ProgressTracker; import com.dotcms.jobs.business.queue.JobQueue; +import com.dotcms.jobs.business.queue.error.JobNotFoundException; +import com.dotcms.jobs.business.queue.error.JobQueueDataException; +import com.dotcms.jobs.business.queue.error.JobQueueException; +import com.dotmarketing.exception.DoesNotExistException; +import com.dotmarketing.exception.DotDataException; +import com.dotmarketing.exception.DotRuntimeException; import com.dotmarketing.util.Logger; import com.google.common.annotations.VisibleForTesting; import java.time.LocalDateTime; @@ -39,13 +46,11 @@ * Manages the processing of jobs in a distributed job queue system. This class is responsible for * job creation, execution, monitoring, and error handling. *
{@code
- *     public static void main(String[] args) {
  *
- *        // Create the job queue
- *        JobQueue jobQueue = new PostgresJobQueue();
+ *     @Inject
+ *     private JobQueueManagerAPI jobQueueManagerAPI;
  *
- *        // Create and start the job queue manager
- *        JobQueueManagerAPIImpl jobQueueManagerAPI = new JobQueueManagerAPIImpl(jobQueue, 5); // 5 threads
+ *     public static void main(String[] args) {
  *
  *        // (Optional) Set up a retry strategy for content import jobs, if not set, the default retry strategy will be used
  *        RetryStrategy contentImportRetryStrategy = new ExponentialBackoffRetryStrategy(
@@ -235,59 +240,86 @@ public void registerProcessor(final String queueName, final JobProcessor process
         processors.put(queueName, processor);
     }
 
+    @WrapInTransaction
     @Override
-    public String createJob(final String queueName, final Map parameters) {
+    public String createJob(final String queueName, final Map parameters)
+            throws JobProcessorNotFoundException, DotDataException {
 
         if (!processors.containsKey(queueName)) {
-            final var error = new ProcessorNotFoundException(queueName);
+            final var error = new JobProcessorNotFoundException(queueName);
             Logger.error(JobQueueManagerAPIImpl.class, error);
             throw error;
         }
 
-        String jobId = jobQueue.addJob(queueName, parameters);
-        jobCreatedEvent.fire(
-                new JobCreatedEvent(jobId, queueName, LocalDateTime.now(), parameters)
-        );
-        return jobId;
+        try {
+            String jobId = jobQueue.createJob(queueName, parameters);
+            jobCreatedEvent.fire(
+                    new JobCreatedEvent(jobId, queueName, LocalDateTime.now(), parameters)
+            );
+            return jobId;
+        } catch (JobQueueException e) {
+            throw new DotDataException("Error creating job", e);
+        }
     }
 
+    @CloseDBIfOpened
     @Override
-    public Job getJob(final String jobId) {
-        return jobQueue.getJob(jobId);
+    public Job getJob(final String jobId) throws DotDataException {
+        try {
+            return jobQueue.getJob(jobId);
+        } catch (JobNotFoundException e) {
+            throw new DoesNotExistException(e);
+        } catch (JobQueueDataException e) {
+            throw new DotDataException("Error fetching job", e);
+        }
     }
 
+    @CloseDBIfOpened
     @Override
-    public List getJobs(final int page, final int pageSize) {
-        return jobQueue.getJobs(page, pageSize);
+    public List getJobs(final int page, final int pageSize) throws DotDataException {
+        try {
+            return jobQueue.getJobs(page, pageSize);
+        } catch (JobQueueDataException e) {
+            throw new DotDataException("Error fetching jobs", e);
+        }
     }
 
+    @WrapInTransaction
     @Override
-    public void cancelJob(final String jobId) {
+    public void cancelJob(final String jobId) throws DotDataException {
 
-        Job job = jobQueue.getJob(jobId);
-        if (job != null) {
+        final Job job;
+        try {
+            job = jobQueue.getJob(jobId);
+        } catch (JobNotFoundException e) {
+            throw new DoesNotExistException(e);
+        } catch (JobQueueDataException e) {
+            throw new DotDataException("Error fetching job", e);
+        }
 
-            final var processor = processors.get(job.queueName());
-            if (processor != null && processor.canCancel(job)) {
+        final var processor = processors.get(job.queueName());
+        if (processor != null && processor.canCancel(job)) {
 
-                try {
+            try {
 
-                    Logger.info(this, "Cancelling job " + jobId);
+                Logger.info(this, "Cancelling job " + jobId);
 
-                    processor.cancel(job);
-                    handleJobCancellation(job, processor);
-                } catch (Exception e) {
-                    final var error = new JobCancellationException(jobId, e.getMessage());
-                    Logger.error(JobQueueManagerAPIImpl.class, error);
-                    throw error;
-                }
-            } else {
-                final var error = new JobCancellationException(jobId, "Job cannot be cancelled");
+                processor.cancel(job);
+                handleJobCancellation(job, processor);
+            } catch (Exception e) {
+                final var error = new DotDataException("Error cancelling job " + jobId, e);
                 Logger.error(JobQueueManagerAPIImpl.class, error);
                 throw error;
             }
         } else {
-            final var error = new JobCancellationException(jobId, "Job not found");
+
+            if (processor == null) {
+                final var error = new JobProcessorNotFoundException(job.queueName());
+                Logger.error(JobQueueManagerAPIImpl.class, error);
+                throw error;
+            }
+
+            final var error = new DotDataException(jobId, "Job " + jobId + " cannot be cancelled");
             Logger.error(JobQueueManagerAPIImpl.class, error);
             throw error;
         }
@@ -324,17 +356,25 @@ public RetryStrategy getDefaultRetryStrategy() {
     /**
      * Polls the job queue for updates to watched jobs and notifies their watchers.
      */
+    @CloseDBIfOpened
     private void pollJobUpdates() {
 
-        final var watchedJobIds = realTimeJobMonitor.getWatchedJobIds();
-        if (watchedJobIds.isEmpty()) {
-            return; // No jobs are being watched, skip polling
-        }
+        try {
+            final var watchedJobIds = realTimeJobMonitor.getWatchedJobIds();
+            if (watchedJobIds.isEmpty()) {
+                return; // No jobs are being watched, skip polling
+            }
 
-        final var currentPollTime = LocalDateTime.now();
-        List updatedJobs = jobQueue.getUpdatedJobsSince(watchedJobIds, lastPollJobUpdateTime);
-        realTimeJobMonitor.updateWatchers(updatedJobs);
-        lastPollJobUpdateTime = currentPollTime;
+            final var currentPollTime = LocalDateTime.now();
+            List updatedJobs = jobQueue.getUpdatedJobsSince(
+                    watchedJobIds, lastPollJobUpdateTime
+            );
+            realTimeJobMonitor.updateWatchers(updatedJobs);
+            lastPollJobUpdateTime = currentPollTime;
+        } catch (JobQueueDataException e) {
+            Logger.error(this, "Error polling job updates: " + e.getMessage(), e);
+            throw new DotRuntimeException("Error polling job updates", e);
+        }
     }
 
     /**
@@ -343,16 +383,23 @@ private void pollJobUpdates() {
      * @param job             The job whose progress to update.
      * @param progressTracker The processor progress tracker
      */
-    private void updateJobProgress(final Job job, final ProgressTracker progressTracker) {
-        if (job != null) {
+    @WrapInTransaction
+    private void updateJobProgress(final Job job, final ProgressTracker progressTracker)
+            throws DotDataException {
 
-            float progress = progressTracker.progress();
-            Job updatedJob = job.withProgress(progress);
+        try {
+            if (job != null) {
 
-            jobQueue.updateJobProgress(job.id(), updatedJob.progress());
-            jobProgressUpdatedEvent.fire(
-                    new JobProgressUpdatedEvent(updatedJob, LocalDateTime.now())
-            );
+                float progress = progressTracker.progress();
+                Job updatedJob = job.withProgress(progress);
+
+                jobQueue.updateJobProgress(job.id(), updatedJob.progress());
+                jobProgressUpdatedEvent.fire(
+                        new JobProgressUpdatedEvent(updatedJob, LocalDateTime.now())
+                );
+            }
+        } catch (JobQueueDataException e) {
+            throw new DotDataException("Error updating job progress", e);
         }
     }
 
@@ -371,9 +418,8 @@ private void processJobs() {
 
             try {
 
-                Job job = jobQueue.nextJob();
-                if (job != null) {
-                    processJobWithRetry(job);
+                boolean jobProcessed = processNextJob();
+                if (jobProcessed) {
                     emptyQueueCount = 0;
                 } else {
                     // If no jobs were found, wait for a short time before checking again
@@ -392,6 +438,28 @@ private void processJobs() {
         }
     }
 
+    /**
+     * Processes the next job in the queue.
+     *
+     * @return {@code true} if a job was processed, {@code false} if the queue is empty.
+     */
+    @WrapInTransaction
+    private boolean processNextJob() throws DotDataException {
+
+        try {
+            Job job = jobQueue.nextJob();
+            if (job != null) {
+                processJobWithRetry(job);
+                return true;
+            }
+
+            return false;
+        } catch (JobQueueException e) {
+            Logger.error(this, "Error fetching next job: " + e.getMessage(), e);
+            throw new DotDataException("Error fetching next job", e);
+        }
+    }
+
     /**
      * Closes an ExecutorService, shutting it down and waiting for any running jobs to complete.
      *
@@ -441,7 +509,7 @@ private boolean isCircuitBreakerOpen() {
      *
      * @param job The job to be processed.
      */
-    private void processJobWithRetry(final Job job) {
+    private void processJobWithRetry(final Job job) throws DotDataException {
 
         if (job.state() == JobState.FAILED) {
 
@@ -455,7 +523,11 @@ private void processJobWithRetry(final Job job) {
                     Logger.debug(this, "Job " + job.id() + " is not ready for retry, "
                             + "putting back in queue.");
                     // Put the job back in the queue for later retry
-                    jobQueue.putJobBackInQueue(job);
+                    try {
+                        jobQueue.putJobBackInQueue(job);
+                    } catch (JobQueueDataException e) {
+                        throw new DotDataException("Error re-queueing job", e);
+                    }
                 }
             } else {
                 handleNonRetryableFailedJob(job);
@@ -482,10 +554,17 @@ private boolean isReadyForRetry(Job job) {
      *
      * @param job The failed job that cannot be retried.
      */
-    private void handleNonRetryableFailedJob(final Job job) {
+    private void handleNonRetryableFailedJob(final Job job) throws DotDataException {
 
         Logger.warn(this, "Job " + job.id() + " has failed and cannot be retried.");
-        jobQueue.removeJob(job.id());
+
+        try {
+            jobQueue.removeJob(job.id());
+        } catch (JobQueueDataException e) {
+            throw new DotDataException("Error removing failed job", e);
+        } catch (JobNotFoundException e) {
+            throw new DoesNotExistException(e);
+        }
     }
 
     /**
@@ -493,13 +572,13 @@ private void handleNonRetryableFailedJob(final Job job) {
      *
      * @param job The job to process.
      */
-    private void processJob(final Job job) {
+    private void processJob(final Job job) throws DotDataException {
 
         JobProcessor processor = processors.get(job.queueName());
         if (processor != null) {
 
             Job runningJob = job.withState(JobState.RUNNING);
-            jobQueue.updateJobStatus(runningJob);
+            updateJobStatus(runningJob);
             jobStartedEvent.fire(new JobStartedEvent(runningJob, LocalDateTime.now()));
 
             try (final CloseableScheduledExecutor closeableExecutor = new CloseableScheduledExecutor()) {
@@ -509,15 +588,18 @@ private void processJob(final Job job) {
                 // Start a separate thread to periodically update and persist progress
                 ScheduledExecutorService progressUpdater = closeableExecutor.getExecutorService();
                 progressUpdater.scheduleAtFixedRate(() ->
-                        updateJobProgress(runningJob, progressTracker), 0, 1, TimeUnit.SECONDS
+                        {
+                            try {
+                                updateJobProgress(runningJob, progressTracker);
+                            } catch (DotDataException e) {
+                                Logger.error(this, "Error updating job progress: " + e.getMessage(), e);
+                                throw new DotRuntimeException("Error updating job progress", e);
+                            }
+                        }, 0, 1, TimeUnit.SECONDS
                 );
 
-                try {
-                    processor.process(runningJob);
-                } finally {
-                    // Ensure final progress is updated
-                    updateJobProgress(runningJob, progressTracker);
-                }
+                // Process the job
+                processor.process(runningJob);
 
                 handleJobCompletion(runningJob, processor);
             } catch (Exception e) {
@@ -533,7 +615,7 @@ private void processJob(final Job job) {
         } else {
 
             Logger.error(this, "No processor found for queue: " + job.queueName());
-            handleJobFailure(job, null, new ProcessorNotFoundException(job.queueName()),
+            handleJobFailure(job, null, new JobProcessorNotFoundException(job.queueName()),
                     "No processor found for queue", "Processor selection"
             );
         }
@@ -545,7 +627,8 @@ private void processJob(final Job job) {
      * @param job       The job that completed.
      * @param processor The processor that handled the job.
      */
-    private void handleJobCompletion(final Job job, final JobProcessor processor) {
+    private void handleJobCompletion(final Job job, final JobProcessor processor)
+            throws DotDataException {
 
         final var resultMetadata = processor.getResultMetadata(job);
 
@@ -553,9 +636,9 @@ private void handleJobCompletion(final Job job, final JobProcessor processor) {
         if (resultMetadata != null && !resultMetadata.isEmpty()) {
             jobResult = JobResult.builder().metadata(resultMetadata).build();
         }
-        final Job completedJob = job.markAsCompleted(jobResult);
 
-        jobQueue.updateJobStatus(completedJob);
+        final Job completedJob = job.markAsCompleted(jobResult);
+        updateJobStatus(completedJob);
         jobCompletedEvent.fire(new JobCompletedEvent(completedJob, LocalDateTime.now()));
     }
 
@@ -565,7 +648,8 @@ private void handleJobCompletion(final Job job, final JobProcessor processor) {
      * @param job       The job that was cancelled.
      * @param processor The processor that handled the job.
      */
-    private void handleJobCancellation(final Job job, final JobProcessor processor) {
+    private void handleJobCancellation(final Job job, final JobProcessor processor)
+            throws DotDataException {
 
         final var resultMetadata = processor.getResultMetadata(job);
 
@@ -573,9 +657,9 @@ private void handleJobCancellation(final Job job, final JobProcessor processor)
         if (resultMetadata != null && !resultMetadata.isEmpty()) {
             jobResult = JobResult.builder().metadata(resultMetadata).build();
         }
-        Job cancelledJob = job.markAsCancelled(jobResult);
 
-        jobQueue.updateJobStatus(cancelledJob);
+        Job cancelledJob = job.markAsCancelled(jobResult);
+        updateJobStatus(cancelledJob);
         jobCancelledEvent.fire(
                 new JobCancelledEvent(cancelledJob, LocalDateTime.now())
         );
@@ -591,7 +675,8 @@ private void handleJobCancellation(final Job job, final JobProcessor processor)
      * @param processingStage The stage of processing where the failure occurred.
      */
     private void handleJobFailure(final Job job, final JobProcessor processor,
-            final Exception exception, final String errorMessage, final String processingStage) {
+            final Exception exception, final String errorMessage, final String processingStage)
+            throws DotDataException {
 
         final var errorDetail = ErrorDetail.builder()
                 .message(errorMessage)
@@ -611,13 +696,27 @@ private void handleJobFailure(final Job job, final JobProcessor processor,
         }
 
         final Job failedJob = job.markAsFailed(jobResult);
-        jobQueue.updateJobStatus(failedJob);
+        updateJobStatus(failedJob);
         jobFailedEvent.fire(new JobFailedEvent(failedJob, LocalDateTime.now()));
 
         // Record the failure in the circuit breaker
         getCircuitBreaker().recordFailure();
     }
 
+    /**
+     * Updates the status of a job in the job queue.
+     *
+     * @param job The job to update.
+     * @throws DotDataException if there's an error updating the job status.
+     */
+    private void updateJobStatus(final Job job) throws DotDataException {
+        try {
+            jobQueue.updateJobStatus(job);
+        } catch (JobQueueDataException e) {
+            throw new DotDataException("Error updating job status", e);
+        }
+    }
+
     /**
      * Gets the retry strategy for a specific queue.
      *
diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobProcessorNotFoundException.java b/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobProcessorNotFoundException.java
new file mode 100644
index 000000000000..c1b51d621bd1
--- /dev/null
+++ b/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobProcessorNotFoundException.java
@@ -0,0 +1,17 @@
+package com.dotcms.jobs.business.error;
+
+/**
+ * Exception thrown when no job processor is found for a specified queue. This typically occurs when
+ * attempting to process a job from a queue that has no registered processor.
+ */
+public class JobProcessorNotFoundException extends RuntimeException {
+
+    /**
+     * Constructs a new JobProcessorNotFoundException with the specified queue name.
+     *
+     * @param queueName The name of the queue for which no processor was found
+     */
+    public JobProcessorNotFoundException(String queueName) {
+        super("No job processor found for queue: " + queueName);
+    }
+}
\ No newline at end of file
diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/error/ProcessorNotFoundException.java b/dotCMS/src/main/java/com/dotcms/jobs/business/error/ProcessorNotFoundException.java
deleted file mode 100644
index 3067eb154662..000000000000
--- a/dotCMS/src/main/java/com/dotcms/jobs/business/error/ProcessorNotFoundException.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package com.dotcms.jobs.business.error;
-
-/**
- * Exception thrown when no processor is found for a specified queue. This typically occurs when
- * attempting to process a job from a queue that has no registered processor.
- */
-public class ProcessorNotFoundException extends RuntimeException {
-
-    /**
-     * Constructs a new NoProcessorFoundException with the specified queue name.
-     *
-     * @param queueName The name of the queue for which no processor was found
-     */
-    public ProcessorNotFoundException(String queueName) {
-        super("No processor found for queue: " + queueName);
-    }
-}
\ No newline at end of file
diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java b/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java
index e43bd0f6e579..776b304a5714 100644
--- a/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java
+++ b/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java
@@ -25,8 +25,12 @@ public interface AbstractJob {
 
     JobState state();
 
+    Optional executionNode();
+
     Optional createdAt();
 
+    Optional startedAt();
+
     Optional updatedAt();
 
     Optional completedAt();
diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java
index 2953704405ef..05ab3753966e 100644
--- a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java
+++ b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java
@@ -1,6 +1,10 @@
 package com.dotcms.jobs.business.queue;
 
 import com.dotcms.jobs.business.job.Job;
+import com.dotcms.jobs.business.queue.error.JobLockingException;
+import com.dotcms.jobs.business.queue.error.JobNotFoundException;
+import com.dotcms.jobs.business.queue.error.JobQueueDataException;
+import com.dotcms.jobs.business.queue.error.JobQueueException;
 import java.time.LocalDateTime;
 import java.util.List;
 import java.util.Map;
@@ -13,21 +17,26 @@
 public interface JobQueue {
 
     /**
-     * Adds a new job to the specified queue.
+     * Creates a new job in the specified queue.
      *
-     * @param queueName     The name of the queue to add the job to.
-     * @param parameters    The parameters for the job.
+     * @param queueName  The name of the queue to add the job to.
+     * @param parameters The parameters for the job.
      * @return The ID of the newly created job.
+     * @throws JobQueueException     if there's an error creating the job
+     * @throws JobQueueDataException if there's a data storage error while creating the job
      */
-    String addJob(String queueName, Map parameters);
+    String createJob(String queueName, Map parameters)
+            throws JobQueueException;
 
     /**
      * Retrieves a job by its ID.
      *
      * @param jobId The ID of the job to retrieve.
-     * @return The job with the specified ID, or null if not found.
+     * @return The job with the specified ID.
+     * @throws JobNotFoundException  if the job with the given ID is not found
+     * @throws JobQueueDataException if there's a data storage error while fetching the job
      */
-    Job getJob(String jobId);
+    Job getJob(String jobId) throws JobNotFoundException, JobQueueDataException;
 
     /**
      * Retrieves a list of active jobs for a specific queue.
@@ -36,8 +45,9 @@ public interface JobQueue {
      * @param page      The page number (for pagination).
      * @param pageSize  The number of items per page.
      * @return A list of active jobs.
+     * @throws JobQueueDataException if there's a data storage error while fetching the jobs
      */
-    List getActiveJobs(String queueName, int page, int pageSize);
+    List getActiveJobs(String queueName, int page, int pageSize) throws JobQueueDataException;
 
     /**
      * Retrieves a list of completed jobs for a specific queue within a date range.
@@ -48,9 +58,10 @@ public interface JobQueue {
      * @param page      The page number (for pagination).
      * @param pageSize  The number of items per page.
      * @return A list of completed jobs.
+     * @throws JobQueueDataException if there's a data storage error while fetching the jobs
      */
     List getCompletedJobs(String queueName, LocalDateTime startDate, LocalDateTime endDate,
-            int page, int pageSize);
+            int page, int pageSize) throws JobQueueDataException;
 
     /**
      * Retrieves a list of all jobs.
@@ -58,8 +69,9 @@ List getCompletedJobs(String queueName, LocalDateTime startDate, LocalDateT
      * @param page     The page number (for pagination).
      * @param pageSize The number of items per page.
      * @return A list of all jobs.
+     * @throws JobQueueDataException if there's a data storage error while fetching the jobs
      */
-    List getJobs(int page, int pageSize);
+    List getJobs(int page, int pageSize) throws JobQueueDataException;
 
     /**
      * Retrieves a list of failed jobs.
@@ -67,15 +79,17 @@ List getCompletedJobs(String queueName, LocalDateTime startDate, LocalDateT
      * @param page     The page number (for pagination).
      * @param pageSize The number of items per page.
      * @return A list of failed jobs.
+     * @throws JobQueueDataException if there's a data storage error while fetching the jobs
      */
-    List getFailedJobs(int page, int pageSize);
+    List getFailedJobs(int page, int pageSize) throws JobQueueDataException;
 
     /**
      * Updates the status of a job.
      *
      * @param job The job with an updated status.
+     * @throws JobQueueDataException if there's a data storage error while updating the job status
      */
-    void updateJobStatus(Job job);
+    void updateJobStatus(Job job) throws JobQueueDataException;
 
     /**
      * Retrieves updates for specific jobs since a given time.
@@ -83,30 +97,37 @@ List getCompletedJobs(String queueName, LocalDateTime startDate, LocalDateT
      * @param jobIds The IDs of the jobs to check for updates
      * @param since  The time from which to fetch updates
      * @return A list of updated Job objects
+     * @throws JobQueueDataException if there's a data storage error while fetching job updates
      */
-    List getUpdatedJobsSince(Set jobIds, LocalDateTime since);
+    List getUpdatedJobsSince(Set jobIds, LocalDateTime since)
+            throws JobQueueDataException;
 
     /**
      * Puts a job back in the queue for retry.
      *
      * @param job The job to retry.
+     * @throws JobQueueDataException if there's a data storage error while re-queueing the job
      */
-    void putJobBackInQueue(Job job);
+    void putJobBackInQueue(Job job) throws JobQueueDataException;
 
     /**
      * Retrieves the next job in the queue.
      *
      * @return The next job in the queue, or null if the queue is empty.
+     * @throws JobQueueDataException if there's a data storage error while fetching the next job
+     * @throws JobLockingException   if there's an error acquiring a lock on the next job
      */
-    Job nextJob();
+    Job nextJob() throws JobQueueDataException, JobLockingException;
 
     /**
      * Updates the progress of a job.
      *
      * @param jobId    The ID of the job to update.
      * @param progress The new progress value (between 0.0 and 1.0).
+     * @throws JobQueueDataException if there's a data storage error while updating the job
+     *                               progress
      */
-    void updateJobProgress(String jobId, float progress);
+    void updateJobProgress(String jobId, float progress) throws JobQueueDataException;
 
     /**
      * Removes a job from the queue. This method should be used for jobs that have permanently
@@ -114,8 +135,9 @@ List getCompletedJobs(String queueName, LocalDateTime startDate, LocalDateT
      * removed from the queue and any associated resources are cleaned up.
      *
      * @param jobId The ID of the job to remove.
-     * @throws IllegalArgumentException if the job with the given ID does not exist.
+     * @throws JobQueueDataException if there's a data storage error while removing the job
+     * @throws JobNotFoundException  if the job with the given ID does not exist
      */
-    void removeJob(String jobId);
+    void removeJob(String jobId) throws JobQueueDataException, JobNotFoundException;
 
 }
\ No newline at end of file
diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueueProducer.java b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueueProducer.java
index 210653cadc56..1596e9fb17d6 100644
--- a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueueProducer.java
+++ b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueueProducer.java
@@ -1,5 +1,6 @@
 package com.dotcms.jobs.business.queue;
 
+import com.dotmarketing.util.Config;
 import javax.enterprise.context.ApplicationScoped;
 import javax.enterprise.inject.Produces;
 
@@ -10,6 +11,11 @@
 @ApplicationScoped
 public class JobQueueProducer {
 
+    // The type of job queue implementation to use
+    private static final String JOB_QUEUE_IMPLEMENTATION_TYPE = Config.getStringProperty(
+            "JOB_QUEUE_IMPLEMENTATION_TYPE", "postgres"
+    );
+
     /**
      * Produces a JobQueue instance. This method is called by the CDI container to create a JobQueue
      * instance when it is needed for dependency injection.
@@ -20,17 +26,13 @@ public class JobQueueProducer {
     @ApplicationScoped
     public JobQueue produceJobQueue() {
 
-        // Potential future implementation:
-        // String queueType = System.getProperty("job.queue.type", "postgres");
-        // if ("postgres".equals(queueType)) {
-        //     return new PostgresJobQueue();
-        // } else if ("redis".equals(queueType)) {
-        //     return new RedisJobQueue();
-        // }
-        // throw new IllegalStateException("Unknown job queue type: " + queueType);
+        if (JOB_QUEUE_IMPLEMENTATION_TYPE.equals("postgres")) {
+            return new PostgresJobQueue();
+        }
 
-        //return new PostgresJobQueue();
-        return null;
+        throw new IllegalStateException(
+                "Unknown job queue implementation type: " + JOB_QUEUE_IMPLEMENTATION_TYPE
+        );
     }
 
 }
\ No newline at end of file
diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java
new file mode 100644
index 000000000000..556ffd67159e
--- /dev/null
+++ b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java
@@ -0,0 +1,496 @@
+package com.dotcms.jobs.business.queue;
+
+import com.dotcms.jobs.business.error.ErrorDetail;
+import com.dotcms.jobs.business.job.Job;
+import com.dotcms.jobs.business.job.JobResult;
+import com.dotcms.jobs.business.job.JobState;
+import com.dotcms.jobs.business.queue.error.JobLockingException;
+import com.dotcms.jobs.business.queue.error.JobNotFoundException;
+import com.dotcms.jobs.business.queue.error.JobQueueDataException;
+import com.dotcms.jobs.business.queue.error.JobQueueException;
+import com.dotmarketing.business.APILocator;
+import com.dotmarketing.common.db.DotConnect;
+import com.dotmarketing.exception.DotDataException;
+import com.dotmarketing.exception.DotRuntimeException;
+import com.dotmarketing.util.Logger;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * PostgreSQL implementation of the JobQueue interface. This class provides concrete implementations
+ * for managing jobs using a PostgreSQL database.
+ *
+ * 

The PostgresJobQueue handles all database operations related to job management, including: + *

    + *
  • Creating new jobs and adding them to the queue
  • + *
  • Retrieving jobs by various criteria (ID, status, date range)
  • + *
  • Updating job status and progress
  • + *
  • Managing job lifecycle (queueing, processing, completion, failure)
  • + *
  • Implementing job locking mechanism for concurrent processing
  • + *
+ * + *

This implementation uses SQL queries optimized for PostgreSQL, including the use of + * `SELECT FOR UPDATE SKIP LOCKED` for efficient job queue management in concurrent environments. + * + *

Note: This class assumes the existence of appropriate database tables (job_queue, job, + * job_detail_history) as defined in the database schema. Ensure that these tables are properly + * set up before using this class. + * + * @see JobQueue + * @see Job + * @see JobState + */ +public class PostgresJobQueue implements JobQueue { + + private static final String CREATE_JOB_QUEUE_QUERY = "INSERT INTO job_queue " + + "(id, queue_name, state, created_at) VALUES (?, ?, ?, ?)"; + + private static final String CREATE_JOB_QUERY = "INSERT INTO job " + + "(id, queue_name, state, parameters, created_at, updated_at) " + + "VALUES (?, ?, ?, ?::jsonb, ?, ?)"; + + private static final String SELECT_JOB_BY_ID_QUERY = "SELECT * FROM job WHERE id = ?"; + + private static final String GET_ACTIVE_JOBS_QUERY = "SELECT * FROM job WHERE queue_name = ? " + + "AND state IN (?, ?) ORDER BY created_at LIMIT ? OFFSET ?"; + + private static final String UPDATE_AND_GET_NEXT_JOB_WITH_LOCK_QUERY = + "UPDATE job_queue SET state = ? " + + "WHERE id = (SELECT id FROM job_queue WHERE state = ? " + + "ORDER BY priority DESC, created_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED) " + + "RETURNING *"; + + private static final String GET_COMPLETED_JOBS_QUERY = "SELECT * FROM job " + + "WHERE queue_name = ? AND state = ? AND completed_at BETWEEN ? AND ? " + + "ORDER BY completed_at DESC LIMIT ? OFFSET ?"; + + private static final String GET_JOBS_QUERY = "SELECT * FROM job ORDER BY created_at " + + "DESC LIMIT ? OFFSET ?"; + + private static final String GET_UPDATED_JOBS_SINCE_QUERY = "SELECT * FROM job " + + "WHERE id = ANY(?) AND updated_at > ?"; + + private static final String GET_FAILED_JOBS_QUERY = "SELECT * FROM job WHERE state = ? " + + "ORDER BY updated_at DESC LIMIT ? OFFSET ?"; + + private static final String UPDATE_JOBS_QUERY = "UPDATE job SET state = ?, progress = ?, " + + "updated_at = ?, started_at = ?, completed_at = ?, execution_node = ?, retry_count = ?, " + + "result = ?::jsonb WHERE id = ?"; + + private static final String INSERT_INTO_JOB_DETAIL_HISTORY_QUERY = + "INSERT INTO job_detail_history " + + "(id, job_id, state, progress, execution_node, created_at, result) " + + "VALUES (?, ?, ?, ?, ?, ?, ?::jsonb)"; + + private static final String DELETE_JOB_FROM_QUEUE_QUERY = "DELETE FROM job_queue WHERE id = ?"; + + private static final String PUT_JOB_BACK_TO_QUEUE_QUERY = "INSERT INTO job_queue " + + "(id, queue_name, state, priority, created_at) " + + "VALUES (?, ?, ?, ?, ?) ON CONFLICT (id) DO UPDATE SET state = ?, priority = ?"; + + private static final String UPDATE_JOB_PROGRESS = "UPDATE job SET progress = ?, updated_at = ?" + + " WHERE id = ?"; + + @Override + public String createJob(final String queueName, final Map parameters) + throws JobQueueException { + + try { + + final ObjectMapper objectMapper = new ObjectMapper(); + + final String jobId = UUID.randomUUID().toString(); + + final var parametersJson = objectMapper.writeValueAsString(parameters); + final var now = Timestamp.valueOf(LocalDateTime.now()); + final var jobState = JobState.PENDING.name(); + + // Insert into job table + new DotConnect().setSQL(CREATE_JOB_QUERY) + .addParam(jobId) + .addParam(queueName) + .addParam(jobState) + .addParam(parametersJson) + .addParam(now) + .addParam(now) + .loadResult(); + + // Creating the jobqueue entry as well + new DotConnect().setSQL(CREATE_JOB_QUEUE_QUERY) + .addParam(jobId) + .addParam(queueName) + .addParam(jobState) + .addParam(now) + .loadResult(); + + return jobId; + } catch (JsonProcessingException e) { + Logger.error(this, "Failed to serialize job parameters", e); + throw new JobQueueException("Failed to serialize job parameters", e); + } catch (DotDataException e) { + Logger.error(this, "Database error while creating job", e); + throw new JobQueueDataException("Database error while creating job", e); + } + } + + @Override + public Job getJob(final String jobId) throws JobNotFoundException, JobQueueDataException { + + try { + DotConnect dc = new DotConnect(); + dc.setSQL(SELECT_JOB_BY_ID_QUERY); + dc.addParam(jobId); + + List> results = dc.loadObjectResults(); + if (!results.isEmpty()) { + return mapResultSetToJob(results.get(0)); + } + + Logger.warn(this, "Job with id: " + jobId + " not found"); + throw new JobNotFoundException(jobId); + } catch (DotDataException e) { + Logger.error(this, "Database error while fetching job", e); + throw new JobQueueDataException("Database error while fetching job", e); + } + } + + @Override + public List getActiveJobs(final String queueName, final int page, final int pageSize) + throws JobQueueDataException { + + try { + DotConnect dc = new DotConnect(); + dc.setSQL(GET_ACTIVE_JOBS_QUERY); + dc.addParam(queueName); + dc.addParam(JobState.PENDING.name()); + dc.addParam(JobState.RUNNING.name()); + dc.addParam(pageSize); + dc.addParam((page - 1) * pageSize); + + List> results = dc.loadObjectResults(); + return results.stream().map(this::mapResultSetToJob).collect(Collectors.toList()); + } catch (DotDataException e) { + Logger.error(this, "Database error while fetching active jobs", e); + throw new JobQueueDataException("Database error while fetching active jobs", e); + } + } + + @Override + public List getCompletedJobs(final String queueName, final LocalDateTime startDate, + final LocalDateTime endDate, final int page, final int pageSize) + throws JobQueueDataException { + + try { + DotConnect dc = new DotConnect(); + dc.setSQL(GET_COMPLETED_JOBS_QUERY); + dc.addParam(queueName); + dc.addParam(JobState.COMPLETED.name()); + dc.addParam(Timestamp.valueOf(startDate)); + dc.addParam(Timestamp.valueOf(endDate)); + dc.addParam(pageSize); + dc.addParam((page - 1) * pageSize); + + List> results = dc.loadObjectResults(); + return results.stream().map(this::mapResultSetToJob).collect(Collectors.toList()); + } catch (DotDataException e) { + Logger.error(this, "Database error while fetching completed jobs", e); + throw new JobQueueDataException("Database error while fetching completed jobs", e); + } + } + + @Override + public List getJobs(final int page, final int pageSize) throws JobQueueDataException { + + try { + DotConnect dc = new DotConnect(); + dc.setSQL(GET_JOBS_QUERY); + dc.addParam(pageSize); + dc.addParam((page - 1) * pageSize); + + List> results = dc.loadObjectResults(); + return results.stream().map(this::mapResultSetToJob).collect(Collectors.toList()); + } catch (DotDataException e) { + Logger.error(this, "Database error while fetching jobs", e); + throw new JobQueueDataException("Database error while fetching jobs", e); + } + } + + @Override + public List getFailedJobs(final int page, final int pageSize) + throws JobQueueDataException { + + try { + DotConnect dc = new DotConnect(); + dc.setSQL(GET_FAILED_JOBS_QUERY); + dc.addParam(JobState.FAILED.name()); + dc.addParam(pageSize); + dc.addParam((page - 1) * pageSize); + + List> results = dc.loadObjectResults(); + return results.stream().map(this::mapResultSetToJob).collect(Collectors.toList()); + } catch (DotDataException e) { + Logger.error(this, "Database error while fetching failed jobs", e); + throw new JobQueueDataException("Database error while fetching failed jobs", e); + } + } + + @Override + public void updateJobStatus(final Job job) throws JobQueueDataException { + + final String serverId = APILocator.getServerAPI().readServerId(); + + try { + + DotConnect dc = new DotConnect(); + dc.setSQL(UPDATE_JOBS_QUERY); + dc.addParam(job.state().name()); + dc.addParam(job.progress()); + dc.addParam(Timestamp.valueOf(LocalDateTime.now())); + dc.addParam(job.startedAt().map(Timestamp::valueOf).orElse(null)); + dc.addParam(job.completedAt().map(Timestamp::valueOf).orElse(null)); + dc.addParam(serverId); + dc.addParam(job.retryCount()); + dc.addParam(job.result().map(r -> { + try { + return new ObjectMapper().writeValueAsString(r); + } catch (Exception e) { + Logger.error(this, "Failed to serialize job result", e); + return null; + } + }).orElse(null)); + dc.addParam(job.id()); + dc.loadResult(); + + // Update job_detail_history + DotConnect historyDc = new DotConnect(); + historyDc.setSQL(INSERT_INTO_JOB_DETAIL_HISTORY_QUERY); + historyDc.addParam(UUID.randomUUID().toString()); + historyDc.addParam(job.id()); + historyDc.addParam(job.state().name()); + historyDc.addParam(job.progress()); + historyDc.addParam(serverId); + historyDc.addParam(Timestamp.valueOf(LocalDateTime.now())); + historyDc.addParam(job.result().map(r -> { + try { + return new ObjectMapper().writeValueAsString(r); + } catch (Exception e) { + Logger.error(this, "Failed to serialize job result for history", e); + return null; + } + }).orElse(null)); + historyDc.loadResult(); + + // Remove from job_queue if completed, failed, or cancelled + if (job.state() == JobState.COMPLETED + || job.state() == JobState.FAILED + || job.state() == JobState.CANCELLED) { + removeJob(job.id()); + } + } catch (DotDataException e) { + Logger.error(this, "Database error while updating job status", e); + throw new JobQueueDataException("Database error while updating job status", e); + } + } + + @Override + public List getUpdatedJobsSince(final Set jobIds, final LocalDateTime since) + throws JobQueueDataException { + + try { + if (jobIds.isEmpty()) { + return Collections.emptyList(); + } + + DotConnect dc = new DotConnect(); + dc.setSQL(GET_UPDATED_JOBS_SINCE_QUERY); + dc.addParam(jobIds.toArray(new String[0])); + dc.addParam(Timestamp.valueOf(since)); + + List> results = dc.loadObjectResults(); + return results.stream().map(this::mapResultSetToJob).collect(Collectors.toList()); + } catch (DotDataException e) { + Logger.error(this, "Database error while fetching updated jobs", e); + throw new JobQueueDataException("Database error while fetching updated jobs", e); + } + } + + @Override + public void putJobBackInQueue(final Job job) throws JobQueueDataException { + + try { + DotConnect dc = new DotConnect(); + dc.setSQL(PUT_JOB_BACK_TO_QUEUE_QUERY); + dc.addParam(job.id()); + dc.addParam(job.queueName()); + dc.addParam(JobState.PENDING.name()); + dc.addParam(0); // Default priority + dc.addParam(Timestamp.valueOf(LocalDateTime.now())); + dc.addParam(JobState.PENDING.name()); + dc.addParam(0); // Default priority + dc.loadResult(); + } catch (DotDataException e) { + Logger.error(this, "Database error while putting job back in queue", e); + throw new JobQueueDataException( + "Database error while putting job back in queue", e + ); + } + } + + @Override + public Job nextJob() throws JobQueueDataException, JobLockingException { + + try { + DotConnect dc = new DotConnect(); + dc.setSQL(UPDATE_AND_GET_NEXT_JOB_WITH_LOCK_QUERY); + dc.addParam(JobState.RUNNING.name()); + dc.addParam(JobState.PENDING.name()); + + List> results = dc.loadObjectResults(); + if (!results.isEmpty()) { + + // Fetch full job details from the job table + String jobId = (String) results.get(0).get("id"); + final var job = getJob(jobId); + + // Update the job status to RUNNING + Job updatedJob = Job.builder() + .from(job) + .state(JobState.RUNNING) + .startedAt(Optional.of(LocalDateTime.now())) + .build(); + + updateJobStatus(updatedJob); + return updatedJob; + } + + return null; + } catch (DotDataException e) { + Logger.error(this, "Database error while fetching next job", e); + throw new JobQueueDataException("Database error while fetching next job", e); + } catch (JobNotFoundException e) { + Logger.error(this, "Job not found while fetching next job", e); + throw new JobQueueDataException("Job not found while fetching next job", e); + } catch (Exception e) { + Logger.error(this, "Error while locking next job", e); + throw new JobLockingException("Error while locking next job: " + e.getMessage()); + } + } + + @Override + public void updateJobProgress(final String jobId, final float progress) + throws JobQueueDataException { + + try { + DotConnect dc = new DotConnect(); + dc.setSQL(UPDATE_JOB_PROGRESS); + dc.addParam(progress); + dc.addParam(Timestamp.valueOf(LocalDateTime.now())); + dc.addParam(jobId); + dc.loadResult(); + } catch (DotDataException e) { + Logger.error(this, "Database error while updating job progress", e); + throw new JobQueueDataException("Database error while updating job progress", e); + } + } + + @Override + public void removeJob(final String jobId) throws JobQueueDataException { + + try { + DotConnect dc = new DotConnect(); + dc.setSQL(DELETE_JOB_FROM_QUEUE_QUERY); + dc.addParam(jobId); + dc.loadResult(); + } catch (DotDataException e) { + Logger.error(this, "Database error while removing job", e); + throw new JobQueueDataException("Database error while removing job", e); + } + } + + /** + * Maps a result set to a Job object. + * + * @param result The result set to map. + * @return A Job object mapped from the result set. + */ + private Job mapResultSetToJob(final Map result) { + + final ObjectMapper objectMapper = new ObjectMapper(); + + try { + return Job.builder() + .id((String) result.get("id")) + .queueName((String) result.get("queue_name")) + .state(JobState.valueOf((String) result.get("state"))) + .parameters( + objectMapper.readValue((String) result.get("parameters"), Map.class)) + .result(Optional.ofNullable((Map) result.get("result")) + .map(r -> { + try { + return JobResult.builder() + .errorDetail(Optional.ofNullable( + (Map) r.get("errorDetail")) + .map(ed -> ErrorDetail.builder() + .message((String) ed.get("message")) + .exceptionClass((String) ed.get( + "exceptionClass")) + .timestamp(toLocalDateTime( + ed.get("timestamp"))) + .processingStage((String) ed.get( + "processingStage")) + .build()) + ) + .metadata(Optional.ofNullable( + (Map) r.get("metadata"))) + .build(); + } catch (Exception e) { + Logger.error(this, "Failed to map job result", e); + return null; + } + })) + .progress(((Number) result.get("progress")).floatValue()) + .createdAt(toLocalDateTime(result.get("created_at"))) + .updatedAt(toLocalDateTime(result.get("updated_at"))) + .startedAt(Optional.ofNullable(result.get("started_at")) + .map(this::toLocalDateTime)) + .completedAt(Optional.ofNullable(result.get("completed_at")) + .map(this::toLocalDateTime)) + .executionNode(Optional.ofNullable((String) result.get("execution_node"))) + .retryCount(((Number) result.get("retry_count")).intValue()) + .build(); + } catch (Exception e) { + Logger.error(this, "Failed to map result to Job", e); + throw new DotRuntimeException("Failed to map result to Job", e); + } + } + + /** + * Converts a timestamp object to a LocalDateTime. + * + * @param timestamp The timestamp object to convert. + * @return The converted LocalDateTime. + */ + private LocalDateTime toLocalDateTime(final Object timestamp) { + + if (timestamp instanceof java.sql.Timestamp) { + return ((java.sql.Timestamp) timestamp).toLocalDateTime(); + } else if (timestamp instanceof java.util.Date) { + return ((java.util.Date) timestamp).toInstant() + .atZone(ZoneId.systemDefault()) + .toLocalDateTime(); + } + + throw new IllegalArgumentException("Unsupported timestamp type: " + timestamp.getClass()); + } + +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/error/JobLockingException.java b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/error/JobLockingException.java new file mode 100644 index 000000000000..dc6f8418182d --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/error/JobLockingException.java @@ -0,0 +1,18 @@ +package com.dotcms.jobs.business.queue.error; + +/** + * Exception thrown when there's an error in acquiring or managing locks for job processing. This + * could occur during attempts to atomically fetch and lock the next job for processing. + */ +public class JobLockingException extends JobQueueException { + + /** + * Constructs a new JobLockingException with the specified detail message. + * + * @param message the detail message + */ + public JobLockingException(String message) { + super(message); + } + +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/error/JobNotFoundException.java b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/error/JobNotFoundException.java new file mode 100644 index 000000000000..977cf8198cf3 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/error/JobNotFoundException.java @@ -0,0 +1,19 @@ +package com.dotcms.jobs.business.queue.error; + +/** + * Exception thrown when a requested job cannot be found in the job queue. This typically occurs + * when trying to retrieve, update, or process a job that no longer exists. + */ +public class JobNotFoundException extends JobQueueException { + + /** + * Constructs a new JobNotFoundException with a message indicating the missing job's ID. + * + * @param jobId the ID of the job that was not found + */ + public JobNotFoundException(String jobId) { + super("Job with id: " + jobId + " not found"); + } + +} + diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/error/JobQueueDataException.java b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/error/JobQueueDataException.java new file mode 100644 index 000000000000..9075690ac129 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/error/JobQueueDataException.java @@ -0,0 +1,29 @@ +package com.dotcms.jobs.business.queue.error; + +/** + * Exception thrown when a data-related error occurs during job queue operations. This could include + * connection issues, query failures, or data integrity problems, regardless of the underlying + * storage mechanism (e.g., database, in-memory store, distributed cache). + */ +public class JobQueueDataException extends JobQueueException { + + /** + * Constructs a new JobQueueDataException with the specified detail message. + * + * @param message the detail message + */ + public JobQueueDataException(String message) { + super(message); + } + + /** + * Constructs a new JobQueueDataException with the specified detail message and cause. + * + * @param message the detail message + * @param cause the cause of this exception + */ + public JobQueueDataException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/error/JobQueueException.java b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/error/JobQueueException.java new file mode 100644 index 000000000000..4ddd1af4a243 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/error/JobQueueException.java @@ -0,0 +1,28 @@ +package com.dotcms.jobs.business.queue.error; + +/** + * Base exception class for all job queue related errors. This exception is the parent of all more + * specific job queue exceptions. + */ +public class JobQueueException extends Exception { + + /** + * Constructs a new JobQueueException with the specified detail message. + * + * @param message the detail message + */ + public JobQueueException(String message) { + super(message); + } + + /** + * Constructs a new JobQueueException with the specified detail message and cause. + * + * @param message the detail message + * @param cause the cause of this exception + */ + public JobQueueException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/error/JobQueueFullException.java b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/error/JobQueueFullException.java new file mode 100644 index 000000000000..7f685d023323 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/error/JobQueueFullException.java @@ -0,0 +1,18 @@ +package com.dotcms.jobs.business.queue.error; + +/** + * Exception thrown when the job queue has reached its capacity and cannot accept new jobs. This may + * occur if there's a limit on the number of pending jobs or if system resources are exhausted. + */ +public class JobQueueFullException extends JobQueueException { + + /** + * Constructs a new JobQueueFullException with the specified detail message. + * + * @param message the detail message + */ + public JobQueueFullException(String message) { + super(message); + } + +} diff --git a/dotCMS/src/main/resources/postgres.sql b/dotCMS/src/main/resources/postgres.sql index 700139aea8cf..44779d595c1b 100644 --- a/dotCMS/src/main/resources/postgres.sql +++ b/dotCMS/src/main/resources/postgres.sql @@ -2523,3 +2523,51 @@ create table system_table ( -- Set up "like 'param%'" indexes for inode and identifier CREATE INDEX if not exists inode_inode_leading_idx ON inode(inode COLLATE "C"); CREATE INDEX if not exists identifier_id_leading_idx ON identifier(id COLLATE "C"); + +-- Table for active jobs in the queue +CREATE TABLE job_queue +( + id VARCHAR(255) PRIMARY KEY, + queue_name VARCHAR(255) NOT NULL, + state VARCHAR(50) NOT NULL, + priority INTEGER DEFAULT 0, + created_at timestamptz NOT NULL +); + +-- Table for job details and historical record +CREATE TABLE job +( + id VARCHAR(255) PRIMARY KEY, + queue_name VARCHAR(255) NOT NULL, + state VARCHAR(50) NOT NULL, + parameters JSONB NOT NULL, + result JSONB, + progress FLOAT DEFAULT 0, + created_at timestamptz NOT NULL, + updated_at timestamptz NOT NULL, + started_at timestamptz, + completed_at timestamptz, + execution_node VARCHAR(255), + retry_count INTEGER DEFAULT 0 +); + +-- Table for detailed job history +CREATE TABLE job_detail_history +( + id VARCHAR(255) PRIMARY KEY, + job_id VARCHAR(255) NOT NULL, + state VARCHAR(50) NOT NULL, + progress FLOAT, + execution_node VARCHAR(255), + created_at timestamptz NOT NULL, + result JSONB, + FOREIGN KEY (job_id) REFERENCES job (id) +); + +-- Indexes (add an index for the new parameters field in job_queue) +CREATE INDEX idx_job_queue_status ON job_queue (state); +CREATE INDEX idx_job_queue_priority_created_at ON job_queue (priority DESC, created_at ASC); +CREATE INDEX idx_job_queue_parameters ON job_queue USING GIN (parameters); +CREATE INDEX idx_job_status ON job (status); +CREATE INDEX idx_job_created_at ON job (created_at); +CREATE INDEX idx_job_detail_history_job_id ON job_detail_history (job_id); diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java index 042c1b018e5c..ac114cc54b26 100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java @@ -33,6 +33,11 @@ import com.dotcms.jobs.business.processor.JobProcessor; import com.dotcms.jobs.business.processor.ProgressTracker; import com.dotcms.jobs.business.queue.JobQueue; +import com.dotcms.jobs.business.queue.error.JobLockingException; +import com.dotcms.jobs.business.queue.error.JobNotFoundException; +import com.dotcms.jobs.business.queue.error.JobQueueDataException; +import com.dotcms.jobs.business.queue.error.JobQueueException; +import com.dotmarketing.exception.DotDataException; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Arrays; @@ -90,16 +95,16 @@ public void setUp() { * ExpectedResult: Job is created successfully and correct job ID is returned */ @Test - public void test_createJob() { + public void test_createJob() throws DotDataException, JobQueueException { Map parameters = new HashMap<>(); - when(mockJobQueue.addJob(anyString(), anyMap())).thenReturn("job123"); + when(mockJobQueue.createJob(anyString(), anyMap())).thenReturn("job123"); // Creating a job String jobId = jobQueueManagerAPI.createJob("testQueue", parameters); assertEquals("job123", jobId); - verify(mockJobQueue).addJob("testQueue", parameters); + verify(mockJobQueue).createJob("testQueue", parameters); } /** @@ -108,7 +113,7 @@ public void test_createJob() { * ExpectedResult: Correct job is retrieved from the job queue */ @Test - public void test_getJob() { + public void test_getJob() throws DotDataException, JobQueueDataException, JobNotFoundException { Job mockJob = mock(Job.class); when(mockJobQueue.getJob("job123")).thenReturn(mockJob); @@ -126,7 +131,7 @@ public void test_getJob() { * ExpectedResult: Correct list of jobs is retrieved from the job queue */ @Test - public void test_getJobs() { + public void test_getJobs() throws DotDataException, JobQueueDataException { // Prepare test data Job job1 = mock(Job.class); @@ -150,7 +155,8 @@ public void test_getJobs() { * ExpectedResult: JobQueueManagerAPI starts successfully and begins processing jobs */ @Test - public void test_start() throws InterruptedException { + public void test_start() + throws InterruptedException, JobQueueDataException, JobLockingException { // Make the circuit breaker always allow requests when(mockCircuitBreaker.allowRequest()).thenReturn(true); @@ -1025,7 +1031,8 @@ public void test_CircuitBreaker_Reset() throws Exception { * ExpectedResult: Job is successfully cancelled and its status is updated */ @Test - public void test_simple_cancelJob() { + public void test_simple_cancelJob() + throws DotDataException, JobQueueDataException, JobNotFoundException { Job mockJob = mock(Job.class); when(mockJobQueue.getJob("job123")).thenReturn(mockJob);