-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathqueuer_task_test.py
More file actions
151 lines (112 loc) · 5.17 KB
/
queuer_task_test.py
File metadata and controls
151 lines (112 loc) · 5.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
"""
Tests for task-related methods in the Python queuer implementation.
Mirrors Go's queuerTask_test.go functionality.
"""
import unittest
import pytest
from .model.task import Task
from .queuer import new_queuer_with_db
from .helper.test_database import TimescaleTestMixin
def task_1_test(data: str) -> str:
    """Module-level task fixture: return *data* prefixed with ``Processed: ``.

    The function's own name matters to the tests in this file, which
    expect a task added without an explicit name to be registered as
    ``"task_1_test"``.
    """
    processed = f"Processed: {data}"
    return processed
def task_2_test(data: str) -> str:
    """Second module-level task fixture: return *data* prefixed with ``Custom: ``.

    Used alongside ``task_1_test`` when a test needs two distinct tasks.
    """
    customised = f"Custom: {data}"
    return customised
class TestQueuerTask(TimescaleTestMixin, unittest.TestCase):
    """Test cases for queuer task functionality.

    Every test builds its own queuer through ``_new_queuer``, which registers
    ``queuer.stop`` as a unittest cleanup. The queuer is therefore stopped
    even when an assertion fails partway through a test, instead of being
    leaked into the tests that run afterwards.
    """

    @classmethod
    def setUpClass(cls):
        """Set up for the entire test class."""
        # Bridge unittest's hook name to the ``setup_class`` hook reached via
        # super() (presumably provided by TimescaleTestMixin).
        super().setup_class()

    @classmethod
    def tearDownClass(cls):
        """Clean up after all tests."""
        super().teardown_class()

    def setUp(self):
        """Set up for each test method."""
        super().setup_method()

    def _new_queuer(self):
        """Create a queuer for the current test and guarantee it is stopped.

        Returns:
            The queuer built by ``new_queuer_with_db`` with the same arguments
            every test in this class uses.
        """
        # NOTE(review): ``self.db_config`` is not set in this class; it is
        # presumably provided by TimescaleTestMixin -- confirm.
        queuer = new_queuer_with_db("test_queuer", 10, "", self.db_config)
        # addCleanup runs even when the test body raises, unlike a trailing
        # ``queuer.stop()`` call that a failed assertion would skip.
        self.addCleanup(queuer.stop)
        return queuer

    def test_add_task_success(self):
        """Test successfully adding a task."""
        queuer = self._new_queuer()

        task = queuer.add_task(task_1_test)

        self.assertIsNotNone(task)
        # No ``if task:`` guard: assertIsNotNone has already failed the test
        # for None, and a guard would silently skip the name check for any
        # falsy task object.
        self.assertEqual(task.name, "task_1_test")
        self.assertIn("task_1_test", queuer.tasks)
        self.assertIn("task_1_test", queuer.worker.available_tasks)

    def test_add_task_already_exists(self):
        """Test adding a task that already exists."""
        queuer = self._new_queuer()
        queuer.add_task(task_1_test)

        with pytest.raises(RuntimeError, match="Task already exists"):
            queuer.add_task(task_1_test)

    def test_add_task_with_name_success(self):
        """Test successfully adding a task with a specific name."""
        queuer = self._new_queuer()

        result: Task = queuer.add_task_with_name(task_1_test, "custom_name")

        self.assertIsNotNone(result)
        self.assertEqual(result.name, "custom_name")
        self.assertIn("custom_name", queuer.tasks)
        self.assertIn("custom_name", queuer.worker.available_tasks)

    def test_add_task_with_name_already_exists(self):
        """Test adding a task with name that already exists."""
        queuer = self._new_queuer()
        queuer.add_task_with_name(task_1_test, "custom_name")

        # Try adding another task with the same name
        with pytest.raises(RuntimeError, match="Task already exists"):
            queuer.add_task_with_name(task_1_test, "custom_name")

    def test_add_multiple_tasks(self):
        """Test adding multiple different tasks."""
        queuer = self._new_queuer()

        queuer.add_task(task_1_test)
        queuer.add_task(task_2_test)

        self.assertIn("task_1_test", queuer.tasks)
        self.assertIn("task_2_test", queuer.tasks)
        self.assertIn("task_1_test", queuer.worker.available_tasks)
        self.assertIn("task_2_test", queuer.worker.available_tasks)
        self.assertEqual(len(queuer.worker.available_tasks), 2)

    def test_task_decorator_without_name(self):
        """Test the @queuer.task() decorator without specifying a name."""
        queuer = self._new_queuer()

        @queuer.task()
        def decorated_task(data: str) -> str:
            """Test task added via decorator."""
            return f"Decorated: {data}"

        # Verify the task was registered with the function name
        self.assertIn("decorated_task", queuer.tasks)
        self.assertIn("decorated_task", queuer.worker.available_tasks)

        # Verify the original function is still callable
        result = decorated_task("test")
        self.assertEqual(result, "Decorated: test")

        # Verify the task object is correct
        task = queuer.tasks["decorated_task"]
        self.assertEqual(task.name, "decorated_task")
        self.assertEqual(task.task, decorated_task)

    def test_task_decorator_with_name(self):
        """Test the @queuer.task_with_name(name=...) decorator with a custom name."""
        queuer = self._new_queuer()

        @queuer.task_with_name(name="my_custom_task")
        def some_function(data: str) -> str:
            """Test task with custom name via decorator."""
            return f"Custom: {data}"

        # Verify the task was registered with the custom name
        self.assertIn("my_custom_task", queuer.tasks)
        # Original function name should not be used
        self.assertNotIn("some_function", queuer.tasks)
        self.assertIn("my_custom_task", queuer.worker.available_tasks)

        # Verify the original function is still callable
        result = some_function("test")
        self.assertEqual(result, "Custom: test")

        # Verify the task object is correct
        task = queuer.tasks["my_custom_task"]
        self.assertEqual(task.name, "my_custom_task")
        self.assertEqual(task.task, some_function)