From 7e65944e54e11248c8ed44b9c46c554fc170fe61 Mon Sep 17 00:00:00 2001 From: ubaskota <19787410+ubaskota@users.noreply.github.com> Date: Thu, 8 Jan 2026 15:10:15 -0500 Subject: [PATCH 1/3] Implement client-side rate limiting for adaptive retries. --- .../smithy-core/src/smithy_core/retries.py | 203 ++++++++++++++ .../smithy-core/tests/unit/test_retries.py | 261 +++++++++++++++++- 2 files changed, 463 insertions(+), 1 deletion(-) diff --git a/packages/smithy-core/src/smithy_core/retries.py b/packages/smithy-core/src/smithy_core/retries.py index d2318851..2d93c768 100644 --- a/packages/smithy-core/src/smithy_core/retries.py +++ b/packages/smithy-core/src/smithy_core/retries.py @@ -1,6 +1,7 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import asyncio +import math import random import threading import time @@ -592,3 +593,205 @@ def fill_rate(self) -> float: :return: The rate at which tokens are added to the bucket (tokens/second). """ return self._fill_rate + + +class CubicCalculator: + """CubicCalculator calculates a new rate using CUBIC algorithm. + CubicCalculator implements the CUBIC congestion control algorithm for + adaptive rate limiting. It dynamically adjusts request rates based on + throttling responses, reducing rates by 70% when throttled and + gradually increasing rates using a scale function when the request + is successful. + """ + + _SCALE_CONSTANT = 0.4 + _BETA = 0.7 + + def __init__( + self, + starting_max_rate: float, + start_time: float, + scale_constant: float = _SCALE_CONSTANT, + beta: float = _BETA, + ): + """Initialize a new CubicCalculator. + :param starting_max_rate: Initial maximum rate of the CubicCalculator. + :param start_time: Initial time of the CubicCalculator. + :param scale_constant: Scale constant used to scale up requests. + :param beta: Beta constant used to slow down requests. + """ + self._last_max_rate = starting_max_rate + self._last_throttle_time = start_time + self._scale_constant = scale_constant + self._beta = beta + self._inflection_point_time = self.calculate_and_update_inflection_point() + + def calculate_and_update_inflection_point(self) -> float: + """Calculate and update the CUBIC inflection point for rate recovery. + After throttling, the CUBIC curve returns to the previous maximum rate + after exactly `_inflection_point_time` seconds. Before this point, the + rate grows slowly. After this point, it grows rapidly. + :return: Inflection point time in seconds for the CUBIC algorithm. + """ + self._inflection_point_time = ( + (self._last_max_rate * (1 - self._beta)) / self._scale_constant + ) ** (1 / 3.0) + return self._inflection_point_time + + def scale_request(self, timestamp: float) -> float: + """Scale up the request rate after a successful response. + :param timestamp: Timestamp of the response. + :return: New calculated request rate based on CUBIC scaling. + """ + dt = timestamp - self._last_throttle_time + calculated_rate = ( + self._scale_constant * ((dt - self._inflection_point_time) ** 3) + ) + self._last_max_rate + return calculated_rate + + def throttle_request(self, rate_to_use: float, timestamp: float) -> float: + """Throttle the request rate after a throttled response is received. + :param rate_to_use: Current request rate in use. + :param timestamp: Timestamp of the response. + :return: New calculated request rate based on CUBIC throttling. + """ + calculated_rate = rate_to_use * self._beta + self._last_max_rate = rate_to_use + self._last_throttle_time = timestamp + return calculated_rate + + @property + def last_max_rate(self) -> float: + return self._last_max_rate + + @property + def last_throttle_time(self) -> float: + return self._last_throttle_time + + +class RequestRateTracker: + """RequestRateTracker tracks the client's request sending rate. + RequestRateTracker measures the actual client request sending rate using + time-bucketed sampling with exponential smoothing. It tracks requests in + half-second intervals by default and calculates a smoothed average rate + to provide accurate measurements for adaptive rate limiting algorithms. + """ + + _DEFAULT_SMOOTHING = 0.8 + _TIME_BUCKET_RANGE = 0.5 + + def __init__( + self, + smoothing: float = _DEFAULT_SMOOTHING, + time_bucket_range: float = _TIME_BUCKET_RANGE, + ): + """Initialize a new RequestRateTracker. + :param smoothing: Exponential smoothing factor. This constant + represents how much weight is given to recent measurements. + Higher values place more emphasis on the most recent observations. + :param time_bucket_range: Time bucket duration in seconds. Rate + calculations are updated when transitioning between buckets. + """ + self._smoothing = smoothing + self._time_bucket_scale = 1 / time_bucket_range + self._request_count = 0 + self._last_calculated_time_bucket = math.floor(time.monotonic()) + self._measured_rate = 0 + + def measure_rate(self) -> float: + """Measure and return the current request rate. + Increments the request count and calculates a new smoothed rate when + transitioning to a new time bucket. Returns the current measured rate + without recalculation if still within the same time bucket. + :return: Current smoothed request rate in requests per second. + """ + curr_time = time.monotonic() + current_time_bucket = ( + math.floor(curr_time * self._time_bucket_scale) / self._time_bucket_scale + ) + self._request_count += 1 + if current_time_bucket > self._last_calculated_time_bucket: + current_rate = self._request_count / ( + current_time_bucket - self._last_calculated_time_bucket + ) + self._measured_rate = (current_rate * self._smoothing) + ( + self._measured_rate * (1 - self._smoothing) + ) + self._request_count = 0 + self._last_calculated_time_bucket = current_time_bucket + return self._measured_rate + + @property + def request_count(self) -> int: + return self._request_count + + @property + def measured_client_rate(self) -> float: + return self._measured_rate + + +class ClientRateLimiter: + """ClientRateLimiter limits the rate of requests. + ClientRateLimiter implements adaptive rate limiting using token bucket and CUBIC + algorithm. It controls request sending rates by acquiring tokens before requests + and dynamically adjusting rates based on service responses - reducing rates when + throttled and increasing rates during successful periods to optimize throughput + while preventing service overload. + """ + + _REQUEST_COST = 1.0 + + def __init__( + self, + token_bucket: TokenBucket, + cubic_calculator: CubicCalculator, + rate_tracker: RequestRateTracker, + rate_limiter_enabled: bool, + ): + """Initialize a new ClientRateLimiter. + :param token_bucket: Token bucket for controlling request sending rates. + :param cubic_calculator: CUBIC algorithm calculator for rate adjustments. + :param rate_tracker: Tracker for measuring actual client request rates. + :param rate_limiter_enabled: Whether rate limiting is enabled. + """ + + self._rate_tracker = rate_tracker + self._token_bucket = token_bucket + self._cubic_calculator = cubic_calculator + self._rate_limiter_enabled = rate_limiter_enabled + + async def before_sending_request(self) -> None: + """Acquire a token before making a request.""" + if self._rate_limiter_enabled: + await self._token_bucket.acquire(self._REQUEST_COST) + + async def after_receiving_response(self, throttling_error: bool) -> None: + """Update the request rate based on the response using CUBIC algorithm. + Reduces the rate by 70% when throttled, or increases the rate using + CUBIC scaling for successful responses. Updates the token bucket with + the new calculated rate, capped at 2x the measured client rate. + :param throttling_error: True if the response was a throttling error. + """ + measured_rate = self._rate_tracker.measure_rate() + timestamp = time.monotonic() + if throttling_error: + if not self._rate_limiter_enabled: + rate_to_use = measured_rate + else: + fill_rate = self._token_bucket.fill_rate + rate_to_use = min(measured_rate, fill_rate) + + self._cubic_calculator.calculate_and_update_inflection_point() + cubic_calculated_rate = self._cubic_calculator.throttle_request( + rate_to_use, timestamp + ) + self._rate_limiter_enabled = True + else: + cubic_calculated_rate = self._cubic_calculator.scale_request(timestamp) + + new_rate = min(cubic_calculated_rate, 2 * measured_rate) + await self._token_bucket.update_rate(new_rate) + + @property + def rate_limit_enabled(self) -> bool: + return self._rate_limiter_enabled diff --git a/packages/smithy-core/tests/unit/test_retries.py b/packages/smithy-core/tests/unit/test_retries.py index 6341bd89..45b79e8b 100644 --- a/packages/smithy-core/tests/unit/test_retries.py +++ b/packages/smithy-core/tests/unit/test_retries.py @@ -4,9 +4,11 @@ import pytest from smithy_core.exceptions import CallError, RetryError -from smithy_core.retries import ExponentialBackoffJitterType as EBJT from smithy_core.retries import ( + ClientRateLimiter, + CubicCalculator, ExponentialRetryBackoffStrategy, + RequestRateTracker, RetryStrategyOptions, RetryStrategyResolver, SimpleRetryStrategy, @@ -14,6 +16,7 @@ StandardRetryStrategy, TokenBucket, ) +from smithy_core.retries import ExponentialBackoffJitterType as EBJT @pytest.mark.parametrize( @@ -361,3 +364,259 @@ async def test_multiple_refills_over_time( await token_bucket.update_rate(value) assert token_bucket.current_capacity == expected_capacity + + +class TestRateLimiter: + @pytest.mark.asyncio + @pytest.mark.parametrize( + "rate_limiter_enabled, current_bucket_capacity, expected_bucket_capacity_after_acquire", + [ + (True, 2.0, 1.0), + (False, 0, 1.0), + ], + ) + async def test_token_consumption_before_request_scenarios( + self, + rate_limiter_enabled: bool, + current_bucket_capacity: float, + expected_bucket_capacity_after_acquire: float, + ): + token_bucket = TokenBucket(curr_capacity=current_bucket_capacity) + calculator = CubicCalculator(starting_max_rate=1.0, start_time=0.0) + tracker = RequestRateTracker() + + limiter = ClientRateLimiter( + token_bucket=token_bucket, + cubic_calculator=calculator, + rate_tracker=tracker, + rate_limiter_enabled=rate_limiter_enabled, + ) + + await limiter.before_sending_request() + assert token_bucket.current_capacity == expected_bucket_capacity_after_acquire + + @pytest.mark.asyncio + @pytest.mark.parametrize( + "throttling_error, rate_limiter_enabled, measured_rate, expected_bucket_fill_rate", + [ + (True, False, 5.0, 3.5), + (True, True, 5.0, 0.5), + (False, True, 5.0, 7.83), + (False, False, 5.0, 7.83), + ], + ) + async def test_token_consumption_after_response_scenarios( + self, + throttling_error: bool, + rate_limiter_enabled: bool, + measured_rate: float, + expected_bucket_fill_rate: float, + ): + with patch("time.monotonic") as mock_time: + mock_time.side_effect = [0.0, 0.1, 0.2, 0.3] + token_bucket = TokenBucket() + calculator = CubicCalculator(starting_max_rate=10.0, start_time=0.0) + tracker = RequestRateTracker() + + limiter = ClientRateLimiter( + token_bucket=token_bucket, + cubic_calculator=calculator, + rate_tracker=tracker, + rate_limiter_enabled=rate_limiter_enabled, + ) + + with patch.object(tracker, "measure_rate", return_value=measured_rate): + await limiter.after_receiving_response( + throttling_error=throttling_error + ) + + assert round(token_bucket.fill_rate, 2) == expected_bucket_fill_rate + + async def test_throttling_response_enables_rate_limiter(self): + with patch("time.monotonic") as mock_time: + mock_time.side_effect = [0.0, 0.1, 0.2, 0.3, 0.4] + token_bucket = TokenBucket() + calculator = CubicCalculator(starting_max_rate=10.0, start_time=0.0) + tracker = RequestRateTracker() + + limiter = ClientRateLimiter( + token_bucket=token_bucket, + cubic_calculator=calculator, + rate_tracker=tracker, + rate_limiter_enabled=False, + ) + + await limiter.after_receiving_response(throttling_error=True) + assert limiter.rate_limit_enabled is True + + async def test_calculated_rate_is_capped_at_2x_measured_rate(self): + with patch("time.monotonic") as mock_time: + mock_time.side_effect = [0.0, 0.1, 0.2, 0.3, 0.4] + token_bucket = TokenBucket() + calculator = CubicCalculator(starting_max_rate=10.0, start_time=0.0) + tracker = RequestRateTracker() + + limiter = ClientRateLimiter( + token_bucket=token_bucket, + cubic_calculator=calculator, + rate_tracker=tracker, + rate_limiter_enabled=True, + ) + + with ( + patch.object(tracker, "measure_rate", return_value=5.0), + patch.object(calculator, "scale_request", return_value=20.0), + patch.object(token_bucket, "update_rate") as mock_update, + ): + await limiter.after_receiving_response(throttling_error=False) + + # calculated rate should be capped at 2x measured rate at 2 * 5.0 = 10.0 + mock_update.assert_called_once_with(10.0) + + async def test_rate_in_success_responses_depend_on_client_rate(self): + with patch("time.monotonic") as mock_time: + # Timestamp usage breakdown: + # [0.0, 0.1] - TokenBucket and RequestRateTracker initialization + # [0.6, 0.7, 0.8] - First after_receiving_response(): measure_rate(), timestamp, update_rate() + # [0.9, 1.0, 1.1] - Second after_receiving_response(): measure_rate(), timestamp, update_rate() + # every after_receiving_response() call utilizes 3 timestamps + mock_time.side_effect = [ + 0.0, + 0.1, + 0.6, + 0.7, + 0.8, + 0.9, + 1.0, + 1.1, + 1.2, + 1.3, + 1.4, + 1.5, + 1.6, + 1.7, + 1.8, + 1.9, + 2.0, + ] + token_bucket = TokenBucket() + calculator = CubicCalculator(starting_max_rate=10.0, start_time=0.0) + tracker = RequestRateTracker() + + limiter = ClientRateLimiter( + token_bucket=token_bucket, + cubic_calculator=calculator, + rate_tracker=tracker, + rate_limiter_enabled=True, + ) + # At the starting max rate of 10.0, CUBIC algorithm calculates + # the rate to be 8.9, but the request rate is capped at 2x the measured + # client sending rate to prevent unrealistic rate increases. So it will + # always be minimum of twice client's sending rate and cubic calculated + # rate. For the scenarios this test it testing, the measured rate acts as + # the limiting factor, + # so fill_rate = min(cubic_rate, 2 * measured_rate) = 2 * measured_rate. + + for _ in range(4): + await limiter.after_receiving_response(throttling_error=False) + assert token_bucket.fill_rate == tracker.measured_client_rate * 2 + + @pytest.mark.asyncio + @pytest.mark.parametrize( + "rate_limiter_enabled, expected_request_rate", [(True, 0.5), (False, 1.12)] + ) + async def test_different_scenarios_when_throttled( + self, + rate_limiter_enabled: bool, + expected_request_rate: float, + ): + with patch("time.monotonic") as mock_time: + # Timestamp usage breakdown: + # [0.0, 0.1] - TokenBucket and RequestRateTracker initialization + # [0.6, 0.7, 0.8] - First after_receiving_response(): measure_rate(), timestamp, update_rate() + mock_time.side_effect = [0.0, 0.1, 0.6, 0.7, 0.8] + token_bucket = TokenBucket() + calculator = CubicCalculator(starting_max_rate=10.0, start_time=0.0) + tracker = RequestRateTracker() + + limiter = ClientRateLimiter( + token_bucket=token_bucket, + cubic_calculator=calculator, + rate_tracker=tracker, + rate_limiter_enabled=rate_limiter_enabled, + ) + # This test is testing two different scenarios + # Scenario 1: + # When throttled with rate limiter enabled, CUBIC uses min(measured_rate, fill_rate) + # as the base rate. Here: min(1.6, 0.5) = 0.5 req/s. After 70% reduction: + # 0.5 * 0.7 = 0.35, but the min rate can't be less than 0.5 so it should be 0.5 req/s. + + # Scenario 2: + # When throttled with rate limiter disabled, CUBIC uses the measured client rate + # (1.6 req/s) directly, then reduces it by 70% (BETA=0.7) to get 1.12 req/s. + # The final rate is capped at min(1.12, 2*1.6) = 1.12 req/s. + await limiter.after_receiving_response(throttling_error=True) + assert round(token_bucket.fill_rate, 2) == expected_request_rate + + +class TestCubicCalculator: + def test_cubic_calculator_initializes(self): + calculator = CubicCalculator(starting_max_rate=10.0, start_time=5.0) + assert calculator.last_max_rate == 10.0 + assert calculator.last_throttle_time == 5.0 + + def test_calculate_inflection_point(self): + calculator = CubicCalculator(starting_max_rate=10.0, start_time=5.0) + time_window = calculator.calculate_and_update_inflection_point() + assert round(time_window, 2) == 1.96 + + def test_throttle_request(self): + calculator = CubicCalculator(starting_max_rate=10.0, start_time=5.0) + result = calculator.throttle_request(rate_to_use=8.0, timestamp=10.0) + + assert result == 8.0 * 0.7 + assert calculator.last_max_rate == 8.0 + assert calculator.last_throttle_time == 10.0 + + def test_request_rate_scales_over_time(self): + calculator = CubicCalculator(starting_max_rate=10.0, start_time=5.0) + calculator.calculate_and_update_inflection_point() + + request_rate_prev = calculator.scale_request(timestamp=6.0) + request_rate_curr = calculator.scale_request(timestamp=8.0) + + assert request_rate_prev < request_rate_curr # Rate should increase over time + + def test_inflection_point_calculation_logic(self): + calculator = CubicCalculator(starting_max_rate=10.0, start_time=0.0) + + time_window = calculator.calculate_and_update_inflection_point() + expected = ((10.0 * 0.3) / 0.4) ** (1 / 3.0) + # verify that the calculated and expected inflection points are same + assert abs(time_window - expected) < 0.001 + + +class TestRequestRateTracker: + def test_measure_rate_same_bucket(self): + with patch("time.monotonic") as mock_time: + # Multiple calls in same time bucket should just increment count + mock_time.side_effect = [0.0, 0.3, 0.4] + tracker = RequestRateTracker() + rate1 = tracker.measure_rate() + rate2 = tracker.measure_rate() + + assert rate1 == 0 + assert rate2 == 0 + assert tracker.request_count == 2 + + def test_measure_rate_new_bucket(self): + with patch("time.monotonic") as mock_time: + # Multiple calls in different time buckets should increment rate and reset count + mock_time.side_effect = [0.0, 0.1, 0.7] + tracker = RequestRateTracker() + + tracker.measure_rate() + rate = tracker.measure_rate() + + assert rate > 0 + assert tracker.request_count == 0 From 4711e1ca78182e52fe8c9c3ecf192939ba068455 Mon Sep 17 00:00:00 2001 From: ubaskota <19787410+ubaskota@users.noreply.github.com> Date: Thu, 8 Jan 2026 16:23:42 -0500 Subject: [PATCH 2/3] Fix docstring formatting --- packages/smithy-core/src/smithy_core/retries.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/packages/smithy-core/src/smithy_core/retries.py b/packages/smithy-core/src/smithy_core/retries.py index 2d93c768..33aca9f7 100644 --- a/packages/smithy-core/src/smithy_core/retries.py +++ b/packages/smithy-core/src/smithy_core/retries.py @@ -597,6 +597,7 @@ def fill_rate(self) -> float: class CubicCalculator: """CubicCalculator calculates a new rate using CUBIC algorithm. + CubicCalculator implements the CUBIC congestion control algorithm for adaptive rate limiting. It dynamically adjusts request rates based on throttling responses, reducing rates by 70% when throttled and @@ -615,6 +616,7 @@ def __init__( beta: float = _BETA, ): """Initialize a new CubicCalculator. + :param starting_max_rate: Initial maximum rate of the CubicCalculator. :param start_time: Initial time of the CubicCalculator. :param scale_constant: Scale constant used to scale up requests. @@ -628,6 +630,7 @@ def __init__( def calculate_and_update_inflection_point(self) -> float: """Calculate and update the CUBIC inflection point for rate recovery. + After throttling, the CUBIC curve returns to the previous maximum rate after exactly `_inflection_point_time` seconds. Before this point, the rate grows slowly. After this point, it grows rapidly. @@ -640,6 +643,7 @@ def calculate_and_update_inflection_point(self) -> float: def scale_request(self, timestamp: float) -> float: """Scale up the request rate after a successful response. + :param timestamp: Timestamp of the response. :return: New calculated request rate based on CUBIC scaling. """ @@ -651,6 +655,7 @@ def scale_request(self, timestamp: float) -> float: def throttle_request(self, rate_to_use: float, timestamp: float) -> float: """Throttle the request rate after a throttled response is received. + :param rate_to_use: Current request rate in use. :param timestamp: Timestamp of the response. :return: New calculated request rate based on CUBIC throttling. @@ -671,6 +676,7 @@ def last_throttle_time(self) -> float: class RequestRateTracker: """RequestRateTracker tracks the client's request sending rate. + RequestRateTracker measures the actual client request sending rate using time-bucketed sampling with exponential smoothing. It tracks requests in half-second intervals by default and calculates a smoothed average rate @@ -686,6 +692,7 @@ def __init__( time_bucket_range: float = _TIME_BUCKET_RANGE, ): """Initialize a new RequestRateTracker. + :param smoothing: Exponential smoothing factor. This constant represents how much weight is given to recent measurements. Higher values place more emphasis on the most recent observations. @@ -700,6 +707,7 @@ def __init__( def measure_rate(self) -> float: """Measure and return the current request rate. + Increments the request count and calculates a new smoothed rate when transitioning to a new time bucket. Returns the current measured rate without recalculation if still within the same time bucket. @@ -732,6 +740,7 @@ def measured_client_rate(self) -> float: class ClientRateLimiter: """ClientRateLimiter limits the rate of requests. + ClientRateLimiter implements adaptive rate limiting using token bucket and CUBIC algorithm. It controls request sending rates by acquiring tokens before requests and dynamically adjusting rates based on service responses - reducing rates when @@ -749,6 +758,7 @@ def __init__( rate_limiter_enabled: bool, ): """Initialize a new ClientRateLimiter. + :param token_bucket: Token bucket for controlling request sending rates. :param cubic_calculator: CUBIC algorithm calculator for rate adjustments. :param rate_tracker: Tracker for measuring actual client request rates. @@ -767,6 +777,7 @@ async def before_sending_request(self) -> None: async def after_receiving_response(self, throttling_error: bool) -> None: """Update the request rate based on the response using CUBIC algorithm. + Reduces the rate by 70% when throttled, or increases the rate using CUBIC scaling for successful responses. Updates the token bucket with the new calculated rate, capped at 2x the measured client rate. From e7adf7ec01974b2b4884a23374250283b7cbdee7 Mon Sep 17 00:00:00 2001 From: ubaskota <19787410+ubaskota@users.noreply.github.com> Date: Thu, 15 Jan 2026 21:46:08 -0500 Subject: [PATCH 3/3] Address reviews --- .../smithy-core/src/smithy_core/retries.py | 86 +++++++++++-------- .../smithy-core/tests/unit/test_retries.py | 24 +++--- 2 files changed, 64 insertions(+), 46 deletions(-) diff --git a/packages/smithy-core/src/smithy_core/retries.py b/packages/smithy-core/src/smithy_core/retries.py index 33aca9f7..a185b9ce 100644 --- a/packages/smithy-core/src/smithy_core/retries.py +++ b/packages/smithy-core/src/smithy_core/retries.py @@ -596,36 +596,39 @@ def fill_rate(self) -> float: class CubicCalculator: - """CubicCalculator calculates a new rate using CUBIC algorithm. + """CubicCalculator calculates a new rate using a modified CUBIC algorithm. CubicCalculator implements the CUBIC congestion control algorithm for adaptive rate limiting. It dynamically adjusts request rates based on - throttling responses, reducing rates by 70% when throttled and + throttling responses, reducing rates by 30% when throttled and gradually increasing rates using a scale function when the request is successful. """ + # Scale constant used to scale up requests _SCALE_CONSTANT = 0.4 + # Beta constant used to slow down requests _BETA = 0.7 def __init__( self, - starting_max_rate: float, - start_time: float, - scale_constant: float = _SCALE_CONSTANT, - beta: float = _BETA, + starting_max_rate: float = 0.5, + start_time: float | None = None, ): """Initialize a new CubicCalculator. - :param starting_max_rate: Initial maximum rate of the CubicCalculator. + :param starting_max_rate: Initial maximum request per second. :param start_time: Initial time of the CubicCalculator. - :param scale_constant: Scale constant used to scale up requests. - :param beta: Beta constant used to slow down requests. """ + if starting_max_rate <= 0: + raise ValueError( + f"starting_max_rate must be positive, got {starting_max_rate}" + ) + if start_time is None: + start_time = time.monotonic() + self._last_max_rate = starting_max_rate self._last_throttle_time = start_time - self._scale_constant = scale_constant - self._beta = beta self._inflection_point_time = self.calculate_and_update_inflection_point() def calculate_and_update_inflection_point(self) -> float: @@ -637,11 +640,11 @@ def calculate_and_update_inflection_point(self) -> float: :return: Inflection point time in seconds for the CUBIC algorithm. """ self._inflection_point_time = ( - (self._last_max_rate * (1 - self._beta)) / self._scale_constant + (self._last_max_rate * (1 - self._BETA)) / self._SCALE_CONSTANT ) ** (1 / 3.0) return self._inflection_point_time - def scale_request(self, timestamp: float) -> float: + def calculate_scaled_request_rate(self, timestamp: float) -> float: """Scale up the request rate after a successful response. :param timestamp: Timestamp of the response. @@ -649,18 +652,20 @@ def scale_request(self, timestamp: float) -> float: """ dt = timestamp - self._last_throttle_time calculated_rate = ( - self._scale_constant * ((dt - self._inflection_point_time) ** 3) + self._SCALE_CONSTANT * ((dt - self._inflection_point_time) ** 3) ) + self._last_max_rate return calculated_rate - def throttle_request(self, rate_to_use: float, timestamp: float) -> float: + def calculate_throttled_request_rate( + self, rate_to_use: float, timestamp: float + ) -> float: """Throttle the request rate after a throttled response is received. :param rate_to_use: Current request rate in use. :param timestamp: Timestamp of the response. :return: New calculated request rate based on CUBIC throttling. """ - calculated_rate = rate_to_use * self._beta + calculated_rate = rate_to_use * self._BETA self._last_max_rate = rate_to_use self._last_throttle_time = timestamp return calculated_rate @@ -704,8 +709,9 @@ def __init__( self._request_count = 0 self._last_calculated_time_bucket = math.floor(time.monotonic()) self._measured_rate = 0 + self._lock = asyncio.Lock() - def measure_rate(self) -> float: + async def measure_rate(self) -> float: """Measure and return the current request rate. Increments the request count and calculates a new smoothed rate when @@ -713,28 +719,32 @@ def measure_rate(self) -> float: without recalculation if still within the same time bucket. :return: Current smoothed request rate in requests per second. """ - curr_time = time.monotonic() - current_time_bucket = ( - math.floor(curr_time * self._time_bucket_scale) / self._time_bucket_scale - ) - self._request_count += 1 - if current_time_bucket > self._last_calculated_time_bucket: - current_rate = self._request_count / ( - current_time_bucket - self._last_calculated_time_bucket - ) - self._measured_rate = (current_rate * self._smoothing) + ( - self._measured_rate * (1 - self._smoothing) + async with self._lock: + curr_time = time.monotonic() + current_time_bucket = ( + math.floor(curr_time * self._time_bucket_scale) + / self._time_bucket_scale ) - self._request_count = 0 - self._last_calculated_time_bucket = current_time_bucket - return self._measured_rate + self._request_count += 1 + if current_time_bucket > self._last_calculated_time_bucket: + current_rate = self._request_count / ( + current_time_bucket - self._last_calculated_time_bucket + ) + self._measured_rate = (current_rate * self._smoothing) + ( + self._measured_rate * (1 - self._smoothing) + ) + self._request_count = 0 + self._last_calculated_time_bucket = current_time_bucket + return self._measured_rate @property def request_count(self) -> int: + """Get current request count. For testing only.""" return self._request_count @property def measured_client_rate(self) -> float: + """Get the client's sending rate. For testing only.""" return self._measured_rate @@ -778,12 +788,12 @@ async def before_sending_request(self) -> None: async def after_receiving_response(self, throttling_error: bool) -> None: """Update the request rate based on the response using CUBIC algorithm. - Reduces the rate by 70% when throttled, or increases the rate using + Reduces the rate by 30% when throttled, or increases the rate using CUBIC scaling for successful responses. Updates the token bucket with the new calculated rate, capped at 2x the measured client rate. :param throttling_error: True if the response was a throttling error. """ - measured_rate = self._rate_tracker.measure_rate() + measured_rate = await self._rate_tracker.measure_rate() timestamp = time.monotonic() if throttling_error: if not self._rate_limiter_enabled: @@ -793,12 +803,16 @@ async def after_receiving_response(self, throttling_error: bool) -> None: rate_to_use = min(measured_rate, fill_rate) self._cubic_calculator.calculate_and_update_inflection_point() - cubic_calculated_rate = self._cubic_calculator.throttle_request( - rate_to_use, timestamp + cubic_calculated_rate = ( + self._cubic_calculator.calculate_throttled_request_rate( + rate_to_use, timestamp + ) ) self._rate_limiter_enabled = True else: - cubic_calculated_rate = self._cubic_calculator.scale_request(timestamp) + cubic_calculated_rate = ( + self._cubic_calculator.calculate_scaled_request_rate(timestamp) + ) new_rate = min(cubic_calculated_rate, 2 * measured_rate) await self._token_bucket.update_rate(new_rate) diff --git a/packages/smithy-core/tests/unit/test_retries.py b/packages/smithy-core/tests/unit/test_retries.py index 45b79e8b..6ae98ffb 100644 --- a/packages/smithy-core/tests/unit/test_retries.py +++ b/packages/smithy-core/tests/unit/test_retries.py @@ -465,7 +465,9 @@ async def test_calculated_rate_is_capped_at_2x_measured_rate(self): with ( patch.object(tracker, "measure_rate", return_value=5.0), - patch.object(calculator, "scale_request", return_value=20.0), + patch.object( + calculator, "calculate_scaled_request_rate", return_value=20.0 + ), patch.object(token_bucket, "update_rate") as mock_update, ): await limiter.after_receiving_response(throttling_error=False) @@ -572,7 +574,9 @@ def test_calculate_inflection_point(self): def test_throttle_request(self): calculator = CubicCalculator(starting_max_rate=10.0, start_time=5.0) - result = calculator.throttle_request(rate_to_use=8.0, timestamp=10.0) + result = calculator.calculate_throttled_request_rate( + rate_to_use=8.0, timestamp=10.0 + ) assert result == 8.0 * 0.7 assert calculator.last_max_rate == 8.0 @@ -582,8 +586,8 @@ def test_request_rate_scales_over_time(self): calculator = CubicCalculator(starting_max_rate=10.0, start_time=5.0) calculator.calculate_and_update_inflection_point() - request_rate_prev = calculator.scale_request(timestamp=6.0) - request_rate_curr = calculator.scale_request(timestamp=8.0) + request_rate_prev = calculator.calculate_scaled_request_rate(timestamp=6.0) + request_rate_curr = calculator.calculate_scaled_request_rate(timestamp=8.0) assert request_rate_prev < request_rate_curr # Rate should increase over time @@ -597,26 +601,26 @@ def test_inflection_point_calculation_logic(self): class TestRequestRateTracker: - def test_measure_rate_same_bucket(self): + async def test_measure_rate_same_bucket(self): with patch("time.monotonic") as mock_time: # Multiple calls in same time bucket should just increment count mock_time.side_effect = [0.0, 0.3, 0.4] tracker = RequestRateTracker() - rate1 = tracker.measure_rate() - rate2 = tracker.measure_rate() + rate1 = await tracker.measure_rate() + rate2 = await tracker.measure_rate() assert rate1 == 0 assert rate2 == 0 assert tracker.request_count == 2 - def test_measure_rate_new_bucket(self): + async def test_measure_rate_new_bucket(self): with patch("time.monotonic") as mock_time: # Multiple calls in different time buckets should increment rate and reset count mock_time.side_effect = [0.0, 0.1, 0.7] tracker = RequestRateTracker() - tracker.measure_rate() - rate = tracker.measure_rate() + await tracker.measure_rate() + rate = await tracker.measure_rate() assert rate > 0 assert tracker.request_count == 0