-
Notifications
You must be signed in to change notification settings - Fork 0
Implement a token bucket used for adaptive retries #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
a46a6e0
927a9fa
1ac8dab
a591b3d
dede6fe
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,9 @@ | ||
| # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| import asyncio | ||
| import time | ||
| from unittest.mock import AsyncMock, patch | ||
|
|
||
| import pytest | ||
| from smithy_core.exceptions import CallError, RetryError | ||
| from smithy_core.retries import ExponentialBackoffJitterType as EBJT | ||
|
|
@@ -10,6 +14,7 @@ | |
| SimpleRetryStrategy, | ||
| StandardRetryQuota, | ||
| StandardRetryStrategy, | ||
| TokenBucket, | ||
| ) | ||
|
|
||
|
|
||
|
|
@@ -275,3 +280,111 @@ async def test_retry_strategy_resolver_rejects_invalid_type() -> None: | |
| match="retry_strategy must be RetryStrategy, RetryStrategyOptions, or None", | ||
| ): | ||
| await resolver.resolve_retry_strategy(retry_strategy="invalid") # type: ignore | ||
|
|
||
|
|
||
| class TestTokenBucket: | ||
| @pytest.mark.asyncio | ||
| async def test_initial_state(self): | ||
| token_bucket = TokenBucket() | ||
| assert token_bucket.current_capacity == token_bucket.MIN_CAPACITY | ||
| assert token_bucket.max_capacity == token_bucket.MIN_CAPACITY | ||
| assert token_bucket.fill_rate == token_bucket.MIN_FILL_RATE | ||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_acquire_succeeds_immediately_within_capacity(self): | ||
| token_bucket = TokenBucket() | ||
| start_time = time.monotonic() | ||
| await token_bucket.acquire(1) | ||
| elapsed = time.monotonic() - start_time | ||
|
|
||
| assert elapsed < 0.001 # Should be near instant | ||
ubaskota marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| assert token_bucket.current_capacity == 0 | ||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_acquire_waits_when_capacity_insufficient(self): | ||
| token_bucket = TokenBucket(curr_capacity=0) | ||
|
|
||
| with patch("asyncio.sleep", new_callable=AsyncMock) as mock_sleep: | ||
|
|
||
| async def side_effect(delay: float): | ||
| async with token_bucket._lock: # type: ignore | ||
| token_bucket._curr_capacity = 1.0 # type: ignore | ||
ubaskota marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| mock_sleep.side_effect = side_effect | ||
| await token_bucket.acquire(1) | ||
| assert mock_sleep.call_count == 1 | ||
|
|
||
| actual_delay = mock_sleep.call_args[0][0] | ||
| assert actual_delay == pytest.approx(2.0, abs=0.05) # type: ignore | ||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_multiple_refills_over_time(self): | ||
| token_bucket = TokenBucket(curr_capacity=0, max_capacity=10, fill_rate=2.0) | ||
|
|
||
| time_values = iter([1.0, 1.5, 4.0]) | ||
| with patch("time.monotonic", side_effect=lambda: next(time_values)): | ||
| token_bucket._last_timestamp = 0.0 # type: ignore | ||
ubaskota marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| async with token_bucket._lock: # type: ignore | ||
| token_bucket._refill() # type: ignore | ||
ubaskota marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| assert token_bucket.current_capacity == pytest.approx(2.0, abs=0.05) # type: ignore | ||
ubaskota marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| with patch("time.monotonic", side_effect=lambda: next(time_values)): | ||
| async with token_bucket._lock: # type: ignore | ||
| token_bucket._refill() # type: ignore | ||
| assert token_bucket.current_capacity == pytest.approx(3.0, abs=0.05) # type: ignore | ||
|
|
||
| with patch("time.monotonic", side_effect=lambda: next(time_values)): | ||
| async with token_bucket._lock: # type: ignore | ||
| token_bucket._refill() # type: ignore | ||
| assert token_bucket.current_capacity == pytest.approx(8.0, abs=0.05) # type: ignore | ||
ubaskota marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_update_bucket_updates_capacity(self): | ||
| token_bucket = TokenBucket() | ||
|
|
||
| await token_bucket.update_bucket(5.0) | ||
| assert token_bucket.fill_rate == 5.0 | ||
| assert token_bucket.max_capacity == 5.0 | ||
| assert token_bucket.current_capacity == 1.0 | ||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_rate_can_never_be_zero(self): | ||
| token_bucket = TokenBucket() | ||
| await token_bucket.update_bucket(0.0) | ||
|
|
||
| assert token_bucket.fill_rate != 0.0 | ||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_refill_caps_at_max_capacity(self): | ||
| token_bucket = TokenBucket() | ||
| # Max and current capacity of the bucket is set to 1.0 initially | ||
| await token_bucket.update_bucket(10.0) | ||
|
|
||
| async with token_bucket._lock: # type: ignore | ||
| token_bucket._refill() # type: ignore | ||
|
Comment on lines
+334
to
+335
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we talked about this previously, but we don't want to be calling private methods if we can avoid it. Is there another way we can test this that better reflects the public interfaces?
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, we did talk about this earlier. I updated the other tests to use a public method, but for this we are specifically testing whether the refill exceeds the max capacity when the rate is increased, so I decided to leave it as it is. |
||
|
|
||
| assert token_bucket.current_capacity == pytest.approx(1.0, abs=0.05) # type: ignore | ||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_many_concurrent_tasks_succeed(self): | ||
| token_bucket = TokenBucket(curr_capacity=2.0) | ||
| await token_bucket.update_bucket(4.0) | ||
| completed_tasks: list[int] = [] | ||
|
|
||
| async def worker(worker_id: int): | ||
| await token_bucket.acquire(0.1) | ||
| completed_tasks.append(worker_id) | ||
|
|
||
| try: | ||
| # At the fill rate of 4/second and acquire cost of 0.1, it should take | ||
| # around 2 seconds to process 100 tasks. | ||
| await asyncio.wait_for( | ||
| asyncio.gather(*[worker(i) for i in range(100)]), timeout=3 | ||
| ) | ||
| except TimeoutError: | ||
| pytest.fail("Deadlock detected: concurrent acquire operations timed out") | ||
ubaskota marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| assert len(completed_tasks) == 100 | ||
| assert len(set(completed_tasks)) == 100 | ||
| assert token_bucket.current_capacity >= 0.0 | ||
Uh oh!
There was an error while loading. Please reload this page.