Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Sources/Jobs/JobQueueProcessorOptions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public struct JobQueueProcessorOptions: Sendable {
/// to indicate if a worker is running. If another process can acquire the lock then the worker must have
/// stopped.
public init(
numWorkers: Int = 1,
numWorkers: Int = 16,
gracefulShutdownTimeout: Duration = .seconds(30),
workerActiveLock: ExclusiveLock = .acquire(every: .milliseconds(300000), for: .milliseconds(450000))
) {
Expand Down
113 changes: 97 additions & 16 deletions Sources/Jobs/MemoryJobQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ public final class MemoryQueue: JobQueueDriver, CancellableJobQueue, ResumableJo
self.context = JobQueueContext(workerID: UUID().uuidString, queueName: queueName, metadata: [:])
}

public func waitUntilReady() async throws {
await self.queue.start()
}

/// Stop queue serving more jobs
public func stop() async {
await self.queue.stop()
Expand Down Expand Up @@ -88,11 +92,11 @@ public final class MemoryQueue: JobQueueDriver, CancellableJobQueue, ResumableJo
}

public func finished(jobID: JobID) async throws {
await self.queue.clearPendingJob(jobID: jobID)
await self.queue.clearProcessingJob(jobID: jobID)
}

public func failed(jobID: JobID, error: any Error) async throws {
if await self.queue.clearAndReturnPendingJob(jobID: jobID) != nil {
if await self.queue.failJob(jobID: jobID) {
self.onFailedJob(jobID, error)
}
}
Expand All @@ -116,14 +120,18 @@ public final class MemoryQueue: JobQueueDriver, CancellableJobQueue, ResumableJo
let jobBuffer: ByteBuffer
}
var queue: Deque<(job: QueuedJob, options: JobOptions)>
var pendingJobs: [JobID: ByteBuffer]
var processingJobs: [JobID: ByteBuffer]
var pausedJobs: [JobID: ByteBuffer]
var failedJobs: [JobID: ByteBuffer]
var metadata: [String: (data: ByteBuffer, expires: Date)]
var isStopped: Bool

init() {
self.queue = .init()
self.isStopped = false
self.pendingJobs = .init()
self.processingJobs = .init()
self.pausedJobs = .init()
self.failedJobs = .init()
self.metadata = .init()
}

Expand All @@ -134,12 +142,12 @@ public final class MemoryQueue: JobQueueDriver, CancellableJobQueue, ResumableJo
}

func retry(_ id: JobID, buffer: ByteBuffer, options: JobOptions) throws {
self.clearPendingJob(jobID: id)
self.clearProcessingJob(jobID: id)
let _ = self.queue.append((job: QueuedJob(id: id, jobBuffer: buffer), options: options))
}

func clearPendingJob(jobID: JobID) {
self.pendingJobs[jobID] = nil
func clearProcessingJob(jobID: JobID) {
self.processingJobs[jobID] = nil
}

func cancelJob(jobID: JobID) {
Expand All @@ -148,23 +156,25 @@ public final class MemoryQueue: JobQueueDriver, CancellableJobQueue, ResumableJo

func pauseJob(jobID: JobID) {
let job = self.queue.first(where: { $0.job.id == jobID })
self.pendingJobs[jobID] = job?.job.jobBuffer
self.pausedJobs[jobID] = job?.job.jobBuffer
self.queue.removeAll(where: { $0.job.id == jobID })
}

func resumeJob(jobID: JobID) {
if let jobBuffer = self.pendingJobs[jobID] {
if let jobBuffer = self.pausedJobs[jobID] {
self.pausedJobs[jobID] = nil
self.queue.append((job: QueuedJob(id: jobID, jobBuffer: jobBuffer), options: .init()))
} else {
print("Warning: attempted to resume job \(jobID) which is not pending")
}
self.clearPendingJob(jobID: jobID)
}

func clearAndReturnPendingJob(jobID: JobID) -> ByteBuffer? {
let instance = self.pendingJobs[jobID]
self.pendingJobs[jobID] = nil
return instance
func failJob(jobID: JobID) -> Bool {
let instance = self.processingJobs[jobID]
self.clearProcessingJob(jobID: jobID)
self.processingJobs[jobID] = nil
self.failedJobs[jobID] = instance
return instance != nil
}

func next() async throws -> QueuedJob? {
Expand All @@ -181,7 +191,7 @@ public final class MemoryQueue: JobQueueDriver, CancellableJobQueue, ResumableJo
continue
}
} else {
self.pendingJobs[request.job.id] = request.job.jobBuffer
self.processingJobs[request.job.id] = request.job.jobBuffer
return request.job
}
}
Expand All @@ -193,8 +203,12 @@ public final class MemoryQueue: JobQueueDriver, CancellableJobQueue, ResumableJo
self.isStopped = true
}

func start() {
self.isStopped = false
}

func shutdown() {
assert(self.pendingJobs.count == 0)
assert(self.processingJobs.count == 0)
self.isStopped = true
}

Expand Down Expand Up @@ -269,6 +283,73 @@ extension MemoryQueue {
}
}

extension MemoryQueue {
/// how to cleanup a job
public struct JobCleanup: Sendable, Codable {
enum RawValue: Codable {
case doNothing
case rerun
case remove
}
let rawValue: RawValue

/// Do nothing to jobs
public static var doNothing: Self { .init(rawValue: .doNothing) }
/// Add jobs back onto the pending queue
public static var rerun: Self { .init(rawValue: .rerun) }
/// Delete jobs
public static var remove: Self { .init(rawValue: .remove) }
}

/// Cleanup job queues
///
/// This function is used to re-run or delete jobs in a certain state. Failed, paused and processing jobs
/// can be pushed back into the pending queue to be re-run or removed.
///
/// You can call `cleanup` with `failedJobs`, `pausedJobs` set to whatever you like at any point to re-queue
/// jobs. Moving processing jobs should only be done if you are certain there is nothing processing the job
/// queue.
///
/// - Parameters:
/// - processingJobs: What to do with jobs tagged as processing
/// - failedJobs: What to do with jobs tagged as failed
/// - pausedJobs: What to do with jobs tagged as cancelled
/// - Throws:
public func cleanup(
processingJobs: JobCleanup = .doNothing,
failedJobs: JobCleanup = .doNothing,
pausedJobs: MemoryQueue.JobCleanup = .doNothing
) async throws {
await self.queue.cleanup(processingJobs: processingJobs, failedJobs: failedJobs, pausedJobs: pausedJobs)
}
}

extension MemoryQueue.Internal {
func cleanup(
processingJobs: MemoryQueue.JobCleanup,
failedJobs: MemoryQueue.JobCleanup,
pausedJobs: MemoryQueue.JobCleanup
) {
cleanupQueue(processingJobs, queue: &self.processingJobs)
cleanupQueue(failedJobs, queue: &self.failedJobs)
cleanupQueue(pausedJobs, queue: &self.pausedJobs)
}

func cleanupQueue(_ cleanup: MemoryQueue.JobCleanup, queue: inout [MemoryQueue.JobID: ByteBuffer]) {
switch cleanup.rawValue {
case .remove:
queue = [:]
case .rerun:
for job in queue {
self.queue.append((QueuedJob(id: job.key, jobBuffer: job.value), .init()))
}
queue = [:]
case .doNothing:
break
}
}
}

extension JobQueueDriver where Self == MemoryQueue {
/// Return In memory driver for Job Queue
/// - Parameters:
Expand Down
26 changes: 26 additions & 0 deletions Tests/JobsTests/JobsTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,32 @@ struct JobsTests {
}
}

@Test func testRunTwice() async throws {
struct TestParameters: JobParameters {
static let jobName = "testBasic"
let value: Int
}
let logger = Logger(label: "JobsTests")
let expectation = TestExpectation()
let jobQueue = JobQueue(.memory, logger: logger)
let job = JobDefinition { (parameters: TestParameters, context) in
context.logger.info("Parameters=\(parameters.value)")
try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50)))
expectation.trigger()
}
jobQueue.registerJob(job)
try await testJobQueue(JobQueueProcessor(queue: jobQueue, logger: logger)) {
try await jobQueue.push(TestParameters(value: 1))

try await expectation.wait(count: 1)
}
try await testJobQueue(JobQueueProcessor(queue: jobQueue, logger: logger)) {
try await jobQueue.push(TestParameters(value: 1))

try await expectation.wait(count: 1)
}
}

@Test func testErrorRetryAndThenSucceed() async throws {
struct TestParameters: JobParameters {
static let jobName = "testErrorRetryAndThenSucceed"
Expand Down
2 changes: 1 addition & 1 deletion Tests/JobsTests/MetricsTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ struct MetricsTests {
try await jobQueue.push(TestParameters(value: 3))
try await jobQueue.push(TestParameters(value: 4))
try await jobQueue.push(TestParameters(value: 5))
try await testJobQueue(jobQueue.processor()) {
try await testJobQueue(jobQueue.processor(options: .init(numWorkers: 1))) {
try await expectation.wait(count: 5)
}

Expand Down
Loading