|
21 | 21 |
|
22 | 22 | import collections |
23 | 23 | import random |
24 | | -from typing import List |
| 24 | +import typing |
25 | 25 |
|
| 26 | +from clusterfuzz._internal.datastore import data_types |
| 27 | +from clusterfuzz._internal.datastore import ndb_utils |
26 | 28 | from clusterfuzz._internal.metrics import logs |
27 | 29 | from clusterfuzz._internal.remote_task import remote_task_adapters |
28 | 30 | from clusterfuzz._internal.remote_task import remote_task_types |
@@ -105,13 +107,39 @@ def get_job_frequency(self): |
105 | 107 | logs.info('Job frequencies', frequencies=frequencies) |
106 | 108 | return frequencies |
107 | 109 |
|
| 110 | + def prepare_unscheduled_tasks( |
| 111 | + self, unscheduled_remote_tasks: typing.List[remote_task_types.RemoteTask] |
| 112 | + ) -> typing.List[remote_task_types.RemoteTask]: |
| 113 | + """Prepares the unscheduled remote tasks to be sent back to the |
| 114 | + preprocess queue. |
| 115 | +
|
| 116 | + The messages in the preprocess queue expects tasks with |
| 117 | + the properties followng task_name, fuzzer, job, eta. |
| 118 | + The utasks has the signed url as argument instead of the fuzzer, |
| 119 | + then for sending it back to the preprocess, it recovers its fuzzer |
| 120 | + and override the argument. |
| 121 | + """ |
| 122 | + |
| 123 | + job_names = {task.job_type for task in unscheduled_remote_tasks} |
| 124 | + query = data_types.FuzzerJob.query( |
| 125 | + data_types.FuzzerJob.job.IN(list(job_names))) |
| 126 | + fuzzer_jobs = ndb_utils.get_all_from_query(query) |
| 127 | + fuzzer_jobs_name_mapped = {job.name: job for job in fuzzer_jobs} |
| 128 | + for task in unscheduled_remote_tasks: |
| 129 | + if task.job_type not in fuzzer_jobs_name_mapped: |
| 130 | + logs.error(f'{task.job_type} not found.') |
| 131 | + continue |
| 132 | + task.argument = fuzzer_jobs_name_mapped[task.job_type].argument |
| 133 | + |
| 134 | + return unscheduled_remote_tasks |
| 135 | + |
108 | 136 | def create_utask_main_job(self, module, job_type, input_download_url): |
109 | 137 | adapter_id = self._get_adapter() |
110 | 138 | service = self._service_map[adapter_id] |
111 | 139 | return service.create_utask_main_job(module, job_type, input_download_url) |
112 | 140 |
|
113 | | - def create_utask_main_jobs(self, |
114 | | - remote_tasks: List[remote_task_types.RemoteTask]): |
| 141 | + def create_utask_main_jobs( |
| 142 | + self, remote_tasks: typing.List[remote_task_types.RemoteTask]): |
115 | 143 | """Creates a batch of remote tasks, distributing them across backends. |
116 | 144 |
|
117 | 145 | This method handles two cases: |
@@ -146,13 +174,16 @@ def create_utask_main_jobs(self, |
146 | 174 | adapter_id = list(frequencies.keys())[i % len(frequencies)] |
147 | 175 | tasks_by_adapter[adapter_id].append(task) |
148 | 176 |
|
149 | | - results = [] |
| 177 | + unscheduled_tasks = [] |
150 | 178 | for adapter_id, tasks in tasks_by_adapter.items(): |
151 | 179 | if tasks: |
152 | 180 | try: |
153 | 181 | logs.info(f'Sending {len(tasks)} tasks to {adapter_id}.') |
154 | 182 | service = self._service_map[adapter_id] |
155 | | - results.extend(service.create_utask_main_jobs(tasks)) |
| 183 | + unscheduled_tasks.extend(service.create_utask_main_jobs(tasks)) |
156 | 184 | except Exception: # pylint: disable=broad-except |
157 | 185 | logs.error(f'Failed to send {len(tasks)} tasks to {adapter_id}.') |
158 | | - return results |
| 186 | + |
| 187 | + prepared_unscheduled_tasks = self.prepare_unscheduled_tasks( |
| 188 | + unscheduled_tasks) |
| 189 | + return prepared_unscheduled_tasks |
0 commit comments