-
Notifications
You must be signed in to change notification settings - Fork 0
Implement a token bucket used for adaptive retries #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
a46a6e0
927a9fa
1ac8dab
a591b3d
dede6fe
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,7 @@ | ||
| # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| from unittest.mock import patch | ||
|
|
||
| import pytest | ||
| from smithy_core.exceptions import CallError, RetryError | ||
| from smithy_core.retries import ExponentialBackoffJitterType as EBJT | ||
|
|
@@ -10,6 +12,7 @@ | |
| SimpleRetryStrategy, | ||
| StandardRetryQuota, | ||
| StandardRetryStrategy, | ||
| TokenBucket, | ||
| ) | ||
|
|
||
|
|
||
|
|
@@ -275,3 +278,80 @@ async def test_retry_strategy_resolver_rejects_invalid_type() -> None: | |
| match="retry_strategy must be RetryStrategy, RetryStrategyOptions, or None", | ||
| ): | ||
| await resolver.resolve_retry_strategy(retry_strategy="invalid") # type: ignore | ||
|
|
||
|
|
||
| class TestTokenBucket: | ||
| @pytest.mark.asyncio | ||
| async def test_initial_state(self): | ||
| token_bucket = TokenBucket() | ||
| assert token_bucket.current_capacity == token_bucket.MIN_CAPACITY | ||
| assert token_bucket.max_capacity == token_bucket.MIN_CAPACITY | ||
| assert token_bucket.fill_rate == token_bucket.MIN_FILL_RATE | ||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_acquire_succeeds_immediately_within_capacity(self): | ||
| token_bucket = TokenBucket() | ||
|
|
||
| with patch("asyncio.sleep") as mock_sleep: | ||
| await token_bucket.acquire(1) | ||
| mock_sleep.assert_not_called() | ||
|
|
||
| assert token_bucket.current_capacity == 0 | ||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_acquire_waits_when_capacity_insufficient(self): | ||
| token_bucket = TokenBucket(fill_rate=1.0) | ||
| await token_bucket.acquire(1) | ||
|
|
||
| with patch("asyncio.sleep") as mock_sleep: | ||
| await token_bucket.acquire(1) | ||
| mock_sleep.assert_called() | ||
|
|
||
| assert token_bucket.current_capacity == 0.0 | ||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_multiple_refills_over_time(self): | ||
| time_values = iter([0.0, 1.0, 1.0, 1.5, 1.5, 4.0, 4.0, 5.0]) | ||
| with patch("time.monotonic", side_effect=lambda: next(time_values)): | ||
ubaskota marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| token_bucket = TokenBucket(curr_capacity=0, fill_rate=2.0) | ||
| await token_bucket.acquire(1) | ||
| assert token_bucket.current_capacity == 0.0 | ||
|
|
||
| with patch("time.monotonic", side_effect=lambda: next(time_values)): | ||
| await token_bucket.update_bucket(4) # Update the rate of refill | ||
| assert token_bucket.current_capacity == 1.0 | ||
|
|
||
| with patch("time.monotonic", side_effect=lambda: next(time_values)): | ||
| await token_bucket.acquire(1) | ||
| assert token_bucket.current_capacity == 3 | ||
|
|
||
| with patch("time.monotonic", side_effect=lambda: next(time_values)): | ||
| await token_bucket.acquire(1) | ||
| assert token_bucket.current_capacity == 3 | ||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_update_bucket_updates_capacity(self): | ||
| token_bucket = TokenBucket() | ||
|
|
||
| await token_bucket.update_bucket(5.0) | ||
| assert token_bucket.fill_rate == 5.0 | ||
| assert token_bucket.max_capacity == 5.0 | ||
| assert token_bucket.current_capacity == 1.0 | ||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_rate_can_never_be_zero(self): | ||
| token_bucket = TokenBucket() | ||
| await token_bucket.update_bucket(0.0) | ||
|
|
||
| assert token_bucket.fill_rate != 0.0 | ||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_refill_caps_at_max_capacity(self): | ||
| token_bucket = TokenBucket() | ||
| # Max and current capacity of the bucket is set to 1.0 initially | ||
| await token_bucket.update_bucket(10.0) | ||
|
|
||
| async with token_bucket._lock: # type: ignore | ||
| token_bucket._refill() # type: ignore | ||
|
Comment on lines
+334
to
+335
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we talked about this previously, but we don't want to be calling private methods if we can avoid it. Is there another way we can test this that better reflects the public interfaces?
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, we did talk about this earlier. I updated the other tests to use a public method, but for this we are specifically testing whether the refill exceeds the max capacity when the rate is increased, so I decided to leave it as it is. |
||
|
|
||
| assert round(token_bucket.current_capacity, 1) == 1.0 | ||
Uh oh!
There was an error while loading. Please reload this page.