# ----------------------------------------------------------------------------
# packages/smithy-core/src/smithy_core/retries.py — additions
# (this change also adds `import math` to the module's existing stdlib
# imports: asyncio, random, threading, time)
# ----------------------------------------------------------------------------


class CubicCalculator:
    """Calculates a new request rate using a modified CUBIC algorithm.

    Implements the CUBIC congestion-control algorithm for adaptive rate
    limiting. It dynamically adjusts request rates based on throttling
    responses: the rate is reduced by 30% (multiplying by ``_BETA``) when a
    throttled response is seen, and grown again along a cubic curve while
    requests succeed.
    """

    # Multiplier applied to the cubic growth term when scaling the rate up.
    _SCALE_CONSTANT = 0.4
    # Multiplicative-decrease factor applied when a response is throttled.
    _BETA = 0.7

    def __init__(
        self,
        starting_max_rate: float = 0.5,
        start_time: float | None = None,
    ):
        """Initialize a new CubicCalculator.

        :param starting_max_rate: Initial maximum requests per second.
        :param start_time: Initial time of the CubicCalculator. Defaults to
            ``time.monotonic()`` when not supplied.
        :raises ValueError: If ``starting_max_rate`` is not positive.
        """
        if starting_max_rate <= 0:
            raise ValueError(
                f"starting_max_rate must be positive, got {starting_max_rate}"
            )
        if start_time is None:
            start_time = time.monotonic()

        self._last_max_rate = starting_max_rate
        self._last_throttle_time = start_time
        self._inflection_point_time = self.calculate_and_update_inflection_point()

    def calculate_and_update_inflection_point(self) -> float:
        """Calculate and update the CUBIC inflection point for rate recovery.

        After throttling, the CUBIC curve returns to the previous maximum rate
        after exactly ``_inflection_point_time`` seconds. Before this point,
        the rate grows slowly; after this point, it grows rapidly.

        :return: Inflection point time in seconds for the CUBIC algorithm.
        """
        self._inflection_point_time = (
            (self._last_max_rate * (1 - self._BETA)) / self._SCALE_CONSTANT
        ) ** (1 / 3.0)
        return self._inflection_point_time

    def calculate_scaled_request_rate(self, timestamp: float) -> float:
        """Scale up the request rate after a successful response.

        :param timestamp: Timestamp of the response.
        :return: New calculated request rate based on CUBIC scaling.
        """
        # Cubic growth anchored at the last throttle: slow before the
        # inflection point, fast after it.
        dt = timestamp - self._last_throttle_time
        return (
            self._SCALE_CONSTANT * ((dt - self._inflection_point_time) ** 3)
        ) + self._last_max_rate

    def calculate_throttled_request_rate(
        self, rate_to_use: float, timestamp: float
    ) -> float:
        """Throttle the request rate after a throttled response is received.

        :param rate_to_use: Current request rate in use; recorded as the new
            last maximum rate.
        :param timestamp: Timestamp of the response; recorded as the last
            throttle time.
        :return: ``rate_to_use`` reduced by 30% (multiplied by ``_BETA``).
        """
        calculated_rate = rate_to_use * self._BETA
        self._last_max_rate = rate_to_use
        self._last_throttle_time = timestamp
        return calculated_rate

    @property
    def last_max_rate(self) -> float:
        """The maximum rate recorded at the most recent throttle."""
        return self._last_max_rate

    @property
    def last_throttle_time(self) -> float:
        """Timestamp of the most recent throttled response."""
        return self._last_throttle_time


class RequestRateTracker:
    """Tracks the client's request sending rate.

    Measures the actual client request sending rate using time-bucketed
    sampling with exponential smoothing. Requests are counted in half-second
    buckets by default, and a smoothed average rate is calculated to provide
    accurate measurements for adaptive rate limiting algorithms.
    """

    _DEFAULT_SMOOTHING = 0.8
    _TIME_BUCKET_RANGE = 0.5

    def __init__(
        self,
        smoothing: float = _DEFAULT_SMOOTHING,
        time_bucket_range: float = _TIME_BUCKET_RANGE,
    ):
        """Initialize a new RequestRateTracker.

        :param smoothing: Exponential smoothing factor. This constant
            represents how much weight is given to recent measurements;
            higher values emphasize the most recent observations.
        :param time_bucket_range: Time bucket duration in seconds. Rate
            calculations are updated when transitioning between buckets.
        """
        self._smoothing = smoothing
        self._time_bucket_scale = 1 / time_bucket_range
        self._request_count = 0
        # NOTE(review): the initial bucket floors to a whole second, while
        # measure_rate() floors to time_bucket_range-sized buckets. This
        # mirrors botocore's RateClocker — confirm the asymmetry is intended.
        self._last_calculated_time_bucket = math.floor(time.monotonic())
        self._measured_rate = 0.0
        self._lock = asyncio.Lock()

    async def measure_rate(self) -> float:
        """Measure and return the current request rate.

        Increments the request count and calculates a new smoothed rate when
        transitioning to a new time bucket. Returns the current measured rate
        without recalculation if still within the same time bucket.

        :return: Current smoothed request rate in requests per second.
        """
        async with self._lock:
            curr_time = time.monotonic()
            # Quantize the clock to time_bucket_range-sized buckets.
            current_time_bucket = (
                math.floor(curr_time * self._time_bucket_scale)
                / self._time_bucket_scale
            )
            self._request_count += 1
            if current_time_bucket > self._last_calculated_time_bucket:
                current_rate = self._request_count / (
                    current_time_bucket - self._last_calculated_time_bucket
                )
                # Exponentially-weighted moving average of the bucket rates.
                self._measured_rate = (current_rate * self._smoothing) + (
                    self._measured_rate * (1 - self._smoothing)
                )
                self._request_count = 0
                self._last_calculated_time_bucket = current_time_bucket
            return self._measured_rate

    @property
    def request_count(self) -> int:
        """Get current request count. For testing only."""
        return self._request_count

    @property
    def measured_client_rate(self) -> float:
        """Get the client's sending rate. For testing only."""
        return self._measured_rate


class ClientRateLimiter:
    """Limits the rate of requests.

    Implements adaptive rate limiting using a token bucket and the CUBIC
    algorithm. It controls request sending rates by acquiring tokens before
    requests and dynamically adjusting rates based on service responses —
    reducing rates when throttled and increasing rates during successful
    periods to optimize throughput while preventing service overload.
    """

    # Token cost charged for each outgoing request.
    _REQUEST_COST = 1.0

    def __init__(
        self,
        token_bucket: TokenBucket,
        cubic_calculator: CubicCalculator,
        rate_tracker: RequestRateTracker,
        rate_limiter_enabled: bool,
    ):
        """Initialize a new ClientRateLimiter.

        :param token_bucket: Token bucket for controlling request sending rates.
        :param cubic_calculator: CUBIC algorithm calculator for rate adjustments.
        :param rate_tracker: Tracker for measuring actual client request rates.
        :param rate_limiter_enabled: Whether rate limiting is enabled.
        """
        self._rate_tracker = rate_tracker
        self._token_bucket = token_bucket
        self._cubic_calculator = cubic_calculator
        self._rate_limiter_enabled = rate_limiter_enabled

    async def before_sending_request(self) -> None:
        """Acquire a token before making a request.

        No-op while rate limiting is disabled (it becomes enabled on the
        first throttling response).
        """
        if self._rate_limiter_enabled:
            await self._token_bucket.acquire(self._REQUEST_COST)

    async def after_receiving_response(self, throttling_error: bool) -> None:
        """Update the request rate based on the response using CUBIC.

        Reduces the rate by 30% when throttled, or increases the rate using
        CUBIC scaling for successful responses. Updates the token bucket with
        the new calculated rate, capped at 2x the measured client rate.

        :param throttling_error: True if the response was a throttling error.
        """
        measured_rate = await self._rate_tracker.measure_rate()
        timestamp = time.monotonic()
        if throttling_error:
            if not self._rate_limiter_enabled:
                # First throttle: the bucket's fill rate is not meaningful
                # yet, so base the reduction on the measured client rate.
                rate_to_use = measured_rate
            else:
                fill_rate = self._token_bucket.fill_rate
                rate_to_use = min(measured_rate, fill_rate)

            self._cubic_calculator.calculate_and_update_inflection_point()
            cubic_calculated_rate = (
                self._cubic_calculator.calculate_throttled_request_rate(
                    rate_to_use, timestamp
                )
            )
            # Any throttling response turns adaptive rate limiting on.
            self._rate_limiter_enabled = True
        else:
            cubic_calculated_rate = (
                self._cubic_calculator.calculate_scaled_request_rate(timestamp)
            )

        # Cap at twice the measured client rate to prevent unrealistic growth.
        new_rate = min(cubic_calculated_rate, 2 * measured_rate)
        await self._token_bucket.update_rate(new_rate)

    @property
    def rate_limit_enabled(self) -> bool:
        """Whether adaptive rate limiting is currently active."""
        return self._rate_limiter_enabled


# ----------------------------------------------------------------------------
# packages/smithy-core/tests/unit/test_retries.py — additions
# ----------------------------------------------------------------------------
# The test module's import block after this change (``patch`` comes from
# ``unittest.mock``, already imported by the module):
#
#   import pytest
#
#   from smithy_core.exceptions import CallError, RetryError
#   from smithy_core.retries import (
#       ClientRateLimiter,
#       CubicCalculator,
#       ExponentialRetryBackoffStrategy,
#       RequestRateTracker,
#       RetryStrategyOptions,
#       RetryStrategyResolver,
#       SimpleRetryStrategy,
#       StandardRetryStrategy,
#       TokenBucket,
#   )
#   from smithy_core.retries import ExponentialBackoffJitterType as EBJT


class TestRateLimiter:
    @pytest.mark.asyncio
    @pytest.mark.parametrize(
        "rate_limiter_enabled, current_bucket_capacity, expected_bucket_capacity_after_acquire",
        [
            (True, 2.0, 1.0),
            (False, 0, 1.0),
        ],
    )
    async def test_token_consumption_before_request_scenarios(
        self,
        rate_limiter_enabled: bool,
        current_bucket_capacity: float,
        expected_bucket_capacity_after_acquire: float,
    ):
        token_bucket = TokenBucket(curr_capacity=current_bucket_capacity)
        calculator = CubicCalculator(starting_max_rate=1.0, start_time=0.0)
        tracker = RequestRateTracker()

        limiter = ClientRateLimiter(
            token_bucket=token_bucket,
            cubic_calculator=calculator,
            rate_tracker=tracker,
            rate_limiter_enabled=rate_limiter_enabled,
        )

        await limiter.before_sending_request()
        assert token_bucket.current_capacity == expected_bucket_capacity_after_acquire

    @pytest.mark.asyncio
    @pytest.mark.parametrize(
        "throttling_error, rate_limiter_enabled, measured_rate, expected_bucket_fill_rate",
        [
            (True, False, 5.0, 3.5),
            (True, True, 5.0, 0.5),
            (False, True, 5.0, 7.83),
            (False, False, 5.0, 7.83),
        ],
    )
    async def test_token_consumption_after_response_scenarios(
        self,
        throttling_error: bool,
        rate_limiter_enabled: bool,
        measured_rate: float,
        expected_bucket_fill_rate: float,
    ):
        with patch("time.monotonic") as mock_time:
            mock_time.side_effect = [0.0, 0.1, 0.2, 0.3]
            token_bucket = TokenBucket()
            calculator = CubicCalculator(starting_max_rate=10.0, start_time=0.0)
            tracker = RequestRateTracker()

            limiter = ClientRateLimiter(
                token_bucket=token_bucket,
                cubic_calculator=calculator,
                rate_tracker=tracker,
                rate_limiter_enabled=rate_limiter_enabled,
            )

            with patch.object(tracker, "measure_rate", return_value=measured_rate):
                await limiter.after_receiving_response(
                    throttling_error=throttling_error
                )

            assert round(token_bucket.fill_rate, 2) == expected_bucket_fill_rate

    async def test_throttling_response_enables_rate_limiter(self):
        with patch("time.monotonic") as mock_time:
            mock_time.side_effect = [0.0, 0.1, 0.2, 0.3, 0.4]
            token_bucket = TokenBucket()
            calculator = CubicCalculator(starting_max_rate=10.0, start_time=0.0)
            tracker = RequestRateTracker()

            limiter = ClientRateLimiter(
                token_bucket=token_bucket,
                cubic_calculator=calculator,
                rate_tracker=tracker,
                rate_limiter_enabled=False,
            )

            await limiter.after_receiving_response(throttling_error=True)
            assert limiter.rate_limit_enabled is True

    async def test_calculated_rate_is_capped_at_2x_measured_rate(self):
        with patch("time.monotonic") as mock_time:
            mock_time.side_effect = [0.0, 0.1, 0.2, 0.3, 0.4]
            token_bucket = TokenBucket()
            calculator = CubicCalculator(starting_max_rate=10.0, start_time=0.0)
            tracker = RequestRateTracker()

            limiter = ClientRateLimiter(
                token_bucket=token_bucket,
                cubic_calculator=calculator,
                rate_tracker=tracker,
                rate_limiter_enabled=True,
            )

            with (
                patch.object(tracker, "measure_rate", return_value=5.0),
                patch.object(
                    calculator, "calculate_scaled_request_rate", return_value=20.0
                ),
                patch.object(token_bucket, "update_rate") as mock_update,
            ):
                await limiter.after_receiving_response(throttling_error=False)

            # calculated rate should be capped at 2x measured rate at 2 * 5.0 = 10.0
            mock_update.assert_called_once_with(10.0)

    async def test_rate_in_success_responses_depend_on_client_rate(self):
        with patch("time.monotonic") as mock_time:
            # Timestamp usage breakdown:
            # [0.0, 0.1] - TokenBucket and RequestRateTracker initialization
            # [0.6, 0.7, 0.8] - First after_receiving_response(): measure_rate(), timestamp, update_rate()
            # [0.9, 1.0, 1.1] - Second after_receiving_response(): measure_rate(), timestamp, update_rate()
            # every after_receiving_response() call utilizes 3 timestamps
            mock_time.side_effect = [
                0.0,
                0.1,
                0.6,
                0.7,
                0.8,
                0.9,
                1.0,
                1.1,
                1.2,
                1.3,
                1.4,
                1.5,
                1.6,
                1.7,
                1.8,
                1.9,
                2.0,
            ]
            token_bucket = TokenBucket()
            calculator = CubicCalculator(starting_max_rate=10.0, start_time=0.0)
            tracker = RequestRateTracker()

            limiter = ClientRateLimiter(
                token_bucket=token_bucket,
                cubic_calculator=calculator,
                rate_tracker=tracker,
                rate_limiter_enabled=True,
            )
            # At the starting max rate of 10.0, the CUBIC algorithm calculates
            # the rate to be 8.9, but the request rate is capped at 2x the
            # measured client sending rate to prevent unrealistic rate
            # increases, so it is always the minimum of twice the client's
            # sending rate and the CUBIC-calculated rate. For the scenarios
            # this test is testing, the measured rate acts as the limiting
            # factor, so fill_rate = min(cubic_rate, 2 * measured_rate)
            # = 2 * measured_rate.

            for _ in range(4):
                await limiter.after_receiving_response(throttling_error=False)
                assert token_bucket.fill_rate == tracker.measured_client_rate * 2

    @pytest.mark.asyncio
    @pytest.mark.parametrize(
        "rate_limiter_enabled, expected_request_rate", [(True, 0.5), (False, 1.12)]
    )
    async def test_different_scenarios_when_throttled(
        self,
        rate_limiter_enabled: bool,
        expected_request_rate: float,
    ):
        with patch("time.monotonic") as mock_time:
            # Timestamp usage breakdown:
            # [0.0, 0.1] - TokenBucket and RequestRateTracker initialization
            # [0.6, 0.7, 0.8] - First after_receiving_response(): measure_rate(), timestamp, update_rate()
            mock_time.side_effect = [0.0, 0.1, 0.6, 0.7, 0.8]
            token_bucket = TokenBucket()
            calculator = CubicCalculator(starting_max_rate=10.0, start_time=0.0)
            tracker = RequestRateTracker()

            limiter = ClientRateLimiter(
                token_bucket=token_bucket,
                cubic_calculator=calculator,
                rate_tracker=tracker,
                rate_limiter_enabled=rate_limiter_enabled,
            )
            # This test is testing two different scenarios.
            # Scenario 1:
            # When throttled with the rate limiter enabled, CUBIC uses
            # min(measured_rate, fill_rate) as the base rate. Here:
            # min(1.6, 0.5) = 0.5 req/s. After the 30% reduction
            # (multiplying by BETA=0.7): 0.5 * 0.7 = 0.35, but the token
            # bucket's rate can't drop below 0.5, so it should be 0.5 req/s.

            # Scenario 2:
            # When throttled with the rate limiter disabled, CUBIC uses the
            # measured client rate (1.6 req/s) directly, then reduces it by
            # 30% (BETA=0.7) to get 1.12 req/s. The final rate is capped at
            # min(1.12, 2*1.6) = 1.12 req/s.
            await limiter.after_receiving_response(throttling_error=True)
            assert round(token_bucket.fill_rate, 2) == expected_request_rate


class TestCubicCalculator:
    def test_cubic_calculator_initializes(self):
        calculator = CubicCalculator(starting_max_rate=10.0, start_time=5.0)
        assert calculator.last_max_rate == 10.0
        assert calculator.last_throttle_time == 5.0

    def test_calculate_inflection_point(self):
        calculator = CubicCalculator(starting_max_rate=10.0, start_time=5.0)
        time_window = calculator.calculate_and_update_inflection_point()
        assert round(time_window, 2) == 1.96

    def test_throttle_request(self):
        calculator = CubicCalculator(starting_max_rate=10.0, start_time=5.0)
        result = calculator.calculate_throttled_request_rate(
            rate_to_use=8.0, timestamp=10.0
        )

        assert result == 8.0 * 0.7
        assert calculator.last_max_rate == 8.0
        assert calculator.last_throttle_time == 10.0

    def test_request_rate_scales_over_time(self):
        calculator = CubicCalculator(starting_max_rate=10.0, start_time=5.0)
        calculator.calculate_and_update_inflection_point()

        request_rate_prev = calculator.calculate_scaled_request_rate(timestamp=6.0)
        request_rate_curr = calculator.calculate_scaled_request_rate(timestamp=8.0)

        assert request_rate_prev < request_rate_curr  # Rate should increase over time

    def test_inflection_point_calculation_logic(self):
        calculator = CubicCalculator(starting_max_rate=10.0, start_time=0.0)

        time_window = calculator.calculate_and_update_inflection_point()
        expected = ((10.0 * 0.3) / 0.4) ** (1 / 3.0)
        # verify that the calculated and expected inflection points are same
        assert abs(time_window - expected) < 0.001


class TestRequestRateTracker:
    async def test_measure_rate_same_bucket(self):
        with patch("time.monotonic") as mock_time:
            # Multiple calls in same time bucket should just increment count
            mock_time.side_effect = [0.0, 0.3, 0.4]
            tracker = RequestRateTracker()
            rate1 = await tracker.measure_rate()
            rate2 = await tracker.measure_rate()

            assert rate1 == 0
            assert rate2 == 0
            assert tracker.request_count == 2

    async def test_measure_rate_new_bucket(self):
        with patch("time.monotonic") as mock_time:
            # Multiple calls in different time buckets should increment rate and reset count
            mock_time.side_effect = [0.0, 0.1, 0.7]
            tracker = RequestRateTracker()

            await tracker.measure_rate()
            rate = await tracker.measure_rate()

            assert rate > 0
            assert tracker.request_count == 0