Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 228 additions & 0 deletions packages/smithy-core/src/smithy_core/retries.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import asyncio
import math
import random
import threading
import time
Expand Down Expand Up @@ -592,3 +593,230 @@ def fill_rate(self) -> float:
:return: The rate at which tokens are added to the bucket (tokens/second).
"""
return self._fill_rate


class CubicCalculator:
"""CubicCalculator calculates a new rate using a modified CUBIC algorithm.

CubicCalculator implements the CUBIC congestion control algorithm for
adaptive rate limiting. It dynamically adjusts request rates based on
throttling responses, reducing rates by 30% when throttled and
gradually increasing rates using a scale function when the request
is successful.
"""

# Scale constant used to scale up requests
_SCALE_CONSTANT = 0.4
# Beta constant used to slow down requests
_BETA = 0.7

def __init__(
self,
starting_max_rate: float = 0.5,
start_time: float | None = None,
):
"""Initialize a new CubicCalculator.

:param starting_max_rate: Initial maximum request per second.
:param start_time: Initial time of the CubicCalculator.
"""
if starting_max_rate <= 0:
raise ValueError(
f"starting_max_rate must be positive, got {starting_max_rate}"
)
if start_time is None:
start_time = time.monotonic()

self._last_max_rate = starting_max_rate
self._last_throttle_time = start_time
self._inflection_point_time = self.calculate_and_update_inflection_point()

def calculate_and_update_inflection_point(self) -> float:
"""Calculate and update the CUBIC inflection point for rate recovery.

After throttling, the CUBIC curve returns to the previous maximum rate
after exactly `_inflection_point_time` seconds. Before this point, the
rate grows slowly. After this point, it grows rapidly.
:return: Inflection point time in seconds for the CUBIC algorithm.
"""
self._inflection_point_time = (
(self._last_max_rate * (1 - self._BETA)) / self._SCALE_CONSTANT
) ** (1 / 3.0)
return self._inflection_point_time

def calculate_scaled_request_rate(self, timestamp: float) -> float:
"""Scale up the request rate after a successful response.

:param timestamp: Timestamp of the response.
:return: New calculated request rate based on CUBIC scaling.
"""
dt = timestamp - self._last_throttle_time
calculated_rate = (
self._SCALE_CONSTANT * ((dt - self._inflection_point_time) ** 3)
) + self._last_max_rate
return calculated_rate

def calculate_throttled_request_rate(
self, rate_to_use: float, timestamp: float
) -> float:
"""Throttle the request rate after a throttled response is received.

:param rate_to_use: Current request rate in use.
:param timestamp: Timestamp of the response.
:return: New calculated request rate based on CUBIC throttling.
"""
calculated_rate = rate_to_use * self._BETA
self._last_max_rate = rate_to_use
self._last_throttle_time = timestamp
return calculated_rate

@property
def last_max_rate(self) -> float:
return self._last_max_rate

@property
def last_throttle_time(self) -> float:
return self._last_throttle_time


class RequestRateTracker:
    """RequestRateTracker tracks the client's request sending rate.

    RequestRateTracker measures the actual client request sending rate using
    time-bucketed sampling with exponential smoothing. It tracks requests in
    half-second intervals by default and calculates a smoothed average rate
    to provide accurate measurements for adaptive rate limiting algorithms.
    """

    _DEFAULT_SMOOTHING = 0.8
    _TIME_BUCKET_RANGE = 0.5

    def __init__(
        self,
        smoothing: float = _DEFAULT_SMOOTHING,
        time_bucket_range: float = _TIME_BUCKET_RANGE,
    ):
        """Initialize a new RequestRateTracker.

        :param smoothing: Exponential smoothing factor. This constant
            represents how much weight is given to recent measurements.
            Higher values place more emphasis on the most recent observations.
        :param time_bucket_range: Time bucket duration in seconds. Rate
            calculations are updated when transitioning between buckets.
        :raises ValueError: If ``time_bucket_range`` is not positive.
        """
        # Reject a zero or negative bucket size up front: zero would surface
        # as a bare ZeroDivisionError below and a negative value would yield
        # meaningless bucket boundaries.
        if time_bucket_range <= 0:
            raise ValueError(
                f"time_bucket_range must be positive, got {time_bucket_range}"
            )

        self._smoothing = smoothing
        self._time_bucket_scale = 1 / time_bucket_range
        self._request_count = 0
        # The initial bucket is the current monotonic time floored to a whole
        # second; later buckets are floored to multiples of the bucket range.
        self._last_calculated_time_bucket: float = math.floor(time.monotonic())
        # Start from a float so the measured rate is a float even before the
        # first bucket transition.
        self._measured_rate: float = 0.0
        self._lock = asyncio.Lock()

    async def measure_rate(self) -> float:
        """Measure and return the current request rate.

        Increments the request count and calculates a new smoothed rate when
        transitioning to a new time bucket. Returns the current measured rate
        without recalculation if still within the same time bucket.

        :return: Current smoothed request rate in requests per second.
        """
        async with self._lock:
            curr_time = time.monotonic()
            current_time_bucket = (
                math.floor(curr_time * self._time_bucket_scale)
                / self._time_bucket_scale
            )
            self._request_count += 1
            if current_time_bucket > self._last_calculated_time_bucket:
                # Requests counted since the last calculation, divided by the
                # elapsed bucket time, give the raw rate for this interval.
                current_rate = self._request_count / (
                    current_time_bucket - self._last_calculated_time_bucket
                )
                self._measured_rate = (current_rate * self._smoothing) + (
                    self._measured_rate * (1 - self._smoothing)
                )
                self._request_count = 0
                self._last_calculated_time_bucket = current_time_bucket
            return self._measured_rate

    @property
    def request_count(self) -> int:
        """Get current request count. For testing only."""
        return self._request_count

    @property
    def measured_client_rate(self) -> float:
        """Get the client's sending rate. For testing only."""
        return self._measured_rate


class ClientRateLimiter:
    """ClientRateLimiter limits the rate of requests.

    ClientRateLimiter implements adaptive rate limiting using token bucket and CUBIC
    algorithm. It controls request sending rates by acquiring tokens before requests
    and dynamically adjusting rates based on service responses - reducing rates when
    throttled and increasing rates during successful periods to optimize throughput
    while preventing service overload.
    """

    _REQUEST_COST = 1.0

    def __init__(
        self,
        token_bucket: TokenBucket,
        cubic_calculator: CubicCalculator,
        rate_tracker: RequestRateTracker,
        rate_limiter_enabled: bool,
    ):
        """Initialize a new ClientRateLimiter.

        :param token_bucket: Token bucket for controlling request sending rates.
        :param cubic_calculator: CUBIC algorithm calculator for rate adjustments.
        :param rate_tracker: Tracker for measuring actual client request rates.
        :param rate_limiter_enabled: Whether rate limiting is enabled.
        """
        self._rate_tracker = rate_tracker
        self._token_bucket = token_bucket
        self._cubic_calculator = cubic_calculator
        self._rate_limiter_enabled = rate_limiter_enabled

    async def before_sending_request(self) -> None:
        """Acquire a token before making a request.

        Does nothing while rate limiting is disabled.
        """
        if self._rate_limiter_enabled:
            await self._token_bucket.acquire(self._REQUEST_COST)

    async def after_receiving_response(self, throttling_error: bool) -> None:
        """Update the request rate based on the response using CUBIC algorithm.

        Reduces the rate by 30% when throttled, or increases the rate using
        CUBIC scaling for successful responses. Updates the token bucket with
        the new calculated rate, capped at 2x the measured client rate. A
        throttling response also enables rate limiting from then on.

        :param throttling_error: True if the response was a throttling error.
        """
        measured_rate = await self._rate_tracker.measure_rate()
        timestamp = time.monotonic()
        if throttling_error:
            if not self._rate_limiter_enabled:
                rate_to_use = measured_rate
            else:
                fill_rate = self._token_bucket.fill_rate
                rate_to_use = min(measured_rate, fill_rate)

            # Record the new maximum rate first, then derive the inflection
            # point from it. Computing the inflection point beforehand would
            # base the recovery curve on the previous, stale maximum.
            cubic_calculated_rate = (
                self._cubic_calculator.calculate_throttled_request_rate(
                    rate_to_use, timestamp
                )
            )
            self._cubic_calculator.calculate_and_update_inflection_point()
            self._rate_limiter_enabled = True
        else:
            cubic_calculated_rate = (
                self._cubic_calculator.calculate_scaled_request_rate(timestamp)
            )

        new_rate = min(cubic_calculated_rate, 2 * measured_rate)
        await self._token_bucket.update_rate(new_rate)

    @property
    def rate_limit_enabled(self) -> bool:
        """Get whether rate limiting is currently enabled. For testing only."""
        return self._rate_limiter_enabled
Loading
Loading