Skip to content

Commit b36f5c3

Browse files
committed
fixes
Signed-off-by: Javan Lacerda <javanlacerda@google.com>
1 parent e7226af commit b36f5c3

File tree

12 files changed

+68
-175
lines changed

12 files changed

+68
-175
lines changed

src/clusterfuzz/_internal/base/feature_flags.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ class FeatureFlags(Enum):
2929
K8S_JOBS_FREQUENCY = 'k8s_jobs_frequency'
3030
K8S_JOBS_PENDING_LIMIT = 'k8s_jobs_pending_limit'
3131

32+
UTASK_MAIN_QUEUE_LIMIT = 'utask_main_queue_limit'
33+
3234
@property
3335
def flag(self):
3436
"""Get the feature flag."""

src/clusterfuzz/_internal/bot/tasks/task_types.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
from clusterfuzz._internal import swarming
1919
from clusterfuzz._internal.base import errors
20+
from clusterfuzz._internal.base import feature_flags
2021
from clusterfuzz._internal.base import tasks
2122
from clusterfuzz._internal.base.tasks import task_utils
2223
from clusterfuzz._internal.batch import service as batch_service
@@ -25,6 +26,8 @@
2526
from clusterfuzz._internal.metrics import logs
2627
from clusterfuzz._internal.system import environment
2728

29+
UTASK_MAIN_QUEUE_LIMIT_DEFAULT = 10000
30+
2831

2932
class BaseTask:
3033
"""Base module for tasks."""
@@ -157,7 +160,11 @@ def execute(self, task_argument, job_type, uworker_env):
157160

158161
utask_main_queue_size = tasks.get_utask_main_queue_size()
159162

160-
if utask_main_queue_size > 10000:
163+
utask_main_queue_limit = UTASK_MAIN_QUEUE_LIMIT_DEFAULT
164+
utask_flag = feature_flags.FeatureFlags.UTASK_MAIN_QUEUE_LIMIT.flag
165+
if utask_flag and utask_flag.enabled:
166+
utask_main_queue_limit = utask_flag.content
167+
if utask_main_queue_size > utask_main_queue_limit:
161168
raise errors.QueueLimitReachedError(utask_main_queue_size)
162169

163170
logs.info('Preprocessing utask.')

src/clusterfuzz/_internal/k8s/service.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import collections
1717
import ipaddress
1818
import os
19-
import typing
2019
import uuid
2120

2221
import google.auth
@@ -53,7 +52,7 @@
5352
])
5453

5554

56-
def _get_config_names(remote_tasks: typing.List[remote_task_types.RemoteTask]):
55+
def _get_config_names(remote_tasks: list[remote_task_types.RemoteTask]):
5756
"""Gets the name of the configs for each batch_task. Returns a dict
5857
5958
that is indexed by command and job_type for efficient lookup."""
@@ -90,9 +89,8 @@ def _get_config_names(remote_tasks: typing.List[remote_task_types.RemoteTask]):
9089
return config_map
9190

9291

93-
def _get_k8s_job_configs(
94-
remote_tasks: typing.List[remote_task_types.RemoteTask]
95-
) -> typing.Dict[typing.Tuple[str, str], KubernetesJobConfig]:
92+
def _get_k8s_job_configs(remote_tasks: list[remote_task_types.RemoteTask]
93+
) -> dict[tuple[str, str], KubernetesJobConfig]:
9694
"""Gets the configured specifications for a batch workload."""
9795

9896
if not remote_tasks:
@@ -295,8 +293,8 @@ def create_utask_main_job(self, module: str, job_type: str,
295293
uncreated_tasks = self.create_utask_main_jobs(batch_tasks)
296294
return uncreated_tasks
297295

298-
def create_utask_main_jobs(
299-
self, remote_tasks: typing.List[remote_task_types.RemoteTask]):
296+
def create_utask_main_jobs(self,
297+
remote_tasks: list[remote_task_types.RemoteTask]):
300298
"""Creates a batch job for a list of uworker main tasks.
301299
302300
This method groups the tasks by their workload specification and creates a
@@ -311,7 +309,8 @@ def create_utask_main_jobs(
311309
pending_jobs_count = self._get_pending_jobs_count()
312310
if pending_jobs_count >= limit:
313311
logs.warning(
314-
f'Pending jobs count {pending_jobs_count} reached limit {limit}.')
312+
f'Pending jobs count {pending_jobs_count} reached limit {limit} '
313+
f'for k8s.')
315314
return remote_tasks
316315

317316
job_specs = collections.defaultdict(list)
@@ -321,7 +320,7 @@ def create_utask_main_jobs(
321320
f'Scheduling {remote_task.command}, {remote_task.job_type} in K8s.')
322321
config = configs[(remote_task.command, remote_task.job_type)]
323322
job_specs[config].append(remote_task.input_download_url)
324-
logs.info('Creating GCP batch jobs.')
323+
logs.info('Creating Kubernetes jobs.')
325324
for config, input_urls in job_specs.items():
326325
for input_url in input_urls:
327326
self.create_job(config, input_url)

src/clusterfuzz/_internal/remote_task/remote_task_gate.py

Lines changed: 12 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,7 @@
2121

2222
import collections
2323
import random
24-
import typing
2524

26-
from clusterfuzz._internal.base.tasks.task_utils import is_testcase_based_task
27-
from clusterfuzz._internal.bot.tasks.utasks import uworker_io
2825
from clusterfuzz._internal.metrics import logs
2926
from clusterfuzz._internal.remote_task import remote_task_adapters
3027
from clusterfuzz._internal.remote_task import remote_task_types
@@ -107,44 +104,13 @@ def get_job_frequency(self):
107104
logs.info('Job frequencies', frequencies=frequencies)
108105
return frequencies
109106

110-
def prepare_unscheduled_tasks(
111-
self, unscheduled_remote_tasks: typing.List[remote_task_types.RemoteTask]
112-
) -> typing.List[remote_task_types.RemoteTask]:
113-
"""Prepares the unscheduled remote tasks to be sent back to the
114-
preprocess queue.
115-
116-
The messages in the preprocess queue expects tasks with
117-
the properties followng task_name, fuzzer, job, eta.
118-
The utasks has the signed url as argument instead of the fuzzer,
119-
then for sending it back to the preprocess, it recovers its fuzzer
120-
and override the argument.
121-
"""
122-
123-
prepared_tasks = []
124-
if not unscheduled_remote_tasks:
125-
return prepared_tasks
126-
127-
for task in unscheduled_remote_tasks:
128-
try:
129-
uworker_input = uworker_io.download_and_deserialize_uworker_input(
130-
task.input_download_url)
131-
if is_testcase_based_task(task.command):
132-
task.argument = uworker_input.testcase_id
133-
else:
134-
task.argument = uworker_input.fuzzer_name
135-
prepared_tasks.append(task)
136-
except Exception:
137-
logs.error('Could not prepare the task due to an error.')
138-
139-
return prepared_tasks
140-
141107
def create_utask_main_job(self, module, job_type, input_download_url):
142108
adapter_id = self._get_adapter()
143109
service = self._service_map[adapter_id]
144110
return service.create_utask_main_job(module, job_type, input_download_url)
145111

146-
def create_utask_main_jobs(
147-
self, remote_tasks: typing.List[remote_task_types.RemoteTask]):
112+
def create_utask_main_jobs(self,
113+
remote_tasks: list[remote_task_types.RemoteTask]):
148114
"""Creates a batch of remote tasks, distributing them across backends.
149115
150116
This method handles two cases:
@@ -161,6 +127,7 @@ def create_utask_main_jobs(
161127
# For a single task, use a random distribution.
162128
adapter_id = self._get_adapter()
163129
tasks_by_adapter[adapter_id].extend(remote_tasks)
130+
unscheduled_tasks = []
164131
else:
165132
# For multiple tasks, use deterministic slicing to ensure the
166133
# distribution precisely matches the frequency configuration.
@@ -172,14 +139,17 @@ def create_utask_main_jobs(
172139
remote_tasks[start_index:start_index + count])
173140
start_index += count
174141

175-
# Distribute any remainder tasks (due to rounding) one by one. This
176-
# ensures that all tasks are assigned to a backend.
177142
remaining_tasks = remote_tasks[start_index:]
178-
for i, task in enumerate(remaining_tasks):
179-
adapter_id = list(frequencies.keys())[i % len(frequencies)]
180-
tasks_by_adapter[adapter_id].append(task)
143+
if sum(frequencies.values()) >= 0.999:
144+
# Distribute any remainder tasks (due to rounding) one by one. This
145+
# ensures that all tasks are assigned to a backend.
146+
for i, task in enumerate(remaining_tasks):
147+
adapter_id = list(frequencies.keys())[i % len(frequencies)]
148+
tasks_by_adapter[adapter_id].append(task)
149+
unscheduled_tasks = []
150+
else:
151+
unscheduled_tasks = list(remaining_tasks)
181152

182-
unscheduled_tasks = []
183153
for adapter_id, tasks in tasks_by_adapter.items():
184154
if tasks:
185155
try:

src/clusterfuzz/_internal/remote_task/remote_task_types.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
"""Remote task types."""
1515

1616
import abc
17-
import typing
1817

1918
from clusterfuzz._internal.base import tasks
2019

@@ -53,7 +52,7 @@ def create_utask_main_job(self, module: str, job_type: str,
5352

5453
@abc.abstractmethod
5554
def create_utask_main_jobs(
56-
self, remote_tasks: typing.List[RemoteTask]) -> typing.List[RemoteTask]:
55+
self, remote_tasks: list[RemoteTask]) -> list[RemoteTask]:
5756
"""Creates many remote tasks for uworker main tasks.
5857
Returns the tasks that couldn't be created.
5958
"""

src/clusterfuzz/_internal/tests/core/base/tasks/queue_size_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2023 Google LLC
1+
# Copyright 2026 Google LLC
22
#
33
# Licensed under the Apache License, Version 2.0 (the "License");
44
# you may not use this file except in compliance with the License.

src/clusterfuzz/_internal/tests/core/bot/startup/run_bot_test.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,6 @@ def setUp(self):
135135
helpers.patch(self, [
136136
'clusterfuzz._internal.base.tasks.get_utask_mains',
137137
'clusterfuzz._internal.remote_task.remote_task_gate.RemoteTaskGate',
138-
'clusterfuzz._internal.base.tasks.bulk_add_tasks',
139138
])
140139

141140
def test_schedule_tasks_requeue_uncreated(self):
@@ -157,9 +156,6 @@ def test_schedule_tasks_requeue_uncreated(self):
157156
self.mock.RemoteTaskGate.return_value.create_utask_main_jobs.assert_called_once(
158157
)
159158

160-
# Check that bulk_add_tasks was NOT called.
161-
self.mock.bulk_add_tasks.assert_not_called()
162-
163159
# Verify that cancel_lease_ack was called on the pubsub task.
164160
mock_task.cancel_lease_ack.assert_called_once()
165161

@@ -179,4 +175,3 @@ def test_schedule_tasks_success(self):
179175

180176
self.mock.RemoteTaskGate.return_value.create_utask_main_jobs.assert_called_once(
181177
)
182-
self.mock.bulk_add_tasks.assert_not_called()

src/clusterfuzz/_internal/tests/core/bot/tasks/task_types_test.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ def test_event_emit_during_exception(self):
137137
self.mock.emit.assert_any_call(event_finished)
138138

139139

140+
@test_utils.with_cloud_emulators('datastore')
140141
class UTaskExecuteTest(unittest.TestCase):
141142
"""Tests for UTask execution."""
142143

src/clusterfuzz/_internal/tests/core/remote_task/remote_task_gate_test.py

Lines changed: 34 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,6 @@ def setUp(self):
5555
self.patcher.start()
5656
self.addCleanup(self.patcher.stop)
5757

58-
self.mock_prepare_unscheduled_tasks = mock.patch.object(
59-
remote_task_gate.RemoteTaskGate,
60-
'prepare_unscheduled_tasks',
61-
side_effect=lambda x: x).start()
62-
self.addCleanup(mock.patch.stopall)
63-
6458
def test_init(self):
6559
"""Tests that the RemoteTaskGate initializes correctly and creates
6660
service map."""
@@ -161,11 +155,8 @@ def test_create_utask_main_jobs_remainder_distribution(
161155
remote_task_types.RemoteTask('c', 'j', 'u3'),
162156
]
163157

164-
# 33/33/33 split - one task will be a remainder
165-
mock_get_job_frequency.return_value = {
166-
'kubernetes': 0.33,
167-
'gcp_batch': 0.33
168-
}
158+
# 50/50 split - one task will be a remainder
159+
mock_get_job_frequency.return_value = {'kubernetes': 0.5, 'gcp_batch': 0.5}
169160

170161
gate = remote_task_gate.RemoteTaskGate()
171162
gate.create_utask_main_jobs(tasks)
@@ -177,6 +168,38 @@ def test_create_utask_main_jobs_remainder_distribution(
177168
self.mock_gcp_batch_service.create_utask_main_jobs.assert_called_once_with(
178169
[tasks[1]])
179170

171+
@mock.patch.object(remote_task_gate.RemoteTaskGate, 'get_job_frequency')
172+
def test_create_utask_main_jobs_unscheduled(self, mock_get_job_frequency):
173+
"""Tests that create_utask_main_jobs returns remainder as unscheduled
174+
when sum < 1.0."""
175+
tasks = [
176+
remote_task_types.RemoteTask('c', 'j', 'u1'),
177+
remote_task_types.RemoteTask('c', 'j', 'u2'),
178+
remote_task_types.RemoteTask('c', 'j', 'u3'),
179+
remote_task_types.RemoteTask('c', 'j', 'u4'),
180+
]
181+
182+
# 0.25 each. Sum 0.5.
183+
mock_get_job_frequency.return_value = {
184+
'kubernetes': 0.25,
185+
'gcp_batch': 0.25
186+
}
187+
188+
gate = remote_task_gate.RemoteTaskGate()
189+
result = gate.create_utask_main_jobs(tasks)
190+
191+
# 4 * 0.25 = 1 task each.
192+
# Total assigned = 2.
193+
# Total unscheduled = 2.
194+
195+
self.mock_k8s_service.create_utask_main_jobs.assert_called_once_with(
196+
[tasks[0]])
197+
self.mock_gcp_batch_service.create_utask_main_jobs.assert_called_once_with(
198+
[tasks[1]])
199+
200+
# Result should contain unscheduled (tasks[2], tasks[3]).
201+
self.assertEqual(result, [tasks[2], tasks[3]])
202+
180203
@mock.patch.object(remote_task_gate.RemoteTaskGate, 'get_job_frequency')
181204
def test_create_utask_main_jobs_full_kubernetes(self, mock_get_job_frequency):
182205
"""Tests that all tasks are routed to Kubernetes when frequency is 1.0."""
@@ -222,7 +245,6 @@ def test_create_utask_main_jobs_returns_unscheduled_tasks(
222245
result = gate.create_utask_main_jobs(tasks)
223246

224247
self.mock_k8s_service.create_utask_main_jobs.assert_called_once_with(tasks)
225-
self.mock_prepare_unscheduled_tasks.assert_not_called()
226248
self.assertEqual(result, unscheduled_tasks)
227249

228250

@@ -250,47 +272,3 @@ def setUp(self):
250272
self.patcher.start()
251273
self.addCleanup(self.patcher.stop)
252274
self.gate = remote_task_gate.RemoteTaskGate()
253-
254-
@mock.patch(
255-
'clusterfuzz._internal.bot.tasks.utasks.uworker_io.download_and_deserialize_uworker_input'
256-
)
257-
def test_prepare_unscheduled_tasks(self, mock_download):
258-
"""Tests that prepare_unscheduled_tasks correctly updates task arguments."""
259-
260-
def side_effect(url):
261-
mock_input = mock.Mock()
262-
if url == 'signed_url_1':
263-
mock_input.testcase_id = None
264-
mock_input.fuzzer_name = 'fuzzer1'
265-
elif url == 'signed_url_2':
266-
mock_input.testcase_id = None
267-
mock_input.fuzzer_name = 'fuzzer2'
268-
elif url == 'signed_url_3':
269-
mock_input.testcase_id = '12345'
270-
mock_input.fuzzer_name = 'fuzzer3'
271-
return mock_input
272-
273-
mock_download.side_effect = side_effect
274-
275-
task1 = remote_task_types.RemoteTask(
276-
command='fuzz', job_type='job1', input_download_url='signed_url_1')
277-
task2 = remote_task_types.RemoteTask(
278-
command='fuzz', job_type='job2', input_download_url='signed_url_2')
279-
task3 = remote_task_types.RemoteTask(
280-
command='progression',
281-
job_type='job3',
282-
input_download_url='signed_url_3')
283-
284-
tasks = [task1, task2, task3]
285-
286-
updated_tasks = self.gate.prepare_unscheduled_tasks(tasks)
287-
288-
self.assertEqual(len(updated_tasks), 3)
289-
self.assertEqual(updated_tasks[0].argument, 'fuzzer1')
290-
self.assertEqual(updated_tasks[1].argument, 'fuzzer2')
291-
self.assertEqual(updated_tasks[2].argument, '12345')
292-
293-
def test_prepare_unscheduled_tasks_empty(self):
294-
"""Tests that prepare_unscheduled_tasks handles empty list of tasks."""
295-
updated_tasks = self.gate.prepare_unscheduled_tasks([])
296-
self.assertEqual(updated_tasks, [])

src/clusterfuzz/_internal/tests/core/remote_task/remote_task_test.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,6 @@ def setUp(self):
4747
self.addCleanup(patcher.stop)
4848
patcher.start()
4949

50-
# Mock prepare_unscheduled_tasks to avoid NDB context issues
51-
patcher = mock.patch.object(
52-
remote_task_gate.RemoteTaskGate,
53-
'prepare_unscheduled_tasks',
54-
side_effect=lambda x: x)
55-
self.addCleanup(patcher.stop)
56-
patcher.start()
57-
5850
self.gate = remote_task_gate.RemoteTaskGate()
5951

6052
@mock.patch.object(remote_task_gate.RemoteTaskGate, 'get_job_frequency')

0 commit comments

Comments
 (0)