-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathqueuer_test.py
More file actions
680 lines (539 loc) · 22.6 KB
/
queuer_test.py
File metadata and controls
680 lines (539 loc) · 22.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
"""
Comprehensive test for the main Queuer implementation.
Based on the Go queuerTest.go implementation.
"""
import threading
import time
from datetime import timedelta
from typing import List
import unittest
from uuid import uuid4
from .core.runner import SmallRunner, go_func
from .database.db_job import JobDBHandler
from .helper.logging import get_logger
from .model.batch_job import BatchJob
from .model.job import Job, JobStatus, new_job
from .model.master import MasterSettings
from .queuer import new_queuer_with_db
from .helper.database import DatabaseConfiguration
from .helper.error import QueuerError
from .helper.test_database import TimescaleTestMixin, PostgresTestMixin
from .model.options_on_error import OnError, RetryBackoff
from .model.worker import WorkerStatus
# Module-level logger, obtained from the project's logging helper.
logger = get_logger(__name__)
# Global variable for testing process reuse.
# Starts as None; global_process_reuse_task sets it to 1 the first time it
# runs and reports "reused" on later calls that still see the value.
_process_memory_state = None
def global_simple_task(x: int) -> int:
    """Return twice the given value; a minimal task used by the queuer tests."""
    doubled = x * 2
    return doubled
def global_test_task(message: str) -> str:
    """Return the message prefixed with ``Processed: ``."""
    processed = f"Processed: {message}"
    return processed
def global_process_reuse_task() -> str:
    """
    Report whether this task has already run in the current process.

    The module-level ``_process_memory_state`` is the marker. While it is
    still ``None`` the task sets it to 1 and returns "loaded"; on any later
    call that sees a non-``None`` marker it returns "reused" and leaves the
    marker untouched.
    """
    global _process_memory_state

    if _process_memory_state is not None:
        # The marker survived from an earlier call, so process memory was reused.
        logger.info(
            f"Process state already loaded: {_process_memory_state} (process reused)"
        )
        return "reused"

    # First call that sees an empty marker: simulate loading/initialization.
    logger.info("Loading process state (first execution in this process)")
    _process_memory_state = 1
    return "loaded"
class TestNewQueuer(TimescaleTestMixin, unittest.TestCase):
    """Test the new_queuer_with_db factory function with various configurations."""

    @classmethod
    def setUpClass(cls):
        """Set up for the entire test class."""
        super().setup_class()

    @classmethod
    def tearDownClass(cls):
        """Clean up after all tests."""
        super().teardown_class()

    def setUp(self):
        """Set up for each test method."""
        super().setup_method()

    def test_valid_queuer(self):
        """Test creating a valid queuer."""
        queuer = new_queuer_with_db("test_valid", 100, "", self.db_config)
        try:
            self.assertIsNotNone(queuer, "Expected Queuer to be created successfully")
            self.assertEqual(
                queuer.name,
                "test_valid",
                "Expected Queuer name to be test_valid",
            )
            self.assertEqual(
                100, queuer.max_concurrency, "Expected Queuer max concurrency to match"
            )
            self.assertIsNotNone(queuer.worker, "Expected worker to be initialized")
            self.assertEqual(
                queuer.worker.name,
                "test_valid",
                "Expected worker name to be test_valid",
            )
            self.assertEqual(
                100,
                queuer.worker.max_concurrency,
                "Expected worker max concurrency to match",
            )
        finally:
            # Stop the queuer even though it was never started, as the other
            # test classes in this module do for every queuer they create.
            queuer.stop()

    def test_valid_queuer_with_options(self):
        """Test creating a valid queuer with error handling options."""
        options = OnError(
            timeout=10.0,
            max_retries=3,
            retry_delay=1.0,
            retry_backoff=RetryBackoff.LINEAR,
        )
        queuer = new_queuer_with_db(
            "test_with_options", 100, "", self.db_config, options
        )
        try:
            self.assertIsNotNone(queuer, "Expected Queuer to be created successfully")
            self.assertEqual(
                queuer.name,
                "test_with_options",
                "Expected Queuer name to be test_with_options",
            )
            self.assertEqual(100, queuer.max_concurrency)
            self.assertIsNotNone(queuer.worker)
            # Check that worker has the options applied
            self.assertIsNotNone(queuer.worker.options)
            # The truthiness guard narrows the optional type for static checkers.
            if queuer.worker.options:
                self.assertEqual(10.0, queuer.worker.options.timeout)
                self.assertEqual(3, queuer.worker.options.max_retries)
                self.assertEqual(1.0, queuer.worker.options.retry_delay)
                self.assertEqual(
                    RetryBackoff.LINEAR, queuer.worker.options.retry_backoff
                )
        finally:
            # Same cleanup as in test_valid_queuer: do not leave the queuer behind.
            queuer.stop()

    def test_invalid_max_concurrency(self):
        """Test that invalid max concurrency raises error."""
        with self.assertRaises((ValueError, RuntimeError)):
            new_queuer_with_db("test_invalid", -1, "", self.db_config)

    def test_invalid_options(self):
        """Test that invalid options raise error."""
        # In Python, the validation happens when creating OnError object
        with self.assertRaises(ValueError):
            OnError(
                timeout=-10.0,  # Invalid timeout value
                max_retries=3,
                retry_delay=1.0,
                retry_backoff=RetryBackoff.LINEAR,
            )

    def test_invalid_database_config(self):
        """Test that invalid database configuration raises error."""
        invalid_config = DatabaseConfiguration(
            host="",  # Invalid empty host
            port=5432,
            username="user",
            password="password",
            database="database",
            schema="public",
            sslmode="disable",
            with_table_drop=True,
        )
        with self.assertRaises(
            (ValueError, RuntimeError, ConnectionError, QueuerError)
        ):
            new_queuer_with_db("test_invalid_db", 100, "", invalid_config)
class TestQueuerStart(TimescaleTestMixin, unittest.TestCase):
    """Checks for the queuer's start() behaviour."""

    @classmethod
    def setUpClass(cls):
        """Run the mixin's class-level setup once."""
        super().setup_class()

    @classmethod
    def tearDownClass(cls):
        """Run the mixin's class-level teardown once."""
        super().teardown_class()

    def setUp(self):
        """Run the mixin's per-test setup."""
        super().setup_method()

    @staticmethod
    def _stop_if_running(queuer):
        """Stop the queuer only when it reports that it is running."""
        if queuer.running:
            queuer.stop()

    def test_start_queuer_basic(self):
        """A freshly created queuer starts and its worker reports RUNNING."""
        started = new_queuer_with_db("test_start", 10, "", self.db_config)
        self.assertIsNotNone(started, msg="Expected Queuer to be created successfully")
        try:
            # start() itself must not raise.
            started.start()
            self.assertTrue(
                started.running, msg="Expected queuer to be running after start"
            )
            # The worker is expected to follow the queuer into RUNNING.
            self.assertEqual(WorkerStatus.RUNNING, started.worker.status)
        finally:
            self._stop_if_running(started)

    def test_start_queuer_already_running(self):
        """Calling start() a second time raises RuntimeError."""
        started = new_queuer_with_db("test_start_twice", 10, "", self.db_config)
        try:
            started.start()
            self.assertTrue(started.running)
            # The second start is the call under test.
            with self.assertRaises(RuntimeError):
                started.start()
        finally:
            self._stop_if_running(started)
class TestQueuerStop(TimescaleTestMixin, unittest.TestCase):
    """Test queuer stop functionality."""

    @classmethod
    def setUpClass(cls):
        """Set up for the entire test class."""
        super().setup_class()

    @classmethod
    def tearDownClass(cls):
        """Clean up after all tests."""
        super().teardown_class()

    def setUp(self):
        """Set up for each test method."""
        super().setup_method()

    def test_stop_queuer(self):
        """Test stopping queuer, both before and after it has been started."""
        queuer = new_queuer_with_db("test_stop", 10, "", self.db_config)
        # Stop should work even if not started
        queuer.stop()  # Should not raise error
        try:
            # Start and then stop
            queuer.start()
            self.assertTrue(queuer.running)
            queuer.stop()
            self.assertFalse(
                queuer.running, "Expected queuer to not be running after stop"
            )
        finally:
            # If an assertion above failed before stop() ran, do not leave a
            # running queuer behind for the following tests.
            if queuer.running:
                queuer.stop()

    def test_stop_not_running_queuer(self):
        """Test stopping a queuer that's not running."""
        queuer = new_queuer_with_db("test_stop_not_running", 10, "", self.db_config)
        # Stop should work even if not started - should not raise error
        queuer.stop()
        self.assertFalse(queuer.running)
class TestQueuerHeartbeat(TimescaleTestMixin, unittest.TestCase):
    """Test queuer heartbeat functionality."""

    @classmethod
    def setUpClass(cls):
        """Set up for the entire test class."""
        super().setup_class()

    @classmethod
    def tearDownClass(cls):
        """Clean up after all tests."""
        super().teardown_class()

    def setUp(self):
        """Set up for each test method."""
        super().setup_method()

    def test_heartbeat_ticker_starts(self):
        """Test that heartbeat ticker starts successfully."""
        queuer = new_queuer_with_db("test_heartbeat", 10, "", self.db_config)
        try:
            queuer.start()
            self.assertTrue(queuer.running)
            self.assertIsNotNone(queuer.heartbeat_ticker)
            # The truthiness guard narrows the optional type for static checkers.
            if queuer.heartbeat_ticker:
                self.assertTrue(queuer.heartbeat_ticker.is_running())
            # Let the queuer run briefly before checking the worker again.
            time.sleep(0.5)
            self.assertIsNotNone(queuer.worker)
        finally:
            if queuer.running:
                queuer.stop()

    def test_heartbeat_ticker_stops(self):
        """Test that heartbeat ticker stops when queuer stops."""
        queuer = new_queuer_with_db("test_heartbeat_stop", 10, "", self.db_config)
        try:
            queuer.start()
            self.assertTrue(queuer.running)
            self.assertIsNotNone(queuer.heartbeat_ticker)
            if queuer.heartbeat_ticker:
                self.assertTrue(queuer.heartbeat_ticker.is_running())
            queuer.stop()
            self.assertFalse(queuer.running)
            # Only checked when the ticker object is still present after stop().
            if queuer.heartbeat_ticker:
                self.assertFalse(
                    queuer.heartbeat_ticker.is_running(),
                    "Expected heartbeat ticker to be stopped",
                )
        finally:
            # A failed assertion before stop() must not leave the queuer
            # running; same guard as in test_heartbeat_ticker_starts.
            if queuer.running:
                queuer.stop()
class TestQueuerTasks(TimescaleTestMixin, unittest.TestCase):
    """Test queuer task management functionality."""

    @classmethod
    def setUpClass(cls):
        """Set up for the entire test class."""
        super().setup_class()

    @classmethod
    def tearDownClass(cls):
        """Clean up after all tests."""
        super().teardown_class()

    def setUp(self):
        """Set up for each test method.

        Runs the mixin's per-test setup, matching the other test classes in
        this module that use the same mixin.
        """
        super().setup_method()

    def test_add_task(self):
        """Test adding tasks to queuer."""
        queuer = new_queuer_with_db("test_add_task", 10, "", self.db_config)
        try:
            queuer.add_task(global_simple_task)
            self.assertIn("global_simple_task", queuer.tasks)
            task_obj = queuer.tasks["global_simple_task"]
            self.assertEqual(global_simple_task, task_obj.task)
        finally:
            queuer.stop()

    def test_add_multiple_tasks(self):
        """Test adding multiple tasks to queuer."""
        queuer = new_queuer_with_db("test_multiple_tasks", 10, "", self.db_config)
        try:
            queuer.add_task(global_simple_task)
            queuer.add_task(global_test_task)
            self.assertIn("global_simple_task", queuer.tasks)
            self.assertIn("global_test_task", queuer.tasks)
            self.assertEqual(2, len(queuer.tasks))
        finally:
            queuer.stop()

    def test_add_job_basic(self):
        """Test adding a basic job."""
        queuer = new_queuer_with_db("test_add_job", 10, "", self.db_config)
        try:
            queuer.add_task(global_simple_task)
            job = queuer.add_job(global_simple_task, 42)
            self.assertIsNotNone(job, "Expected job to be created")
            self.assertEqual("global_simple_task", job.task_name)
            self.assertIsNotNone(job.rid, "Expected job to have a RID")
            # Read the job back by RID to confirm it can be fetched.
            job = queuer.get_job(job.rid)
            self.assertIsNotNone(job, "Expected job to be found in database")
        finally:
            queuer.stop()

    def test_add_job_without_task_registration(self):
        """Test adding job without registering task."""
        queuer = new_queuer_with_db("test_job_no_task", 10, "", self.db_config)
        try:
            # The task is referenced by name only; add_task() is never called.
            job = queuer.add_job("global_simple_task", 42)
            self.assertIsNotNone(job)
            self.assertEqual("global_simple_task", job.task_name)
            logger.debug(
                f"job with non existing task inserted: {job.rid if job else 'None'}"
            )
        finally:
            queuer.stop()

    def test_job_creation_performance(self):
        """Test that job creation is fast and doesn't hang."""
        queuer = new_queuer_with_db("test_job_performance", 10, "", self.db_config)
        try:
            queuer.add_task(global_test_task)
            jobs: List[Job] = []
            # perf_counter() is monotonic, so the measured duration cannot be
            # distorted by system clock adjustments the way time.time() can.
            start_time = time.perf_counter()
            for i in range(5):
                job = queuer.add_job(global_test_task, f"test_data_{i}")
                jobs.append(job)
                self.assertIsNotNone(job, f"Job {i} should be created successfully")
            duration = time.perf_counter() - start_time
            job_ids = [job.rid for job in jobs]
            self.assertLess(
                duration, 2.0, f"Job creation took {duration:.3f}s, should be under 2s"
            )
            self.assertEqual(len(jobs), 5, "All 5 jobs should be created")
            self.assertEqual(
                len(job_ids), len(set(job_ids)), "All job IDs should be unique"
            )
        finally:
            queuer.stop()
class TestQueuerNotifications(TimescaleTestMixin, unittest.TestCase):
    """Test queuer notification system and rapid job insertion."""

    @classmethod
    def setUpClass(cls):
        """Set up for the entire test class."""
        super().setup_class()

    @classmethod
    def tearDownClass(cls):
        """Clean up after all tests."""
        super().teardown_class()

    def setUp(self):
        """Set up for each test method."""
        super().setup_method()

    def test_rapid_job_insertion_no_hanging(self):
        """Test that rapid job insertion doesn't hang with notification system."""
        queuer = new_queuer_with_db("test_no_hang", 5, "", self.db_config)
        # Placeholder so the cleanup below always has a runner to call
        # get_results() on, even if go_func does not return a SmallRunner.
        listener_runner: SmallRunner = SmallRunner(queuer.listen_for_job_insert)
        try:
            queuer.add_task(global_simple_task)
            queuer.start()

            # Filled only by the notification callback. Kept under its own
            # name so that later database lookups cannot rebind the list the
            # callback closes over.
            notified_jobs: List[Job] = []
            notified_lock = threading.Lock()

            def on_job_inserted(job: Job):
                logger.debug(f"Received job notification: {job.rid if job else 'None'}")
                with notified_lock:
                    notified_jobs.append(job)

            runner = go_func(
                queuer.listen_for_job_insert,
                use_mp=False,
                notify_function=on_job_inserted,
            )
            if runner and isinstance(runner, SmallRunner):
                listener_runner = runner

            # Give the listener a moment to start up
            time.sleep(0.5)

            batch_jobs = [BatchJob(global_simple_task, [i * 2]) for i in range(10)]
            queuer.add_jobs(batch_jobs)
            final_job = queuer.add_job(global_simple_task, 999)
            self.assertIsNotNone(final_job, "Final job should be created successfully")

            # Fixed wait for notifications to arrive.
            time.sleep(2.0)
            with notified_lock:
                jobs_received = len(notified_jobs)
            self.assertGreater(
                jobs_received,
                0,
                f"Should have received at least some job notifications, got {jobs_received}",
            )

            # Count jobs in both the open and the ended listing.
            open_jobs = queuer.get_jobs(0, 100)
            ended_jobs = queuer.get_jobs_ended(0, 100)
            total_jobs = len(open_jobs) + len(ended_jobs)
            self.assertGreaterEqual(
                total_jobs,
                9,
                f"Should have at least 9 jobs in database, found {total_jobs}",
            )
        finally:
            # Always stop the queuer, even if collecting the listener's
            # results raises.
            try:
                listener_runner.get_results()
            finally:
                queuer.stop()

    def test_process_reuse_with_pool(self):
        """Test that multiprocessing pool reuses processes and maintains state."""
        # Use max_concurrency=1 to ensure both jobs run on the same worker process
        queuer = new_queuer_with_db("test_process_reuse", 1, "", self.db_config)
        try:
            queuer.add_task(global_process_reuse_task)
            queuer.start()

            # Add first job - should load the state
            job1 = queuer.add_job(global_process_reuse_task)
            self.assertIsNotNone(job1, "First job should be created")
            # Wait for first job to complete
            time.sleep(2.0)
            # Check first job result (completed jobs are in archive)
            job1_result = queuer.db_job.select_job_from_archive(job1.rid)
            self.assertIsNotNone(job1_result, "First job should exist in archive")
            # The truthiness guard narrows the optional type for static checkers.
            if job1_result:
                self.assertEqual(
                    JobStatus.SUCCEEDED, job1_result.status, "First job should succeed"
                )
                self.assertEqual(
                    "loaded", job1_result.results[0], "First job should load the state"
                )

            # Add second job - should reuse the process and find state already loaded
            job2 = queuer.add_job(global_process_reuse_task)
            self.assertIsNotNone(job2, "Second job should be created")
            # Wait for second job to complete
            time.sleep(2.0)
            # Check second job result (completed jobs are in archive)
            job2_result = queuer.db_job.select_job_from_archive(job2.rid)
            self.assertIsNotNone(job2_result, "Second job should exist in archive")
            if job2_result:
                self.assertEqual(
                    JobStatus.SUCCEEDED, job2_result.status, "Second job should succeed"
                )
                self.assertEqual(
                    "reused",
                    job2_result.results[0],
                    "Second job should detect reused process with pre-loaded state",
                )
        finally:
            queuer.stop()
class TestJobArchiveRetention(TimescaleTestMixin, unittest.TestCase):
    """
    Archive-retention checks for a queuer backed by TimescaleDB.

    Counterpart of Go's TestJobArchiveRetention for the TimescaleDB
    retention policy.
    """

    @classmethod
    def setUpClass(cls):
        """Run the mixin's class-level setup once."""
        super().setup_class()

    @classmethod
    def tearDownClass(cls):
        """Run the mixin's class-level teardown once."""
        super().teardown_class()

    def setUp(self):
        """Run the mixin's per-test setup."""
        super().setup_method()

    def test_timescale_retention_policy(self):
        """Starting with retention settings stores them on the master record."""
        # This test only makes sense against a TimescaleDB configuration.
        self.assertTrue(
            self.db_config.with_timescale,
            msg="Expected with_timescale to be True for TimescaleDB tests",
        )

        # Trivial task for the queuer to register.
        def test_task() -> str:
            return "success"

        retention_queuer = new_queuer_with_db(
            "test-retention", 10, "", self.db_config
        )
        retention_queuer.add_task(test_task)
        try:
            # Start with master settings that carry the retention value (1 day).
            retention_queuer.start(MasterSettings(retention_archive=1))
            # Fixed wait for the master to initialize and set up retention.
            time.sleep(6)

            stored_master = retention_queuer.db_master.select_master()
            self.assertIsNotNone(stored_master, msg="Expected master to be created")
            self.assertEqual(
                1,
                stored_master.settings.retention_archive,
                msg="Expected master retention_archive to be 1 day",
            )
        finally:
            retention_queuer.stop()
class TestJobArchiveRetentionPostgres(PostgresTestMixin, unittest.TestCase):
    """
    Archive-retention checks for a queuer backed by vanilla PostgreSQL.

    Counterpart of Go's TestJobArchiveRetention for the PostgreSQL
    trigger-based retention.
    """

    @classmethod
    def setUpClass(cls):
        """Run the mixin's class-level setup once."""
        super().setup_class()

    @classmethod
    def tearDownClass(cls):
        """Run the mixin's class-level teardown once."""
        super().teardown_class()

    def setUp(self):
        """Run the mixin's per-test setup."""
        super().setup_method()

    def test_postgres_trigger_retention(self):
        """An archive row dated two days back disappears after new jobs run."""
        # This test only makes sense without TimescaleDB.
        self.assertFalse(
            self.db_config.with_timescale,
            msg="Expected with_timescale to be False for vanilla PostgreSQL tests",
        )

        retention_queuer = new_queuer_with_db(
            "test-retention", 10, "", self.db_config
        )

        # Trivial task for the queuer to register and execute.
        def test_task() -> str:
            return "success"

        retention_queuer.add_task(test_task)
        try:
            # Start with master settings that carry the retention value (1 day).
            retention_queuer.start(MasterSettings(retention_archive=1))
            # Fixed wait for the master to initialize and set up retention.
            time.sleep(6)

            # RID for the old row that is written straight into the archive.
            stale_rid = uuid4()

            # The archive row needs the RID of the queuer's own worker.
            if retention_queuer.worker:
                worker_rid = retention_queuer.worker.rid
            else:
                worker_rid = None
            self.assertIsNotNone(worker_rid, msg="Expected worker to have RID")

            # Explicit None check so type checkers can narrow the connection.
            connection = retention_queuer.database.instance
            if connection is None:
                self.fail(
                    "Expected database instance to be available for inserting old job"
                )

            with connection.cursor() as cursor:
                cursor.execute(
                    """
                    INSERT INTO job_archive (id, rid, worker_id, worker_rid, task_name, status, updated_at, created_at)
                    VALUES (1, %s, 1, %s, 'test_task', 'SUCCEEDED',
                    CURRENT_TIMESTAMP - INTERVAL '2 days',
                    CURRENT_TIMESTAMP - INTERVAL '2 days')
                    """,
                    (stale_rid, worker_rid),
                )
            connection.commit()

            # Add 15 jobs to trigger cleanup (trigger runs every 10th insert)
            for index in range(15):
                created = retention_queuer.add_job(test_task, None)
                self.assertIsNotNone(
                    created, msg=f"Expected job {index} to be created"
                )

            # Fixed wait for the jobs to complete and the trigger to execute.
            time.sleep(5)

            # The old row should no longer be selectable from the archive.
            remaining = retention_queuer.db_job.select_job_from_archive(stale_rid)
            self.assertIsNone(
                remaining,
                msg="Expected old job to be deleted by trigger-based retention",
            )
        finally:
            retention_queuer.stop()
# Run the unittest runner when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()