-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathqueuer_worker_test.py
More file actions
157 lines (123 loc) · 5.18 KB
/
queuer_worker_test.py
File metadata and controls
157 lines (123 loc) · 5.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""
Test cases for worker-related methods in Python queuer implementation.
Mirrors Go's queuerWorker_test.go.
"""
import time
import unittest
from datetime import timedelta
from uuid import uuid4
from .helper.test_database import TimescaleTestMixin
from .model.worker import Worker, WorkerStatus, new_worker
from .queuer import new_queuer_with_db
from .helper.error import QueuerError
class TestQueuerWorker(TimescaleTestMixin, unittest.TestCase):
"""Test cases for queuer worker functionality."""
@classmethod
def setUpClass(cls):
"""Set up for the entire test class."""
super().setup_class()
@classmethod
def tearDownClass(cls):
"""Clean up after all tests."""
super().teardown_class()
def setUp(self):
"""Set up for each test method."""
super().setup_method()
def test_stop_worker(self):
"""Test stopping a worker."""
queuer = new_queuer_with_db("test_stop_worker", 10, "", self.db_config)
try:
# Set short heartbeat interval for testing
queuer.worker_poll_interval = timedelta(seconds=0.5)
queuer.start()
# Get the worker RID
worker_rid = queuer.worker.rid
# Stop the worker
queuer.stop_worker(worker_rid)
# Verify the local worker object was updated immediately
self.assertEqual(queuer.worker.status, WorkerStatus.STOPPED)
# Verify the worker status is STOPPED in database
worker = queuer.get_worker(worker_rid)
self.assertIsNotNone(worker)
assert worker is not None # Type narrowing for type checker
self.assertEqual(worker.status, WorkerStatus.STOPPED)
# Wait for heartbeat to detect and stop the queuer (takes ~5s)
time.sleep(6.0)
# Queuer should have stopped automatically
self.assertFalse(queuer.running)
finally:
# Only stop if still running (shouldn't be the case)
if hasattr(queuer, "running") and queuer.running:
queuer.stop()
def test_stop_worker_gracefully(self):
"""Test stopping a worker gracefully."""
queuer = new_queuer_with_db(
"test_stop_worker_gracefully", 10, "", self.db_config
)
try:
# Set short heartbeat interval for testing
queuer.worker_poll_interval = timedelta(seconds=0.5)
queuer.start()
# Get the worker RID
worker_rid = queuer.worker.rid
# Stop the worker gracefully
queuer.stop_worker_gracefully(worker_rid)
# Verify the local worker object was updated immediately
self.assertEqual(queuer.worker.status, WorkerStatus.STOPPING)
# Verify the worker status is STOPPING in database
worker = queuer.get_worker(worker_rid)
self.assertIsNotNone(worker)
assert worker is not None # Type narrowing for type checker
self.assertEqual(worker.status, WorkerStatus.STOPPING)
# Wait for heartbeat to detect STOPPING status and eventually stop (takes ~6s)
time.sleep(7.0)
# Queuer should have stopped automatically
self.assertFalse(queuer.running)
finally:
# Only stop if still running (shouldn't be the case)
if hasattr(queuer, "running") and queuer.running:
queuer.stop()
def test_stop_worker_not_found(self):
"""Test stopping a worker that doesn't exist."""
queuer = new_queuer_with_db(
"test_stop_worker_not_found", 10, "", self.db_config
)
try:
# Try to stop a non-existent worker
with self.assertRaises(QueuerError):
queuer.stop_worker(uuid4())
finally:
# Clean up the database connection
if queuer.database:
queuer.database.close()
def test_get_worker(self):
"""Test getting a worker by RID."""
queuer = new_queuer_with_db("test_get_worker", 10, "", self.db_config)
try:
# Get the worker RID
worker_rid = queuer.worker.rid
# Get the worker
worker = queuer.get_worker(worker_rid)
# Verify the worker was retrieved
self.assertIsNotNone(worker)
assert worker is not None # Type narrowing for type checker
self.assertEqual(worker.rid, worker_rid)
self.assertEqual(worker.name, "test_get_worker")
finally:
# Clean up the database connection
if queuer.database:
queuer.database.close()
def test_get_worker_not_found(self):
"""Test getting a worker that doesn't exist."""
queuer = new_queuer_with_db("test_get_worker_not_found", 10, "", self.db_config)
try:
# Try to get a non-existent worker
worker = queuer.get_worker(uuid4())
# Verify the worker was not found
self.assertIsNone(worker)
finally:
# Clean up the database connection
if queuer.database:
queuer.database.close()
if __name__ == "__main__":
unittest.main()