Implement client-side rate limiting for adaptive retries. #3
ubaskota merged 3 commits into adaptive_retries from
Conversation
SamRemis
left a comment
As discussed in person, I'm adding mostly nitty comments for now. I'm still wrapping my head around everything going on here for a more in-depth review, but it looks like a great start
```python
await self._token_bucket.update_rate(new_rate)

@property
def rate_limit_enabled(self) -> bool:
```
Can you add some context for me here as to why we are exposing this as a public property?
As discussed earlier today, this is only for testing.
```python
def __init__(
    self,
    starting_max_rate: float,
```
In trying to understand this, I have a few initial questions about the constructor:
- Why does `starting_max_rate` not have a default?
- Should we have some validation here to check that the rate is positive (and similar)?
- Why are we letting users pass in a start time? Do we want to default to the current time if none is passed?
- `CubicCalculator` is called from `AdaptiveRetryStrategy`, which always passes an explicit value, so I didn't set a default here. I'll add a default of 0.5 tokens/sec as a starting point.
- Good point. I'll add validation.
- Yes. I'll default it to `time.monotonic()` while keeping the override option.
```python
):
    """Initialize a new CubicCalculator.

    :param starting_max_rate: Initial maximum rate of the CubicCalculator.
```
Can we add to this doc string what the max rate is in? At least initially reading this PR, I'm not sure what this represents - requests per second?
Are we still using rst structure for our doc strings?
> Can we add to this doc string what the max rate is in? At least initially reading this PR, I'm not sure what this represents - requests per second?

Added. It is requests per second.

> Are we still using rst structure for our doc strings?

Yes, that is what the existing code uses.
```python
) ** (1 / 3.0)
return self._inflection_point_time


def scale_request(self, timestamp: float) -> float:
```
Should this be named `increase_request_rate` or similar?
```python
async def after_receiving_response(self, throttling_error: bool) -> None:
    """Update the request rate based on the response using the CUBIC algorithm.

    Reduces the rate to 70% when throttled, or increases the rate using
```
Nit: this comment repeats a comment above in `CubicCalculator`; maybe we should keep this minimal here.
```python
    raising TimeoutError.
    """
    self._curr_capacity: float = max(curr_capacity, self.MIN_CAPACITY)
    self._max_capacity: float = self._curr_capacity
```
What caps max capacity here? Nothing seems to prevent it from accumulating a large number of tokens in a long-running process that saw a throttling event once but then doesn't see another one for a long time.
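One way to address this concern would be a hard ceiling on the bucket, so a long throttle-free stretch can't grow the token balance without bound. A minimal sketch, where `MAX_CAPACITY`, the `500.0` value, and the `refill` method are all hypothetical additions, not code from this PR:

```python
class TokenBucket:
    MIN_CAPACITY = 1.0
    # Hypothetical hard ceiling; the PR as reviewed does not define one.
    MAX_CAPACITY = 500.0

    def __init__(self, curr_capacity: float) -> None:
        self._curr_capacity = max(curr_capacity, self.MIN_CAPACITY)

    def refill(self, elapsed_seconds: float, fill_rate: float) -> None:
        # Clamp the balance so a long-running process that has not been
        # throttled in a while cannot accumulate tokens without bound.
        self._curr_capacity = min(
            self._curr_capacity + elapsed_seconds * fill_rate,
            self.MAX_CAPACITY,
        )
```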
```python
    without recalculation if still within the same time bucket.

    :return: Current smoothed request rate in requests per second.
    """
    curr_time = time.monotonic()
```
Don't we need a lock over this? Say two requests call this at once when it's time to reset the bucket. Thread A could check that the bucket is ready for a reset and start the process, then thread B starts and completes the reset. Thread A then goes back and finishes the reset, overwriting what thread B did.
Good catch. Yes, we need a lock over this to prevent a race condition. Added.
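The fix could look roughly like the sketch below: hold an `asyncio.Lock` across the whole check-and-reset so two concurrent callers can't both observe an expired bucket and race to reset it. The class here is a simplified stand-in (plain bucketed counting rather than the PR's exponential smoothing), and all names are assumed from the diff:

```python
import asyncio
import time


class RequestRateTracker:
    """Simplified sketch of the locking fix; the real tracker applies
    exponential smoothing to the per-bucket rate."""

    def __init__(self, bucket_duration: float = 0.5) -> None:
        self._lock = asyncio.Lock()
        self._bucket_duration = bucket_duration
        self._bucket_start = time.monotonic()
        self._count = 0
        self._measured_rate = 0.0

    async def measure_rate(self) -> float:
        # The lock spans the check *and* the reset, so a second caller
        # blocks until the first finishes rather than overwriting a
        # half-completed reset.
        async with self._lock:
            now = time.monotonic()
            if now - self._bucket_start >= self._bucket_duration:
                elapsed = now - self._bucket_start
                self._measured_rate = self._count / elapsed
                self._count = 0
                self._bucket_start = now
            self._count += 1
            return self._measured_rate
```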
```python
    the new calculated rate, capped at 2x the measured client rate.

    :param throttling_error: True if the response was a throttling error.
    """
    measured_rate = self._rate_tracker.measure_rate()
```
Same idea for a race condition here, we may just need a lock over this whole method since two requests shouldn't be calculating the new rate at once and overwriting each other?
I added a lock at the lowest level, which is in `measure_rate`. Shouldn't that be sufficient?
What happens here if two or more requests call this method at once? Do you see any potential concurrency issues coming up?
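The risk with locking only inside `measure_rate` is that two coroutines can still interleave the measure → calculate → update sequence, so the later writer silently overwrites the earlier one's rate. A sketch of the whole-method lock, with the collaborators stubbed out and the calculator method names assumed for illustration:

```python
import asyncio


class ClientRateLimiter:
    """Sketch of serializing the full read-modify-write; real
    collaborator signatures may differ from these assumptions."""

    def __init__(self, rate_tracker, cubic, token_bucket) -> None:
        self._rate_tracker = rate_tracker
        self._cubic = cubic
        self._token_bucket = token_bucket
        self._lock = asyncio.Lock()

    async def after_receiving_response(self, throttling_error: bool) -> None:
        # The lock covers measurement, calculation, and the bucket
        # update, so concurrent responses cannot compute rates from the
        # same snapshot and clobber each other's result.
        async with self._lock:
            measured_rate = self._rate_tracker.measure_rate()
            if throttling_error:
                new_rate = self._cubic.get_throttled_rate(measured_rate)
            else:
                new_rate = self._cubic.get_scaled_rate(measured_rate)
            await self._token_bucket.update_rate(new_rate)
```

The trade-off is that all responses serialize on one lock, but the critical section is short and purely in-memory.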
```python
) + self._last_max_rate
return calculated_rate


def throttle_request(self, rate_to_use: float, timestamp: float) -> float:
```
Again, this could probably be renamed to something like `get_throttled_rate` or similar. The name of this method makes me think it's actually going to throttle the request itself, not calculate and update the rate.
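A rename along those lines might look like this sketch. The reduction to 70% of the passed-in rate comes from the PR description; the stored bookkeeping fields are assumptions:

```python
class CubicCalculator:
    BETA = 0.7  # multiplicative-decrease factor from the PR description

    def get_throttled_rate(self, rate_to_use: float, timestamp: float) -> float:
        """Return the reduced rate after a throttling response.

        Pure calculation: the caller is responsible for applying the
        returned rate; no request is actually throttled here.
        """
        self._last_max_rate = rate_to_use  # assumed bookkeeping
        self._last_throttle_time = timestamp
        return rate_to_use * self.BETA
```

Naming the method after what it returns, rather than an action it doesn't perform, keeps the calculator free of side effects on requests.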
Issue #, if available:
Description of changes:
Implements client-side rate limiting for Adaptive Retries.
Core Components:
- `ClientRateLimiter`: Orchestrates rate limiting using a token bucket and the CUBIC algorithm
- `CubicCalculator`: Implements CUBIC congestion control (reduction to 70% on throttle, cubic scaling on success)
- `RequestRateTracker`: Measures actual client request rates using time-bucketed exponential smoothing

Testing:
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.