Skip to content

Commit

Permalink
Nicer separation depending on DB type; implement support for Postgres…
Browse files Browse the repository at this point in the history
… and (hopefully) MySQL
  • Loading branch information
m-barthelemy committed Apr 6, 2020
1 parent 4d043a2 commit 30ca2d7
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 59 deletions.
71 changes: 12 additions & 59 deletions Sources/QueuesFluentDriver/FluentQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ struct FluentQueue {
let context: QueueContext
let dbType: QueuesFluentDbType
let useSoftDeletes: Bool = true
static let model = JobModel(id: UUID.generateRandom(), key: "")
}

extension FluentQueue: Queue {
static let model = JobModel(id: UUID.generateRandom(), key: "")

func get(_ id: JobIdentifier) -> EventLoopFuture<JobData> {
guard let database = db else {
Expand Down Expand Up @@ -103,70 +103,23 @@ extension FluentQueue: Queue {
.where("\(Self.model.$state.key)", SQLBinaryOperator.equal, SQLBind.init(JobState.pending))
.orderBy("\(Self.model.$createdAt.path.first!)")
.limit (1)

if (self.dbType != .sqlite) {
selectQuery = selectQuery.lockingClause(SQLForUpdateSkipLocked.forUpdateSkipLocked)
}
let subQueryGroup = SQLGroupExpression.init(selectQuery.query)

let query = db
.update(JobModel.schema)
//.set("\(Self.model.$state.key)", to: JobState.processing)
.set(SQLColumn.init("\(Self.model.$state.key)"), to: SQLBind.init(JobState.processing))
//.set("\(Self.model.$updatedAt.path.first!)", to: Date())
.set(SQLColumn.init("\(Self.model.$updatedAt.path.first!)"), to: SQLBind.init(Date()))
.where(
SQLBinaryExpression(left: SQLColumn("\(Self.model.$id.key)"), op: SQLBinaryOperator.equal , right: subQueryGroup)
)
// Gross abuse
.orWhere(SQLReturning.returning(column: Self.model.$id.key))
.query

var id: UUID?
return db.execute(sql: query) { (row) -> Void in
print("••• columns: \(row.allColumns)")
id = try? row.decode(column: "\(Self.model.$id.key)", as: UUID.self)
print("••• returned id \(id)")
}
.map {
if (id != nil) {
return JobIdentifier(string: id!.uuidString)
}
return nil
var popProvider: PopQueryProtocol!
switch (self.dbType) {
case .postgres:
popProvider = PostgresPop()
case .mysql:
popProvider = MySQLPop()
default:
return self.context.eventLoop.makeFailedFuture(QueuesFluentError.databaseNotFound)
}
return popProvider.pop(db: database, select: selectQuery.query).optionalMap { id in
return JobIdentifier(string: id.uuidString)
}

// UPDATE `jobs`
// SET `state` = ?, `updated_at` = ?
// WHERE `id` = (SELECT `id` FROM `jobs` WHERE `state` = ? ORDER BY `created_at` ASC LIMIT 1 FOR UPDATE SKIP LOCKED)
// OR 1=2
// RETURNING "id"

// -- should be --

// BEGIN TRANSACTION
// SELECT `id` FROM `jobs` WHERE `state` = ? ORDER BY `created_at` ASC LIMIT 1 FOR UPDATE SKIP LOCKED;
// UPDATE `jobs`
// SET
// `state` = ?,
// `updated_at` = ?
// WHERE `id` = xxxxxxx;
// COMMIT

/*let driver = dbDriver()
return driver.rawQuery(db: database, query: query).map { id in
if(id != nil ) {
return JobIdentifier(string: id!.uuidString)
}
else {
return nil
}
}*/

/*let (sql, binds) = db.serialize(query)
return db.query(db: db, sql: sql, binds: binds).first().optionalMap { row in
return JobIdentifier(string: (try! row.decode(column: "\(Self.model.$id.key)", as: UUID.self)).uuidString)
}
*/
}

}
34 changes: 34 additions & 0 deletions Sources/QueuesFluentDriver/PopQueries/MySQLPopQuery.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import Foundation
import SQLKit
import Fluent
import Queues

struct MySQLPop : PopQueryProtocol {
func pop(db: Database, select: SQLExpression) -> EventLoopFuture<UUID?> {
db.transaction { transaction in
let database = transaction as! SQLDatabase

var id: UUID?
return database.execute(sql: select) { (row) -> Void in
print("••• columns: \(row.allColumns)")
id = try? row.decode(column: "\(FluentQueue.model.$id.key)", as: UUID.self)
print("••• returned id \(id)")
}
.flatMap {
if (id != nil) {
let updateQuery = database
.update(JobModel.schema)
.set(SQLColumn.init("\(FluentQueue.model.$state.key)"), to: SQLBind.init(JobState.processing))
.set(SQLColumn.init("\(FluentQueue.model.$updatedAt.path.first!)"), to: SQLBind.init(Date()))
.where(SQLColumn.init("\(FluentQueue.model.$id.key)"), .equal, SQLBind.init(id!))
.query
return database.execute(sql: updateQuery) { (row) in }
.map { id }
}
return database.eventLoop.makeSucceededFuture(nil)
}


}
}
}
29 changes: 29 additions & 0 deletions Sources/QueuesFluentDriver/PopQueries/PostgresPopQuery.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import Foundation
import SQLKit
import Fluent
import Queues

struct PostgresPop : PopQueryProtocol {
func pop(db: Database, select: SQLExpression) -> EventLoopFuture<UUID?> {
let database = db as! SQLDatabase
let subQueryGroup = SQLGroupExpression.init(select)
let query = database
.update(JobModel.schema)
.set(SQLColumn.init("\(FluentQueue.model.$state.key)"), to: SQLBind.init(JobState.processing))
.set(SQLColumn.init("\(FluentQueue.model.$updatedAt.path.first!)"), to: SQLBind.init(Date()))
.where(
SQLBinaryExpression(left: SQLColumn("\(FluentQueue.model.$id.key)"), op: SQLBinaryOperator.equal , right: subQueryGroup)
)
// Gross abuse
.orWhere(SQLReturning.returning(column: FluentQueue.model.$id.key))
.query

var id: UUID?
return database.execute(sql: query) { (row) -> Void in
id = try? row.decode(column: "\(FluentQueue.model.$id.key)", as: UUID.self)
}
.map {
return id
}
}
}

0 comments on commit 30ca2d7

Please sign in to comment.