Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 219 additions & 0 deletions packages/smithy-core/src/smithy_core/retries.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import asyncio
import math
import random
import threading
import time
Expand Down Expand Up @@ -592,3 +593,221 @@ def fill_rate(self) -> float:
:return: The rate at which tokens are added to the bucket (tokens/second).
"""
return self._fill_rate


class CubicCalculator:
"""CubicCalculator calculates a new rate using CUBIC algorithm.

CubicCalculator implements the CUBIC congestion control algorithm for
adaptive rate limiting. It dynamically adjusts request rates based on
throttling responses, reducing rates by 70% when throttled and
gradually increasing rates using a scale function when the request
is successful.
"""

_SCALE_CONSTANT = 0.4
_BETA = 0.7

def __init__(
self,
starting_max_rate: float,
start_time: float,
scale_constant: float = _SCALE_CONSTANT,
beta: float = _BETA,
):
"""Initialize a new CubicCalculator.

:param starting_max_rate: Initial maximum rate of the CubicCalculator.
:param start_time: Initial time of the CubicCalculator.
:param scale_constant: Scale constant used to scale up requests.
:param beta: Beta constant used to slow down requests.
"""
self._last_max_rate = starting_max_rate
self._last_throttle_time = start_time
self._scale_constant = scale_constant
self._beta = beta
self._inflection_point_time = self.calculate_and_update_inflection_point()

def calculate_and_update_inflection_point(self) -> float:
"""Calculate and update the CUBIC inflection point for rate recovery.

After throttling, the CUBIC curve returns to the previous maximum rate
after exactly `_inflection_point_time` seconds. Before this point, the
rate grows slowly. After this point, it grows rapidly.

:return: Inflection point time in seconds for the CUBIC algorithm.
"""
self._inflection_point_time = (
(self._last_max_rate * (1 - self._beta)) / self._scale_constant
) ** (1 / 3.0)
return self._inflection_point_time

def scale_request(self, timestamp: float) -> float:
"""Scale up the request rate after a successful response.

:param timestamp: Timestamp of the response.

:return: New calculated request rate based on CUBIC scaling.
"""
dt = timestamp - self._last_throttle_time
calculated_rate = (
self._scale_constant * ((dt - self._inflection_point_time) ** 3)
) + self._last_max_rate
return calculated_rate

def throttle_request(self, rate_to_use: float, timestamp: float) -> float:
"""Throttle the request rate after a throttled response is received.

:param rate_to_use: Current request rate in use.
:param timestamp: Timestamp of the response.

:return: New calculated request rate based on CUBIC throttling.
"""
calculated_rate = rate_to_use * self._beta
self._last_max_rate = rate_to_use
self._last_throttle_time = timestamp
return calculated_rate

@property
def last_max_rate(self) -> float:
return self._last_max_rate

@property
def last_throttle_time(self) -> float:
return self._last_throttle_time


class RequestRateTracker:
"""RequestRateTracker tracks the client's request sending rate.

RequestRateTracker measures the actual client request sending rate using
time-bucketed sampling with exponential smoothing. It tracks requests in
half-second intervals by default and calculates a smoothed average rate
to provide accurate measurements for adaptive rate limiting algorithms.
"""

_DEFAULT_SMOOTHING = 0.8
_TIME_BUCKET_RANGE = 0.5

def __init__(
self,
smoothing: float = _DEFAULT_SMOOTHING,
time_bucket_range: float = _TIME_BUCKET_RANGE,
):
"""Initialize a new RequestRateTracker.

:param smoothing: Exponential smoothing factor. This constant
represents how much weight is given to recent measurements.
Higher values place more emphasis on the most recent observations.
:param time_bucket_range: Time bucket duration in seconds. Rate
calculations are updated when transitioning between buckets.
"""
self._smoothing = smoothing
self._time_bucket_scale = 1 / time_bucket_range
self._request_count = 0
self._last_calculated_time_bucket = math.floor(time.monotonic())
self._measured_rate = 0

def measure_rate(self) -> float:
"""Measure and return the current request rate.

Increments the request count and calculates a new smoothed rate when
transitioning to a new time bucket. Returns the current measured rate
without recalculation if still within the same time bucket.

:return: Current smoothed request rate in requests per second.
"""
curr_time = time.monotonic()
current_time_bucket = (
math.floor(curr_time * self._time_bucket_scale) / self._time_bucket_scale
)
self._request_count += 1
if current_time_bucket > self._last_calculated_time_bucket:
current_rate = self._request_count / (
current_time_bucket - self._last_calculated_time_bucket
)
self._measured_rate = (current_rate * self._smoothing) + (
self._measured_rate * (1 - self._smoothing)
)
self._request_count = 0
self._last_calculated_time_bucket = current_time_bucket
return self._measured_rate

@property
def request_count(self) -> int:
return self._request_count

@property
def measured_client_rate(self) -> float:
return self._measured_rate


class ClientRateLimiter:
"""ClientRateLimiter limits the rate of requests.

ClientRateLimiter implements adaptive rate limiting using token bucket and CUBIC
algorithm. It controls request sending rates by acquiring tokens before requests
and dynamically adjusting rates based on service responses - reducing rates when
throttled and increasing rates during successful periods to optimize throughput
while preventing service overload.
"""

_REQUEST_COST = 1.0

def __init__(
self,
token_bucket: TokenBucket,
cubic_calculator: CubicCalculator,
rate_tracker: RequestRateTracker,
rate_limiter_enabled: bool,
):
"""Initialize a new ClientRateLimiter.

:param token_bucket: Token bucket for controlling request sending rates.
:param cubic_calculator: CUBIC algorithm calculator for rate adjustments.
:param rate_tracker: Tracker for measuring actual client request rates.
:param rate_limiter_enabled: Whether rate limiting is enabled.
"""

self._rate_tracker = rate_tracker
self._token_bucket = token_bucket
self._cubic_calculator = cubic_calculator
self._rate_limiter_enabled = rate_limiter_enabled

async def before_sending_request(self) -> None:
"""Acquire a token before making a request."""
if self._rate_limiter_enabled:
await self._token_bucket.acquire(self._REQUEST_COST)

async def after_receiving_response(self, throttling_error: bool) -> None:
"""Update the request rate based on the response using CUBIC algorithm.

Reduces the rate by 70% when throttled, or increases the rate using
CUBIC scaling for successful responses. Updates the token bucket with
the new calculated rate, capped at 2x the measured client rate.

:param throttling_error: True if the response was a throttling error.
"""
measured_rate = self._rate_tracker.measure_rate()
timestamp = time.monotonic()
if throttling_error:
if not self._rate_limiter_enabled:
rate_to_use = measured_rate
else:
fill_rate = self._token_bucket.fill_rate
rate_to_use = min(measured_rate, fill_rate)

self._cubic_calculator.calculate_and_update_inflection_point()
cubic_calculated_rate = self._cubic_calculator.throttle_request(
rate_to_use, timestamp
)
self._rate_limiter_enabled = True
else:
cubic_calculated_rate = self._cubic_calculator.scale_request(timestamp)

new_rate = min(cubic_calculated_rate, 2 * measured_rate)
await self._token_bucket.update_rate(new_rate)

@property
def rate_limit_enabled(self) -> bool:
return self._rate_limiter_enabled
Loading
Loading