Skip to content

Commit

Permalink
Add delay property (#18)
Browse files Browse the repository at this point in the history
* delay

* add delay logic

* change to delayUntil
  • Loading branch information
jdmcd authored Jun 13, 2019
1 parent 992440a commit 4f42614
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 5 deletions.
6 changes: 5 additions & 1 deletion Sources/Jobs/JobStorage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,23 @@ public struct JobStorage: Codable {
/// The maxRetryCount for the `Job`.
var maxRetryCount: Int

/// A date to execute this job after
var delayUntil: Date?

/// A unique ID for the job
public internal(set) var id: String

/// The name of the `Job`
var jobName: String

/// Creates a new `JobStorage` holding object
public init(key: String, data: Data, maxRetryCount: Int, id: String, jobName: String) {
public init(key: String, data: Data, maxRetryCount: Int, id: String, jobName: String, delayUntil: Date?) {
self.key = key
self.data = data
self.maxRetryCount = maxRetryCount
self.id = id
self.jobName = jobName
self.delayUntil = delayUntil
}

/// Returns a string representation of the JobStorage object
Expand Down
16 changes: 15 additions & 1 deletion Sources/Jobs/JobsCommand.swift
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,12 @@ public final class JobsCommand: Command {
) throws {
let queue = QueueName(name: queueName)
let key = queue.makeKey(with: queueService.persistenceKey)
_ = eventLoop.scheduleRepeatedAsyncTask(initialDelay: .seconds(0), delay: queueService.refreshInterval) { task -> EventLoopFuture<Void> in

// Schedule the repeating task
_ = eventLoop.scheduleRepeatedAsyncTask(
initialDelay: .seconds(0),
delay: queueService.refreshInterval
) { task -> EventLoopFuture<Void> in
//Check if shutting down

if self.isShuttingDown {
Expand All @@ -124,6 +129,15 @@ public final class JobsCommand: Command {
return self.queueService.persistenceLayer.get(key: key).flatMap { jobStorage in
//No job found, go to the next iteration
guard let jobStorage = jobStorage else { return eventLoop.makeSucceededFuture(()) }

// If the job has a delay, we must check to make sure we can execute
if let delay = jobStorage.delayUntil {
guard delay >= Date() else {
// The delay has not passed yet, requeue the job
return self.queueService.persistenceLayer.requeue(key: key, jobStorage: jobStorage)
}
}

guard let job = self.config.make(for: jobStorage.jobName) else {
return eventLoop.makeFailedFuture(Abort(.internalServerError, reason: "Please register \(jobStorage.jobName)"))
}
Expand Down
5 changes: 5 additions & 0 deletions Sources/Jobs/JobsPersistenceLayer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,9 @@ public protocol JobsPersistenceLayer {
/// - Parameter key: The base key
/// - Returns: The processing key
func processingKey(key: String) -> String

/// Requeues a job due to a delay
/// - Parameter key: The key of the job
/// - Parameter jobStorage: The jobStorage holding the `Job` to be requeued
func requeue(key: String, jobStorage: JobStorage) -> EventLoopFuture<Void>
}
11 changes: 9 additions & 2 deletions Sources/Jobs/QueueService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,21 @@ public struct QueueService {
/// - jobData: The `JobData` to dispatch to the queue
/// - maxRetryCount: The number of retries to attempt upon error before calling `Job`.`error()`
/// - queue: The queue to run this job on
/// - delay: A date to execute the job after
/// - Returns: A future `Void` value used to signify completion
public func dispatch<J: JobData>(jobData: J, maxRetryCount: Int = 0, queue: QueueName = .default) throws -> EventLoopFuture<Void> {
public func dispatch<J: JobData>(
jobData: J,
maxRetryCount: Int = 0,
queue: QueueName = .default,
delayUntil: Date? = nil
) throws -> EventLoopFuture<Void> {
let data = try JSONEncoder().encode(jobData)
let jobStorage = JobStorage(key: persistenceKey,
data: data,
maxRetryCount: maxRetryCount,
id: UUID().uuidString,
jobName: J.jobName)
jobName: J.jobName,
delayUntil: delayUntil)

return persistenceLayer.set(key: queue.makeKey(with: persistenceKey), jobStorage: jobStorage).map({})
}
Expand Down
9 changes: 8 additions & 1 deletion Tests/JobsTests/JobStorageTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@ import XCTest

final class JobStorageTests: XCTestCase {
func testStringRepresentationIsValidJSON() {
let jobStorage = JobStorage(key: "vapor", data: Data(), maxRetryCount: 1, id: "identifier", jobName: "jobs")
let jobStorage = JobStorage(key: "vapor",
data: Data(),
maxRetryCount: 1,
id: "identifier",
jobName: "jobs",
delayUntil: nil)

let stringRepresentation = jobStorage.stringValue()

if let data = stringRepresentation?.data(using: String.Encoding.utf8), let jobStorageRestored = try? JSONDecoder().decode(JobStorage.self, from: data) {
Expand All @@ -19,6 +25,7 @@ final class JobStorageTests: XCTestCase {
XCTAssertEqual(jobStorage.maxRetryCount, jobStorageRestored.maxRetryCount)
XCTAssertEqual(jobStorage.id, jobStorageRestored.id)
XCTAssertEqual(jobStorage.jobName, jobStorageRestored.jobName)
XCTAssertEqual(jobStorage.delayUntil, nil)
} else {
XCTFail("There was a problem restoring JobStorage")
}
Expand Down

0 comments on commit 4f42614

Please sign in to comment.