
Commit f96a469

schedule fuzz based on preprocess queue size (#5153)
SHOULD BE MERGED AFTER #5150. This updates the logic for scheduling fuzz tasks. Instead of aiming to load the GCP Batch infrastructure based on the available CPUs, it looks only at the preprocess queue size. By default, it aims to keep the preprocess queue at 10k messages, scheduling a number of tasks equal to the difference between the target value and the current queue size.

Signed-off-by: Javan Lacerda <javanlacerda@google.com>
1 parent 7c8f541 commit f96a469
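As a reading aid, here is a minimal sketch of the scheduling rule described in the commit message. The variable names mirror the diff below; the concrete numbers are illustrative only, not taken from production.

target_size = 10_000           # Default target backlog (PREPROCESS_TARGET_SIZE_DEFAULT).
preprocess_queue_size = 3_000  # Current unacked messages on the preprocess queue.

# Schedule only enough fuzz tasks to top the queue back up to the target.
num_tasks = target_size - preprocess_queue_size  # 7,000 in this example.
if num_tasks <= 0:
  # Queue already at or above the target: skip scheduling this cycle.
  num_tasks = 0

Because the only input signal is queue depth, the slow per-region quota and GCP Batch queries used by the old CPU-based approach can be dropped entirely, as the schedule_fuzz.py diff below shows.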

3 files changed: 41 additions & 220 deletions

src/clusterfuzz/_internal/base/feature_flags.py

Lines changed: 2 additions & 0 deletions
@@ -33,6 +33,8 @@ class FeatureFlags(Enum):
 
   GCP_BATCH_JOBS_FREQUENCY = 'gcp_batch_jobs_frequency'
 
+  PREPROCESS_QUEUE_SIZE_LIMIT = 'preprocess_queue_size_limit'
+
   @property
   def flag(self):
     """Get the feature flag."""

src/clusterfuzz/_internal/cron/schedule_fuzz.py

Lines changed: 33 additions & 160 deletions
@@ -14,40 +14,24 @@
 """Cron job to schedule fuzz tasks that run on batch."""
 
 import collections
-import multiprocessing
 import random
 import time
-from typing import Dict
-from typing import List
 
 from google.cloud import monitoring_v3
-from googleapiclient import discovery
 
+from clusterfuzz._internal.base import feature_flags
 from clusterfuzz._internal.base import tasks
 from clusterfuzz._internal.base import utils
-from clusterfuzz._internal.batch import service as batch
-from clusterfuzz._internal.config import local_config
 from clusterfuzz._internal.datastore import data_types
 from clusterfuzz._internal.datastore import ndb_utils
 from clusterfuzz._internal.google_cloud_utils import credentials
 from clusterfuzz._internal.metrics import logs
 
-# TODO(metzman): Actually implement this.
-CPUS_PER_FUZZ_JOB = 2
+PREPROCESS_TARGET_SIZE_DEFAULT = 10000
 
-# Pretend like our CPU limit is 3% higher than it actually is so that we use the
-# full CPU capacity even when scheduling is slow.
-CPU_BUFFER_MULTIPLIER = 1.03
 
-
-def _get_quotas(creds, project, region):
-  compute = discovery.build('compute', 'v1', credentials=creds)
-  return compute.regions().get(  # pylint: disable=no-member
-      region=region, project=project).execute()['quotas']
-
-
-def count_unacked(creds, project_id, subscription_id):
-  """Counts the unacked messages in |subscription_id|."""
+def get_queue_size(creds, project_id, subscription_id):
+  """Returns the size of the queue (unacked messages)."""
   # TODO(metzman): Not all of these are fuzz_tasks. Deal with that.
   metric = 'pubsub.googleapis.com/subscription/num_undelivered_messages'
   query_filter = (f'metric.type="{metric}" AND '
@@ -76,59 +60,15 @@ def count_unacked(creds, project_id, subscription_id):
   return 0
 
 
-def get_cpu_usage(creds, project: str, region: str) -> int:
-  """Returns the number of available CPUs in the current GCE region."""
-
-  quotas = _get_quotas(creds, project, region)
-
-  # Sometimes, the preemptible quota is 0, which means the number of preemptible
-  # CPUs is actually limited by the CPU quota.
-  # If preemptible quota is not defined, we need to use CPU quota instead.
-  cpu_quota = None
-  preemptible_quota = None
-
-  # Get preemptible_quota and cpu_quota from the list of quotas.
-  for quota in quotas:
-    if preemptible_quota and cpu_quota:
-      break
-    if quota['metric'] == 'CPUS':
-      cpu_quota = quota
-      continue
-    if quota['metric'] == 'PREEMPTIBLE_CPUS':
-      preemptible_quota = quota
-      continue
-  assert preemptible_quota or cpu_quota
-
-  if not preemptible_quota['limit']:
-    # Preemptible quota is not set. Obey the CPU quota since that limits us.
-    quota = cpu_quota
-  else:
-    quota = preemptible_quota
-  assert quota['limit'], quota
-
-  # TODO(metzman): Do this in a more configurable way.
-  # We need this because us-central1 and us-east4 have different numbers of
-  # cores alloted to us in their quota. Treat them the same to simplify things.
-  limit = min(quota['limit'], 100_000)
-  project_config = local_config.ProjectConfig()
-  # On OSS-Fuzz there is a limit to the number of CPUs we can use.
-  limit = min(limit, project_config.get('schedule_fuzz.cpu_limit', limit))
-  return limit, quota['usage']
-
-
 class BaseFuzzTaskScheduler:
   """Base fuzz task scheduler for any deployment of ClusterFuzz."""
 
-  def __init__(self, num_cpus):
-    self.num_cpus = num_cpus
+  def __init__(self, num_tasks):
+    self.num_tasks = num_tasks
 
   def get_fuzz_tasks(self):
     raise NotImplementedError('Child class must implement.')
 
-  def _get_cpus_per_fuzz_job(self, job_name):
-    del job_name
-    return CPUS_PER_FUZZ_JOB
-
 
 class FuzzTaskCandidate:
   """Data class that holds more info about FuzzerJobs than the ndb.Models do.
@@ -159,7 +99,7 @@ def copy(self):
 class OssfuzzFuzzTaskScheduler(BaseFuzzTaskScheduler):
   """Fuzz task scheduler for OSS-Fuzz."""
 
-  def get_fuzz_tasks(self) -> Dict[str, tasks.Task]:
+  def get_fuzz_tasks(self) -> list[tasks.Task]:
     # TODO(metzman): Handle high end.
     # A job's weight is determined by its own weight and the weight of the
     # project is a part of. First get project weights.
@@ -222,12 +162,11 @@ def get_fuzz_tasks(self) -> Dict[str, tasks.Task]:
     for fuzz_task_candidate in fuzz_task_candidates:
       weights.append(fuzz_task_candidate.weight)
 
-    # TODO(metzman): Handle high-end jobs correctly.
-    num_instances = int(self.num_cpus / self._get_cpus_per_fuzz_job(None))
-    logs.info(f'Scheduling {num_instances} fuzz tasks for OSS-Fuzz.')
+    fuzz_tasks_count = self.num_tasks
+    logs.info(f'Scheduling {fuzz_tasks_count} fuzz tasks for OSS-Fuzz.')
 
     choices = random.choices(
-        fuzz_task_candidates, weights=weights, k=num_instances)
+        fuzz_task_candidates, weights=weights, k=fuzz_tasks_count)
     fuzz_tasks = [
         tasks.Task(
             'fuzz',
@@ -245,13 +184,10 @@ def get_fuzz_tasks(self) -> Dict[str, tasks.Task]:
 class ChromeFuzzTaskScheduler(BaseFuzzTaskScheduler):
   """Fuzz task scheduler for Chrome."""
 
-  def __init__(self, *args, **kwargs):
-    super().__init__(*args, **kwargs)
-    self.num_cpus = respect_project_max_cpus(self.num_cpus)
-
-  def get_fuzz_tasks(self) -> List[tasks.Task]:
+  def get_fuzz_tasks(self) -> list[tasks.Task]:
     """Returns fuzz tasks for chrome, weighted by job weight."""
     logs.info('Getting jobs for Chrome.')
+
     candidates_by_job = {}
     # Only consider linux jobs for chrome fuzzing.
     job_query = data_types.Job.query(data_types.Job.platform == 'LINUX')
@@ -276,14 +212,14 @@ def get_fuzz_tasks(self) -> List[tasks.Task]:
       fuzz_task_candidates.append(fuzz_task_candidate)
 
     weights = [candidate.weight for candidate in fuzz_task_candidates]
-    num_instances = int(self.num_cpus / self._get_cpus_per_fuzz_job(None))
-    logs.info(f'Scheduling {num_instances} fuzz tasks for Chrome.')
+    fuzz_tasks_count = self.num_tasks
+    logs.info(f'Scheduling {fuzz_tasks_count} fuzz tasks for Chrome.')
 
     if not fuzz_task_candidates:
       return []
 
     choices = random.choices(
-        fuzz_task_candidates, weights=weights, k=num_instances)
+        fuzz_task_candidates, weights=weights, k=fuzz_tasks_count)
     fuzz_tasks = [
         tasks.Task(
             'fuzz',
@@ -295,101 +231,38 @@ def get_fuzz_tasks(self) -> List[tasks.Task]:
     return fuzz_tasks
 
 
-def get_fuzz_tasks(available_cpus: int) -> [tasks.Task]:
+def get_fuzz_tasks(num_tasks: int) -> list[tasks.Task]:
   if utils.is_oss_fuzz():
-    scheduler = OssfuzzFuzzTaskScheduler(available_cpus)
+    scheduler = OssfuzzFuzzTaskScheduler(num_tasks)
   else:
-    scheduler = ChromeFuzzTaskScheduler(available_cpus)
+    scheduler = ChromeFuzzTaskScheduler(num_tasks)
   fuzz_tasks = scheduler.get_fuzz_tasks()
   return fuzz_tasks
 
 
-def get_batch_regions(batch_config):
-  fuzz_subconf_names = {
-      subconf['name'] for subconf in batch_config.get(
-          'mapping.LINUX-PREEMPTIBLE-UNPRIVILEGED.subconfigs')
-  }
-
-  subconfs = batch_config.get('subconfigs')
-  return list(
-      set(subconfs[subconf]['region']
-          for subconf in subconfs
-          if subconf in fuzz_subconf_names))
-
+def schedule_fuzz_tasks() -> bool:
+  """Schedules fuzz tasks."""
 
-def get_available_cpus(project: str, regions: List[str]) -> int:
-  """Returns the available CPUs for fuzz tasks."""
-  # TODO(metzman): This doesn't distinguish between fuzz and non-fuzz
-  # tasks (nor preemptible and non-preemptible CPUs). Fix this.
-  # Get total scheduled and queued.
+  project = utils.get_application_id()
+  start = time.time()
   creds = credentials.get_default()[0]
+  preprocess_queue_size = get_queue_size(creds, project, tasks.PREPROCESS_QUEUE)
 
-  target = 0
-  usage = 0
-  for region in regions:
-    region_target, region_usage = get_cpu_usage(creds, project, region)
-    target += region_target
-    usage += region_usage
-  waiting_tasks = (
-      count_unacked(creds, project, 'preprocess') + count_unacked(
-          creds, project, 'utask_main'))
-
-  if usage + waiting_tasks * CPUS_PER_FUZZ_JOB > .95 * target:
-    # Only worry about queueing build up if we are above 95% utilization.
-    count_args = ((project, region) for region in regions)
-    with multiprocessing.Pool(2) as pool:
-      target *= CPU_BUFFER_MULTIPLIER
-      # These calls are extremely slow (about 30 minutes total).
-      result = pool.starmap_async(  # pylint: disable=no-member
-          batch.count_queued_or_scheduled_tasks, count_args)
-
-      region_counts = zip(*result.get())  # Group all queued and all scheduled.
-      # Add up all queued and scheduled.
-      region_counts = [sum(tup) for tup in region_counts]
-      logs.info(f'QUEUED/SCHEDULED tasks per region: {region_counts}')
-      if region_counts[0] > 10_000:
-        # Check queued tasks.
-        logs.info('Too many jobs queued, not scheduling more fuzzing.')
-        return 0
-      waiting_tasks += sum(region_counts)  # Add up queued and scheduled.
+  target_size_flag = feature_flags.FeatureFlags.PREPROCESS_QUEUE_SIZE_LIMIT
+  if target_size_flag and target_size_flag.enabled:
+    target_size = int(target_size_flag.value)
   else:
-    logs.info('Skipping getting tasks.')
-
-  occupied_cpus = waiting_tasks * CPUS_PER_FUZZ_JOB + usage
-  logs.info(f'Soon or currently occupied CPUs: {occupied_cpus}')
-
-  logs.info(f'Target number CPUs: {target}')
-  available_cpus = max(target - occupied_cpus, 0)
-  logs.info(f'Available CPUs: {available_cpus}')
+    target_size = PREPROCESS_TARGET_SIZE_DEFAULT
 
-  # Don't schedule more than 50K tasks at once. So we don't overload batch.
-  # This number is arbitrary, but we aren't at full capacity at lower numbers.
-  available_cpus = min(available_cpus, 50_000 * len(regions))
+  num_tasks = target_size - preprocess_queue_size
+  logs.info(f'Preprocess queue size: {preprocess_queue_size}. '
+            f'Target: {target_size}. Needed: {num_tasks}.')
 
-  return available_cpus
-
-
-def respect_project_max_cpus(num_cpus):
-  conf = local_config.ProjectConfig()
-  max_cpus_per_schedule = conf.get('max_cpus_per_schedule')
-  if max_cpus_per_schedule is None:
-    return num_cpus
-  return min(max_cpus_per_schedule, num_cpus)
-
-
-def schedule_fuzz_tasks() -> bool:
-  """Schedules fuzz tasks."""
-  multiprocessing.set_start_method('spawn')
-  batch_config = local_config.BatchConfig()
-  project = batch_config.get('project')
-  regions = get_batch_regions(batch_config)
-  start = time.time()
-  available_cpus = get_available_cpus(project, regions)
-  logs.info(f'{available_cpus} available CPUs.')
-  if not available_cpus:
+  if num_tasks <= 0:
+    logs.info('Queue size met or exceeded. Not scheduling tasks.')
     return False
 
-  fuzz_tasks = get_fuzz_tasks(available_cpus)
+  fuzz_tasks = get_fuzz_tasks(num_tasks)
   if not fuzz_tasks:
     logs.error('No fuzz tasks found to schedule.')
     return False
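
Note that the hunk introducing get_queue_size is truncated right after query_filter. Below is a rough sketch of how a Cloud Monitoring query for the num_undelivered_messages metric typically completes, assuming the rest of the filter matches on the subscription id; this is not the exact body from the commit.

import time

from google.cloud import monitoring_v3


def get_queue_size_sketch(creds, project_id, subscription_id):
  """Approximates get_queue_size(): latest undelivered-message count."""
  metric = 'pubsub.googleapis.com/subscription/num_undelivered_messages'
  # The diff truncates the filter after the metric type; the subscription
  # clause below is an assumption.
  query_filter = (f'metric.type="{metric}" AND '
                  f'resource.labels.subscription_id="{subscription_id}"')
  client = monitoring_v3.MetricServiceClient(credentials=creds)
  now = int(time.time())
  interval = monitoring_v3.TimeInterval({
      'end_time': {'seconds': now},
      'start_time': {'seconds': now - 10 * 60},  # Look back ten minutes.
  })
  series = client.list_time_series(
      request={
          'name': f'projects/{project_id}',
          'filter': query_filter,
          'interval': interval,
          'view': monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
      })
  for time_series in series:
    for point in time_series.points:
      # Points come back newest first; the first one is the current backlog.
      return point.value.int64_value
  return 0  # No data points: treat the queue as empty.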
