-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathqueuer_worker.py
More file actions
109 lines (86 loc) · 3.55 KB
/
queuer_worker.py
File metadata and controls
109 lines (86 loc) · 3.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
"""
Worker-related methods for the Python queuer implementation.
Mirrors Go's queuerWorker.go functionality.
"""
import logging
from typing import Optional
from uuid import UUID
from .helper.error import QueuerError
from .model.worker import Worker, WorkerStatus
from .queuer_global import QueuerGlobalMixin
# Module-level logger named after this module (``__name__``), so log output
# from the worker mixin can be filtered/configured per module.
logger = logging.getLogger(__name__)
def get_current_worker_rid(self) -> Optional[UUID]:
    """
    Return the RID of the queuer's current worker, read under the worker mutex.

    Counterpart of Go's GetCurrentWorkerRID method.

    :param self: Object exposing ``worker_mutex`` (a context-manager lock)
        and ``worker`` (the current worker, or None).
    :return: The current worker's RID, or None when no worker is set
    """
    # Hold the mutex for the whole read so the worker reference cannot be
    # swapped out between the None check and the attribute access.
    with self.worker_mutex:
        current = self.worker
        return None if current is None else current.rid
class QueuerWorkerMixin(QueuerGlobalMixin):
    """
    Mixin class containing worker-related methods for the Queuer.

    This mirrors the worker methods from Go's queuerWorker.go.

    The methods read ``self.db_worker``, ``self.worker`` and
    ``self.worker_mutex``; these are not defined here and are presumably
    provided by ``QueuerGlobalMixin`` or the concrete queuer class
    (NOTE(review): confirm against the composing class).
    """

    def __init__(self):
        super().__init__()

    def stop_worker(self, worker_rid: UUID) -> None:
        """
        Set the status of the specified worker to STOPPED.

        This is the immediate (non-graceful) stop; it is intended to cancel
        running jobs when stopping. If the worker is this queuer's own
        worker, the local worker object is replaced with the updated one.

        :param worker_rid: The RID of the worker to stop
        :raises QueuerError: If getting or updating the worker fails
        """
        try:
            self._apply_worker_status(worker_rid, WorkerStatus.STOPPED, "stopped")
        except Exception as e:
            # Wrap every failure (including "not found") in QueuerError and
            # keep the original exception as the explicit cause.
            raise QueuerError("stopping worker", e) from e

    def stop_worker_gracefully(self, worker_rid: UUID) -> None:
        """
        Set the status of the specified worker to STOPPING.

        This is the graceful stop; it is intended to let the worker finish
        its current tasks before stopping. If the worker is this queuer's own
        worker, the local worker object is replaced with the updated one.

        :param worker_rid: The RID of the worker to stop gracefully
        :raises QueuerError: If getting or updating the worker fails
        """
        try:
            self._apply_worker_status(worker_rid, WorkerStatus.STOPPING, "stopping")
        except Exception as e:
            # Wrap every failure (including "not found") in QueuerError and
            # keep the original exception as the explicit cause.
            raise QueuerError("stopping worker gracefully", e) from e

    def get_worker(self, worker_rid: UUID) -> Optional[Worker]:
        """
        Get a worker by its RID.

        :param worker_rid: The RID of the worker to get
        :returns: Whatever ``self.db_worker.select_worker`` returns for the
            RID (the worker if found, None otherwise)
        :raises QueuerError: If the database query fails
        """
        try:
            return self.db_worker.select_worker(worker_rid)
        except Exception as e:
            raise QueuerError("getting worker", e) from e

    def _apply_worker_status(
        self, worker_rid: UUID, status: WorkerStatus, label: str
    ) -> None:
        """
        Load a worker, persist a new status and refresh the local copy.

        Shared implementation of ``stop_worker`` and
        ``stop_worker_gracefully``; callers wrap any exception raised here.

        :param worker_rid: The RID of the worker to update
        :param status: The status to store on the worker
        :param label: Lower-case status word used in the failure message
        :raises ValueError: If the worker is not found or the update returns
            a falsy result
        """
        worker = self.get_worker(worker_rid)
        if worker is None:
            raise ValueError(f"Worker {worker_rid} not found")

        worker.status = status
        worker_updated = self.db_worker.update_worker(worker)
        if not worker_updated:
            raise ValueError(
                f"Failed to update worker {worker_rid} status to {label}"
            )

        # Update local worker object if this is the current queuer's worker.
        # The identity check and the assignment happen under the same lock so
        # a concurrent change of self.worker cannot slip in between them.
        with self.worker_mutex:
            if self.worker is not None and self.worker.rid == worker_rid:
                self.worker = worker_updated