Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Sources/Jobs/JobQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ extension JobQueueProtocol {
execute: @escaping @Sendable (Parameters, JobContext) async throws -> Void
) where Parameters: JobParameters {
self.logger.info("Registered Job", metadata: ["JobName": .string(Parameters.jobName)])
let job = JobDefinition<Parameters>(retryStrategy: retryStrategy ?? self.options.retryStrategy, execute: execute)
let job = JobDefinition<Parameters>(retryStrategy: retryStrategy ?? self.options.jobRetryStrategy, execute: execute)
self.registerJob(job)
}

Expand Down
47 changes: 47 additions & 0 deletions Sources/Jobs/JobQueueDriverError.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2025 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

/// Job Queue Driver error type
///
/// Error returned by job queue driver.
public struct JobQueueDriverError: Error {
    /// Extensible, `Equatable` error code identifying what went wrong in the driver.
    public struct ErrorCode: Equatable, Sendable, CustomStringConvertible {
        // Backing storage kept private so new codes can be added without
        // breaking the public API surface.
        private enum _ErrorCode: String {
            case connectionError
        }

        private let code: _ErrorCode

        private init(_ code: _ErrorCode) {
            self.code = code
        }

        /// failed to connect to underlying driver
        public static var connectionError: Self { .init(.connectionError) }

        /// Human-readable name of the error code.
        public var description: String { self.code.rawValue }
    }

    /// Error code
    public let code: ErrorCode
    /// underlying error
    public let underlyingError: Error

    /// Create a driver error wrapping the error originally reported by the driver.
    /// - Parameters:
    ///   - code: Category of driver failure.
    ///   - underlyingError: The original error thrown by the driver implementation.
    public init(_ code: ErrorCode, underlyingError: Error) {
        self.code = code
        self.underlyingError = underlyingError
    }
}
4 changes: 2 additions & 2 deletions Sources/Jobs/JobQueueError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2021-2024 the Hummingbird authors
// Copyright (c) 2021-2025 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand All @@ -13,7 +13,7 @@
//===----------------------------------------------------------------------===//

/// Job Queue Error type
public struct JobQueueError: Error, Equatable {
public struct JobQueueError: Error {
public struct ErrorCode: Equatable, Sendable, CustomStringConvertible {
/// failed to decode job. Possibly because it hasn't been registered or data that was expected
/// is not available
Expand Down
73 changes: 59 additions & 14 deletions Sources/Jobs/JobQueueHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,25 @@
try await withThrowingTaskGroup(of: Void.self) { group in
var iterator = self.queue.makeAsyncIterator()
for _ in 0..<self.numWorkers {
if let jobResult = try await iterator.next() {
if let jobResult = try await withExponentialBackoff(
"Pop next job",
logger: self.logger,
operation: { try await iterator.next() }
) {
group.addTask {
try await self.processJobResult(jobResult)
}
}
}
while true {
try await group.next()
guard let jobResult = try await iterator.next() else { break }
guard
let jobResult = try await withExponentialBackoff(
"Pop next job",
logger: self.logger,
operation: { try await iterator.next() }
)
else { break }
group.addTask {
try await self.processJobResult(jobResult)
}
Expand All @@ -54,6 +64,32 @@
}
}

/// Run `operation`, retrying with exponential backoff whenever the job queue
/// driver reports a connection error.
///
/// - Parameters:
///   - message: Description of the operation, used in debug log messages.
///   - logger: Logger used to report failures and retries.
///   - operation: The asynchronous work to perform.
/// - Returns: The value produced by the first successful invocation of `operation`.
/// - Throws: Any error other than a `JobQueueDriverError.connectionError`
///   immediately, or the connection error itself once the driver retry
///   strategy declines a further attempt.
func withExponentialBackoff<Value: Sendable>(
    _ message: @autoclosure () -> String,
    logger: Logger,
    operation: () async throws -> Value
) async throws -> Value {
    var attempt = 0
    while true {
        do {
            return try await operation()
        } catch let error as JobQueueDriverError where error.code == .connectionError {
            logger.debug("\(message()) failed")
            // Give up (rethrow) as soon as the driver retry strategy says stop.
            guard self.options.driverRetryStrategy.shouldRetry(attempt: attempt, error: error) else {
                throw error
            }
            let backoff = self.options.driverRetryStrategy.calculateBackoff(attempt: attempt)
            // The sleep is cancellable so a graceful shutdown is not held up
            // by a long backoff wait.
            try await cancelWhenGracefulShutdown {
                try await Task.sleep(for: .seconds(backoff))
            }
            attempt += 1
        }
        logger.debug("Retrying \(message())")
    }
}

/// Process job result from queue
func processJobResult(_ jobResult: JobQueueResult<Queue.JobID>) async throws {
var logger = self.logger
Expand All @@ -80,7 +116,9 @@
default:
logger.debug("Job failed to decode")
}
try await self.queue.failed(jobID: jobResult.id, error: error)
try await withExponentialBackoff("Tag job as failed", logger: logger) {
try await self.queue.failed(jobID: jobResult.id, error: error)
}
await self.middleware.onPopJob(result: .failure(error), jobInstanceID: jobResult.id.description)
}
}
Expand All @@ -106,28 +144,33 @@
// as soon as we create it so can guarantee the Task is done when we leave the
// function.
try await Task {
try await self.queue.failed(jobID: jobID, error: error)
try await withExponentialBackoff("Tag job as failed", logger: logger) {
try await self.queue.failed(jobID: jobID, error: error)
}
}.value
return
} catch {
if !job.shouldRetry(error: error) {
logger.debug("Job: failed")
try await self.queue.failed(jobID: jobID, error: error)
try await withExponentialBackoff("Tag job as failed", logger: logger) {
try await self.queue.failed(jobID: jobID, error: error)
}
return
}

let attempts = (job.attempts ?? 0) + 1
let attempts = (job.attempts ?? 0)
let delay = job.retryStrategy.calculateBackoff(attempt: attempts)
let delayUntil = Date.now.addingTimeInterval(delay)

/// retry the current job
try await self.queue.retry(
jobID,
job: job,
attempts: attempts,
options: .init(delayUntil: delayUntil)
)

try await withExponentialBackoff("Retry Job", logger: logger) {
try await self.queue.retry(
jobID,
job: job,
attempts: attempts + 1,
options: .init(delayUntil: delayUntil)
)
}
logger.debug(
"Retrying Job",
metadata: [
Expand All @@ -140,7 +183,9 @@
return
}
logger.debug("Finished Job")
try await self.queue.finished(jobID: jobID)
try await withExponentialBackoff("Finish Job", logger: logger) {
try await self.queue.finished(jobID: jobID)
}
} catch {
logger.debug("Failed to set job status")
}
Expand Down
11 changes: 8 additions & 3 deletions Sources/Jobs/JobQueueOptions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@ import struct Foundation.TimeInterval

/// JobQueueOptions
public struct JobQueueOptions: Sendable {
    /// Default retry strategy applied to jobs that do not specify their own.
    let jobRetryStrategy: JobRetryStrategy
    /// Retry strategy used when the queue driver reports a connection error.
    let driverRetryStrategy: JobRetryStrategy

    /// Initialize a JobQueueOptions
    /// - Parameters:
    ///   - jobRetryStrategy: Default retry strategy for failing jobs.
    ///   - driverRetryStrategy: Retry strategy for driver connection errors.
    ///     Defaults to retrying indefinitely (`maxAttempts: .max`) because the
    ///     alternative is exiting the job queue handler.
    public init(
        jobRetryStrategy: any JobRetryStrategy = .exponentialJitter(),
        driverRetryStrategy: any JobRetryStrategy = .exponentialJitter(maxAttempts: .max, maxBackoff: 120)
    ) {
        self.jobRetryStrategy = jobRetryStrategy
        self.driverRetryStrategy = driverRetryStrategy
    }

    /// Initialize a JobQueueOptions with only a default job retry strategy.
    ///
    /// Kept for source compatibility with the previous public initializer;
    /// the driver retry strategy takes its default value.
    @available(*, deprecated, renamed: "init(jobRetryStrategy:driverRetryStrategy:)")
    public init(defaultRetryStrategy: any JobRetryStrategy) {
        self.init(jobRetryStrategy: defaultRetryStrategy)
    }
}
95 changes: 95 additions & 0 deletions Tests/JobsTests/JobsTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import Atomics
import Jobs
import Logging
import NIOConcurrencyHelpers
import NIOCore
import ServiceLifecycle
import XCTest

Expand Down Expand Up @@ -381,4 +382,98 @@ final class JobsTests: XCTestCase {
}
}
}

/// Verify the job queue recovers when the driver throws connection errors.
///
/// Wraps a `MemoryQueue` in a driver whose iterator throws
/// `JobQueueDriverError(.connectionError)` a fixed number of times before
/// yielding jobs, then checks that a pushed job is still executed once the
/// handler's driver retry strategy has backed off and retried.
func testFailedJobQueueDriver() async throws {
    // Marker error used as the driver's underlying error.
    struct DriverFailed: Error {}
    // Driver that forwards every operation to an in-memory queue, except that
    // its iterator fails `failCount` times before delegating.
    struct TestJobQueueDriver: JobQueueDriver {
        // amount of times driver fails before being successful
        let failCount: Int

        typealias JobID = MemoryQueue.JobID
        typealias Element = MemoryQueue.Element

        init(failCount: Int) {
            self.memory = .init()
            self.failCount = failCount
        }

        func registerJob<Parameters>(_ job: Jobs.JobDefinition<Parameters>) where Parameters: Jobs.JobParameters {
            self.memory.registerJob(job)
        }

        func push<Parameters>(_ jobRequest: Jobs.JobRequest<Parameters>, options: Jobs.JobOptions) async throws -> JobID
        where Parameters: Jobs.JobParameters {
            try await self.memory.push(jobRequest, options: options)
        }

        func retry<Parameters>(_ id: JobID, jobRequest: Jobs.JobRequest<Parameters>, options: Jobs.JobOptions) async throws
        where Parameters: Jobs.JobParameters {
            try await self.memory.retry(id, jobRequest: jobRequest, options: options)
        }

        func finished(jobID: JobID) async throws {
            try await self.memory.finished(jobID: jobID)
        }

        func failed(jobID: JobID, error: any Error) async throws {
            try await self.memory.failed(jobID: jobID, error: error)
        }

        func stop() async {
            await self.memory.stop()
        }

        func shutdownGracefully() async {
            await self.memory.shutdownGracefully()
        }

        func getMetadata(_ key: String) async throws -> ByteBuffer? {
            await self.memory.getMetadata(key)
        }

        func setMetadata(key: String, value: ByteBuffer) async throws {
            await self.memory.setMetadata(key: key, value: value)
        }

        // Iterator that throws a connection error `failCount` times before
        // handing over to the in-memory iterator.
        struct AsyncIterator: AsyncIteratorProtocol {
            var memory: MemoryQueue.AsyncIterator
            var failCount: Int

            mutating func next() async throws -> Element? {
                if failCount > 0 {
                    failCount -= 1
                    throw JobQueueDriverError(.connectionError, underlyingError: DriverFailed())
                }
                return try await self.memory.next()
            }
        }

        func makeAsyncIterator() -> AsyncIterator {
            .init(memory: memory.makeAsyncIterator(), failCount: self.failCount)
        }

        let memory: MemoryQueue
    }

    struct TestJobParameters: JobParameters {
        static let jobName: String = "TestJobParameters"
    }
    let expectation = XCTestExpectation(description: "TestJob.execute was called")
    var logger = Logger(label: "JobsTests")
    logger.logLevel = .debug
    // Small backoff/jitter so the three injected failures are retried quickly.
    let jobQueue = JobQueue(
        TestJobQueueDriver(failCount: 3),
        numWorkers: 3,
        logger: logger,
        options: .init(driverRetryStrategy: .exponentialJitter(maxAttempts: .max, maxBackoff: 0.1, maxJitter: 0.01))
    )
    jobQueue.registerJob(parameters: TestJobParameters.self) { parameters, _ in
        expectation.fulfill()
    }
    try await testJobQueue(jobQueue) {
        try await jobQueue.push(TestJobParameters())

        await fulfillment(of: [expectation], timeout: 5)
    }
}
}
Loading