-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathqueuer_listener.py
More file actions
73 lines (58 loc) · 2.72 KB
/
queuer_listener.py
File metadata and controls
73 lines (58 loc) · 2.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
"""
Listener-related methods for the Python queuer implementation.
Mirrors Go's queuerListener.go functionality.
"""
import logging
from typing import Callable
from .queuer_global import QueuerGlobalMixin
from .model.job import Job
logger = logging.getLogger(__name__)
class QueuerListenerMixin(QueuerGlobalMixin):
    """
    Mixin class containing listener-related methods for the Queuer.
    Mirrors Go's queuerListener.go functionality.

    The methods read ``self.running`` and the ``job_update_listener``,
    ``job_delete_listener`` and ``job_insert_listener`` attributes. None of
    these are defined in this class; presumably they are provided by
    ``QueuerGlobalMixin`` or the concrete Queuer (verify against those).
    """

    def __init__(self):
        super().__init__()

    def _start_job_listener(
        self,
        listener_attr: str,
        event_label: str,
        notify_function: Callable[[Job], None],
    ) -> None:
        """
        Shared implementation behind the public ``listen_for_job_*`` methods.

        :param listener_attr: Name of the attribute on ``self`` holding the listener
        :param event_label: Plural event name used in log and error messages
            (e.g. ``"updates"``)
        :param notify_function: Function handed to the listener's ``listen`` call
        :raises RuntimeError: If the queuer is not running, or if looking up
            or starting the listener fails
        """
        if not self.running:
            raise RuntimeError("Cannot listen with not running Queuer")

        try:
            # The attribute lookup is kept inside the try block so that a
            # missing listener is reported as a RuntimeError as well.
            # NOTE(review): whether listen() blocks or returns immediately is
            # decided by the listener implementation, which is not visible
            # here - confirm before relying on either behaviour.
            listener = getattr(self, listener_attr)
            listener.listen(notify_function=notify_function)
        except Exception as e:
            logger.error("Error listening for job %s: %s", event_label, e)
            # Chain explicitly so the original failure is recorded as the cause.
            raise RuntimeError(
                f"Failed to listen for job {event_label}: {e}"
            ) from e

    def listen_for_job_update(self, notify_function: Callable[[Job], None]) -> None:
        """
        Listen for job update events and notify the provided function when a job is updated.

        :param notify_function: Function to call when a job update event occurs
        :raises RuntimeError: If queuer is not running or the listener fails to start
        """
        self._start_job_listener("job_update_listener", "updates", notify_function)

    def listen_for_job_delete(self, notify_function: Callable[[Job], None]) -> None:
        """
        Listen for job delete events and notify the provided function when a job is deleted.

        :param notify_function: Function to call when a job delete event occurs
        :raises RuntimeError: If queuer is not running or the listener fails to start
        """
        self._start_job_listener("job_delete_listener", "deletes", notify_function)

    def listen_for_job_insert(self, notify_function: Callable[[Job], None]) -> None:
        """
        Listen for job insert events and notify the provided function when a job is inserted.

        :param notify_function: Function to call when a job insert event occurs
        :raises RuntimeError: If queuer is not running or the listener fails to start
        """
        self._start_job_listener("job_insert_listener", "inserts", notify_function)