diff --git a/botocore/credentials.py b/botocore/credentials.py
index c0f798e05b..99285c61fd 100644
--- a/botocore/credentials.py
+++ b/botocore/credentials.py
@@ -44,7 +44,7 @@
UnknownCredentialError,
)
from botocore.tokens import SSOTokenProvider
-from botocore.useragent import register_feature_id
+from botocore.useragent import register_feature_id, register_feature_ids
from botocore.utils import (
ArnParser,
ContainerMetadataFetcher,
@@ -705,6 +705,7 @@ def __init__(self, cache=None, expiry_window_seconds=None):
if expiry_window_seconds is None:
expiry_window_seconds = self.DEFAULT_EXPIRY_WINDOW_SECONDS
self._expiry_window_seconds = expiry_window_seconds
+ self.feature_ids = set()
def _create_cache_key(self):
raise NotImplementedError('_create_cache_key()')
@@ -885,6 +886,7 @@ def __init__(
def _get_credentials(self):
"""Get credentials by calling assume role."""
+ register_feature_ids(self.feature_ids)
kwargs = self._assume_role_kwargs()
client = self._create_client()
response = client.assume_role(**kwargs)
@@ -971,6 +973,7 @@ def __init__(
def _get_credentials(self):
"""Get credentials by calling assume role."""
+ register_feature_ids(self.feature_ids)
kwargs = self._assume_role_kwargs()
# Assume role with web identity does not require credentials other than
# the token, explicitly configure the client to not sign requests.
@@ -1367,6 +1370,7 @@ def load(self):
)
token = self._get_session_token(config)
account_id = self._get_account_id(config)
+ register_feature_id('CREDENTIALS_PROFILE')
return Credentials(
access_key,
secret_key,
@@ -1434,6 +1438,7 @@ def load(self):
)
token = self._get_session_token(profile_config)
account_id = self._get_account_id(profile_config)
+ register_feature_id('CREDENTIALS_PROFILE')
return Credentials(
access_key,
secret_key,
@@ -1513,6 +1518,11 @@ class AssumeRoleProvider(CredentialProvider):
# remaining time left until the credentials expires is less than the
# EXPIRY_WINDOW.
EXPIRY_WINDOW_SECONDS = 60 * 15
+ NAMED_PROVIDER_FEATURE_MAP = {
+ 'Ec2InstanceMetadata': 'CREDENTIALS_IMDS',
+ 'Environment': 'CREDENTIALS_ENV_VARS',
+ 'EcsContainer': 'CREDENTIALS_HTTP',
+ }
def __init__(
self,
@@ -1575,6 +1585,7 @@ def __init__(
self._credential_sourcer = credential_sourcer
self._profile_provider_builder = profile_provider_builder
self._visited_profiles = [self._profile_name]
+ self._feature_ids = set()
def load(self):
self._loaded_config = self._load_config()
@@ -1625,10 +1636,13 @@ def _load_creds_via_assume_role(self, profile_name):
mfa_prompter=self._prompter,
cache=self.cache,
)
+ fetcher.feature_ids = self._feature_ids.copy()
refresher = fetcher.fetch_credentials
if mfa_serial is not None:
refresher = create_mfa_serial_refresher(refresher)
+ self._feature_ids.add('CREDENTIALS_STS_ASSUME_ROLE')
+ register_feature_ids(self._feature_ids)
# The initial credentials are empty and the expiration time is set
# to now so that we can delay the call to assume role until it is
# strictly needed.
@@ -1757,18 +1771,20 @@ def _has_static_credentials(self, profile):
def _resolve_source_credentials(self, role_config, profile_name):
credential_source = role_config.get('credential_source')
if credential_source is not None:
+ self._feature_ids.add('CREDENTIALS_PROFILE_NAMED_PROVIDER')
return self._resolve_credentials_from_source(
credential_source, profile_name
)
source_profile = role_config['source_profile']
self._visited_profiles.append(source_profile)
+ self._feature_ids.add('CREDENTIALS_PROFILE_SOURCE_PROFILE')
return self._resolve_credentials_from_profile(source_profile)
def _resolve_credentials_from_profile(self, profile_name):
profiles = self._loaded_config.get('profiles', {})
profile = profiles[profile_name]
-
+ self._feature_ids.add('CREDENTIALS_PROFILE')
if (
self._has_static_credentials(profile)
and not self._profile_provider_builder
@@ -1824,6 +1840,11 @@ def _resolve_credentials_from_source(
f'in profile {profile_name}'
),
)
+ named_provider_feature_id = self.NAMED_PROVIDER_FEATURE_MAP.get(
+ credential_source
+ )
+ if named_provider_feature_id:
+ self._feature_ids.add(named_provider_feature_id)
return credentials
@@ -1854,6 +1875,7 @@ def __init__(
if token_loader_cls is None:
token_loader_cls = FileWebIdentityTokenLoader
self._token_loader_cls = token_loader_cls
+ self._feature_ids = set()
def load(self):
return self._assume_role_with_web_identity()
@@ -1876,8 +1898,15 @@ def _get_env_config(self, key):
def _get_config(self, key):
env_value = self._get_env_config(key)
if env_value is not None:
+ self._feature_ids.add('CREDENTIALS_ENV_VARS_STS_WEB_ID_TOKEN')
return env_value
- return self._get_profile_config(key)
+
+ config_value = self._get_profile_config(key)
+ if config_value is not None:
+ self._feature_ids.add('CREDENTIALS_PROFILE_STS_WEB_ID_TOKEN')
+ return config_value
+
+ return None
def _assume_role_with_web_identity(self):
token_path = self._get_config('web_identity_token_file')
@@ -1907,6 +1936,10 @@ def _assume_role_with_web_identity(self):
extra_args=extra_args,
cache=self.cache,
)
+ fetcher.feature_ids = self._feature_ids.copy()
+
+ self._feature_ids.add('CREDENTIALS_STS_ASSUME_ROLE_WEB_ID')
+ register_feature_ids(self._feature_ids)
# The initial credentials are empty and the expiration time is set
# to now so that we can delay the call to assume role until it is
# strictly needed.
diff --git a/botocore/useragent.py b/botocore/useragent.py
index a59ae20952..7b32f11412 100644
--- a/botocore/useragent.py
+++ b/botocore/useragent.py
@@ -80,6 +80,13 @@
'FLEXIBLE_CHECKSUMS_RES_WHEN_REQUIRED': 'c',
'CREDENTIALS_CODE': 'e',
'CREDENTIALS_ENV_VARS': 'g',
+ 'CREDENTIALS_ENV_VARS_STS_WEB_ID_TOKEN': 'h',
+ 'CREDENTIALS_STS_ASSUME_ROLE': 'i',
+ 'CREDENTIALS_STS_ASSUME_ROLE_WEB_ID': 'k',
+ 'CREDENTIALS_PROFILE': 'n',
+ 'CREDENTIALS_PROFILE_SOURCE_PROFILE': 'o',
+ 'CREDENTIALS_PROFILE_NAMED_PROVIDER': 'p',
+ 'CREDENTIALS_PROFILE_STS_WEB_ID_TOKEN': 'q',
'CREDENTIALS_PROFILE_PROCESS': 'v',
'CREDENTIALS_PROCESS': 'w',
'CREDENTIALS_BOTO2_CONFIG_FILE': 'x',
@@ -108,6 +115,17 @@ def register_feature_id(feature_id):
ctx.features.add(val)
+def register_feature_ids(feature_ids):
+ """Adds multiple feature IDs to the current context object's ``features`` set.
+
+ :type feature_ids: iterable of str
+ :param feature_ids: An iterable of feature ID strings to register. Each
+ value must be a key in the ``_USERAGENT_FEATURE_MAPPINGS`` dict.
+ """
+ for feature_id in feature_ids:
+ register_feature_id(feature_id)
+
+
def sanitize_user_agent_string_component(raw_str, allow_hash):
"""Replaces all not allowed characters in the string with a dash ("-").
diff --git a/tests/functional/test_credentials.py b/tests/functional/test_credentials.py
index 62c69b0a7e..a39b79b32b 100644
--- a/tests/functional/test_credentials.py
+++ b/tests/functional/test_credentials.py
@@ -1370,6 +1370,67 @@ def test_user_agent_feature_ids(
patch_obj.stop()
+@pytest.mark.parametrize(
+ "creds_env_var,creds_file_content,patches,expected_feature_id",
+ [
+ (
+ 'AWS_SHARED_CREDENTIALS_FILE',
+ '[default]\naws_access_key_id = FAKEACCESSKEY\naws_secret_access_key = FAKESECRET',
+ [
+ patch(
+ "botocore.credentials.AssumeRoleProvider.load",
+ return_value=None,
+ ),
+ patch(
+ "botocore.credentials.EnvProvider.load", return_value=None
+ ),
+ ],
+ 'n',
+ ),
+ (
+ 'AWS_CONFIG_FILE',
+ '[default]\naws_access_key_id = FAKEACCESSKEY\naws_secret_access_key = FAKESECRET',
+ [
+ patch(
+ "botocore.credentials.AssumeRoleProvider.load",
+ return_value=None,
+ ),
+ patch(
+ "botocore.credentials.EnvProvider.load", return_value=None
+ ),
+ patch(
+ "botocore.credentials.SharedCredentialProvider.load",
+ return_value=None,
+ ),
+ ],
+ 'n',
+ ),
+ ],
+)
+def test_user_agent_has_file_based_feature_ids(
+ creds_env_var,
+ creds_file_content,
+ patches,
+ expected_feature_id,
+ tmp_path,
+ monkeypatch,
+):
+ credentials_file = tmp_path / "creds"
+ credentials_file.write_text(creds_file_content)
+ monkeypatch.setenv(creds_env_var, str(credentials_file))
+
+ for patch_obj in patches:
+ patch_obj.start()
+
+ try:
+ session = Session()
+ client = session.create_client("s3", region_name="us-east-1")
+ _assert_feature_ids_in_ua(client, expected_feature_id)
+ finally:
+ for patch_obj in patches:
+ patch_obj.stop()
+
+
def _assert_feature_ids_in_ua(client, expected_feature_ids):
"""Helper to test feature IDs appear in user agent for multiple calls."""
with ClientHTTPStubber(client, strict=True) as http_stubber:
@@ -1383,3 +1444,185 @@ def _assert_feature_ids_in_ua(client, expected_feature_ids):
feature_list = parse_registered_feature_ids(ua_string)
for expected_id in expected_feature_ids:
assert expected_id in feature_list
+
+
+@pytest.mark.parametrize(
+ "config_content,env_vars,expected_source_features,expected_provider_feature",
+ [
+ # Test Case 1: Assume Role with source profile
+ (
+ '''[profile assume-role-test]
+role_arn = arn:aws:iam::123456789012:role/test-role
+source_profile = base
+
+[profile base]
+aws_access_key_id = FAKEACCESSKEY
+aws_secret_access_key = FAKESECRET''',
+ {},
+ [
+ 'n', # CREDENTIALS_PROFILE
+ 'o', # CREDENTIALS_PROFILE_SOURCE_PROFILE
+ ],
+ 'i', # CREDENTIALS_STS_ASSUME_ROLE
+ ),
+ # Test Case 2: Assume Role with named provider
+ (
+ '''[profile assume-role-test]
+role_arn = arn:aws:iam::123456789012:role/test-role
+credential_source = Environment''',
+ {
+ 'AWS_ACCESS_KEY_ID': 'FAKEACCESSKEY',
+ 'AWS_SECRET_ACCESS_KEY': 'FAKESECRET',
+ },
+ [
+ 'g', # CREDENTIALS_ENV_VARS
+ 'p', # CREDENTIALS_PROFILE_NAMED_PROVIDER
+ ],
+ 'i', # CREDENTIALS_STS_ASSUME_ROLE
+ ),
+ ],
+)
+def test_user_agent_has_assume_role_feature_ids(
+ config_content,
+ env_vars,
+ expected_source_features,
+ expected_provider_feature,
+ tmp_path,
+):
+ session = _create_assume_role_session(config_content, tmp_path)
+
+ # Set env vars if needed
+ with patch.dict(os.environ, env_vars, clear=True):
+ with SessionHTTPStubber(session) as stubber:
+ s3 = session.create_client('s3', region_name='us-east-1')
+ _add_assume_role_http_response(stubber, with_web_identity=False)
+ stubber.add_response()
+ stubber.add_response()
+ s3.list_buckets()
+ s3.list_buckets()
+
+ ua_strings = get_captured_ua_strings(stubber)
+ _assert_deferred_credential_feature_ids(
+ ua_strings, expected_source_features, expected_provider_feature
+ )
+
+
+@pytest.mark.parametrize(
+ "config_content,env_vars,expected_source_features,expected_provider_feature",
+ [
+ # Test Case 1: Assume Role with Web Identity through config profile
+ (
+ '''[profile assume-role-test]
+role_arn = arn:aws:iam::123456789012:role/test-role
+web_identity_token_file = {token_file}''',
+ {},
+ ['q'], # CREDENTIALS_PROFILE_STS_WEB_ID_TOKEN
+ 'k', # CREDENTIALS_STS_ASSUME_ROLE_WEB_ID
+ ),
+ # Test Case 2: Assume Role with Web Identity through env vars
+ (
+ '',
+ {
+ 'AWS_ROLE_ARN': 'arn:aws:iam::123456789012:role/test-role',
+ 'AWS_WEB_IDENTITY_TOKEN_FILE': '{token_file}',
+ 'AWS_ROLE_SESSION_NAME': 'test-session',
+ },
+ ['h'], # CREDENTIALS_ENV_VARS_STS_WEB_ID_TOKEN
+ 'k', # CREDENTIALS_STS_ASSUME_ROLE_WEB_ID
+ ),
+ ],
+)
+def test_user_agent_has_assume_role_with_web_identity_feature_ids(
+ config_content,
+ env_vars,
+ expected_source_features,
+ expected_provider_feature,
+ tmp_path,
+):
+ token_file = tmp_path / 'token.jwt'
+ token_file.write_text('fake-jwt-token')
+ if 'AWS_WEB_IDENTITY_TOKEN_FILE' in env_vars:
+ env_vars['AWS_WEB_IDENTITY_TOKEN_FILE'] = str(token_file)
+ elif config_content and 'web_identity_token_file' in config_content:
+ config_content = config_content.replace(
+ '{token_file}', str(token_file)
+ )
+
+ session = _create_assume_role_session(config_content, tmp_path)
+
+ # Set env vars if needed
+ with patch.dict(os.environ, env_vars, clear=True):
+ with SessionHTTPStubber(session) as stubber:
+ s3 = session.create_client('s3', region_name='us-east-1')
+ _add_assume_role_http_response(stubber, with_web_identity=True)
+ stubber.add_response()
+ stubber.add_response()
+ s3.list_buckets()
+ s3.list_buckets()
+
+ ua_strings = get_captured_ua_strings(stubber)
+ _assert_deferred_credential_feature_ids(
+ ua_strings, expected_source_features, expected_provider_feature
+ )
+
+
+def _create_assume_role_session(config_content, tmp_path):
+ if config_content:
+ config_file = tmp_path / 'config'
+ config_file.write_text(config_content)
+ session = Session(profile='assume-role-test')
+ session.set_config_variable('config_file', str(config_file))
+ else:
+ session = Session()
+ return session
+
+
+def _add_assume_role_http_response(stubber, with_web_identity):
+ """Add HTTP response for AssumeRole or AssumeRoleWithWebIdentity call with proper credentials"""
+ expiration = (datetime.now(timezone.utc) + timedelta(hours=1)).strftime(
+ '%Y-%m-%dT%H:%M:%SZ'
+ )
+ method_name = (
+ 'AssumeRoleWithWebIdentity' if with_web_identity else 'AssumeRole'
+ )
+ body = (
+ f'<{method_name}Response>'
+ f' <{method_name}Result>'
+ ' '
+ ' arn:aws:sts::123456789012:user'
+ ' AKID:test-session-123'
+ ' '
+ ' '
+ f' FAKEASSUMEROLEKEY'
+ f' FAKEASSUMEROLSECRET'
+ ' FAKETOKEN'
+ f' {expiration}'
+ ' '
+ f' {method_name}Result>'
+ f'{method_name}Response>'
+ )
+ stubber.add_response(body=body.encode('utf-8'))
+
+
+def _assert_deferred_credential_feature_ids(
+ ua_strings,
+ expected_source_features,
+ expected_provider_feature,
+):
+ """Helper to assert feature IDs for deferred credential provider tests"""
+ assert len(ua_strings) == 3
+
+ # Request to fetch credentials should only register feature ids for the credential source
+ credential_source_feature_list = parse_registered_feature_ids(
+ ua_strings[0]
+ )
+ for feature in expected_source_features:
+ assert feature in credential_source_feature_list
+ assert expected_provider_feature not in credential_source_feature_list
+
+ # Original operation request should register feature ids for both the credential source and the provider
+ for i in [1, 2]:
+ operation_feature_list = parse_registered_feature_ids(ua_strings[i])
+ for feature in expected_source_features:
+ assert feature in operation_feature_list
+ assert expected_provider_feature in operation_feature_list
diff --git a/tests/unit/test_credentials.py b/tests/unit/test_credentials.py
index 06a271095e..3c7b46d415 100644
--- a/tests/unit/test_credentials.py
+++ b/tests/unit/test_credentials.py
@@ -746,6 +746,30 @@ def test_account_id_with_invalid_arn(self):
self.assertEqual(response, expected_response)
self.assertEqual(response['account_id'], None)
+ @mock.patch('botocore.credentials.register_feature_ids')
+ def test_feature_ids_registered_during_get_credentials(
+ self, mock_register
+ ):
+ response = {
+ 'Credentials': {
+ 'AccessKeyId': 'foo',
+ 'SecretAccessKey': 'bar',
+ 'SessionToken': 'baz',
+ 'Expiration': self.some_future_time(),
+ }
+ }
+ client_creator = self.create_client_creator(with_response=response)
+ fetcher = credentials.AssumeRoleCredentialFetcher(
+ client_creator, self.source_creds, self.role_arn
+ )
+
+ test_feature_ids = {'test_feature_1', 'test_feature_2'}
+ fetcher.feature_ids = test_feature_ids
+
+ fetcher.fetch_credentials()
+ # Verify register_credential_feature_ids was called with test feature IDs
+ mock_register.assert_called_once_with(test_feature_ids)
+
class TestAssumeRoleWithWebIdentityCredentialFetcher(BaseEnvVar):
def setUp(self):
@@ -903,6 +927,30 @@ def test_account_id_with_invalid_arn(self):
self.assertEqual(response, expected_response)
self.assertEqual(response['account_id'], None)
+ @mock.patch('botocore.credentials.register_feature_ids')
+ def test_feature_ids_registered_during_get_credentials(
+ self, mock_register
+ ):
+ response = {
+ 'Credentials': {
+ 'AccessKeyId': 'foo',
+ 'SecretAccessKey': 'bar',
+ 'SessionToken': 'baz',
+ 'Expiration': self.some_future_time(),
+ }
+ }
+ client_creator = self.create_client_creator(with_response=response)
+ fetcher = credentials.AssumeRoleWithWebIdentityCredentialFetcher(
+ client_creator, self.load_token, self.role_arn
+ )
+
+ test_feature_ids = {'test_feature_1', 'test_feature_2'}
+ fetcher.feature_ids = test_feature_ids
+
+ fetcher.fetch_credentials()
+ # Verify register_credential_feature_ids was called with test feature IDs
+ mock_register.assert_called_once_with(test_feature_ids)
+
class TestAssumeRoleWithWebIdentityCredentialProvider(unittest.TestCase):
def setUp(self):