Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ let package = Package(
dependencies: [
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-collections.git", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-distributed-tracing.git", from: "1.1.0"),
.package(url: "https://github.com/apple/swift-log.git", from: "1.4.0"),
.package(url: "https://github.com/apple/swift-metrics.git", "1.0.0"..<"3.0.0"),
.package(url: "https://github.com/apple/swift-nio.git", from: "2.63.0"),
Expand All @@ -24,6 +25,7 @@ let package = Package(
name: "Jobs",
dependencies: [
.product(name: "Collections", package: "swift-collections"),
.product(name: "Tracing", package: "swift-distributed-tracing"),
.product(name: "Logging", package: "swift-log"),
.product(name: "Metrics", package: "swift-metrics"),
.product(name: "NIOConcurrencyHelpers", package: "swift-nio"),
Expand Down
8 changes: 7 additions & 1 deletion Sources/Jobs/EncodableJob.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@ struct EncodableJob<Parameters: Codable & Sendable>: Encodable, Sendable {
let id: JobIdentifier<Parameters>
let data: JobInstanceData<Parameters>

init(id: JobIdentifier<Parameters>, parameters: Parameters, queuedAt: Date, attempts: Int) {
/// Creates an encodable wrapper pairing a job identifier with its instance data.
/// - Parameters:
///   - id: Identifier of the job type being queued.
///   - parameters: The job's payload, stored inside ``JobInstanceData``.
///   - queuedAt: Timestamp recording when the job was pushed onto the queue.
///   - attempts: Number of execution attempts made so far.
init(
    id: JobIdentifier<Parameters>,
    parameters: Parameters,
    queuedAt: Date,
    attempts: Int
) {
    self.id = id
    self.data = JobInstanceData(parameters: parameters, queuedAt: queuedAt, attempts: attempts)
}
Expand Down
49 changes: 49 additions & 0 deletions Sources/Jobs/JobInstance.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//===----------------------------------------------------------------------===//

import Foundation
import Tracing

/// Protocol for a Job
protocol JobInstanceProtocol: Sendable {
Expand All @@ -28,6 +29,8 @@ protocol JobInstanceProtocol: Sendable {
var attempts: Int? { get }
/// Job parameters
var parameters: Parameters { get }
/// Trace context
var traceContext: [String: String]? { get }
/// Function to execute the job
func execute(context: JobContext) async throws
}
Expand All @@ -47,6 +50,16 @@ extension JobInstanceProtocol {
public var didFail: Bool {
(attempts ?? 0) >= maxRetryCount
}

/// Rebuild a `ServiceContext` from the trace context stored with the job, if any.
///
/// Returns `nil` when the job carries no propagated trace headers; otherwise the
/// configured tracer extracts them into a fresh top-level context so the job's
/// execution span can be linked back to the span that enqueued it.
func serviceContext() -> ServiceContext? {
    guard let traceContext else { return nil }
    var extracted = ServiceContext.topLevel
    InstrumentationSystem.tracer.extract(traceContext, into: &extracted, using: DictionaryExtractor())
    return extracted
}
}

/// Job decoded from Queue
Expand All @@ -66,6 +79,8 @@ struct JobInstance<Parameters: Codable & Sendable>: JobInstanceProtocol {
var queuedAt: Date { self.data.queuedAt }
/// Number of attempts so far
var attempts: Int? { self.data.attempts ?? 0 }
/// Trace context
var traceContext: [String: String]? { self.data.traceContext }
/// Job parameters
var parameters: Parameters { self.data.parameters }

Expand All @@ -87,11 +102,45 @@ struct JobInstanceData<Parameters: Codable & Sendable>: Codable {
let queuedAt: Date
/// Number of attempts so far
let attempts: Int?
/// trace context
let traceContext: [String: String]?

/// Capture the job payload together with the current trace propagation headers.
/// - Parameters:
///   - parameters: The job's payload.
///   - queuedAt: Timestamp recording when the job was pushed onto the queue.
///   - attempts: Number of execution attempts so far, if known.
///
/// If a `ServiceContext` is active at enqueue time, the configured tracer injects
/// its propagation headers into a dictionary that is persisted with the job, so
/// the consumer can link its span back to the producer. An empty injection result
/// is normalized to `nil` to keep the encoded payload small.
init(
    parameters: Parameters,
    queuedAt: Date,
    attempts: Int?
) {
    self.parameters = parameters
    self.queuedAt = queuedAt
    self.attempts = attempts
    guard let serviceContext = ServiceContext.current else {
        self.traceContext = nil
        return
    }
    var carrier = [String: String]()
    InstrumentationSystem.tracer.inject(serviceContext, into: &carrier, using: DictionaryInjector())
    self.traceContext = carrier.isEmpty ? nil : carrier
}

// keep JSON strings small to improve decode speed
// (single-letter keys shrink every persisted job record)
private enum CodingKeys: String, CodingKey {
    case parameters = "p"
    case queuedAt = "q"
    case attempts = "a"
    case traceContext = "t"
}
}

/// Injector that writes trace propagation key/value pairs into a
/// `[String: String]` carrier persisted alongside the job.
private struct DictionaryInjector: Injector {
    /// Store `value` under `key` in the carrier dictionary.
    func inject(_ value: String, forKey key: String, into carrier: inout [String: String]) {
        carrier.updateValue(value, forKey: key)
    }
}

/// Extractor that reads trace propagation values back out of the
/// `[String: String]` carrier stored with a queued job.
private struct DictionaryExtractor: Extractor {
    /// Look up `key` in the carrier, returning `nil` when absent.
    func extract(key: String, from carrier: [String: String]) -> String? {
        return carrier[key]
    }
}
133 changes: 74 additions & 59 deletions Sources/Jobs/JobQueueHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import Logging
import Metrics
import NIOCore
import ServiceLifecycle
import Tracing

/// Object handling a single job queue
final class JobQueueHandler<Queue: JobQueueDriver>: Sendable {
Expand Down Expand Up @@ -127,79 +128,93 @@ final class JobQueueHandler<Queue: JobQueueDriver>: Sendable {
).recordSeconds(jobQueuedDuration)

logger.debug("Starting Job")
let linkContext = job.serviceContext()
await withSpan(job.name, ofKind: .server) { span in
if let linkContext {
span.addLink(.init(context: linkContext, attributes: .init()))
}
span.updateAttributes { attributes in
attributes["job.id"] = queuedJob.id.description
attributes["job.attempt"] = (job.attempts ?? 0) + 1
}

do {
do {
try await job.execute(context: .init(logger: logger))
} catch let error as CancellationError {
logger.debug("Job cancelled")
// Job failed is called but due to the fact the task is cancelled, depending on the
// job queue driver, the process of failing the job might not occur because itself
// might get cancelled
try await self.queue.failed(jobId: queuedJob.id, error: error)
JobMetricsHelper.updateMetrics(
for: job.name,
startTime: startTime,
error: error
)
return
} catch {
if job.didFail {
logger.debug("Job: failed")
do {
try await job.execute(context: .init(logger: logger))
} catch let error as CancellationError {
logger.debug("Job cancelled")
span.recordError(error)
span.attributes["job.failed"] = true
// Job failed is called but due to the fact the task is cancelled, depending on the
// job queue driver, the process of failing the job might not occur because itself
// might get cancelled
try await self.queue.failed(jobId: queuedJob.id, error: error)
JobMetricsHelper.updateMetrics(
for: job.name,
startTime: startTime,
error: error
)
return
}
} catch {
span.recordError(error)
if job.didFail {
logger.debug("Job: failed")
span.attributes["job.failed"] = true
try await self.queue.failed(jobId: queuedJob.id, error: error)
JobMetricsHelper.updateMetrics(
for: job.name,
startTime: startTime,
error: error
)
return
}

let attempts = (job.attempts ?? 0) + 1
let attempts = (job.attempts ?? 0) + 1

let delay = self.calculateBackoff(attempts: attempts)
let delay = self.calculateBackoff(attempts: attempts)

// remove from processing lists
try await self.queue.finished(jobId: queuedJob.id)
// push new job in the queue
let newJobId = try await self.queue.push(
self.queue.encode(job, attempts: attempts),
options: .init(
delayUntil: delay
// remove from processing lists
try await self.queue.finished(jobId: queuedJob.id)
// push new job in the queue
let newJobId = try await self.queue.push(
self.queue.encode(job, attempts: attempts),
options: .init(
delayUntil: delay
)
)
)

// Guard against negative queue values, this is needed because we call
// the job queue directly in the retrying step
Meter(
label: JobMetricsHelper.meterLabel,
dimensions: [
("status", JobMetricsHelper.JobStatus.queued.rawValue)
]
).increment()

JobMetricsHelper.updateMetrics(
for: job.name,
startTime: startTime,
retrying: true
)
logger.debug(
"Retrying Job",
metadata: [
"JobID": .stringConvertible(newJobId),
"JobName": .string(job.name),
"attempts": .stringConvertible(attempts),
"delayedUntil": .stringConvertible(delay),
]
)
return

// Guard against negative queue values, this is needed because we call
// the job queue directly in the retrying step
Meter(
label: JobMetricsHelper.meterLabel,
dimensions: [
("status", JobMetricsHelper.JobStatus.queued.rawValue)
]
).increment()

JobMetricsHelper.updateMetrics(
for: job.name,
startTime: startTime,
retrying: true
)
logger.debug(
"Retrying Job",
metadata: [
"JobID": .stringConvertible(newJobId),
"JobName": .string(job.name),
"attempts": .stringConvertible(attempts),
"delayedUntil": .stringConvertible(delay),
]
)
return
}
logger.debug("Finished Job")
try await self.queue.finished(jobId: queuedJob.id)
JobMetricsHelper.updateMetrics(for: job.name, startTime: startTime)
} catch {
logger.debug("Failed to set job status")
JobMetricsHelper.updateMetrics(for: job.name, startTime: startTime, error: error)
}
logger.debug("Finished Job")
try await self.queue.finished(jobId: queuedJob.id)
JobMetricsHelper.updateMetrics(for: job.name, startTime: startTime)
} catch {
logger.debug("Failed to set job status")
JobMetricsHelper.updateMetrics(for: job.name, startTime: startTime, error: error)
}
}

Expand Down
Loading