Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement preservable jobs and configurable jobs table name #11

Merged
merged 9 commits into from
Oct 1, 2024
9 changes: 2 additions & 7 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,10 @@ jobs:
fail-fast: false
matrix:
swift-image:
- swift:5.8-jammy
- swift:5.9-jammy
- swift:5.10-noble
- swiftlang/swift:nightly-6.0-jammy
- swift:6.0-noble
- swiftlang/swift:nightly-main-jammy
include:
- sanitize: '--sanitize=thread'
- swift-image: swift:5.8-jammy
sanitize: ''
runs-on: ubuntu-latest
container: ${{ matrix.swift-image }}
services:
Expand All @@ -50,7 +45,7 @@ jobs:
SANITIZE: ${{ matrix.sanitize }}
POSTGRES_HOST: psql
MYSQL_HOST: mysql
run: SWIFT_DETERMINISTIC_HASHING=1 swift test ${SANITIZE} --enable-code-coverage
run: SWIFT_DETERMINISTIC_HASHING=1 swift test --sanitize=thread --enable-code-coverage
- name: Upload coverage data
uses: vapor/swift-codecov-action@v0.3
with:
Expand Down
7 changes: 6 additions & 1 deletion Package.swift
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
// swift-tools-version:5.8
// swift-tools-version:5.9
import PackageDescription
import class Foundation.ProcessInfo

let package = Package(
name: "QueuesFluentDriver",
platforms: [
.macOS(.v10_15),
.iOS(.v13),
.watchOS(.v6),
.tvOS(.v13),
],
products: [
.library(name: "QueuesFluentDriver", targets: ["QueuesFluentDriver"]),
Expand Down Expand Up @@ -53,6 +56,8 @@ let package = Package(

var swiftSettings: [SwiftSetting] { [
.enableUpcomingFeature("ForwardTrailingClosures"),
.enableUpcomingFeature("ExistentialAny"),
.enableUpcomingFeature("ConciseMagicFile"),
.enableUpcomingFeature("DisableOutwardActorInference"),
.enableExperimentalFeature("StrictConcurrency=complete"),
] }
63 changes: 0 additions & 63 deletions Package@swift-5.9.swift

This file was deleted.

41 changes: 25 additions & 16 deletions Sources/QueuesFluentDriver/FluentQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ public struct FluentQueue: AsyncQueue, Sendable {
// See `Queue.context`.
public let context: QueueContext

let sqlDb: any SQLDatabase

let sqlDB: any SQLDatabase
let preservesCompletedJobs: Bool
let jobsTable: SQLQualifiedTable

let _sqlLockingClause: NIOLockedValueBox<(any SQLExpression)?> = .init(nil) // needs a lock for the queue to be `Sendable`

// See `Queue.get(_:)`.
public func get(_ id: JobIdentifier) async throws -> JobData {
guard let job = try await self.sqlDb.select()
guard let job = try await self.sqlDB.select()
.columns("payload", "max_retry_count", "queue_name", "state", "job_name", "delay_until", "queued_at", "attempts", "updated_at")
.from(JobModel.schema)
.from(self.jobsTable)
.where("id", .equal, id)
.first(decoding: JobModel.self, keyDecodingStrategy: .convertFromSnakeCase)
else {
Expand All @@ -28,7 +30,7 @@ public struct FluentQueue: AsyncQueue, Sendable {

// See `Queue.set(_:to:)`.
public func set(_ id: JobIdentifier, to jobStorage: JobData) async throws {
try await self.sqlDb.insert(into: JobModel.schema)
try await self.sqlDB.insert(into: self.jobsTable)
.columns("id", "queue_name", "job_name", "queued_at", "delay_until", "state", "max_retry_count", "attempts", "payload", "updated_at")
.values(
.bind(id),
Expand All @@ -48,14 +50,21 @@ public struct FluentQueue: AsyncQueue, Sendable {

// See `Queue.clear(_:)`.
public func clear(_ id: JobIdentifier) async throws {
try await self.sqlDb.delete(from: JobModel.schema)
.where("id", .equal, id)
.run()
if self.preservesCompletedJobs {
try await self.sqlDB.update(self.jobsTable)
.set("state", to: .literal(StoredJobState.completed))
.where("id", .equal, id)
.run()
} else {
try await self.sqlDB.delete(from: self.jobsTable)
.where("id", .equal, id)
.run()
}
}

// See `Queue.push(_:)`.
public func push(_ id: JobIdentifier) async throws {
try await self.sqlDb.update(JobModel.schema)
try await self.sqlDB.update(self.jobsTable)
.set("state", to: .literal(StoredJobState.pending))
.set("updated_at", to: .now())
.where("id", .equal, id)
Expand All @@ -69,9 +78,9 @@ public struct FluentQueue: AsyncQueue, Sendable {
// is purely synchronous, and `SQLDatabase.version` is not implemented in MySQLKit at the time
// of this writing.
if self._sqlLockingClause.withLockedValue({ $0 }) == nil {
switch self.sqlDb.dialect.name {
switch self.sqlDB.dialect.name {
case "mysql":
let version = try await self.sqlDb.select()
let version = try await self.sqlDB.select()
.column(.function("version"), as: "version")
.first(decodingColumn: "version", as: String.self)! // always returns one row
// This is a really lazy check and it knows it; we know MySQLNIO doesn't support versions older than 5.x.
Expand All @@ -87,7 +96,7 @@ public struct FluentQueue: AsyncQueue, Sendable {

let select = SQLSubquery.select { $0
.column("id")
.from(JobModel.schema)
.from(self.jobsTable)
.where("state", .equal, .literal(StoredJobState.pending))
.where("queue_name", .equal, self.queueName)
.where(.dateValue(.function("coalesce", .column("delay_until"), SQLNow())), .lessThanOrEqual, .now())
Expand All @@ -97,24 +106,24 @@ public struct FluentQueue: AsyncQueue, Sendable {
.lockingClause(self._sqlLockingClause.withLockedValue { $0! }) // we've always set it by the time we get here
}

if self.sqlDb.dialect.supportsReturning {
return try await self.sqlDb.update(JobModel.schema)
if self.sqlDB.dialect.supportsReturning {
return try await self.sqlDB.update(self.jobsTable)
.set("state", to: .literal(StoredJobState.processing))
.set("updated_at", to: .now())
.where("id", .equal, select)
.returning("id")
.first(decodingColumn: "id", as: String.self)
.map(JobIdentifier.init(string:))
} else {
return try await self.sqlDb.transaction { transaction in
return try await self.sqlDB.transaction { transaction in
guard let id = try await transaction.raw("\(select)") // using raw() to make sure we run on the transaction connection
.first(decodingColumn: "id", as: String.self)
else {
return nil
}

try await transaction
.update(JobModel.schema)
.update(self.jobsTable)
.set("state", to: .literal(StoredJobState.processing))
.set("updated_at", to: .now())
.where("id", .equal, id)
Expand Down
32 changes: 24 additions & 8 deletions Sources/QueuesFluentDriver/FluentQueuesDriver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,21 @@ import struct Queues.JobIdentifier
import struct Queues.JobData

public struct FluentQueuesDriver: QueuesDriver {
let databaseId: DatabaseID?
let databaseID: DatabaseID?
let preservesCompletedJobs: Bool
let jobsTableName: String
let jobsTableSpace: String?

init(on databaseId: DatabaseID? = nil) {
self.databaseId = databaseId
init(
on databaseID: DatabaseID? = nil,
preserveCompletedJobs: Bool = false,
jobsTableName: String = "_jobs_meta",
jobsTableSpace: String? = nil
) {
self.databaseID = databaseID
self.preservesCompletedJobs = preserveCompletedJobs
self.jobsTableName = jobsTableName
self.jobsTableSpace = jobsTableSpace
}

public func makeQueue(with context: QueueContext) -> any Queue {
Expand All @@ -21,16 +32,21 @@ public struct FluentQueuesDriver: QueuesDriver {
///
/// `Fluent.Databases.database(_:logger:on:)` never returns nil; its optionality is an API mistake.
/// If a nonexistent `DatabaseID` is requested, it triggers a `fatalError()`.
let baseDb = context
let baseDB = context
.application
.databases
.database(self.databaseId, logger: context.logger, on: context.eventLoop)!
guard let sqlDb = baseDb as? any SQLDatabase else {
.database(self.databaseID, logger: context.logger, on: context.eventLoop)!

guard let sqlDB = baseDB as? any SQLDatabase else {
return FailingQueue(failure: QueuesFluentError.unsupportedDatabase, context: context)
}

return FluentQueue(context: context, sqlDb: sqlDb)
return FluentQueue(
context: context,
sqlDB: sqlDB,
preservesCompletedJobs: self.preservesCompletedJobs,
jobsTable: .init(self.jobsTableName, space: self.jobsTableSpace)
)
}

public func shutdown() {}
Expand Down
8 changes: 5 additions & 3 deletions Sources/QueuesFluentDriver/JobModel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ enum StoredJobState: String, Codable, CaseIterable {

/// Job is in progress.
case processing

/// Job is completed.
///
/// > Note: This state is only used if the driver is configured to preserve completed jobs.
case completed
}

/// Encapsulates a job's metadata and `JobData`.
struct JobModel: Codable, Sendable {
/// The name of the model's table.
static let schema = "_jobs_meta"

/// The job identifier. Corresponds directly to a `JobIdentifier`.
let id: String?

Expand Down
68 changes: 37 additions & 31 deletions Sources/QueuesFluentDriver/JobModelMigrate.swift
Original file line number Diff line number Diff line change
@@ -1,28 +1,34 @@
import protocol SQLKit.SQLDatabase
import enum SQLKit.SQLColumnConstraintAlgorithm
import enum SQLKit.SQLDataType
import enum SQLKit.SQLLiteral
import struct SQLKit.SQLRaw
import SQLKit

public struct JobModelMigration: AsyncSQLMigration {
private let jobsTableString: String
private let jobsTable: SQLQualifiedTable

/// Public initializer.
public init() {}

public init(
jobsTableName: String = "_jobs_meta",
jobsTableSpace: String? = nil
) {
self.jobsTableString = "\(jobsTableSpace.map { "\($0)_" } ?? "")\(jobsTableName)"
self.jobsTable = .init(jobsTableName, space: jobsTableSpace)
}

// See `AsyncSQLMigration.prepare(on:)`.
public func prepare(on database: any SQLDatabase) async throws {
let stateEnumType: String
let stateEnumType: any SQLExpression

switch database.dialect.enumSyntax {
case .typeName:
stateEnumType = "\(JobModel.schema)_storedjobstatus"
try await database.create(enum: stateEnumType)
.value("pending")
.value("processing")
.run()
stateEnumType = .identifier("\(self.jobsTableString)_storedjobstatus")
var builder = database.create(enum: stateEnumType)
builder = StoredJobState.allCases.reduce(builder, { $0.value($1.rawValue) })
try await builder.run()
case .inline:
stateEnumType = "enum('\(StoredJobState.allCases.map(\.rawValue).joined(separator: "','"))')"
// This is technically a misuse of SQLFunction, but it produces the correct syntax
stateEnumType = .function("enum", StoredJobState.allCases.map { .literal($0.rawValue) })
default:
stateEnumType = "varchar(16)"
// This is technically a misuse of SQLFunction, but it produces the correct syntax
stateEnumType = .function("varchar", .literal(16))
}

/// This whole pile of nonsense is only here because of
Expand All @@ -39,20 +45,20 @@ public struct JobModelMigration: AsyncSQLMigration {
autoTimestampConstraints = []
}

try await database.create(table: JobModel.schema)
.column("id", type: .text, .primaryKey(autoIncrement: false))
.column("queue_name", type: .text, .notNull)
.column("job_name", type: .text, .notNull)
.column("queued_at", type: manualTimestampType, .notNull)
.column("delay_until", type: manualTimestampType, .default(SQLLiteral.null))
.column("state", type: .custom(SQLRaw(stateEnumType)), .notNull)
.column("max_retry_count", type: .int, .notNull)
.column("attempts", type: .int, .notNull)
.column("payload", type: .blob, .notNull)
.column("updated_at", type: .timestamp, autoTimestampConstraints)
try await database.create(table: self.jobsTable)
.column("id", type: .text, .primaryKey(autoIncrement: false))
.column("queue_name", type: .text, .notNull)
.column("job_name", type: .text, .notNull)
.column("queued_at", type: manualTimestampType, .notNull)
.column("delay_until", type: manualTimestampType, .default(SQLLiteral.null))
.column("state", type: .custom(stateEnumType), .notNull)
.column("max_retry_count", type: .int, .notNull)
.column("attempts", type: .int, .notNull)
.column("payload", type: .blob, .notNull)
.column("updated_at", type: .timestamp, autoTimestampConstraints)
.run()
try await database.create(index: "i_\(JobModel.schema)_state_queue_delayUntil")
.on(JobModel.schema)
try await database.create(index: "i_\(self.jobsTableString)_state_queue_delayUntil")
.on(self.jobsTable)
.column("state")
.column("queue_name")
.column("delay_until")
Expand All @@ -61,10 +67,10 @@ public struct JobModelMigration: AsyncSQLMigration {

// See `AsyncSQLMigration.revert(on:)`.
public func revert(on database: any SQLDatabase) async throws {
try await database.drop(table: JobModel.schema).run()
try await database.drop(table: self.jobsTable).run()
switch database.dialect.enumSyntax {
case .typeName:
try await database.drop(enum: "\(JobModel.schema)_storedjobstatus").run()
try await database.drop(enum: "\(self.jobsTableString)_storedjobstatus").run()
default:
break
}
Expand Down
Loading
Loading