Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Sources/Jobs/JobQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,12 @@ public struct JobQueue<Queue: JobQueueDriver>: JobQueueProtocol, Sendable {
@JobMiddlewareBuilder middleware: () -> some JobMiddleware = { NullJobMiddleware() }
) {
self.queue = queue
self.middleware = middleware()
self.logger = logger
self.options = options
let middleware = JobMiddlewareBuilder.$jobQueueName.withValue(queue.context.queueName) {
middleware()
}
self.middleware = middleware
}

/// Create JobQueue handler that will process jobs pushed to the queue
Expand Down
24 changes: 16 additions & 8 deletions Sources/Jobs/JobQueueDriver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ public protocol JobQueueDriver: AsyncSequence, Sendable where Element == JobQueu
func stop() async
/// shutdown queue
func shutdownGracefully() async
/// worker context
var workerContext: JobWorkerContext { get }
/// job queue context
var context: JobQueueContext { get }
}

extension JobQueueDriver {
/// default version of waitUntilReady doing nothing
public func waitUntilReady() async throws {}
/// default version of worker ID is to return the string version of a UUID
public var workerContext: JobWorkerContext { .init(id: UUID().uuidString, metadata: [:]) }
public var context: JobQueueContext { .init(workerID: UUID().uuidString, metadata: [:]) }
}

extension JobQueueDriver {
Expand All @@ -70,15 +70,23 @@ extension JobQueueDriver {
}
}

public struct JobWorkerContext: Sendable {
public struct JobQueueContext: Sendable {
public enum MetadataValue: Sendable {
case string(String)
case integer(Int)
case double(Double)
}
/// Job worker id
public let id: String
public let workerID: String
/// Job queue name
public let queueName: String
/// Job worker metadata
public let metadata: [String: String]
public let metadata: [String: MetadataValue]

// initialize JobWorkerContext
public init(id: String, metadata: [String: String]) {
self.id = id
public init(workerID: String, queueName: String = "default", metadata: [String: MetadataValue]) {
self.workerID = workerID
self.queueName = queueName
self.metadata = metadata
}
}
Expand Down
7 changes: 5 additions & 2 deletions Sources/Jobs/JobQueueProcessor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ public final class JobQueueProcessor<Queue: JobQueueDriver>: Service {
self.queue = queue.queue
self.logger = logger
self.options = options
self.middleware = middleware()
let middleware = JobMiddlewareBuilder.$jobQueueName.withValue(queue.queue.context.queueName) {
middleware()
}
self.middleware = middleware
}

init(
Expand Down Expand Up @@ -93,7 +96,7 @@ public final class JobQueueProcessor<Queue: JobQueueDriver>: Service {
if case .acquire(let every, let holdFor) = self.options.workerActiveLock.value,
let metadataDriver = self.queue as? JobMetadataDriver
{
metadataDriver.updateActiveLock(&group, every: every, holdFor: holdFor, workerID: self.queue.workerContext.id)
metadataDriver.updateActiveLock(&group, every: every, holdFor: holdFor, workerID: self.queue.context.workerID)
}

// wait on first child task to return. If the first task to return is the queue handler then
Expand Down
6 changes: 3 additions & 3 deletions Sources/Jobs/MemoryJobQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@ public final class MemoryQueue: JobQueueDriver, CancellableJobQueue, ResumableJo
fileprivate let queue: Internal
private let onFailedJob: @Sendable (JobID, any Error) -> Void
private let jobRegistry: JobRegistry
public let context: JobQueueContext

/// Initialise In memory job queue
public init(onFailedJob: @escaping @Sendable (JobID, any Error) -> Void = { _, _ in }) {
public init(queueName: String = "default", onFailedJob: @escaping @Sendable (JobID, any Error) -> Void = { _, _ in }) {
self.queue = .init()
self.onFailedJob = onFailedJob
self.jobRegistry = .init()
self.context = JobQueueContext(workerID: UUID().uuidString, queueName: queueName, metadata: [:])
}

/// Stop queue serving more jobs
Expand Down Expand Up @@ -113,8 +115,6 @@ public final class MemoryQueue: JobQueueDriver, CancellableJobQueue, ResumableJo
await self.queue.pauseJob(jobID: jobID)
}

public let workerContext = JobWorkerContext(id: UUID().uuidString, metadata: [:])

/// Internal actor managing the job queue
fileprivate actor Internal {
struct QueuedJob: Sendable {
Expand Down
2 changes: 2 additions & 0 deletions Sources/Jobs/Middleware/JobMiddleware.swift
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ struct TwoJobMiddlewares<Middleware1: JobMiddleware, Middleware2: JobMiddleware>
/// Result builder used to create Job middleware chain
@resultBuilder
public enum JobMiddlewareBuilder {
@TaskLocal static var jobQueueName: String?

public static func buildBlock<Middleware: JobMiddleware>(_ middleware: Middleware) -> Middleware {
middleware
}
Expand Down
6 changes: 5 additions & 1 deletion Sources/Jobs/Middleware/MetricsMiddleware.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ public struct MetricsJobMiddleware: JobMiddleware {
@usableFromInline
let queueName: String

public init(queueName: String = "default") {
public init() {
self.queueName = JobMiddlewareBuilder.jobQueueName ?? "default"
}

public init(queueName: String) {
self.queueName = queueName
}

Expand Down
6 changes: 5 additions & 1 deletion Sources/Jobs/Middleware/TracingMiddleware.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ public struct TracingJobMiddleware: JobMiddleware {
@usableFromInline
let queueName: String

public init(queueName: String = "default") {
public init() {
self.queueName = JobMiddlewareBuilder.jobQueueName ?? "default"
}

public init(queueName: String) {
self.queueName = queueName
}

Expand Down
2 changes: 1 addition & 1 deletion Tests/JobsTests/JobsTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ struct JobsTests {
MemoryQueue { _, _ in },
logger: logger
)
let workerID = jobQueue.queue.workerContext.id
let workerID = jobQueue.queue.context.workerID
try await testJobQueue(jobQueue.processor()) {
try await Task.sleep(for: .milliseconds(100))
let acquired = try await jobQueue.queue.acquireLock(
Expand Down
4 changes: 2 additions & 2 deletions Tests/JobsTests/TracingTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ struct TracingTests {
let sleep: Double
}
try await Self.testTracer.withUnique {
let jobQueue = JobQueue(.memory, logger: Logger(label: "JobsTests")) {
let jobQueue = JobQueue(MemoryQueue(queueName: "TestTracing"), logger: Logger(label: "JobsTests")) {
TracingJobMiddleware()
}
jobQueue.registerJob(parameters: TestParameters.self) { parameters, context in
Expand Down Expand Up @@ -68,7 +68,7 @@ struct TracingTests {
[
"job.id": "\(jobID.uuidString)",
"job.attempt": 1,
"job.queue": "default",
"job.queue": "TestTracing",
]
)
}
Expand Down
Loading