
Implement client-side rate limiting for adaptive retries.#3

Merged
ubaskota merged 3 commits into adaptive_retries from client_side_rate_limiter
Jan 29, 2026

Conversation


@ubaskota ubaskota commented Jan 8, 2026

Issue #, if available:

Description of changes:
Implements client-side rate limiting for Adaptive Retries.

Core Components:

  • ClientRateLimiter: Orchestrates rate limiting using token bucket and CUBIC algorithm
  • CubicCalculator: Implements CUBIC congestion control (reduction to 70% on throttle, cubic scaling on success)
  • RequestRateTracker: Measures actual client request rates using time-bucketed exponential smoothing
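The CUBIC behavior the components above describe (reduction to 70% on throttle, cubic growth on success) can be sketched roughly as follows. Only the 70% reduction is stated in the PR; the class name, the scaling constant, and the exact growth formula here are assumptions based on standard CUBIC congestion control, not the PR's actual code.

```python
import time


class CubicSketch:
    """Minimal sketch of a CUBIC-style rate update (illustrative only)."""

    BETA = 0.7   # multiplicative decrease on throttling, per the PR description
    SCALE = 0.4  # cubic scaling constant (assumed, a common CUBIC choice)

    def __init__(self, starting_max_rate: float) -> None:
        self._last_max_rate = starting_max_rate
        self._last_throttle_time = time.monotonic()

    def throttle(self, rate_to_use: float, timestamp: float) -> float:
        """On a throttling error: remember the rate we were at, cut to 70%."""
        self._last_max_rate = rate_to_use
        self._last_throttle_time = timestamp
        return rate_to_use * self.BETA

    def scale(self, timestamp: float) -> float:
        """On success: grow along a cubic curve anchored at the last max rate."""
        dt = timestamp - self._last_throttle_time
        # k is the time at which the cubic curve climbs back to the last max.
        k = (self._last_max_rate * (1 - self.BETA) / self.SCALE) ** (1 / 3.0)
        return self.SCALE * (dt - k) ** 3 + self._last_max_rate
```

The key property is that growth is slow near the previous maximum and accelerates only well past it, which is what makes CUBIC gentle around the rate that last caused throttling.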

Testing:

  • All tests pass, with time properly mocked to avoid real delays.
  • Edge case testing for throttling, success responses, and rate capping is covered.
  • Type checking and linting pass without errors.

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.


@SamRemis SamRemis left a comment


As discussed in person, I'm adding mostly nitty comments for now. I'm still wrapping my head around everything going on here for a more in-depth review, but it looks like a great start

await self._token_bucket.update_rate(new_rate)

@property
def rate_limit_enabled(self) -> bool:


Can you add some context for me here as to why we are exposing this as a public property?


@ubaskota ubaskota Jan 16, 2026


As discussed earlier today, this is only for testing.


def __init__(
self,
starting_max_rate: float,


In trying to understand this, I initially have a few questions about the constructor:

  1. Why does starting_max_rate not have a default?
  2. Should we have some validations here to check that the rate is positive (and similar)?
  3. Why are we letting users pass in start time? Do we want to default to the current time if none is passed?


@ubaskota ubaskota Jan 16, 2026


  1. CubicCalculator is called from AdaptiveRetryStrategy and it always passes an explicit value, so I didn't set a default here. I'll add a default of 0.5 tokens/sec as a starting point.
  2. Good point. I'll add validation.
  3. Yes. I'll default it to time.monotonic() while keeping the override option.
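The three changes promised in the reply above (a 0.5 tokens/sec default, positive-rate validation, and a `time.monotonic()` default for the start time) could look roughly like this. The parameter and attribute names are illustrative, not the PR's exact code.

```python
import time
from typing import Optional


class CubicCalculator:
    # Default starting rate of 0.5 tokens/sec, per the reply above.
    _DEFAULT_STARTING_MAX_RATE = 0.5

    def __init__(
        self,
        starting_max_rate: float = _DEFAULT_STARTING_MAX_RATE,
        start_time: Optional[float] = None,
    ) -> None:
        # Validate that the rate is positive, as the reviewer suggested.
        if starting_max_rate <= 0:
            raise ValueError(
                f"starting_max_rate must be positive, got {starting_max_rate}"
            )
        self._starting_max_rate = starting_max_rate
        # Default to the current monotonic time, while keeping the
        # explicit override available for tests.
        self._start_time = (
            start_time if start_time is not None else time.monotonic()
        )
```

Keeping the `start_time` override makes the calculator testable without mocking the clock, while callers that pass nothing get sensible behavior.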

):
"""Initialize a new CubicCalculator.

:param starting_max_rate: Initial maximum rate of the CubicCalculator.


Can we add to this doc string what the max rate is in? At least initially reading this PR, I'm not sure what this represents - requests per second?


Are we still using rst structure for our doc strings?

Owner Author


Can we add to this doc string what the max rate is in? At least initially reading this PR, I'm not sure what this represents - requests per second?

Added. It is requests per second.

Are we still using rst structure for our doc strings?

Yes, that is what the existing code uses.

) ** (1 / 3.0)
return self._inflection_point_time

def scale_request(self, timestamp: float) -> float:

@SamRemis SamRemis Jan 14, 2026


Should this be named increase_request_rate or similar?

async def after_receiving_response(self, throttling_error: bool) -> None:
"""Update the request rate based on the response using CUBIC algorithm.

Reduces the rate by 70% when throttled, or increases the rate using


Nit: this comment is repeating a comment above in cubic calculator, maybe we should keep this minimal here.

raising TimeoutError.
"""
self._curr_capacity: float = max(curr_capacity, self.MIN_CAPACITY)
self._max_capacity: float = self._curr_capacity


What cap on max capacity is preventing this from accumulating a large amount of tokens in long-running processes where we have already seen a throttling event, but don't see any further ones for a long time?

without recalculation if still within the same time bucket.
:return: Current smoothed request rate in requests per second.
"""
curr_time = time.monotonic()


Don't we need a lock over this? Say two requests call this at once when it's time to reset the bucket. Thread A could check that the bucket is ready for a reset and start the process, then thread B starts and completes the reset. Thread A then goes back and finishes the reset, overwriting what thread B did.

Owner Author


Good catch. Yes, we need a lock over this to prevent a race condition. Added.
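The race described above, and the fix in this reply, can be sketched as follows: the bucket-reset check and the reset itself must happen under one lock, so two concurrent callers cannot interleave and overwrite each other's reset. The bucket width, smoothing factor, and all names here are illustrative assumptions, not the PR's actual code, and a `threading.Lock` stands in for whichever synchronization primitive the PR uses.

```python
import threading
import time


class RateTrackerSketch:
    """Sketch of a time-bucketed, exponentially smoothed rate tracker."""

    _BUCKET_SECONDS = 0.5  # assumed bucket width
    _ALPHA = 0.8           # assumed smoothing factor

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._bucket_start = time.monotonic()
        self._count_in_bucket = 0
        self._smoothed_rate = 0.0

    def record_request(self) -> None:
        # Check-then-reset happens atomically: without the lock, thread A
        # could pass the check, thread B could complete a reset, and then
        # thread A would finish its own reset on top of B's.
        with self._lock:
            now = time.monotonic()
            if now - self._bucket_start >= self._BUCKET_SECONDS:
                # Fold the finished bucket into the smoothed rate, then reset.
                bucket_rate = self._count_in_bucket / self._BUCKET_SECONDS
                self._smoothed_rate = (
                    self._ALPHA * bucket_rate
                    + (1 - self._ALPHA) * self._smoothed_rate
                )
                self._bucket_start = now
                self._count_in_bucket = 0
            self._count_in_bucket += 1

    def measure_rate(self) -> float:
        with self._lock:
            return self._smoothed_rate
```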

the new calculated rate, capped at 2x the measured client rate.
:param throttling_error: True if the response was a throttling error.
"""
measured_rate = self._rate_tracker.measure_rate()


Same idea for a race condition here, we may just need a lock over this whole method since two requests shouldn't be calculating the new rate at once and overwriting each other?

Owner Author


I added a lock at the lowest level which is in measure_rate. Shouldn't that be sufficient?


What happens here if two or more requests call this method at once? Do you see any potential concurrency issues coming up?
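The concern in this thread is that a lock inside `measure_rate` alone still lets two coroutines interleave between measuring the rate and applying the new one. A method-level lock makes the measure-compute-apply sequence atomic, as sketched below. The names mirror the PR's code but are illustrative, and a trivial growth step stands in for the full CUBIC calculation; only the 2x-measured-rate cap comes from the docstring quoted above.

```python
import asyncio


class RateUpdateSketch:
    """Sketch of guarding the whole rate update with one lock."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._fill_rate = 1.0
        self._measured_rate = 4.0  # stand-in for the rate tracker's output

    async def after_receiving_response(self, throttling_error: bool) -> None:
        # One lock over the whole method: two concurrent responses can no
        # longer both read the old rate and overwrite each other's update.
        async with self._lock:
            measured_rate = self._measured_rate
            if throttling_error:
                new_rate = self._fill_rate * 0.7
            else:
                # Placeholder growth step; capped at 2x the measured
                # client rate, per the docstring above.
                new_rate = min(self._fill_rate * 1.1, 2 * measured_rate)
            self._fill_rate = new_rate
```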

) + self._last_max_rate
return calculated_rate

def throttle_request(self, rate_to_use: float, timestamp: float) -> float:


Again, probably could be renamed to something like get_throttled_rate or similar. The name of this method makes me think it's actually going to be throttling the request itself, not calculating and updating the rate.

@ubaskota ubaskota merged commit 75fe734 into adaptive_retries Jan 29, 2026
4 checks passed