diff --git a/apps/worker/helpers/checkpoint_logger/flows.py b/apps/worker/helpers/checkpoint_logger/flows.py index 9d4b78ec7a..c114b74143 100644 --- a/apps/worker/helpers/checkpoint_logger/flows.py +++ b/apps/worker/helpers/checkpoint_logger/flows.py @@ -76,7 +76,11 @@ class UploadFlow(BaseFlow): @failure_events( - "TEST_RESULTS_ERROR", "UNCAUGHT_RETRY_EXCEPTION", "CELERY_FAILURE", "CELERY_TIMEOUT" + "TEST_RESULTS_ERROR", + "NOTIF_LOCK_ERROR", + "UNCAUGHT_RETRY_EXCEPTION", + "CELERY_FAILURE", + "CELERY_TIMEOUT", ) @success_events("TEST_RESULTS_NOTIFY") @subflows( @@ -95,6 +99,7 @@ class TestResultsFlow(BaseFlow): FLAKE_DETECTION_NOTIFY = auto() TEST_RESULTS_ERROR = auto() TEST_RESULTS_FINISHER_BEGIN = auto() + NOTIF_LOCK_ERROR = auto() UNCAUGHT_RETRY_EXCEPTION = auto() CELERY_FAILURE = auto() CELERY_TIMEOUT = auto() diff --git a/apps/worker/services/notification/debounce.py b/apps/worker/services/notification/debounce.py new file mode 100644 index 0000000000..0ba77033e7 --- /dev/null +++ b/apps/worker/services/notification/debounce.py @@ -0,0 +1,149 @@ +import logging +from contextlib import contextmanager +from typing import cast + +from celery.exceptions import MaxRetriesExceededError as CeleryMaxRetriesExceededError +from redis import Redis + +from services.lock_manager import LockManager, LockRetry, LockType + +log = logging.getLogger(__name__) + +SKIP_DEBOUNCE_TOKEN = 1 +""" +Token value to use in tests to skip debounce acquisition step. + +When passed as fencing_token, the task will skip acquiring a new token +and proceed directly to the main notification logic. This is useful for +tests that want to test the core functionality without the debounce retry. +""" + + +class LockAcquisitionLimitError(Exception): + """Raised when lock acquisition attempts exceed the maximum allowed. + + This is distinct from Celery's MaxRetriesExceededError which tracks task retries. + This exception specifically tracks failed attempts to acquire a distributed lock. + """ + + pass + + +class NotificationDebouncer[T]: + """ + Handles debouncing logic for notification tasks using fencing tokens. + + This ensures that when multiple notification tasks are queued for the same + commit, only the latest one actually sends a notification after a debounce period. + """ + + def __init__( + self, + redis_key_template: str, + debounce_period_seconds: int = 30, + lock_type: LockType = LockType.NOTIFICATION, + max_lock_retries: int = 5, + ): + """ + :param redis_key_template: Template string for Redis key, e.g. "notifier_fence:{}_{}" + :param debounce_period_seconds: How long to wait before sending notification + :param lock_type: Type of lock to use for notification locking + :param max_lock_retries: Maximum number of retries for lock acquisition + """ + self.redis_key_template = redis_key_template + self.debounce_period_seconds = debounce_period_seconds + self.lock_type = lock_type + self.max_lock_retries = max_lock_retries + + def acquire_fencing_token( + self, + redis_client: Redis, + repo_id: int, + commit_sha: str, + ) -> int: + """ + Acquire a fencing token for deduplication. 
+ + :returns: The acquired fencing token value + """ + redis_key = self.redis_key_template.format(repo_id, commit_sha) + with redis_client.pipeline() as pipeline: + pipeline.incr(redis_key) + pipeline.expire(redis_key, 24 * 60 * 60) + results = pipeline.execute() + return int(cast(str, results[0])) + + def check_fencing_token_stale( + self, + redis_client: Redis, + repo_id: int, + commit_sha: str, + fencing_token: int, + ) -> bool: + """ + Check if the fencing token is stale, indicating another notification task + is in progress and can take over. + + This method should be called within a lock context. + + :returns: True if the token is stale (task should exit early), + False if the token is current (task should continue). + """ + current_token = int( + cast( + str, + redis_client.get(self.redis_key_template.format(repo_id, commit_sha)) + or "0", + ) + ) + + return fencing_token < current_token + + @contextmanager + def notification_lock(self, lock_manager: LockManager, task_attempts: int): + """ + Context manager to handle the repeated lock acquisition pattern + with automatic retry handling for LockRetry exceptions. + """ + try: + with lock_manager.locked( + self.lock_type, + max_retries=self.max_lock_retries, + retry_num=task_attempts, + ): + yield + return + except LockRetry as e: + if e.max_retries_exceeded: + log.error( + "Not retrying lock acquisition - max retries exceeded", + extra={ + "retry_num": e.retry_num, + "max_attempts": e.max_attempts, + }, + ) + raise LockAcquisitionLimitError( + f"Lock acquisition limit exceeded: {e.retry_num} >= {e.max_attempts}", + ) + raise + + def handle_lock_retry( + self, + task, + lock_retry: LockRetry, + failure_result: T, + ) -> T: + """ + Handle LockRetry exception by attempting to retry the task. + + :param task: The Celery task instance + :param lock_retry: The LockRetry exception + :param failure_result: The result to return if max retries exceeded + :returns: failure_result if max retries exceeded + :raises Retry: When retry is successfully scheduled (caller should catch this) + """ + try: + task.retry(max_retries=task.max_retries, countdown=lock_retry.countdown) + raise AssertionError("task.retry() should always raise Retry") + except CeleryMaxRetriesExceededError: + return failure_result diff --git a/apps/worker/tasks/base.py b/apps/worker/tasks/base.py index 7534585877..6a71894f07 100644 --- a/apps/worker/tasks/base.py +++ b/apps/worker/tasks/base.py @@ -30,6 +30,7 @@ from helpers.exceptions import NoConfiguredAppsAvailable, RepositoryWithoutValidBotError from helpers.log_context import LogContext, set_log_context from helpers.save_commit_error import save_commit_error +from services.notification.debounce import LockAcquisitionLimitError from services.repository import get_repo_provider_service from shared.celery_config import ( TASK_RETRY_BACKOFF_BASE_SECONDS, @@ -411,6 +412,24 @@ def _analyse_error(self, exception: SQLAlchemyError, *args, **kwargs): exc_info=True, ) + def _handle_database_error_retry(self, db_session, *args, **kwargs): + """Handle database error retry logic with rollback and exponential backoff. + + Returns None if max retries exceeded, otherwise raises Retry exception. + safe_retry will raise Retry if retry is possible, so this method will + only return None if max retries have been exceeded. 
+ """ + db_session.rollback() + retry_count = getattr(self.request, "retries", 0) + countdown = TASK_RETRY_BACKOFF_BASE_SECONDS * (2**retry_count) + + # safe_retry handles MaxRetriesExceededError internally: + # - Returns False if max retries exceeded + # - Raises Retry if retry is possible (which propagates up to Celery) + if not self.safe_retry(countdown=countdown): + # Max retries exceeded - return None to match old behavior + return None + @sentry_sdk.trace def run(self, *args, **kwargs): with self.task_full_runtime.time(): @@ -462,45 +481,38 @@ def run(self, *args, **kwargs): with self.task_core_runtime.time(): return self.run_impl(db_session, *args, **kwargs) except InterfaceError as ex: - sentry_sdk.capture_exception( - ex, - ) + sentry_sdk.capture_exception(ex) + # InterfaceError indicates a database connection issue + # Return None to indicate task failure without retry + return None except (DataError, IntegrityError): log.exception( "Errors related to the constraints of database happened", extra={"task_args": args, "task_kwargs": kwargs}, ) - db_session.rollback() - retry_count = getattr(self.request, "retries", 0) - countdown = TASK_RETRY_BACKOFF_BASE_SECONDS * (2**retry_count) - try: - if not self.safe_retry(countdown=countdown): - return None - except MaxRetriesExceededError: - if UploadFlow.has_begun(): - UploadFlow.log(UploadFlow.UNCAUGHT_RETRY_EXCEPTION) - if TestResultsFlow.has_begun(): - TestResultsFlow.log(TestResultsFlow.UNCAUGHT_RETRY_EXCEPTION) - return None + return self._handle_database_error_retry(db_session, *args, **kwargs) except SQLAlchemyError as ex: self._analyse_error(ex, args, kwargs) - db_session.rollback() - retry_count = getattr(self.request, "retries", 0) - countdown = TASK_RETRY_BACKOFF_BASE_SECONDS * (2**retry_count) - try: - if not self.safe_retry(countdown=countdown): - return None - except MaxRetriesExceededError: - if UploadFlow.has_begun(): - UploadFlow.log(UploadFlow.UNCAUGHT_RETRY_EXCEPTION) - if TestResultsFlow.has_begun(): - TestResultsFlow.log(TestResultsFlow.UNCAUGHT_RETRY_EXCEPTION) - return None - except MaxRetriesExceededError as ex: + return self._handle_database_error_retry(db_session, *args, **kwargs) + except MaxRetriesExceededError: + # Log checkpoint if flows have begun if UploadFlow.has_begun(): UploadFlow.log(UploadFlow.UNCAUGHT_RETRY_EXCEPTION) if TestResultsFlow.has_begun(): TestResultsFlow.log(TestResultsFlow.UNCAUGHT_RETRY_EXCEPTION) + # Return None to indicate task failure after max retries exceeded + return None + except LockAcquisitionLimitError: + log.warning( + "Lock acquisition limit exceeded in task", + extra={"task": self.name}, + exc_info=True, + ) + if UploadFlow.has_begun(): + UploadFlow.log(UploadFlow.NOTIF_LOCK_ERROR) + if TestResultsFlow.has_begun(): + TestResultsFlow.log(TestResultsFlow.NOTIF_LOCK_ERROR) + return None finally: self.wrap_up_dbsession(db_session) @@ -592,9 +604,13 @@ def get_repo_provider_service( ) except NoConfiguredAppsAvailable as exp: if exp.rate_limited_count > 0: - # At least one GitHub app is available but rate-limited. Retry after - # waiting until the next hour (minimum 1 minute delay). - retry_delay_seconds = max(60, get_seconds_to_next_hour()) + # At least one GitHub app is available but rate-limited. + # Use the actual retry time from GitHub API response if available, + # otherwise fall back to waiting until the next hour (minimum 1 minute delay). 
+ if exp.earliest_retry_after_seconds is not None: + retry_delay_seconds = max(60, exp.earliest_retry_after_seconds) + else: + retry_delay_seconds = max(60, get_seconds_to_next_hour()) log.warning( "Unable to get repo provider service due to rate limits. Retrying again later.", extra={ @@ -602,6 +618,7 @@ def get_repo_provider_service( "apps_rate_limited": exp.rate_limited_count, "apps_suspended": exp.suspended_count, "countdown_seconds": retry_delay_seconds, + "earliest_retry_after_seconds": exp.earliest_retry_after_seconds, }, ) if commit: diff --git a/apps/worker/tasks/bundle_analysis_notify.py b/apps/worker/tasks/bundle_analysis_notify.py index 73023bf5e4..d302fcfe38 100644 --- a/apps/worker/tasks/bundle_analysis_notify.py +++ b/apps/worker/tasks/bundle_analysis_notify.py @@ -2,6 +2,8 @@ from typing import Any import sentry_sdk +from celery.exceptions import MaxRetriesExceededError as CeleryMaxRetriesExceededError +from celery.exceptions import Retry from app import celery_app from database.enums import ReportType @@ -10,19 +12,36 @@ from services.bundle_analysis.notify import BundleAnalysisNotifyService from services.bundle_analysis.notify.types import NotificationSuccess from services.lock_manager import LockManager, LockRetry, LockType +from services.notification.debounce import ( + LockAcquisitionLimitError, + NotificationDebouncer, +) from shared.celery_config import ( BUNDLE_ANALYSIS_NOTIFY_MAX_RETRIES, bundle_analysis_notify_task_name, ) +from shared.helpers.redis import get_redis_connection from shared.yaml import UserYaml from tasks.base import BaseCodecovTask log = logging.getLogger(__name__) +BUNDLE_ANALYSIS_NOTIFIER_FENCING_TOKEN = "bundle_analysis_notifier_fence:{}_{}" +DEBOUNCE_PERIOD_SECONDS = 30 + class BundleAnalysisNotifyTask(BaseCodecovTask, name=bundle_analysis_notify_task_name): max_retries = BUNDLE_ANALYSIS_NOTIFY_MAX_RETRIES + def __init__(self): + super().__init__() + self.debouncer = NotificationDebouncer[dict]( + redis_key_template=BUNDLE_ANALYSIS_NOTIFIER_FENCING_TOKEN, + debounce_period_seconds=DEBOUNCE_PERIOD_SECONDS, + lock_type=LockType.BUNDLE_ANALYSIS_NOTIFY, + max_lock_retries=self.max_retries, + ) + def run_impl( self, db_session, @@ -33,6 +52,7 @@ def run_impl( repoid: int, commitid: str, commit_yaml: dict, + fencing_token: int | None = None, **kwargs, ): repoid = int(repoid) @@ -47,18 +67,88 @@ def run_impl( }, ) + redis_connection = get_redis_connection() lock_manager = LockManager( repoid=repoid, commitid=commitid, report_type=ReportType.BUNDLE_ANALYSIS, + redis_connection=redis_connection, ) + if fencing_token is None: + try: + with self.debouncer.notification_lock(lock_manager, self.attempts): + fencing_token = self.debouncer.acquire_fencing_token( + redis_connection, repoid, commitid + ) + + log.info( + "Acquired fencing token, retrying for debounce period", + extra={ + "repoid": repoid, + "commit": commitid, + "fencing_token": fencing_token, + }, + ) + retry_kwargs = { + "previous_result": previous_result, + "repoid": repoid, + "commitid": commitid, + "commit_yaml": commit_yaml.to_dict(), + "fencing_token": fencing_token, + **kwargs, + } + try: + self.retry(countdown=DEBOUNCE_PERIOD_SECONDS, kwargs=retry_kwargs) + except CeleryMaxRetriesExceededError: + # Max retries exceeded during debounce retry, but we have a valid token. + # Proceed immediately with notification instead of dropping it. 
+ log.warning( + "Max retries exceeded during debounce retry, proceeding immediately with notification", + extra={ + "repoid": repoid, + "commit": commitid, + "fencing_token": fencing_token, + }, + ) + # Continue execution to process notification with acquired token + except LockAcquisitionLimitError: + return { + "notify_attempted": False, + "notify_succeeded": None, + } + except LockRetry as e: + try: + result = self.debouncer.handle_lock_retry( + self, + e, + { + "notify_attempted": False, + "notify_succeeded": None, + }, + ) + if result is not None: + return result + except Retry: + # Re-raise Retry exception to ensure task retry is scheduled + # and execution doesn't continue past this point + raise + + assert fencing_token, "Fencing token not acquired" try: - with lock_manager.locked( - LockType.BUNDLE_ANALYSIS_NOTIFY, - max_retries=self.max_retries, - retry_num=self.attempts, - ): + with self.debouncer.notification_lock(lock_manager, self.attempts): + if self.debouncer.check_fencing_token_stale( + redis_connection, repoid, commitid, fencing_token + ): + log.info( + "Fencing token is stale, another notification task is in progress, exiting", + extra={"repoid": repoid, "commit": commitid}, + ) + return { + "notify_attempted": False, + "notify_succeeded": None, + } + return self.process_impl_within_lock( db_session=db_session, repoid=repoid, @@ -67,22 +157,27 @@ def run_impl( previous_result=previous_result, **kwargs, ) + except LockAcquisitionLimitError: + return { + "notify_attempted": False, + "notify_succeeded": None, + } except LockRetry as retry: - if retry.max_retries_exceeded: - log.error( - "Not retrying lock acquisition - max retries exceeded", - extra={ - "commitid": commitid, - "repoid": repoid, - "retry_num": retry.retry_num, - "max_attempts": retry.max_attempts, + try: + result = self.debouncer.handle_lock_retry( + self, + retry, + { + "notify_attempted": False, + "notify_succeeded": None, }, ) - return { - "notify_attempted": False, - "notify_succeeded": None, - } - self.retry(max_retries=self.max_retries, countdown=retry.countdown) + if result is not None: + return result + except Retry: + # Re-raise Retry exception to ensure task retry is scheduled + # and execution doesn't continue past this point + raise @sentry_sdk.trace def process_impl_within_lock( diff --git a/apps/worker/tasks/bundle_analysis_processor.py b/apps/worker/tasks/bundle_analysis_processor.py index 5336201c25..d6f1face08 100644 --- a/apps/worker/tasks/bundle_analysis_processor.py +++ b/apps/worker/tasks/bundle_analysis_processor.py @@ -6,6 +6,8 @@ from app import celery_app from database.enums import ReportType from database.models import Commit, CommitReport, Upload +from helpers.clock import get_seconds_to_next_hour +from helpers.exceptions import NoConfiguredAppsAvailable from services.bundle_analysis.report import ( BundleAnalysisReportService, ProcessingResult, @@ -76,6 +78,54 @@ def run_impl( params, previous_result, ) + except NoConfiguredAppsAvailable as exp: + # Handle NoConfiguredAppsAvailable errors that may occur during + # repository service initialization (e.g., in BundleAnalysisReportLoader + # or other components that need GitHub app access) + if exp.rate_limited_count > 0: + # At least one GitHub app is available but rate-limited + # Use the actual retry time from GitHub API response if available, + # otherwise fall back to waiting until the next hour (minimum 1 minute delay) + if exp.earliest_retry_after_seconds is not None: + retry_delay_seconds = max(60, 
exp.earliest_retry_after_seconds) + else: + retry_delay_seconds = max(60, get_seconds_to_next_hour()) + log.warning( + "Bundle analysis processor unable to acquire GitHub app due to rate limits. Retrying later.", + extra={ + "apps_available": exp.apps_count, + "apps_rate_limited": exp.rate_limited_count, + "apps_suspended": exp.suspended_count, + "commitid": commitid, + "countdown_seconds": retry_delay_seconds, + "earliest_retry_after_seconds": exp.earliest_retry_after_seconds, + "repoid": repoid, + }, + ) + if not self.safe_retry(countdown=retry_delay_seconds): + log.error( + "Bundle analysis processor exceeded max retries after rate limit", + extra={ + "attempts": self.attempts, + "commitid": commitid, + "max_attempts": self.max_retries + 1, + "repoid": repoid, + }, + ) + return previous_result + else: + # No apps available or all apps are suspended - skip processing + log.warning( + "Bundle analysis processor skipping processing due to no configured GitHub apps available", + extra={ + "apps_available": exp.apps_count, + "apps_rate_limited": exp.rate_limited_count, + "apps_suspended": exp.suspended_count, + "commitid": commitid, + "repoid": repoid, + }, + ) + return previous_result except LockRetry as retry: # Check max retries using self.attempts (includes visibility timeout re-deliveries) # This prevents infinite retry loops when max retries are exceeded diff --git a/apps/worker/tasks/notify.py b/apps/worker/tasks/notify.py index 660d5caf55..d3a5b2169e 100644 --- a/apps/worker/tasks/notify.py +++ b/apps/worker/tasks/notify.py @@ -2,16 +2,14 @@ import sentry_sdk from asgiref.sync import async_to_sync -from celery.exceptions import MaxRetriesExceededError +from celery.exceptions import MaxRetriesExceededError as CeleryMaxRetriesExceededError +from celery.exceptions import Retry from sqlalchemy import and_ from sqlalchemy.orm.session import Session from app import celery_app from database.enums import CommitErrorTypes, Decoration, NotificationState, ReportType -from database.models import ( - Commit, - Pull, -) +from database.models import Commit, Pull from database.models.core import GITHUB_APP_INSTALLATION_DEFAULT_NAME, CompareCommit from helpers.checkpoint_logger.flows import UploadFlow from helpers.clock import get_seconds_to_next_hour @@ -32,6 +30,10 @@ from services.github import get_github_app_for_commit, set_github_app_for_commit from services.lock_manager import LockManager, LockRetry, LockType from services.notification import NotificationService +from services.notification.debounce import ( + LockAcquisitionLimitError, + NotificationDebouncer, +) from services.report import ReportService from services.repository import ( EnrichedPull, @@ -66,8 +68,24 @@ GENERIC_TA_ERROR_MSG = "Test Analytics upload error: We are unable to process any of the uploaded JUnit XML files. Please ensure your files are in the right format." +NOTIFIER_FENCING_TOKEN = "notifier_fence:{}_{}" +DEBOUNCE_PERIOD_SECONDS = 30 + class NotifyTask(BaseCodecovTask, name=notify_task_name): + # Explicitly define max_retries to ensure consistent behavior with debouncer. + # Celery's default is 3, which matches the old lock retry behavior. 
+ max_retries = 3 + + def __init__(self): + super().__init__() + self.debouncer = NotificationDebouncer[dict]( + redis_key_template=NOTIFIER_FENCING_TOKEN, + debounce_period_seconds=DEBOUNCE_PERIOD_SECONDS, + lock_type=LockType.NOTIFICATION, + max_lock_retries=self.max_retries, + ) + def run_impl( self, db_session: Session, @@ -76,6 +94,7 @@ def run_impl( commitid: str, current_yaml=None, empty_upload=None, + fencing_token: int | None = None, **kwargs, ): milestone = Milestones.NOTIFICATIONS_SENT @@ -105,15 +124,100 @@ def run_impl( commitid=commitid, report_type=ReportType.COVERAGE, lock_timeout=max(80, self.hard_time_limit_task), + redis_connection=redis_connection, ) + if fencing_token is None: + try: + with self.debouncer.notification_lock(lock_manager, self.attempts): + fencing_token = self.debouncer.acquire_fencing_token( + redis_connection, repoid, commitid + ) + + log.info( + "Acquired fencing token, retrying for debounce period", + extra={ + "repoid": repoid, + "commitid": commitid, + "fencing_token": fencing_token, + }, + ) + retry_kwargs = { + "repoid": repoid, + "commitid": commitid, + "current_yaml": current_yaml, + "empty_upload": empty_upload, + "fencing_token": fencing_token, + **kwargs, + } + try: + self.retry(countdown=DEBOUNCE_PERIOD_SECONDS, kwargs=retry_kwargs) + except CeleryMaxRetriesExceededError: + # Max retries exceeded during debounce retry, but we have a valid token. + # Proceed immediately with notification instead of dropping it. + log.warning( + "Max retries exceeded during debounce retry, proceeding immediately with notification", + extra={ + "repoid": repoid, + "commitid": commitid, + "fencing_token": fencing_token, + }, + ) + # Continue execution to process notification with acquired token + except LockAcquisitionLimitError: + self.log_checkpoint(UploadFlow.NOTIF_LOCK_ERROR) + self._call_upload_breadcrumb_task( + commit_sha=commitid, + repo_id=repoid, + milestone=milestone, + error=Errors.INTERNAL_LOCK_ERROR, + ) + return { + "notified": False, + "notifications": None, + "reason": "max_retries_exceeded", + } + except LockRetry as e: + try: + result = self.debouncer.handle_lock_retry( + self, + e, + { + "notified": False, + "notifications": None, + "reason": "unobtainable_lock", + }, + ) + if result is not None: + self.log_checkpoint(UploadFlow.NOTIF_LOCK_ERROR) + self._call_upload_breadcrumb_task( + commit_sha=commitid, + repo_id=repoid, + milestone=milestone, + error=Errors.INTERNAL_LOCK_ERROR, + ) + return result + except Retry: + # Re-raise Retry exception to ensure task retry is scheduled + # and execution doesn't continue past this point + raise + + assert fencing_token, "Fencing token not acquired" try: - lock_acquired = False - with lock_manager.locked( - lock_type=LockType.NOTIFICATION, - retry_num=self.attempts, - ): - lock_acquired = True + with self.debouncer.notification_lock(lock_manager, self.attempts): + if self.debouncer.check_fencing_token_stale( + redis_connection, repoid, commitid, fencing_token + ): + log.info( + "Fencing token is stale, another notification task is in progress, exiting", + extra={"repoid": repoid, "commitid": commitid}, + ) + return { + "notified": False, + "notifications": None, + "reason": "stale_fencing_token", + } + return self.run_impl_within_lock( db_session, repoid=repoid, @@ -122,18 +226,7 @@ def run_impl( empty_upload=empty_upload, **kwargs, ) - except LockRetry as err: - ( - log.info( - "Not notifying because there is another notification already happening", - extra={ - "repoid": repoid, - "commitid": 
commitid, - "error_type": type(err), - "lock_acquired": lock_acquired, - }, - ), - ) + except LockAcquisitionLimitError: self.log_checkpoint(UploadFlow.NOTIF_LOCK_ERROR) self._call_upload_breadcrumb_task( commit_sha=commitid, @@ -141,17 +234,49 @@ def run_impl( milestone=milestone, error=Errors.INTERNAL_LOCK_ERROR, ) - self._call_upload_breadcrumb_task( - commit_sha=commitid, - repo_id=repoid, - milestone=milestone, - error=Errors.INTERNAL_OTHER_JOB, - ) return { "notified": False, "notifications": None, - "reason": "unobtainable_lock", + "reason": "max_retries_exceeded", } + except LockRetry as err: + log.info( + "Not notifying because there is another notification already happening", + extra={ + "repoid": repoid, + "commitid": commitid, + "error_type": type(err), + }, + ) + try: + result = self.debouncer.handle_lock_retry( + self, + err, + { + "notified": False, + "notifications": None, + "reason": "unobtainable_lock", + }, + ) + if result is not None: + self.log_checkpoint(UploadFlow.NOTIF_LOCK_ERROR) + self._call_upload_breadcrumb_task( + commit_sha=commitid, + repo_id=repoid, + milestone=milestone, + error=Errors.INTERNAL_LOCK_ERROR, + ) + self._call_upload_breadcrumb_task( + commit_sha=commitid, + repo_id=repoid, + milestone=milestone, + error=Errors.INTERNAL_OTHER_JOB, + ) + return result + except Retry: + # Re-raise Retry exception to ensure task retry is scheduled + # and execution doesn't continue past this point + raise def log_checkpoint(self, checkpoint): """ @@ -183,7 +308,7 @@ def _attempt_retry( error=Errors.INTERNAL_RETRYING, ) self.retry(max_retries=max_retries, countdown=countdown) - except MaxRetriesExceededError: + except CeleryMaxRetriesExceededError: log.warning( "Not attempting to retry notifications since we already retried too many times", extra={ @@ -272,8 +397,12 @@ def run_impl_within_lock( if exp.rate_limited_count > 0: # There's at least 1 app that we can use to communicate with GitHub, # but this app happens to be rate limited now. We try again later. - # Min wait time of 1 minute - retry_delay_seconds = max(60, get_seconds_to_next_hour()) + # Use the actual retry time from GitHub API response if available, + # otherwise fall back to waiting until the next hour (minimum 1 minute delay). + if exp.earliest_retry_after_seconds is not None: + retry_delay_seconds = max(60, exp.earliest_retry_after_seconds) + else: + retry_delay_seconds = max(60, get_seconds_to_next_hour()) log.warning( "Unable to start notifications. 
Retrying again later.", extra={ @@ -283,6 +412,7 @@ def run_impl_within_lock( "apps_rate_limited": exp.rate_limited_count, "apps_suspended": exp.suspended_count, "countdown_seconds": retry_delay_seconds, + "earliest_retry_after_seconds": exp.earliest_retry_after_seconds, }, ) self._call_upload_breadcrumb_task( diff --git a/apps/worker/tasks/test_analytics_notifier.py b/apps/worker/tasks/test_analytics_notifier.py index b58b51f740..2c085b45fd 100644 --- a/apps/worker/tasks/test_analytics_notifier.py +++ b/apps/worker/tasks/test_analytics_notifier.py @@ -1,18 +1,20 @@ import logging -from contextlib import contextmanager -from typing import cast from asgiref.sync import async_to_sync from celery.exceptions import MaxRetriesExceededError as CeleryMaxRetriesExceededError from django.conf import settings -from redis import Redis from sqlalchemy.orm import Session from app import celery_app from database.enums import ReportType +from helpers.checkpoint_logger.flows import TestResultsFlow from helpers.notifier import NotifierResult from helpers.string import shorten_file_paths from services.lock_manager import LockManager, LockRetry, LockType +from services.notification.debounce import ( + LockAcquisitionLimitError, + NotificationDebouncer, +) from services.repository import ( fetch_pull_request_information, get_repo_provider_service, @@ -35,7 +37,10 @@ TestResultsNotifier, should_do_flaky_detection, ) -from shared.celery_config import test_analytics_notifier_task_name +from shared.celery_config import ( + TASK_MAX_RETRIES_DEFAULT, + test_analytics_notifier_task_name, +) from shared.django_apps.core.models import Repository from shared.helpers.redis import get_redis_connection from shared.reports.types import UploadType @@ -44,13 +49,6 @@ from shared.yaml.user_yaml import UserYaml from tasks.base import BaseCodecovTask - -class MaxRetriesExceededError(Exception): - """Raised when lock acquisition exceeds max retries.""" - - pass - - log = logging.getLogger(__name__) TA_NOTIFIER_FENCING_TOKEN = "ta_notifier_fence:{}_{}" @@ -65,6 +63,17 @@ class TestAnalyticsNotifierTask( data retention policies. 
""" + max_retries = TASK_MAX_RETRIES_DEFAULT + + def __init__(self): + super().__init__() + self.debouncer = NotificationDebouncer[NotifierTaskResult]( + redis_key_template=TA_NOTIFIER_FENCING_TOKEN, + debounce_period_seconds=DEBOUNCE_PERIOD_SECONDS, + lock_type=LockType.NOTIFICATION, + max_lock_retries=self.max_retries, + ) + def run_impl( self, _db_session: Session, @@ -106,68 +115,76 @@ def run_impl( if fencing_token is None: try: - with self._notification_lock(lock_manager): - # Acquire a fencing token to deduplicate notifications - redis_key = TA_NOTIFIER_FENCING_TOKEN.format( - repo_id, upload_context["commit_sha"] + with self.debouncer.notification_lock(lock_manager, self.attempts): + fencing_token = self.debouncer.acquire_fencing_token( + redis_client, repo_id, upload_context["commit_sha"] ) - with redis_client.pipeline() as pipeline: - pipeline.incr(redis_key) - # Set expiry to prevent stale keys from accumulating - pipeline.expire(redis_key, 24 * 60 * 60) # 24 hours - results = pipeline.execute() - fencing_token = int(cast(str, results[0])) - - # Add fencing token to the args and then retry for a debounce period + self.extra_dict["fencing_token"] = fencing_token - kwargs["fencing_token"] = fencing_token + retry_kwargs = { + "repoid": repo_id, + "upload_context": upload_context, + "fencing_token": fencing_token, + **kwargs, + } log.info( "Acquired fencing token, retrying for debounce period", extra=self.extra_dict, ) - self.retry(countdown=DEBOUNCE_PERIOD_SECONDS, kwargs=kwargs) - except MaxRetriesExceededError: + try: + self.retry(countdown=DEBOUNCE_PERIOD_SECONDS, kwargs=retry_kwargs) + except CeleryMaxRetriesExceededError: + log.warning( + "Max retries exceeded during debounce retry, proceeding immediately with notification", + extra=self.extra_dict, + ) + # Continue execution to process notification with acquired token + except LockAcquisitionLimitError: + if TestResultsFlow.has_begun(): + TestResultsFlow.log(TestResultsFlow.NOTIF_LOCK_ERROR) return NotifierTaskResult( attempted=False, succeeded=False, ) except LockRetry as e: - try: - self.retry(max_retries=5, countdown=e.countdown) - except CeleryMaxRetriesExceededError: + result = self.debouncer.handle_lock_retry( + self, + e, + NotifierTaskResult(attempted=False, succeeded=False), + ) + if result is not None: + return result + + assert fencing_token, "Fencing token not acquired" + try: + with self.debouncer.notification_lock(lock_manager, self.attempts): + if self.debouncer.check_fencing_token_stale( + redis_client, repo_id, upload_context["commit_sha"], fencing_token + ): + log.info( + "Fencing token is stale, another notification task is in progress, exiting", + extra=self.extra_dict, + ) return NotifierTaskResult( attempted=False, succeeded=False, ) - - # At this point we have a fencing token, but want to check if another - # notification task has incremented it, indicating a newer notification - # task is in progress and can take over - assert fencing_token, "Fencing token not acquired" - # The lock here is not required for correctness (only the one at the - # end is), but it reduces duplicated work and database queries - try: - with self._notification_lock(lock_manager): - stale_token_result = self._check_fencing_token( - redis_client, repo_id, upload_context, fencing_token - ) - if stale_token_result: - return stale_token_result - except MaxRetriesExceededError: + except LockAcquisitionLimitError: + if TestResultsFlow.has_begun(): + TestResultsFlow.log(TestResultsFlow.NOTIF_LOCK_ERROR) return NotifierTaskResult( 
attempted=False, succeeded=False, ) except LockRetry as e: - try: - self.retry(max_retries=5, countdown=e.countdown) - except CeleryMaxRetriesExceededError: - return NotifierTaskResult( - attempted=False, - succeeded=False, - ) + result = self.debouncer.handle_lock_retry( + self, + e, + NotifierTaskResult(attempted=False, succeeded=False), + ) + if result is not None: + return result - # Do preparation work without a lock notifier = self.notification_preparation( repo_id=repo_id, upload_context=upload_context, @@ -181,121 +198,49 @@ def run_impl( ) try: - with self._notification_lock(lock_manager): - # Check fencing token again before sending the notification - stale_token_result = self._check_fencing_token( - redis_client, repo_id, upload_context, fencing_token - ) - if stale_token_result: - return stale_token_result + with self.debouncer.notification_lock(lock_manager, self.attempts): + if self.debouncer.check_fencing_token_stale( + redis_client, repo_id, upload_context["commit_sha"], fencing_token + ): + log.info( + "Fencing token is stale, another notification task is in progress, exiting", + extra=self.extra_dict, + ) + return NotifierTaskResult( + attempted=False, + succeeded=False, + ) - # Send notification notifier_result = notifier.notify() - success = ( - True if notifier_result is NotifierResult.COMMENT_POSTED else False - ) + success = notifier_result is NotifierResult.COMMENT_POSTED log.info("Posted TA comment", extra={**self.extra_dict, "success": success}) return NotifierTaskResult( attempted=True, succeeded=success, ) - except MaxRetriesExceededError: + except LockAcquisitionLimitError: + if TestResultsFlow.has_begun(): + TestResultsFlow.log(TestResultsFlow.NOTIF_LOCK_ERROR) return NotifierTaskResult( attempted=False, succeeded=False, ) except LockRetry as e: - try: - self.retry(max_retries=5, countdown=e.countdown) - except CeleryMaxRetriesExceededError: - return NotifierTaskResult( - attempted=False, - succeeded=False, - ) - - @contextmanager - def _notification_lock(self, lock_manager: LockManager): - """ - Context manager to handle the repeated lock acquisition pattern - with automatic retry handling for LockRetry exceptions. - """ - try: - with lock_manager.locked( - LockType.NOTIFICATION, - max_retries=5, - retry_num=self.attempts, - ): - yield - return - except LockRetry as e: - # Lock acquisition failed - handle immediately without yielding - # This ensures the with block body never executes without lock protection - if e.max_retries_exceeded: - log.error( - "Not retrying lock acquisition - max retries exceeded", - extra={ - "retry_num": e.retry_num, - "max_attempts": e.max_attempts, - }, - ) - raise MaxRetriesExceededError( - f"Lock acquisition exceeded max retries: {e.retry_num} >= {e.max_attempts}", - ) - # Re-raise LockRetry to be handled by the caller's retry logic - # The caller will catch this and call self.retry() - raise - - def _check_fencing_token( - self, - redis_client: Redis, - repo_id: int, - upload_context: TAUploadContext, - fencing_token: int, - ) -> NotifierTaskResult | None: - """ - Check if the fencing token is stale, indicating another notification task - is in progress and can take over. - - This method should be called within a lock context. - - :returns: - NotifierTaskResult if the token is stale (task should exit early), - None if the token is current (task should continue). 
- """ - current_token = int( - cast( - str, - redis_client.get( - TA_NOTIFIER_FENCING_TOKEN.format( - repo_id, upload_context["commit_sha"] - ) - ) - or "0", + result = self.debouncer.handle_lock_retry( + self, + e, + NotifierTaskResult(attempted=False, succeeded=False), ) - ) - - # We do a less than comparison since it guarantees safety at - # the cost of no debouncing for that commit in the case of - # losing the key in Redis - if fencing_token < current_token: - log.info( - "Fencing token is stale, another notification task is in progress, exiting", - extra=self.extra_dict, - ) - return NotifierTaskResult( - attempted=False, - succeeded=False, - ) - - return None + if result is not None: + return result def notification_preparation( self, *, repo_id: int, upload_context: TAUploadContext, - commit_yaml: UserYaml = UserYaml({}), # TODO: Actual commit_yaml + commit_yaml: UserYaml = UserYaml({}), ) -> TestResultsNotifier | None: """ Prepare the test results notifier. @@ -312,9 +257,6 @@ def notification_preparation( log.info("Comment is disabled, not posting comment", extra=self.extra_dict) return None - # TODO: Add upload errors in a compliant way - - # TODO: Remove impl label with read_tests_totals_summary.labels(impl="new").time(): summary = get_pr_comment_agg(repo_id, upload_context["commit_sha"]) @@ -358,8 +300,6 @@ def notification_preparation( _repo_service=repo_service, ) - # TODO: Seat activation - with read_failures_summary.labels(impl="new").time(): failures = get_pr_comment_failures(repo_id, upload_context["commit_sha"]) @@ -385,24 +325,23 @@ def notification_preparation( def transform_failures( failures: list[FailedTestInstance], ) -> list[TestResultsNotificationFailure[bytes]]: - notif_failures = [] + result = [] for failure in failures: - if failure["failure_message"] is not None: - failure["failure_message"] = shorten_file_paths( - failure["failure_message"] - ).replace("\r", "") + failure_message = failure["failure_message"] + if failure_message is not None: + failure_message = shorten_file_paths(failure_message).replace("\r", "") - notif_failures.append( + result.append( TestResultsNotificationFailure( display_name=failure["computed_name"], - failure_message=failure["failure_message"], + failure_message=failure_message, test_id=failure["test_id"], envs=failure["flags"], duration_seconds=failure["duration_seconds"] or 0, - build_url=None, # TODO: Figure out how we can save this in a compliant way + build_url=None, ) ) - return notif_failures + return result RegisteredTestAnalyticsNotifierTask = celery_app.register_task( diff --git a/apps/worker/tasks/tests/integration/test_notify_task.py b/apps/worker/tasks/tests/integration/test_notify_task.py index af8d6b51a1..1a0359d1a3 100644 --- a/apps/worker/tasks/tests/integration/test_notify_task.py +++ b/apps/worker/tasks/tests/integration/test_notify_task.py @@ -7,6 +7,7 @@ from database.models.core import CompareCommit from database.tests.factories import CommitFactory, PullFactory, RepositoryFactory from services.comparison import get_or_create_comparison +from services.notification.debounce import SKIP_DEBOUNCE_TOKEN from services.notification.notifiers.base import NotificationResult from services.repository import EnrichedPull from shared.api_archive.archive import ArchiveService @@ -86,7 +87,11 @@ def test_simple_call_no_notifiers( task = NotifyTask() result = task.run_impl( - dbsession, repoid=commit.repoid, commitid=commit.commitid, current_yaml={} + dbsession, + repoid=commit.repoid, + commitid=commit.commitid, + 
current_yaml={}, + fencing_token=SKIP_DEBOUNCE_TOKEN, ) assert result == { diff --git a/apps/worker/tasks/tests/unit/test_base.py b/apps/worker/tasks/tests/unit/test_base.py index 237ba9b51a..aa344ba7ff 100644 --- a/apps/worker/tasks/tests/unit/test_base.py +++ b/apps/worker/tasks/tests/unit/test_base.py @@ -319,7 +319,54 @@ def test_get_repo_provider_service_rate_limited(self, mocker, mock_self_app): mock_retry = mocker.patch.object(task, "retry") mock_repo = mocker.MagicMock() assert task.get_repo_provider_service(mock_repo, commit=mock_commit) is None + # When earliest_retry_after_seconds is None, falls back to get_seconds_to_next_hour() task.retry.assert_called_with(countdown=120) + + def test_get_repo_provider_service_rate_limited_uses_actual_retry_time( + self, mocker, mock_self_app + ): + """Test that base task uses actual retry time from GitHub API when available.""" + mocker.patch( + "tasks.base.get_repo_provider_service", + side_effect=NoConfiguredAppsAvailable( + apps_count=2, + rate_limited_count=2, + suspended_count=0, + earliest_retry_after_seconds=180, # 3 minutes + ), + ) + mock_commit = mocker.MagicMock() + mock_commit.commitid = "abc123" + + task = BaseCodecovTask() + mock_retry = mocker.patch.object(task, "retry") + mock_repo = mocker.MagicMock() + assert task.get_repo_provider_service(mock_repo, commit=mock_commit) is None + # Should use actual retry time (180 seconds) instead of falling back + task.retry.assert_called_with(countdown=180) + + def test_get_repo_provider_service_rate_limited_enforces_minimum_delay( + self, mocker, mock_self_app + ): + """Test that base task enforces minimum 60-second delay even with short retry time.""" + mocker.patch( + "tasks.base.get_repo_provider_service", + side_effect=NoConfiguredAppsAvailable( + apps_count=2, + rate_limited_count=2, + suspended_count=0, + earliest_retry_after_seconds=30, # Less than minimum + ), + ) + mock_commit = mocker.MagicMock() + mock_commit.commitid = "abc123" + + task = BaseCodecovTask() + mock_retry = mocker.patch.object(task, "retry") + mock_repo = mocker.MagicMock() + assert task.get_repo_provider_service(mock_repo, commit=mock_commit) is None + # Should enforce minimum 60-second delay + task.retry.assert_called_with(countdown=60) mock_self_app.tasks[upload_breadcrumb_task_name].apply_async.assert_has_calls( [ call( diff --git a/apps/worker/tasks/tests/unit/test_bundle_analysis_notify_task.py b/apps/worker/tasks/tests/unit/test_bundle_analysis_notify_task.py index bdfa00f51f..2c2ac867f3 100644 --- a/apps/worker/tasks/tests/unit/test_bundle_analysis_notify_task.py +++ b/apps/worker/tasks/tests/unit/test_bundle_analysis_notify_task.py @@ -1,9 +1,19 @@ +from contextlib import contextmanager + +import pytest +from celery.exceptions import Retry + from database.tests.factories import CommitFactory from services.bundle_analysis.notify import BundleAnalysisNotifyReturn from services.bundle_analysis.notify.types import ( NotificationSuccess, NotificationType, ) +from services.lock_manager import LockRetry +from services.notification.debounce import ( + SKIP_DEBOUNCE_TOKEN, + LockAcquisitionLimitError, +) from tasks.bundle_analysis_notify import BundleAnalysisNotifyTask @@ -34,6 +44,7 @@ def test_bundle_analysis_notify_task( repoid=commit.repoid, commitid=commit.commitid, commit_yaml={}, + fencing_token=SKIP_DEBOUNCE_TOKEN, ) assert result == { "notify_attempted": True, @@ -51,8 +62,560 @@ def test_bundle_analysis_notify_skips_if_all_processing_fail(dbsession): repoid=commit.repoid, commitid=commit.commitid, 
commit_yaml={}, + fencing_token=SKIP_DEBOUNCE_TOKEN, ) assert result == { "notify_attempted": False, "notify_succeeded": NotificationSuccess.ALL_ERRORED, } + + +def test_debounce_retry_includes_all_required_params( + mocker, dbsession, celery_app, mock_redis +): + """Test that retry kwargs include all required parameters for debounce retry. + + This test ensures that when the debounce logic triggers a retry, all required + parameters are included in the kwargs passed to self.retry(). This prevents + TypeError when Celery attempts to retry the task. + """ + mocker.patch.object(BundleAnalysisNotifyTask, "app", celery_app) + + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + # Mock Redis to allow token acquisition + mock_redis.pipeline.return_value.__enter__.return_value.execute.return_value = ["1"] + mock_redis.get.return_value = False + + task = BundleAnalysisNotifyTask() + task.request.retries = 0 + task.max_retries = 5 + + # Mock retry to capture kwargs + mock_retry = mocker.patch.object(task, "retry", side_effect=Retry()) + + previous_result = [{"error": None}] + commit_yaml = {"codecov": {"flags": ["test"]}} + + with pytest.raises(Retry): + task.run_impl( + dbsession, + previous_result, + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml=commit_yaml, + # Don't pass fencing_token - let it go through retry path + ) + + # Verify retry was called + assert mock_retry.called + + # CRITICAL: Verify all required params are in kwargs + call_kwargs = mock_retry.call_args[1]["kwargs"] + assert call_kwargs["previous_result"] == previous_result + assert call_kwargs["repoid"] == commit.repoid + assert call_kwargs["commitid"] == commit.commitid + assert call_kwargs["commit_yaml"] == commit_yaml # Should be dict, not UserYaml + assert isinstance(call_kwargs["commit_yaml"], dict) # Verify type + assert call_kwargs["fencing_token"] == 1 # Token from acquisition + assert mock_retry.call_args[1]["countdown"] == 30 # DEBOUNCE_PERIOD_SECONDS + + +def test_bundle_analysis_notify_lock_retry_limit_exceeded_during_token_acquisition( + mocker, dbsession, celery_app, mock_redis +): + """Test that LockAcquisitionLimitError during token acquisition returns graceful failure.""" + mocker.patch.object(BundleAnalysisNotifyTask, "app", celery_app) + + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + # Mock Redis to allow token acquisition attempt + mock_redis.pipeline.return_value.__enter__.return_value.execute.return_value = ["1"] + + task = BundleAnalysisNotifyTask() + task.request.retries = 0 + task.max_retries = 5 + + # Mock debouncer's notification_lock to raise LockAcquisitionLimitError + mock_notification_lock = mocker.patch.object( + task.debouncer, + "notification_lock", + side_effect=LockAcquisitionLimitError("Max retries exceeded"), + ) + + result = task.run_impl( + dbsession, + [{"error": None}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + # Don't pass fencing_token - let it go through token acquisition path + ) + + assert result == { + "notify_attempted": False, + "notify_succeeded": None, + } + + +def test_bundle_analysis_notify_lock_retry_during_token_acquisition( + mocker, dbsession, celery_app, mock_redis +): + """Test that LockRetry during token acquisition triggers task retry.""" + mocker.patch.object(BundleAnalysisNotifyTask, "app", celery_app) + + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + # Mock Redis to allow token acquisition attempt + 
mock_redis.pipeline.return_value.__enter__.return_value.execute.return_value = ["1"] + + task = BundleAnalysisNotifyTask() + task.request.retries = 0 + task.max_retries = 5 + task.request.headers = {} + + # Mock debouncer's notification_lock to raise LockRetry + @contextmanager + def mock_lock_raises_retry(*args, **kwargs): + raise LockRetry(60) + + mocker.patch.object( + task.debouncer, "notification_lock", side_effect=mock_lock_raises_retry + ) + + # Mock retry to raise Retry exception + mock_retry = mocker.patch.object(task, "retry", side_effect=Retry()) + + with pytest.raises(Retry): + task.run_impl( + dbsession, + [{"error": None}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + # Don't pass fencing_token - let it go through token acquisition path + ) + + # Verify retry was called with max_retries from debouncer + assert mock_retry.called + + +def test_bundle_analysis_notify_lock_retry_max_retries_exceeded_during_token_acquisition( + mocker, dbsession, celery_app, mock_redis +): + """Test that when max retries exceeded during LockRetry handling, returns graceful failure.""" + mocker.patch.object(BundleAnalysisNotifyTask, "app", celery_app) + + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + # Mock Redis to allow token acquisition attempt + mock_redis.pipeline.return_value.__enter__.return_value.execute.return_value = ["1"] + + task = BundleAnalysisNotifyTask() + task.request.retries = 0 + task.max_retries = 5 + task.request.headers = {} + + # Mock debouncer's notification_lock to raise LockRetry + @contextmanager + def mock_lock_raises_retry(*args, **kwargs): + raise LockRetry(60) + + mocker.patch.object( + task.debouncer, "notification_lock", side_effect=mock_lock_raises_retry + ) + + # Mock handle_lock_retry to return failure result (simulating max retries exceeded) + mocker.patch.object( + task.debouncer, + "handle_lock_retry", + return_value={ + "notify_attempted": False, + "notify_succeeded": None, + }, + ) + + result = task.run_impl( + dbsession, + [{"error": None}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + # Don't pass fencing_token - let it go through token acquisition path + ) + + assert result == { + "notify_attempted": False, + "notify_succeeded": None, + } + + +def test_bundle_analysis_notify_stale_fencing_token( + mocker, dbsession, celery_app, mock_redis +): + """Test that stale fencing token causes task to exit early.""" + mocker.patch.object(BundleAnalysisNotifyTask, "app", celery_app) + + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + # Mock Redis to return stale token (current token > fencing_token) + mock_redis.get.return_value = "2" # Current token is 2, but we have token 1 + + task = BundleAnalysisNotifyTask() + task.request.retries = 0 + task.max_retries = 5 + + # Mock debouncer's notification_lock to succeed (for the stale token check lock) + @contextmanager + def mock_lock_succeeds(*args, **kwargs): + yield + + mocker.patch.object( + task.debouncer, "notification_lock", side_effect=mock_lock_succeeds + ) + + # Mock check_fencing_token_stale to return True (token is stale) + mocker.patch.object(task.debouncer, "check_fencing_token_stale", return_value=True) + + result = task.run_impl( + dbsession, + [{"error": None}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + fencing_token=1, # Pass fencing token, but it's stale + ) + + assert result == { + "notify_attempted": False, + "notify_succeeded": None, + } + + +def 
test_bundle_analysis_notify_lock_retry_limit_exceeded_during_notification( + mocker, dbsession, celery_app, mock_redis +): + """Test that LockAcquisitionLimitError during notification lock returns graceful failure.""" + mocker.patch.object(BundleAnalysisNotifyTask, "app", celery_app) + + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + # Mock Redis to return non-stale token + mock_redis.get.return_value = "1" # Current token matches fencing_token + + task = BundleAnalysisNotifyTask() + task.request.retries = 0 + task.max_retries = 5 + + # Mock debouncer's notification_lock to raise LockAcquisitionLimitError during notification + mocker.patch.object( + task.debouncer, + "notification_lock", + side_effect=LockAcquisitionLimitError("Max retries exceeded"), + ) + + result = task.run_impl( + dbsession, + [{"error": None}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + fencing_token=1, + ) + + assert result == { + "notify_attempted": False, + "notify_succeeded": None, + } + + +def test_bundle_analysis_notify_lock_retry_during_notification( + mocker, dbsession, celery_app, mock_redis +): + """Test that LockRetry during notification lock triggers task retry.""" + mocker.patch.object(BundleAnalysisNotifyTask, "app", celery_app) + + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + # Mock Redis to return non-stale token + mock_redis.get.return_value = "1" # Current token matches fencing_token + + task = BundleAnalysisNotifyTask() + task.request.retries = 0 + task.max_retries = 5 + task.request.headers = {} + + # Mock debouncer's notification_lock to raise LockRetry during notification + @contextmanager + def mock_lock_raises_retry(*args, **kwargs): + raise LockRetry(60) + + mocker.patch.object( + task.debouncer, "notification_lock", side_effect=mock_lock_raises_retry + ) + + # Mock retry to raise Retry exception + mock_retry = mocker.patch.object(task, "retry", side_effect=Retry()) + + with pytest.raises(Retry): + task.run_impl( + dbsession, + [{"error": None}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + fencing_token=1, + ) + + # Verify retry was called + assert mock_retry.called + + +def test_bundle_analysis_notify_lock_retry_max_retries_exceeded_during_notification( + mocker, dbsession, celery_app, mock_redis +): + """Test that when max retries exceeded during LockRetry handling in notification, returns graceful failure.""" + mocker.patch.object(BundleAnalysisNotifyTask, "app", celery_app) + + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + # Mock Redis to return non-stale token + mock_redis.get.return_value = "1" # Current token matches fencing_token + + task = BundleAnalysisNotifyTask() + task.request.retries = 0 + task.max_retries = 5 + task.request.headers = {} + + # Mock debouncer's notification_lock to raise LockRetry during notification + @contextmanager + def mock_lock_raises_retry(*args, **kwargs): + raise LockRetry(60) + + mocker.patch.object( + task.debouncer, "notification_lock", side_effect=mock_lock_raises_retry + ) + + # Mock handle_lock_retry to return failure result (simulating max retries exceeded) + mocker.patch.object( + task.debouncer, + "handle_lock_retry", + return_value={ + "notify_attempted": False, + "notify_succeeded": None, + }, + ) + + result = task.run_impl( + dbsession, + [{"error": None}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + fencing_token=1, + ) + + assert result == { + 
"notify_attempted": False, + "notify_succeeded": None, + } + + +def test_bundle_analysis_notify_commit_not_found( + mocker, dbsession, celery_app, mock_redis +): + """Test that missing commit raises assertion error.""" + mocker.patch.object(BundleAnalysisNotifyTask, "app", celery_app) + + # Create a commit but don't add it to session (or use non-existent commitid) + commit = CommitFactory.build() # build() doesn't add to session + + # Mock Redis to return non-stale token + mock_redis.get.return_value = "1" + + # Mock LockManager to succeed + lock_manager_mock = mocker.MagicMock() + lock_manager_mock.return_value.locked.return_value.__enter__.return_value = None + mocker.patch("tasks.bundle_analysis_notify.LockManager", lock_manager_mock) + + task = BundleAnalysisNotifyTask() + task.request.retries = 0 + task.max_retries = 5 + + with pytest.raises(AssertionError, match="commit not found"): + task.process_impl_within_lock( + db_session=dbsession, + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + previous_result=[{"error": None}], + ) + + +def test_bundle_analysis_notify_previous_result_not_list( + mocker, dbsession, celery_app, mock_redis +): + """Test that previous_result that's not a list is handled gracefully.""" + mocker.patch.object(BundleAnalysisNotifyTask, "app", celery_app) + + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + # Mock Redis to return non-stale token + mock_redis.get.return_value = "1" + + # Mock LockManager to succeed + lock_manager_mock = mocker.MagicMock() + lock_manager_mock.return_value.locked.return_value.__enter__.return_value = None + mocker.patch("tasks.bundle_analysis_notify.LockManager", lock_manager_mock) + + task = BundleAnalysisNotifyTask() + task.request.retries = 0 + task.max_retries = 5 + + # Pass previous_result as a dict instead of list + # When not a list, it becomes empty list [], and all([]) is True, + # so it returns ALL_ERRORED + result = task.run_impl( + dbsession, + {"error": None}, # Not a list + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + fencing_token=1, + ) + + # Should handle gracefully and treat as empty list, which means all errored + assert result == { + "notify_attempted": False, + "notify_succeeded": NotificationSuccess.ALL_ERRORED, + } + + +def test_bundle_analysis_notify_task_initialization(mocker): + """Test that task initialization sets up debouncer correctly.""" + mocker.patch("tasks.bundle_analysis_notify.BaseCodecovTask.__init__") + + task = BundleAnalysisNotifyTask() + + assert task.debouncer is not None + assert task.debouncer.redis_key_template == "bundle_analysis_notifier_fence:{}_{}" + assert task.debouncer.debounce_period_seconds == 30 + assert task.debouncer.max_lock_retries == task.max_retries + + +def test_bundle_analysis_notify_partial_success( + mocker, dbsession, celery_app, mock_redis +): + """Test notification with partial success.""" + mocker.patch.object(BundleAnalysisNotifyTask, "app", celery_app) + + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + # Mock Redis to return non-stale token + mock_redis.get.return_value = "1" + + # Mock LockManager to succeed + lock_manager_mock = mocker.MagicMock() + lock_manager_mock.return_value.locked.return_value.__enter__.return_value = None + mocker.patch("tasks.bundle_analysis_notify.LockManager", lock_manager_mock) + + mocker.patch( + "services.bundle_analysis.notify.BundleAnalysisNotifyService.notify", + return_value=BundleAnalysisNotifyReturn( + 
notifications_configured=( + NotificationType.PR_COMMENT, + NotificationType.COMMIT_STATUS, + ), + notifications_attempted=( + NotificationType.PR_COMMENT, + NotificationType.COMMIT_STATUS, + ), + notifications_successful=( + NotificationType.PR_COMMENT, + ), # Only one succeeded + ), + ) + + task = BundleAnalysisNotifyTask() + task.request.retries = 0 + task.max_retries = 5 + + result = task.run_impl( + dbsession, + [{"error": None}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + fencing_token=1, + ) + + assert result == { + "notify_attempted": True, + "notify_succeeded": NotificationSuccess.PARTIAL_SUCCESS, + } + + +def test_bundle_analysis_notify_no_notifications_configured( + mocker, dbsession, celery_app, mock_redis +): + """Test notification when no notifications are configured.""" + mocker.patch.object(BundleAnalysisNotifyTask, "app", celery_app) + + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + # Mock Redis to return non-stale token + mock_redis.get.return_value = "1" + + # Mock LockManager to succeed + lock_manager_mock = mocker.MagicMock() + lock_manager_mock.return_value.locked.return_value.__enter__.return_value = None + mocker.patch("tasks.bundle_analysis_notify.LockManager", lock_manager_mock) + + mocker.patch( + "services.bundle_analysis.notify.BundleAnalysisNotifyService.notify", + return_value=BundleAnalysisNotifyReturn( + notifications_configured=(), # No notifications configured + notifications_attempted=(), + notifications_successful=(), + ), + ) + + task = BundleAnalysisNotifyTask() + task.request.retries = 0 + task.max_retries = 5 + + result = task.run_impl( + dbsession, + [{"error": None}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + fencing_token=1, + ) + + assert result == { + "notify_attempted": True, + "notify_succeeded": NotificationSuccess.NOTHING_TO_NOTIFY, + } diff --git a/apps/worker/tasks/tests/unit/test_bundle_analysis_processor_task.py b/apps/worker/tasks/tests/unit/test_bundle_analysis_processor_task.py index 78037f4178..7f50065733 100644 --- a/apps/worker/tasks/tests/unit/test_bundle_analysis_processor_task.py +++ b/apps/worker/tasks/tests/unit/test_bundle_analysis_processor_task.py @@ -5,6 +5,7 @@ from database.enums import ReportType from database.models import CommitReport, Upload from database.tests.factories import CommitFactory, RepositoryFactory, UploadFactory +from helpers.exceptions import NoConfiguredAppsAvailable from services.bundle_analysis.report import ProcessingError, ProcessingResult from shared.api_archive.archive import ArchiveService from shared.bundle_analysis.storage import get_bucket_name @@ -2021,3 +2022,118 @@ def test_bundle_analysis_processor_task_cleanup_with_none_result( # Should not crash even though result is None # The finally block should handle None result gracefully + + def test_bundle_analysis_processor_no_apps_available_rate_limited_uses_actual_retry_time( + self, mocker, dbsession, mock_redis + ): + """Test that bundle analysis processor uses actual retry time from GitHub API when available.""" + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + mocker.patch( + "tasks.bundle_analysis_processor.get_repo_provider_service", + side_effect=NoConfiguredAppsAvailable( + apps_count=2, + rate_limited_count=1, + suspended_count=0, + earliest_retry_after_seconds=240, # 4 minutes + ), + ) + + task = BundleAnalysisProcessorTask() + task.request.retries = 0 + task.max_retries = 5 + mock_safe_retry = 
mocker.patch.object(task, "safe_retry", return_value=True) + + previous_result = [{"error": None}] + result = task.run_impl( + dbsession, + previous_result, + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + params={"upload_id": "test", "commit": commit.commitid}, + ) + + # Should return previous result and retry with actual retry time + assert result == previous_result + mock_safe_retry.assert_called_once_with(countdown=240) + + def test_bundle_analysis_processor_no_apps_available_rate_limited_fallback( + self, mocker, dbsession, mock_redis + ): + """Test that bundle analysis processor falls back to next hour when retry time is None.""" + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + mocker.patch( + "tasks.bundle_analysis_processor.get_repo_provider_service", + side_effect=NoConfiguredAppsAvailable( + apps_count=2, + rate_limited_count=1, + suspended_count=0, + earliest_retry_after_seconds=None, # No retry time available + ), + ) + mocker.patch( + "tasks.bundle_analysis_processor.get_seconds_to_next_hour", + return_value=1800, + ) + + task = BundleAnalysisProcessorTask() + task.request.retries = 0 + task.max_retries = 5 + mock_safe_retry = mocker.patch.object(task, "safe_retry", return_value=True) + + previous_result = [{"error": None}] + result = task.run_impl( + dbsession, + previous_result, + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + params={"upload_id": "test", "commit": commit.commitid}, + ) + + # Should return previous result and retry with fallback time + assert result == previous_result + mock_safe_retry.assert_called_once_with(countdown=1800) + + def test_bundle_analysis_processor_no_apps_available_enforces_minimum_delay( + self, mocker, dbsession, mock_redis + ): + """Test that bundle analysis processor enforces minimum 60-second delay.""" + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + mocker.patch( + "tasks.bundle_analysis_processor.get_repo_provider_service", + side_effect=NoConfiguredAppsAvailable( + apps_count=2, + rate_limited_count=1, + suspended_count=0, + earliest_retry_after_seconds=30, # Less than minimum + ), + ) + + task = BundleAnalysisProcessorTask() + task.request.retries = 0 + task.max_retries = 5 + mock_safe_retry = mocker.patch.object(task, "safe_retry", return_value=True) + + previous_result = [{"error": None}] + result = task.run_impl( + dbsession, + previous_result, + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + params={"upload_id": "test", "commit": commit.commitid}, + ) + + # Should return previous result and retry with minimum delay + assert result == previous_result + mock_safe_retry.assert_called_once_with(countdown=60) diff --git a/apps/worker/tasks/tests/unit/test_notify_task.py b/apps/worker/tasks/tests/unit/test_notify_task.py index 38aff49364..8bdd41343f 100644 --- a/apps/worker/tasks/tests/unit/test_notify_task.py +++ b/apps/worker/tasks/tests/unit/test_notify_task.py @@ -5,7 +5,8 @@ import httpx import pytest import respx -from celery.exceptions import MaxRetriesExceededError, Retry +from celery.exceptions import MaxRetriesExceededError as CeleryMaxRetriesExceededError +from celery.exceptions import Retry from freezegun import freeze_time from database.enums import Decoration, Notification, NotificationState @@ -27,6 +28,7 @@ from services.decoration import DecorationDetails from services.lock_manager import LockRetry from services.notification import NotificationService +from 
services.notification.debounce import SKIP_DEBOUNCE_TOKEN from services.notification.notifiers.base import ( AbstractBaseNotifier, NotificationResult, @@ -1057,6 +1059,7 @@ def test_notify_task_no_ghapp_available_one_rate_limited( "tasks.notify.get_repo_provider_service" ) mock_retry = mocker.patch.object(NotifyTask, "retry", return_value=None) + mocker.patch("tasks.notify.get_seconds_to_next_hour", return_value=45 * 60) get_repo_provider_service.side_effect = NoConfiguredAppsAvailable( apps_count=2, rate_limited_count=1, suspended_count=1 ) @@ -1078,6 +1081,7 @@ def test_notify_task_no_ghapp_available_one_rate_limited( current_yaml=current_yaml, ) assert res is None + # When earliest_retry_after_seconds is None, falls back to get_seconds_to_next_hour() mock_retry.assert_called_with(max_retries=10, countdown=45 * 60) mock_self_app.tasks[upload_breadcrumb_task_name].apply_async.assert_has_calls( [ @@ -1108,6 +1112,80 @@ def test_notify_task_no_ghapp_available_one_rate_limited( ] ) + @freeze_time("2024-04-22T11:15:00") + def test_notify_task_no_ghapp_available_uses_actual_retry_time( + self, dbsession, mocker, mock_self_app + ): + """Test that notify task uses actual retry time from GitHub API when available.""" + get_repo_provider_service = mocker.patch( + "tasks.notify.get_repo_provider_service" + ) + mock_retry = mocker.patch.object(NotifyTask, "retry", return_value=None) + # Provide actual retry time from Redis (300 seconds = 5 minutes) + get_repo_provider_service.side_effect = NoConfiguredAppsAvailable( + apps_count=2, + rate_limited_count=1, + suspended_count=0, + earliest_retry_after_seconds=300, + ) + commit = CommitFactory.create( + message="", + pullid=None, + branch="test-branch-1", + commitid="649eaaf2924e92dc7fd8d370ddb857033231e67a", + repository__using_integration=True, + ) + dbsession.add(commit) + dbsession.flush() + current_yaml = {"codecov": {"require_ci_to_pass": True}} + task = NotifyTask() + res = task.run_impl_within_lock( + dbsession, + repoid=commit.repoid, + commitid=commit.commitid, + current_yaml=current_yaml, + ) + assert res is None + # Should use actual retry time (300 seconds) instead of falling back to next hour + mock_retry.assert_called_with(max_retries=10, countdown=300) + + @freeze_time("2024-04-22T11:15:00") + def test_notify_task_no_ghapp_available_enforces_minimum_delay( + self, dbsession, mocker, mock_self_app + ): + """Test that notify task enforces minimum 60-second delay even with short retry time.""" + get_repo_provider_service = mocker.patch( + "tasks.notify.get_repo_provider_service" + ) + mock_retry = mocker.patch.object(NotifyTask, "retry", return_value=None) + # Provide retry time less than 60 seconds + get_repo_provider_service.side_effect = NoConfiguredAppsAvailable( + apps_count=2, + rate_limited_count=1, + suspended_count=0, + earliest_retry_after_seconds=30, # Less than minimum + ) + commit = CommitFactory.create( + message="", + pullid=None, + branch="test-branch-1", + commitid="649eaaf2924e92dc7fd8d370ddb857033231e67a", + repository__using_integration=True, + ) + dbsession.add(commit) + dbsession.flush() + current_yaml = {"codecov": {"require_ci_to_pass": True}} + task = NotifyTask() + res = task.run_impl_within_lock( + dbsession, + repoid=commit.repoid, + commitid=commit.commitid, + current_yaml=current_yaml, + ) + assert res is None + # Should enforce minimum 60-second delay + mock_retry.assert_called_with(max_retries=10, countdown=60) + @freeze_time("2024-04-22T11:15:00") def test_notify_task_no_ghapp_available_all_suspended( self, 
dbsession, mocker, mock_self_app @@ -1286,7 +1364,9 @@ def test_notify_task_max_retries_exceeded( self, dbsession, mocker, mock_repo_provider, mock_self_app ): mocker.patch.object(NotifyTask, "should_wait_longer", return_value=True) - mocker.patch.object(NotifyTask, "retry", side_effect=MaxRetriesExceededError()) + mocker.patch.object( + NotifyTask, "retry", side_effect=CeleryMaxRetriesExceededError() + ) mocked_fetch_and_update_whether_ci_passed = mocker.patch.object( NotifyTask, "fetch_and_update_whether_ci_passed" ) @@ -1360,12 +1440,14 @@ def test_run_impl_unobtainable_lock( m = mocker.MagicMock() m.return_value.locked.return_value.__enter__.side_effect = LockRetry(60) mocker.patch("tasks.notify.LockManager", m) + mocker.patch.object(task, "retry", side_effect=CeleryMaxRetriesExceededError()) res = task.run_impl( dbsession, repoid=commit.repoid, commitid=commit.commitid, current_yaml=current_yaml, + fencing_token=SKIP_DEBOUNCE_TOKEN, ) assert res == { @@ -1464,6 +1546,7 @@ def test_run_impl_can_run_logic(self, dbsession, mock_redis, mocker): repoid=commit.repoid, commitid=commit.commitid, current_yaml=current_yaml, + fencing_token=SKIP_DEBOUNCE_TOKEN, **kwargs, ) assert res == {"notifications": [], "notified": True, "reason": "yay"} @@ -1477,6 +1560,57 @@ def test_run_impl_can_run_logic(self, dbsession, mock_redis, mocker): **kwargs, ) + def test_debounce_retry_includes_all_required_params( + self, dbsession, mock_redis, mocker + ): + """Test that retry kwargs include all required parameters for debounce retry. + + This test ensures that when the debounce logic triggers a retry, all required + parameters are included in the kwargs passed to self.retry(). This prevents + TypeError when Celery attempts to retry the task. + """ + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + # Mock Redis to allow token acquisition + mock_redis.pipeline.return_value.__enter__.return_value.execute.return_value = [ + "1" + ] + mock_redis.get.return_value = False + + task = NotifyTask() + task.request.retries = 0 + task.max_retries = 5 + + # Mock retry to capture kwargs + mock_retry = mocker.patch.object(task, "retry", side_effect=Retry()) + + current_yaml = {"codecov": {"require_ci_to_pass": True}} + empty_upload = None + + with pytest.raises(Retry): + task.run_impl( + dbsession, + repoid=commit.repoid, + commitid=commit.commitid, + current_yaml=current_yaml, + empty_upload=empty_upload, + # Don't pass fencing_token - let it go through retry path + ) + + # Verify retry was called + assert mock_retry.called + + # CRITICAL: Verify all required params are in kwargs + call_kwargs = mock_retry.call_args[1]["kwargs"] + assert call_kwargs["repoid"] == commit.repoid + assert call_kwargs["commitid"] == commit.commitid + assert call_kwargs["current_yaml"] == current_yaml + assert call_kwargs["empty_upload"] == empty_upload + assert call_kwargs["fencing_token"] == 1 # Token from acquisition + assert mock_retry.call_args[1]["countdown"] == 30 # DEBOUNCE_PERIOD_SECONDS + def test_checkpoints_not_logged_outside_upload_flow( self, dbsession, mock_redis, mocker, mock_checkpoint_submit, mock_configuration ): diff --git a/apps/worker/tasks/tests/unit/test_test_analytics_notifier.py b/apps/worker/tasks/tests/unit/test_test_analytics_notifier.py index 201cfbb162..b8de8bd72b 100644 --- a/apps/worker/tasks/tests/unit/test_test_analytics_notifier.py +++ b/apps/worker/tasks/tests/unit/test_test_analytics_notifier.py @@ -1,11 +1,22 @@ +from contextlib import contextmanager + import pytest -from 
celery.exceptions import MaxRetriesExceededError, Retry +from celery.exceptions import MaxRetriesExceededError as CeleryMaxRetriesExceededError +from celery.exceptions import Retry from database.tests.factories import CommitFactory from helpers.notifier import NotifierResult from services.lock_manager import LockRetry +from services.notification.debounce import ( + SKIP_DEBOUNCE_TOKEN, + LockAcquisitionLimitError, +) +from services.test_analytics.ta_timeseries import FailedTestInstance from services.test_results import NotifierTaskResult -from tasks.test_analytics_notifier import TestAnalyticsNotifierTask +from tasks.test_analytics_notifier import ( + TestAnalyticsNotifierTask, + transform_failures, +) @pytest.mark.django_db @@ -23,28 +34,24 @@ def test_lock_retry_will_retry(self, mocker, dbsession, mock_redis): "pipeline": "codecov", } - # Mock get_redis_connection to return mock_redis mocker.patch( "tasks.test_analytics_notifier.get_redis_connection", return_value=mock_redis, ) - # Mock LockManager to raise LockRetry m = mocker.MagicMock() m.return_value.locked.return_value.__enter__.side_effect = LockRetry(60) mocker.patch("tasks.test_analytics_notifier.LockManager", m) - # Mock Redis operations mock_redis.pipeline.return_value.__enter__.return_value.execute.return_value = [ "1" ] task = TestAnalyticsNotifierTask() - task.request.retries = 0 # Will retry + task.request.retries = 0 task.request.headers = {} task.max_retries = 5 - # Task should call self.retry() which raises Retry exception with pytest.raises(Retry): task.run_impl( dbsession, @@ -68,18 +75,15 @@ def test_lock_retry_self_retry_raises_max_retries_exceeded( "pipeline": "codecov", } - # Mock get_redis_connection to return mock_redis mocker.patch( "tasks.test_analytics_notifier.get_redis_connection", return_value=mock_redis, ) - # Mock LockManager to raise LockRetry m = mocker.MagicMock() m.return_value.locked.return_value.__enter__.side_effect = LockRetry(60) mocker.patch("tasks.test_analytics_notifier.LockManager", m) - # Mock Redis operations mock_redis.pipeline.return_value.__enter__.return_value.execute.return_value = [ "1" ] @@ -89,13 +93,9 @@ def test_lock_retry_self_retry_raises_max_retries_exceeded( task.request.headers = {} task.max_retries = 5 - # Mock self.retry() to raise MaxRetriesExceededError when called from LockRetry handler - # We distinguish by checking if max_retries=5 is passed (from LockRetry handler) def retry_side_effect(*args, **kwargs): - # Check if this is the retry call from LockRetry handler (has max_retries=5) if kwargs.get("max_retries") == 5: - raise MaxRetriesExceededError() - # Otherwise, this is the debounce retry - raise Retry exception + raise CeleryMaxRetriesExceededError() raise Retry() mocker.patch.object(task, "retry", side_effect=retry_side_effect) @@ -114,7 +114,7 @@ def retry_side_effect(*args, **kwargs): def test_max_retries_exceeded_from_notification_lock( self, mocker, dbsession, mock_redis ): - """Test that MaxRetriesExceededError from _notification_lock returns graceful failure""" + """Test that LockAcquisitionLimitError from notification_lock returns graceful failure""" commit = CommitFactory.create() dbsession.add(commit) dbsession.flush() @@ -126,13 +126,11 @@ def test_max_retries_exceeded_from_notification_lock( "pipeline": "codecov", } - # Mock get_redis_connection to return mock_redis mocker.patch( "tasks.test_analytics_notifier.get_redis_connection", return_value=mock_redis, ) - # Mock LockManager to raise LockRetry with max_retries_exceeded=True m = mocker.MagicMock() 
lock_retry_with_max_exceeded = LockRetry( countdown=0, @@ -148,7 +146,6 @@ def test_max_retries_exceeded_from_notification_lock( ) mocker.patch("tasks.test_analytics_notifier.LockManager", m) - # Mock Redis operations mock_redis.pipeline.return_value.__enter__.return_value.execute.return_value = [ "1" ] @@ -185,37 +182,30 @@ def test_lock_retry_self_retry_raises_max_retries_exceeded_fencing_token_check( "pipeline": "codecov", } - # Mock get_redis_connection to return mock_redis mocker.patch( "tasks.test_analytics_notifier.get_redis_connection", return_value=mock_redis, ) - # Mock LockManager to raise LockRetry during fencing token check lock acquisition lock_manager_mock = mocker.MagicMock() lock_manager_mock.return_value.locked.return_value.__enter__.side_effect = ( LockRetry(60) ) mocker.patch("tasks.test_analytics_notifier.LockManager", lock_manager_mock) - # Mock Redis operations mock_redis.pipeline.return_value.__enter__.return_value.execute.return_value = [ "1" ] - mock_redis.get.return_value = "1" # Fencing token check + mock_redis.get.return_value = "1" task = TestAnalyticsNotifierTask() task.request.retries = 0 task.request.headers = {} task.max_retries = 5 - # Mock self.retry() to raise MaxRetriesExceededError when called from LockRetry handler - # (identified by max_retries=5 parameter) def retry_side_effect(*args, **kwargs): - # Check if this is the retry call from LockRetry handler (has max_retries=5) if kwargs.get("max_retries") == 5: - raise MaxRetriesExceededError() - # Otherwise, this is a normal retry - raise Retry exception + raise CeleryMaxRetriesExceededError() raise Retry() mocker.patch.object(task, "retry", side_effect=retry_side_effect) @@ -224,7 +214,7 @@ def retry_side_effect(*args, **kwargs): dbsession, repoid=commit.repoid, upload_context=upload_context, - fencing_token=1, # Pass fencing token to skip first lock acquisition + fencing_token=SKIP_DEBOUNCE_TOKEN, ) assert result == NotifierTaskResult( @@ -248,13 +238,11 @@ def test_lock_retry_self_retry_raises_max_retries_exceeded_notification_send( "pipeline": "codecov", } - # Mock get_redis_connection to return mock_redis mocker.patch( "tasks.test_analytics_notifier.get_redis_connection", return_value=mock_redis, ) - # First lock acquisition succeeds (fencing token check), second raises LockRetry (notification send) lock_manager_mock = mocker.MagicMock() context_manager = mocker.MagicMock() @@ -263,10 +251,8 @@ def lock_side_effect(*args, **kwargs): lock_side_effect.call_count = 0 lock_side_effect.call_count += 1 if lock_side_effect.call_count == 1: - # First call succeeds (fencing token check) return context_manager else: - # Second call raises LockRetry (notification send) raise LockRetry(60) lock_manager_mock.return_value.locked.return_value.__enter__.side_effect = ( @@ -274,24 +260,152 @@ def lock_side_effect(*args, **kwargs): ) mocker.patch("tasks.test_analytics_notifier.LockManager", lock_manager_mock) - # Mock Redis operations mock_redis.pipeline.return_value.__enter__.return_value.execute.return_value = [ "1" ] - mock_redis.get.return_value = "1" # Fencing token check + mock_redis.get.return_value = "1" task = TestAnalyticsNotifierTask() task.request.retries = 0 task.request.headers = {} task.max_retries = 5 - # Mock self.retry() to raise MaxRetriesExceededError when called from LockRetry handler - # (identified by max_retries=5 parameter) def retry_side_effect(*args, **kwargs): - # Check if this is the retry call from LockRetry handler (has max_retries=5) if kwargs.get("max_retries") == 5: - raise 
MaxRetriesExceededError() - # Otherwise, this is a normal retry - raise Retry exception + raise CeleryMaxRetriesExceededError() + raise Retry() + + mocker.patch.object(task, "retry", side_effect=retry_side_effect) + + mock_notifier = mocker.MagicMock() + mock_notifier.notify.return_value = NotifierResult.COMMENT_POSTED + mocker.patch.object( + task, "notification_preparation", return_value=mock_notifier + ) + + result = task.run_impl( + dbsession, + repoid=commit.repoid, + upload_context=upload_context, + fencing_token=SKIP_DEBOUNCE_TOKEN, + ) + + assert result == NotifierTaskResult( + attempted=False, + succeeded=False, + ) + + def test_debounce_retry_includes_all_required_params( + self, mocker, dbsession, mock_redis + ): + """Test that retry kwargs include all required parameters for debounce retry. + + This test ensures that when the debounce logic triggers a retry, all required + parameters are included in the kwargs passed to self.retry(). This prevents + TypeError when Celery attempts to retry the task. + """ + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + upload_context = { + "commit_sha": commit.commitid, + "branch": "test-branch", + "pull_id": 123, + "pipeline": "codecov", + } + + mocker.patch( + "tasks.test_analytics_notifier.get_redis_connection", + return_value=mock_redis, + ) + + # Mock Redis to allow token acquisition + mock_redis.pipeline.return_value.__enter__.return_value.execute.return_value = [ + "1" + ] + mock_redis.get.return_value = False + + task = TestAnalyticsNotifierTask() + task.request.retries = 0 + task.max_retries = 5 + + # Mock debouncer's notification_lock to succeed + @contextmanager + def mock_lock_succeeds(*args, **kwargs): + yield + + mocker.patch.object( + task.debouncer, "notification_lock", side_effect=mock_lock_succeeds + ) + + # Mock retry to capture kwargs + mock_retry = mocker.patch.object(task, "retry", side_effect=Retry()) + + with pytest.raises(Retry): + task.run_impl( + dbsession, + repoid=commit.repoid, + upload_context=upload_context, + # Don't pass fencing_token - let it go through retry path + ) + + # Verify retry was called + assert mock_retry.called + + # CRITICAL: Verify all required params are in kwargs + call_kwargs = mock_retry.call_args[1]["kwargs"] + assert call_kwargs["repoid"] == commit.repoid + assert call_kwargs["upload_context"] == upload_context + assert call_kwargs["fencing_token"] == 1 # Token from acquisition + assert mock_retry.call_args[1]["countdown"] == 30 # DEBOUNCE_PERIOD_SECONDS + + def test_max_retries_exceeded_during_debounce_retry( + self, mocker, dbsession, mock_redis + ): + """Test that max retries exceeded during debounce retry proceeds with notification.""" + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + upload_context = { + "commit_sha": commit.commitid, + "branch": "test-branch", + "pull_id": 123, + "pipeline": "codecov", + } + + mocker.patch( + "tasks.test_analytics_notifier.get_redis_connection", + return_value=mock_redis, + ) + + # Mock Redis to allow token acquisition + mock_redis.pipeline.return_value.__enter__.return_value.execute.return_value = [ + "1" + ] + mock_redis.get.return_value = "1" # Non-stale token + + task = TestAnalyticsNotifierTask() + task.request.retries = 0 + task.max_retries = 5 + + # Mock debouncer's notification_lock to succeed + @contextmanager + def mock_lock_succeeds(*args, **kwargs): + yield + + mocker.patch.object( + task.debouncer, "notification_lock", side_effect=mock_lock_succeeds + ) + 
mocker.patch.object( + task.debouncer, "check_fencing_token_stale", return_value=False + ) + + # Mock retry to raise CeleryMaxRetriesExceededError during debounce retry + def retry_side_effect(*args, **kwargs): + if kwargs.get("countdown") == 30: # Debounce retry + raise CeleryMaxRetriesExceededError() raise Retry() mocker.patch.object(task, "retry", side_effect=retry_side_effect) @@ -307,10 +421,444 @@ def retry_side_effect(*args, **kwargs): dbsession, repoid=commit.repoid, upload_context=upload_context, - fencing_token=1, # Pass fencing token to skip first lock acquisition + ) + + # Should proceed with notification despite max retries exceeded + assert result == NotifierTaskResult( + attempted=True, + succeeded=True, + ) + + def test_stale_fencing_token_during_check(self, mocker, dbsession, mock_redis): + """Test that stale fencing token causes task to exit early.""" + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + upload_context = { + "commit_sha": commit.commitid, + "branch": "test-branch", + "pull_id": 123, + "pipeline": "codecov", + } + + mocker.patch( + "tasks.test_analytics_notifier.get_redis_connection", + return_value=mock_redis, + ) + + # Mock Redis to return stale token (current token > fencing_token) + mock_redis.get.return_value = "2" # Current token is 2, but we have token 1 + + task = TestAnalyticsNotifierTask() + task.request.retries = 0 + task.max_retries = 5 + + # Mock debouncer's notification_lock to succeed + @contextmanager + def mock_lock_succeeds(*args, **kwargs): + yield + + mocker.patch.object( + task.debouncer, "notification_lock", side_effect=mock_lock_succeeds + ) + + # Mock check_fencing_token_stale to return True (token is stale) + mocker.patch.object( + task.debouncer, "check_fencing_token_stale", return_value=True + ) + + result = task.run_impl( + dbsession, + repoid=commit.repoid, + upload_context=upload_context, + fencing_token=1, # Pass fencing token, but it's stale ) assert result == NotifierTaskResult( attempted=False, succeeded=False, ) + + def test_stale_fencing_token_during_notification( + self, mocker, dbsession, mock_redis + ): + """Test that stale fencing token during notification causes task to exit early.""" + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + upload_context = { + "commit_sha": commit.commitid, + "branch": "test-branch", + "pull_id": 123, + "pipeline": "codecov", + } + + mocker.patch( + "tasks.test_analytics_notifier.get_redis_connection", + return_value=mock_redis, + ) + + # Mock Redis to return non-stale token initially, then stale token + mock_redis.get.return_value = "2" # Current token is 2, but we have token 1 + + task = TestAnalyticsNotifierTask() + task.request.retries = 0 + task.max_retries = 5 + + # Mock debouncer's notification_lock to succeed + @contextmanager + def mock_lock_succeeds(*args, **kwargs): + yield + + mocker.patch.object( + task.debouncer, "notification_lock", side_effect=mock_lock_succeeds + ) + + # Mock check_fencing_token_stale to return False initially, then True + call_count = {"count": 0} + + def stale_check_side_effect(*args, **kwargs): + call_count["count"] += 1 + if call_count["count"] == 1: + return False # First check passes + return True # Second check (during notification) fails + + mocker.patch.object( + task.debouncer, + "check_fencing_token_stale", + side_effect=stale_check_side_effect, + ) + + # Mock notification_preparation to return a mock notifier + mock_notifier = mocker.MagicMock() + mock_notifier.notify.return_value = 
NotifierResult.COMMENT_POSTED + mocker.patch.object( + task, "notification_preparation", return_value=mock_notifier + ) + + result = task.run_impl( + dbsession, + repoid=commit.repoid, + upload_context=upload_context, + fencing_token=1, + ) + + # Should exit early due to stale token + assert result == NotifierTaskResult( + attempted=False, + succeeded=False, + ) + + def test_successful_notification_flow(self, mocker, dbsession, mock_redis): + """Test successful notification flow with valid fencing token.""" + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + upload_context = { + "commit_sha": commit.commitid, + "branch": "test-branch", + "pull_id": 123, + "pipeline": "codecov", + } + + mocker.patch( + "tasks.test_analytics_notifier.get_redis_connection", + return_value=mock_redis, + ) + + # Mock Redis to return non-stale token + mock_redis.get.return_value = "1" # Current token matches fencing_token + + task = TestAnalyticsNotifierTask() + task.request.retries = 0 + task.max_retries = 5 + + # Mock debouncer's notification_lock to succeed + @contextmanager + def mock_lock_succeeds(*args, **kwargs): + yield + + mocker.patch.object( + task.debouncer, "notification_lock", side_effect=mock_lock_succeeds + ) + mocker.patch.object( + task.debouncer, "check_fencing_token_stale", return_value=False + ) + + # Mock notification_preparation to return a mock notifier + mock_notifier = mocker.MagicMock() + mock_notifier.notify.return_value = NotifierResult.COMMENT_POSTED + mocker.patch.object( + task, "notification_preparation", return_value=mock_notifier + ) + + result = task.run_impl( + dbsession, + repoid=commit.repoid, + upload_context=upload_context, + fencing_token=1, + ) + + assert result == NotifierTaskResult( + attempted=True, + succeeded=True, + ) + mock_notifier.notify.assert_called_once() + + def test_notification_preparation_returns_none(self, mocker, dbsession, mock_redis): + """Test that when notification_preparation returns None, task exits gracefully.""" + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + upload_context = { + "commit_sha": commit.commitid, + "branch": "test-branch", + "pull_id": 123, + "pipeline": "codecov", + } + + mocker.patch( + "tasks.test_analytics_notifier.get_redis_connection", + return_value=mock_redis, + ) + + # Mock Redis to return non-stale token + mock_redis.get.return_value = "1" + + task = TestAnalyticsNotifierTask() + task.request.retries = 0 + task.max_retries = 5 + + # Mock debouncer's notification_lock to succeed + @contextmanager + def mock_lock_succeeds(*args, **kwargs): + yield + + mocker.patch.object( + task.debouncer, "notification_lock", side_effect=mock_lock_succeeds + ) + mocker.patch.object( + task.debouncer, "check_fencing_token_stale", return_value=False + ) + + # Mock notification_preparation to return None + mocker.patch.object(task, "notification_preparation", return_value=None) + + result = task.run_impl( + dbsession, + repoid=commit.repoid, + upload_context=upload_context, + fencing_token=1, + ) + + assert result == NotifierTaskResult( + attempted=False, + succeeded=False, + ) + + def test_lock_retry_limit_exceeded_during_token_acquisition( + self, mocker, dbsession, mock_redis + ): + """Test that LockAcquisitionLimitError during token acquisition returns graceful failure.""" + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + upload_context = { + "commit_sha": commit.commitid, + "branch": "test-branch", + "pull_id": 123, + "pipeline": "codecov", + 
} + + mocker.patch( + "tasks.test_analytics_notifier.get_redis_connection", + return_value=mock_redis, + ) + + # Mock Redis to allow token acquisition attempt + mock_redis.pipeline.return_value.__enter__.return_value.execute.return_value = [ + "1" + ] + + task = TestAnalyticsNotifierTask() + task.request.retries = 0 + task.max_retries = 5 + + # Mock debouncer's notification_lock to raise LockAcquisitionLimitError + mocker.patch.object( + task.debouncer, + "notification_lock", + side_effect=LockAcquisitionLimitError("Max retries exceeded"), + ) + + result = task.run_impl( + dbsession, + repoid=commit.repoid, + upload_context=upload_context, + # Don't pass fencing_token - let it go through token acquisition path + ) + + assert result == NotifierTaskResult( + attempted=False, + succeeded=False, + ) + + def test_lock_retry_limit_exceeded_during_notification( + self, mocker, dbsession, mock_redis + ): + """Test that LockAcquisitionLimitError during notification lock returns graceful failure.""" + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + upload_context = { + "commit_sha": commit.commitid, + "branch": "test-branch", + "pull_id": 123, + "pipeline": "codecov", + } + + mocker.patch( + "tasks.test_analytics_notifier.get_redis_connection", + return_value=mock_redis, + ) + + # Mock Redis to return non-stale token + mock_redis.get.return_value = "1" + + task = TestAnalyticsNotifierTask() + task.request.retries = 0 + task.max_retries = 5 + + # Mock debouncer's notification_lock to raise LockAcquisitionLimitError during notification + mocker.patch.object( + task.debouncer, + "notification_lock", + side_effect=LockAcquisitionLimitError("Max retries exceeded"), + ) + + # Mock notification_preparation to return a mock notifier + mock_notifier = mocker.MagicMock() + mock_notifier.notify.return_value = NotifierResult.COMMENT_POSTED + mocker.patch.object( + task, "notification_preparation", return_value=mock_notifier + ) + + result = task.run_impl( + dbsession, + repoid=commit.repoid, + upload_context=upload_context, + fencing_token=1, + ) + + assert result == NotifierTaskResult( + attempted=False, + succeeded=False, + ) + + def test_task_initialization(self, mocker): + """Test that task initialization sets up debouncer correctly.""" + mocker.patch("tasks.test_analytics_notifier.BaseCodecovTask.__init__") + + task = TestAnalyticsNotifierTask() + + assert task.debouncer is not None + assert task.debouncer.redis_key_template == "ta_notifier_fence:{}_{}" + assert task.debouncer.debounce_period_seconds == 30 + assert task.debouncer.lock_type.value == "notify" + + +class TestTransformFailures: + """Test the transform_failures helper function.""" + + def test_transform_failures_empty_list(self): + """Test that empty list returns empty list.""" + result = transform_failures([]) + assert result == [] + + def test_transform_failures_with_failure_message(self): + """Test that failure messages are transformed correctly.""" + failures: list[FailedTestInstance] = [ + { + "computed_name": "test_example.TestClass.test_method", + "failure_message": "AssertionError: Expected 5, got 3\n File /path/to/test.py:42", + "test_id": "test123", + "flags": ["unit"], + "duration_seconds": 1.5, + } + ] + + result = transform_failures(failures) + + assert len(result) == 1 + assert result[0].display_name == "test_example.TestClass.test_method" + assert "\r" not in result[0].failure_message + assert result[0].test_id == "test123" + assert result[0].envs == ["unit"] + assert result[0].duration_seconds == 
1.5 + assert result[0].build_url is None + + def test_transform_failures_with_none_failure_message(self): + """Test that None failure_message is handled correctly.""" + failures: list[FailedTestInstance] = [ + { + "computed_name": "test_example.TestClass.test_method", + "failure_message": None, + "test_id": "test123", + "flags": ["unit"], + "duration_seconds": None, + } + ] + + result = transform_failures(failures) + + assert len(result) == 1 + assert result[0].failure_message is None + assert result[0].duration_seconds == 0 # None becomes 0 + + def test_transform_failures_with_carriage_return(self): + """Test that carriage returns are removed from failure messages.""" + failures: list[FailedTestInstance] = [ + { + "computed_name": "test_example.TestClass.test_method", + "failure_message": "Line 1\rLine 2\r\nLine 3", + "test_id": "test123", + "flags": [], + "duration_seconds": 0.5, + } + ] + + result = transform_failures(failures) + + assert "\r" not in result[0].failure_message + assert "\n" in result[0].failure_message # \n should remain + + def test_transform_failures_multiple_failures(self): + """Test that multiple failures are transformed correctly.""" + failures: list[FailedTestInstance] = [ + { + "computed_name": "test1", + "failure_message": "Error 1", + "test_id": "id1", + "flags": ["flag1"], + "duration_seconds": 1.0, + }, + { + "computed_name": "test2", + "failure_message": "Error 2", + "test_id": "id2", + "flags": ["flag2"], + "duration_seconds": 2.0, + }, + ] + + result = transform_failures(failures) + + assert len(result) == 2 + assert result[0].display_name == "test1" + assert result[1].display_name == "test2" diff --git a/libs/shared/shared/bots/exceptions.py b/libs/shared/shared/bots/exceptions.py index 1b2f8fea39..cd54e64ff7 100644 --- a/libs/shared/shared/bots/exceptions.py +++ b/libs/shared/shared/bots/exceptions.py @@ -8,11 +8,16 @@ class OwnerWithoutValidBotError(Exception): class NoConfiguredAppsAvailable(Exception): def __init__( - self, apps_count: int, rate_limited_count: int, suspended_count: int + self, + apps_count: int, + rate_limited_count: int, + suspended_count: int, + earliest_retry_after_seconds: int | None = None, ) -> None: self.apps_count = apps_count self.rate_limited_count = rate_limited_count self.suspended_count = suspended_count + self.earliest_retry_after_seconds = earliest_retry_after_seconds class RepositoryWithoutValidBotError(Exception): diff --git a/libs/shared/shared/bots/github_apps.py b/libs/shared/shared/bots/github_apps.py index 6965872c8c..e9be284d4a 100644 --- a/libs/shared/shared/bots/github_apps.py +++ b/libs/shared/shared/bots/github_apps.py @@ -17,7 +17,11 @@ from shared.helpers.redis import get_redis_connection from shared.helpers.sentry import owner_uses_sentry from shared.orms.owner_helper import DjangoSQLAlchemyOwnerWrapper -from shared.rate_limits import determine_if_entity_is_rate_limited, gh_app_key_name +from shared.rate_limits import ( + RATE_LIMIT_REDIS_KEY_PREFIX, + determine_if_entity_is_rate_limited, + gh_app_key_name, +) from shared.typings.oauth_token_types import Token from shared.typings.torngit import GithubInstallationInfo @@ -242,6 +246,95 @@ def _filter_suspended_apps( return list(filter(lambda obj: not obj.is_suspended, apps_to_consider)) +def _partition_apps_by_rate_limit_status( + apps: list[GithubAppInstallation], +) -> tuple[list[GithubAppInstallation], list[GithubAppInstallation]]: + """ + Partition apps into rate-limited and non-rate-limited lists in a single Redis pass. 
+ + This function checks rate limit status once for all apps and returns both lists, + preventing race conditions that could occur if rate limit status changes between + separate calls to check and filter. + + Returns: + tuple: (rate_limited_apps, non_rate_limited_apps) + """ + redis_connection = get_redis_connection() + rate_limited_apps = [] + non_rate_limited_apps = [] + + for app in apps: + is_rate_limited = determine_if_entity_is_rate_limited( + redis_connection, + gh_app_key_name(app_id=app.app_id, installation_id=app.installation_id), + ) + if is_rate_limited: + rate_limited_apps.append(app) + else: + non_rate_limited_apps.append(app) + + return rate_limited_apps, non_rate_limited_apps + + +def _get_rate_limited_apps( + apps: list[GithubAppInstallation], +) -> list[GithubAppInstallation]: + """ + Filter and return only the apps that are currently rate-limited. + + Note: This function is kept for backward compatibility but should be avoided + when you also need to filter apps, as it causes a race condition. Use + _partition_apps_by_rate_limit_status instead. + """ + redis_connection = get_redis_connection() + return [ + app + for app in apps + if determine_if_entity_is_rate_limited( + redis_connection, + gh_app_key_name(app_id=app.app_id, installation_id=app.installation_id), + ) + ] + + +def _get_earliest_rate_limit_ttl( + apps: list[GithubAppInstallation], +) -> int | None: + """ + Get the earliest TTL (time-to-live in seconds) from Redis for rate-limited apps. + + Returns the minimum TTL among all rate-limited apps, or None if no apps are rate-limited + or if Redis operations fail. + """ + if not apps: + return None + + redis_connection = get_redis_connection() + ttl_values = [] + + for app in apps: + key_name = gh_app_key_name( + app_id=app.app_id, installation_id=app.installation_id + ) + redis_key = f"{RATE_LIMIT_REDIS_KEY_PREFIX}{key_name}" + try: + ttl = redis_connection.ttl(redis_key) + if ttl > 0: # TTL > 0 means the key exists and has time remaining + ttl_values.append(ttl) + except Exception: + # If Redis operation fails, log and continue + log.warning( + "Failed to get TTL for rate-limited app", + extra={ + "app_id": app.app_id, + "installation_id": app.installation_id, + }, + exc_info=True, + ) + + return min(ttl_values) if ttl_values else None + + def get_github_app_info_for_owner( owner: Owner, *, @@ -304,9 +397,12 @@ def get_github_app_info_for_owner( owner, installation_name, repository ) apps_matching_criteria_count = len(apps_to_consider) - # We can't use apps that are rate limited - apps_to_consider = _filter_rate_limited_apps(apps_to_consider) - rate_limited_apps_count = apps_matching_criteria_count - len(apps_to_consider) + # Partition apps by rate limit status in a single Redis pass to avoid race conditions + # where rate limit status changes between checking and filtering + rate_limited_apps, apps_to_consider = _partition_apps_by_rate_limit_status( + apps_to_consider + ) + rate_limited_apps_count = len(rate_limited_apps) # We can't use apps that are suspended (by the user) apps_to_consider = _filter_suspended_apps(apps_to_consider) suspended_apps_count = ( @@ -339,10 +435,17 @@ def get_github_app_info_for_owner( elif apps_matching_criteria_count > 0: # There are apps that match the criteria, but we can't use them. # Either they are currently rate limited or they have been suspended. 
+ # Get the earliest retry time from rate-limited apps if any exist + earliest_retry_after_seconds = None + if rate_limited_apps_count > 0: + earliest_retry_after_seconds = _get_earliest_rate_limit_ttl( + rate_limited_apps + ) raise NoConfiguredAppsAvailable( apps_count=apps_matching_criteria_count, rate_limited_count=rate_limited_apps_count, suspended_count=suspended_apps_count, + earliest_retry_after_seconds=earliest_retry_after_seconds, ) # DEPRECATED FLOW - begin if owner.integration_id and ( diff --git a/libs/shared/tests/unit/bots/test_github_apps.py b/libs/shared/tests/unit/bots/test_github_apps.py index 5892aca161..e4827ba798 100644 --- a/libs/shared/tests/unit/bots/test_github_apps.py +++ b/libs/shared/tests/unit/bots/test_github_apps.py @@ -1,10 +1,15 @@ import datetime +from unittest.mock import MagicMock import pytest from django.test import override_settings +from redis import RedisError from shared.bots.exceptions import NoConfiguredAppsAvailable, RequestedGithubAppNotFound from shared.bots.github_apps import ( + _get_earliest_rate_limit_ttl, + _get_rate_limited_apps, + _partition_apps_by_rate_limit_status, get_github_app_info_for_owner, get_github_app_token, get_specific_github_app_details, @@ -18,6 +23,7 @@ ) from shared.django_apps.codecov_auth.tests.factories import OwnerFactory from shared.github import InvalidInstallationError +from shared.rate_limits import RATE_LIMIT_REDIS_KEY_PREFIX, gh_app_key_name from shared.typings.torngit import GithubInstallationInfo @@ -251,3 +257,494 @@ def test_mark_installation_suspended_side_effect_not_called(self, mocker): installations[0].refresh_from_db() installations[1].refresh_from_db() assert all(installation.is_suspended == False for installation in installations) + + +class TestGetRateLimitedApps: + @pytest.mark.django_db + def test_get_rate_limited_apps_empty_list(self): + """Test that empty list returns empty list.""" + assert _get_rate_limited_apps([]) == [] + + @pytest.mark.django_db + def test_get_rate_limited_apps_all_rate_limited(self, mocker): + """Test that all rate-limited apps are returned.""" + owner = OwnerFactory(service="github") + app1 = GithubAppInstallation( + owner=owner, + installation_id=1001, + app_id=10, + pem_path="pem1", + ) + app2 = GithubAppInstallation( + owner=owner, + installation_id=1002, + app_id=11, + pem_path="pem2", + ) + GithubAppInstallation.objects.bulk_create([app1, app2]) + + mocker.patch( + "shared.bots.github_apps.determine_if_entity_is_rate_limited", + return_value=True, + ) + + result = _get_rate_limited_apps([app1, app2]) + assert len(result) == 2 + assert app1 in result + assert app2 in result + + @pytest.mark.django_db + def test_get_rate_limited_apps_none_rate_limited(self, mocker): + """Test that no rate-limited apps returns empty list.""" + owner = OwnerFactory(service="github") + app1 = GithubAppInstallation( + owner=owner, + installation_id=1001, + app_id=10, + pem_path="pem1", + ) + app2 = GithubAppInstallation( + owner=owner, + installation_id=1002, + app_id=11, + pem_path="pem2", + ) + GithubAppInstallation.objects.bulk_create([app1, app2]) + + mocker.patch( + "shared.bots.github_apps.determine_if_entity_is_rate_limited", + return_value=False, + ) + + result = _get_rate_limited_apps([app1, app2]) + assert result == [] + + @pytest.mark.django_db + def test_get_rate_limited_apps_mixed(self, mocker): + """Test that only rate-limited apps are returned when mixed.""" + owner = OwnerFactory(service="github") + app1 = GithubAppInstallation( + owner=owner, + installation_id=1001, + 
app_id=10, + pem_path="pem1", + ) + app2 = GithubAppInstallation( + owner=owner, + installation_id=1002, + app_id=11, + pem_path="pem2", + ) + GithubAppInstallation.objects.bulk_create([app1, app2]) + + def is_rate_limited_side_effect(redis_connection, key_name): + # app1 is rate-limited, app2 is not + if "10_1001" in key_name: + return True + return False + + mocker.patch( + "shared.bots.github_apps.determine_if_entity_is_rate_limited", + side_effect=is_rate_limited_side_effect, + ) + + result = _get_rate_limited_apps([app1, app2]) + assert len(result) == 1 + assert app1 in result + assert app2 not in result + + +class TestPartitionAppsByRateLimitStatus: + @pytest.mark.django_db + def test_partition_apps_by_rate_limit_status_empty_list(self): + """Test that empty list returns two empty lists.""" + rate_limited, non_rate_limited = _partition_apps_by_rate_limit_status([]) + assert rate_limited == [] + assert non_rate_limited == [] + + @pytest.mark.django_db + def test_partition_apps_by_rate_limit_status_all_rate_limited(self, mocker): + """Test that all rate-limited apps are partitioned correctly.""" + owner = OwnerFactory(service="github") + app1 = GithubAppInstallation( + owner=owner, + installation_id=1001, + app_id=10, + pem_path="pem1", + ) + app2 = GithubAppInstallation( + owner=owner, + installation_id=1002, + app_id=11, + pem_path="pem2", + ) + GithubAppInstallation.objects.bulk_create([app1, app2]) + + mocker.patch( + "shared.bots.github_apps.determine_if_entity_is_rate_limited", + return_value=True, + ) + + rate_limited, non_rate_limited = _partition_apps_by_rate_limit_status( + [app1, app2] + ) + assert len(rate_limited) == 2 + assert len(non_rate_limited) == 0 + assert app1 in rate_limited + assert app2 in rate_limited + + @pytest.mark.django_db + def test_partition_apps_by_rate_limit_status_none_rate_limited(self, mocker): + """Test that no rate-limited apps returns empty rate-limited list.""" + owner = OwnerFactory(service="github") + app1 = GithubAppInstallation( + owner=owner, + installation_id=1001, + app_id=10, + pem_path="pem1", + ) + app2 = GithubAppInstallation( + owner=owner, + installation_id=1002, + app_id=11, + pem_path="pem2", + ) + GithubAppInstallation.objects.bulk_create([app1, app2]) + + mocker.patch( + "shared.bots.github_apps.determine_if_entity_is_rate_limited", + return_value=False, + ) + + rate_limited, non_rate_limited = _partition_apps_by_rate_limit_status( + [app1, app2] + ) + assert len(rate_limited) == 0 + assert len(non_rate_limited) == 2 + assert app1 in non_rate_limited + assert app2 in non_rate_limited + + @pytest.mark.django_db + def test_partition_apps_by_rate_limit_status_mixed(self, mocker): + """Test that apps are correctly partitioned when mixed.""" + owner = OwnerFactory(service="github") + app1 = GithubAppInstallation( + owner=owner, + installation_id=1001, + app_id=10, + pem_path="pem1", + ) + app2 = GithubAppInstallation( + owner=owner, + installation_id=1002, + app_id=11, + pem_path="pem2", + ) + GithubAppInstallation.objects.bulk_create([app1, app2]) + + def is_rate_limited_side_effect(redis_connection, key_name): + # app1 is rate-limited, app2 is not + if "10_1001" in key_name: + return True + return False + + mocker.patch( + "shared.bots.github_apps.determine_if_entity_is_rate_limited", + side_effect=is_rate_limited_side_effect, + ) + + rate_limited, non_rate_limited = _partition_apps_by_rate_limit_status( + [app1, app2] + ) + assert len(rate_limited) == 1 + assert len(non_rate_limited) == 1 + assert app1 in rate_limited + assert 
app2 in non_rate_limited + + +class TestGetEarliestRateLimitTtl: + @pytest.mark.django_db + def test_get_earliest_rate_limit_ttl_empty_list(self): + """Test that empty list returns None.""" + assert _get_earliest_rate_limit_ttl([]) is None + + @pytest.mark.django_db + def test_get_earliest_rate_limit_ttl_single_app(self, mocker): + """Test that single app TTL is returned.""" + owner = OwnerFactory(service="github") + app = GithubAppInstallation( + owner=owner, + installation_id=1001, + app_id=10, + pem_path="pem1", + ) + app.save() + + mock_redis = MagicMock() + mock_redis.ttl.return_value = 300 # 5 minutes + mocker.patch( + "shared.bots.github_apps.get_redis_connection", return_value=mock_redis + ) + + result = _get_earliest_rate_limit_ttl([app]) + assert result == 300 + key_name = gh_app_key_name( + app_id=app.app_id, installation_id=app.installation_id + ) + expected_key = f"{RATE_LIMIT_REDIS_KEY_PREFIX}{key_name}" + mock_redis.ttl.assert_called_once_with(expected_key) + + @pytest.mark.django_db + def test_get_earliest_rate_limit_ttl_multiple_apps_returns_minimum(self, mocker): + """Test that minimum TTL is returned when multiple apps have different TTLs.""" + owner = OwnerFactory(service="github") + app1 = GithubAppInstallation( + owner=owner, + installation_id=1001, + app_id=10, + pem_path="pem1", + ) + app2 = GithubAppInstallation( + owner=owner, + installation_id=1002, + app_id=11, + pem_path="pem2", + ) + GithubAppInstallation.objects.bulk_create([app1, app2]) + + mock_redis = MagicMock() + key_name1 = gh_app_key_name( + app_id=app1.app_id, installation_id=app1.installation_id + ) + key_name2 = gh_app_key_name( + app_id=app2.app_id, installation_id=app2.installation_id + ) + expected_key1 = f"{RATE_LIMIT_REDIS_KEY_PREFIX}{key_name1}" + expected_key2 = f"{RATE_LIMIT_REDIS_KEY_PREFIX}{key_name2}" + + def ttl_side_effect(key): + if key == expected_key1: + return 600 # 10 minutes + elif key == expected_key2: + return 180 # 3 minutes (minimum) + return -1 + + mock_redis.ttl.side_effect = ttl_side_effect + mocker.patch( + "shared.bots.github_apps.get_redis_connection", return_value=mock_redis + ) + + result = _get_earliest_rate_limit_ttl([app1, app2]) + assert result == 180 # Should return minimum + + @pytest.mark.django_db + def test_get_earliest_rate_limit_ttl_ttl_zero_or_negative_ignored(self, mocker): + """Test that TTL <= 0 is ignored.""" + owner = OwnerFactory(service="github") + app1 = GithubAppInstallation( + owner=owner, + installation_id=1001, + app_id=10, + pem_path="pem1", + ) + app2 = GithubAppInstallation( + owner=owner, + installation_id=1002, + app_id=11, + pem_path="pem2", + ) + GithubAppInstallation.objects.bulk_create([app1, app2]) + + mock_redis = MagicMock() + key_name1 = gh_app_key_name( + app_id=app1.app_id, installation_id=app1.installation_id + ) + key_name2 = gh_app_key_name( + app_id=app2.app_id, installation_id=app2.installation_id + ) + expected_key1 = f"{RATE_LIMIT_REDIS_KEY_PREFIX}{key_name1}" + expected_key2 = f"{RATE_LIMIT_REDIS_KEY_PREFIX}{key_name2}" + + def ttl_side_effect(key): + if key == expected_key1: + return 0 # Key doesn't exist or expired + elif key == expected_key2: + return -1 # Key doesn't exist + return -1 + + mock_redis.ttl.side_effect = ttl_side_effect + mocker.patch( + "shared.bots.github_apps.get_redis_connection", return_value=mock_redis + ) + + result = _get_earliest_rate_limit_ttl([app1, app2]) + assert result is None # No valid TTLs + + @pytest.mark.django_db + def test_get_earliest_rate_limit_ttl_redis_exception_continues(self, 
mocker): + """Test that Redis exceptions are caught and processing continues.""" + owner = OwnerFactory(service="github") + app1 = GithubAppInstallation( + owner=owner, + installation_id=1001, + app_id=10, + pem_path="pem1", + ) + app2 = GithubAppInstallation( + owner=owner, + installation_id=1002, + app_id=11, + pem_path="pem2", + ) + GithubAppInstallation.objects.bulk_create([app1, app2]) + + mock_redis = MagicMock() + key_name1 = gh_app_key_name( + app_id=app1.app_id, installation_id=app1.installation_id + ) + key_name2 = gh_app_key_name( + app_id=app2.app_id, installation_id=app2.installation_id + ) + expected_key1 = f"{RATE_LIMIT_REDIS_KEY_PREFIX}{key_name1}" + expected_key2 = f"{RATE_LIMIT_REDIS_KEY_PREFIX}{key_name2}" + + def ttl_side_effect(key): + if key == expected_key1: + raise RedisError("Redis connection failed") + elif key == expected_key2: + return 300 # Valid TTL + return -1 + + mock_redis.ttl.side_effect = ttl_side_effect + mocker.patch( + "shared.bots.github_apps.get_redis_connection", return_value=mock_redis + ) + + result = _get_earliest_rate_limit_ttl([app1, app2]) + assert result == 300 # Should return valid TTL from app2 despite app1 failing + + @pytest.mark.django_db + def test_get_earliest_rate_limit_ttl_all_redis_exceptions_returns_none( + self, mocker + ): + """Test that if all Redis operations fail, None is returned.""" + owner = OwnerFactory(service="github") + app1 = GithubAppInstallation( + owner=owner, + installation_id=1001, + app_id=10, + pem_path="pem1", + ) + app1.save() + + mock_redis = MagicMock() + mock_redis.ttl.side_effect = RedisError("Redis connection failed") + mocker.patch( + "shared.bots.github_apps.get_redis_connection", return_value=mock_redis + ) + + result = _get_earliest_rate_limit_ttl([app1]) + assert result is None # All operations failed + + +class TestGetGithubAppInfoForOwnerWithEarliestRetryTime: + @pytest.mark.django_db + def test_get_github_app_info_for_owner_sets_earliest_retry_when_rate_limited( + self, mocker + ): + """Test that earliest_retry_after_seconds is set when rate-limited apps exist.""" + owner = OwnerFactory(service="github") + app = GithubAppInstallation( + owner=owner, + installation_id=1001, + app_id=10, + pem_path="pem1", + is_suspended=False, + ) + app.save() + + # Mock rate-limited check to return True + mocker.patch( + "shared.bots.github_apps.determine_if_entity_is_rate_limited", + return_value=True, + ) + + # Mock Redis TTL to return a value + mock_redis = MagicMock() + mock_redis.ttl.return_value = 240 # 4 minutes + mocker.patch( + "shared.bots.github_apps.get_redis_connection", return_value=mock_redis + ) + + with pytest.raises(NoConfiguredAppsAvailable) as exp: + get_github_app_info_for_owner(owner) + + assert exp.value.apps_count == 1 + assert exp.value.rate_limited_count == 1 + assert exp.value.suspended_count == 0 + assert exp.value.earliest_retry_after_seconds == 240 + + @pytest.mark.django_db + def test_get_github_app_info_for_owner_no_earliest_retry_when_not_rate_limited( + self, mocker + ): + """Test that earliest_retry_after_seconds is None when no rate-limited apps.""" + owner = OwnerFactory(service="github") + app = GithubAppInstallation( + owner=owner, + installation_id=1001, + app_id=10, + pem_path="pem1", + is_suspended=True, # Suspended but not rate-limited + ) + app.save() + + # Mock rate-limited check to return False + mocker.patch( + "shared.bots.github_apps.determine_if_entity_is_rate_limited", + return_value=False, + ) + + with pytest.raises(NoConfiguredAppsAvailable) as exp: + 
get_github_app_info_for_owner(owner) + + assert exp.value.apps_count == 1 + assert exp.value.rate_limited_count == 0 + assert exp.value.suspended_count == 1 + assert exp.value.earliest_retry_after_seconds is None + + @pytest.mark.django_db + def test_get_github_app_info_for_owner_earliest_retry_none_when_ttl_fails( + self, mocker + ): + """Test that earliest_retry_after_seconds is None when TTL lookup fails.""" + owner = OwnerFactory(service="github") + app = GithubAppInstallation( + owner=owner, + installation_id=1001, + app_id=10, + pem_path="pem1", + is_suspended=False, + ) + app.save() + + # Mock rate-limited check to return True + mocker.patch( + "shared.bots.github_apps.determine_if_entity_is_rate_limited", + return_value=True, + ) + + # Mock Redis TTL to raise exception + mock_redis = MagicMock() + mock_redis.ttl.side_effect = RedisError("Redis connection failed") + mocker.patch( + "shared.bots.github_apps.get_redis_connection", return_value=mock_redis + ) + + with pytest.raises(NoConfiguredAppsAvailable) as exp: + get_github_app_info_for_owner(owner) + + assert exp.value.apps_count == 1 + assert exp.value.rate_limited_count == 1 + assert exp.value.suspended_count == 0 + assert exp.value.earliest_retry_after_seconds is None # TTL lookup failed
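
The `NoConfiguredAppsAvailable` retry tests added above for the bundle-analysis processor and notify tasks pin down one countdown policy: use the rate-limit TTL surfaced from Redis when it is present, fall back to `get_seconds_to_next_hour()` when it is `None`, and never retry sooner than 60 seconds. The task-side handler itself is not part of these hunks, so the following is only a minimal sketch consistent with those assertions; the function and constant names are illustrative, and the next-hour fallback is passed in as a parameter rather than imported because the real `get_seconds_to_next_hour` helper lives elsewhere in the worker.

    from shared.bots.exceptions import NoConfiguredAppsAvailable

    MINIMUM_RETRY_DELAY_SECONDS = 60  # floor asserted by the "enforces minimum delay" tests


    def countdown_for_unavailable_apps(
        exc: NoConfiguredAppsAvailable, seconds_to_next_hour: int
    ) -> int:
        """Pick a retry countdown when every configured GitHub app is unusable.

        Prefers the actual rate-limit TTL (exc.earliest_retry_after_seconds);
        otherwise waits until the next clock hour, and never waits less than 60s.
        """
        base = (
            exc.earliest_retry_after_seconds
            if exc.earliest_retry_after_seconds is not None
            else seconds_to_next_hour
        )
        return max(MINIMUM_RETRY_DELAY_SECONDS, base)

With `earliest_retry_after_seconds=240` this yields 240, with `None` and a next-hour fallback of 1800 it yields 1800, and with 30 it yields the 60-second floor, matching the `countdown=240`, `countdown=1800`, and `countdown=60` assertions in the tests above.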
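
The debounce tests for `NotifyTask` and `TestAnalyticsNotifierTask` likewise fix a control flow: a call without `fencing_token` acquires a token and re-queues itself with every original kwarg plus the token and `countdown=30`; a re-entered call exits early with an "attempted=False" result when its token has gone stale or when lock acquisition raises `LockAcquisitionLimitError`; and `CeleryMaxRetriesExceededError` during the debounce retry means "notify now anyway". Below is a condensed sketch of that flow around the `NotificationDebouncer` API; `run_debounced`, `task.do_notify`, and the exact parameter wiring are illustrative only, not the actual task implementations.

    from celery.exceptions import MaxRetriesExceededError as CeleryMaxRetriesExceededError

    from services.notification.debounce import (
        LockAcquisitionLimitError,
        NotificationDebouncer,
    )

    DEBOUNCE_PERIOD_SECONDS = 30  # the countdown asserted by the debounce-retry tests


    def run_debounced(task, debouncer: NotificationDebouncer, lock_manager, redis,
                      repoid: int, commitid: str, fencing_token: int | None, **kwargs):
        # Stand-in for the task's own "nothing attempted / nothing succeeded" result.
        failure_result = {"attempted": False, "succeeded": False}

        try:
            with debouncer.notification_lock(lock_manager, task.request.retries):
                if fencing_token is None:
                    # First entry: take a fencing token and re-queue after the debounce
                    # window, carrying *all* original kwargs so Celery can rebuild the
                    # call (passing SKIP_DEBOUNCE_TOKEN instead bypasses this branch).
                    fencing_token = debouncer.acquire_fencing_token(redis, repoid, commitid)
                    try:
                        task.retry(
                            kwargs={"repoid": repoid, "commitid": commitid,
                                    "fencing_token": fencing_token, **kwargs},
                            countdown=DEBOUNCE_PERIOD_SECONDS,
                        )
                    except CeleryMaxRetriesExceededError:
                        pass  # out of retries: give up on debouncing and notify now
                if debouncer.check_fencing_token_stale(redis, repoid, commitid, fencing_token):
                    # A newer notification task holds a higher token; let it do the work.
                    return failure_result
                return task.do_notify(repoid, commitid, **kwargs)
        except LockAcquisitionLimitError:
            # Lock acquisition attempts exhausted: return a graceful failure
            # instead of crashing the task.
            return failure_result

This is also why `test_debounce_retry_includes_all_required_params` checks every kwarg individually: the retried invocation must be reconstructable from `kwargs` alone, or Celery raises a `TypeError` when it replays the task.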