Skip to content

Comments

Add protocol for drivers that store their jobs in a queryable manner#120

Draft
Joannis wants to merge 1 commit intomainfrom
jo/queryable-jobs
Draft

Add protocol for drivers that store their jobs in a queryable manner#120
Joannis wants to merge 1 commit intomainfrom
jo/queryable-jobs

Conversation

@Joannis
Copy link
Member

@Joannis Joannis commented Jan 24, 2026

It's a draft. Would like to discuss the public API prior to implementing this further.

@codecov
Copy link

codecov bot commented Jan 24, 2026

Codecov Report

❌ Patch coverage is 0% with 11 lines in your changes missing coverage. Please review.
✅ Project coverage is 86.10%. Comparing base (0b79a4a) to head (29da279).

Files with missing lines Patch % Lines
Sources/Jobs/QueryableJobQueue.swift 0.00% 11 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #120      +/-   ##
==========================================
- Coverage   86.81%   86.10%   -0.72%     
==========================================
  Files          25       26       +1     
  Lines        1320     1331      +11     
==========================================
  Hits         1146     1146              
- Misses        174      185      +11     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link
Member

@adam-fowler adam-fowler left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You've made job parameters the most important thing here ie you search by job parameter with a series of possible options. This assumes you know what job parameters have been registered. I think job status is probably the more interesting. When looking at the swift jobs, the first things I'd be interested in would be

  • List of pending jobs (id, name, delay)
  • List of processing jobs (id, name)
  • List of failed jobs (id, name)
    Clicking on individual jobs would then provide the structure of the parameters. Given the parameters could be anything, it would have to be a generic tree like structure, with arrays and dictionaries and leaf nodes (string, int, double, bool etc)

Also I think it is probably a good idea to create a wrapper type for a queryable job which holds a JobInstanceData, so we can continue to hide its contents.

public var includeCompletedJobs = true
/// Whether to include delayed jobs in the query result. Defaults to `true`.
public var includeDelayedJobs = true

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You forgot to include pending jobs (ie queued but not executed). I'd probably replace delayed with pending.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also filter on queuedAt and a couple other properties. Not sure what we all should support

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of these filters could be quite hard for the valkey solution. It is not a bunch of database tables. The Postgres solution will be happy to supply anything I'm sure, given it two tables (the queue and the jobs)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Can Valkey emit a historic record at all? And does it make sense for Valkey to implement/support this new feature?
  • If so; What alternations can we make to accomodate valkey(-like) usecases?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does have a historic record. The problem with valkey is that some of the more complex queries you can do with postgres are not so easily available. I don't think we even include queuedAt in the valkey data. We could do that, I just added a hash map for each job which can hold arbitrary data.

If the valkey installation include Valkey search you can search hash maps

@Joannis
Copy link
Member Author

Joannis commented Jan 26, 2026

What I think would also be useful is a count route

@adam-fowler
Copy link
Member

I was thinking a little more about this. The query interface should probably reflect the underlying structure to some degree. Something like the following

// job id and easily accessible metadata
struct JobMetadata {
    struct MetadataType: RawRepresentable {
        let rawValue: String
    
        static var delayUntil: Self { .init(rawValue: "delayUntil") }
        static var driverID: Self { .init(rawValue: "driverID") }
        ...
    }

    let jobID
    let metadata: [MetadataType: String]
}

enum JobQuery {
    case hasStatus(Status)
    case delayUntilRange(min: Date?, max: Date?)
    ...
}

protocol QueryableJobQueueDriver {
    /// Get list of jobs that pass list of job queries
    func getJobs(queries: [JobQuery])-> some Sequence<JobMetadata>
    /// Get count of jobs that pass list of job queries
    func getCount(queries: [JobQuery])
    /// Get job data and decode parameters as type Value
    func getJob<Value: Decodable>(id: JobID, as: Value.Type) -> QueryableJob<Value>
}

@adam-fowler
Copy link
Member

adam-fowler commented Jan 28, 2026

Ok second attempt

struct Status: RawRepresentable {
    let rawValue: String

    static var pending: Status { .init(rawValue: "pending") }
    static var processing: Status { .init(rawValue: "processing") }
    static var failed: Status { .init(rawValue: "failed") }
}
/// Job query
protocol JobQuery {}
struct HasStatusJobQuery: JobQuery {
    let status: Status
}
struct NameJobQuery: JobQuery {
    let name: String
}
struct DelayUntilJobQuery: JobQuery {
    let min: Date?
    let max: Date?
}
extension JobQuery where Self == HasStatusJobQuery {
    static func hasStatus(_ status: Status) -> Self { HasStatusJobQuery(status: status) }
}
extension JobQuery where Self == NameJobQuery {
    static func named(_ name: String) -> Self { NameJobQuery(name: name) }
}
extension JobQuery where Self == DelayUntilJobQuery {
    static func delayUntil(min: Date? = nil, max: Date? = nil) -> Self { DelayUntilJobQuery(min: min, max: max) }
}

/// Job description

struct MetadataType: RawRepresentable, Hashable {
    let rawValue: String

    static var delayUntil: Self { .init(rawValue: "delayUntil") }
    static var driverID: Self { .init(rawValue: "driverID") }
}
/// Parameter description. We can probably do this better
enum ParameterTree: Codable {
    case integer
    case double
    case string
    case bool
    case map([String: ParameterTree])
    case array([ParameterTree])
}
struct QueryableJob<JobID> {
    let id: JobID
    let name: String
    let parameters: ParameterTree
    let metadata: [MetadataType: String]  // metadata some of it may be specific to that job driver eg priority
}

protocol QueryableJobQueueDriver: JobQueueDriver {
    associatedtype QueryResults: Sequence<QueryableJob<JobID>>
    /// Get list of jobs that pass list of job queries
    func getJobs(_ queries: any JobQuery...) -> QueryResults
    /// Get count of jobs that pass list of job queries
    func getCount<each Query>(_ query: repeat each Query)
}

func test(_ queue: some QueryableJobQueueDriver) {
    let jobs = queue.getJobs(.hasStatus(.pending), .delayUntil(min: .now))
    let jobs2 = queue.getJobs(.hasStatus(.failed), .named("email"))
    print(jobs)
    print(jobs2)
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants