-
Notifications
You must be signed in to change notification settings - Fork 0
Implement client-side rate limiting for adaptive retries. #3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| import asyncio | ||
| import math | ||
| import random | ||
| import threading | ||
| import time | ||
|
|
@@ -592,3 +593,216 @@ def fill_rate(self) -> float: | |
| :return: The rate at which tokens are added to the bucket (tokens/second). | ||
| """ | ||
| return self._fill_rate | ||
|
|
||
|
|
||
| class CubicCalculator: | ||
| """CubicCalculator calculates a new rate using CUBIC algorithm. | ||
|
|
||
| CubicCalculator implements the CUBIC congestion control algorithm for | ||
| adaptive rate limiting. It dynamically adjusts request rates based on | ||
| throttling responses, reducing rates by 70% when throttled and | ||
ubaskota marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| gradually increasing rates using a scale function when the request | ||
| is successful. | ||
| """ | ||
|
|
||
| _SCALE_CONSTANT = 0.4 | ||
| _BETA = 0.7 | ||
|
|
||
| def __init__( | ||
| self, | ||
| starting_max_rate: float, | ||
|
||
| start_time: float, | ||
| scale_constant: float = _SCALE_CONSTANT, | ||
| beta: float = _BETA, | ||
| ): | ||
| """Initialize a new CubicCalculator. | ||
|
|
||
| :param starting_max_rate: Initial maximum rate of the CubicCalculator. | ||
|
||
| :param start_time: Initial time of the CubicCalculator. | ||
| :param scale_constant: Scale constant used to scale up requests. | ||
| :param beta: Beta constant used to slow down requests. | ||
| """ | ||
| self._last_max_rate = starting_max_rate | ||
| self._last_throttle_time = start_time | ||
| self._scale_constant = scale_constant | ||
| self._beta = beta | ||
| self._inflection_point_time = self.calculate_and_update_inflection_point() | ||
|
|
||
| def calculate_and_update_inflection_point(self) -> float: | ||
| """Calculate and update the CUBIC inflection point for rate recovery. | ||
|
|
||
| After throttling, the CUBIC curve returns to the previous maximum rate | ||
| after exactly `_inflection_point_time` seconds. Before this point, the | ||
| rate grows slowly. After this point, it grows rapidly. | ||
| :return: Inflection point time in seconds for the CUBIC algorithm. | ||
| """ | ||
| self._inflection_point_time = ( | ||
| (self._last_max_rate * (1 - self._beta)) / self._scale_constant | ||
| ) ** (1 / 3.0) | ||
| return self._inflection_point_time | ||
|
|
||
| def scale_request(self, timestamp: float) -> float: | ||
|
||
| """Scale up the request rate after a successful response. | ||
|
|
||
| :param timestamp: Timestamp of the response. | ||
| :return: New calculated request rate based on CUBIC scaling. | ||
| """ | ||
| dt = timestamp - self._last_throttle_time | ||
| calculated_rate = ( | ||
| self._scale_constant * ((dt - self._inflection_point_time) ** 3) | ||
| ) + self._last_max_rate | ||
| return calculated_rate | ||
|
|
||
| def throttle_request(self, rate_to_use: float, timestamp: float) -> float: | ||
|
||
| """Throttle the request rate after a throttled response is received. | ||
|
|
||
| :param rate_to_use: Current request rate in use. | ||
| :param timestamp: Timestamp of the response. | ||
| :return: New calculated request rate based on CUBIC throttling. | ||
| """ | ||
| calculated_rate = rate_to_use * self._beta | ||
| self._last_max_rate = rate_to_use | ||
| self._last_throttle_time = timestamp | ||
| return calculated_rate | ||
|
|
||
| @property | ||
| def last_max_rate(self) -> float: | ||
| return self._last_max_rate | ||
|
|
||
| @property | ||
| def last_throttle_time(self) -> float: | ||
| return self._last_throttle_time | ||
|
|
||
|
|
||
| class RequestRateTracker: | ||
| """RequestRateTracker tracks the client's request sending rate. | ||
|
|
||
| RequestRateTracker measures the actual client request sending rate using | ||
| time-bucketed sampling with exponential smoothing. It tracks requests in | ||
| half-second intervals by default and calculates a smoothed average rate | ||
| to provide accurate measurements for adaptive rate limiting algorithms. | ||
| """ | ||
|
|
||
| _DEFAULT_SMOOTHING = 0.8 | ||
| _TIME_BUCKET_RANGE = 0.5 | ||
|
|
||
| def __init__( | ||
| self, | ||
| smoothing: float = _DEFAULT_SMOOTHING, | ||
| time_bucket_range: float = _TIME_BUCKET_RANGE, | ||
| ): | ||
| """Initialize a new RequestRateTracker. | ||
|
|
||
| :param smoothing: Exponential smoothing factor. This constant | ||
| represents how much weight is given to recent measurements. | ||
| Higher values place more emphasis on the most recent observations. | ||
| :param time_bucket_range: Time bucket duration in seconds. Rate | ||
| calculations are updated when transitioning between buckets. | ||
| """ | ||
| self._smoothing = smoothing | ||
| self._time_bucket_scale = 1 / time_bucket_range | ||
| self._request_count = 0 | ||
| self._last_calculated_time_bucket = math.floor(time.monotonic()) | ||
| self._measured_rate = 0 | ||
|
|
||
| def measure_rate(self) -> float: | ||
| """Measure and return the current request rate. | ||
|
|
||
| Increments the request count and calculates a new smoothed rate when | ||
| transitioning to a new time bucket. Returns the current measured rate | ||
| without recalculation if still within the same time bucket. | ||
| :return: Current smoothed request rate in requests per second. | ||
| """ | ||
| curr_time = time.monotonic() | ||
|
||
| current_time_bucket = ( | ||
| math.floor(curr_time * self._time_bucket_scale) / self._time_bucket_scale | ||
| ) | ||
| self._request_count += 1 | ||
| if current_time_bucket > self._last_calculated_time_bucket: | ||
| current_rate = self._request_count / ( | ||
| current_time_bucket - self._last_calculated_time_bucket | ||
| ) | ||
| self._measured_rate = (current_rate * self._smoothing) + ( | ||
| self._measured_rate * (1 - self._smoothing) | ||
| ) | ||
| self._request_count = 0 | ||
| self._last_calculated_time_bucket = current_time_bucket | ||
| return self._measured_rate | ||
|
|
||
| @property | ||
| def request_count(self) -> int: | ||
| return self._request_count | ||
|
|
||
| @property | ||
| def measured_client_rate(self) -> float: | ||
| return self._measured_rate | ||
|
|
||
|
|
||
| class ClientRateLimiter: | ||
| """ClientRateLimiter limits the rate of requests. | ||
|
|
||
| ClientRateLimiter implements adaptive rate limiting using token bucket and CUBIC | ||
| algorithm. It controls request sending rates by acquiring tokens before requests | ||
| and dynamically adjusting rates based on service responses - reducing rates when | ||
| throttled and increasing rates during successful periods to optimize throughput | ||
| while preventing service overload. | ||
| """ | ||
|
|
||
| _REQUEST_COST = 1.0 | ||
|
|
||
| def __init__( | ||
| self, | ||
| token_bucket: TokenBucket, | ||
| cubic_calculator: CubicCalculator, | ||
| rate_tracker: RequestRateTracker, | ||
| rate_limiter_enabled: bool, | ||
| ): | ||
| """Initialize a new ClientRateLimiter. | ||
|
|
||
| :param token_bucket: Token bucket for controlling request sending rates. | ||
| :param cubic_calculator: CUBIC algorithm calculator for rate adjustments. | ||
| :param rate_tracker: Tracker for measuring actual client request rates. | ||
| :param rate_limiter_enabled: Whether rate limiting is enabled. | ||
| """ | ||
|
|
||
| self._rate_tracker = rate_tracker | ||
| self._token_bucket = token_bucket | ||
| self._cubic_calculator = cubic_calculator | ||
| self._rate_limiter_enabled = rate_limiter_enabled | ||
|
|
||
| async def before_sending_request(self) -> None: | ||
| """Acquire a token before making a request.""" | ||
| if self._rate_limiter_enabled: | ||
| await self._token_bucket.acquire(self._REQUEST_COST) | ||
|
|
||
| async def after_receiving_response(self, throttling_error: bool) -> None: | ||
| """Update the request rate based on the response using CUBIC algorithm. | ||
|
|
||
| Reduces the rate by 70% when throttled, or increases the rate using | ||
|
||
| CUBIC scaling for successful responses. Updates the token bucket with | ||
| the new calculated rate, capped at 2x the measured client rate. | ||
| :param throttling_error: True if the response was a throttling error. | ||
| """ | ||
| measured_rate = self._rate_tracker.measure_rate() | ||
|
||
| timestamp = time.monotonic() | ||
| if throttling_error: | ||
| if not self._rate_limiter_enabled: | ||
| rate_to_use = measured_rate | ||
| else: | ||
| fill_rate = self._token_bucket.fill_rate | ||
| rate_to_use = min(measured_rate, fill_rate) | ||
|
|
||
| self._cubic_calculator.calculate_and_update_inflection_point() | ||
| cubic_calculated_rate = self._cubic_calculator.throttle_request( | ||
| rate_to_use, timestamp | ||
| ) | ||
| self._rate_limiter_enabled = True | ||
| else: | ||
| cubic_calculated_rate = self._cubic_calculator.scale_request(timestamp) | ||
|
|
||
| new_rate = min(cubic_calculated_rate, 2 * measured_rate) | ||
| await self._token_bucket.update_rate(new_rate) | ||
|
|
||
| @property | ||
| def rate_limit_enabled(self) -> bool: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add some context for me here as to why we are exposing this as a public property?
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As discussed earlier today, this is only for testing. |
||
| return self._rate_limiter_enabled | ||
Uh oh!
There was an error while loading. Please reload this page.