From ae478766f1063fa92a6bf6db892326c023cb73e6 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Wed, 5 Mar 2025 10:57:02 +0000 Subject: [PATCH 1/4] Add JobQueueDriverError --- Sources/Jobs/JobQueueDriverError.swift | 47 ++++++++++++++++++++++++++ Sources/Jobs/JobQueueError.swift | 4 +-- 2 files changed, 49 insertions(+), 2 deletions(-) create mode 100644 Sources/Jobs/JobQueueDriverError.swift diff --git a/Sources/Jobs/JobQueueDriverError.swift b/Sources/Jobs/JobQueueDriverError.swift new file mode 100644 index 0000000..4401a36 --- /dev/null +++ b/Sources/Jobs/JobQueueDriverError.swift @@ -0,0 +1,47 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2025 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +/// Job Queue Driver error type +/// +/// Error returned by job queue driver. +public struct JobQueueDriverError: Error { + public struct ErrorCode: Equatable, Sendable, CustomStringConvertible { + /// failed to connect to underlying driver + public static var connectionFailed: Self { .init(.connectionError) } + + private enum _ErrorCode: String { + case connectionError + } + + private let code: _ErrorCode + + private init(_ code: _ErrorCode) { + self.code = code + } + + public var description: String { + self.code.rawValue + } + } + + /// Error code + public let code: ErrorCode + /// underlying error + public let underlyingError: Error + + public init(code: ErrorCode, underlyingError: Error) { + self.code = code + self.underlyingError = underlyingError + } +} diff --git a/Sources/Jobs/JobQueueError.swift b/Sources/Jobs/JobQueueError.swift index 8562bf0..14bdef5 100644 --- a/Sources/Jobs/JobQueueError.swift +++ b/Sources/Jobs/JobQueueError.swift @@ -2,7 +2,7 @@ // // This source file is part of the Hummingbird server framework project // -// Copyright (c) 2021-2024 the Hummingbird authors +// Copyright (c) 2021-2025 the Hummingbird authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -13,7 +13,7 @@ //===----------------------------------------------------------------------===// /// Job Queue Error type -public struct JobQueueError: Error, Equatable { +public struct JobQueueError: Error { public struct ErrorCode: Equatable, Sendable, CustomStringConvertible { /// failed to decode job. Possibly because it hasn't been registered or data that was expected /// is not available From f0143775283fd95ee8d03d9c8f5f5265ef6c448c Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Wed, 5 Mar 2025 12:03:40 +0000 Subject: [PATCH 2/4] Retry driver operations if they throw errors instead of just dying --- Sources/Jobs/JobQueue.swift | 2 +- Sources/Jobs/JobQueueDriverError.swift | 47 ----------------- Sources/Jobs/JobQueueHandler.swift | 73 +++++++++++++++++++++----- Sources/Jobs/JobQueueOptions.swift | 11 ++-- 4 files changed, 68 insertions(+), 65 deletions(-) delete mode 100644 Sources/Jobs/JobQueueDriverError.swift diff --git a/Sources/Jobs/JobQueue.swift b/Sources/Jobs/JobQueue.swift index 85d0186..59c345d 100644 --- a/Sources/Jobs/JobQueue.swift +++ b/Sources/Jobs/JobQueue.swift @@ -55,7 +55,7 @@ extension JobQueueProtocol { execute: @escaping @Sendable (Parameters, JobContext) async throws -> Void ) where Parameters: JobParameters { self.logger.info("Registered Job", metadata: ["JobName": .string(Parameters.jobName)]) - let job = JobDefinition(retryStrategy: retryStrategy ?? self.options.retryStrategy, execute: execute) + let job = JobDefinition(retryStrategy: retryStrategy ?? self.options.jobRetryStrategy, execute: execute) self.registerJob(job) } diff --git a/Sources/Jobs/JobQueueDriverError.swift b/Sources/Jobs/JobQueueDriverError.swift deleted file mode 100644 index 4401a36..0000000 --- a/Sources/Jobs/JobQueueDriverError.swift +++ /dev/null @@ -1,47 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Hummingbird server framework project -// -// Copyright (c) 2025 the Hummingbird authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -/// Job Queue Driver error type -/// -/// Error returned by job queue driver. -public struct JobQueueDriverError: Error { - public struct ErrorCode: Equatable, Sendable, CustomStringConvertible { - /// failed to connect to underlying driver - public static var connectionFailed: Self { .init(.connectionError) } - - private enum _ErrorCode: String { - case connectionError - } - - private let code: _ErrorCode - - private init(_ code: _ErrorCode) { - self.code = code - } - - public var description: String { - self.code.rawValue - } - } - - /// Error code - public let code: ErrorCode - /// underlying error - public let underlyingError: Error - - public init(code: ErrorCode, underlyingError: Error) { - self.code = code - self.underlyingError = underlyingError - } -} diff --git a/Sources/Jobs/JobQueueHandler.swift b/Sources/Jobs/JobQueueHandler.swift index a1b3942..524e152 100644 --- a/Sources/Jobs/JobQueueHandler.swift +++ b/Sources/Jobs/JobQueueHandler.swift @@ -32,7 +32,11 @@ final class JobQueueHandler: Sendable { try await withThrowingTaskGroup(of: Void.self) { group in var iterator = self.queue.makeAsyncIterator() for _ in 0..: Sendable { } while true { try await group.next() - guard let jobResult = try await iterator.next() else { break } + guard + let jobResult = try await withExponentialBackoff( + "Pop next job", + logger: self.logger, + operation: { try await iterator.next() } + ) + else { break } group.addTask { try await self.processJobResult(jobResult) } @@ -54,6 +64,32 @@ final class JobQueueHandler: Sendable { } } + /// Run operation and retry with exponential backoff if it fails + func withExponentialBackoff( + _ message: @autoclosure () -> String, + logger: Logger, + operation: () async throws -> Value + ) async throws -> Value { + var attempt = 0 + while true { + do { + return try await operation() + } catch { + logger.debug("\(message()) failed") + if self.options.driverRetryStrategy.shouldRetry(attempt: attempt, error: error) { + let wait = self.options.driverRetryStrategy.calculateBackoff(attempt: attempt) + try await cancelWhenGracefulShutdown { + try await Task.sleep(for: .seconds(wait)) + } + attempt += 1 + } else { + throw error + } + } + logger.debug("Retrying \(message())") + } + } + /// Process job result from queue func processJobResult(_ jobResult: JobQueueResult) async throws { var logger = self.logger @@ -80,7 +116,9 @@ final class JobQueueHandler: Sendable { default: logger.debug("Job failed to decode") } - try await self.queue.failed(jobID: jobResult.id, error: error) + try await withExponentialBackoff("Tag job as failed", logger: logger) { + try await self.queue.failed(jobID: jobResult.id, error: error) + } await self.middleware.onPopJob(result: .failure(error), jobInstanceID: jobResult.id.description) } } @@ -106,28 +144,33 @@ final class JobQueueHandler: Sendable { // as soon as we create it so can guarantee the Task is done when we leave the // function. try await Task { - try await self.queue.failed(jobID: jobID, error: error) + try await withExponentialBackoff("Tag job as failed", logger: logger) { + try await self.queue.failed(jobID: jobID, error: error) + } }.value return } catch { if !job.shouldRetry(error: error) { logger.debug("Job: failed") - try await self.queue.failed(jobID: jobID, error: error) + try await withExponentialBackoff("Tag job as failed", logger: logger) { + try await self.queue.failed(jobID: jobID, error: error) + } return } - let attempts = (job.attempts ?? 0) + 1 + let attempts = (job.attempts ?? 0) let delay = job.retryStrategy.calculateBackoff(attempt: attempts) let delayUntil = Date.now.addingTimeInterval(delay) /// retry the current job - try await self.queue.retry( - jobID, - job: job, - attempts: attempts, - options: .init(delayUntil: delayUntil) - ) - + try await withExponentialBackoff("Retry Job", logger: logger) { + try await self.queue.retry( + jobID, + job: job, + attempts: attempts + 1, + options: .init(delayUntil: delayUntil) + ) + } logger.debug( "Retrying Job", metadata: [ @@ -140,7 +183,9 @@ final class JobQueueHandler: Sendable { return } logger.debug("Finished Job") - try await self.queue.finished(jobID: jobID) + try await withExponentialBackoff("Finish Job", logger: logger) { + try await self.queue.finished(jobID: jobID) + } } catch { logger.debug("Failed to set job status") } diff --git a/Sources/Jobs/JobQueueOptions.swift b/Sources/Jobs/JobQueueOptions.swift index 075170b..db6d8ea 100644 --- a/Sources/Jobs/JobQueueOptions.swift +++ b/Sources/Jobs/JobQueueOptions.swift @@ -16,10 +16,15 @@ import struct Foundation.TimeInterval /// JobQueueOptions public struct JobQueueOptions: Sendable { - let retryStrategy: JobRetryStrategy + let jobRetryStrategy: JobRetryStrategy + let driverRetryStrategy: JobRetryStrategy /// Initialize a JobQueueOptions - public init(defaultRetryStrategy: any JobRetryStrategy = .exponentialJitter()) { - self.retryStrategy = defaultRetryStrategy + public init( + jobRetryStrategy: any JobRetryStrategy = .exponentialJitter(), + driverRetryStrategy: any JobRetryStrategy = .exponentialJitter(maxAttempts: .max, maxBackoff: 120) + ) { + self.jobRetryStrategy = jobRetryStrategy + self.driverRetryStrategy = driverRetryStrategy } } From 6f93e5c2de6fc9d13de2f36d97c38079fd898dd7 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Wed, 5 Mar 2025 12:30:20 +0000 Subject: [PATCH 3/4] Add test with driver throwing an error --- Tests/JobsTests/JobsTests.swift | 95 +++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/Tests/JobsTests/JobsTests.swift b/Tests/JobsTests/JobsTests.swift index baf3628..55b6896 100644 --- a/Tests/JobsTests/JobsTests.swift +++ b/Tests/JobsTests/JobsTests.swift @@ -16,6 +16,7 @@ import Atomics import Jobs import Logging import NIOConcurrencyHelpers +import NIOCore import ServiceLifecycle import XCTest @@ -381,4 +382,98 @@ final class JobsTests: XCTestCase { } } } + + func testFailedJobQueueDriver() async throws { + struct DriverFailed: Error {} + struct TestJobQueueDriver: JobQueueDriver { + // amount of times driver fails before being successful + let failCount: Int + + typealias JobID = MemoryQueue.JobID + typealias Element = MemoryQueue.Element + + init(failCount: Int) { + self.memory = .init() + self.failCount = failCount + } + + func registerJob(_ job: Jobs.JobDefinition) where Parameters: Jobs.JobParameters { + self.memory.registerJob(job) + } + + func push(_ jobRequest: Jobs.JobRequest, options: Jobs.JobOptions) async throws -> JobID + where Parameters: Jobs.JobParameters { + try await self.memory.push(jobRequest, options: options) + } + + func retry(_ id: JobID, jobRequest: Jobs.JobRequest, options: Jobs.JobOptions) async throws + where Parameters: Jobs.JobParameters { + try await self.memory.retry(id, jobRequest: jobRequest, options: options) + } + + func finished(jobID: JobID) async throws { + try await self.memory.finished(jobID: jobID) + } + + func failed(jobID: JobID, error: any Error) async throws { + try await self.memory.failed(jobID: jobID, error: error) + } + + func stop() async { + await self.memory.stop() + } + + func shutdownGracefully() async { + await self.memory.shutdownGracefully() + } + + func getMetadata(_ key: String) async throws -> ByteBuffer? { + await self.memory.getMetadata(key) + } + + func setMetadata(key: String, value: ByteBuffer) async throws { + await self.memory.setMetadata(key: key, value: value) + } + + struct AsyncIterator: AsyncIteratorProtocol { + var memory: MemoryQueue.AsyncIterator + var failCount: Int + + mutating func next() async throws -> Element? { + if failCount > 0 { + failCount -= 1 + throw DriverFailed() + } + return try await self.memory.next() + } + } + + func makeAsyncIterator() -> AsyncIterator { + .init(memory: memory.makeAsyncIterator(), failCount: self.failCount) + } + + let memory: MemoryQueue + } + + struct TestJobParameters: JobParameters { + static let jobName: String = "TestJobParameters" + } + let expectation = XCTestExpectation(description: "TestJob.execute was called") + var logger = Logger(label: "JobsTests") + logger.logLevel = .debug + let jobQueue = JobQueue( + TestJobQueueDriver(failCount: 3), + numWorkers: 3, + logger: logger, + options: .init(driverRetryStrategy: .exponentialJitter(maxAttempts: .max, maxBackoff: 0.1, maxJitter: 0.01)) + ) + jobQueue.registerJob(parameters: TestJobParameters.self) { parameters, _ in + expectation.fulfill() + } + try await testJobQueue(jobQueue) { + try await jobQueue.push(TestJobParameters()) + + await fulfillment(of: [expectation], timeout: 5) + } + } } From 3383a0f73bddf01ee350153465d26f6d5de76b2d Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Wed, 5 Mar 2025 16:22:01 +0000 Subject: [PATCH 4/4] Only retry on receiving a connection error --- Sources/Jobs/JobQueueDriverError.swift | 47 ++++++++++++++++++++++++++ Sources/Jobs/JobQueueHandler.swift | 2 +- Tests/JobsTests/JobsTests.swift | 2 +- 3 files changed, 49 insertions(+), 2 deletions(-) create mode 100644 Sources/Jobs/JobQueueDriverError.swift diff --git a/Sources/Jobs/JobQueueDriverError.swift b/Sources/Jobs/JobQueueDriverError.swift new file mode 100644 index 0000000..4bd3a54 --- /dev/null +++ b/Sources/Jobs/JobQueueDriverError.swift @@ -0,0 +1,47 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2025 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +/// Job Queue Driver error type +/// +/// Error returned by job queue driver. +public struct JobQueueDriverError: Error { + public struct ErrorCode: Equatable, Sendable, CustomStringConvertible { + /// failed to connect to underlying driver + public static var connectionError: Self { .init(.connectionError) } + + private enum _ErrorCode: String { + case connectionError + } + + private let code: _ErrorCode + + private init(_ code: _ErrorCode) { + self.code = code + } + + public var description: String { + self.code.rawValue + } + } + + /// Error code + public let code: ErrorCode + /// underlying error + public let underlyingError: Error + + public init(_ code: ErrorCode, underlyingError: Error) { + self.code = code + self.underlyingError = underlyingError + } +} diff --git a/Sources/Jobs/JobQueueHandler.swift b/Sources/Jobs/JobQueueHandler.swift index 524e152..445d014 100644 --- a/Sources/Jobs/JobQueueHandler.swift +++ b/Sources/Jobs/JobQueueHandler.swift @@ -74,7 +74,7 @@ final class JobQueueHandler: Sendable { while true { do { return try await operation() - } catch { + } catch let error as JobQueueDriverError where error.code == .connectionError { logger.debug("\(message()) failed") if self.options.driverRetryStrategy.shouldRetry(attempt: attempt, error: error) { let wait = self.options.driverRetryStrategy.calculateBackoff(attempt: attempt) diff --git a/Tests/JobsTests/JobsTests.swift b/Tests/JobsTests/JobsTests.swift index 55b6896..6306ea5 100644 --- a/Tests/JobsTests/JobsTests.swift +++ b/Tests/JobsTests/JobsTests.swift @@ -442,7 +442,7 @@ final class JobsTests: XCTestCase { mutating func next() async throws -> Element? { if failCount > 0 { failCount -= 1 - throw DriverFailed() + throw JobQueueDriverError(.connectionError, underlyingError: DriverFailed()) } return try await self.memory.next() }