Skip to content

Commit

Permalink
Allow worker count to be configured (#86)
Browse files Browse the repository at this point in the history
* allow worker count to be configured

* fix comments
  • Loading branch information
tanner0101 authored Aug 6, 2020
1 parent facd81e commit 649737c
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 1 deletion.
14 changes: 13 additions & 1 deletion Sources/Queues/QueuesCommand.swift
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,19 @@ public final class QueuesCommand: Command {
/// Starts an in-process jobs worker for queued tasks
/// - Parameter queueName: The queue to run the jobs on
public func startJobs(on queueName: QueueName) throws {
for eventLoop in eventLoopGroup.makeIterator() {
let workerCount: Int
switch self.application.queues.configuration.workerCount {
case .default:
var count = 0
for _ in self.eventLoopGroup.makeIterator() {
count += 1
}
workerCount = count
case .custom(let custom):
workerCount = custom
}
for _ in 0..<workerCount {
let eventLoop = self.eventLoopGroup.next()
let worker = self.application.queues.queue(queueName, on: eventLoop).worker
let task = eventLoop.scheduleRepeatedAsyncTask(
initialDelay: .seconds(0),
Expand Down
19 changes: 19 additions & 0 deletions Sources/Queues/QueuesConfiguration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,23 @@ public struct QueuesConfiguration {

/// The key that stores the data about a job. Defaults to `vapor_queues`
public var persistenceKey: String

/// Supported options for number of job handling workers.
public enum WorkerCount: ExpressibleByIntegerLiteral {
/// One worker per event loop.
case `default`

/// Specify a custom worker count.
case custom(Int)

/// See `ExpressibleByIntegerLiteral`.
public init(integerLiteral value: Int) {
self = .custom(value)
}
}

/// Sets the number of workers used for handling jobs.
public var workerCount: WorkerCount

/// A logger
public let logger: Logger
Expand All @@ -19,13 +36,15 @@ public struct QueuesConfiguration {
public init(
refreshInterval: TimeAmount = .seconds(1),
persistenceKey: String = "vapor_queues",
workerCount: WorkerCount = .default,
logger: Logger = .init(label: "codes.vapor.queues")
) {
self.jobs = [:]
self.scheduledJobs = []
self.logger = logger
self.refreshInterval = refreshInterval
self.persistenceKey = persistenceKey
self.workerCount = workerCount
self.userInfo = [:]
}

Expand Down
74 changes: 74 additions & 0 deletions Tests/QueuesTests/QueueTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,80 @@ final class QueueTests: XCTestCase {
}
try promise.futureResult.wait()
}

func testCustomWorkerCount() throws {
// Setup custom ELG with 4 threads
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 4)
defer { try! eventLoopGroup.syncShutdownGracefully() }

let app = Application(.testing, .shared(eventLoopGroup))
defer { app.shutdown() }

let count = app.eventLoopGroup.next().makePromise(of: Int.self)
app.queues.use(custom: WorkerCountDriver(count: count))
// Limit worker count to less than 4 threads
app.queues.configuration.workerCount = 2

try app.queues.startInProcessJobs(on: .default)
try XCTAssertEqual(count.futureResult.wait(), 2)
}
}

final class WorkerCountDriver: QueuesDriver {
let count: EventLoopPromise<Int>
let lock: Lock
var recordedEventLoops: Set<ObjectIdentifier>

init(count: EventLoopPromise<Int>) {
self.count = count
self.lock = .init()
self.recordedEventLoops = []
}

func makeQueue(with context: QueueContext) -> Queue {
WorkerCountQueue(driver: self, context: context)
}

func record(eventLoop: EventLoop) {
self.lock.lock()
defer { self.lock.unlock() }
let previousCount = self.recordedEventLoops.count
self.recordedEventLoops.insert(.init(eventLoop))
if self.recordedEventLoops.count == previousCount {
// we've detected all unique event loops now
self.count.succeed(previousCount)
}
}

func shutdown() {
// nothing
}

private struct WorkerCountQueue: Queue {
let driver: WorkerCountDriver
var context: QueueContext

func get(_ id: JobIdentifier) -> EventLoopFuture<JobData> {
fatalError()
}

func set(_ id: JobIdentifier, to data: JobData) -> EventLoopFuture<Void> {
fatalError()
}

func clear(_ id: JobIdentifier) -> EventLoopFuture<Void> {
fatalError()
}

func pop() -> EventLoopFuture<JobIdentifier?> {
self.driver.record(eventLoop: self.context.eventLoop)
return self.context.eventLoop.makeSucceededFuture(nil)
}

func push(_ id: JobIdentifier) -> EventLoopFuture<Void> {
fatalError()
}
}
}

struct Failure: Error { }
Expand Down

0 comments on commit 649737c

Please sign in to comment.