7 changes: 6 additions & 1 deletion apps/worker/helpers/checkpoint_logger/flows.py
@@ -76,7 +76,11 @@ class UploadFlow(BaseFlow):


@failure_events(
"TEST_RESULTS_ERROR", "UNCAUGHT_RETRY_EXCEPTION", "CELERY_FAILURE", "CELERY_TIMEOUT"
"TEST_RESULTS_ERROR",
"NOTIF_LOCK_ERROR",
"UNCAUGHT_RETRY_EXCEPTION",
"CELERY_FAILURE",
"CELERY_TIMEOUT",
)
@success_events("TEST_RESULTS_NOTIFY")
@subflows(
@@ -95,6 +99,7 @@ class TestResultsFlow(BaseFlow):
FLAKE_DETECTION_NOTIFY = auto()
TEST_RESULTS_ERROR = auto()
TEST_RESULTS_FINISHER_BEGIN = auto()
NOTIF_LOCK_ERROR = auto()
UNCAUGHT_RETRY_EXCEPTION = auto()
CELERY_FAILURE = auto()
CELERY_TIMEOUT = auto()
149 changes: 149 additions & 0 deletions apps/worker/services/notification/debounce.py
@@ -0,0 +1,149 @@
import logging
from contextlib import contextmanager
from typing import cast

from celery.exceptions import MaxRetriesExceededError as CeleryMaxRetriesExceededError
from redis import Redis

from services.lock_manager import LockManager, LockRetry, LockType

log = logging.getLogger(__name__)

SKIP_DEBOUNCE_TOKEN = 1
"""
Token value to use in tests to skip debounce acquisition step.

When passed as fencing_token, the task will skip acquiring a new token
and proceed directly to the main notification logic. This is useful for
tests that want to test the core functionality without the debounce retry.
"""


class LockAcquisitionLimitError(Exception):
"""Raised when lock acquisition attempts exceed the maximum allowed.

This is distinct from Celery's MaxRetriesExceededError which tracks task retries.
This exception specifically tracks failed attempts to acquire a distributed lock.
"""

pass


class NotificationDebouncer[T]:
"""
Handles debouncing logic for notification tasks using fencing tokens.

This ensures that when multiple notification tasks are queued for the same
commit, only the latest one actually sends a notification after a debounce period.
"""

def __init__(
self,
redis_key_template: str,
debounce_period_seconds: int = 30,
lock_type: LockType = LockType.NOTIFICATION,
max_lock_retries: int = 5,
):
"""
:param redis_key_template: Template string for Redis key, e.g. "notifier_fence:{}_{}"
:param debounce_period_seconds: How long to wait before sending notification
:param lock_type: Type of lock to use for notification locking
:param max_lock_retries: Maximum number of retries for lock acquisition
"""
self.redis_key_template = redis_key_template
self.debounce_period_seconds = debounce_period_seconds
self.lock_type = lock_type
self.max_lock_retries = max_lock_retries

def acquire_fencing_token(
self,
redis_client: Redis,
repo_id: int,
commit_sha: str,
) -> int:
"""
Acquire a fencing token for deduplication.

:returns: The acquired fencing token value
"""
redis_key = self.redis_key_template.format(repo_id, commit_sha)
with redis_client.pipeline() as pipeline:
pipeline.incr(redis_key)
pipeline.expire(redis_key, 24 * 60 * 60)
results = pipeline.execute()
return int(cast(str, results[0]))

def check_fencing_token_stale(
self,
redis_client: Redis,
repo_id: int,
commit_sha: str,
fencing_token: int,
) -> bool:
"""
Check if the fencing token is stale, indicating another notification task
is in progress and can take over.

This method should be called within a lock context.

:returns: True if the token is stale (task should exit early),
False if the token is current (task should continue).
"""
current_token = int(
cast(
str,
redis_client.get(self.redis_key_template.format(repo_id, commit_sha))
or "0",
)
)

return fencing_token < current_token

@contextmanager
def notification_lock(self, lock_manager: LockManager, task_attempts: int):
"""
Context manager to handle the repeated lock acquisition pattern
with automatic retry handling for LockRetry exceptions.
"""
try:
with lock_manager.locked(
self.lock_type,
max_retries=self.max_lock_retries,
retry_num=task_attempts,
):
yield
return
except LockRetry as e:
if e.max_retries_exceeded:
log.error(
"Not retrying lock acquisition - max retries exceeded",
extra={
"retry_num": e.retry_num,
"max_attempts": e.max_attempts,
},
)
raise LockAcquisitionLimitError(
f"Lock acquisition limit exceeded: {e.retry_num} >= {e.max_attempts}",
)
raise

def handle_lock_retry(
self,
task,
lock_retry: LockRetry,
failure_result: T,
) -> T:
"""
Handle LockRetry exception by attempting to retry the task.

:param task: The Celery task instance
:param lock_retry: The LockRetry exception
:param failure_result: The result to return if max retries exceeded
:returns: failure_result if max retries exceeded
:raises Retry: When retry is successfully scheduled (caller should catch this)
"""
try:
task.retry(max_retries=task.max_retries, countdown=lock_retry.countdown)
raise AssertionError("task.retry() should always raise Retry")
except CeleryMaxRetriesExceededError:
return failure_result
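
For context, an illustrative sketch of how a notification task might wire these pieces together. This is not part of the diff: the task shape, the redis_client/lock_manager arguments, and the result dicts are assumptions made for the example; only the NotificationDebouncer API itself comes from the file above.

# Illustrative sketch only -- not part of this diff. It shows the intended
# call order for NotificationDebouncer; the task/redis_client/lock_manager
# wiring and the result dicts are assumptions made for the example.
from services.lock_manager import LockManager, LockRetry
from services.notification.debounce import SKIP_DEBOUNCE_TOKEN, NotificationDebouncer

debouncer = NotificationDebouncer(redis_key_template="notifier_fence:{}_{}")


def notify_with_debounce(
    task,
    redis_client,
    lock_manager: LockManager,
    repo_id: int,
    commit_sha: str,
    fencing_token: int | None = None,
):
    if fencing_token is None:
        # Every queued notification bumps the shared counter; only the task
        # holding the highest token should end up sending the notification.
        fencing_token = debouncer.acquire_fencing_token(redis_client, repo_id, commit_sha)
        # A real task would re-enqueue itself here with
        # countdown=debouncer.debounce_period_seconds, passing fencing_token along.
    try:
        with debouncer.notification_lock(lock_manager, task.request.retries):
            if fencing_token != SKIP_DEBOUNCE_TOKEN and debouncer.check_fencing_token_stale(
                redis_client, repo_id, commit_sha, fencing_token
            ):
                # A newer notification task has taken over; exit without notifying.
                return {"notified": False, "reason": "superseded"}
            # ... send the actual notification here ...
            return {"notified": True}
    except LockRetry as lock_retry:
        # notification_lock re-raises LockRetry while the lock retry budget remains;
        # handle_lock_retry schedules a Celery retry, or returns the failure result
        # once Celery's own retry budget is exhausted.
        return debouncer.handle_lock_retry(
            task, lock_retry, {"notified": False, "reason": "lock"}
        )

If the lock retry budget inside notification_lock is exhausted, LockAcquisitionLimitError propagates out of the task body instead, and the BaseCodecovTask.run handler added below logs NOTIF_LOCK_ERROR and returns None.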
79 changes: 48 additions & 31 deletions apps/worker/tasks/base.py
@@ -30,6 +30,7 @@
from helpers.exceptions import NoConfiguredAppsAvailable, RepositoryWithoutValidBotError
from helpers.log_context import LogContext, set_log_context
from helpers.save_commit_error import save_commit_error
from services.notification.debounce import LockAcquisitionLimitError
from services.repository import get_repo_provider_service
from shared.celery_config import (
TASK_RETRY_BACKOFF_BASE_SECONDS,
@@ -411,6 +412,24 @@ def _analyse_error(self, exception: SQLAlchemyError, *args, **kwargs):
exc_info=True,
)

def _handle_database_error_retry(self, db_session, *args, **kwargs):
"""Handle database error retry logic with rollback and exponential backoff.

Returns None if max retries exceeded, otherwise raises Retry exception.
safe_retry will raise Retry if retry is possible, so this method will
only return None if max retries have been exceeded.
"""
db_session.rollback()
retry_count = getattr(self.request, "retries", 0)
countdown = TASK_RETRY_BACKOFF_BASE_SECONDS * (2**retry_count)

# safe_retry handles MaxRetriesExceededError internally:
# - Returns False if max retries exceeded
# - Raises Retry if retry is possible (which propagates up to Celery)
if not self.safe_retry(countdown=countdown):
# Max retries exceeded - return None to match old behavior
return None

@sentry_sdk.trace
def run(self, *args, **kwargs):
with self.task_full_runtime.time():
@@ -462,45 +481,38 @@ def run(self, *args, **kwargs):
with self.task_core_runtime.time():
return self.run_impl(db_session, *args, **kwargs)
except InterfaceError as ex:
sentry_sdk.capture_exception(
ex,
)
sentry_sdk.capture_exception(ex)
# InterfaceError indicates a database connection issue
# Return None to indicate task failure without retry
return None
except (DataError, IntegrityError):
log.exception(
"Errors related to the constraints of database happened",
extra={"task_args": args, "task_kwargs": kwargs},
)
db_session.rollback()
retry_count = getattr(self.request, "retries", 0)
countdown = TASK_RETRY_BACKOFF_BASE_SECONDS * (2**retry_count)
try:
if not self.safe_retry(countdown=countdown):
return None
except MaxRetriesExceededError:
if UploadFlow.has_begun():
UploadFlow.log(UploadFlow.UNCAUGHT_RETRY_EXCEPTION)
if TestResultsFlow.has_begun():
TestResultsFlow.log(TestResultsFlow.UNCAUGHT_RETRY_EXCEPTION)
return None
return self._handle_database_error_retry(db_session, *args, **kwargs)
except SQLAlchemyError as ex:
self._analyse_error(ex, args, kwargs)
db_session.rollback()
retry_count = getattr(self.request, "retries", 0)
countdown = TASK_RETRY_BACKOFF_BASE_SECONDS * (2**retry_count)
try:
if not self.safe_retry(countdown=countdown):
return None
except MaxRetriesExceededError:
if UploadFlow.has_begun():
UploadFlow.log(UploadFlow.UNCAUGHT_RETRY_EXCEPTION)
if TestResultsFlow.has_begun():
TestResultsFlow.log(TestResultsFlow.UNCAUGHT_RETRY_EXCEPTION)
return None
except MaxRetriesExceededError as ex:
return self._handle_database_error_retry(db_session, *args, **kwargs)
except MaxRetriesExceededError:
# Log checkpoint if flows have begun
if UploadFlow.has_begun():
UploadFlow.log(UploadFlow.UNCAUGHT_RETRY_EXCEPTION)
if TestResultsFlow.has_begun():
TestResultsFlow.log(TestResultsFlow.UNCAUGHT_RETRY_EXCEPTION)
# Return None to indicate task failure after max retries exceeded
return None
except LockAcquisitionLimitError:
log.warning(
"Lock acquisition limit exceeded in task",
extra={"task": self.name},
exc_info=True,
)
if UploadFlow.has_begun():
UploadFlow.log(UploadFlow.NOTIF_LOCK_ERROR)
if TestResultsFlow.has_begun():
TestResultsFlow.log(TestResultsFlow.NOTIF_LOCK_ERROR)
return None
finally:
self.wrap_up_dbsession(db_session)

@@ -592,16 +604,21 @@ def get_repo_provider_service(
)
except NoConfiguredAppsAvailable as exp:
if exp.rate_limited_count > 0:
# At least one GitHub app is available but rate-limited. Retry after
# waiting until the next hour (minimum 1 minute delay).
retry_delay_seconds = max(60, get_seconds_to_next_hour())
# At least one GitHub app is available but rate-limited.
# Use the actual retry time from GitHub API response if available,
# otherwise fall back to waiting until the next hour (minimum 1 minute delay).
if exp.earliest_retry_after_seconds is not None:
retry_delay_seconds = max(60, exp.earliest_retry_after_seconds)
else:
retry_delay_seconds = max(60, get_seconds_to_next_hour())
log.warning(
"Unable to get repo provider service due to rate limits. Retrying again later.",
extra={
"apps_available": exp.apps_count,
"apps_rate_limited": exp.rate_limited_count,
"apps_suspended": exp.suspended_count,
"countdown_seconds": retry_delay_seconds,
"earliest_retry_after_seconds": exp.earliest_retry_after_seconds,
},
)
if commit:
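
For reference, a minimal sketch of the two retry-delay rules touched above, in isolation. The concrete numbers are illustrative; in the real code the base comes from TASK_RETRY_BACKOFF_BASE_SECONDS and the next-hour figure from get_seconds_to_next_hour().

# Minimal sketch of the two delay rules used above; the numeric values are illustrative.

def database_error_backoff(base_seconds: int, retry_count: int) -> int:
    # Exponential backoff used by _handle_database_error_retry:
    # base, 2*base, 4*base, ... per prior failed attempt.
    return base_seconds * (2 ** retry_count)


def rate_limit_delay(earliest_retry_after_seconds: int | None, seconds_to_next_hour: int) -> int:
    # Prefer the reset time reported by the GitHub API when the exception
    # carries one; otherwise fall back to the top of the next hour.
    # Either way, never retry sooner than one minute.
    if earliest_retry_after_seconds is not None:
        return max(60, earliest_retry_after_seconds)
    return max(60, seconds_to_next_hour)


# e.g. database_error_backoff(10, 3) == 80
#      rate_limit_delay(None, 1800) == 1800
#      rate_limit_delay(15, 1800) == 60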