Skip to content

Commit

Permalink
Merge pull request #9 from vapor/tn-beta-2
Browse files Browse the repository at this point in the history
beta 2
  • Loading branch information
tanner0101 authored Dec 9, 2019
2 parents 5b589bf + 29fe144 commit 972576e
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 338 deletions.
2 changes: 2 additions & 0 deletions .github/FUNDING.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
github: [tanner0101] # loganwright, joscdk
open_collective: vapor
32 changes: 32 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
name: test
on:
- pull_request
jobs:
xenial:
container:
image: vapor/swift:5.1-xenial
services:
redis:
image: redis
ports:
- 6379:6379
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- run: swift test --enable-test-discovery --sanitize=thread
env:
REDIS_HOSTNAME: redis
bionic:
container:
image: vapor/swift:5.1-bionic
services:
redis:
image: redis
ports:
- 6379:6379
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- run: swift test --enable-test-discovery --sanitize=thread
env:
REDIS_HOSTNAME: redis
14 changes: 9 additions & 5 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,22 @@ let package = Package(
products: [
.library(
name: "JobsRedisDriver",
targets: ["JobsRedisDriver"]),
targets: ["JobsRedisDriver"]
)
],
dependencies: [
.package(url: "https://github.com/vapor/jobs.git", from: "1.0.0-beta.1"),
.package(url: "https://github.com/vapor/redis-kit.git", from: "1.0.0-beta.1")
.package(url: "https://github.com/vapor/vapor.git", from: "4.0.0-beta.2"),
.package(url: "https://github.com/vapor/jobs.git", from: "1.0.0-beta.3"),
.package(url: "https://github.com/vapor/redis-kit.git", from: "1.0.0-beta.2"),
],
targets: [
.target(
name: "JobsRedisDriver",
dependencies: ["Jobs", "RedisKit"]),
dependencies: ["Jobs", "RedisKit"]
),
.testTarget(
name: "JobsRedisDriverTests",
dependencies: ["JobsRedisDriver"]),
dependencies: ["JobsRedisDriver", "XCTVapor"]
),
]
)
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ Add this to your Package.swift:
```

Follow the instructions in the `Jobs` package for setup and configure information.

168 changes: 93 additions & 75 deletions Sources/JobsRedisDriver/JobsRedisDriver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,97 +4,115 @@ import NIO
import Foundation
import Vapor

/// A wrapper that conforms to `JobsDriver`
public struct JobsRedisDriver: JobsDriver {
/// The `RedisClient` to run commands on
public let client: RedisClient
struct InvalidRedisURL: Error {
let url: String
}

extension Application.Jobs.Provider {
public static func redis(url string: String) throws -> Self {
guard let url = URL(string: string) else {
throw InvalidRedisURL(url: string)
}
return try .redis(url: url)
}

/// The event loop group
public let eventLoopGroup: EventLoopGroup
public static func redis(url: URL) throws -> Self {
guard let configuration = RedisConfiguration(url: url) else {
throw InvalidRedisURL(url: url.absoluteString)
}
return .redis(configuration)
}

/// The logger
let logger: Logger
public static func redis(_ configuration: RedisConfiguration) -> Self {
.init {
$0.jobs.use(custom: JobsRedisDriver(configuration: configuration, on: $0.eventLoopGroup))
}
}
}
public struct JobsRedisDriver {
let pool: EventLoopGroupConnectionPool<RedisConnectionSource>

/// Creates a new `RedisJobs` instance
///
/// - Parameters:
/// - client: The `RedisClient` to use
public init(client: RedisClient, eventLoopGroup: EventLoopGroup) {
self.client = client
self.eventLoopGroup = eventLoopGroup
self.logger = Logger(label: "codes.vapor.jobs-redis-driver")
public init(configuration: RedisConfiguration, on eventLoopGroup: EventLoopGroup) {
let logger = Logger(label: "codes.vapor.redis")
self.pool = .init(
source: .init(configuration: configuration, logger: logger),
maxConnectionsPerEventLoop: 1,
logger: logger,
on: eventLoopGroup
)
}

/// See `JobsDriver.get`
public func get(key: String, eventLoop: JobsEventLoopPreference) -> EventLoopFuture<JobStorage?> {
let processing = processingKey(key: key)
let el = eventLoop.delegate(for: self.eventLoopGroup)

return self.client.rpoplpush(from: key, to: processing).flatMap { redisData -> EventLoopFuture<String?> in
guard !redisData.isNull else {
return el.makeSucceededFuture(nil)
}
public func shutdown() {
self.pool.shutdown()
}
}

guard let id = redisData.string else {
self.logger.error("Could not convert RedisData to string: \(redisData)")
return el.makeFailedFuture(Abort(.internalServerError))
}

return self.client.get(id)
}.flatMapThrowing { redisData -> JobStorage? in
guard let redisData = redisData else {
return nil
}
extension JobsRedisDriver: JobsDriver {
public func makeQueue(with context: JobContext) -> JobsQueue {
_JobsRedisQueue(
client: pool.pool(for: context.eventLoop).client(),
context: context
)
}
}

guard let data = redisData.data(using: .utf8) else {
self.logger.error("Could not convert redis data to string: \(redisData)")
throw Abort(.internalServerError)
}

let decoder = try JSONDecoder().decode(DecoderUnwrapper.self, from: data)
return try JobStorage(from: decoder.decoder)
}
struct _JobsRedisQueue {
let client: RedisClient
let context: JobContext
}

extension _JobsRedisQueue: RedisClient {
func send(command: String, with arguments: [RESPValue]/*, logger: Logger*/) -> EventLoopFuture<RESPValue> {
self.client.send(command: command, with: arguments) // , logger: logger)
}
}

enum _JobsRedisError: Error {
case missingJob
case invalidIdentifier(RESPValue)
}

extension JobIdentifier {
var key: String {
"job:\(self.string)"
}
}

extension _JobsRedisQueue: JobsQueue {
func get(_ id: JobIdentifier) -> EventLoopFuture<JobData> {
self.client.get(id.key, asJSON: JobData.self)
.unwrap(or: _JobsRedisError.missingJob)
}

/// See `JobsDriver.set`
public func set(key: String, job: JobStorage, eventLoop: JobsEventLoopPreference) -> EventLoopFuture<Void> {
do {
let data = try JSONEncoder().encode(job).convertedToRESPValue()

return client.set(job.id, to: data).flatMap { data in
return self.client.lpush([job.id.convertedToRESPValue()], into: key).transform(to: ())
}
} catch {
return eventLoop.delegate(for: self.eventLoopGroup).makeFailedFuture(error)
}
func set(_ id: JobIdentifier, to storage: JobData) -> EventLoopFuture<Void> {
self.client.set(id.key, toJSON: storage)
}

/// See `JobsDriver.completed`
public func completed(key: String, job: JobStorage, eventLoop: JobsEventLoopPreference) -> EventLoopFuture<Void> {
let processing = self.processingKey(key: key)
let jobData = job.id.convertedToRESPValue()

return client.lrem(jobData, from: processing, count: 0).flatMap { _ in
return self.client.delete([job.id]).transform(to: ())
}
func clear(_ id: JobIdentifier) -> EventLoopFuture<Void> {
self.lrem(id.string, from: self.processingKey).flatMap { _ in
self.client.delete(id.key)
}.map { _ in }
}

func push(_ id: JobIdentifier) -> EventLoopFuture<Void> {
self.client.lpush(id.string, into: self.key)
.map { _ in }
}

/// See `JobsDriver.requeue`
public func requeue(key: String, job: JobStorage, eventLoop: JobsEventLoopPreference) -> EventLoopFuture<Void> {
let processing = self.processingKey(key: key)
let jobData = job.id.convertedToRESPValue()

// Remove the job from the processing list
return client.lrem(jobData, from: processing, count: 0).flatMap { _ in

// Add the job back to the queue list
return self.client.lpush([jobData], into: key).transform(to: ())
func pop() -> EventLoopFuture<JobIdentifier?> {
self.client.rpoplpush(from: self.key, to: self.processingKey).flatMapThrowing { redisData in
guard !redisData.isNull else {
return nil
}
guard let id = redisData.string else {
throw _JobsRedisError.invalidIdentifier(redisData)
}
return .init(string: id)
}
}

/// See `JobsDriver.processingKey`
public func processingKey(key: String) -> String {
return key + "-processing"
var processingKey: String {
self.key + "-processing"
}
}

Expand Down
Loading

0 comments on commit 972576e

Please sign in to comment.