Skip to content

Commit

Permalink
#29479 Created integration tests for the PostgresJobQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
jgambarios committed Sep 20, 2024
1 parent 4d0a0ea commit c66f1d0
Show file tree
Hide file tree
Showing 13 changed files with 613 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.dotmarketing.util.Logger;
import com.google.common.annotations.VisibleForTesting;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -560,11 +561,9 @@ private void handleNonRetryableFailedJob(final Job job) throws DotDataException
Logger.warn(this, "Job " + job.id() + " has failed and cannot be retried.");

try {
jobQueue.removeJob(job.id());
jobQueue.removeJobFromQueue(job.id());
} catch (JobQueueDataException e) {
throw new DotDataException("Error removing failed job", e);
} catch (JobNotFoundException e) {
throw new DoesNotExistException(e);
}
}

Expand Down Expand Up @@ -609,8 +608,7 @@ private void processJob(final Job job) throws DotDataException {
"Error processing job " + runningJob.id() + ": " + e.getMessage(), e
);
handleJobFailure(
runningJob, processor, e,
"Job processing failed", "Job execution"
runningJob, processor, e, e.getMessage(), "Job execution"
);
}
} else {
Expand Down Expand Up @@ -681,7 +679,7 @@ private void handleJobFailure(final Job job, final JobProcessor processor,

final var errorDetail = ErrorDetail.builder()
.message(errorMessage)
.exception(exception)
.stackTrace(stackTrace(exception))
.exceptionClass(exception.getClass().getName())
.processingStage(processingStage)
.timestamp(LocalDateTime.now())
Expand Down Expand Up @@ -735,8 +733,19 @@ private RetryStrategy retryStrategy(final String queueName) {
* @return {@code true} if the job is eligible for retry, {@code false} otherwise.
*/
private boolean canRetry(final Job job) {

final RetryStrategy retryStrategy = retryStrategy(job.queueName());
return retryStrategy.shouldRetry(job, job.lastException().orElse(null));

Class<?> lastExceptionClass = null;
if (job.lastExceptionClass().isPresent()) {
try {
lastExceptionClass = Class.forName(job.lastExceptionClass().get());
} catch (ClassNotFoundException e) {
Logger.error(this, "Error loading exception class: " + e.getMessage(), e);
}
}

return retryStrategy.shouldRetry(job, (Class<? extends Throwable>) lastExceptionClass);
}

/**
Expand All @@ -750,6 +759,43 @@ private long nextRetryDelay(final Job job) {
return retryStrategy.nextRetryDelay(job);
}

/**
* Generates and returns the stack trace of the exception as a string. This is a derived value
* and will be computed only when accessed.
*
* @param exception The exception for which to generate the stack trace.
* @return A string representation of the exception's stack trace, or null if no exception is
* present.
*/
private String stackTrace(final Throwable exception) {
if (exception != null) {
return Arrays.stream(exception.getStackTrace())
.map(StackTraceElement::toString)
.reduce((a, b) -> a + "\n" + b)
.orElse("");
}
return null;
}

/**
* Returns a truncated version of the stack trace.
*
* @param exception The exception for which to generate the truncated stack trace.
* @param maxLines The maximum number of lines to include in the truncated stack trace.
* @return A string containing the truncated stacktrace, or null if no exception is present.
*/
private String truncatedStackTrace(Throwable exception, int maxLines) {
String fullTrace = stackTrace(exception);
if (fullTrace == null) {
return null;
}
String[] lines = fullTrace.split("\n");
return Arrays.stream(lines)
.limit(maxLines)
.reduce((a, b) -> a + "\n" + b)
.orElse("");
}

/**
* A wrapper class that makes ScheduledExecutorService auto-closeable. This class is designed to
* be used with try-with-resources to ensure that the ScheduledExecutorService is properly shut
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.time.LocalDateTime;
import java.util.Arrays;
import org.immutables.value.Value;

/**
Expand Down Expand Up @@ -33,6 +32,13 @@ public interface AbstractErrorDetail {
*/
String exceptionClass();

/**
* Returns the stack trace of the exception as a string.
*
* @return A string representation of the exception's stack trace.
*/
String stackTrace();

/**
* Returns the timestamp when the error occurred.
*
Expand All @@ -47,49 +53,4 @@ public interface AbstractErrorDetail {
*/
String processingStage();

/**
* Returns the original Throwable object that caused the error.
*
* @return The Throwable object, or null if no exception is available.
*/
Throwable exception();

/**
* Generates and returns the stack trace of the exception as a string. This is a derived value
* and will be computed only when accessed.
*
* @return A string representation of the exception's stack trace, or null if no exception is
* present.
*/
@Value.Derived
default String stackTrace() {
Throwable ex = exception();
if (ex != null) {
return Arrays.stream(ex.getStackTrace())
.map(StackTraceElement::toString)
.reduce((a, b) -> a + "\n" + b)
.orElse("");
}
return null;
}

/**
* Returns a truncated version of the stack trace.
*
* @param maxLines The maximum number of lines to include in the truncated stack trace.
* @return A string containing the truncated stacktrace, or null if no exception is present.
*/
@Value.Derived
default String truncatedStackTrace(int maxLines) {
String fullTrace = stackTrace();
if (fullTrace == null) {
return null;
}
String[] lines = fullTrace.split("\n");
return Arrays.stream(lines)
.limit(maxLines)
.reduce((a, b) -> a + "\n" + b)
.orElse("");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ public ExponentialBackoffRetryStrategy(long initialDelay, long maxDelay, double
* Determines whether a job should be retried based on the provided job and exception.
*
* @param job The job in question.
* @param exception The exception that occurred during the execution of the job.
* @param exceptionClass The class of the exception that caused the failure.
* @return true if the job should be retried, false otherwise.
*/
@Override
public boolean shouldRetry(final Job job, final Throwable exception) {
return job.retryCount() < maxRetries && isRetryableException(exception);
public boolean shouldRetry(final Job job, final Class<? extends Throwable> exceptionClass) {
return job.retryCount() < maxRetries && isRetryableException(exceptionClass);
}

/**
Expand Down Expand Up @@ -93,14 +93,15 @@ public int maxRetries() {
}

@Override
public boolean isRetryableException(final Throwable exception) {
if (exception == null) {
public boolean isRetryableException(final Class<? extends Throwable> exceptionClass) {
if (exceptionClass == null) {
return false;
}
if (retryableExceptions.isEmpty()) {
return true; // If no specific exceptions are set, all are retryable
}
return retryableExceptions.stream().anyMatch(clazz -> clazz.isInstance(exception));
return retryableExceptions.stream()
.anyMatch(clazz -> clazz.isAssignableFrom(exceptionClass));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ public interface RetryStrategy {
* caused the failure.
*
* @param job The job that failed and is being considered for retry.
* @param exception The exception that caused the job to fail.
* @param exceptionClass The class of the exception that caused the failure.
* @return true if the job should be retried, false otherwise.
*/
boolean shouldRetry(Job job, Throwable exception);
boolean shouldRetry(Job job, Class<? extends Throwable> exceptionClass);

/**
* Calculates the delay before the next retry attempt for a given job.
Expand All @@ -37,17 +37,17 @@ public interface RetryStrategy {
/**
* Determines whether a given exception is retryable according to this strategy.
*
* @param exception The exception to check.
* @param exceptionClass The class of the exception to check.
* @return true if the exception is retryable, false otherwise.
*/
boolean isRetryableException(Throwable exception);
boolean isRetryableException(Class<? extends Throwable> exceptionClass);

/**
* Adds an exception class to the set of retryable exceptions.
*
* @param exceptionClass The exception class to be considered retryable.
*/
void addRetryableException(final Class<? extends Throwable> exceptionClass);
void addRetryableException(Class<? extends Throwable> exceptionClass);

/**
* Returns an unmodifiable set of the currently registered retryable exceptions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public interface AbstractJob {

Map<String, Object> parameters();

Optional<Throwable> lastException();
Optional<String> lastExceptionClass();

@Default
default int retryCount() {
Expand Down Expand Up @@ -78,7 +78,7 @@ default Job markAsFailed(final JobResult result) {
return Job.builder().from(this)
.state(JobState.FAILED)
.result(result)
.lastException(result.errorDetail().get().exception())
.lastExceptionClass(result.errorDetail().get().exceptionClass())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.Optional;
import org.json.JSONException;
import org.json.JSONObject;
import org.postgresql.util.PGobject;

/**
* Utility class for transforming database result sets into Job objects. This class provides static
Expand Down Expand Up @@ -63,6 +64,19 @@ private static String getString(final Map<String, Object> row, final String colu
return (String) row.get(column);
}

/**
* Retrieves a JSON string value from the row map.
*
* @param row The row map
* @param column The column name
* @return The JSON string value, or null if not present
*/
private static String getJSONAsString(
final Map<String, Object> row, final String column) {
final var json = (PGobject) row.get(column);
return json != null ? json.getValue() : null;
}

/**
* Retrieves an optional string value from the row map.
*
Expand Down Expand Up @@ -94,7 +108,7 @@ private static JobState getJobState(final Map<String, Object> row) {
*/
private static Map<String, Object> getParameters(final Map<String, Object> row) {

String paramsJson = getString(row, "parameters");
String paramsJson = getJSONAsString(row, "parameters");
if (!UtilMethods.isSet(paramsJson)) {
return new HashMap<>();
}
Expand All @@ -115,7 +129,7 @@ private static Map<String, Object> getParameters(final Map<String, Object> row)
*/
private static Optional<JobResult> getJobResult(final Map<String, Object> row) {

String resultJson = getString(row, "result");
String resultJson = getJSONAsString(row, "result");
if (!UtilMethods.isSet(resultJson)) {
return Optional.empty();
}
Expand Down Expand Up @@ -144,12 +158,17 @@ private static Optional<ErrorDetail> getErrorDetail(final JSONObject resultJsonO
}

try {
if (resultJsonObject.isNull("errorDetail")) {
return Optional.empty();
}

JSONObject errorDetailJson = resultJsonObject.getJSONObject("errorDetail");
return Optional.of(ErrorDetail.builder()
.message(errorDetailJson.optString("message"))
.exceptionClass(errorDetailJson.optString("exceptionClass"))
.timestamp(getDateTime(errorDetailJson.opt("timestamp")))
.processingStage(errorDetailJson.optString("processingStage"))
.stackTrace(errorDetailJson.optString("stackTrace"))
.build());
} catch (JSONException e) {
throw new DotRuntimeException("Error parsing error detail", e);
Expand All @@ -169,6 +188,9 @@ private static Optional<Map<String, Object>> getMetadata(final JSONObject result
}

try {
if (resultJsonObject.isNull("metadata")) {
return Optional.empty();
}
return Optional.of(resultJsonObject.getJSONObject("metadata").toMap());
} catch (JSONException e) {
throw new DotRuntimeException("Error parsing metadata", e);
Expand Down Expand Up @@ -220,6 +242,8 @@ private static LocalDateTime getDateTime(final Map<String, Object> row, final St
private static LocalDateTime getDateTime(final Object value) {
if (value instanceof Timestamp) {
return ((Timestamp) value).toLocalDateTime();
} else if (value instanceof String) {
return LocalDateTime.parse((String) value);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,7 @@ List<Job> getUpdatedJobsSince(Set<String> jobIds, LocalDateTime since)
*
* @param jobId The ID of the job to remove.
* @throws JobQueueDataException if there's a data storage error while removing the job
* @throws JobNotFoundException if the job with the given ID does not exist
*/
void removeJob(String jobId) throws JobQueueDataException, JobNotFoundException;
void removeJobFromQueue(String jobId) throws JobQueueDataException;

}
Loading

0 comments on commit c66f1d0

Please sign in to comment.