Skip to content

Commit 2cba71a

Browse files
committed
add gcp batch feature flag
Signed-off-by: Javan Lacerda <javanlacerda@google.com>
1 parent b90da76 commit 2cba71a

File tree

5 files changed

+18
-5
lines changed

5 files changed

+18
-5
lines changed

src/clusterfuzz/_internal/base/errors.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,5 +115,5 @@ def __str__(self):
115115
class QueueLimitReachedError(Error):
116116
"""Error thrown when the queue limit is reached."""
117117

118-
def __init__(self, size):
119-
super().__init__(f'Queue limit reached {size}.')
118+
def __init__(self, size, queue):
119+
super().__init__(f'Queue {queue} has reached the limit of {size}.')

src/clusterfuzz/_internal/base/feature_flags.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ class FeatureFlags(Enum):
3131

3232
UTASK_MAIN_QUEUE_LIMIT = 'utask_main_queue_limit'
3333

34+
GCP_BATCH_JOBS_FREQUENCY = 'gcp_batch_jobs_frequency'
35+
3436
@property
3537
def flag(self):
3638
"""Get the feature flag."""

src/clusterfuzz/_internal/base/tasks/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,12 @@ def get_task_duration(command):
109109
# scheduling on batch.
110110
MAX_UTASKS = 3000
111111

112+
# Time window to get the metrics.
113+
# We should look for metrics in
114+
# start = now - _QUEUE_LIMIT_INTERVAL
115+
# end = now
116+
_QUEUE_LIMIT_INTERVAL = 5 * 60 # 5 minutes.
117+
112118

113119
class Error(Exception):
114120
"""Base exception class."""
@@ -707,7 +713,7 @@ def get_utask_main_queue_size():
707713
now = time.time()
708714
interval = monitoring_v3.TimeInterval(
709715
end_time={'seconds': int(now)},
710-
start_time={'seconds': int(now - 5 * 60)},
716+
start_time={'seconds': int(now - _QUEUE_LIMIT_INTERVAL)},
711717
)
712718

713719
results = client.list_time_series(

src/clusterfuzz/_internal/bot/tasks/task_types.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from clusterfuzz._internal.base import feature_flags
2121
from clusterfuzz._internal.base import tasks
2222
from clusterfuzz._internal.base.tasks import task_utils
23+
from clusterfuzz._internal.base.tasks import UTASK_MAIN_QUEUE
2324
from clusterfuzz._internal.batch import service as batch_service
2425
from clusterfuzz._internal.bot.tasks import utasks
2526
from clusterfuzz._internal.metrics import events
@@ -165,7 +166,10 @@ def execute(self, task_argument, job_type, uworker_env):
165166
if utask_flag and utask_flag.enabled:
166167
utask_main_queue_limit = utask_flag.content
167168
if utask_main_queue_size > utask_main_queue_limit:
168-
raise errors.QueueLimitReachedError(utask_main_queue_size)
169+
base_os_version = environment.get_value('BASE_OS_VERSION')
170+
queue_name = UTASK_MAIN_QUEUE if not base_os_version else \
171+
f'{UTASK_MAIN_QUEUE}-{base_os_version}'
172+
raise errors.QueueLimitReachedError(utask_main_queue_size, queue_name)
169173

170174
logs.info('Preprocessing utask.')
171175
download_url = self.preprocess(task_argument, job_type, uworker_env)

src/clusterfuzz/_internal/remote_task/remote_task_adapters.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ class RemoteTaskAdapters(Enum):
3535
"""
3636
KUBERNETES = ('kubernetes', k8s_service.KubernetesService,
3737
feature_flags.FeatureFlags.K8S_JOBS_FREQUENCY, 0.0)
38-
GCP_BATCH = ('gcp_batch', batch_service.GcpBatchService, None, 1.0)
38+
GCP_BATCH = ('gcp_batch', batch_service.GcpBatchService,
39+
feature_flags.FeatureFlags.GCP_BATCH_JOBS_FREQUENCY, 1.0)
3940

4041
def __init__(self, adapter_id, service, feature_flag, default_weight):
4142
self.id = adapter_id

0 commit comments

Comments
 (0)