Merged
Changes from 2 commits
214 changes: 214 additions & 0 deletions packages/smithy-core/src/smithy_core/retries.py
@@ -1,6 +1,7 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import asyncio
import math
import random
import threading
import time
@@ -592,3 +593,216 @@ def fill_rate(self) -> float:
:return: The rate at which tokens are added to the bucket (tokens/second).
"""
return self._fill_rate


class CubicCalculator:
"""CubicCalculator calculates a new request rate using the CUBIC algorithm.

CubicCalculator implements the CUBIC congestion control algorithm for
adaptive rate limiting. It dynamically adjusts request rates based on
throttling responses, reducing the rate to 70% of its previous value
when throttled and gradually increasing it with a cubic scale function
on successful responses.
"""

_SCALE_CONSTANT = 0.4
_BETA = 0.7

def __init__(
self,
starting_max_rate: float,


In trying to understand this, I initially have a few questions about the constructor:

  1. Why does starting_max_rate not have a default?
  2. Should we have some validations here to check that the rate is positive (and similar)?
  3. Why are we letting users pass in a start time? Do we want to default to the current time if none is passed?

@ubaskota (Owner Author) replied on Jan 16, 2026:

  1. CubicCalculator is called from AdaptiveRetryStrategy and it always passes an explicit value, so I didn't set a default here. I'll add a default of 0.5 tokens/sec as a starting point.
  2. Good point. I'll add validation.
  3. Yes. I'll default it to time.monotonic() while keeping the override option.

start_time: float,
scale_constant: float = _SCALE_CONSTANT,
beta: float = _BETA,
):
"""Initialize a new CubicCalculator.

:param starting_max_rate: Initial maximum rate of the CubicCalculator.


Can we add to this doc string what the max rate is in? At least initially reading this PR, I'm not sure what this represents - requests per second?


Are we still using rst structure for our doc strings?

@ubaskota (Owner Author) replied:

Can we add to this doc string what the max rate is in? At least initially reading this PR, I'm not sure what this represents - requests per second?

Added. It is requests per second.

Are we still using rst structure for our doc strings?

Yes, that is what the existing code uses.

:param start_time: Initial time of the CubicCalculator.
:param scale_constant: Scale constant used to scale up requests.
:param beta: Beta constant used to slow down requests.
"""
self._last_max_rate = starting_max_rate
self._last_throttle_time = start_time
self._scale_constant = scale_constant
self._beta = beta
self._inflection_point_time = self.calculate_and_update_inflection_point()

def calculate_and_update_inflection_point(self) -> float:
"""Calculate and update the CUBIC inflection point for rate recovery.

After throttling, the CUBIC curve returns to the previous maximum rate
after exactly `_inflection_point_time` seconds. Before this point, the
rate grows slowly. After this point, it grows rapidly.
:return: Inflection point time in seconds for the CUBIC algorithm.
"""
self._inflection_point_time = (
(self._last_max_rate * (1 - self._beta)) / self._scale_constant
) ** (1 / 3.0)
return self._inflection_point_time

def scale_request(self, timestamp: float) -> float:
@SamRemis commented on Jan 14, 2026:

Should this be named increase_request_rate or similar?

"""Scale up the request rate after a successful response.

:param timestamp: Timestamp of the response.
:return: New calculated request rate based on CUBIC scaling.
"""
dt = timestamp - self._last_throttle_time
calculated_rate = (
self._scale_constant * ((dt - self._inflection_point_time) ** 3)
) + self._last_max_rate
return calculated_rate

def throttle_request(self, rate_to_use: float, timestamp: float) -> float:


Again, probably could be renamed to something like get_throttled_rate or similar. The name of this method makes me think it's actually going to be throttling the request itself, not calculating and updating the rate.

"""Throttle the request rate after a throttled response is received.

:param rate_to_use: Current request rate in use.
:param timestamp: Timestamp of the response.
:return: New calculated request rate based on CUBIC throttling.
"""
calculated_rate = rate_to_use * self._beta
self._last_max_rate = rate_to_use
self._last_throttle_time = timestamp
return calculated_rate

@property
def last_max_rate(self) -> float:
return self._last_max_rate

@property
def last_throttle_time(self) -> float:
return self._last_throttle_time
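To make the CUBIC math above concrete, here is a standalone sketch (not part of this diff) that reproduces the same formulas with the class defaults _SCALE_CONSTANT = 0.4 and _BETA = 0.7:

```python
# Mirrors CubicCalculator._SCALE_CONSTANT and ._BETA from the diff above.
SCALE_CONSTANT = 0.4
BETA = 0.7

last_max_rate = 10.0  # rate in use (req/s) when a throttle arrived

# Multiplicative decrease: the new rate is 70% of the rate in use.
throttled_rate = last_max_rate * BETA  # 7.0 req/s

# Inflection point: seconds after the throttle at which the cubic
# curve crosses the previous maximum rate again.
k = ((last_max_rate * (1 - BETA)) / SCALE_CONSTANT) ** (1 / 3.0)

def cubic_rate(dt: float) -> float:
    """Rate dt seconds after the last throttle (scale_request's formula)."""
    return SCALE_CONSTANT * ((dt - k) ** 3) + last_max_rate

# Below the inflection point the curve stays under the old maximum of
# 10 req/s; at dt == k it returns exactly to it, then grows cubically
# (e.g. 10.4 req/s one second past the inflection point).
```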


class RequestRateTracker:
"""RequestRateTracker tracks the client's request sending rate.

RequestRateTracker measures the actual client request sending rate using
time-bucketed sampling with exponential smoothing. It tracks requests in
half-second intervals by default and calculates a smoothed average rate
to provide accurate measurements for adaptive rate limiting algorithms.
"""

_DEFAULT_SMOOTHING = 0.8
_TIME_BUCKET_RANGE = 0.5

def __init__(
self,
smoothing: float = _DEFAULT_SMOOTHING,
time_bucket_range: float = _TIME_BUCKET_RANGE,
):
"""Initialize a new RequestRateTracker.

:param smoothing: Exponential smoothing factor. This constant
represents how much weight is given to recent measurements.
Higher values place more emphasis on the most recent observations.
:param time_bucket_range: Time bucket duration in seconds. Rate
calculations are updated when transitioning between buckets.
"""
self._smoothing = smoothing
self._time_bucket_scale = 1 / time_bucket_range
self._request_count = 0
self._last_calculated_time_bucket = (
math.floor(time.monotonic() * self._time_bucket_scale) / self._time_bucket_scale
)
self._measured_rate = 0

def measure_rate(self) -> float:
"""Measure and return the current request rate.

Increments the request count and calculates a new smoothed rate when
transitioning to a new time bucket. Returns the current measured rate
without recalculation if still within the same time bucket.
:return: Current smoothed request rate in requests per second.
"""
curr_time = time.monotonic()


Don't we need a lock over this? Say two requests call this at once when it's time to reset the bucket. Thread A could check that the bucket is ready for a reset and start the process, then thread B starts and completes the reset. Thread A then goes back and finishes the reset, overwriting what thread B did.

@ubaskota (Owner Author) replied:

Good catch. Yes, we need a lock over this to prevent a race condition. Added.

current_time_bucket = (
math.floor(curr_time * self._time_bucket_scale) / self._time_bucket_scale
)
self._request_count += 1
if current_time_bucket > self._last_calculated_time_bucket:
current_rate = self._request_count / (
current_time_bucket - self._last_calculated_time_bucket
)
self._measured_rate = (current_rate * self._smoothing) + (
self._measured_rate * (1 - self._smoothing)
)
self._request_count = 0
self._last_calculated_time_bucket = current_time_bucket
return self._measured_rate

@property
def request_count(self) -> int:
return self._request_count

@property
def measured_client_rate(self) -> float:
return self._measured_rate
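As a quick illustration of the smoothing inside measure_rate, the standalone sketch below (not part of this diff; it applies the update by hand instead of using the real clock and buckets) shows how the estimate converges on a steady client rate with the default smoothing of 0.8:

```python
SMOOTHING = 0.8  # mirrors RequestRateTracker._DEFAULT_SMOOTHING

def smoothed(prev_estimate: float, observed_rate: float) -> float:
    # Same exponential update as measure_rate: weight the newest
    # bucket's rate by SMOOTHING and the prior estimate by the rest.
    return observed_rate * SMOOTHING + prev_estimate * (1 - SMOOTHING)

rate = 0.0
history = []
# Three consecutive buckets, each observing a true rate of 8 req/s.
for _ in range(3):
    rate = smoothed(rate, 8.0)
    history.append(rate)
# history is approximately [6.4, 7.68, 7.936]: the estimate converges
# toward 8 req/s, with recent buckets weighted most heavily.
```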


class ClientRateLimiter:
"""ClientRateLimiter limits the rate of requests.

ClientRateLimiter implements adaptive rate limiting using token bucket and CUBIC
algorithm. It controls request sending rates by acquiring tokens before requests
and dynamically adjusting rates based on service responses - reducing rates when
throttled and increasing rates during successful periods to optimize throughput
while preventing service overload.
"""

_REQUEST_COST = 1.0

def __init__(
self,
token_bucket: TokenBucket,
cubic_calculator: CubicCalculator,
rate_tracker: RequestRateTracker,
rate_limiter_enabled: bool,
):
"""Initialize a new ClientRateLimiter.

:param token_bucket: Token bucket for controlling request sending rates.
:param cubic_calculator: CUBIC algorithm calculator for rate adjustments.
:param rate_tracker: Tracker for measuring actual client request rates.
:param rate_limiter_enabled: Whether rate limiting is enabled.
"""

self._rate_tracker = rate_tracker
self._token_bucket = token_bucket
self._cubic_calculator = cubic_calculator
self._rate_limiter_enabled = rate_limiter_enabled

async def before_sending_request(self) -> None:
"""Acquire a token before making a request."""
if self._rate_limiter_enabled:
await self._token_bucket.acquire(self._REQUEST_COST)

async def after_receiving_response(self, throttling_error: bool) -> None:
"""Update the request rate based on the response using CUBIC algorithm.

Reduces the rate to 70% of its previous value when throttled, or increases it using

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this comment is repeating a comment above in cubic calculator, maybe we should keep this minimal here.

CUBIC scaling for successful responses. Updates the token bucket with
the new calculated rate, capped at 2x the measured client rate.
:param throttling_error: True if the response was a throttling error.
"""
measured_rate = self._rate_tracker.measure_rate()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same idea for a race condition here, we may just need a lock over this whole method since two requests shouldn't be calculating the new rate at once and overwriting each other?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a lock at the lowest level which is in measure_rate. Shouldn't that be sufficient?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens here if two or more requests call this method at once? Do you see any potential concurrency issues coming up?

timestamp = time.monotonic()
if throttling_error:
if not self._rate_limiter_enabled:
rate_to_use = measured_rate
else:
fill_rate = self._token_bucket.fill_rate
rate_to_use = min(measured_rate, fill_rate)

self._cubic_calculator.calculate_and_update_inflection_point()
cubic_calculated_rate = self._cubic_calculator.throttle_request(
rate_to_use, timestamp
)
self._rate_limiter_enabled = True
else:
cubic_calculated_rate = self._cubic_calculator.scale_request(timestamp)

new_rate = min(cubic_calculated_rate, 2 * measured_rate)
await self._token_bucket.update_rate(new_rate)

@property
def rate_limit_enabled(self) -> bool:


Can you add some context for me here as to why we are exposing this as a public property?

@ubaskota (Owner Author) replied on Jan 16, 2026:

As discussed earlier today, this is only for testing.

return self._rate_limiter_enabled
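Finally, a standalone sketch (not part of this diff; function names are illustrative) of the two rules after_receiving_response applies when choosing the new token-bucket rate:

```python
BETA = 0.7  # mirrors CubicCalculator._BETA

def throttled_rate(rate_in_use: float) -> float:
    # On a throttling error: multiplicative decrease to 70% of the
    # rate in use (CubicCalculator.throttle_request).
    return rate_in_use * BETA

def capped_rate(cubic_rate: float, measured_rate: float) -> float:
    # In both branches the final fill rate is capped at twice the
    # measured client rate, so the limit tracks real traffic.
    return min(cubic_rate, 2 * measured_rate)

# A throttle while sending at 10 req/s drops the target to 7 req/s,
# and a cubic estimate of 12 req/s is capped at 10 req/s when the
# client is only measured at 5 req/s.
```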