diff --git a/botocore/credentials.py b/botocore/credentials.py index 90bd1ad57f..56125c0bec 100644 --- a/botocore/credentials.py +++ b/botocore/credentials.py @@ -44,7 +44,7 @@ UnknownCredentialError, ) from botocore.tokens import SSOTokenProvider -from botocore.useragent import register_feature_id +from botocore.useragent import register_feature_id, register_feature_ids from botocore.utils import ( ArnParser, ContainerMetadataFetcher, @@ -705,6 +705,15 @@ def __init__(self, cache=None, expiry_window_seconds=None): if expiry_window_seconds is None: expiry_window_seconds = self.DEFAULT_EXPIRY_WINDOW_SECONDS self._expiry_window_seconds = expiry_window_seconds + self._feature_ids = set() + + @property + def feature_ids(self): + return self._feature_ids + + @feature_ids.setter + def feature_ids(self, value): + self._feature_ids = value def _create_cache_key(self): raise NotImplementedError('_create_cache_key()') @@ -885,6 +894,7 @@ def __init__( def _get_credentials(self): """Get credentials by calling assume role.""" + register_feature_ids(self._feature_ids) kwargs = self._assume_role_kwargs() client = self._create_client() response = client.assume_role(**kwargs) @@ -971,6 +981,7 @@ def __init__( def _get_credentials(self): """Get credentials by calling assume role.""" + register_feature_ids(self._feature_ids) kwargs = self._assume_role_kwargs() # Assume role with web identity does not require credentials other than # the token, explicitly configure the client to not sign requests. @@ -1365,6 +1376,7 @@ def load(self): ) token = self._get_session_token(config) account_id = self._get_account_id(config) + register_feature_id('CREDENTIALS_PROFILE') return Credentials( access_key, secret_key, @@ -1432,6 +1444,7 @@ def load(self): ) token = self._get_session_token(profile_config) account_id = self._get_account_id(profile_config) + register_feature_id('CREDENTIALS_PROFILE') return Credentials( access_key, secret_key, @@ -1510,6 +1523,11 @@ class AssumeRoleProvider(CredentialProvider): # remaining time left until the credentials expires is less than the # EXPIRY_WINDOW. EXPIRY_WINDOW_SECONDS = 60 * 15 + NAMED_PROVIDER_FEATURE_MAP = { + 'Ec2InstanceMetadata': 'CREDENTIALS_IMDS', + 'Environment': 'CREDENTIALS_ENV_VARS', + 'EcsContainer': 'CREDENTIALS_HTTP', + } def __init__( self, @@ -1572,6 +1590,7 @@ def __init__( self._credential_sourcer = credential_sourcer self._profile_provider_builder = profile_provider_builder self._visited_profiles = [self._profile_name] + self._feature_ids = set() def load(self): self._loaded_config = self._load_config() @@ -1622,10 +1641,13 @@ def _load_creds_via_assume_role(self, profile_name): mfa_prompter=self._prompter, cache=self.cache, ) + fetcher.feature_ids = self._feature_ids.copy() refresher = fetcher.fetch_credentials if mfa_serial is not None: refresher = create_mfa_serial_refresher(refresher) + self._feature_ids.add('CREDENTIALS_STS_ASSUME_ROLE') + register_feature_ids(self._feature_ids) # The initial credentials are empty and the expiration time is set # to now so that we can delay the call to assume role until it is # strictly needed. @@ -1754,18 +1776,20 @@ def _has_static_credentials(self, profile): def _resolve_source_credentials(self, role_config, profile_name): credential_source = role_config.get('credential_source') if credential_source is not None: + self._feature_ids.add('CREDENTIALS_PROFILE_NAMED_PROVIDER') return self._resolve_credentials_from_source( credential_source, profile_name ) source_profile = role_config['source_profile'] self._visited_profiles.append(source_profile) + self._feature_ids.add('CREDENTIALS_PROFILE_SOURCE_PROFILE') return self._resolve_credentials_from_profile(source_profile) def _resolve_credentials_from_profile(self, profile_name): profiles = self._loaded_config.get('profiles', {}) profile = profiles[profile_name] - + self._feature_ids.add('CREDENTIALS_PROFILE') if ( self._has_static_credentials(profile) and not self._profile_provider_builder @@ -1821,6 +1845,11 @@ def _resolve_credentials_from_source( f'in profile {profile_name}' ), ) + named_provider_feature_id = self.NAMED_PROVIDER_FEATURE_MAP.get( + credential_source + ) + if named_provider_feature_id: + self._feature_ids.add(named_provider_feature_id) return credentials @@ -1851,6 +1880,7 @@ def __init__( if token_loader_cls is None: token_loader_cls = FileWebIdentityTokenLoader self._token_loader_cls = token_loader_cls + self._feature_ids = set() def load(self): return self._assume_role_with_web_identity() @@ -1873,8 +1903,15 @@ def _get_env_config(self, key): def _get_config(self, key): env_value = self._get_env_config(key) if env_value is not None: + self._feature_ids.add('CREDENTIALS_ENV_VARS_STS_WEB_ID_TOKEN') return env_value - return self._get_profile_config(key) + + config_value = self._get_profile_config(key) + if config_value is not None: + self._feature_ids.add('CREDENTIALS_PROFILE_STS_WEB_ID_TOKEN') + return config_value + + return None def _assume_role_with_web_identity(self): token_path = self._get_config('web_identity_token_file') @@ -1904,6 +1941,10 @@ def _assume_role_with_web_identity(self): extra_args=extra_args, cache=self.cache, ) + fetcher.feature_ids = self._feature_ids.copy() + + self._feature_ids.add('CREDENTIALS_STS_ASSUME_ROLE_WEB_ID') + register_feature_ids(self._feature_ids) # The initial credentials are empty and the expiration time is set # to now so that we can delay the call to assume role until it is # strictly needed. diff --git a/botocore/useragent.py b/botocore/useragent.py index 98d171af43..5ef41de265 100644 --- a/botocore/useragent.py +++ b/botocore/useragent.py @@ -80,6 +80,13 @@ 'FLEXIBLE_CHECKSUMS_RES_WHEN_REQUIRED': 'c', 'CREDENTIALS_CODE': 'e', 'CREDENTIALS_ENV_VARS': 'g', + 'CREDENTIALS_ENV_VARS_STS_WEB_ID_TOKEN': 'h', + 'CREDENTIALS_STS_ASSUME_ROLE': 'i', + 'CREDENTIALS_STS_ASSUME_ROLE_WEB_ID': 'k', + 'CREDENTIALS_PROFILE': 'n', + 'CREDENTIALS_PROFILE_SOURCE_PROFILE': 'o', + 'CREDENTIALS_PROFILE_NAMED_PROVIDER': 'p', + 'CREDENTIALS_PROFILE_STS_WEB_ID_TOKEN': 'q', 'CREDENTIALS_HTTP': 'z', 'CREDENTIALS_IMDS': '0', 'BEARER_SERVICE_ENV_VARS': '3', @@ -105,6 +112,17 @@ def register_feature_id(feature_id): ctx.features.add(val) +def register_feature_ids(feature_ids): + """Adds multiple feature IDs to the current context object's ``features`` set. + + :type feature_ids: iterable of str + :param feature_ids: An iterable of feature ID strings to register. Each + value must be a key in the ``_USERAGENT_FEATURE_MAPPINGS`` dict. + """ + for feature_id in feature_ids: + register_feature_id(feature_id) + + def sanitize_user_agent_string_component(raw_str, allow_hash): """Replaces all not allowed characters in the string with a dash ("-"). diff --git a/tests/functional/test_credentials.py b/tests/functional/test_credentials.py index 94b2de7a3c..0538e949f4 100644 --- a/tests/functional/test_credentials.py +++ b/tests/functional/test_credentials.py @@ -1307,6 +1307,59 @@ def test_user_agent_feature_ids( patch_obj.stop() +@pytest.mark.parametrize( + "creds_env_var,creds_file_content,patches,expected_feature_id", + [ + ( + 'AWS_SHARED_CREDENTIALS_FILE', + '[default]\naws_access_key_id = FAKEACCESSKEY\naws_secret_access_key = FAKESECRET', + [ + patch( + "botocore.credentials.EnvProvider.load", return_value=None + ), + ], + 'n', + ), + ( + 'AWS_CONFIG_FILE', + '[default]\naws_access_key_id = FAKEACCESSKEY\naws_secret_access_key = FAKESECRET', + [ + patch( + "botocore.credentials.EnvProvider.load", return_value=None + ), + patch( + "botocore.credentials.SharedCredentialProvider.load", + return_value=None, + ), + ], + 'n', + ), + ], +) +def test_user_agent_has_file_based_feature_ids( + creds_env_var, + creds_file_content, + patches, + expected_feature_id, + tmp_path, + monkeypatch, +): + credentials_file = tmp_path / "creds" + credentials_file.write_text(creds_file_content) + monkeypatch.setenv(creds_env_var, str(credentials_file)) + + for patch_obj in patches: + patch_obj.start() + + try: + session = Session() + client = session.create_client("s3", region_name="us-east-1") + _assert_feature_ids_in_ua(client, expected_feature_id) + finally: + for patch_obj in patches: + patch_obj.stop() + + def _assert_feature_ids_in_ua(client, expected_feature_ids): """Helper to test feature IDs appear in user agent for multiple calls.""" with ClientHTTPStubber(client, strict=True) as http_stubber: @@ -1320,3 +1373,189 @@ def _assert_feature_ids_in_ua(client, expected_feature_ids): feature_list = parse_registered_feature_ids(ua_string) for expected_id in expected_feature_ids: assert expected_id in feature_list + + +@pytest.mark.parametrize( + "config_content,env_vars,expected_source_features,expected_provider_feature", + [ + # Test Case 1: Assume Role with source profile + ( + '''[profile assume-role-test] +role_arn = arn:aws:iam::123456789012:role/test-role +source_profile = base + +[profile base] +aws_access_key_id = FAKEACCESSKEY +aws_secret_access_key = FAKESECRET''', + {}, + [ + 'o', # CREDENTIALS_PROFILE_SOURCE_PROFILE + 'n', # CREDENTIALS_PROFILE + ], + 'i', # CREDENTIALS_STS_ASSUME_ROLE + ), + # Test Case 2: Assume Role with named provider + ( + '''[profile assume-role-test] +role_arn = arn:aws:iam::123456789012:role/test-role +credential_source = Environment''', + { + 'AWS_ACCESS_KEY_ID': 'FAKEACCESSKEY', + 'AWS_SECRET_ACCESS_KEY': 'FAKESECRET', + }, + [ + 'g', # CREDENTIALS_ENV_VARS + 'p', # CREDENTIALS_PROFILE_NAMED_PROVIDER + ], + 'i', # CREDENTIALS_STS_ASSUME_ROLE + ), + ], +) +def test_user_agent_has_assume_role_feature_ids( + config_content, + env_vars, + expected_source_features, + expected_provider_feature, + tmp_path, +): + session = _create_assume_role_session(config_content, tmp_path) + + # Set env vars if needed + with patch.dict(os.environ, env_vars, clear=True): + with SessionHTTPStubber(session) as stubber: + s3 = session.create_client('s3', region_name='us-east-1') + _add_assume_role_http_response(stubber, with_web_identity=False) + stubber.add_response() + stubber.add_response() + s3.list_buckets() + s3.list_buckets() + + ua_strings = get_captured_ua_strings(stubber) + _assert_deferred_credential_feature_ids( + ua_strings, expected_source_features, expected_provider_feature + ) + + +@pytest.mark.parametrize( + "test_case,config_content,env_vars,expected_source_features,expected_provider_feature", + [ + # Test Case 1: Assume Role with Web Identity through config profile + ( + "assume_role_with_web_identity_profile", + '''[profile assume-role-test] +role_arn = arn:aws:iam::123456789012:role/test-role +web_identity_token_file = {token_file}''', + {}, + ['q'], # CREDENTIALS_PROFILE_STS_WEB_ID_TOKEN + 'k', # CREDENTIALS_STS_ASSUME_ROLE_WEB_ID + ), + # Test Case 2: Assume Role with Web Identity through env vars + ( + "assume_role_with_web_identity_env_vars", + '', + { + 'AWS_ROLE_ARN': 'arn:aws:iam::123456789012:role/test-role', + 'AWS_WEB_IDENTITY_TOKEN_FILE': '{token_file}', + 'AWS_ROLE_SESSION_NAME': 'test-session', + }, + ['h'], # CREDENTIALS_ENV_VARS_STS_WEB_ID_TOKEN + 'k', # CREDENTIALS_STS_ASSUME_ROLE_WEB_ID + ), + ], +) +def test_user_agent_has_assume_role_with_web_identity_feature_ids( + test_case, + config_content, + env_vars, + expected_source_features, + expected_provider_feature, + monkeypatch, + tmp_path, +): + token_file = tmp_path / 'token.jwt' + token_file.write_text('fake-jwt-token') + if 'AWS_WEB_IDENTITY_TOKEN_FILE' in env_vars: + env_vars['AWS_WEB_IDENTITY_TOKEN_FILE'] = str(token_file) + elif config_content and 'web_identity_token_file' in config_content: + config_content = config_content.replace( + '{token_file}', str(token_file) + ) + + session = _create_assume_role_session(config_content, tmp_path) + + # Set env vars if needed + with patch.dict(os.environ, env_vars, clear=True): + with SessionHTTPStubber(session) as stubber: + s3 = session.create_client('s3', region_name='us-east-1') + _add_assume_role_http_response(stubber, with_web_identity=True) + stubber.add_response() + stubber.add_response() + s3.list_buckets() + s3.list_buckets() + + ua_strings = get_captured_ua_strings(stubber) + _assert_deferred_credential_feature_ids( + ua_strings, expected_source_features, expected_provider_feature + ) + + +def _create_assume_role_session(config_content, tmp_path): + if config_content: + config_file = tmp_path / 'config' + config_file.write_text(config_content) + session = Session(profile='assume-role-test') + session.set_config_variable('config_file', str(config_file)) + else: + session = Session() + return session + + +def _add_assume_role_http_response(stubber, with_web_identity): + """Add HTTP response for AssumeRole or AssumeRoleWithWebIdentity call with proper credentials""" + expiration = (datetime.now(timezone.utc) + timedelta(hours=1)).strftime( + '%Y-%m-%dT%H:%M:%SZ' + ) + method_name = ( + 'AssumeRoleWithWebIdentity' if with_web_identity else 'AssumeRole' + ) + body = ( + f'<{method_name}Response>' + f' <{method_name}Result>' + ' ' + ' arn:aws:sts::123456789012:user' + ' AKID:test-session-123' + ' ' + ' ' + f' FAKEASSUMEROLEKEY' + f' FAKEASSUMEROLSECRET' + ' FAKETOKEN' + f' {expiration}' + ' ' + f' ' + f'' + ) + stubber.add_response(body=body.encode('utf-8')) + + +def _assert_deferred_credential_feature_ids( + ua_strings, + expected_source_features, + expected_provider_feature, +): + """Helper to assert feature IDs for deferred credential provider tests""" + assert len(ua_strings) == 3 + + # Request to fetch credentials should only register feature ids for the credential source + credential_source_feature_list = parse_registered_feature_ids( + ua_strings[0] + ) + for feature in expected_source_features: + assert feature in credential_source_feature_list + assert expected_provider_feature not in credential_source_feature_list + + # Original operation request should register feature ids for both the credential source and the provider + for i in [1, 2]: + operation_feature_list = parse_registered_feature_ids(ua_strings[i]) + for feature in expected_source_features: + assert feature in operation_feature_list + assert expected_provider_feature in operation_feature_list diff --git a/tests/unit/test_credentials.py b/tests/unit/test_credentials.py index 06a271095e..3c7b46d415 100644 --- a/tests/unit/test_credentials.py +++ b/tests/unit/test_credentials.py @@ -746,6 +746,30 @@ def test_account_id_with_invalid_arn(self): self.assertEqual(response, expected_response) self.assertEqual(response['account_id'], None) + @mock.patch('botocore.credentials.register_feature_ids') + def test_feature_ids_registered_during_get_credentials( + self, mock_register + ): + response = { + 'Credentials': { + 'AccessKeyId': 'foo', + 'SecretAccessKey': 'bar', + 'SessionToken': 'baz', + 'Expiration': self.some_future_time(), + } + } + client_creator = self.create_client_creator(with_response=response) + fetcher = credentials.AssumeRoleCredentialFetcher( + client_creator, self.source_creds, self.role_arn + ) + + test_feature_ids = {'test_feature_1', 'test_feature_2'} + fetcher.feature_ids = test_feature_ids + + fetcher.fetch_credentials() + # Verify register_credential_feature_ids was called with test feature IDs + mock_register.assert_called_once_with(test_feature_ids) + class TestAssumeRoleWithWebIdentityCredentialFetcher(BaseEnvVar): def setUp(self): @@ -903,6 +927,30 @@ def test_account_id_with_invalid_arn(self): self.assertEqual(response, expected_response) self.assertEqual(response['account_id'], None) + @mock.patch('botocore.credentials.register_feature_ids') + def test_feature_ids_registered_during_get_credentials( + self, mock_register + ): + response = { + 'Credentials': { + 'AccessKeyId': 'foo', + 'SecretAccessKey': 'bar', + 'SessionToken': 'baz', + 'Expiration': self.some_future_time(), + } + } + client_creator = self.create_client_creator(with_response=response) + fetcher = credentials.AssumeRoleWithWebIdentityCredentialFetcher( + client_creator, self.load_token, self.role_arn + ) + + test_feature_ids = {'test_feature_1', 'test_feature_2'} + fetcher.feature_ids = test_feature_ids + + fetcher.fetch_credentials() + # Verify register_credential_feature_ids was called with test feature IDs + mock_register.assert_called_once_with(test_feature_ids) + class TestAssumeRoleWithWebIdentityCredentialProvider(unittest.TestCase): def setUp(self):