From 5cba3e84210728f8f0a32ed2ca696126091d34bf Mon Sep 17 00:00:00 2001 From: Joe Becher Date: Fri, 16 Jan 2026 12:20:36 -0500 Subject: [PATCH] fix(tasks): handle GitHub app rate limits and improve notification debouncing Add notification debouncing with Redis fencing tokens and improve lock handling across notification tasks. Handle GitHub app rate limits by retrying with appropriate delays from the API response. Changes: - Add NotificationDebouncer with fencing tokens to coalesce concurrent notifications - Standardize lock acquisition via notification_lock helper - Add LockAcquisitionLimitError and handle_lock_retry for consistent retry handling - Handle NoConfiguredAppsAvailable in BundleAnalysisProcessorTask - Use earliest_retry_after_seconds from exception or fall back to next hour - Add NOTIF_LOCK_ERROR checkpoint to TestResultsFlow for observability - Add _partition_apps_by_rate_limit_status for improved rate limit handling Fixes: - WORKER-WJ5 - NoConfiguredAppsAvailable in bundle analysis - WORKER-WJ8 - "Not retrying lock acquisition - max retries exceeded" - WORKER-WJA - LockNotOwnedError from lock timeout - WORKER-WJ4 - LockError in BundleAnalysisProcessor - WORKER-WJ3 - Bundle analysis processor exceeded max retries --- .../worker/helpers/checkpoint_logger/flows.py | 7 +- apps/worker/services/notification/debounce.py | 149 +++++ apps/worker/tasks/base.py | 79 ++- apps/worker/tasks/bundle_analysis_notify.py | 131 +++- .../worker/tasks/bundle_analysis_processor.py | 50 ++ apps/worker/tasks/notify.py | 196 +++++- apps/worker/tasks/test_analytics_notifier.py | 261 +++----- .../tests/integration/test_notify_task.py | 7 +- apps/worker/tasks/tests/unit/test_base.py | 47 ++ .../unit/test_bundle_analysis_notify_task.py | 563 ++++++++++++++++ .../test_bundle_analysis_processor_task.py | 116 ++++ .../tasks/tests/unit/test_notify_task.py | 138 +++- .../unit/test_test_analytics_notifier.py | 630 ++++++++++++++++-- libs/shared/shared/bots/exceptions.py | 7 +- libs/shared/shared/bots/github_apps.py | 111 ++- .../tests/unit/bots/test_github_apps.py | 497 ++++++++++++++ 16 files changed, 2696 insertions(+), 293 deletions(-) create mode 100644 apps/worker/services/notification/debounce.py diff --git a/apps/worker/helpers/checkpoint_logger/flows.py b/apps/worker/helpers/checkpoint_logger/flows.py index 9d4b78ec7a..c114b74143 100644 --- a/apps/worker/helpers/checkpoint_logger/flows.py +++ b/apps/worker/helpers/checkpoint_logger/flows.py @@ -76,7 +76,11 @@ class UploadFlow(BaseFlow): @failure_events( - "TEST_RESULTS_ERROR", "UNCAUGHT_RETRY_EXCEPTION", "CELERY_FAILURE", "CELERY_TIMEOUT" + "TEST_RESULTS_ERROR", + "NOTIF_LOCK_ERROR", + "UNCAUGHT_RETRY_EXCEPTION", + "CELERY_FAILURE", + "CELERY_TIMEOUT", ) @success_events("TEST_RESULTS_NOTIFY") @subflows( @@ -95,6 +99,7 @@ class TestResultsFlow(BaseFlow): FLAKE_DETECTION_NOTIFY = auto() TEST_RESULTS_ERROR = auto() TEST_RESULTS_FINISHER_BEGIN = auto() + NOTIF_LOCK_ERROR = auto() UNCAUGHT_RETRY_EXCEPTION = auto() CELERY_FAILURE = auto() CELERY_TIMEOUT = auto() diff --git a/apps/worker/services/notification/debounce.py b/apps/worker/services/notification/debounce.py new file mode 100644 index 0000000000..0ba77033e7 --- /dev/null +++ b/apps/worker/services/notification/debounce.py @@ -0,0 +1,149 @@ +import logging +from contextlib import contextmanager +from typing import cast + +from celery.exceptions import MaxRetriesExceededError as CeleryMaxRetriesExceededError +from redis import Redis + +from services.lock_manager import LockManager, LockRetry, LockType 
+ +log = logging.getLogger(__name__) + +SKIP_DEBOUNCE_TOKEN = 1 +""" +Token value to use in tests to skip debounce acquisition step. + +When passed as fencing_token, the task will skip acquiring a new token +and proceed directly to the main notification logic. This is useful for +tests that want to test the core functionality without the debounce retry. +""" + + +class LockAcquisitionLimitError(Exception): + """Raised when lock acquisition attempts exceed the maximum allowed. + + This is distinct from Celery's MaxRetriesExceededError which tracks task retries. + This exception specifically tracks failed attempts to acquire a distributed lock. + """ + + pass + + +class NotificationDebouncer[T]: + """ + Handles debouncing logic for notification tasks using fencing tokens. + + This ensures that when multiple notification tasks are queued for the same + commit, only the latest one actually sends a notification after a debounce period. + """ + + def __init__( + self, + redis_key_template: str, + debounce_period_seconds: int = 30, + lock_type: LockType = LockType.NOTIFICATION, + max_lock_retries: int = 5, + ): + """ + :param redis_key_template: Template string for Redis key, e.g. "notifier_fence:{}_{}" + :param debounce_period_seconds: How long to wait before sending notification + :param lock_type: Type of lock to use for notification locking + :param max_lock_retries: Maximum number of retries for lock acquisition + """ + self.redis_key_template = redis_key_template + self.debounce_period_seconds = debounce_period_seconds + self.lock_type = lock_type + self.max_lock_retries = max_lock_retries + + def acquire_fencing_token( + self, + redis_client: Redis, + repo_id: int, + commit_sha: str, + ) -> int: + """ + Acquire a fencing token for deduplication. + + :returns: The acquired fencing token value + """ + redis_key = self.redis_key_template.format(repo_id, commit_sha) + with redis_client.pipeline() as pipeline: + pipeline.incr(redis_key) + pipeline.expire(redis_key, 24 * 60 * 60) + results = pipeline.execute() + return int(cast(str, results[0])) + + def check_fencing_token_stale( + self, + redis_client: Redis, + repo_id: int, + commit_sha: str, + fencing_token: int, + ) -> bool: + """ + Check if the fencing token is stale, indicating another notification task + is in progress and can take over. + + This method should be called within a lock context. + + :returns: True if the token is stale (task should exit early), + False if the token is current (task should continue). + """ + current_token = int( + cast( + str, + redis_client.get(self.redis_key_template.format(repo_id, commit_sha)) + or "0", + ) + ) + + return fencing_token < current_token + + @contextmanager + def notification_lock(self, lock_manager: LockManager, task_attempts: int): + """ + Context manager to handle the repeated lock acquisition pattern + with automatic retry handling for LockRetry exceptions. + """ + try: + with lock_manager.locked( + self.lock_type, + max_retries=self.max_lock_retries, + retry_num=task_attempts, + ): + yield + return + except LockRetry as e: + if e.max_retries_exceeded: + log.error( + "Not retrying lock acquisition - max retries exceeded", + extra={ + "retry_num": e.retry_num, + "max_attempts": e.max_attempts, + }, + ) + raise LockAcquisitionLimitError( + f"Lock acquisition limit exceeded: {e.retry_num} >= {e.max_attempts}", + ) + raise + + def handle_lock_retry( + self, + task, + lock_retry: LockRetry, + failure_result: T, + ) -> T: + """ + Handle LockRetry exception by attempting to retry the task. 
+ + :param task: The Celery task instance + :param lock_retry: The LockRetry exception + :param failure_result: The result to return if max retries exceeded + :returns: failure_result if max retries exceeded + :raises Retry: When retry is successfully scheduled (caller should catch this) + """ + try: + task.retry(max_retries=task.max_retries, countdown=lock_retry.countdown) + raise AssertionError("task.retry() should always raise Retry") + except CeleryMaxRetriesExceededError: + return failure_result diff --git a/apps/worker/tasks/base.py b/apps/worker/tasks/base.py index 7534585877..6a71894f07 100644 --- a/apps/worker/tasks/base.py +++ b/apps/worker/tasks/base.py @@ -30,6 +30,7 @@ from helpers.exceptions import NoConfiguredAppsAvailable, RepositoryWithoutValidBotError from helpers.log_context import LogContext, set_log_context from helpers.save_commit_error import save_commit_error +from services.notification.debounce import LockAcquisitionLimitError from services.repository import get_repo_provider_service from shared.celery_config import ( TASK_RETRY_BACKOFF_BASE_SECONDS, @@ -411,6 +412,24 @@ def _analyse_error(self, exception: SQLAlchemyError, *args, **kwargs): exc_info=True, ) + def _handle_database_error_retry(self, db_session, *args, **kwargs): + """Handle database error retry logic with rollback and exponential backoff. + + Returns None if max retries exceeded, otherwise raises Retry exception. + safe_retry will raise Retry if retry is possible, so this method will + only return None if max retries have been exceeded. + """ + db_session.rollback() + retry_count = getattr(self.request, "retries", 0) + countdown = TASK_RETRY_BACKOFF_BASE_SECONDS * (2**retry_count) + + # safe_retry handles MaxRetriesExceededError internally: + # - Returns False if max retries exceeded + # - Raises Retry if retry is possible (which propagates up to Celery) + if not self.safe_retry(countdown=countdown): + # Max retries exceeded - return None to match old behavior + return None + @sentry_sdk.trace def run(self, *args, **kwargs): with self.task_full_runtime.time(): @@ -462,45 +481,38 @@ def run(self, *args, **kwargs): with self.task_core_runtime.time(): return self.run_impl(db_session, *args, **kwargs) except InterfaceError as ex: - sentry_sdk.capture_exception( - ex, - ) + sentry_sdk.capture_exception(ex) + # InterfaceError indicates a database connection issue + # Return None to indicate task failure without retry + return None except (DataError, IntegrityError): log.exception( "Errors related to the constraints of database happened", extra={"task_args": args, "task_kwargs": kwargs}, ) - db_session.rollback() - retry_count = getattr(self.request, "retries", 0) - countdown = TASK_RETRY_BACKOFF_BASE_SECONDS * (2**retry_count) - try: - if not self.safe_retry(countdown=countdown): - return None - except MaxRetriesExceededError: - if UploadFlow.has_begun(): - UploadFlow.log(UploadFlow.UNCAUGHT_RETRY_EXCEPTION) - if TestResultsFlow.has_begun(): - TestResultsFlow.log(TestResultsFlow.UNCAUGHT_RETRY_EXCEPTION) - return None + return self._handle_database_error_retry(db_session, *args, **kwargs) except SQLAlchemyError as ex: self._analyse_error(ex, args, kwargs) - db_session.rollback() - retry_count = getattr(self.request, "retries", 0) - countdown = TASK_RETRY_BACKOFF_BASE_SECONDS * (2**retry_count) - try: - if not self.safe_retry(countdown=countdown): - return None - except MaxRetriesExceededError: - if UploadFlow.has_begun(): - UploadFlow.log(UploadFlow.UNCAUGHT_RETRY_EXCEPTION) - if 
TestResultsFlow.has_begun(): - TestResultsFlow.log(TestResultsFlow.UNCAUGHT_RETRY_EXCEPTION) - return None - except MaxRetriesExceededError as ex: + return self._handle_database_error_retry(db_session, *args, **kwargs) + except MaxRetriesExceededError: + # Log checkpoint if flows have begun if UploadFlow.has_begun(): UploadFlow.log(UploadFlow.UNCAUGHT_RETRY_EXCEPTION) if TestResultsFlow.has_begun(): TestResultsFlow.log(TestResultsFlow.UNCAUGHT_RETRY_EXCEPTION) + # Return None to indicate task failure after max retries exceeded + return None + except LockAcquisitionLimitError: + log.warning( + "Lock acquisition limit exceeded in task", + extra={"task": self.name}, + exc_info=True, + ) + if UploadFlow.has_begun(): + UploadFlow.log(UploadFlow.NOTIF_LOCK_ERROR) + if TestResultsFlow.has_begun(): + TestResultsFlow.log(TestResultsFlow.NOTIF_LOCK_ERROR) + return None finally: self.wrap_up_dbsession(db_session) @@ -592,9 +604,13 @@ def get_repo_provider_service( ) except NoConfiguredAppsAvailable as exp: if exp.rate_limited_count > 0: - # At least one GitHub app is available but rate-limited. Retry after - # waiting until the next hour (minimum 1 minute delay). - retry_delay_seconds = max(60, get_seconds_to_next_hour()) + # At least one GitHub app is available but rate-limited. + # Use the actual retry time from GitHub API response if available, + # otherwise fall back to waiting until the next hour (minimum 1 minute delay). + if exp.earliest_retry_after_seconds is not None: + retry_delay_seconds = max(60, exp.earliest_retry_after_seconds) + else: + retry_delay_seconds = max(60, get_seconds_to_next_hour()) log.warning( "Unable to get repo provider service due to rate limits. Retrying again later.", extra={ @@ -602,6 +618,7 @@ def get_repo_provider_service( "apps_rate_limited": exp.rate_limited_count, "apps_suspended": exp.suspended_count, "countdown_seconds": retry_delay_seconds, + "earliest_retry_after_seconds": exp.earliest_retry_after_seconds, }, ) if commit: diff --git a/apps/worker/tasks/bundle_analysis_notify.py b/apps/worker/tasks/bundle_analysis_notify.py index 73023bf5e4..d302fcfe38 100644 --- a/apps/worker/tasks/bundle_analysis_notify.py +++ b/apps/worker/tasks/bundle_analysis_notify.py @@ -2,6 +2,8 @@ from typing import Any import sentry_sdk +from celery.exceptions import MaxRetriesExceededError as CeleryMaxRetriesExceededError +from celery.exceptions import Retry from app import celery_app from database.enums import ReportType @@ -10,19 +12,36 @@ from services.bundle_analysis.notify import BundleAnalysisNotifyService from services.bundle_analysis.notify.types import NotificationSuccess from services.lock_manager import LockManager, LockRetry, LockType +from services.notification.debounce import ( + LockAcquisitionLimitError, + NotificationDebouncer, +) from shared.celery_config import ( BUNDLE_ANALYSIS_NOTIFY_MAX_RETRIES, bundle_analysis_notify_task_name, ) +from shared.helpers.redis import get_redis_connection from shared.yaml import UserYaml from tasks.base import BaseCodecovTask log = logging.getLogger(__name__) +BUNDLE_ANALYSIS_NOTIFIER_FENCING_TOKEN = "bundle_analysis_notifier_fence:{}_{}" +DEBOUNCE_PERIOD_SECONDS = 30 + class BundleAnalysisNotifyTask(BaseCodecovTask, name=bundle_analysis_notify_task_name): max_retries = BUNDLE_ANALYSIS_NOTIFY_MAX_RETRIES + def __init__(self): + super().__init__() + self.debouncer = NotificationDebouncer[dict]( + redis_key_template=BUNDLE_ANALYSIS_NOTIFIER_FENCING_TOKEN, + debounce_period_seconds=DEBOUNCE_PERIOD_SECONDS, + 
lock_type=LockType.BUNDLE_ANALYSIS_NOTIFY, + max_lock_retries=self.max_retries, + ) + def run_impl( self, db_session, @@ -33,6 +52,7 @@ def run_impl( repoid: int, commitid: str, commit_yaml: dict, + fencing_token: int | None = None, **kwargs, ): repoid = int(repoid) @@ -47,18 +67,88 @@ def run_impl( }, ) + redis_connection = get_redis_connection() lock_manager = LockManager( repoid=repoid, commitid=commitid, report_type=ReportType.BUNDLE_ANALYSIS, + redis_connection=redis_connection, ) + if fencing_token is None: + try: + with self.debouncer.notification_lock(lock_manager, self.attempts): + fencing_token = self.debouncer.acquire_fencing_token( + redis_connection, repoid, commitid + ) + + log.info( + "Acquired fencing token, retrying for debounce period", + extra={ + "repoid": repoid, + "commit": commitid, + "fencing_token": fencing_token, + }, + ) + retry_kwargs = { + "previous_result": previous_result, + "repoid": repoid, + "commitid": commitid, + "commit_yaml": commit_yaml.to_dict(), + "fencing_token": fencing_token, + **kwargs, + } + try: + self.retry(countdown=DEBOUNCE_PERIOD_SECONDS, kwargs=retry_kwargs) + except CeleryMaxRetriesExceededError: + # Max retries exceeded during debounce retry, but we have a valid token. + # Proceed immediately with notification instead of dropping it. + log.warning( + "Max retries exceeded during debounce retry, proceeding immediately with notification", + extra={ + "repoid": repoid, + "commit": commitid, + "fencing_token": fencing_token, + }, + ) + # Continue execution to process notification with acquired token + except LockAcquisitionLimitError: + return { + "notify_attempted": False, + "notify_succeeded": None, + } + except LockRetry as e: + try: + result = self.debouncer.handle_lock_retry( + self, + e, + { + "notify_attempted": False, + "notify_succeeded": None, + }, + ) + if result is not None: + return result + except Retry: + # Re-raise Retry exception to ensure task retry is scheduled + # and execution doesn't continue past this point + raise + + assert fencing_token, "Fencing token not acquired" try: - with lock_manager.locked( - LockType.BUNDLE_ANALYSIS_NOTIFY, - max_retries=self.max_retries, - retry_num=self.attempts, - ): + with self.debouncer.notification_lock(lock_manager, self.attempts): + if self.debouncer.check_fencing_token_stale( + redis_connection, repoid, commitid, fencing_token + ): + log.info( + "Fencing token is stale, another notification task is in progress, exiting", + extra={"repoid": repoid, "commit": commitid}, + ) + return { + "notify_attempted": False, + "notify_succeeded": None, + } + return self.process_impl_within_lock( db_session=db_session, repoid=repoid, @@ -67,22 +157,27 @@ def run_impl( previous_result=previous_result, **kwargs, ) + except LockAcquisitionLimitError: + return { + "notify_attempted": False, + "notify_succeeded": None, + } except LockRetry as retry: - if retry.max_retries_exceeded: - log.error( - "Not retrying lock acquisition - max retries exceeded", - extra={ - "commitid": commitid, - "repoid": repoid, - "retry_num": retry.retry_num, - "max_attempts": retry.max_attempts, + try: + result = self.debouncer.handle_lock_retry( + self, + retry, + { + "notify_attempted": False, + "notify_succeeded": None, }, ) - return { - "notify_attempted": False, - "notify_succeeded": None, - } - self.retry(max_retries=self.max_retries, countdown=retry.countdown) + if result is not None: + return result + except Retry: + # Re-raise Retry exception to ensure task retry is scheduled + # and execution doesn't continue 
past this point + raise @sentry_sdk.trace def process_impl_within_lock( diff --git a/apps/worker/tasks/bundle_analysis_processor.py b/apps/worker/tasks/bundle_analysis_processor.py index 5336201c25..d6f1face08 100644 --- a/apps/worker/tasks/bundle_analysis_processor.py +++ b/apps/worker/tasks/bundle_analysis_processor.py @@ -6,6 +6,8 @@ from app import celery_app from database.enums import ReportType from database.models import Commit, CommitReport, Upload +from helpers.clock import get_seconds_to_next_hour +from helpers.exceptions import NoConfiguredAppsAvailable from services.bundle_analysis.report import ( BundleAnalysisReportService, ProcessingResult, @@ -76,6 +78,54 @@ def run_impl( params, previous_result, ) + except NoConfiguredAppsAvailable as exp: + # Handle NoConfiguredAppsAvailable errors that may occur during + # repository service initialization (e.g., in BundleAnalysisReportLoader + # or other components that need GitHub app access) + if exp.rate_limited_count > 0: + # At least one GitHub app is available but rate-limited + # Use the actual retry time from GitHub API response if available, + # otherwise fall back to waiting until the next hour (minimum 1 minute delay) + if exp.earliest_retry_after_seconds is not None: + retry_delay_seconds = max(60, exp.earliest_retry_after_seconds) + else: + retry_delay_seconds = max(60, get_seconds_to_next_hour()) + log.warning( + "Bundle analysis processor unable to acquire GitHub app due to rate limits. Retrying later.", + extra={ + "apps_available": exp.apps_count, + "apps_rate_limited": exp.rate_limited_count, + "apps_suspended": exp.suspended_count, + "commitid": commitid, + "countdown_seconds": retry_delay_seconds, + "earliest_retry_after_seconds": exp.earliest_retry_after_seconds, + "repoid": repoid, + }, + ) + if not self.safe_retry(countdown=retry_delay_seconds): + log.error( + "Bundle analysis processor exceeded max retries after rate limit", + extra={ + "attempts": self.attempts, + "commitid": commitid, + "max_attempts": self.max_retries + 1, + "repoid": repoid, + }, + ) + return previous_result + else: + # No apps available or all apps are suspended - skip processing + log.warning( + "Bundle analysis processor skipping processing due to no configured GitHub apps available", + extra={ + "apps_available": exp.apps_count, + "apps_rate_limited": exp.rate_limited_count, + "apps_suspended": exp.suspended_count, + "commitid": commitid, + "repoid": repoid, + }, + ) + return previous_result except LockRetry as retry: # Check max retries using self.attempts (includes visibility timeout re-deliveries) # This prevents infinite retry loops when max retries are exceeded diff --git a/apps/worker/tasks/notify.py b/apps/worker/tasks/notify.py index 660d5caf55..d3a5b2169e 100644 --- a/apps/worker/tasks/notify.py +++ b/apps/worker/tasks/notify.py @@ -2,16 +2,14 @@ import sentry_sdk from asgiref.sync import async_to_sync -from celery.exceptions import MaxRetriesExceededError +from celery.exceptions import MaxRetriesExceededError as CeleryMaxRetriesExceededError +from celery.exceptions import Retry from sqlalchemy import and_ from sqlalchemy.orm.session import Session from app import celery_app from database.enums import CommitErrorTypes, Decoration, NotificationState, ReportType -from database.models import ( - Commit, - Pull, -) +from database.models import Commit, Pull from database.models.core import GITHUB_APP_INSTALLATION_DEFAULT_NAME, CompareCommit from helpers.checkpoint_logger.flows import UploadFlow from helpers.clock import 
get_seconds_to_next_hour @@ -32,6 +30,10 @@ from services.github import get_github_app_for_commit, set_github_app_for_commit from services.lock_manager import LockManager, LockRetry, LockType from services.notification import NotificationService +from services.notification.debounce import ( + LockAcquisitionLimitError, + NotificationDebouncer, +) from services.report import ReportService from services.repository import ( EnrichedPull, @@ -66,8 +68,24 @@ GENERIC_TA_ERROR_MSG = "Test Analytics upload error: We are unable to process any of the uploaded JUnit XML files. Please ensure your files are in the right format." +NOTIFIER_FENCING_TOKEN = "notifier_fence:{}_{}" +DEBOUNCE_PERIOD_SECONDS = 30 + class NotifyTask(BaseCodecovTask, name=notify_task_name): + # Explicitly define max_retries to ensure consistent behavior with debouncer. + # Celery's default is 3, which matches the old lock retry behavior. + max_retries = 3 + + def __init__(self): + super().__init__() + self.debouncer = NotificationDebouncer[dict]( + redis_key_template=NOTIFIER_FENCING_TOKEN, + debounce_period_seconds=DEBOUNCE_PERIOD_SECONDS, + lock_type=LockType.NOTIFICATION, + max_lock_retries=self.max_retries, + ) + def run_impl( self, db_session: Session, @@ -76,6 +94,7 @@ def run_impl( commitid: str, current_yaml=None, empty_upload=None, + fencing_token: int | None = None, **kwargs, ): milestone = Milestones.NOTIFICATIONS_SENT @@ -105,15 +124,100 @@ def run_impl( commitid=commitid, report_type=ReportType.COVERAGE, lock_timeout=max(80, self.hard_time_limit_task), + redis_connection=redis_connection, ) + if fencing_token is None: + try: + with self.debouncer.notification_lock(lock_manager, self.attempts): + fencing_token = self.debouncer.acquire_fencing_token( + redis_connection, repoid, commitid + ) + + log.info( + "Acquired fencing token, retrying for debounce period", + extra={ + "repoid": repoid, + "commitid": commitid, + "fencing_token": fencing_token, + }, + ) + retry_kwargs = { + "repoid": repoid, + "commitid": commitid, + "current_yaml": current_yaml, + "empty_upload": empty_upload, + "fencing_token": fencing_token, + **kwargs, + } + try: + self.retry(countdown=DEBOUNCE_PERIOD_SECONDS, kwargs=retry_kwargs) + except CeleryMaxRetriesExceededError: + # Max retries exceeded during debounce retry, but we have a valid token. + # Proceed immediately with notification instead of dropping it. 
+ log.warning( + "Max retries exceeded during debounce retry, proceeding immediately with notification", + extra={ + "repoid": repoid, + "commitid": commitid, + "fencing_token": fencing_token, + }, + ) + # Continue execution to process notification with acquired token + except LockAcquisitionLimitError: + self.log_checkpoint(UploadFlow.NOTIF_LOCK_ERROR) + self._call_upload_breadcrumb_task( + commit_sha=commitid, + repo_id=repoid, + milestone=milestone, + error=Errors.INTERNAL_LOCK_ERROR, + ) + return { + "notified": False, + "notifications": None, + "reason": "max_retries_exceeded", + } + except LockRetry as e: + try: + result = self.debouncer.handle_lock_retry( + self, + e, + { + "notified": False, + "notifications": None, + "reason": "unobtainable_lock", + }, + ) + if result is not None: + self.log_checkpoint(UploadFlow.NOTIF_LOCK_ERROR) + self._call_upload_breadcrumb_task( + commit_sha=commitid, + repo_id=repoid, + milestone=milestone, + error=Errors.INTERNAL_LOCK_ERROR, + ) + return result + except Retry: + # Re-raise Retry exception to ensure task retry is scheduled + # and execution doesn't continue past this point + raise + + assert fencing_token, "Fencing token not acquired" try: - lock_acquired = False - with lock_manager.locked( - lock_type=LockType.NOTIFICATION, - retry_num=self.attempts, - ): - lock_acquired = True + with self.debouncer.notification_lock(lock_manager, self.attempts): + if self.debouncer.check_fencing_token_stale( + redis_connection, repoid, commitid, fencing_token + ): + log.info( + "Fencing token is stale, another notification task is in progress, exiting", + extra={"repoid": repoid, "commitid": commitid}, + ) + return { + "notified": False, + "notifications": None, + "reason": "stale_fencing_token", + } + return self.run_impl_within_lock( db_session, repoid=repoid, @@ -122,18 +226,7 @@ def run_impl( empty_upload=empty_upload, **kwargs, ) - except LockRetry as err: - ( - log.info( - "Not notifying because there is another notification already happening", - extra={ - "repoid": repoid, - "commitid": commitid, - "error_type": type(err), - "lock_acquired": lock_acquired, - }, - ), - ) + except LockAcquisitionLimitError: self.log_checkpoint(UploadFlow.NOTIF_LOCK_ERROR) self._call_upload_breadcrumb_task( commit_sha=commitid, @@ -141,17 +234,49 @@ def run_impl( milestone=milestone, error=Errors.INTERNAL_LOCK_ERROR, ) - self._call_upload_breadcrumb_task( - commit_sha=commitid, - repo_id=repoid, - milestone=milestone, - error=Errors.INTERNAL_OTHER_JOB, - ) return { "notified": False, "notifications": None, - "reason": "unobtainable_lock", + "reason": "max_retries_exceeded", } + except LockRetry as err: + log.info( + "Not notifying because there is another notification already happening", + extra={ + "repoid": repoid, + "commitid": commitid, + "error_type": type(err), + }, + ) + try: + result = self.debouncer.handle_lock_retry( + self, + err, + { + "notified": False, + "notifications": None, + "reason": "unobtainable_lock", + }, + ) + if result is not None: + self.log_checkpoint(UploadFlow.NOTIF_LOCK_ERROR) + self._call_upload_breadcrumb_task( + commit_sha=commitid, + repo_id=repoid, + milestone=milestone, + error=Errors.INTERNAL_LOCK_ERROR, + ) + self._call_upload_breadcrumb_task( + commit_sha=commitid, + repo_id=repoid, + milestone=milestone, + error=Errors.INTERNAL_OTHER_JOB, + ) + return result + except Retry: + # Re-raise Retry exception to ensure task retry is scheduled + # and execution doesn't continue past this point + raise def log_checkpoint(self, 
checkpoint): """ @@ -183,7 +308,7 @@ def _attempt_retry( error=Errors.INTERNAL_RETRYING, ) self.retry(max_retries=max_retries, countdown=countdown) - except MaxRetriesExceededError: + except CeleryMaxRetriesExceededError: log.warning( "Not attempting to retry notifications since we already retried too many times", extra={ @@ -272,8 +397,12 @@ def run_impl_within_lock( if exp.rate_limited_count > 0: # There's at least 1 app that we can use to communicate with GitHub, # but this app happens to be rate limited now. We try again later. - # Min wait time of 1 minute - retry_delay_seconds = max(60, get_seconds_to_next_hour()) + # Use the actual retry time from GitHub API response if available, + # otherwise fall back to waiting until the next hour (minimum 1 minute delay). + if exp.earliest_retry_after_seconds is not None: + retry_delay_seconds = max(60, exp.earliest_retry_after_seconds) + else: + retry_delay_seconds = max(60, get_seconds_to_next_hour()) log.warning( "Unable to start notifications. Retrying again later.", extra={ @@ -283,6 +412,7 @@ def run_impl_within_lock( "apps_rate_limited": exp.rate_limited_count, "apps_suspended": exp.suspended_count, "countdown_seconds": retry_delay_seconds, + "earliest_retry_after_seconds": exp.earliest_retry_after_seconds, }, ) self._call_upload_breadcrumb_task( diff --git a/apps/worker/tasks/test_analytics_notifier.py b/apps/worker/tasks/test_analytics_notifier.py index b58b51f740..2c085b45fd 100644 --- a/apps/worker/tasks/test_analytics_notifier.py +++ b/apps/worker/tasks/test_analytics_notifier.py @@ -1,18 +1,20 @@ import logging -from contextlib import contextmanager -from typing import cast from asgiref.sync import async_to_sync from celery.exceptions import MaxRetriesExceededError as CeleryMaxRetriesExceededError from django.conf import settings -from redis import Redis from sqlalchemy.orm import Session from app import celery_app from database.enums import ReportType +from helpers.checkpoint_logger.flows import TestResultsFlow from helpers.notifier import NotifierResult from helpers.string import shorten_file_paths from services.lock_manager import LockManager, LockRetry, LockType +from services.notification.debounce import ( + LockAcquisitionLimitError, + NotificationDebouncer, +) from services.repository import ( fetch_pull_request_information, get_repo_provider_service, @@ -35,7 +37,10 @@ TestResultsNotifier, should_do_flaky_detection, ) -from shared.celery_config import test_analytics_notifier_task_name +from shared.celery_config import ( + TASK_MAX_RETRIES_DEFAULT, + test_analytics_notifier_task_name, +) from shared.django_apps.core.models import Repository from shared.helpers.redis import get_redis_connection from shared.reports.types import UploadType @@ -44,13 +49,6 @@ from shared.yaml.user_yaml import UserYaml from tasks.base import BaseCodecovTask - -class MaxRetriesExceededError(Exception): - """Raised when lock acquisition exceeds max retries.""" - - pass - - log = logging.getLogger(__name__) TA_NOTIFIER_FENCING_TOKEN = "ta_notifier_fence:{}_{}" @@ -65,6 +63,17 @@ class TestAnalyticsNotifierTask( data retention policies. 
""" + max_retries = TASK_MAX_RETRIES_DEFAULT + + def __init__(self): + super().__init__() + self.debouncer = NotificationDebouncer[NotifierTaskResult]( + redis_key_template=TA_NOTIFIER_FENCING_TOKEN, + debounce_period_seconds=DEBOUNCE_PERIOD_SECONDS, + lock_type=LockType.NOTIFICATION, + max_lock_retries=self.max_retries, + ) + def run_impl( self, _db_session: Session, @@ -106,68 +115,76 @@ def run_impl( if fencing_token is None: try: - with self._notification_lock(lock_manager): - # Acquire a fencing token to deduplicate notifications - redis_key = TA_NOTIFIER_FENCING_TOKEN.format( - repo_id, upload_context["commit_sha"] + with self.debouncer.notification_lock(lock_manager, self.attempts): + fencing_token = self.debouncer.acquire_fencing_token( + redis_client, repo_id, upload_context["commit_sha"] ) - with redis_client.pipeline() as pipeline: - pipeline.incr(redis_key) - # Set expiry to prevent stale keys from accumulating - pipeline.expire(redis_key, 24 * 60 * 60) # 24 hours - results = pipeline.execute() - fencing_token = int(cast(str, results[0])) - - # Add fencing token to the args and then retry for a debounce period + self.extra_dict["fencing_token"] = fencing_token - kwargs["fencing_token"] = fencing_token + retry_kwargs = { + "repoid": repo_id, + "upload_context": upload_context, + "fencing_token": fencing_token, + **kwargs, + } log.info( "Acquired fencing token, retrying for debounce period", extra=self.extra_dict, ) - self.retry(countdown=DEBOUNCE_PERIOD_SECONDS, kwargs=kwargs) - except MaxRetriesExceededError: + try: + self.retry(countdown=DEBOUNCE_PERIOD_SECONDS, kwargs=retry_kwargs) + except CeleryMaxRetriesExceededError: + log.warning( + "Max retries exceeded during debounce retry, proceeding immediately with notification", + extra=self.extra_dict, + ) + # Continue execution to process notification with acquired token + except LockAcquisitionLimitError: + if TestResultsFlow.has_begun(): + TestResultsFlow.log(TestResultsFlow.NOTIF_LOCK_ERROR) return NotifierTaskResult( attempted=False, succeeded=False, ) except LockRetry as e: - try: - self.retry(max_retries=5, countdown=e.countdown) - except CeleryMaxRetriesExceededError: + result = self.debouncer.handle_lock_retry( + self, + e, + NotifierTaskResult(attempted=False, succeeded=False), + ) + if result is not None: + return result + + assert fencing_token, "Fencing token not acquired" + try: + with self.debouncer.notification_lock(lock_manager, self.attempts): + if self.debouncer.check_fencing_token_stale( + redis_client, repo_id, upload_context["commit_sha"], fencing_token + ): + log.info( + "Fencing token is stale, another notification task is in progress, exiting", + extra=self.extra_dict, + ) return NotifierTaskResult( attempted=False, succeeded=False, ) - - # At this point we have a fencing token, but want to check if another - # notification task has incremented it, indicating a newer notification - # task is in progress and can take over - assert fencing_token, "Fencing token not acquired" - # The lock here is not required for correctness (only the one at the - # end is), but it reduces duplicated work and database queries - try: - with self._notification_lock(lock_manager): - stale_token_result = self._check_fencing_token( - redis_client, repo_id, upload_context, fencing_token - ) - if stale_token_result: - return stale_token_result - except MaxRetriesExceededError: + except LockAcquisitionLimitError: + if TestResultsFlow.has_begun(): + TestResultsFlow.log(TestResultsFlow.NOTIF_LOCK_ERROR) return NotifierTaskResult( 
attempted=False, succeeded=False, ) except LockRetry as e: - try: - self.retry(max_retries=5, countdown=e.countdown) - except CeleryMaxRetriesExceededError: - return NotifierTaskResult( - attempted=False, - succeeded=False, - ) + result = self.debouncer.handle_lock_retry( + self, + e, + NotifierTaskResult(attempted=False, succeeded=False), + ) + if result is not None: + return result - # Do preparation work without a lock notifier = self.notification_preparation( repo_id=repo_id, upload_context=upload_context, @@ -181,121 +198,49 @@ def run_impl( ) try: - with self._notification_lock(lock_manager): - # Check fencing token again before sending the notification - stale_token_result = self._check_fencing_token( - redis_client, repo_id, upload_context, fencing_token - ) - if stale_token_result: - return stale_token_result + with self.debouncer.notification_lock(lock_manager, self.attempts): + if self.debouncer.check_fencing_token_stale( + redis_client, repo_id, upload_context["commit_sha"], fencing_token + ): + log.info( + "Fencing token is stale, another notification task is in progress, exiting", + extra=self.extra_dict, + ) + return NotifierTaskResult( + attempted=False, + succeeded=False, + ) - # Send notification notifier_result = notifier.notify() - success = ( - True if notifier_result is NotifierResult.COMMENT_POSTED else False - ) + success = notifier_result is NotifierResult.COMMENT_POSTED log.info("Posted TA comment", extra={**self.extra_dict, "success": success}) return NotifierTaskResult( attempted=True, succeeded=success, ) - except MaxRetriesExceededError: + except LockAcquisitionLimitError: + if TestResultsFlow.has_begun(): + TestResultsFlow.log(TestResultsFlow.NOTIF_LOCK_ERROR) return NotifierTaskResult( attempted=False, succeeded=False, ) except LockRetry as e: - try: - self.retry(max_retries=5, countdown=e.countdown) - except CeleryMaxRetriesExceededError: - return NotifierTaskResult( - attempted=False, - succeeded=False, - ) - - @contextmanager - def _notification_lock(self, lock_manager: LockManager): - """ - Context manager to handle the repeated lock acquisition pattern - with automatic retry handling for LockRetry exceptions. - """ - try: - with lock_manager.locked( - LockType.NOTIFICATION, - max_retries=5, - retry_num=self.attempts, - ): - yield - return - except LockRetry as e: - # Lock acquisition failed - handle immediately without yielding - # This ensures the with block body never executes without lock protection - if e.max_retries_exceeded: - log.error( - "Not retrying lock acquisition - max retries exceeded", - extra={ - "retry_num": e.retry_num, - "max_attempts": e.max_attempts, - }, - ) - raise MaxRetriesExceededError( - f"Lock acquisition exceeded max retries: {e.retry_num} >= {e.max_attempts}", - ) - # Re-raise LockRetry to be handled by the caller's retry logic - # The caller will catch this and call self.retry() - raise - - def _check_fencing_token( - self, - redis_client: Redis, - repo_id: int, - upload_context: TAUploadContext, - fencing_token: int, - ) -> NotifierTaskResult | None: - """ - Check if the fencing token is stale, indicating another notification task - is in progress and can take over. - - This method should be called within a lock context. - - :returns: - NotifierTaskResult if the token is stale (task should exit early), - None if the token is current (task should continue). 
- """ - current_token = int( - cast( - str, - redis_client.get( - TA_NOTIFIER_FENCING_TOKEN.format( - repo_id, upload_context["commit_sha"] - ) - ) - or "0", + result = self.debouncer.handle_lock_retry( + self, + e, + NotifierTaskResult(attempted=False, succeeded=False), ) - ) - - # We do a less than comparison since it guarantees safety at - # the cost of no debouncing for that commit in the case of - # losing the key in Redis - if fencing_token < current_token: - log.info( - "Fencing token is stale, another notification task is in progress, exiting", - extra=self.extra_dict, - ) - return NotifierTaskResult( - attempted=False, - succeeded=False, - ) - - return None + if result is not None: + return result def notification_preparation( self, *, repo_id: int, upload_context: TAUploadContext, - commit_yaml: UserYaml = UserYaml({}), # TODO: Actual commit_yaml + commit_yaml: UserYaml = UserYaml({}), ) -> TestResultsNotifier | None: """ Prepare the test results notifier. @@ -312,9 +257,6 @@ def notification_preparation( log.info("Comment is disabled, not posting comment", extra=self.extra_dict) return None - # TODO: Add upload errors in a compliant way - - # TODO: Remove impl label with read_tests_totals_summary.labels(impl="new").time(): summary = get_pr_comment_agg(repo_id, upload_context["commit_sha"]) @@ -358,8 +300,6 @@ def notification_preparation( _repo_service=repo_service, ) - # TODO: Seat activation - with read_failures_summary.labels(impl="new").time(): failures = get_pr_comment_failures(repo_id, upload_context["commit_sha"]) @@ -385,24 +325,23 @@ def notification_preparation( def transform_failures( failures: list[FailedTestInstance], ) -> list[TestResultsNotificationFailure[bytes]]: - notif_failures = [] + result = [] for failure in failures: - if failure["failure_message"] is not None: - failure["failure_message"] = shorten_file_paths( - failure["failure_message"] - ).replace("\r", "") + failure_message = failure["failure_message"] + if failure_message is not None: + failure_message = shorten_file_paths(failure_message).replace("\r", "") - notif_failures.append( + result.append( TestResultsNotificationFailure( display_name=failure["computed_name"], - failure_message=failure["failure_message"], + failure_message=failure_message, test_id=failure["test_id"], envs=failure["flags"], duration_seconds=failure["duration_seconds"] or 0, - build_url=None, # TODO: Figure out how we can save this in a compliant way + build_url=None, ) ) - return notif_failures + return result RegisteredTestAnalyticsNotifierTask = celery_app.register_task( diff --git a/apps/worker/tasks/tests/integration/test_notify_task.py b/apps/worker/tasks/tests/integration/test_notify_task.py index af8d6b51a1..1a0359d1a3 100644 --- a/apps/worker/tasks/tests/integration/test_notify_task.py +++ b/apps/worker/tasks/tests/integration/test_notify_task.py @@ -7,6 +7,7 @@ from database.models.core import CompareCommit from database.tests.factories import CommitFactory, PullFactory, RepositoryFactory from services.comparison import get_or_create_comparison +from services.notification.debounce import SKIP_DEBOUNCE_TOKEN from services.notification.notifiers.base import NotificationResult from services.repository import EnrichedPull from shared.api_archive.archive import ArchiveService @@ -86,7 +87,11 @@ def test_simple_call_no_notifiers( task = NotifyTask() result = task.run_impl( - dbsession, repoid=commit.repoid, commitid=commit.commitid, current_yaml={} + dbsession, + repoid=commit.repoid, + commitid=commit.commitid, + 
current_yaml={}, + fencing_token=SKIP_DEBOUNCE_TOKEN, ) assert result == { diff --git a/apps/worker/tasks/tests/unit/test_base.py b/apps/worker/tasks/tests/unit/test_base.py index 237ba9b51a..aa344ba7ff 100644 --- a/apps/worker/tasks/tests/unit/test_base.py +++ b/apps/worker/tasks/tests/unit/test_base.py @@ -319,7 +319,54 @@ def test_get_repo_provider_service_rate_limited(self, mocker, mock_self_app): mock_retry = mocker.patch.object(task, "retry") mock_repo = mocker.MagicMock() assert task.get_repo_provider_service(mock_repo, commit=mock_commit) is None + # When earliest_retry_after_seconds is None, falls back to get_seconds_to_next_hour() task.retry.assert_called_with(countdown=120) + + def test_get_repo_provider_service_rate_limited_uses_actual_retry_time( + self, mocker, mock_self_app + ): + """Test that base task uses actual retry time from GitHub API when available.""" + mocker.patch( + "tasks.base.get_repo_provider_service", + side_effect=NoConfiguredAppsAvailable( + apps_count=2, + rate_limited_count=2, + suspended_count=0, + earliest_retry_after_seconds=180, # 3 minutes + ), + ) + mock_commit = mocker.MagicMock() + mock_commit.commitid = "abc123" + + task = BaseCodecovTask() + mock_retry = mocker.patch.object(task, "retry") + mock_repo = mocker.MagicMock() + assert task.get_repo_provider_service(mock_repo, commit=mock_commit) is None + # Should use actual retry time (180 seconds) instead of falling back + task.retry.assert_called_with(countdown=180) + + def test_get_repo_provider_service_rate_limited_enforces_minimum_delay( + self, mocker, mock_self_app + ): + """Test that base task enforces minimum 60-second delay even with short retry time.""" + mocker.patch( + "tasks.base.get_repo_provider_service", + side_effect=NoConfiguredAppsAvailable( + apps_count=2, + rate_limited_count=2, + suspended_count=0, + earliest_retry_after_seconds=30, # Less than minimum + ), + ) + mock_commit = mocker.MagicMock() + mock_commit.commitid = "abc123" + + task = BaseCodecovTask() + mock_retry = mocker.patch.object(task, "retry") + mock_repo = mocker.MagicMock() + assert task.get_repo_provider_service(mock_repo, commit=mock_commit) is None + # Should enforce minimum 60-second delay + task.retry.assert_called_with(countdown=60) mock_self_app.tasks[upload_breadcrumb_task_name].apply_async.assert_has_calls( [ call( diff --git a/apps/worker/tasks/tests/unit/test_bundle_analysis_notify_task.py b/apps/worker/tasks/tests/unit/test_bundle_analysis_notify_task.py index bdfa00f51f..2c2ac867f3 100644 --- a/apps/worker/tasks/tests/unit/test_bundle_analysis_notify_task.py +++ b/apps/worker/tasks/tests/unit/test_bundle_analysis_notify_task.py @@ -1,9 +1,19 @@ +from contextlib import contextmanager + +import pytest +from celery.exceptions import Retry + from database.tests.factories import CommitFactory from services.bundle_analysis.notify import BundleAnalysisNotifyReturn from services.bundle_analysis.notify.types import ( NotificationSuccess, NotificationType, ) +from services.lock_manager import LockRetry +from services.notification.debounce import ( + SKIP_DEBOUNCE_TOKEN, + LockAcquisitionLimitError, +) from tasks.bundle_analysis_notify import BundleAnalysisNotifyTask @@ -34,6 +44,7 @@ def test_bundle_analysis_notify_task( repoid=commit.repoid, commitid=commit.commitid, commit_yaml={}, + fencing_token=SKIP_DEBOUNCE_TOKEN, ) assert result == { "notify_attempted": True, @@ -51,8 +62,560 @@ def test_bundle_analysis_notify_skips_if_all_processing_fail(dbsession): repoid=commit.repoid, commitid=commit.commitid, 
commit_yaml={}, + fencing_token=SKIP_DEBOUNCE_TOKEN, ) assert result == { "notify_attempted": False, "notify_succeeded": NotificationSuccess.ALL_ERRORED, } + + +def test_debounce_retry_includes_all_required_params( + mocker, dbsession, celery_app, mock_redis +): + """Test that retry kwargs include all required parameters for debounce retry. + + This test ensures that when the debounce logic triggers a retry, all required + parameters are included in the kwargs passed to self.retry(). This prevents + TypeError when Celery attempts to retry the task. + """ + mocker.patch.object(BundleAnalysisNotifyTask, "app", celery_app) + + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + # Mock Redis to allow token acquisition + mock_redis.pipeline.return_value.__enter__.return_value.execute.return_value = ["1"] + mock_redis.get.return_value = False + + task = BundleAnalysisNotifyTask() + task.request.retries = 0 + task.max_retries = 5 + + # Mock retry to capture kwargs + mock_retry = mocker.patch.object(task, "retry", side_effect=Retry()) + + previous_result = [{"error": None}] + commit_yaml = {"codecov": {"flags": ["test"]}} + + with pytest.raises(Retry): + task.run_impl( + dbsession, + previous_result, + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml=commit_yaml, + # Don't pass fencing_token - let it go through retry path + ) + + # Verify retry was called + assert mock_retry.called + + # CRITICAL: Verify all required params are in kwargs + call_kwargs = mock_retry.call_args[1]["kwargs"] + assert call_kwargs["previous_result"] == previous_result + assert call_kwargs["repoid"] == commit.repoid + assert call_kwargs["commitid"] == commit.commitid + assert call_kwargs["commit_yaml"] == commit_yaml # Should be dict, not UserYaml + assert isinstance(call_kwargs["commit_yaml"], dict) # Verify type + assert call_kwargs["fencing_token"] == 1 # Token from acquisition + assert mock_retry.call_args[1]["countdown"] == 30 # DEBOUNCE_PERIOD_SECONDS + + +def test_bundle_analysis_notify_lock_retry_limit_exceeded_during_token_acquisition( + mocker, dbsession, celery_app, mock_redis +): + """Test that LockAcquisitionLimitError during token acquisition returns graceful failure.""" + mocker.patch.object(BundleAnalysisNotifyTask, "app", celery_app) + + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + # Mock Redis to allow token acquisition attempt + mock_redis.pipeline.return_value.__enter__.return_value.execute.return_value = ["1"] + + task = BundleAnalysisNotifyTask() + task.request.retries = 0 + task.max_retries = 5 + + # Mock debouncer's notification_lock to raise LockAcquisitionLimitError + mock_notification_lock = mocker.patch.object( + task.debouncer, + "notification_lock", + side_effect=LockAcquisitionLimitError("Max retries exceeded"), + ) + + result = task.run_impl( + dbsession, + [{"error": None}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + # Don't pass fencing_token - let it go through token acquisition path + ) + + assert result == { + "notify_attempted": False, + "notify_succeeded": None, + } + + +def test_bundle_analysis_notify_lock_retry_during_token_acquisition( + mocker, dbsession, celery_app, mock_redis +): + """Test that LockRetry during token acquisition triggers task retry.""" + mocker.patch.object(BundleAnalysisNotifyTask, "app", celery_app) + + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + # Mock Redis to allow token acquisition attempt + 
mock_redis.pipeline.return_value.__enter__.return_value.execute.return_value = ["1"] + + task = BundleAnalysisNotifyTask() + task.request.retries = 0 + task.max_retries = 5 + task.request.headers = {} + + # Mock debouncer's notification_lock to raise LockRetry + @contextmanager + def mock_lock_raises_retry(*args, **kwargs): + raise LockRetry(60) + + mocker.patch.object( + task.debouncer, "notification_lock", side_effect=mock_lock_raises_retry + ) + + # Mock retry to raise Retry exception + mock_retry = mocker.patch.object(task, "retry", side_effect=Retry()) + + with pytest.raises(Retry): + task.run_impl( + dbsession, + [{"error": None}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + # Don't pass fencing_token - let it go through token acquisition path + ) + + # Verify retry was called with max_retries from debouncer + assert mock_retry.called + + +def test_bundle_analysis_notify_lock_retry_max_retries_exceeded_during_token_acquisition( + mocker, dbsession, celery_app, mock_redis +): + """Test that when max retries exceeded during LockRetry handling, returns graceful failure.""" + mocker.patch.object(BundleAnalysisNotifyTask, "app", celery_app) + + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + # Mock Redis to allow token acquisition attempt + mock_redis.pipeline.return_value.__enter__.return_value.execute.return_value = ["1"] + + task = BundleAnalysisNotifyTask() + task.request.retries = 0 + task.max_retries = 5 + task.request.headers = {} + + # Mock debouncer's notification_lock to raise LockRetry + @contextmanager + def mock_lock_raises_retry(*args, **kwargs): + raise LockRetry(60) + + mocker.patch.object( + task.debouncer, "notification_lock", side_effect=mock_lock_raises_retry + ) + + # Mock handle_lock_retry to return failure result (simulating max retries exceeded) + mocker.patch.object( + task.debouncer, + "handle_lock_retry", + return_value={ + "notify_attempted": False, + "notify_succeeded": None, + }, + ) + + result = task.run_impl( + dbsession, + [{"error": None}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + # Don't pass fencing_token - let it go through token acquisition path + ) + + assert result == { + "notify_attempted": False, + "notify_succeeded": None, + } + + +def test_bundle_analysis_notify_stale_fencing_token( + mocker, dbsession, celery_app, mock_redis +): + """Test that stale fencing token causes task to exit early.""" + mocker.patch.object(BundleAnalysisNotifyTask, "app", celery_app) + + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + # Mock Redis to return stale token (current token > fencing_token) + mock_redis.get.return_value = "2" # Current token is 2, but we have token 1 + + task = BundleAnalysisNotifyTask() + task.request.retries = 0 + task.max_retries = 5 + + # Mock debouncer's notification_lock to succeed (for the stale token check lock) + @contextmanager + def mock_lock_succeeds(*args, **kwargs): + yield + + mocker.patch.object( + task.debouncer, "notification_lock", side_effect=mock_lock_succeeds + ) + + # Mock check_fencing_token_stale to return True (token is stale) + mocker.patch.object(task.debouncer, "check_fencing_token_stale", return_value=True) + + result = task.run_impl( + dbsession, + [{"error": None}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + fencing_token=1, # Pass fencing token, but it's stale + ) + + assert result == { + "notify_attempted": False, + "notify_succeeded": None, + } + + +def 
test_bundle_analysis_notify_lock_retry_limit_exceeded_during_notification( + mocker, dbsession, celery_app, mock_redis +): + """Test that LockAcquisitionLimitError during notification lock returns graceful failure.""" + mocker.patch.object(BundleAnalysisNotifyTask, "app", celery_app) + + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + # Mock Redis to return non-stale token + mock_redis.get.return_value = "1" # Current token matches fencing_token + + task = BundleAnalysisNotifyTask() + task.request.retries = 0 + task.max_retries = 5 + + # Mock debouncer's notification_lock to raise LockAcquisitionLimitError during notification + mocker.patch.object( + task.debouncer, + "notification_lock", + side_effect=LockAcquisitionLimitError("Max retries exceeded"), + ) + + result = task.run_impl( + dbsession, + [{"error": None}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + fencing_token=1, + ) + + assert result == { + "notify_attempted": False, + "notify_succeeded": None, + } + + +def test_bundle_analysis_notify_lock_retry_during_notification( + mocker, dbsession, celery_app, mock_redis +): + """Test that LockRetry during notification lock triggers task retry.""" + mocker.patch.object(BundleAnalysisNotifyTask, "app", celery_app) + + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + # Mock Redis to return non-stale token + mock_redis.get.return_value = "1" # Current token matches fencing_token + + task = BundleAnalysisNotifyTask() + task.request.retries = 0 + task.max_retries = 5 + task.request.headers = {} + + # Mock debouncer's notification_lock to raise LockRetry during notification + @contextmanager + def mock_lock_raises_retry(*args, **kwargs): + raise LockRetry(60) + + mocker.patch.object( + task.debouncer, "notification_lock", side_effect=mock_lock_raises_retry + ) + + # Mock retry to raise Retry exception + mock_retry = mocker.patch.object(task, "retry", side_effect=Retry()) + + with pytest.raises(Retry): + task.run_impl( + dbsession, + [{"error": None}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + fencing_token=1, + ) + + # Verify retry was called + assert mock_retry.called + + +def test_bundle_analysis_notify_lock_retry_max_retries_exceeded_during_notification( + mocker, dbsession, celery_app, mock_redis +): + """Test that when max retries exceeded during LockRetry handling in notification, returns graceful failure.""" + mocker.patch.object(BundleAnalysisNotifyTask, "app", celery_app) + + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + # Mock Redis to return non-stale token + mock_redis.get.return_value = "1" # Current token matches fencing_token + + task = BundleAnalysisNotifyTask() + task.request.retries = 0 + task.max_retries = 5 + task.request.headers = {} + + # Mock debouncer's notification_lock to raise LockRetry during notification + @contextmanager + def mock_lock_raises_retry(*args, **kwargs): + raise LockRetry(60) + + mocker.patch.object( + task.debouncer, "notification_lock", side_effect=mock_lock_raises_retry + ) + + # Mock handle_lock_retry to return failure result (simulating max retries exceeded) + mocker.patch.object( + task.debouncer, + "handle_lock_retry", + return_value={ + "notify_attempted": False, + "notify_succeeded": None, + }, + ) + + result = task.run_impl( + dbsession, + [{"error": None}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + fencing_token=1, + ) + + assert result == { + 
"notify_attempted": False, + "notify_succeeded": None, + } + + +def test_bundle_analysis_notify_commit_not_found( + mocker, dbsession, celery_app, mock_redis +): + """Test that missing commit raises assertion error.""" + mocker.patch.object(BundleAnalysisNotifyTask, "app", celery_app) + + # Create a commit but don't add it to session (or use non-existent commitid) + commit = CommitFactory.build() # build() doesn't add to session + + # Mock Redis to return non-stale token + mock_redis.get.return_value = "1" + + # Mock LockManager to succeed + lock_manager_mock = mocker.MagicMock() + lock_manager_mock.return_value.locked.return_value.__enter__.return_value = None + mocker.patch("tasks.bundle_analysis_notify.LockManager", lock_manager_mock) + + task = BundleAnalysisNotifyTask() + task.request.retries = 0 + task.max_retries = 5 + + with pytest.raises(AssertionError, match="commit not found"): + task.process_impl_within_lock( + db_session=dbsession, + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + previous_result=[{"error": None}], + ) + + +def test_bundle_analysis_notify_previous_result_not_list( + mocker, dbsession, celery_app, mock_redis +): + """Test that previous_result that's not a list is handled gracefully.""" + mocker.patch.object(BundleAnalysisNotifyTask, "app", celery_app) + + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + # Mock Redis to return non-stale token + mock_redis.get.return_value = "1" + + # Mock LockManager to succeed + lock_manager_mock = mocker.MagicMock() + lock_manager_mock.return_value.locked.return_value.__enter__.return_value = None + mocker.patch("tasks.bundle_analysis_notify.LockManager", lock_manager_mock) + + task = BundleAnalysisNotifyTask() + task.request.retries = 0 + task.max_retries = 5 + + # Pass previous_result as a dict instead of list + # When not a list, it becomes empty list [], and all([]) is True, + # so it returns ALL_ERRORED + result = task.run_impl( + dbsession, + {"error": None}, # Not a list + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + fencing_token=1, + ) + + # Should handle gracefully and treat as empty list, which means all errored + assert result == { + "notify_attempted": False, + "notify_succeeded": NotificationSuccess.ALL_ERRORED, + } + + +def test_bundle_analysis_notify_task_initialization(mocker): + """Test that task initialization sets up debouncer correctly.""" + mocker.patch("tasks.bundle_analysis_notify.BaseCodecovTask.__init__") + + task = BundleAnalysisNotifyTask() + + assert task.debouncer is not None + assert task.debouncer.redis_key_template == "bundle_analysis_notifier_fence:{}_{}" + assert task.debouncer.debounce_period_seconds == 30 + assert task.debouncer.max_lock_retries == task.max_retries + + +def test_bundle_analysis_notify_partial_success( + mocker, dbsession, celery_app, mock_redis +): + """Test notification with partial success.""" + mocker.patch.object(BundleAnalysisNotifyTask, "app", celery_app) + + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + # Mock Redis to return non-stale token + mock_redis.get.return_value = "1" + + # Mock LockManager to succeed + lock_manager_mock = mocker.MagicMock() + lock_manager_mock.return_value.locked.return_value.__enter__.return_value = None + mocker.patch("tasks.bundle_analysis_notify.LockManager", lock_manager_mock) + + mocker.patch( + "services.bundle_analysis.notify.BundleAnalysisNotifyService.notify", + return_value=BundleAnalysisNotifyReturn( + 
notifications_configured=( + NotificationType.PR_COMMENT, + NotificationType.COMMIT_STATUS, + ), + notifications_attempted=( + NotificationType.PR_COMMENT, + NotificationType.COMMIT_STATUS, + ), + notifications_successful=( + NotificationType.PR_COMMENT, + ), # Only one succeeded + ), + ) + + task = BundleAnalysisNotifyTask() + task.request.retries = 0 + task.max_retries = 5 + + result = task.run_impl( + dbsession, + [{"error": None}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + fencing_token=1, + ) + + assert result == { + "notify_attempted": True, + "notify_succeeded": NotificationSuccess.PARTIAL_SUCCESS, + } + + +def test_bundle_analysis_notify_no_notifications_configured( + mocker, dbsession, celery_app, mock_redis +): + """Test notification when no notifications are configured.""" + mocker.patch.object(BundleAnalysisNotifyTask, "app", celery_app) + + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + # Mock Redis to return non-stale token + mock_redis.get.return_value = "1" + + # Mock LockManager to succeed + lock_manager_mock = mocker.MagicMock() + lock_manager_mock.return_value.locked.return_value.__enter__.return_value = None + mocker.patch("tasks.bundle_analysis_notify.LockManager", lock_manager_mock) + + mocker.patch( + "services.bundle_analysis.notify.BundleAnalysisNotifyService.notify", + return_value=BundleAnalysisNotifyReturn( + notifications_configured=(), # No notifications configured + notifications_attempted=(), + notifications_successful=(), + ), + ) + + task = BundleAnalysisNotifyTask() + task.request.retries = 0 + task.max_retries = 5 + + result = task.run_impl( + dbsession, + [{"error": None}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + fencing_token=1, + ) + + assert result == { + "notify_attempted": True, + "notify_succeeded": NotificationSuccess.NOTHING_TO_NOTIFY, + } diff --git a/apps/worker/tasks/tests/unit/test_bundle_analysis_processor_task.py b/apps/worker/tasks/tests/unit/test_bundle_analysis_processor_task.py index 78037f4178..7f50065733 100644 --- a/apps/worker/tasks/tests/unit/test_bundle_analysis_processor_task.py +++ b/apps/worker/tasks/tests/unit/test_bundle_analysis_processor_task.py @@ -5,6 +5,7 @@ from database.enums import ReportType from database.models import CommitReport, Upload from database.tests.factories import CommitFactory, RepositoryFactory, UploadFactory +from helpers.exceptions import NoConfiguredAppsAvailable from services.bundle_analysis.report import ProcessingError, ProcessingResult from shared.api_archive.archive import ArchiveService from shared.bundle_analysis.storage import get_bucket_name @@ -2021,3 +2022,118 @@ def test_bundle_analysis_processor_task_cleanup_with_none_result( # Should not crash even though result is None # The finally block should handle None result gracefully + + def test_bundle_analysis_processor_no_apps_available_rate_limited_uses_actual_retry_time( + self, mocker, dbsession, mock_redis + ): + """Test that bundle analysis processor uses actual retry time from GitHub API when available.""" + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + mocker.patch( + "tasks.bundle_analysis_processor.get_repo_provider_service", + side_effect=NoConfiguredAppsAvailable( + apps_count=2, + rate_limited_count=1, + suspended_count=0, + earliest_retry_after_seconds=240, # 4 minutes + ), + ) + + task = BundleAnalysisProcessorTask() + task.request.retries = 0 + task.max_retries = 5 + mock_safe_retry = 
mocker.patch.object(task, "safe_retry", return_value=True) + + previous_result = [{"error": None}] + result = task.run_impl( + dbsession, + previous_result, + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + params={"upload_id": "test", "commit": commit.commitid}, + ) + + # Should return previous result and retry with actual retry time + assert result == previous_result + mock_safe_retry.assert_called_once_with(countdown=240) + + def test_bundle_analysis_processor_no_apps_available_rate_limited_fallback( + self, mocker, dbsession, mock_redis + ): + """Test that bundle analysis processor falls back to next hour when retry time is None.""" + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + mocker.patch( + "tasks.bundle_analysis_processor.get_repo_provider_service", + side_effect=NoConfiguredAppsAvailable( + apps_count=2, + rate_limited_count=1, + suspended_count=0, + earliest_retry_after_seconds=None, # No retry time available + ), + ) + mocker.patch( + "tasks.bundle_analysis_processor.get_seconds_to_next_hour", + return_value=1800, + ) + + task = BundleAnalysisProcessorTask() + task.request.retries = 0 + task.max_retries = 5 + mock_safe_retry = mocker.patch.object(task, "safe_retry", return_value=True) + + previous_result = [{"error": None}] + result = task.run_impl( + dbsession, + previous_result, + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + params={"upload_id": "test", "commit": commit.commitid}, + ) + + # Should return previous result and retry with fallback time + assert result == previous_result + mock_safe_retry.assert_called_once_with(countdown=1800) + + def test_bundle_analysis_processor_no_apps_available_enforces_minimum_delay( + self, mocker, dbsession, mock_redis + ): + """Test that bundle analysis processor enforces minimum 60-second delay.""" + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + mocker.patch( + "tasks.bundle_analysis_processor.get_repo_provider_service", + side_effect=NoConfiguredAppsAvailable( + apps_count=2, + rate_limited_count=1, + suspended_count=0, + earliest_retry_after_seconds=30, # Less than minimum + ), + ) + + task = BundleAnalysisProcessorTask() + task.request.retries = 0 + task.max_retries = 5 + mock_safe_retry = mocker.patch.object(task, "safe_retry", return_value=True) + + previous_result = [{"error": None}] + result = task.run_impl( + dbsession, + previous_result, + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + params={"upload_id": "test", "commit": commit.commitid}, + ) + + # Should return previous result and retry with minimum delay + assert result == previous_result + mock_safe_retry.assert_called_once_with(countdown=60) diff --git a/apps/worker/tasks/tests/unit/test_notify_task.py b/apps/worker/tasks/tests/unit/test_notify_task.py index 38aff49364..8bdd41343f 100644 --- a/apps/worker/tasks/tests/unit/test_notify_task.py +++ b/apps/worker/tasks/tests/unit/test_notify_task.py @@ -5,7 +5,8 @@ import httpx import pytest import respx -from celery.exceptions import MaxRetriesExceededError, Retry +from celery.exceptions import MaxRetriesExceededError as CeleryMaxRetriesExceededError +from celery.exceptions import Retry from freezegun import freeze_time from database.enums import Decoration, Notification, NotificationState @@ -27,6 +28,7 @@ from services.decoration import DecorationDetails from services.lock_manager import LockRetry from services.notification import NotificationService +from 
services.notification.debounce import SKIP_DEBOUNCE_TOKEN from services.notification.notifiers.base import ( AbstractBaseNotifier, NotificationResult, @@ -1057,6 +1059,7 @@ def test_notify_task_no_ghapp_available_one_rate_limited( "tasks.notify.get_repo_provider_service" ) mock_retry = mocker.patch.object(NotifyTask, "retry", return_value=None) + mocker.patch("tasks.notify.get_seconds_to_next_hour", return_value=45 * 60) get_repo_provider_service.side_effect = NoConfiguredAppsAvailable( apps_count=2, rate_limited_count=1, suspended_count=1 ) @@ -1078,6 +1081,7 @@ def test_notify_task_no_ghapp_available_one_rate_limited( current_yaml=current_yaml, ) assert res is None + # When earliest_retry_after_seconds is None, falls back to get_seconds_to_next_hour() mock_retry.assert_called_with(max_retries=10, countdown=45 * 60) mock_self_app.tasks[upload_breadcrumb_task_name].apply_async.assert_has_calls( [ @@ -1108,6 +1112,80 @@ def test_notify_task_no_ghapp_available_one_rate_limited( ] ) + @freeze_time("2024-04-22T11:15:00") + def test_notify_task_no_ghapp_available_uses_actual_retry_time( + self, dbsession, mocker, mock_self_app + ): + """Test that notify task uses actual retry time from GitHub API when available.""" + get_repo_provider_service = mocker.patch( + "tasks.notify.get_repo_provider_service" + ) + mock_retry = mocker.patch.object(NotifyTask, "retry", return_value=None) + # Provide actual retry time from Redis (300 seconds = 5 minutes) + get_repo_provider_service.side_effect = NoConfiguredAppsAvailable( + apps_count=2, + rate_limited_count=1, + suspended_count=0, + earliest_retry_after_seconds=300, + ) + commit = CommitFactory.create( + message="", + pullid=None, + branch="test-branch-1", + commitid="649eaaf2924e92dc7fd8d370ddb857033231e67a", + repository__using_integration=True, + ) + dbsession.add(commit) + dbsession.flush() + current_yaml = {"codecov": {"require_ci_to_pass": True}} + task = NotifyTask() + res = task.run_impl_within_lock( + dbsession, + repoid=commit.repoid, + commitid=commit.commitid, + current_yaml=current_yaml, + ) + assert res is None + # Should use actual retry time (300 seconds) instead of falling back to next hour + mock_retry.assert_called_with(max_retries=10, countdown=300) + + @freeze_time("2024-04-22T11:15:00") + def test_notify_task_no_ghapp_available_enforces_minimum_delay( + self, dbsession, mocker, mock_self_app + ): + """Test that notify task enforces minimum 60-second delay even with short retry time.""" + get_repo_provider_service = mocker.patch( + "tasks.notify.get_repo_provider_service" + ) + mock_retry = mocker.patch.object(NotifyTask, "retry", return_value=None) + # Provide retry time less than 60 seconds + get_repo_provider_service.side_effect = NoConfiguredAppsAvailable( + apps_count=2, + rate_limited_count=1, + suspended_count=0, + earliest_retry_after_seconds=30, # Less than minimum + ) + commit = CommitFactory.create( + message="", + pullid=None, + branch="test-branch-1", + commitid="649eaaf2924e92dc7fd8d370ddb857033231e67a", + repository__using_integration=True, + ) + dbsession.add(commit) + dbsession.flush() + current_yaml = {"codecov": {"require_ci_to_pass": True}} + task = NotifyTask() + res = task.run_impl_within_lock( + dbsession, + repoid=commit.repoid, + commitid=commit.commitid, + current_yaml=current_yaml, + ) + assert res is None + # Should enforce minimum 60-second delay + mock_retry.assert_called_with(max_retries=10, countdown=60) + @freeze_time("2024-04-22T11:15:00") def test_notify_task_no_ghapp_available_all_suspended( self, 
dbsession, mocker, mock_self_app @@ -1286,7 +1364,9 @@ def test_notify_task_max_retries_exceeded( self, dbsession, mocker, mock_repo_provider, mock_self_app ): mocker.patch.object(NotifyTask, "should_wait_longer", return_value=True) - mocker.patch.object(NotifyTask, "retry", side_effect=MaxRetriesExceededError()) + mocker.patch.object( + NotifyTask, "retry", side_effect=CeleryMaxRetriesExceededError() + ) mocked_fetch_and_update_whether_ci_passed = mocker.patch.object( NotifyTask, "fetch_and_update_whether_ci_passed" ) @@ -1360,12 +1440,14 @@ def test_run_impl_unobtainable_lock( m = mocker.MagicMock() m.return_value.locked.return_value.__enter__.side_effect = LockRetry(60) mocker.patch("tasks.notify.LockManager", m) + mocker.patch.object(task, "retry", side_effect=CeleryMaxRetriesExceededError()) res = task.run_impl( dbsession, repoid=commit.repoid, commitid=commit.commitid, current_yaml=current_yaml, + fencing_token=SKIP_DEBOUNCE_TOKEN, ) assert res == { @@ -1464,6 +1546,7 @@ def test_run_impl_can_run_logic(self, dbsession, mock_redis, mocker): repoid=commit.repoid, commitid=commit.commitid, current_yaml=current_yaml, + fencing_token=SKIP_DEBOUNCE_TOKEN, **kwargs, ) assert res == {"notifications": [], "notified": True, "reason": "yay"} @@ -1477,6 +1560,57 @@ def test_run_impl_can_run_logic(self, dbsession, mock_redis, mocker): **kwargs, ) + def test_debounce_retry_includes_all_required_params( + self, dbsession, mock_redis, mocker + ): + """Test that retry kwargs include all required parameters for debounce retry. + + This test ensures that when the debounce logic triggers a retry, all required + parameters are included in the kwargs passed to self.retry(). This prevents + TypeError when Celery attempts to retry the task. + """ + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + # Mock Redis to allow token acquisition + mock_redis.pipeline.return_value.__enter__.return_value.execute.return_value = [ + "1" + ] + mock_redis.get.return_value = False + + task = NotifyTask() + task.request.retries = 0 + task.max_retries = 5 + + # Mock retry to capture kwargs + mock_retry = mocker.patch.object(task, "retry", side_effect=Retry()) + + current_yaml = {"codecov": {"require_ci_to_pass": True}} + empty_upload = None + + with pytest.raises(Retry): + task.run_impl( + dbsession, + repoid=commit.repoid, + commitid=commit.commitid, + current_yaml=current_yaml, + empty_upload=empty_upload, + # Don't pass fencing_token - let it go through retry path + ) + + # Verify retry was called + assert mock_retry.called + + # CRITICAL: Verify all required params are in kwargs + call_kwargs = mock_retry.call_args[1]["kwargs"] + assert call_kwargs["repoid"] == commit.repoid + assert call_kwargs["commitid"] == commit.commitid + assert call_kwargs["current_yaml"] == current_yaml + assert call_kwargs["empty_upload"] == empty_upload + assert call_kwargs["fencing_token"] == 1 # Token from acquisition + assert mock_retry.call_args[1]["countdown"] == 30 # DEBOUNCE_PERIOD_SECONDS + def test_checkpoints_not_logged_outside_upload_flow( self, dbsession, mock_redis, mocker, mock_checkpoint_submit, mock_configuration ): diff --git a/apps/worker/tasks/tests/unit/test_test_analytics_notifier.py b/apps/worker/tasks/tests/unit/test_test_analytics_notifier.py index 201cfbb162..b8de8bd72b 100644 --- a/apps/worker/tasks/tests/unit/test_test_analytics_notifier.py +++ b/apps/worker/tasks/tests/unit/test_test_analytics_notifier.py @@ -1,11 +1,22 @@ +from contextlib import contextmanager + import pytest -from 
celery.exceptions import MaxRetriesExceededError, Retry +from celery.exceptions import MaxRetriesExceededError as CeleryMaxRetriesExceededError +from celery.exceptions import Retry from database.tests.factories import CommitFactory from helpers.notifier import NotifierResult from services.lock_manager import LockRetry +from services.notification.debounce import ( + SKIP_DEBOUNCE_TOKEN, + LockAcquisitionLimitError, +) +from services.test_analytics.ta_timeseries import FailedTestInstance from services.test_results import NotifierTaskResult -from tasks.test_analytics_notifier import TestAnalyticsNotifierTask +from tasks.test_analytics_notifier import ( + TestAnalyticsNotifierTask, + transform_failures, +) @pytest.mark.django_db @@ -23,28 +34,24 @@ def test_lock_retry_will_retry(self, mocker, dbsession, mock_redis): "pipeline": "codecov", } - # Mock get_redis_connection to return mock_redis mocker.patch( "tasks.test_analytics_notifier.get_redis_connection", return_value=mock_redis, ) - # Mock LockManager to raise LockRetry m = mocker.MagicMock() m.return_value.locked.return_value.__enter__.side_effect = LockRetry(60) mocker.patch("tasks.test_analytics_notifier.LockManager", m) - # Mock Redis operations mock_redis.pipeline.return_value.__enter__.return_value.execute.return_value = [ "1" ] task = TestAnalyticsNotifierTask() - task.request.retries = 0 # Will retry + task.request.retries = 0 task.request.headers = {} task.max_retries = 5 - # Task should call self.retry() which raises Retry exception with pytest.raises(Retry): task.run_impl( dbsession, @@ -68,18 +75,15 @@ def test_lock_retry_self_retry_raises_max_retries_exceeded( "pipeline": "codecov", } - # Mock get_redis_connection to return mock_redis mocker.patch( "tasks.test_analytics_notifier.get_redis_connection", return_value=mock_redis, ) - # Mock LockManager to raise LockRetry m = mocker.MagicMock() m.return_value.locked.return_value.__enter__.side_effect = LockRetry(60) mocker.patch("tasks.test_analytics_notifier.LockManager", m) - # Mock Redis operations mock_redis.pipeline.return_value.__enter__.return_value.execute.return_value = [ "1" ] @@ -89,13 +93,9 @@ def test_lock_retry_self_retry_raises_max_retries_exceeded( task.request.headers = {} task.max_retries = 5 - # Mock self.retry() to raise MaxRetriesExceededError when called from LockRetry handler - # We distinguish by checking if max_retries=5 is passed (from LockRetry handler) def retry_side_effect(*args, **kwargs): - # Check if this is the retry call from LockRetry handler (has max_retries=5) if kwargs.get("max_retries") == 5: - raise MaxRetriesExceededError() - # Otherwise, this is the debounce retry - raise Retry exception + raise CeleryMaxRetriesExceededError() raise Retry() mocker.patch.object(task, "retry", side_effect=retry_side_effect) @@ -114,7 +114,7 @@ def retry_side_effect(*args, **kwargs): def test_max_retries_exceeded_from_notification_lock( self, mocker, dbsession, mock_redis ): - """Test that MaxRetriesExceededError from _notification_lock returns graceful failure""" + """Test that LockAcquisitionLimitError from notification_lock returns graceful failure""" commit = CommitFactory.create() dbsession.add(commit) dbsession.flush() @@ -126,13 +126,11 @@ def test_max_retries_exceeded_from_notification_lock( "pipeline": "codecov", } - # Mock get_redis_connection to return mock_redis mocker.patch( "tasks.test_analytics_notifier.get_redis_connection", return_value=mock_redis, ) - # Mock LockManager to raise LockRetry with max_retries_exceeded=True m = mocker.MagicMock() 
lock_retry_with_max_exceeded = LockRetry( countdown=0, @@ -148,7 +146,6 @@ def test_max_retries_exceeded_from_notification_lock( ) mocker.patch("tasks.test_analytics_notifier.LockManager", m) - # Mock Redis operations mock_redis.pipeline.return_value.__enter__.return_value.execute.return_value = [ "1" ] @@ -185,37 +182,30 @@ def test_lock_retry_self_retry_raises_max_retries_exceeded_fencing_token_check( "pipeline": "codecov", } - # Mock get_redis_connection to return mock_redis mocker.patch( "tasks.test_analytics_notifier.get_redis_connection", return_value=mock_redis, ) - # Mock LockManager to raise LockRetry during fencing token check lock acquisition lock_manager_mock = mocker.MagicMock() lock_manager_mock.return_value.locked.return_value.__enter__.side_effect = ( LockRetry(60) ) mocker.patch("tasks.test_analytics_notifier.LockManager", lock_manager_mock) - # Mock Redis operations mock_redis.pipeline.return_value.__enter__.return_value.execute.return_value = [ "1" ] - mock_redis.get.return_value = "1" # Fencing token check + mock_redis.get.return_value = "1" task = TestAnalyticsNotifierTask() task.request.retries = 0 task.request.headers = {} task.max_retries = 5 - # Mock self.retry() to raise MaxRetriesExceededError when called from LockRetry handler - # (identified by max_retries=5 parameter) def retry_side_effect(*args, **kwargs): - # Check if this is the retry call from LockRetry handler (has max_retries=5) if kwargs.get("max_retries") == 5: - raise MaxRetriesExceededError() - # Otherwise, this is a normal retry - raise Retry exception + raise CeleryMaxRetriesExceededError() raise Retry() mocker.patch.object(task, "retry", side_effect=retry_side_effect) @@ -224,7 +214,7 @@ def retry_side_effect(*args, **kwargs): dbsession, repoid=commit.repoid, upload_context=upload_context, - fencing_token=1, # Pass fencing token to skip first lock acquisition + fencing_token=SKIP_DEBOUNCE_TOKEN, ) assert result == NotifierTaskResult( @@ -248,13 +238,11 @@ def test_lock_retry_self_retry_raises_max_retries_exceeded_notification_send( "pipeline": "codecov", } - # Mock get_redis_connection to return mock_redis mocker.patch( "tasks.test_analytics_notifier.get_redis_connection", return_value=mock_redis, ) - # First lock acquisition succeeds (fencing token check), second raises LockRetry (notification send) lock_manager_mock = mocker.MagicMock() context_manager = mocker.MagicMock() @@ -263,10 +251,8 @@ def lock_side_effect(*args, **kwargs): lock_side_effect.call_count = 0 lock_side_effect.call_count += 1 if lock_side_effect.call_count == 1: - # First call succeeds (fencing token check) return context_manager else: - # Second call raises LockRetry (notification send) raise LockRetry(60) lock_manager_mock.return_value.locked.return_value.__enter__.side_effect = ( @@ -274,24 +260,152 @@ def lock_side_effect(*args, **kwargs): ) mocker.patch("tasks.test_analytics_notifier.LockManager", lock_manager_mock) - # Mock Redis operations mock_redis.pipeline.return_value.__enter__.return_value.execute.return_value = [ "1" ] - mock_redis.get.return_value = "1" # Fencing token check + mock_redis.get.return_value = "1" task = TestAnalyticsNotifierTask() task.request.retries = 0 task.request.headers = {} task.max_retries = 5 - # Mock self.retry() to raise MaxRetriesExceededError when called from LockRetry handler - # (identified by max_retries=5 parameter) def retry_side_effect(*args, **kwargs): - # Check if this is the retry call from LockRetry handler (has max_retries=5) if kwargs.get("max_retries") == 5: - raise 
MaxRetriesExceededError() - # Otherwise, this is a normal retry - raise Retry exception + raise CeleryMaxRetriesExceededError() + raise Retry() + + mocker.patch.object(task, "retry", side_effect=retry_side_effect) + + mock_notifier = mocker.MagicMock() + mock_notifier.notify.return_value = NotifierResult.COMMENT_POSTED + mocker.patch.object( + task, "notification_preparation", return_value=mock_notifier + ) + + result = task.run_impl( + dbsession, + repoid=commit.repoid, + upload_context=upload_context, + fencing_token=SKIP_DEBOUNCE_TOKEN, + ) + + assert result == NotifierTaskResult( + attempted=False, + succeeded=False, + ) + + def test_debounce_retry_includes_all_required_params( + self, mocker, dbsession, mock_redis + ): + """Test that retry kwargs include all required parameters for debounce retry. + + This test ensures that when the debounce logic triggers a retry, all required + parameters are included in the kwargs passed to self.retry(). This prevents + TypeError when Celery attempts to retry the task. + """ + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + upload_context = { + "commit_sha": commit.commitid, + "branch": "test-branch", + "pull_id": 123, + "pipeline": "codecov", + } + + mocker.patch( + "tasks.test_analytics_notifier.get_redis_connection", + return_value=mock_redis, + ) + + # Mock Redis to allow token acquisition + mock_redis.pipeline.return_value.__enter__.return_value.execute.return_value = [ + "1" + ] + mock_redis.get.return_value = False + + task = TestAnalyticsNotifierTask() + task.request.retries = 0 + task.max_retries = 5 + + # Mock debouncer's notification_lock to succeed + @contextmanager + def mock_lock_succeeds(*args, **kwargs): + yield + + mocker.patch.object( + task.debouncer, "notification_lock", side_effect=mock_lock_succeeds + ) + + # Mock retry to capture kwargs + mock_retry = mocker.patch.object(task, "retry", side_effect=Retry()) + + with pytest.raises(Retry): + task.run_impl( + dbsession, + repoid=commit.repoid, + upload_context=upload_context, + # Don't pass fencing_token - let it go through retry path + ) + + # Verify retry was called + assert mock_retry.called + + # CRITICAL: Verify all required params are in kwargs + call_kwargs = mock_retry.call_args[1]["kwargs"] + assert call_kwargs["repoid"] == commit.repoid + assert call_kwargs["upload_context"] == upload_context + assert call_kwargs["fencing_token"] == 1 # Token from acquisition + assert mock_retry.call_args[1]["countdown"] == 30 # DEBOUNCE_PERIOD_SECONDS + + def test_max_retries_exceeded_during_debounce_retry( + self, mocker, dbsession, mock_redis + ): + """Test that max retries exceeded during debounce retry proceeds with notification.""" + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + upload_context = { + "commit_sha": commit.commitid, + "branch": "test-branch", + "pull_id": 123, + "pipeline": "codecov", + } + + mocker.patch( + "tasks.test_analytics_notifier.get_redis_connection", + return_value=mock_redis, + ) + + # Mock Redis to allow token acquisition + mock_redis.pipeline.return_value.__enter__.return_value.execute.return_value = [ + "1" + ] + mock_redis.get.return_value = "1" # Non-stale token + + task = TestAnalyticsNotifierTask() + task.request.retries = 0 + task.max_retries = 5 + + # Mock debouncer's notification_lock to succeed + @contextmanager + def mock_lock_succeeds(*args, **kwargs): + yield + + mocker.patch.object( + task.debouncer, "notification_lock", side_effect=mock_lock_succeeds + ) + 
mocker.patch.object( + task.debouncer, "check_fencing_token_stale", return_value=False + ) + + # Mock retry to raise CeleryMaxRetriesExceededError during debounce retry + def retry_side_effect(*args, **kwargs): + if kwargs.get("countdown") == 30: # Debounce retry + raise CeleryMaxRetriesExceededError() raise Retry() mocker.patch.object(task, "retry", side_effect=retry_side_effect) @@ -307,10 +421,444 @@ def retry_side_effect(*args, **kwargs): dbsession, repoid=commit.repoid, upload_context=upload_context, - fencing_token=1, # Pass fencing token to skip first lock acquisition + ) + + # Should proceed with notification despite max retries exceeded + assert result == NotifierTaskResult( + attempted=True, + succeeded=True, + ) + + def test_stale_fencing_token_during_check(self, mocker, dbsession, mock_redis): + """Test that stale fencing token causes task to exit early.""" + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + upload_context = { + "commit_sha": commit.commitid, + "branch": "test-branch", + "pull_id": 123, + "pipeline": "codecov", + } + + mocker.patch( + "tasks.test_analytics_notifier.get_redis_connection", + return_value=mock_redis, + ) + + # Mock Redis to return stale token (current token > fencing_token) + mock_redis.get.return_value = "2" # Current token is 2, but we have token 1 + + task = TestAnalyticsNotifierTask() + task.request.retries = 0 + task.max_retries = 5 + + # Mock debouncer's notification_lock to succeed + @contextmanager + def mock_lock_succeeds(*args, **kwargs): + yield + + mocker.patch.object( + task.debouncer, "notification_lock", side_effect=mock_lock_succeeds + ) + + # Mock check_fencing_token_stale to return True (token is stale) + mocker.patch.object( + task.debouncer, "check_fencing_token_stale", return_value=True + ) + + result = task.run_impl( + dbsession, + repoid=commit.repoid, + upload_context=upload_context, + fencing_token=1, # Pass fencing token, but it's stale ) assert result == NotifierTaskResult( attempted=False, succeeded=False, ) + + def test_stale_fencing_token_during_notification( + self, mocker, dbsession, mock_redis + ): + """Test that stale fencing token during notification causes task to exit early.""" + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + upload_context = { + "commit_sha": commit.commitid, + "branch": "test-branch", + "pull_id": 123, + "pipeline": "codecov", + } + + mocker.patch( + "tasks.test_analytics_notifier.get_redis_connection", + return_value=mock_redis, + ) + + # Mock Redis to return non-stale token initially, then stale token + mock_redis.get.return_value = "2" # Current token is 2, but we have token 1 + + task = TestAnalyticsNotifierTask() + task.request.retries = 0 + task.max_retries = 5 + + # Mock debouncer's notification_lock to succeed + @contextmanager + def mock_lock_succeeds(*args, **kwargs): + yield + + mocker.patch.object( + task.debouncer, "notification_lock", side_effect=mock_lock_succeeds + ) + + # Mock check_fencing_token_stale to return False initially, then True + call_count = {"count": 0} + + def stale_check_side_effect(*args, **kwargs): + call_count["count"] += 1 + if call_count["count"] == 1: + return False # First check passes + return True # Second check (during notification) fails + + mocker.patch.object( + task.debouncer, + "check_fencing_token_stale", + side_effect=stale_check_side_effect, + ) + + # Mock notification_preparation to return a mock notifier + mock_notifier = mocker.MagicMock() + mock_notifier.notify.return_value = 
NotifierResult.COMMENT_POSTED + mocker.patch.object( + task, "notification_preparation", return_value=mock_notifier + ) + + result = task.run_impl( + dbsession, + repoid=commit.repoid, + upload_context=upload_context, + fencing_token=1, + ) + + # Should exit early due to stale token + assert result == NotifierTaskResult( + attempted=False, + succeeded=False, + ) + + def test_successful_notification_flow(self, mocker, dbsession, mock_redis): + """Test successful notification flow with valid fencing token.""" + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + upload_context = { + "commit_sha": commit.commitid, + "branch": "test-branch", + "pull_id": 123, + "pipeline": "codecov", + } + + mocker.patch( + "tasks.test_analytics_notifier.get_redis_connection", + return_value=mock_redis, + ) + + # Mock Redis to return non-stale token + mock_redis.get.return_value = "1" # Current token matches fencing_token + + task = TestAnalyticsNotifierTask() + task.request.retries = 0 + task.max_retries = 5 + + # Mock debouncer's notification_lock to succeed + @contextmanager + def mock_lock_succeeds(*args, **kwargs): + yield + + mocker.patch.object( + task.debouncer, "notification_lock", side_effect=mock_lock_succeeds + ) + mocker.patch.object( + task.debouncer, "check_fencing_token_stale", return_value=False + ) + + # Mock notification_preparation to return a mock notifier + mock_notifier = mocker.MagicMock() + mock_notifier.notify.return_value = NotifierResult.COMMENT_POSTED + mocker.patch.object( + task, "notification_preparation", return_value=mock_notifier + ) + + result = task.run_impl( + dbsession, + repoid=commit.repoid, + upload_context=upload_context, + fencing_token=1, + ) + + assert result == NotifierTaskResult( + attempted=True, + succeeded=True, + ) + mock_notifier.notify.assert_called_once() + + def test_notification_preparation_returns_none(self, mocker, dbsession, mock_redis): + """Test that when notification_preparation returns None, task exits gracefully.""" + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + upload_context = { + "commit_sha": commit.commitid, + "branch": "test-branch", + "pull_id": 123, + "pipeline": "codecov", + } + + mocker.patch( + "tasks.test_analytics_notifier.get_redis_connection", + return_value=mock_redis, + ) + + # Mock Redis to return non-stale token + mock_redis.get.return_value = "1" + + task = TestAnalyticsNotifierTask() + task.request.retries = 0 + task.max_retries = 5 + + # Mock debouncer's notification_lock to succeed + @contextmanager + def mock_lock_succeeds(*args, **kwargs): + yield + + mocker.patch.object( + task.debouncer, "notification_lock", side_effect=mock_lock_succeeds + ) + mocker.patch.object( + task.debouncer, "check_fencing_token_stale", return_value=False + ) + + # Mock notification_preparation to return None + mocker.patch.object(task, "notification_preparation", return_value=None) + + result = task.run_impl( + dbsession, + repoid=commit.repoid, + upload_context=upload_context, + fencing_token=1, + ) + + assert result == NotifierTaskResult( + attempted=False, + succeeded=False, + ) + + def test_lock_retry_limit_exceeded_during_token_acquisition( + self, mocker, dbsession, mock_redis + ): + """Test that LockAcquisitionLimitError during token acquisition returns graceful failure.""" + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + upload_context = { + "commit_sha": commit.commitid, + "branch": "test-branch", + "pull_id": 123, + "pipeline": "codecov", + 
} + + mocker.patch( + "tasks.test_analytics_notifier.get_redis_connection", + return_value=mock_redis, + ) + + # Mock Redis to allow token acquisition attempt + mock_redis.pipeline.return_value.__enter__.return_value.execute.return_value = [ + "1" + ] + + task = TestAnalyticsNotifierTask() + task.request.retries = 0 + task.max_retries = 5 + + # Mock debouncer's notification_lock to raise LockAcquisitionLimitError + mocker.patch.object( + task.debouncer, + "notification_lock", + side_effect=LockAcquisitionLimitError("Max retries exceeded"), + ) + + result = task.run_impl( + dbsession, + repoid=commit.repoid, + upload_context=upload_context, + # Don't pass fencing_token - let it go through token acquisition path + ) + + assert result == NotifierTaskResult( + attempted=False, + succeeded=False, + ) + + def test_lock_retry_limit_exceeded_during_notification( + self, mocker, dbsession, mock_redis + ): + """Test that LockAcquisitionLimitError during notification lock returns graceful failure.""" + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + upload_context = { + "commit_sha": commit.commitid, + "branch": "test-branch", + "pull_id": 123, + "pipeline": "codecov", + } + + mocker.patch( + "tasks.test_analytics_notifier.get_redis_connection", + return_value=mock_redis, + ) + + # Mock Redis to return non-stale token + mock_redis.get.return_value = "1" + + task = TestAnalyticsNotifierTask() + task.request.retries = 0 + task.max_retries = 5 + + # Mock debouncer's notification_lock to raise LockAcquisitionLimitError during notification + mocker.patch.object( + task.debouncer, + "notification_lock", + side_effect=LockAcquisitionLimitError("Max retries exceeded"), + ) + + # Mock notification_preparation to return a mock notifier + mock_notifier = mocker.MagicMock() + mock_notifier.notify.return_value = NotifierResult.COMMENT_POSTED + mocker.patch.object( + task, "notification_preparation", return_value=mock_notifier + ) + + result = task.run_impl( + dbsession, + repoid=commit.repoid, + upload_context=upload_context, + fencing_token=1, + ) + + assert result == NotifierTaskResult( + attempted=False, + succeeded=False, + ) + + def test_task_initialization(self, mocker): + """Test that task initialization sets up debouncer correctly.""" + mocker.patch("tasks.test_analytics_notifier.BaseCodecovTask.__init__") + + task = TestAnalyticsNotifierTask() + + assert task.debouncer is not None + assert task.debouncer.redis_key_template == "ta_notifier_fence:{}_{}" + assert task.debouncer.debounce_period_seconds == 30 + assert task.debouncer.lock_type.value == "notify" + + +class TestTransformFailures: + """Test the transform_failures helper function.""" + + def test_transform_failures_empty_list(self): + """Test that empty list returns empty list.""" + result = transform_failures([]) + assert result == [] + + def test_transform_failures_with_failure_message(self): + """Test that failure messages are transformed correctly.""" + failures: list[FailedTestInstance] = [ + { + "computed_name": "test_example.TestClass.test_method", + "failure_message": "AssertionError: Expected 5, got 3\n File /path/to/test.py:42", + "test_id": "test123", + "flags": ["unit"], + "duration_seconds": 1.5, + } + ] + + result = transform_failures(failures) + + assert len(result) == 1 + assert result[0].display_name == "test_example.TestClass.test_method" + assert "\r" not in result[0].failure_message + assert result[0].test_id == "test123" + assert result[0].envs == ["unit"] + assert result[0].duration_seconds == 
1.5 + assert result[0].build_url is None + + def test_transform_failures_with_none_failure_message(self): + """Test that None failure_message is handled correctly.""" + failures: list[FailedTestInstance] = [ + { + "computed_name": "test_example.TestClass.test_method", + "failure_message": None, + "test_id": "test123", + "flags": ["unit"], + "duration_seconds": None, + } + ] + + result = transform_failures(failures) + + assert len(result) == 1 + assert result[0].failure_message is None + assert result[0].duration_seconds == 0 # None becomes 0 + + def test_transform_failures_with_carriage_return(self): + """Test that carriage returns are removed from failure messages.""" + failures: list[FailedTestInstance] = [ + { + "computed_name": "test_example.TestClass.test_method", + "failure_message": "Line 1\rLine 2\r\nLine 3", + "test_id": "test123", + "flags": [], + "duration_seconds": 0.5, + } + ] + + result = transform_failures(failures) + + assert "\r" not in result[0].failure_message + assert "\n" in result[0].failure_message # \n should remain + + def test_transform_failures_multiple_failures(self): + """Test that multiple failures are transformed correctly.""" + failures: list[FailedTestInstance] = [ + { + "computed_name": "test1", + "failure_message": "Error 1", + "test_id": "id1", + "flags": ["flag1"], + "duration_seconds": 1.0, + }, + { + "computed_name": "test2", + "failure_message": "Error 2", + "test_id": "id2", + "flags": ["flag2"], + "duration_seconds": 2.0, + }, + ] + + result = transform_failures(failures) + + assert len(result) == 2 + assert result[0].display_name == "test1" + assert result[1].display_name == "test2" diff --git a/libs/shared/shared/bots/exceptions.py b/libs/shared/shared/bots/exceptions.py index 1b2f8fea39..cd54e64ff7 100644 --- a/libs/shared/shared/bots/exceptions.py +++ b/libs/shared/shared/bots/exceptions.py @@ -8,11 +8,16 @@ class OwnerWithoutValidBotError(Exception): class NoConfiguredAppsAvailable(Exception): def __init__( - self, apps_count: int, rate_limited_count: int, suspended_count: int + self, + apps_count: int, + rate_limited_count: int, + suspended_count: int, + earliest_retry_after_seconds: int | None = None, ) -> None: self.apps_count = apps_count self.rate_limited_count = rate_limited_count self.suspended_count = suspended_count + self.earliest_retry_after_seconds = earliest_retry_after_seconds class RepositoryWithoutValidBotError(Exception): diff --git a/libs/shared/shared/bots/github_apps.py b/libs/shared/shared/bots/github_apps.py index 6965872c8c..e9be284d4a 100644 --- a/libs/shared/shared/bots/github_apps.py +++ b/libs/shared/shared/bots/github_apps.py @@ -17,7 +17,11 @@ from shared.helpers.redis import get_redis_connection from shared.helpers.sentry import owner_uses_sentry from shared.orms.owner_helper import DjangoSQLAlchemyOwnerWrapper -from shared.rate_limits import determine_if_entity_is_rate_limited, gh_app_key_name +from shared.rate_limits import ( + RATE_LIMIT_REDIS_KEY_PREFIX, + determine_if_entity_is_rate_limited, + gh_app_key_name, +) from shared.typings.oauth_token_types import Token from shared.typings.torngit import GithubInstallationInfo @@ -242,6 +246,95 @@ def _filter_suspended_apps( return list(filter(lambda obj: not obj.is_suspended, apps_to_consider)) +def _partition_apps_by_rate_limit_status( + apps: list[GithubAppInstallation], +) -> tuple[list[GithubAppInstallation], list[GithubAppInstallation]]: + """ + Partition apps into rate-limited and non-rate-limited lists in a single Redis pass. 
+ + This function checks rate limit status once for all apps and returns both lists, + preventing race conditions that could occur if rate limit status changes between + separate calls to check and filter. + + Returns: + tuple: (rate_limited_apps, non_rate_limited_apps) + """ + redis_connection = get_redis_connection() + rate_limited_apps = [] + non_rate_limited_apps = [] + + for app in apps: + is_rate_limited = determine_if_entity_is_rate_limited( + redis_connection, + gh_app_key_name(app_id=app.app_id, installation_id=app.installation_id), + ) + if is_rate_limited: + rate_limited_apps.append(app) + else: + non_rate_limited_apps.append(app) + + return rate_limited_apps, non_rate_limited_apps + + +def _get_rate_limited_apps( + apps: list[GithubAppInstallation], +) -> list[GithubAppInstallation]: + """ + Filter and return only the apps that are currently rate-limited. + + Note: This function is kept for backward compatibility but should be avoided + when you also need to filter apps, as it causes a race condition. Use + _partition_apps_by_rate_limit_status instead. + """ + redis_connection = get_redis_connection() + return [ + app + for app in apps + if determine_if_entity_is_rate_limited( + redis_connection, + gh_app_key_name(app_id=app.app_id, installation_id=app.installation_id), + ) + ] + + +def _get_earliest_rate_limit_ttl( + apps: list[GithubAppInstallation], +) -> int | None: + """ + Get the earliest TTL (time-to-live in seconds) from Redis for rate-limited apps. + + Returns the minimum TTL among all rate-limited apps, or None if no apps are rate-limited + or if Redis operations fail. + """ + if not apps: + return None + + redis_connection = get_redis_connection() + ttl_values = [] + + for app in apps: + key_name = gh_app_key_name( + app_id=app.app_id, installation_id=app.installation_id + ) + redis_key = f"{RATE_LIMIT_REDIS_KEY_PREFIX}{key_name}" + try: + ttl = redis_connection.ttl(redis_key) + if ttl > 0: # TTL > 0 means the key exists and has time remaining + ttl_values.append(ttl) + except Exception: + # If Redis operation fails, log and continue + log.warning( + "Failed to get TTL for rate-limited app", + extra={ + "app_id": app.app_id, + "installation_id": app.installation_id, + }, + exc_info=True, + ) + + return min(ttl_values) if ttl_values else None + + def get_github_app_info_for_owner( owner: Owner, *, @@ -304,9 +397,12 @@ def get_github_app_info_for_owner( owner, installation_name, repository ) apps_matching_criteria_count = len(apps_to_consider) - # We can't use apps that are rate limited - apps_to_consider = _filter_rate_limited_apps(apps_to_consider) - rate_limited_apps_count = apps_matching_criteria_count - len(apps_to_consider) + # Partition apps by rate limit status in a single Redis pass to avoid race conditions + # where rate limit status changes between checking and filtering + rate_limited_apps, apps_to_consider = _partition_apps_by_rate_limit_status( + apps_to_consider + ) + rate_limited_apps_count = len(rate_limited_apps) # We can't use apps that are suspended (by the user) apps_to_consider = _filter_suspended_apps(apps_to_consider) suspended_apps_count = ( @@ -339,10 +435,17 @@ def get_github_app_info_for_owner( elif apps_matching_criteria_count > 0: # There are apps that match the criteria, but we can't use them. # Either they are currently rate limited or they have been suspended. 
+ # Get the earliest retry time from rate-limited apps if any exist + earliest_retry_after_seconds = None + if rate_limited_apps_count > 0: + earliest_retry_after_seconds = _get_earliest_rate_limit_ttl( + rate_limited_apps + ) raise NoConfiguredAppsAvailable( apps_count=apps_matching_criteria_count, rate_limited_count=rate_limited_apps_count, suspended_count=suspended_apps_count, + earliest_retry_after_seconds=earliest_retry_after_seconds, ) # DEPRECATED FLOW - begin if owner.integration_id and ( diff --git a/libs/shared/tests/unit/bots/test_github_apps.py b/libs/shared/tests/unit/bots/test_github_apps.py index 5892aca161..e4827ba798 100644 --- a/libs/shared/tests/unit/bots/test_github_apps.py +++ b/libs/shared/tests/unit/bots/test_github_apps.py @@ -1,10 +1,15 @@ import datetime +from unittest.mock import MagicMock import pytest from django.test import override_settings +from redis import RedisError from shared.bots.exceptions import NoConfiguredAppsAvailable, RequestedGithubAppNotFound from shared.bots.github_apps import ( + _get_earliest_rate_limit_ttl, + _get_rate_limited_apps, + _partition_apps_by_rate_limit_status, get_github_app_info_for_owner, get_github_app_token, get_specific_github_app_details, @@ -18,6 +23,7 @@ ) from shared.django_apps.codecov_auth.tests.factories import OwnerFactory from shared.github import InvalidInstallationError +from shared.rate_limits import RATE_LIMIT_REDIS_KEY_PREFIX, gh_app_key_name from shared.typings.torngit import GithubInstallationInfo @@ -251,3 +257,494 @@ def test_mark_installation_suspended_side_effect_not_called(self, mocker): installations[0].refresh_from_db() installations[1].refresh_from_db() assert all(installation.is_suspended == False for installation in installations) + + +class TestGetRateLimitedApps: + @pytest.mark.django_db + def test_get_rate_limited_apps_empty_list(self): + """Test that empty list returns empty list.""" + assert _get_rate_limited_apps([]) == [] + + @pytest.mark.django_db + def test_get_rate_limited_apps_all_rate_limited(self, mocker): + """Test that all rate-limited apps are returned.""" + owner = OwnerFactory(service="github") + app1 = GithubAppInstallation( + owner=owner, + installation_id=1001, + app_id=10, + pem_path="pem1", + ) + app2 = GithubAppInstallation( + owner=owner, + installation_id=1002, + app_id=11, + pem_path="pem2", + ) + GithubAppInstallation.objects.bulk_create([app1, app2]) + + mocker.patch( + "shared.bots.github_apps.determine_if_entity_is_rate_limited", + return_value=True, + ) + + result = _get_rate_limited_apps([app1, app2]) + assert len(result) == 2 + assert app1 in result + assert app2 in result + + @pytest.mark.django_db + def test_get_rate_limited_apps_none_rate_limited(self, mocker): + """Test that no rate-limited apps returns empty list.""" + owner = OwnerFactory(service="github") + app1 = GithubAppInstallation( + owner=owner, + installation_id=1001, + app_id=10, + pem_path="pem1", + ) + app2 = GithubAppInstallation( + owner=owner, + installation_id=1002, + app_id=11, + pem_path="pem2", + ) + GithubAppInstallation.objects.bulk_create([app1, app2]) + + mocker.patch( + "shared.bots.github_apps.determine_if_entity_is_rate_limited", + return_value=False, + ) + + result = _get_rate_limited_apps([app1, app2]) + assert result == [] + + @pytest.mark.django_db + def test_get_rate_limited_apps_mixed(self, mocker): + """Test that only rate-limited apps are returned when mixed.""" + owner = OwnerFactory(service="github") + app1 = GithubAppInstallation( + owner=owner, + installation_id=1001, + 
app_id=10, + pem_path="pem1", + ) + app2 = GithubAppInstallation( + owner=owner, + installation_id=1002, + app_id=11, + pem_path="pem2", + ) + GithubAppInstallation.objects.bulk_create([app1, app2]) + + def is_rate_limited_side_effect(redis_connection, key_name): + # app1 is rate-limited, app2 is not + if "10_1001" in key_name: + return True + return False + + mocker.patch( + "shared.bots.github_apps.determine_if_entity_is_rate_limited", + side_effect=is_rate_limited_side_effect, + ) + + result = _get_rate_limited_apps([app1, app2]) + assert len(result) == 1 + assert app1 in result + assert app2 not in result + + +class TestPartitionAppsByRateLimitStatus: + @pytest.mark.django_db + def test_partition_apps_by_rate_limit_status_empty_list(self): + """Test that empty list returns two empty lists.""" + rate_limited, non_rate_limited = _partition_apps_by_rate_limit_status([]) + assert rate_limited == [] + assert non_rate_limited == [] + + @pytest.mark.django_db + def test_partition_apps_by_rate_limit_status_all_rate_limited(self, mocker): + """Test that all rate-limited apps are partitioned correctly.""" + owner = OwnerFactory(service="github") + app1 = GithubAppInstallation( + owner=owner, + installation_id=1001, + app_id=10, + pem_path="pem1", + ) + app2 = GithubAppInstallation( + owner=owner, + installation_id=1002, + app_id=11, + pem_path="pem2", + ) + GithubAppInstallation.objects.bulk_create([app1, app2]) + + mocker.patch( + "shared.bots.github_apps.determine_if_entity_is_rate_limited", + return_value=True, + ) + + rate_limited, non_rate_limited = _partition_apps_by_rate_limit_status( + [app1, app2] + ) + assert len(rate_limited) == 2 + assert len(non_rate_limited) == 0 + assert app1 in rate_limited + assert app2 in rate_limited + + @pytest.mark.django_db + def test_partition_apps_by_rate_limit_status_none_rate_limited(self, mocker): + """Test that no rate-limited apps returns empty rate-limited list.""" + owner = OwnerFactory(service="github") + app1 = GithubAppInstallation( + owner=owner, + installation_id=1001, + app_id=10, + pem_path="pem1", + ) + app2 = GithubAppInstallation( + owner=owner, + installation_id=1002, + app_id=11, + pem_path="pem2", + ) + GithubAppInstallation.objects.bulk_create([app1, app2]) + + mocker.patch( + "shared.bots.github_apps.determine_if_entity_is_rate_limited", + return_value=False, + ) + + rate_limited, non_rate_limited = _partition_apps_by_rate_limit_status( + [app1, app2] + ) + assert len(rate_limited) == 0 + assert len(non_rate_limited) == 2 + assert app1 in non_rate_limited + assert app2 in non_rate_limited + + @pytest.mark.django_db + def test_partition_apps_by_rate_limit_status_mixed(self, mocker): + """Test that apps are correctly partitioned when mixed.""" + owner = OwnerFactory(service="github") + app1 = GithubAppInstallation( + owner=owner, + installation_id=1001, + app_id=10, + pem_path="pem1", + ) + app2 = GithubAppInstallation( + owner=owner, + installation_id=1002, + app_id=11, + pem_path="pem2", + ) + GithubAppInstallation.objects.bulk_create([app1, app2]) + + def is_rate_limited_side_effect(redis_connection, key_name): + # app1 is rate-limited, app2 is not + if "10_1001" in key_name: + return True + return False + + mocker.patch( + "shared.bots.github_apps.determine_if_entity_is_rate_limited", + side_effect=is_rate_limited_side_effect, + ) + + rate_limited, non_rate_limited = _partition_apps_by_rate_limit_status( + [app1, app2] + ) + assert len(rate_limited) == 1 + assert len(non_rate_limited) == 1 + assert app1 in rate_limited + assert 
app2 in non_rate_limited + + +class TestGetEarliestRateLimitTtl: + @pytest.mark.django_db + def test_get_earliest_rate_limit_ttl_empty_list(self): + """Test that empty list returns None.""" + assert _get_earliest_rate_limit_ttl([]) is None + + @pytest.mark.django_db + def test_get_earliest_rate_limit_ttl_single_app(self, mocker): + """Test that single app TTL is returned.""" + owner = OwnerFactory(service="github") + app = GithubAppInstallation( + owner=owner, + installation_id=1001, + app_id=10, + pem_path="pem1", + ) + app.save() + + mock_redis = MagicMock() + mock_redis.ttl.return_value = 300 # 5 minutes + mocker.patch( + "shared.bots.github_apps.get_redis_connection", return_value=mock_redis + ) + + result = _get_earliest_rate_limit_ttl([app]) + assert result == 300 + key_name = gh_app_key_name( + app_id=app.app_id, installation_id=app.installation_id + ) + expected_key = f"{RATE_LIMIT_REDIS_KEY_PREFIX}{key_name}" + mock_redis.ttl.assert_called_once_with(expected_key) + + @pytest.mark.django_db + def test_get_earliest_rate_limit_ttl_multiple_apps_returns_minimum(self, mocker): + """Test that minimum TTL is returned when multiple apps have different TTLs.""" + owner = OwnerFactory(service="github") + app1 = GithubAppInstallation( + owner=owner, + installation_id=1001, + app_id=10, + pem_path="pem1", + ) + app2 = GithubAppInstallation( + owner=owner, + installation_id=1002, + app_id=11, + pem_path="pem2", + ) + GithubAppInstallation.objects.bulk_create([app1, app2]) + + mock_redis = MagicMock() + key_name1 = gh_app_key_name( + app_id=app1.app_id, installation_id=app1.installation_id + ) + key_name2 = gh_app_key_name( + app_id=app2.app_id, installation_id=app2.installation_id + ) + expected_key1 = f"{RATE_LIMIT_REDIS_KEY_PREFIX}{key_name1}" + expected_key2 = f"{RATE_LIMIT_REDIS_KEY_PREFIX}{key_name2}" + + def ttl_side_effect(key): + if key == expected_key1: + return 600 # 10 minutes + elif key == expected_key2: + return 180 # 3 minutes (minimum) + return -1 + + mock_redis.ttl.side_effect = ttl_side_effect + mocker.patch( + "shared.bots.github_apps.get_redis_connection", return_value=mock_redis + ) + + result = _get_earliest_rate_limit_ttl([app1, app2]) + assert result == 180 # Should return minimum + + @pytest.mark.django_db + def test_get_earliest_rate_limit_ttl_ttl_zero_or_negative_ignored(self, mocker): + """Test that TTL <= 0 is ignored.""" + owner = OwnerFactory(service="github") + app1 = GithubAppInstallation( + owner=owner, + installation_id=1001, + app_id=10, + pem_path="pem1", + ) + app2 = GithubAppInstallation( + owner=owner, + installation_id=1002, + app_id=11, + pem_path="pem2", + ) + GithubAppInstallation.objects.bulk_create([app1, app2]) + + mock_redis = MagicMock() + key_name1 = gh_app_key_name( + app_id=app1.app_id, installation_id=app1.installation_id + ) + key_name2 = gh_app_key_name( + app_id=app2.app_id, installation_id=app2.installation_id + ) + expected_key1 = f"{RATE_LIMIT_REDIS_KEY_PREFIX}{key_name1}" + expected_key2 = f"{RATE_LIMIT_REDIS_KEY_PREFIX}{key_name2}" + + def ttl_side_effect(key): + if key == expected_key1: + return 0 # Key doesn't exist or expired + elif key == expected_key2: + return -1 # Key doesn't exist + return -1 + + mock_redis.ttl.side_effect = ttl_side_effect + mocker.patch( + "shared.bots.github_apps.get_redis_connection", return_value=mock_redis + ) + + result = _get_earliest_rate_limit_ttl([app1, app2]) + assert result is None # No valid TTLs + + @pytest.mark.django_db + def test_get_earliest_rate_limit_ttl_redis_exception_continues(self, 
mocker): + """Test that Redis exceptions are caught and processing continues.""" + owner = OwnerFactory(service="github") + app1 = GithubAppInstallation( + owner=owner, + installation_id=1001, + app_id=10, + pem_path="pem1", + ) + app2 = GithubAppInstallation( + owner=owner, + installation_id=1002, + app_id=11, + pem_path="pem2", + ) + GithubAppInstallation.objects.bulk_create([app1, app2]) + + mock_redis = MagicMock() + key_name1 = gh_app_key_name( + app_id=app1.app_id, installation_id=app1.installation_id + ) + key_name2 = gh_app_key_name( + app_id=app2.app_id, installation_id=app2.installation_id + ) + expected_key1 = f"{RATE_LIMIT_REDIS_KEY_PREFIX}{key_name1}" + expected_key2 = f"{RATE_LIMIT_REDIS_KEY_PREFIX}{key_name2}" + + def ttl_side_effect(key): + if key == expected_key1: + raise RedisError("Redis connection failed") + elif key == expected_key2: + return 300 # Valid TTL + return -1 + + mock_redis.ttl.side_effect = ttl_side_effect + mocker.patch( + "shared.bots.github_apps.get_redis_connection", return_value=mock_redis + ) + + result = _get_earliest_rate_limit_ttl([app1, app2]) + assert result == 300 # Should return valid TTL from app2 despite app1 failing + + @pytest.mark.django_db + def test_get_earliest_rate_limit_ttl_all_redis_exceptions_returns_none( + self, mocker + ): + """Test that if all Redis operations fail, None is returned.""" + owner = OwnerFactory(service="github") + app1 = GithubAppInstallation( + owner=owner, + installation_id=1001, + app_id=10, + pem_path="pem1", + ) + app1.save() + + mock_redis = MagicMock() + mock_redis.ttl.side_effect = RedisError("Redis connection failed") + mocker.patch( + "shared.bots.github_apps.get_redis_connection", return_value=mock_redis + ) + + result = _get_earliest_rate_limit_ttl([app1]) + assert result is None # All operations failed + + +class TestGetGithubAppInfoForOwnerWithEarliestRetryTime: + @pytest.mark.django_db + def test_get_github_app_info_for_owner_sets_earliest_retry_when_rate_limited( + self, mocker + ): + """Test that earliest_retry_after_seconds is set when rate-limited apps exist.""" + owner = OwnerFactory(service="github") + app = GithubAppInstallation( + owner=owner, + installation_id=1001, + app_id=10, + pem_path="pem1", + is_suspended=False, + ) + app.save() + + # Mock rate-limited check to return True + mocker.patch( + "shared.bots.github_apps.determine_if_entity_is_rate_limited", + return_value=True, + ) + + # Mock Redis TTL to return a value + mock_redis = MagicMock() + mock_redis.ttl.return_value = 240 # 4 minutes + mocker.patch( + "shared.bots.github_apps.get_redis_connection", return_value=mock_redis + ) + + with pytest.raises(NoConfiguredAppsAvailable) as exp: + get_github_app_info_for_owner(owner) + + assert exp.value.apps_count == 1 + assert exp.value.rate_limited_count == 1 + assert exp.value.suspended_count == 0 + assert exp.value.earliest_retry_after_seconds == 240 + + @pytest.mark.django_db + def test_get_github_app_info_for_owner_no_earliest_retry_when_not_rate_limited( + self, mocker + ): + """Test that earliest_retry_after_seconds is None when no rate-limited apps.""" + owner = OwnerFactory(service="github") + app = GithubAppInstallation( + owner=owner, + installation_id=1001, + app_id=10, + pem_path="pem1", + is_suspended=True, # Suspended but not rate-limited + ) + app.save() + + # Mock rate-limited check to return False + mocker.patch( + "shared.bots.github_apps.determine_if_entity_is_rate_limited", + return_value=False, + ) + + with pytest.raises(NoConfiguredAppsAvailable) as exp: + 
get_github_app_info_for_owner(owner) + + assert exp.value.apps_count == 1 + assert exp.value.rate_limited_count == 0 + assert exp.value.suspended_count == 1 + assert exp.value.earliest_retry_after_seconds is None + + @pytest.mark.django_db + def test_get_github_app_info_for_owner_earliest_retry_none_when_ttl_fails( + self, mocker + ): + """Test that earliest_retry_after_seconds is None when TTL lookup fails.""" + owner = OwnerFactory(service="github") + app = GithubAppInstallation( + owner=owner, + installation_id=1001, + app_id=10, + pem_path="pem1", + is_suspended=False, + ) + app.save() + + # Mock rate-limited check to return True + mocker.patch( + "shared.bots.github_apps.determine_if_entity_is_rate_limited", + return_value=True, + ) + + # Mock Redis TTL to raise exception + mock_redis = MagicMock() + mock_redis.ttl.side_effect = RedisError("Redis connection failed") + mocker.patch( + "shared.bots.github_apps.get_redis_connection", return_value=mock_redis + ) + + with pytest.raises(NoConfiguredAppsAvailable) as exp: + get_github_app_info_for_owner(owner) + + assert exp.value.apps_count == 1 + assert exp.value.rate_limited_count == 1 + assert exp.value.suspended_count == 0 + assert exp.value.earliest_retry_after_seconds is None # TTL lookup failed
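
Note on the retry-delay behavior the rate-limit tests above assert: when NoConfiguredAppsAvailable carries earliest_retry_after_seconds, the task retries with that value; when it is None, the task falls back to get_seconds_to_next_hour(); and the countdown is never allowed below 60 seconds. The following is a minimal sketch of that selection logic, not the actual task code; pick_retry_countdown and MIN_RETRY_DELAY_SECONDS are illustrative names introduced here.

    # Sketch only: mirrors the countdowns asserted by the tests
    # (240 -> 240, None -> get_seconds_to_next_hour(), 30 -> clamped to 60).
    MIN_RETRY_DELAY_SECONDS = 60  # assumed lower bound on the retry countdown

    def pick_retry_countdown(exc, get_seconds_to_next_hour) -> int:
        """Choose a retry delay when every usable GitHub app is rate limited."""
        if exc.earliest_retry_after_seconds is not None:
            # TTL of the rate-limit key reported by the shared bots helper
            countdown = exc.earliest_retry_after_seconds
        else:
            # No TTL known; wait until the next hourly rate-limit window
            countdown = get_seconds_to_next_hour()
        return max(countdown, MIN_RETRY_DELAY_SECONDS)

Under these assumptions a 240-second TTL retries after 240 seconds, a missing TTL defers to the next hour, and a 30-second TTL is clamped to 60 seconds, matching the processor and notify task tests.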
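
The debounce tests encode a second contract: on first execution a task acquires a fencing token, retries itself once with countdown=30 (the debounce period) while passing the token back through the retry kwargs, and any later run whose token is lower than the value currently stored in Redis exits without notifying. A small sketch of that staleness rule follows; is_token_stale and its key argument are illustrative and do not reproduce the NotificationDebouncer API.

    # Sketch only: a run is stale when Redis already holds a higher fencing
    # token for the same repo/commit, e.g. under a key shaped like
    # "bundle_analysis_notifier_fence:{repoid}_{commitid}".
    from redis import Redis

    def is_token_stale(redis: Redis, key: str, fencing_token: int) -> bool:
        """Return True when a newer notification run has superseded this one."""
        current = redis.get(key)
        if current is None:
            # No token recorded yet; treat this run as current
            return False
        return int(current) > fencing_token

This matches the expectations above: a stored token of "2" against a task holding token 1 leads to an early exit with attempted=False, while a matching token lets the notification proceed.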