Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Sources/Jobs/JobQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,12 @@ public struct JobQueue<Queue: JobQueueDriver>: JobQueueProtocol, Sendable {
@JobMiddlewareBuilder middleware: () -> some JobMiddleware = { NullJobMiddleware() }
) {
self.queue = queue
self.middleware = middleware()
self.logger = logger
self.options = options
let middleware = JobMiddlewareBuilder.$jobQueueName.withValue(queue.context.queueName) {
middleware()
}
self.middleware = middleware
}

public init(
Expand Down
24 changes: 16 additions & 8 deletions Sources/Jobs/JobQueueDriver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ public protocol JobQueueDriver: AsyncSequence, Sendable where Element == JobQueu
func stop() async
/// shutdown queue
func shutdownGracefully() async
/// worker context
var workerContext: JobWorkerContext { get }
/// job queue context
var context: JobQueueContext { get }
}

extension JobQueueDriver {
/// default version of waitUntilReady doing nothing
public func waitUntilReady() async throws {}
/// default version of worker ID is to return the string version of a UUID
public var workerContext: JobWorkerContext { .init(id: UUID().uuidString, metadata: [:]) }
public var context: JobQueueContext { .init(workerID: UUID().uuidString, metadata: [:]) }
}

extension JobQueueDriver {
Expand All @@ -64,15 +64,23 @@ extension JobQueueDriver {
}
}

public struct JobWorkerContext: Sendable {
public struct JobQueueContext: Sendable {
public enum MetadataValue: Sendable {
case string(String)
case integer(Int)
case double(Double)
}
/// Job worker id
public let id: String
public let workerID: String
/// Job queue name
public let queueName: String
/// Job worker metadata
public let metadata: [String: String]
public let metadata: [String: MetadataValue]

// initialize JobWorkerContext
public init(id: String, metadata: [String: String]) {
self.id = id
public init(workerID: String, queueName: String = "default", metadata: [String: MetadataValue]) {
self.workerID = workerID
self.queueName = queueName
self.metadata = metadata
}
}
Expand Down
7 changes: 5 additions & 2 deletions Sources/Jobs/JobQueueProcessor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ public final class JobQueueProcessor<Queue: JobQueueDriver>: Service {
self.queue = queue.queue
self.logger = logger
self.options = options
self.middleware = middleware()
let middleware = JobMiddlewareBuilder.$jobQueueName.withValue(queue.queue.context.queueName) {
middleware()
}
self.middleware = middleware
}

init(
Expand Down Expand Up @@ -87,7 +90,7 @@ public final class JobQueueProcessor<Queue: JobQueueDriver>: Service {
if case .acquire(let every, let holdFor) = self.options.workerActiveLock.value,
let metadataDriver = self.queue as? JobMetadataDriver
{
metadataDriver.updateActiveLock(&group, every: every, holdFor: holdFor, workerID: self.queue.workerContext.id)
metadataDriver.updateActiveLock(&group, every: every, holdFor: holdFor, workerID: self.queue.context.workerID)
}

// wait on first child task to return. If the first task to return is the queue handler then
Expand Down
6 changes: 3 additions & 3 deletions Sources/Jobs/MemoryJobQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ public final class MemoryQueue: JobQueueDriver, CancellableJobQueue, ResumableJo
fileprivate let queue: Internal
private let onFailedJob: @Sendable (JobID, any Error) -> Void
private let jobRegistry: JobRegistry
public let context: JobQueueContext

/// Initialise In memory job queue
public init(onFailedJob: @escaping @Sendable (JobID, any Error) -> Void = { _, _ in }) {
public init(queueName: String = "default", onFailedJob: @escaping @Sendable (JobID, any Error) -> Void = { _, _ in }) {
self.queue = .init()
self.onFailedJob = onFailedJob
self.jobRegistry = .init()
self.context = JobQueueContext(workerID: UUID().uuidString, queueName: queueName, metadata: [:])
}

/// Stop queue serving more jobs
Expand Down Expand Up @@ -107,8 +109,6 @@ public final class MemoryQueue: JobQueueDriver, CancellableJobQueue, ResumableJo
await self.queue.pauseJob(jobID: jobID)
}

public let workerContext = JobWorkerContext(id: UUID().uuidString, metadata: [:])

/// Internal actor managing the job queue
fileprivate actor Internal {
struct QueuedJob: Sendable {
Expand Down
2 changes: 2 additions & 0 deletions Sources/Jobs/Middleware/JobMiddleware.swift
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ struct TwoJobMiddlewares<Middleware1: JobMiddleware, Middleware2: JobMiddleware>
/// Result builder used to create Job middleware chain
@resultBuilder
public enum JobMiddlewareBuilder {
@TaskLocal static var jobQueueName: String?

public static func buildBlock<Middleware: JobMiddleware>(_ middleware: Middleware) -> Middleware {
middleware
}
Expand Down
6 changes: 5 additions & 1 deletion Sources/Jobs/Middleware/MetricsMiddleware.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ public struct MetricsJobMiddleware: JobMiddleware {
@usableFromInline
let queueName: String

public init(queueName: String = "default") {
public init() {
self.queueName = JobMiddlewareBuilder.jobQueueName ?? "default"
}

public init(queueName: String) {
self.queueName = queueName
}

Expand Down
6 changes: 5 additions & 1 deletion Sources/Jobs/Middleware/TracingMiddleware.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ public struct TracingJobMiddleware: JobMiddleware {
@usableFromInline
let queueName: String

public init(queueName: String = "default") {
public init() {
self.queueName = JobMiddlewareBuilder.jobQueueName ?? "default"
}

public init(queueName: String) {
self.queueName = queueName
}

Expand Down
2 changes: 1 addition & 1 deletion Tests/JobsTests/JobsTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ struct JobsTests {
MemoryQueue { _, _ in },
logger: logger
)
let workerID = jobQueue.queue.workerContext.id
let workerID = jobQueue.queue.context.workerID
try await testJobQueue(jobQueue.processor()) {
try await Task.sleep(for: .milliseconds(100))
let acquired = try await jobQueue.queue.acquireLock(
Expand Down
4 changes: 2 additions & 2 deletions Tests/JobsTests/TracingTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ struct TracingTests {
let sleep: Double
}
try await Self.testTracer.withUnique {
let jobQueue = JobQueue(.memory, logger: Logger(label: "JobsTests")) {
let jobQueue = JobQueue(MemoryQueue(queueName: "TestTracing"), logger: Logger(label: "JobsTests")) {
TracingJobMiddleware()
}
jobQueue.registerJob(parameters: TestParameters.self) { parameters, context in
Expand Down Expand Up @@ -62,7 +62,7 @@ struct TracingTests {
[
"job.id": "\(jobID.uuidString)",
"job.attempt": 1,
"job.queue": "default",
"job.queue": "TestTracing",
]
)
}
Expand Down
Loading