Skip to content

Commit

Permalink
Added AsyncScheduledJob and cleaned up AsyncJob. (#104)
Browse files Browse the repository at this point in the history
* Added AsyncScheduledJob and cleaned up AsyncJob.

* Implement Job protocol.

* Add error stub

Co-authored-by: jdmcd <[email protected]>
  • Loading branch information
Andrewangeta and jdmcd authored Oct 27, 2021
1 parent 09f39d5 commit bdb4171
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 48 deletions.
56 changes: 8 additions & 48 deletions Sources/Queues/AsyncJob.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import Foundation
#if compiler(>=5.5) && canImport(_Concurrency)
/// A task that can be queued for future execution.
@available(macOS 12, iOS 15, watchOS 8, tvOS 15, *)
public protocol AsyncJob: AnyAsyncJob, AnyJob {
public protocol AsyncJob: Job {
/// The data associated with a job
associatedtype Payload

Expand Down Expand Up @@ -64,15 +64,6 @@ extension AsyncJob {
return String(describing: Self.self)
}

/// See `Job`.`error`
public func error(
_ context: QueueContext,
_ error: Error,
_ payload: Payload
) async throws {
return
}

/// See `Job`.`nextRetryIn`
public func nextRetryIn(attempt: Int) -> Int {
return -1
Expand All @@ -81,56 +72,25 @@ extension AsyncJob {
public func _nextRetryIn(attempt: Int) -> Int {
return nextRetryIn(attempt: attempt)
}

public func _error(_ context: QueueContext, id: String, _ error: Error, payload: [UInt8]) async throws {
var contextCopy = context
contextCopy.logger[metadataKey: "job_id"] = .string(id)
return try await self.error(contextCopy, error, Self.parsePayload(payload))
}

public func _dequeue(_ context: QueueContext, id: String, payload: [UInt8]) async throws {
var contextCopy = context
contextCopy.logger[metadataKey: "job_id"] = .string(id)
return try await self.dequeue(contextCopy, Self.parsePayload(payload))
}

/// See `Job`.`error`
public func error(
_ context: QueueContext,
_ error: Error,
_ payload: Payload
) -> EventLoopFuture<Void> {
public func dequeue(_ context: QueueContext, _ payload: Payload) -> EventLoopFuture<Void> {
let promise = context.eventLoop.makePromise(of: Void.self)
promise.completeWithTask {
try await self.error(context, error, payload)
try await self.dequeue(context, payload)
}
return promise.futureResult
}

public func _error(_ context: QueueContext, id: String, _ error: Error, payload: [UInt8]) -> EventLoopFuture<Void> {
public func error(_ context: QueueContext, _ error: Error, _ payload: Payload) -> EventLoopFuture<Void> {
let promise = context.eventLoop.makePromise(of: Void.self)
promise.completeWithTask {
try await self._error(context, id: id, error, payload: payload)
try await self.error(context, error, payload)
}
return promise.futureResult
}

public func _dequeue(_ context: QueueContext, id: String, payload: [UInt8]) -> EventLoopFuture<Void> {
let promise = context.eventLoop.makePromise(of: Void.self)
promise.completeWithTask {
try await self._dequeue(context, id: id, payload: payload)
}
return promise.futureResult
public func error(_ context: QueueContext, _ error: Error, _ payload: Payload) async throws {
return
}
}

/// A type-erased version of `Job`
@available(macOS 12, iOS 15, watchOS 8, tvOS 15, *)
public protocol AnyAsyncJob {
/// The name of the `Job`
static var name: String { get }
func _dequeue(_ context: QueueContext, id: String, payload: [UInt8]) async throws
func _error(_ context: QueueContext, id: String, _ error: Error, payload: [UInt8]) async throws
func _nextRetryIn(attempt: Int) -> Int
}
#endif
28 changes: 28 additions & 0 deletions Sources/Queues/AsyncScheduledJob.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import Vapor
import NIOCore
import Foundation

#if compiler(>=5.5) && canImport(_Concurrency)
/// Describes a job that can be scheduled and repeated
@available(macOS 12, iOS 15, watchOS 8, tvOS 15, *)
protocol AsyncScheduledJob: ScheduledJob {
var name: String { get }

/// The method called when the job is run
/// - Parameter context: A `JobContext` that can be used
func run(context: QueueContext) async throws
}

@available(macOS 12, iOS 15, watchOS 8, tvOS 15, *)
extension AsyncScheduledJob {
public var name: String { "\(Self.self)" }

func run(context: QueueContext) -> EventLoopFuture<Void> {
let promise = context.eventLoop.makePromise(of: Void.self)
promise.completeWithTask {
try await self.run(context: context)
}
return promise.futureResult
}
}
#endif
57 changes: 57 additions & 0 deletions Tests/QueuesTests/AsyncQueueTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#if compiler(>=5.5) && canImport(_Concurrency)
import Queues
import Vapor
import XCTVapor
import XCTQueues
@testable import Vapor
import NIOConcurrencyHelpers

@available(macOS 12, iOS 15, watchOS 8, tvOS 15, *)
final class AsyncQueueTests: XCTestCase {
func testAsyncJob() throws {
let app = Application(.testing)
defer { app.shutdown() }
app.queues.use(.test)

let promise = app.eventLoopGroup.next().makePromise(of: Void.self)
app.queues.add(MyAsyncJob(promise: promise))

app.get("foo") { req in
req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar"))
.map { _ in "done" }
}

try app.testable().test(.GET, "foo") { res in
XCTAssertEqual(res.status, .ok)
XCTAssertEqual(res.body.string, "done")
}

XCTAssertEqual(app.queues.test.queue.count, 1)
XCTAssertEqual(app.queues.test.jobs.count, 1)
let job = app.queues.test.first(MyAsyncJob.self)
XCTAssert(app.queues.test.contains(MyAsyncJob.self))
XCTAssertNotNil(job)
XCTAssertEqual(job!.foo, "bar")

try app.queues.queue.worker.run().wait()
XCTAssertEqual(app.queues.test.queue.count, 0)
XCTAssertEqual(app.queues.test.jobs.count, 0)

try XCTAssertNoThrow(promise.futureResult.wait())
}
}

@available(macOS 12, iOS 15, watchOS 8, tvOS 15, *)
struct MyAsyncJob: AsyncJob {
let promise: EventLoopPromise<Void>

struct Data: Codable {
var foo: String
}

func dequeue(_ context: QueueContext, _ payload: Data) async throws {
promise.succeed(())
return
}
}
#endif

0 comments on commit bdb4171

Please sign in to comment.