[ˈjɔbːɪɡ] – the Swedish word for "bothersome".
A TypeScript library for type-safe durable workflows with pluggable storage/queue backends.
This is the first library I have ever published, so I might not get everything right immediately. Before version 1.0.0 there will be breaking changes: until then, I will treat minor versions as major versions, and patch versions may contain both bug fixes and new features.
import { Job, Jobbig } from "@jobbig/core";
import { ServerPlugin } from "@jobbig/core/plugins";
import { LocalStore } from "@jobbig/local";
import { z } from "zod";
// 1) Define a job with steps
const processData = Job({
  // Unique id of the job; `schedule({ jobId })` below refers to it.
  id: "process-data",
  // Schema of the data carried by a run of this job.
  schema: z.object({
    input: z.number(),
    output: z.number().optional(),
  }),
  run: async ({ ctx }) => {
    // Each step is identified by its id ("preprocess", "process", "save").
    // A step with a given id is only executed once per job, while the
    // surrounding job logic is rerun on retries.
    await ctx.step.run("preprocess", async () => {
      console.log("Preprocessing input...");
    });
    await ctx.step.run("process", async () => {
      console.log("Processing data...");
    });
    await ctx.step.run("save", async () => {
      console.log("Saving results...");
    });
  },
});
// 2) Instantiate a store
const store = LocalStore({});
// 3) Create Jobbig and register the jobs and the server plugin
//    (processing itself is started in step 5)
const jobbig = Jobbig({
  store,
  jobs: [processData],
}).use(ServerPlugin());
// 4) Schedule runs
await jobbig.schedule({
  jobId: "process-data",
  data: { input: 42 },
});
// 5) Start processing
jobbig.server();
A brief explanation of the key interfaces and concepts of Jobbig.
A worker is a handler for different types of environments.
/**
 * A worker is a handler for a particular type of environment.
 */
interface Worker {
  /**
   * Starts the worker.
   * NOTE(review): when the returned promise settles is not visible here
   * (e.g. after the queue is drained vs. never in a long-running server) — confirm.
   */
  start(): Promise<void>;
}
An orchestrator specifies how the polling of jobs is done. In a cron job, it exhausts the queue until it is empty, but in a server environment, it constantly polls the queue for new jobs within a specified interval.
A runner is responsible for executing jobs. There exists a BaseRunner, which is likely sufficient in most cases.
/**
 * A runner is responsible for executing jobs.
 */
export interface Runner {
  /**
   * Resolves to an object holding a `promise` and an `amount`.
   * NOTE(review): presumably `amount` is the number of runs that were picked
   * up and `promise` settles once they have finished — confirm against
   * BaseRunner.
   */
  run(): Promise<{
    promise: Promise<any>,
    amount: number
  }>;
}
A job is a function which can be scheduled. A job can consist of multiple steps.
/**
 * A schedulable job definition.
 *
 * @typeParam T - Standard Schema describing the data carried by a run.
 */
export interface Job<T extends StandardSchemaV1 = any> {
  /**
   * Unique identifier of the job. Used to match handlers with runs.
   */
  id: string;
  /**
   * The job runner.
   * @param opts - The options for running the job.
   * @returns A promise that resolves when the job is completed.
   */
  run(opts: RunInput<StandardSchemaV1.InferInput<T>, string, JobbigInstance>): Promise<void>;
  /**
   * Schema of the data
   */
  schema: T;
  /**
   * Optional retry configuration.
   */
  retries?: {
    /**
     * Max amount of retries
     * @default 0
     */
    amount?: number;
    /**
     * Defaults to exponential backoff
     * NOTE(review): the unit of the returned delay is not stated here
     * (presumably milliseconds) — confirm.
     * @param attempt (the current attempt, starts at 0)
     * @returns delay
     */
    delayFn?: (attempt: number) => number;
  };
  /**
   * Optional lifecycle hooks. Each hook receives the same `opts` shape as `run`.
   * NOTE(review): exact invocation points are not visible here — presumably
   * around the whole run and around each step respectively; confirm.
   */
  hooks?: {
    beforeRun?(opts: RunInput<StandardSchemaV1.InferInput<T>, string, JobbigInstance>): Promise<void>;
    afterRun?(opts: RunInput<StandardSchemaV1.InferInput<T>, string, JobbigInstance>): Promise<void>;
    beforeStep?(opts: RunInput<StandardSchemaV1.InferInput<T>, string, JobbigInstance>): Promise<void>;
    afterStep?(opts: RunInput<StandardSchemaV1.InferInput<T>, string, JobbigInstance>): Promise<void>;
  };
}
A step is a smaller part of a job, which consists of a handler and an id. A step with a given id will only be executed once per job. While the core logic of the job will be rerun on retries.
/**
 * Step runner available to a job handler.
 */
export interface Step {
  /**
   * Runs `handler` as the step identified by `id`.
   * A step with a given id is only executed once per job, even though the
   * rest of the job's logic is rerun on retries.
   */
  run(id: string, handler: () => Promise<void>): Promise<void>;
  /**
   * NOTE(review): purpose is not visible here — presumably clears step state
   * kept for the run; confirm against the implementation.
   */
  cleanup(): Promise<void>;
}
A run is a scheduled execution of a job. It contains metadata about the jobs execution, such as the status, start time, end time, results and the current execution step.
/**
 * A scheduled execution of a job, together with metadata about that execution.
 */
export interface Run {
  /** Identifier of this run. */
  id: string;
  /** Identifier of the job this run executes. */
  jobId: string;
  /** Current status of the run. */
  status: RunStatus;
  /** NOTE(review): presumably the earliest time the run may be picked up — confirm. */
  scheduledAt: Date;
  /** Data carried by the run; untyped at this level. */
  data: unknown;
  /** Result of the run, if any. */
  result?: unknown;
  /** The current execution step. NOTE(review): index base (0 or 1) not visible here. */
  currentStep: number;
  /** Start time; absent until set. */
  startedAt?: Date;
  /** Creation time of the run record. */
  createdAt: Date;
  /** End time; absent until set. */
  finishedAt?: Date;
}
The context provides access to job data, utilities, and scheduling capabilities within job handlers.
/**
 * Context handed to job handlers: access to the run's data, the step runner,
 * a run-scoped store, and scheduling.
 *
 * @typeParam T - Type of `data`.
 * @typeParam Id - String type of `id`.
 * @typeParam I - Jobbig instance type whose `schedule` signature is exposed.
 */
export type Context<T, Id extends string, I extends JobbigInstance> = {
  /**
   * The id of the running job.
   */
  id: Id;
  /**
   * Data contained within the run.
   */
  data: T;
  /**
   * Step runner for the current job.
   */
  step: Step;
  /**
   * Scoped store for the current run.
   */
  store: ScopedStore;
  /**
   * Metadata passed when scheduling the job.
   */
  metadata?: Record<string, unknown>;
  /**
   * Schedule a new job run.
   */
  schedule: I["schedule"];
  /**
   * Sleep for a specified duration (ms). Uses steps for durability.
   */
  sleep: (ms: number) => Promise<void>;
};
A store contains the state of each run and the polling/locking used by workers.
/**
 * Storage backend: holds the state of each run and provides the
 * polling/locking used by workers.
 */
export interface Store {
  /** Adds a run to the store. */
  push(run: RunData): Promise<void>;
  /**
   * Polls for runs.
   * NOTE(review): presumably returns at most `amount` runs, and
   * `info.exhausted` signals that nothing more is currently available — confirm.
   */
  poll(amount: number): Promise<{ runs: RunData[]; info: { exhausted: boolean } }>;
  /** Writes the single field `key` of the run identified by `runId`. */
  set<T extends keyof RunData>(runId: string, key: T, value: RunData[T]): Promise<void>;
  /** Reads the single field `key` of the run identified by `runId`; may resolve to `undefined`. */
  get<T extends keyof RunData>(runId: string, key: T): Promise<RunData[T] | undefined>;
  /** Reads the whole run identified by `runId`; may resolve to `undefined`. */
  fetch(runId: string): Promise<RunData | undefined>;
  /** NOTE(review): presumably resolves to `true` when the lock was acquired — confirm. */
  lock(runId: string): Promise<boolean>;
  /** NOTE(review): presumably resolves to `true` when a lock was released — confirm. */
  unlock(runId: string): Promise<boolean>;
  /** Resolves to whether the run identified by `runId` is locked. */
  isLocked(runId: string): Promise<boolean>;
}
A scoped store is a store that is scoped to a specific run. It is used to store and retrieve data for a specific run, so that it can be used in the context of the job without being able to impact other runs.
/**
 * A store scoped to a single run, so that a job handler can read and write
 * that run's fields without being able to impact other runs.
 * NOTE(review): keyed on `Run`, whereas `Store` is keyed on `RunData` —
 * confirm this difference is intentional.
 */
export interface ScopedStore {
  /** Writes the field `key` of the current run. */
  set<T extends keyof Run>(key: T, value: Run[T]): Promise<void>;
  /** Reads the field `key` of the current run; may resolve to `undefined`. */
  get<T extends keyof Run>(key: T): Promise<Run[T] | undefined>;
}
- Required: Node.js v24+ (project uses modern ES modules and features)
- Recommended: Use nvm for Node.js version management
- Setup:
nvm use default
to use the latest Node.js version
- Required: bun v1.2.20 (specified in package.json)
- Install:
curl -fsSL https://bun.sh/install | bash
bun install
bun run build
cd examples/local
bun run index.ts
See examples/local/index.ts for a complete working example with:
- Multiple job definitions
- Step-based execution
- Local storage setup
- Server plugin configuration
MIT