From ac1336ce7e7f36232ddeb1ab47304fca19ec734e Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Tue, 10 Feb 2026 08:06:21 +0000 Subject: [PATCH 1/4] Store failed jobs and add cleanup Also store paused jobs separately, and reset stopping flag at start up --- Sources/Jobs/JobQueueProcessorOptions.swift | 2 +- Sources/Jobs/MemoryJobQueue.swift | 118 +++++++++++++++++--- Tests/JobsTests/JobsTests.swift | 26 +++++ 3 files changed, 129 insertions(+), 17 deletions(-) diff --git a/Sources/Jobs/JobQueueProcessorOptions.swift b/Sources/Jobs/JobQueueProcessorOptions.swift index de2557a..679580d 100644 --- a/Sources/Jobs/JobQueueProcessorOptions.swift +++ b/Sources/Jobs/JobQueueProcessorOptions.swift @@ -49,7 +49,7 @@ public struct JobQueueProcessorOptions: Sendable { /// to indicate if a worker is running. If another process can acquire the lock then the worker must have /// stopped. public init( - numWorkers: Int = 1, + numWorkers: Int = 16, gracefulShutdownTimeout: Duration = .seconds(30), workerActiveLock: ExclusiveLock = .acquire(every: .milliseconds(300000), for: .milliseconds(450000)) ) { diff --git a/Sources/Jobs/MemoryJobQueue.swift b/Sources/Jobs/MemoryJobQueue.swift index 0de8bbb..efbb2f3 100644 --- a/Sources/Jobs/MemoryJobQueue.swift +++ b/Sources/Jobs/MemoryJobQueue.swift @@ -49,6 +49,10 @@ public final class MemoryQueue: JobQueueDriver, CancellableJobQueue, ResumableJo self.context = JobQueueContext(workerID: UUID().uuidString, queueName: queueName, metadata: [:]) } + public func waitUntilReady() async throws { + await self.queue.start() + } + /// Stop queue serving more jobs public func stop() async { await self.queue.stop() @@ -88,11 +92,11 @@ public final class MemoryQueue: JobQueueDriver, CancellableJobQueue, ResumableJo } public func finished(jobID: JobID) async throws { - await self.queue.clearPendingJob(jobID: jobID) + await self.queue.clearProcessingJob(jobID: jobID) } public func failed(jobID: JobID, error: any Error) async throws { - if await 
self.queue.clearAndReturnPendingJob(jobID: jobID) != nil { + if await self.queue.failJob(jobID: jobID) { self.onFailedJob(jobID, error) } } @@ -116,14 +120,18 @@ public final class MemoryQueue: JobQueueDriver, CancellableJobQueue, ResumableJo let jobBuffer: ByteBuffer } var queue: Deque<(job: QueuedJob, options: JobOptions)> - var pendingJobs: [JobID: ByteBuffer] + var processingJobs: [JobID: ByteBuffer] + var pausedJobs: [JobID: ByteBuffer] + var failedJobs: [JobID: ByteBuffer] var metadata: [String: (data: ByteBuffer, expires: Date)] var isStopped: Bool init() { self.queue = .init() self.isStopped = false - self.pendingJobs = .init() + self.processingJobs = .init() + self.pausedJobs = .init() + self.failedJobs = .init() self.metadata = .init() } @@ -134,12 +142,12 @@ public final class MemoryQueue: JobQueueDriver, CancellableJobQueue, ResumableJo } func retry(_ id: JobID, buffer: ByteBuffer, options: JobOptions) throws { - self.clearPendingJob(jobID: id) + self.clearProcessingJob(jobID: id) let _ = self.queue.append((job: QueuedJob(id: id, jobBuffer: buffer), options: options)) } - func clearPendingJob(jobID: JobID) { - self.pendingJobs[jobID] = nil + func clearProcessingJob(jobID: JobID) { + self.processingJobs[jobID] = nil } func cancelJob(jobID: JobID) { @@ -148,23 +156,25 @@ public final class MemoryQueue: JobQueueDriver, CancellableJobQueue, ResumableJo func pauseJob(jobID: JobID) { let job = self.queue.first(where: { $0.job.id == jobID }) - self.pendingJobs[jobID] = job?.job.jobBuffer + self.pausedJobs[jobID] = job?.job.jobBuffer self.queue.removeAll(where: { $0.job.id == jobID }) } func resumeJob(jobID: JobID) { - if let jobBuffer = self.pendingJobs[jobID] { + if let jobBuffer = self.pausedJobs[jobID] { + self.pausedJobs[jobID] = nil self.queue.append((job: QueuedJob(id: jobID, jobBuffer: jobBuffer), options: .init())) } else { print("Warning: attempted to resume job \(jobID) which is not pending") } - self.clearPendingJob(jobID: jobID) } - func 
clearAndReturnPendingJob(jobID: JobID) -> ByteBuffer? { - let instance = self.pendingJobs[jobID] - self.pendingJobs[jobID] = nil - return instance + func failJob(jobID: JobID) -> Bool { + let instance = self.processingJobs[jobID] + self.clearProcessingJob(jobID: jobID) + self.processingJobs[jobID] = nil + self.failedJobs[jobID] = instance + return instance != nil } func next() async throws -> QueuedJob? { @@ -181,7 +191,7 @@ public final class MemoryQueue: JobQueueDriver, CancellableJobQueue, ResumableJo continue } } else { - self.pendingJobs[request.job.id] = request.job.jobBuffer + self.processingJobs[request.job.id] = request.job.jobBuffer return request.job } } @@ -193,8 +203,12 @@ public final class MemoryQueue: JobQueueDriver, CancellableJobQueue, ResumableJo self.isStopped = true } + func start() { + self.isStopped = false + } + func shutdown() { - assert(self.pendingJobs.count == 0) + assert(self.processingJobs.count == 0) self.isStopped = true } @@ -269,6 +283,78 @@ extension MemoryQueue { } } +extension MemoryQueue { + /// how to cleanup a job + public struct JobCleanup: Sendable, Codable { + enum RawValue: Codable { + case doNothing + case rerun + case remove + } + let rawValue: RawValue + + /// Do nothing to jobs + public static var doNothing: Self { .init(rawValue: .doNothing) } + /// Add jobs back onto the pending queue + public static var rerun: Self { .init(rawValue: .rerun) } + /// Delete jobs + public static var remove: Self { .init(rawValue: .remove) } + } + + /// Cleanup job queues + /// + /// This function is used to re-run or delete jobs in a certain state. Failed, completed, + /// cancelled and paused jobs can be pushed back into the pending queue to be re-run or removed. + /// When called at startup in theory no job should be set to processing, or set to pending but + /// not in the queue. but if your job server crashes these states are possible, so we also provide + /// options to re-queue these jobs so they are run again. 
+ /// You can call `cleanup` with `failedJobs`, `completedJobs`, `cancelledJobs` or `pausedJobs` set + /// to whatever you like at any point to re-queue failed jobs. Moving processing or pending jobs + /// should only be done if you are certain there is nothing processing the job queue. + /// + /// - Parameters: + /// - pendingJobs: What to do with jobs tagged as pending + /// - processingJobs: What to do with jobs tagged as processing + /// - completedJobs: What to do with jobs tagged as completed + /// - failedJobs: What to do with jobs tagged as failed + /// - cancelledJobs: What to do with jobs tagged as cancelled + /// - pausedJobs: What to do with jobs tagged as paused + /// - Throws: + public func cleanup( + processingJobs: JobCleanup = .doNothing, + failedJobs: JobCleanup = .doNothing, + pausedJobs: MemoryQueue.JobCleanup + ) async throws { + await self.queue.cleanup(processingJobs: processingJobs, failedJobs: failedJobs, pausedJobs: pausedJobs) + } +} + +extension MemoryQueue.Internal { + func cleanup( + processingJobs: MemoryQueue.JobCleanup, + failedJobs: MemoryQueue.JobCleanup, + pausedJobs: MemoryQueue.JobCleanup + ) { + cleanupQueue(processingJobs, queue: &self.processingJobs) + cleanupQueue(failedJobs, queue: &self.failedJobs) + cleanupQueue(pausedJobs, queue: &self.pausedJobs) + } + + func cleanupQueue(_ cleanup: MemoryQueue.JobCleanup, queue: inout [MemoryQueue.JobID: ByteBuffer]) { + switch cleanup.rawValue { + case .remove: + queue = [:] + case .rerun: + for job in queue { + self.queue.append((QueuedJob(id: job.key, jobBuffer: job.value), .init())) + } + case .doNothing: + break + } + } +} + extension JobQueueDriver where Self == MemoryQueue { /// Return In memory driver for Job Queue /// - Parameters: diff --git a/Tests/JobsTests/JobsTests.swift b/Tests/JobsTests/JobsTests.swift index b214135..057ce7a 100644 --- a/Tests/JobsTests/JobsTests.swift +++ b/Tests/JobsTests/JobsTests.swift @@ -106,6 +106,32 @@ struct JobsTests { } } + @Test 
func testRunTwice() async throws { + struct TestParameters: JobParameters { + static let jobName = "testBasic" + let value: Int + } + let logger = Logger(label: "JobsTests") + let expectation = TestExpectation() + let jobQueue = JobQueue(.memory, logger: logger) + let job = JobDefinition { (parameters: TestParameters, context) in + context.logger.info("Parameters=\(parameters.value)") + try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) + expectation.trigger() + } + jobQueue.registerJob(job) + try await testJobQueue(JobQueueProcessor(queue: jobQueue, logger: logger)) { + try await jobQueue.push(TestParameters(value: 1)) + + try await expectation.wait(count: 1) + } + try await testJobQueue(JobQueueProcessor(queue: jobQueue, logger: logger)) { + try await jobQueue.push(TestParameters(value: 1)) + + try await expectation.wait(count: 1) + } + } + @Test func testErrorRetryAndThenSucceed() async throws { struct TestParameters: JobParameters { static let jobName = "testErrorRetryAndThenSucceed" From f1c8ebd2d359845f307ed3022d15b99e21201f72 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Tue, 10 Feb 2026 09:36:37 +0000 Subject: [PATCH 2/4] Default argument for MemoryQueue.cleanup --- Sources/Jobs/MemoryJobQueue.swift | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/Sources/Jobs/MemoryJobQueue.swift b/Sources/Jobs/MemoryJobQueue.swift index efbb2f3..5343fa1 100644 --- a/Sources/Jobs/MemoryJobQueue.swift +++ b/Sources/Jobs/MemoryJobQueue.swift @@ -303,28 +303,22 @@ extension MemoryQueue { /// Cleanup job queues /// - /// This function is used to re-run or delete jobs in a certain state. Failed, completed, - /// cancelled and paused jobs can be pushed back into the pending queue to be re-run or removed. - /// When called at startup in theory no job should be set to processing, or set to pending but - /// not in the queue. 
but if your job server crashes these states are possible, so we also provide - /// options to re-queue these jobs so they are run again. + /// This function is used to re-run or delete jobs in a certain state. Failed, paused and processing jobs + /// can be pushed back into the pending queue to be re-run or removed. /// - /// You can call `cleanup` with `failedJobs`, `completedJobs`, `cancelledJobs` or `pausedJobs` set - /// to whatever you like at any point to re-queue failed jobs. Moving processing or pending jobs - /// should only be done if you are certain there is nothing processing the job queue. + /// You can call `cleanup` with `failedJobs`, `pausedJobs` set to whatever you like at any point to re-queue + /// jobs. Moving processing jobs should only be done if you are certain there is nothing processing the job + /// queue. /// /// - Parameters: - /// - pendingJobs: What to do with jobs tagged as pending /// - processingJobs: What to do with jobs tagged as processing - /// - completedJobs: What to do with jobs tagged as completed /// - failedJobs: What to do with jobs tagged as failed - /// - cancelledJobs: What to do with jobs tagged as cancelled /// - pausedJobs: What to do with jobs tagged as paused /// - Throws: public func cleanup( processingJobs: JobCleanup = .doNothing, failedJobs: JobCleanup = .doNothing, - pausedJobs: MemoryQueue.JobCleanup + pausedJobs: MemoryQueue.JobCleanup = .doNothing ) async throws { await self.queue.cleanup(processingJobs: processingJobs, failedJobs: failedJobs, pausedJobs: pausedJobs) } From 74730230b2598cc69576df4a81ba6c2f957b920d Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Tue, 10 Feb 2026 12:23:31 +0000 Subject: [PATCH 3/4] Fix metrics test --- Sources/Jobs/MemoryJobQueue.swift | 1 + Tests/JobsTests/MetricsTests.swift | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/Sources/Jobs/MemoryJobQueue.swift b/Sources/Jobs/MemoryJobQueue.swift index 5343fa1..99ebf64 100644 --- 
a/Sources/Jobs/MemoryJobQueue.swift +++ b/Sources/Jobs/MemoryJobQueue.swift @@ -343,6 +343,7 @@ extension MemoryQueue.Internal { for job in queue { self.queue.append((QueuedJob(id: job.key, jobBuffer: job.value), .init())) } + queue = [:] case .doNothing: break } diff --git a/Tests/JobsTests/MetricsTests.swift b/Tests/JobsTests/MetricsTests.swift index fc80070..93edb49 100644 --- a/Tests/JobsTests/MetricsTests.swift +++ b/Tests/JobsTests/MetricsTests.swift @@ -71,7 +71,7 @@ struct MetricsTests { try await jobQueue.push(TestParameters(value: 3)) try await jobQueue.push(TestParameters(value: 4)) try await jobQueue.push(TestParameters(value: 5)) - try await testJobQueue(jobQueue.processor()) { + try await testJobQueue(jobQueue.processor(options: .init(numWorkers: 1))) { try await expectation.wait(count: 5) } From c2c38f1753555be4c463b736a97185c56a5c8a70 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Wed, 11 Feb 2026 19:50:42 +0000 Subject: [PATCH 4/4] Fix flakey test --- Tests/JobsTests/JobsTests.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/JobsTests/JobsTests.swift b/Tests/JobsTests/JobsTests.swift index 057ce7a..6191548 100644 --- a/Tests/JobsTests/JobsTests.swift +++ b/Tests/JobsTests/JobsTests.swift @@ -582,7 +582,7 @@ struct JobsTests { try await withThrowingTaskGroup(of: Void.self) { group in let serviceGroup = ServiceGroup( configuration: .init( - services: [jobQueue.processor()], + services: [jobQueue.processor(options: .init(numWorkers: 1))], gracefulShutdownSignals: [.sigterm, .sigint], logger: Logger(label: "JobQueueService") )