Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat (Core): PostgresJobQueue Implementation and Job Processing Enhancements #30175

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
40c342d
#29479 Fixing issue with job watcher.
jgambarios Sep 12, 2024
731b71c
#29479 Add job lifecycle events to JobQueueManagerAPI
jgambarios Sep 13, 2024
acbceef
#29479 Refactor job events to use Job objects directly
jgambarios Sep 13, 2024
a15d7c8
#29479 Remove JobResult enum and refactor job handling logic
jgambarios Sep 13, 2024
c8a91cd
Merge remote-tracking branch 'origin/master' into issue-29479-Create-…
jgambarios Sep 17, 2024
7cd30ce
#29479 Implement PostgresJobQueue and related error handling classes
jgambarios Sep 18, 2024
746b387
#29479 Refactor job queries to return paginated results.
jgambarios Sep 18, 2024
c7d6bcc
#29479 Refactor job mapping logic to separate utility class
jgambarios Sep 18, 2024
7def000
Merge remote-tracking branch 'origin/master' into issue-29479-Create-…
jgambarios Sep 18, 2024
2807e1c
Merge remote-tracking branch 'origin/master' into issue-29479-Create-…
jgambarios Sep 19, 2024
2955ab1
#29479 Fixing unit test
jgambarios Sep 19, 2024
e7b3e1d
Merge remote-tracking branch 'origin/master' into issue-29479-Create-…
jgambarios Sep 19, 2024
4d0a0ea
Merge remote-tracking branch 'origin/master' into issue-29479-Create-…
jgambarios Sep 19, 2024
c66f1d0
#29479 Created integration tests for the PostgresJobQueue
jgambarios Sep 20, 2024
2a64551
#29479 Created integration tests for the PostgresJobQueue
jgambarios Sep 20, 2024
fd872ba
#29479 Add JobQueue getter to JobQueueManagerAPI
jgambarios Sep 20, 2024
a05c6ed
#29479 Refactor job cancellation handling
jgambarios Sep 25, 2024
9b5c929
Merge remote-tracking branch 'origin/master' into issue-29479-Create-…
jgambarios Sep 26, 2024
9053cae
#29479 Refactor job cancellation handling
jgambarios Sep 26, 2024
a0c4da0
Merge remote-tracking branch 'origin/master' into issue-29479-Create-…
jgambarios Sep 26, 2024
f6e181c
Add job cancellation handling and progress updates
jgambarios Sep 26, 2024
2b96c50
#29479 Refactor job state management and retry logic
jgambarios Sep 26, 2024
cb883de
#29479 Cleaning imports
jgambarios Sep 26, 2024
c7ad2ec
#29479 Improvements
jgambarios Sep 27, 2024
af33445
#29479 Added IT for the JobQueueManagerAPI
jgambarios Sep 27, 2024
079adbf
#29479 Added IT for the JobQueueManagerAPI
jgambarios Sep 27, 2024
cb368bb
Merge remote-tracking branch 'origin/master' into issue-29479-Create-…
jgambarios Sep 27, 2024
ada07ff
#29479 Added default constructor for CDI
jgambarios Sep 27, 2024
ec99927
#29479 Fixed typo
jgambarios Sep 27, 2024
0952bd7
#29479 Applying sonarlint feedback.
jgambarios Sep 27, 2024
55ed75d
Merge remote-tracking branch 'origin/master' into issue-29479-Create-…
jgambarios Sep 27, 2024
1afa6a1
#29479 Applying sonarlint feedback.
jgambarios Sep 27, 2024
0f1fb34
#29479 Applying sonarlint feedback.
jgambarios Sep 27, 2024
cccf9b3
#29479 Applying sonarlint feedback.
jgambarios Sep 28, 2024
e5bb54a
Merge remote-tracking branch 'origin/master' into issue-29479-Create-…
jgambarios Sep 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@ public class JobQueueConfig {
*/
private final int threadPoolSize;

// The interval in milliseconds to poll for job updates
private final int pollJobUpdatesIntervalMilliseconds;

/**
* Constructs a new JobQueueConfig
*
* @param threadPoolSize The number of threads to use for job processing.
* @param pollJobUpdatesIntervalMilliseconds The interval in milliseconds to poll for job updates.
*/
public JobQueueConfig(int threadPoolSize) {
public JobQueueConfig(int threadPoolSize, int pollJobUpdatesIntervalMilliseconds) {
this.threadPoolSize = threadPoolSize;
this.pollJobUpdatesIntervalMilliseconds = pollJobUpdatesIntervalMilliseconds;
}

/**
Expand All @@ -28,4 +33,13 @@ public int getThreadPoolSize() {
return threadPoolSize;
}

/**
* Gets the interval in milliseconds to poll for job updates.
*
* @return The interval in milliseconds to poll for job updates.
*/
public int getPollJobUpdatesIntervalMilliseconds() {
return pollJobUpdatesIntervalMilliseconds;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ public class JobQueueConfigProducer {
"JOB_QUEUE_THREAD_POOL_SIZE", 10
);

// The interval in milliseconds to poll for job updates.
static final int DEFAULT_POLL_JOB_UPDATES_INTERVAL_MILLISECONDS = Config.getIntProperty(
"JOB_QUEUE_POLL_JOB_UPDATES_INTERVAL_MILLISECONDS", 1000
);

/**
* Produces a JobQueueConfig object. This method is called by the CDI container to create a
* JobQueueConfig instance when it is necessary for dependency injection.
Expand All @@ -24,7 +29,10 @@ public class JobQueueConfigProducer {
*/
@Produces
public JobQueueConfig produceJobQueueConfig() {
return new JobQueueConfig(DEFAULT_THREAD_POOL_SIZE);
return new JobQueueConfig(
DEFAULT_THREAD_POOL_SIZE,
DEFAULT_POLL_JOB_UPDATES_INTERVAL_MILLISECONDS
);
}

}
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package com.dotcms.jobs.business.api;

import com.dotcms.jobs.business.error.CircuitBreaker;
import com.dotcms.jobs.business.error.JobCancellationException;
import com.dotcms.jobs.business.error.ProcessorNotFoundException;
import com.dotcms.jobs.business.error.JobProcessorNotFoundException;
import com.dotcms.jobs.business.error.RetryStrategy;
import com.dotcms.jobs.business.job.Job;
import com.dotcms.jobs.business.job.JobPaginatedResult;
import com.dotcms.jobs.business.processor.JobProcessor;
import java.util.List;
import com.dotcms.jobs.business.queue.JobQueue;
import com.dotmarketing.exception.DotDataException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand All @@ -15,7 +16,7 @@
* Defines the contract for interacting with the job queue system. This interface provides methods
* for managing jobs, processors, and the overall state of the job queue.
*/
public interface JobQueueManagerAPI extends AutoCloseable {
public interface JobQueueManagerAPI {

/**
* Starts the job queue manager, initializing the thread pool for job processing.
Expand Down Expand Up @@ -62,35 +63,38 @@ public interface JobQueueManagerAPI extends AutoCloseable {
* @param queueName The name of the queue
* @param parameters The parameters for the job
* @return The ID of the created job
* @throws ProcessorNotFoundException if no processor is registered for the specified queue
* @throws JobProcessorNotFoundException if no processor is registered for the specified queue
* @throws DotDataException if there's an error creating the job
*/
String createJob(String queueName, Map<String, Object> parameters)
throws ProcessorNotFoundException;
throws JobProcessorNotFoundException, DotDataException;

/**
* Retrieves a job by its ID.
*
* @param jobId The ID of the job
* @return The Job object, or null if not found
* @throws DotDataException if there's an error fetching the job
*/
Job getJob(String jobId);
Job getJob(String jobId) throws DotDataException;

/**
* Retrieves a list of jobs.
*
* @param page The page number
* @param pageSize The number of jobs per page
* @return A list of Job objects
* @return A result object containing the list of active jobs and pagination information.
* @throws DotDataException if there's an error fetching the jobs
*/
List<Job> getJobs(int page, int pageSize);
JobPaginatedResult getJobs(int page, int pageSize) throws DotDataException;

/**
* Cancels a job.
*
* @param jobId The ID of the job to cancel
* @throws JobCancellationException if the job cannot be cancelled
* @throws DotDataException if there's an error cancelling the job
*/
void cancelJob(String jobId) throws JobCancellationException;
void cancelJob(String jobId) throws DotDataException;

/**
* Registers a watcher for a specific job.
Expand All @@ -113,6 +117,11 @@ String createJob(String queueName, Map<String, Object> parameters)
*/
CircuitBreaker getCircuitBreaker();

/**
* @return The JobQueue instance
*/
JobQueue getJobQueue();

/**
* @return The size of the thread pool
*/
Expand Down
Loading