From 674ccc469a0cb7656bcb63b1f4869a150cc6a030 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Tue, 27 Jan 2026 19:06:36 +0000 Subject: [PATCH 1/4] Pass queue name to middleware via task local Also add requirement for queue name in driver protocol --- Sources/Jobs/JobQueue.swift | 5 ++++- Sources/Jobs/JobQueueDriver.swift | 4 ++++ Sources/Jobs/JobQueueProcessor.swift | 5 ++++- Sources/Jobs/MemoryJobQueue.swift | 4 +++- Sources/Jobs/Middleware/JobMiddleware.swift | 2 ++ Sources/Jobs/Middleware/MetricsMiddleware.swift | 6 +++++- Sources/Jobs/Middleware/TracingMiddleware.swift | 6 +++++- Tests/JobsTests/TracingTests.swift | 4 ++-- 8 files changed, 29 insertions(+), 7 deletions(-) diff --git a/Sources/Jobs/JobQueue.swift b/Sources/Jobs/JobQueue.swift index f971180..0a22078 100644 --- a/Sources/Jobs/JobQueue.swift +++ b/Sources/Jobs/JobQueue.swift @@ -185,9 +185,12 @@ public struct JobQueue: JobQueueProtocol, Sendable { @JobMiddlewareBuilder middleware: () -> some JobMiddleware = { NullJobMiddleware() } ) { self.queue = queue - self.middleware = middleware() self.logger = logger self.options = options + let middleware = JobMiddlewareBuilder.$jobQueueName.withValue(queue.queueName) { + middleware() + } + self.middleware = middleware } /// Create JobQueue handler that will process jobs pushed to the queue diff --git a/Sources/Jobs/JobQueueDriver.swift b/Sources/Jobs/JobQueueDriver.swift index 3aca41c..214025f 100644 --- a/Sources/Jobs/JobQueueDriver.swift +++ b/Sources/Jobs/JobQueueDriver.swift @@ -54,6 +54,8 @@ public protocol JobQueueDriver: AsyncSequence, Sendable where Element == JobQueu func shutdownGracefully() async /// worker context var workerContext: JobWorkerContext { get } + /// queue name + var queueName: String { get } } extension JobQueueDriver { @@ -61,6 +63,8 @@ extension JobQueueDriver { public func waitUntilReady() async throws {} /// default version of worker ID is to return the string version of a UUID public var workerContext: JobWorkerContext { .init(id: UUID().uuidString, metadata: [:]) } + /// default queue name + public var queueName: String { "default" } } extension JobQueueDriver { diff --git a/Sources/Jobs/JobQueueProcessor.swift b/Sources/Jobs/JobQueueProcessor.swift index e6620ac..48b5979 100644 --- a/Sources/Jobs/JobQueueProcessor.swift +++ b/Sources/Jobs/JobQueueProcessor.swift @@ -39,7 +39,10 @@ public final class JobQueueProcessor: Service { self.queue = queue.queue self.logger = logger self.options = options - self.middleware = middleware() + let middleware = JobMiddlewareBuilder.$jobQueueName.withValue(queue.queue.queueName) { + middleware() + } + self.middleware = middleware } init( diff --git a/Sources/Jobs/MemoryJobQueue.swift b/Sources/Jobs/MemoryJobQueue.swift index 14d5db4..e7ee4bf 100644 --- a/Sources/Jobs/MemoryJobQueue.swift +++ b/Sources/Jobs/MemoryJobQueue.swift @@ -45,12 +45,14 @@ public final class MemoryQueue: JobQueueDriver, CancellableJobQueue, ResumableJo fileprivate let queue: Internal private let onFailedJob: @Sendable (JobID, any Error) -> Void private let jobRegistry: JobRegistry + public let queueName: String /// Initialise In memory job queue - public init(onFailedJob: @escaping @Sendable (JobID, any Error) -> Void = { _, _ in }) { + public init(queueName: String = "default", onFailedJob: @escaping @Sendable (JobID, any Error) -> Void = { _, _ in }) { self.queue = .init() self.onFailedJob = onFailedJob self.jobRegistry = .init() + self.queueName = queueName } /// Stop queue serving more jobs diff --git a/Sources/Jobs/Middleware/JobMiddleware.swift b/Sources/Jobs/Middleware/JobMiddleware.swift index 45e46db..ca233c1 100644 --- a/Sources/Jobs/Middleware/JobMiddleware.swift +++ b/Sources/Jobs/Middleware/JobMiddleware.swift @@ -171,6 +171,8 @@ struct TwoJobMiddlewares /// Result builder used to create Job middleware chain @resultBuilder public enum JobMiddlewareBuilder { + @TaskLocal static var jobQueueName: String? + public static func buildBlock(_ middleware: Middleware) -> Middleware { middleware } diff --git a/Sources/Jobs/Middleware/MetricsMiddleware.swift b/Sources/Jobs/Middleware/MetricsMiddleware.swift index 3bbb9de..b950c55 100644 --- a/Sources/Jobs/Middleware/MetricsMiddleware.swift +++ b/Sources/Jobs/Middleware/MetricsMiddleware.swift @@ -26,7 +26,11 @@ public struct MetricsJobMiddleware: JobMiddleware { @usableFromInline let queueName: String - public init(queueName: String = "default") { + public init() { + self.queueName = JobMiddlewareBuilder.jobQueueName ?? "default" + } + + public init(queueName: String) { self.queueName = queueName } diff --git a/Sources/Jobs/Middleware/TracingMiddleware.swift b/Sources/Jobs/Middleware/TracingMiddleware.swift index 0715d8b..4a2706b 100644 --- a/Sources/Jobs/Middleware/TracingMiddleware.swift +++ b/Sources/Jobs/Middleware/TracingMiddleware.swift @@ -25,7 +25,11 @@ public struct TracingJobMiddleware: JobMiddleware { @usableFromInline let queueName: String - public init(queueName: String = "default") { + public init() { + self.queueName = JobMiddlewareBuilder.jobQueueName ?? "default" + } + + public init(queueName: String) { self.queueName = queueName } diff --git a/Tests/JobsTests/TracingTests.swift b/Tests/JobsTests/TracingTests.swift index f45882b..14fbcf3 100644 --- a/Tests/JobsTests/TracingTests.swift +++ b/Tests/JobsTests/TracingTests.swift @@ -40,7 +40,7 @@ struct TracingTests { let sleep: Double } try await Self.testTracer.withUnique { - let jobQueue = JobQueue(.memory, logger: Logger(label: "JobsTests")) { + let jobQueue = JobQueue(MemoryQueue(queueName: "TestTracing"), logger: Logger(label: "JobsTests")) { TracingJobMiddleware() } jobQueue.registerJob(parameters: TestParameters.self) { parameters, context in @@ -68,7 +68,7 @@ struct TracingTests { [ "job.id": "\(jobID.uuidString)", "job.attempt": 1, - "job.queue": "default", + "job.queue": "TestTracing", ] ) } From 004f293808b05df8be8ee1d30da725a8c8f2dea3 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Wed, 28 Jan 2026 09:21:06 +0000 Subject: [PATCH 2/4] Move queueName into context --- Sources/Jobs/JobQueueDriver.swift | 19 ++++++++++--------- Sources/Jobs/JobQueueProcessor.swift | 2 +- Sources/Jobs/MemoryJobQueue.swift | 6 ++---- Tests/JobsTests/JobsTests.swift | 2 +- 4 files changed, 14 insertions(+), 15 deletions(-) diff --git a/Sources/Jobs/JobQueueDriver.swift b/Sources/Jobs/JobQueueDriver.swift index 214025f..b303f91 100644 --- a/Sources/Jobs/JobQueueDriver.swift +++ b/Sources/Jobs/JobQueueDriver.swift @@ -52,17 +52,15 @@ public protocol JobQueueDriver: AsyncSequence, Sendable where Element == JobQueu func stop() async /// shutdown queue func shutdownGracefully() async - /// worker context - var workerContext: JobWorkerContext { get } - /// queue name - var queueName: String { get } + /// job queue context + var context: JobQueueContext { get } } extension JobQueueDriver { /// default version of waitUntilReady doing nothing public func waitUntilReady() async throws {} /// default version of worker ID is to return the string version of a UUID - public var workerContext: JobWorkerContext { .init(id: UUID().uuidString, metadata: [:]) } + public var workerContext: JobQueueContext { .init(workerID: UUID().uuidString, metadata: [:]) } /// default queue name public var queueName: String { "default" } } @@ -74,15 +72,18 @@ extension JobQueueDriver { } } -public struct JobWorkerContext: Sendable { +public struct JobQueueContext: Sendable { /// Job worker id - public let id: String + public let workerID: String + /// Job queue name + public let queueName: String /// Job worker metadata public let metadata: [String: String] // initialize JobWorkerContext - public init(id: String, metadata: [String: String]) { - self.id = id + public init(workerID: String, queueName: String = "default", metadata: [String: String]) { + self.workerID = workerID + self.queueName = queueName self.metadata = metadata } } diff --git a/Sources/Jobs/JobQueueProcessor.swift b/Sources/Jobs/JobQueueProcessor.swift index 48b5979..8c8a3c5 100644 --- a/Sources/Jobs/JobQueueProcessor.swift +++ b/Sources/Jobs/JobQueueProcessor.swift @@ -96,7 +96,7 @@ public final class JobQueueProcessor: Service { if case .acquire(let every, let holdFor) = self.options.workerActiveLock.value, let metadataDriver = self.queue as? JobMetadataDriver { - metadataDriver.updateActiveLock(&group, every: every, holdFor: holdFor, workerID: self.queue.workerContext.id) + metadataDriver.updateActiveLock(&group, every: every, holdFor: holdFor, workerID: self.queue.context.workerID) } // wait on first child task to return. If the first task to return is the queue handler then diff --git a/Sources/Jobs/MemoryJobQueue.swift b/Sources/Jobs/MemoryJobQueue.swift index e7ee4bf..660188c 100644 --- a/Sources/Jobs/MemoryJobQueue.swift +++ b/Sources/Jobs/MemoryJobQueue.swift @@ -45,14 +45,14 @@ public final class MemoryQueue: JobQueueDriver, CancellableJobQueue, ResumableJo fileprivate let queue: Internal private let onFailedJob: @Sendable (JobID, any Error) -> Void private let jobRegistry: JobRegistry - public let queueName: String + public let context: JobQueueContext /// Initialise In memory job queue public init(queueName: String = "default", onFailedJob: @escaping @Sendable (JobID, any Error) -> Void = { _, _ in }) { self.queue = .init() self.onFailedJob = onFailedJob self.jobRegistry = .init() - self.queueName = queueName + self.context = JobQueueContext(workerID: UUID().uuidString, queueName: queueName, metadata: [:]) } /// Stop queue serving more jobs @@ -115,8 +115,6 @@ public final class MemoryQueue: JobQueueDriver, CancellableJobQueue, ResumableJo await self.queue.pauseJob(jobID: jobID) } - public let workerContext = JobWorkerContext(id: UUID().uuidString, metadata: [:]) - /// Internal actor managing the job queue fileprivate actor Internal { struct QueuedJob: Sendable { diff --git a/Tests/JobsTests/JobsTests.swift b/Tests/JobsTests/JobsTests.swift index df97d56..4221d05 100644 --- a/Tests/JobsTests/JobsTests.swift +++ b/Tests/JobsTests/JobsTests.swift @@ -648,7 +648,7 @@ struct JobsTests { MemoryQueue { _, _ in }, logger: logger ) - let workerID = jobQueue.queue.workerContext.id + let workerID = jobQueue.queue.context.workerID try await testJobQueue(jobQueue.processor()) { try await Task.sleep(for: .milliseconds(100)) let acquired = try await jobQueue.queue.acquireLock( From 41138ab33153ace08fc047af0b974d8eeba1a2fc Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Wed, 28 Jan 2026 09:24:36 +0000 Subject: [PATCH 3/4] Allow metadata values that are non-string --- Sources/Jobs/JobQueueDriver.swift | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/Sources/Jobs/JobQueueDriver.swift b/Sources/Jobs/JobQueueDriver.swift index b303f91..b4bf3de 100644 --- a/Sources/Jobs/JobQueueDriver.swift +++ b/Sources/Jobs/JobQueueDriver.swift @@ -73,15 +73,20 @@ extension JobQueueDriver { } public struct JobQueueContext: Sendable { + public enum MetadataValue: Sendable { + case string(String) + case integer(Int) + case double(Double) + } /// Job worker id public let workerID: String /// Job queue name public let queueName: String /// Job worker metadata - public let metadata: [String: String] + public let metadata: [String: MetadataValue] // initialize JobWorkerContext - public init(workerID: String, queueName: String = "default", metadata: [String: String]) { + public init(workerID: String, queueName: String = "default", metadata: [String: MetadataValue]) { self.workerID = workerID self.queueName = queueName self.metadata = metadata From 32a996715b1ed90973bccf5fab3129a009e45452 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Wed, 28 Jan 2026 10:52:20 +0000 Subject: [PATCH 4/4] Fix tests --- Sources/Jobs/JobQueue.swift | 2 +- Sources/Jobs/JobQueueDriver.swift | 4 +--- Sources/Jobs/JobQueueProcessor.swift | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/Sources/Jobs/JobQueue.swift b/Sources/Jobs/JobQueue.swift index 0a22078..dde1640 100644 --- a/Sources/Jobs/JobQueue.swift +++ b/Sources/Jobs/JobQueue.swift @@ -187,7 +187,7 @@ public struct JobQueue: JobQueueProtocol, Sendable { self.queue = queue self.logger = logger self.options = options - let middleware = JobMiddlewareBuilder.$jobQueueName.withValue(queue.queueName) { + let middleware = JobMiddlewareBuilder.$jobQueueName.withValue(queue.context.queueName) { middleware() } self.middleware = middleware diff --git a/Sources/Jobs/JobQueueDriver.swift b/Sources/Jobs/JobQueueDriver.swift index b4bf3de..a727480 100644 --- a/Sources/Jobs/JobQueueDriver.swift +++ b/Sources/Jobs/JobQueueDriver.swift @@ -60,9 +60,7 @@ extension JobQueueDriver { /// default version of waitUntilReady doing nothing public func waitUntilReady() async throws {} /// default version of worker ID is to return the string version of a UUID - public var workerContext: JobQueueContext { .init(workerID: UUID().uuidString, metadata: [:]) } - /// default queue name - public var queueName: String { "default" } + public var context: JobQueueContext { .init(workerID: UUID().uuidString, metadata: [:]) } } extension JobQueueDriver { diff --git a/Sources/Jobs/JobQueueProcessor.swift b/Sources/Jobs/JobQueueProcessor.swift index 8c8a3c5..96ef6ec 100644 --- a/Sources/Jobs/JobQueueProcessor.swift +++ b/Sources/Jobs/JobQueueProcessor.swift @@ -39,7 +39,7 @@ public final class JobQueueProcessor: Service { self.queue = queue.queue self.logger = logger self.options = options - let middleware = JobMiddlewareBuilder.$jobQueueName.withValue(queue.queue.queueName) { + let middleware = JobMiddlewareBuilder.$jobQueueName.withValue(queue.queue.context.queueName) { middleware() } self.middleware = middleware