Add protocol for drivers that store their jobs in a queryable manner#120
Add protocol for drivers that store their jobs in a queryable manner#120
Conversation
…) jobs in a queryable manner
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #120 +/- ##
==========================================
- Coverage 86.81% 86.10% -0.72%
==========================================
Files 25 26 +1
Lines 1320 1331 +11
==========================================
Hits 1146 1146
- Misses 174 185 +11 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
adam-fowler
left a comment
There was a problem hiding this comment.
You've made job parameters the most important thing here ie you search by job parameter with a series of possible options. This assumes you know what job parameters have been registered. I think job status is probably the more interesting. When looking at the swift jobs, the first things I'd be interested in would be
- List of pending jobs (id, name, delay)
- List of processing jobs (id, name)
- List of failed jobs (id, name)
Clicking on individual jobs would then provide the structure of the parameters. Given the parameters could be anything, it would have to be a generic tree like structure, with arrays and dictionaries and leaf nodes (string, int, double, bool etc)
Also I think it is probably a good idea to create a wrapper type for a queryable job which holds a JobInstanceData, so we can continue to hide its contents.
| public var includeCompletedJobs = true | ||
| /// Whether to include delayed jobs in the query result. Defaults to `true`. | ||
| public var includeDelayedJobs = true | ||
|
|
There was a problem hiding this comment.
You forgot to include pending jobs (ie queued but not executed). I'd probably replace delayed with pending.
There was a problem hiding this comment.
We could also filter on queuedAt and a couple other properties. Not sure what we all should support
There was a problem hiding this comment.
Some of these filters could be quite hard for the valkey solution. It is not a bunch of database tables. The Postgres solution will be happy to supply anything I'm sure, given it two tables (the queue and the jobs)
There was a problem hiding this comment.
- Can Valkey emit a historic record at all? And does it make sense for Valkey to implement/support this new feature?
- If so; What alternations can we make to accomodate valkey(-like) usecases?
There was a problem hiding this comment.
It does have a historic record. The problem with valkey is that some of the more complex queries you can do with postgres are not so easily available. I don't think we even include queuedAt in the valkey data. We could do that, I just added a hash map for each job which can hold arbitrary data.
If the valkey installation include Valkey search you can search hash maps
|
What I think would also be useful is a |
|
I was thinking a little more about this. The query interface should probably reflect the underlying structure to some degree. Something like the following // job id and easily accessible metadata
struct JobMetadata {
struct MetadataType: RawRepresentable {
let rawValue: String
static var delayUntil: Self { .init(rawValue: "delayUntil") }
static var driverID: Self { .init(rawValue: "driverID") }
...
}
let jobID
let metadata: [MetadataType: String]
}
enum JobQuery {
case hasStatus(Status)
case delayUntilRange(min: Date?, max: Date?)
...
}
protocol QueryableJobQueueDriver {
/// Get list of jobs that pass list of job queries
func getJobs(queries: [JobQuery])-> some Sequence<JobMetadata>
/// Get count of jobs that pass list of job queries
func getCount(queries: [JobQuery])
/// Get job data and decode parameters as type Value
func getJob<Value: Decodable>(id: JobID, as: Value.Type) -> QueryableJob<Value>
} |
|
Ok second attempt struct Status: RawRepresentable {
let rawValue: String
static var pending: Status { .init(rawValue: "pending") }
static var processing: Status { .init(rawValue: "processing") }
static var failed: Status { .init(rawValue: "failed") }
}
/// Job query
protocol JobQuery {}
struct HasStatusJobQuery: JobQuery {
let status: Status
}
struct NameJobQuery: JobQuery {
let name: String
}
struct DelayUntilJobQuery: JobQuery {
let min: Date?
let max: Date?
}
extension JobQuery where Self == HasStatusJobQuery {
static func hasStatus(_ status: Status) -> Self { HasStatusJobQuery(status: status) }
}
extension JobQuery where Self == NameJobQuery {
static func named(_ name: String) -> Self { NameJobQuery(name: name) }
}
extension JobQuery where Self == DelayUntilJobQuery {
static func delayUntil(min: Date? = nil, max: Date? = nil) -> Self { DelayUntilJobQuery(min: min, max: max) }
}
/// Job description
struct MetadataType: RawRepresentable, Hashable {
let rawValue: String
static var delayUntil: Self { .init(rawValue: "delayUntil") }
static var driverID: Self { .init(rawValue: "driverID") }
}
/// Parameter description. We can probably do this better
enum ParameterTree: Codable {
case integer
case double
case string
case bool
case map([String: ParameterTree])
case array([ParameterTree])
}
struct QueryableJob<JobID> {
let id: JobID
let name: String
let parameters: ParameterTree
let metadata: [MetadataType: String] // metadata some of it may be specific to that job driver eg priority
}
protocol QueryableJobQueueDriver: JobQueueDriver {
associatedtype QueryResults: Sequence<QueryableJob<JobID>>
/// Get list of jobs that pass list of job queries
func getJobs(_ queries: any JobQuery...) -> QueryResults
/// Get count of jobs that pass list of job queries
func getCount<each Query>(_ query: repeat each Query)
}
func test(_ queue: some QueryableJobQueueDriver) {
let jobs = queue.getJobs(.hasStatus(.pending), .delayUntil(min: .now))
let jobs2 = queue.getJobs(.hasStatus(.failed), .named("email"))
print(jobs)
print(jobs2)
} |
It's a draft. Would like to discuss the public API prior to implementing this further.