Skip to content

Commit 9f9c660

Browse files
committed
add keyed parameters functionality
1 parent 616c99f commit 9f9c660

File tree

7 files changed

+71
-24
lines changed

7 files changed

+71
-24
lines changed

database/db_job.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,12 @@ def insert_job(self, job: Job) -> Job:
9595

9696
with self.db.instance.cursor(row_factory=dict_row) as cur:
9797
cur.execute(
98-
"SELECT * FROM insert_job(%s, %s, %s, %s, %s, %s);",
98+
"SELECT * FROM insert_job(%s, %s, %s, %s, %s, %s, %s);",
9999
(
100100
json.dumps(job.options.to_dict()) if job.options else None,
101101
job.task_name,
102102
json.dumps(job.parameters),
103+
json.dumps(job.parameters_keyed),
103104
job.status,
104105
job.scheduled_at,
105106
job.schedule_count,
@@ -144,21 +145,28 @@ def batch_insert_jobs(self, jobs: List[Job]) -> None:
144145
raise ValueError("Database connection is not established")
145146

146147
try:
147-
batch_data: List[Tuple[str, str, str, Optional[datetime]]] = []
148+
batch_data: List[Tuple[str, str, str, str, Optional[datetime]]] = []
148149
for job in jobs:
149150
options_json = (
150151
json.dumps(job.options.to_dict()) if job.options else "{}"
151152
)
152153
parameters_json = json.dumps(job.parameters)
154+
parameters_keyed_json = json.dumps(job.parameters_keyed)
153155
batch_data.append(
154-
(options_json, job.task_name, parameters_json, job.scheduled_at)
156+
(
157+
options_json,
158+
job.task_name,
159+
parameters_json,
160+
parameters_keyed_json,
161+
job.scheduled_at,
162+
)
155163
)
156164

157165
with self.db.instance.cursor(row_factory=dict_row) as cur:
158166
cur.executemany(
159167
"""
160-
INSERT INTO job (options, task_name, parameters, scheduled_at)
161-
VALUES (%s, %s, %s, %s)
168+
INSERT INTO job (options, task_name, parameters, parameters_keyed, scheduled_at)
169+
VALUES (%s, %s, %s, %s, %s)
162170
""",
163171
batch_data,
164172
)

database/db_job_test.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -281,16 +281,23 @@ def test_update_stale_jobs(self):
281281

282282
# DeleteJob
283283
def test_delete_job(self):
284-
"""Test deleting a job."""
284+
"""Test deleting a job from archive."""
285285
job_db_handler = JobDBHandler(self.db, with_table_drop=True)
286286

287287
job = new_job("TestTask", None)
288288
inserted_job = job_db_handler.insert_job(job)
289289

290-
job_db_handler.delete_job(inserted_job.rid)
290+
# Mark job as finished to move it to archive
291+
inserted_job.status = "SUCCEEDED"
292+
inserted_job.results = []
293+
inserted_job.error = ""
294+
archived_job = job_db_handler.update_job_final(inserted_job)
291295

292-
# Verify that the job no longer exists
293-
deleted_job = job_db_handler.select_job(inserted_job.rid)
296+
# Now delete from archive
297+
job_db_handler.delete_job(archived_job.rid)
298+
299+
# Verify that the job no longer exists in archive
300+
deleted_job = job_db_handler.select_job_from_archive(archived_job.rid)
294301
self.assertIsNone(deleted_job, "Expected deleted job to be None")
295302

296303
# SelectJob

helper/logging_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ def test_error_logging_with_exception(self):
134134

135135
def test_critical_logging(self):
136136
"""Test critical level logging."""
137-
logger = QueuerLogger(stream=self.stream)
137+
logger = QueuerLogger(stream=self.stream, use_colors=False)
138138
logger.critical("Critical message")
139139

140140
output = self.stream.getvalue()

model/batch_job.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
Mirrors Go's model.BatchJob struct.
44
"""
55

6-
from typing import Any, List, Optional, Union, Callable
7-
from dataclasses import dataclass
6+
from typing import Any, Dict, List, Optional, Union, Callable
7+
from dataclasses import dataclass, field
88

99
from .options import Options
1010

@@ -16,5 +16,6 @@ class BatchJob:
1616
"""
1717

1818
task: Union[Callable[..., Any], str]
19-
parameters: List[Any]
19+
parameters: List[Any] = field(default_factory=list)
20+
parameters_keyed: Dict[str, Any] = field(default_factory=dict)
2021
options: Optional[Options] = None

model/job.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ class Job:
4040

4141
# Job definition
4242
task_name: str = ""
43-
parameters: List[Any] = field(default_factory=lambda: [])
43+
parameters: List[Any] = field(default_factory=list)
44+
parameters_keyed: Dict[str, Any] = field(default_factory=dict)
4445
options: Optional[Options] = None
4546

4647
# Job state
@@ -65,6 +66,7 @@ def to_dict(self) -> Dict[str, Any]:
6566
"worker_rid": str(self.worker_rid),
6667
"task_name": self.task_name,
6768
"parameters": self.parameters,
69+
"parameters_keyed": self.parameters_keyed,
6870
"options": self.options.to_dict() if self.options else None,
6971
"status": self.status,
7072
"scheduled_at": (
@@ -110,6 +112,7 @@ def from_dict(cls, data: Dict[str, Any]) -> "Job":
110112
job.worker_rid = UUID(data["worker_rid"])
111113
job.task_name = data.get("task_name", "")
112114
job.parameters = data.get("parameters", [])
115+
job.parameters_keyed = data.get("parameters_keyed", {})
113116
if data.get("options"):
114117
job.options = Options.from_dict(data["options"])
115118
job.status = data.get("status", JobStatus.QUEUED)
@@ -173,6 +176,16 @@ def from_row(cls, row: Dict[str, Any]) -> "Job":
173176
else parameters_value
174177
)
175178

179+
parameters_keyed_value = row.get(
180+
"output_parameters_keyed", row.get("parameters_keyed")
181+
)
182+
if parameters_keyed_value:
183+
job.parameters_keyed = (
184+
json.loads(parameters_keyed_value)
185+
if isinstance(parameters_keyed_value, str)
186+
else parameters_keyed_value
187+
)
188+
176189
results_value = row.get("output_results", row.get("results"))
177190
if results_value is not None:
178191
# JSONB columns return Python objects directly, no JSON parsing needed
@@ -194,6 +207,7 @@ def new_job(
194207
task: Union[Callable[..., Any], str],
195208
options: Optional[Options] = None,
196209
*parameters: Any,
210+
**parameters_keyed: Any,
197211
) -> Job:
198212
"""
199213
Create a new job from a task function or task name.
@@ -237,6 +251,7 @@ def new_job(
237251
job: Job = Job()
238252
job.task_name = task_name
239253
job.parameters = list(parameters)
254+
job.parameters_keyed = parameters_keyed
240255
job.options = options
241256
job.status = status
242257
job.scheduled_at = scheduled_at

queuer_job.py

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,23 @@ class QueuerJobMixin(QueuerGlobalMixin):
3434
def __init__(self):
3535
super().__init__()
3636

37-
def add_job(self, task: Union[Callable[..., Any], str], *parameters: Any) -> Job:
37+
def add_job(
38+
self,
39+
task: Union[Callable[..., Any], str],
40+
*parameters: Any,
41+
**parameters_keyed: Any,
42+
) -> Job:
3843
"""
3944
Add a job to the queue with the given task and parameters.
4045
4146
:param task: Either a function or a string with the task name
42-
:param parameters: Parameters to pass to the task
47+
:param parameters: Positional parameters to pass to the task
48+
:param parameters_keyed: Keyword parameters to pass to the task
4349
:returns: The created job
4450
:raises Exception: If something goes wrong
4551
"""
4652
options: Optional[Options] = self._merge_options(None)
47-
job: Job = self._add_job(task, options, *parameters)
53+
job: Job = self._add_job(task, options, *parameters, **parameters_keyed)
4854

4955
logger.info(f"Job added: {job.rid}")
5056

@@ -55,14 +61,17 @@ def add_job_with_options(
5561
options: Optional[Options],
5662
task: Union[Callable[..., Any], str],
5763
*parameters: Any,
64+
**parameters_keyed: Any,
5865
) -> Job:
5966
"""Add a new job with specific options."""
6067
# Merge default options with provided options
6168
options_merged: Optional[Options] = self._merge_options(options)
6269

6370
try:
6471
# Create new job
65-
new_job: Job = create_job(task, options_merged, *parameters)
72+
new_job: Job = create_job(
73+
task, options_merged, *parameters, **parameters_keyed
74+
)
6675
job: Job = self.db_job.insert_job(new_job)
6776

6877
logger.info(f"Job with options added: {job.rid}")
@@ -82,7 +91,9 @@ def add_jobs(self, batch_jobs: List[BatchJob]) -> None:
8291
for batch_job in batch_jobs:
8392
options: Optional[Options] = self._merge_options(batch_job.options)
8493
task_name: str = get_task_name_from_interface(batch_job.task)
85-
job: Job = create_job(task_name, options, *batch_job.parameters)
94+
job: Job = create_job(
95+
task_name, options, *batch_job.parameters, **batch_job.parameters_keyed
96+
)
8697
jobs.append(job)
8798

8899
self.db_job.batch_insert_jobs(jobs)
@@ -232,7 +243,9 @@ def readd_job_from_archive(self, job_rid: UUID) -> Job:
232243
raise Exception(f"Job not found in archive: {job_rid}")
233244

234245
# Readd the job to the queue
235-
new_job = self.add_job_with_options(job.options, job.task_name, *job.parameters)
246+
new_job = self.add_job_with_options(
247+
job.options, job.task_name, *job.parameters, **job.parameters_keyed
248+
)
236249
logger.info(f"Job readded: {new_job.rid}")
237250
return new_job
238251

@@ -317,18 +330,20 @@ def _add_job(
317330
task: Union[Callable[..., Any], str],
318331
options: Optional["Options"],
319332
*parameters: Any,
333+
**parameters_keyed: Any,
320334
) -> Job:
321335
"""
322336
Add a job to the queue with all necessary parameters.
323337
324338
:param task: Either a function or a string with the task name
325339
:param options: Job-specific options (can be None)
326-
:param parameters: Parameters to pass to the task
340+
:param parameters: Positional parameters to pass to the task
341+
:param parameters_keyed: Keyword parameters to pass to the task
327342
:returns: The created job
328343
:raises Exception: If something goes wrong
329344
"""
330345
try:
331-
new_job = create_job(task, options, *parameters)
346+
new_job = create_job(task, options, *parameters, **parameters_keyed)
332347
job = self.db_job.insert_job(new_job)
333348

334349
return job
@@ -405,7 +420,8 @@ async def _wait_for_job(
405420

406421
try:
407422
parameters = getattr(job, "parameters", [])
408-
runner = Runner(task, *parameters)
423+
parameters_keyed = getattr(job, "parameters_keyed", {})
424+
runner = Runner(task, *parameters, **parameters_keyed)
409425

410426
logger.info(f"Created runner for job {job.rid}")
411427

sql

Submodule sql updated 1 file

0 commit comments

Comments
 (0)