diff --git a/CHANGELOG.md b/CHANGELOG.md index 90299bdf5..1bb06882a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,25 @@ [1]: https://pypi.org/project/google-auth/#history +## [2.42.1](https://github.com/googleapis/google-auth-library-python/compare/v2.42.0...v2.42.1) (2025-10-30) + + +### Bug Fixes + +* Catch ValueError for json.loads() ([#1842](https://github.com/googleapis/google-auth-library-python/issues/1842)) ([b074cad](https://github.com/googleapis/google-auth-library-python/commit/b074cad460589633adfc6744c01726ae86f2aa2b)) + +## [2.42.0](https://github.com/googleapis/google-auth-library-python/compare/v2.41.1...v2.42.0) (2025-10-24) + + +### Features + +* Add trust boundary support for external accounts. ([#1809](https://github.com/googleapis/google-auth-library-python/issues/1809)) ([36ecb1d](https://github.com/googleapis/google-auth-library-python/commit/36ecb1d65883477d27faf9c2281fc289659b9903)) + + +### Bug Fixes + +* Read scopes from ADC json for impersoanted cred ([#1820](https://github.com/googleapis/google-auth-library-python/issues/1820)) ([62c0fc8](https://github.com/googleapis/google-auth-library-python/commit/62c0fc82a3625542381f85c698595446fc99ddae)) + ## [2.41.1](https://github.com/googleapis/google-auth-library-python/compare/v2.41.0...v2.41.1) (2025-09-30) diff --git a/google/auth/_constants.py b/google/auth/_constants.py new file mode 100644 index 000000000..28e47025f --- /dev/null +++ b/google/auth/_constants.py @@ -0,0 +1,5 @@ +"""Shared constants.""" + +_SERVICE_ACCOUNT_TRUST_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.{universe_domain}/v1/projects/-/serviceAccounts/{service_account_email}/allowedLocations" +_WORKFORCE_POOL_TRUST_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.{universe_domain}/v1/locations/global/workforcePools/{pool_id}/allowedLocations" +_WORKLOAD_IDENTITY_POOL_TRUST_BOUNDARY_LOOKUP_ENDPOINT = "https://iamcredentials.{universe_domain}/v1/projects/{project_number}/locations/global/workloadIdentityPools/{pool_id}/allowedLocations" diff --git a/google/auth/_helpers.py b/google/auth/_helpers.py index fba0ba3fa..4a69d4c61 100644 --- a/google/auth/_helpers.py +++ b/google/auth/_helpers.py @@ -489,7 +489,7 @@ def _parse_request_body(body: Optional[bytes], content_type: str = "") -> Any: if not content_type or "application/json" in content_type: try: return json.loads(body_str) - except (json.JSONDecodeError, TypeError): + except (TypeError, ValueError): return body_str if "application/x-www-form-urlencoded" in content_type: parsed_query = urllib.parse.parse_qs(body_str) diff --git a/google/auth/external_account.py b/google/auth/external_account.py index e4eb908f6..8eba0d249 100644 --- a/google/auth/external_account.py +++ b/google/auth/external_account.py @@ -36,6 +36,7 @@ import json import re +from google.auth import _constants from google.auth import _helpers from google.auth import credentials from google.auth import exceptions @@ -81,6 +82,7 @@ class Credentials( credentials.Scoped, credentials.CredentialsWithQuotaProject, credentials.CredentialsWithTokenUri, + credentials.CredentialsWithTrustBoundary, metaclass=abc.ABCMeta, ): """Base class for all external account credentials. @@ -173,10 +175,7 @@ def __init__( self._scopes = scopes self._default_scopes = default_scopes self._workforce_pool_user_project = workforce_pool_user_project - self._trust_boundary = { - "locations": [], - "encoded_locations": "0x0", - } # expose a placeholder trust boundary value. + self._trust_boundary = trust_boundary if self._client_id: self._client_auth = utils.ClientAuthentication( @@ -242,6 +241,7 @@ def _constructor_args(self): "scopes": self._scopes, "default_scopes": self._default_scopes, "universe_domain": self._universe_domain, + "trust_boundary": self._trust_boundary, } if not self.is_workforce_pool: args.pop("workforce_pool_user_project") @@ -412,8 +412,23 @@ def get_project_id(self, request): return None - @_helpers.copy_docstring(credentials.Credentials) def refresh(self, request): + """Refreshes the access token. + + For impersonated credentials, this method will refresh the underlying + source credentials and the impersonated credentials. For non-impersonated + credentials, it will refresh the access token and the trust boundary. + """ + self._refresh_token(request) + # If we are impersonating, the trust boundary is handled by the + # impersonated credentials object. We need to get it from there. + if self._service_account_impersonation_url: + self._trust_boundary = self._impersonated_credentials._trust_boundary + else: + # Otherwise, refresh the trust boundary for the external account. + self._refresh_trust_boundary(request) + + def _refresh_token(self, request): scopes = self._scopes if self._scopes is not None else self._default_scopes # Inject client certificate into request. @@ -463,6 +478,40 @@ def refresh(self, request): self.expiry = now + lifetime + def _build_trust_boundary_lookup_url(self): + """Builds and returns the URL for the trust boundary lookup API.""" + url = None + # Try to parse as a workload identity pool. + # Audience format: //iam.googleapis.com/projects/PROJECT_NUMBER/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID + workload_match = re.search( + r"projects/([^/]+)/locations/global/workloadIdentityPools/([^/]+)", + self._audience, + ) + if workload_match: + project_number, pool_id = workload_match.groups() + url = _constants._WORKLOAD_IDENTITY_POOL_TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( + universe_domain=self._universe_domain, + project_number=project_number, + pool_id=pool_id, + ) + else: + # If that fails, try to parse as a workforce pool. + # Audience format: //iam.googleapis.com/locations/global/workforcePools/POOL_ID/providers/PROVIDER_ID + workforce_match = re.search( + r"locations/[^/]+/workforcePools/([^/]+)", self._audience + ) + if workforce_match: + pool_id = workforce_match.groups()[0] + url = _constants._WORKFORCE_POOL_TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( + universe_domain=self._universe_domain, pool_id=pool_id + ) + + if url: + return url + else: + # If both fail, the audience format is invalid. + raise exceptions.InvalidValue("Invalid audience format.") + def _make_copy(self): kwargs = self._constructor_args() new_cred = self.__class__(**kwargs) @@ -489,6 +538,12 @@ def with_universe_domain(self, universe_domain): cred._universe_domain = universe_domain return cred + @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) + def with_trust_boundary(self, trust_boundary): + cred = self._make_copy() + cred._trust_boundary = trust_boundary + return cred + def _should_initialize_impersonated_credentials(self): return ( self._service_account_impersonation_url is not None @@ -537,6 +592,7 @@ def _initialize_impersonated_credentials(self): lifetime=self._service_account_impersonation_options.get( "token_lifetime_seconds" ), + trust_boundary=self._trust_boundary, ) def _create_default_metrics_options(self): @@ -623,6 +679,7 @@ def from_info(cls, info, **kwargs): universe_domain=info.get( "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN ), + trust_boundary=info.get("trust_boundary"), **kwargs ) diff --git a/google/auth/external_account_authorized_user.py b/google/auth/external_account_authorized_user.py index 53f75cf93..f8fbf950b 100644 --- a/google/auth/external_account_authorized_user.py +++ b/google/auth/external_account_authorized_user.py @@ -36,7 +36,9 @@ import datetime import io import json +import re +from google.auth import _constants from google.auth import _helpers from google.auth import credentials from google.auth import exceptions @@ -50,6 +52,7 @@ class Credentials( credentials.CredentialsWithQuotaProject, credentials.ReadOnlyScoped, credentials.CredentialsWithTokenUri, + credentials.CredentialsWithTrustBoundary, ): """Credentials for External Account Authorized Users. @@ -83,6 +86,7 @@ def __init__( scopes=None, quota_project_id=None, universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN, + trust_boundary=None, ): """Instantiates a external account authorized user credentials object. @@ -108,6 +112,7 @@ def __init__( create the credentials. universe_domain (Optional[str]): The universe domain. The default value is googleapis.com. + trust_boundary (Mapping[str,str]): A credential trust boundary. Returns: google.auth.external_account_authorized_user.Credentials: The @@ -118,7 +123,7 @@ def __init__( self.token = token self.expiry = expiry self._audience = audience - self._refresh_token = refresh_token + self._refresh_token_val = refresh_token self._token_url = token_url self._token_info_url = token_info_url self._client_id = client_id @@ -128,6 +133,7 @@ def __init__( self._scopes = scopes self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN self._cred_file_path = None + self._trust_boundary = trust_boundary if not self.valid and not self.can_refresh: raise exceptions.InvalidOperation( @@ -164,7 +170,7 @@ def info(self): def constructor_args(self): return { "audience": self._audience, - "refresh_token": self._refresh_token, + "refresh_token": self._refresh_token_val, "token_url": self._token_url, "token_info_url": self._token_info_url, "client_id": self._client_id, @@ -175,6 +181,7 @@ def constructor_args(self): "scopes": self._scopes, "quota_project_id": self._quota_project_id, "universe_domain": self._universe_domain, + "trust_boundary": self._trust_boundary, } @property @@ -184,7 +191,7 @@ def scopes(self): @property def requires_scopes(self): - """ False: OAuth 2.0 credentials have their scopes set when + """False: OAuth 2.0 credentials have their scopes set when the initial token is requested and can not be changed.""" return False @@ -201,13 +208,13 @@ def client_secret(self): @property def audience(self): """Optional[str]: The STS audience which contains the resource name for the - workforce pool and the provider identifier in that pool.""" + workforce pool and the provider identifier in that pool.""" return self._audience @property def refresh_token(self): """Optional[str]: The OAuth 2.0 refresh token.""" - return self._refresh_token + return self._refresh_token_val @property def token_url(self): @@ -226,13 +233,18 @@ def revoke_url(self): @property def is_user(self): - """ True: This credential always represents a user.""" + """True: This credential always represents a user.""" return True @property def can_refresh(self): return all( - (self._refresh_token, self._token_url, self._client_id, self._client_secret) + ( + self._refresh_token_val, + self._token_url, + self._client_id, + self._client_secret, + ) ) def get_project_id(self, request=None): @@ -266,7 +278,7 @@ def to_json(self, strip=None): strip = strip if strip else [] return json.dumps({k: v for (k, v) in self.info.items() if k not in strip}) - def refresh(self, request): + def _refresh_token(self, request): """Refreshes the access token. Args: @@ -285,7 +297,7 @@ def refresh(self, request): ) now = _helpers.utcnow() - response_data = self._make_sts_request(request) + response_data = self._sts_client.refresh_token(request, self._refresh_token_val) self.token = response_data.get("access_token") @@ -293,10 +305,21 @@ def refresh(self, request): self.expiry = now + lifetime if "refresh_token" in response_data: - self._refresh_token = response_data["refresh_token"] + self._refresh_token_val = response_data["refresh_token"] + + def _build_trust_boundary_lookup_url(self): + """Builds and returns the URL for the trust boundary lookup API.""" + # Audience format: //iam.googleapis.com/locations/global/workforcePools/POOL_ID/providers/PROVIDER_ID + match = re.search(r"locations/[^/]+/workforcePools/([^/]+)", self._audience) + + if not match: + raise exceptions.InvalidValue("Invalid workforce pool audience format.") + + pool_id = match.groups()[0] - def _make_sts_request(self, request): - return self._sts_client.refresh_token(request, self._refresh_token) + return _constants._WORKFORCE_POOL_TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( + universe_domain=self._universe_domain, pool_id=pool_id + ) @_helpers.copy_docstring(credentials.Credentials) def get_cred_info(self): @@ -331,6 +354,12 @@ def with_universe_domain(self, universe_domain): cred._universe_domain = universe_domain return cred + @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) + def with_trust_boundary(self, trust_boundary): + cred = self._make_copy() + cred._trust_boundary = trust_boundary + return cred + @classmethod def from_info(cls, info, **kwargs): """Creates a Credentials instance from parsed external account info. @@ -375,6 +404,7 @@ def from_info(cls, info, **kwargs): universe_domain=info.get( "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN ), + trust_boundary=info.get("trust_boundary"), **kwargs ) diff --git a/google/auth/impersonated_credentials.py b/google/auth/impersonated_credentials.py index 1b67e4406..334573428 100644 --- a/google/auth/impersonated_credentials.py +++ b/google/auth/impersonated_credentials.py @@ -286,7 +286,7 @@ def _refresh_token(self, request): self._source_credentials.token_state == credentials.TokenState.STALE or self._source_credentials.token_state == credentials.TokenState.INVALID ): - self._source_credentials.refresh(request) + self._source_credentials._refresh_token(request) body = { "delegates": self._delegates, @@ -526,6 +526,8 @@ def from_impersonated_service_account_info(cls, info, scopes=None): target_principal = impersonation_url[start_index + 1 : end_index] delegates = info.get("delegates") quota_project_id = info.get("quota_project_id") + scopes = scopes or info.get("scopes") + trust_boundary = info.get("trust_boundary") return cls( source_credentials, @@ -533,6 +535,7 @@ def from_impersonated_service_account_info(cls, info, scopes=None): scopes, delegates, quota_project_id=quota_project_id, + trust_boundary=trust_boundary, ) diff --git a/google/auth/version.py b/google/auth/version.py index 6f67e6b34..4b6c4fb25 100644 --- a/google/auth/version.py +++ b/google/auth/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "2.41.1" +__version__ = "2.42.1" diff --git a/google/oauth2/service_account.py b/google/oauth2/service_account.py index 55e020125..7520fe3bb 100644 --- a/google/oauth2/service_account.py +++ b/google/oauth2/service_account.py @@ -73,6 +73,7 @@ import copy import datetime +from google.auth import _constants from google.auth import _helpers from google.auth import _service_account_info from google.auth import credentials @@ -84,9 +85,6 @@ _DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds _GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token" -_TRUST_BOUNDARY_LOOKUP_ENDPOINT = ( - "https://iamcredentials.{}/v1/projects/-/serviceAccounts/{}/allowedLocations" -) class Credentials( @@ -520,8 +518,9 @@ def _build_trust_boundary_lookup_url(self): raise ValueError( "Service account email is required to build the trust boundary lookup URL." ) - return _TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( - self._universe_domain, self._service_account_email + return _constants._SERVICE_ACCOUNT_TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( + universe_domain=self._universe_domain, + service_account_email=self._service_account_email, ) @_helpers.copy_docstring(credentials.Signing) diff --git a/system_tests/secrets.tar.enc b/system_tests/secrets.tar.enc index 8456a41bd..fa5030046 100644 Binary files a/system_tests/secrets.tar.enc and b/system_tests/secrets.tar.enc differ diff --git a/tests/test__default.py b/tests/test__default.py index e42b4dd94..78874c01a 100644 --- a/tests/test__default.py +++ b/tests/test__default.py @@ -14,6 +14,7 @@ import json import os +import warnings import mock import pytest # type: ignore @@ -398,7 +399,6 @@ def test_load_credentials_from_file_impersonated_passing_scopes(): def test_load_credentials_from_file_impersonated_wrong_target_principal(tmpdir): - with open(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_FILE) as fh: impersonated_credentials_info = json.load(fh) impersonated_credentials_info[ @@ -414,7 +414,6 @@ def test_load_credentials_from_file_impersonated_wrong_target_principal(tmpdir): def test_load_credentials_from_file_impersonated_wrong_source_type(tmpdir): - with open(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_FILE) as fh: impersonated_credentials_info = json.load(fh) impersonated_credentials_info["source_credentials"]["type"] = "external_account" @@ -1325,7 +1324,7 @@ def test_default_impersonated_service_account_set_default_scopes(get_adc_path): "google.auth._cloud_sdk.get_application_default_credentials_path", autospec=True ) def test_default_impersonated_service_account_set_both_scopes_and_default_scopes( - get_adc_path + get_adc_path, ): get_adc_path.return_value = IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_FILE scopes = ["scope1", "scope2"] @@ -1410,3 +1409,54 @@ def test_quota_gce_credentials(unused_get, unused_ping): quota_project_id=explicit_quota ) assert credentials.quota_project_id == explicit_quota + + +def test_load_credentials_from_file_deprecation_warning(): + with pytest.warns( + DeprecationWarning, match="The load_credentials_from_file method is deprecated" + ): + _default.load_credentials_from_file(SERVICE_ACCOUNT_FILE) + + +def test_load_credentials_from_dict_deprecation_warning(): + with pytest.warns( + DeprecationWarning, match="The load_credentials_from_dict method is deprecated" + ): + _default.load_credentials_from_dict(SERVICE_ACCOUNT_FILE_DATA) + + +@mock.patch("google.auth._cloud_sdk.get_project_id", return_value=None, autospec=True) +@mock.patch("os.path.isfile", return_value=True, autospec=True) +@mock.patch( + "google.auth._cloud_sdk.get_application_default_credentials_path", + return_value=SERVICE_ACCOUNT_FILE, + autospec=True, +) +def test_get_gcloud_sdk_credentials_suppresses_deprecation_warning( + get_adc_path, isfile, get_project_id +): + with warnings.catch_warnings(record=True) as caught_warnings: + warnings.simplefilter("always") + + _default._get_gcloud_sdk_credentials() + + assert not any( + isinstance(w.message, DeprecationWarning) + and "load_credentials_from_file" in str(w.message) + for w in caught_warnings + ) + + +def test_get_explicit_environ_credentials_suppresses_deprecation_warning(monkeypatch): + monkeypatch.setenv(environment_vars.CREDENTIALS, SERVICE_ACCOUNT_FILE) + + with warnings.catch_warnings(record=True) as caught_warnings: + warnings.simplefilter("always") + + _default._get_explicit_environ_credentials() + + assert not any( + isinstance(w.message, DeprecationWarning) + and "load_credentials_from_file" in str(w.message) + for w in caught_warnings + ) diff --git a/tests/test__helpers.py b/tests/test__helpers.py index ce3ec11e2..2aecc0b7e 100644 --- a/tests/test__helpers.py +++ b/tests/test__helpers.py @@ -623,6 +623,34 @@ def test_parse_request_body_other_type(): assert _helpers._parse_request_body("string") is None +def test_parse_request_body_json_type_error(): + body = b'{"key": "value"}' + with mock.patch("json.loads", side_effect=TypeError): + # json.loads should raise a TypeError, and the function should return the + # original string + assert _helpers._parse_request_body(body, "application/json") == body.decode( + "utf-8" + ) + + +def test_parse_request_body_json_value_error(): + body = b'{"key": "value"}' + content_type = "application/json" + with mock.patch("json.loads", side_effect=ValueError): + # json.loads should raise a ValueError, and the function should return the + # original string + assert _helpers._parse_request_body(body, content_type) == body.decode("utf-8") + + +def test_parse_request_body_json_decode_error(): + body = b'{"key": "value"}' + content_type = "application/json" + with mock.patch("json.loads", side_effect=json.JSONDecodeError("msg", "doc", 0)): + # json.loads should raise a JSONDecodeError, and the function should return the + # original string + assert _helpers._parse_request_body(body, content_type) == body.decode("utf-8") + + def test_parse_response_json_valid(): class MockResponse: def json(self): diff --git a/tests/test_aws.py b/tests/test_aws.py index 41ce970d1..4f70bda4f 100644 --- a/tests/test_aws.py +++ b/tests/test_aws.py @@ -971,6 +971,7 @@ def test_from_info_full_options(self, mock_init): quota_project_id=QUOTA_PROJECT_ID, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, + trust_boundary=None, ) @mock.patch.object(aws.Credentials, "__init__", return_value=None) @@ -1000,6 +1001,7 @@ def test_from_info_required_options_only(self, mock_init): quota_project_id=None, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, + trust_boundary=None, ) @mock.patch.object(aws.Credentials, "__init__", return_value=None) @@ -1031,6 +1033,7 @@ def test_from_info_supplier(self, mock_init): quota_project_id=None, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, + trust_boundary=None, ) @mock.patch.object(aws.Credentials, "__init__", return_value=None) @@ -1068,6 +1071,7 @@ def test_from_file_full_options(self, mock_init, tmpdir): quota_project_id=QUOTA_PROJECT_ID, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, + trust_boundary=None, ) @mock.patch.object(aws.Credentials, "__init__", return_value=None) @@ -1098,6 +1102,7 @@ def test_from_file_required_options_only(self, mock_init, tmpdir): quota_project_id=None, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, + trust_boundary=None, ) def test_constructor_invalid_credential_source(self): diff --git a/tests/test_external_account.py b/tests/test_external_account.py index d86a19bef..2fa64361d 100644 --- a/tests/test_external_account.py +++ b/tests/test_external_account.py @@ -15,12 +15,14 @@ import datetime import http.client as http_client import json +import os import urllib import mock import pytest # type: ignore from google.auth import _helpers +from google.auth import environment_vars from google.auth import exceptions from google.auth import external_account from google.auth import transport @@ -126,6 +128,11 @@ class TestCredentials(object): "status": "INVALID_ARGUMENT", } } + NO_OP_TRUST_BOUNDARY = {"locations": [], "encodedLocations": "0x0"} + VALID_TRUST_BOUNDARY = { + "locations": ["us-central1", "us-east1"], + "encodedLocations": "0xVALIDHEXSA", + } PROJECT_ID = "my-proj-id" CLOUD_RESOURCE_MANAGER_URL = ( "https://cloudresourcemanager.googleapis.com/v1/projects/" @@ -151,6 +158,7 @@ def make_credentials( service_account_impersonation_url=None, service_account_impersonation_options={}, universe_domain=DEFAULT_UNIVERSE_DOMAIN, + trust_boundary=None, ): return CredentialsImpl( audience=cls.AUDIENCE, @@ -166,6 +174,7 @@ def make_credentials( scopes=scopes, default_scopes=default_scopes, universe_domain=universe_domain, + trust_boundary=trust_boundary, ) @classmethod @@ -178,6 +187,7 @@ def make_workforce_pool_credentials( default_scopes=None, service_account_impersonation_url=None, workforce_pool_user_project=None, + trust_boundary=None, ): return CredentialsImpl( audience=cls.WORKFORCE_AUDIENCE, @@ -191,6 +201,7 @@ def make_workforce_pool_credentials( scopes=scopes, default_scopes=default_scopes, workforce_pool_user_project=workforce_pool_user_project, + trust_boundary=trust_boundary, ) @classmethod @@ -322,7 +333,7 @@ def test_default_state(self): assert not credentials.token_info_url def test_nonworkforce_with_workforce_pool_user_project(self): - with pytest.raises(ValueError) as excinfo: + with pytest.raises(exceptions.InvalidValue) as excinfo: CredentialsImpl( audience=self.AUDIENCE, subject_token_type=self.SUBJECT_TOKEN_TYPE, @@ -424,6 +435,7 @@ def test_with_scopes_full_options_propagated(self): scopes=["email"], default_scopes=["default2"], universe_domain=DEFAULT_UNIVERSE_DOMAIN, + trust_boundary=None, ) def test_with_token_uri(self): @@ -512,6 +524,7 @@ def test_with_quota_project_full_options_propagated(self): scopes=self.SCOPES, default_scopes=["default1"], universe_domain=DEFAULT_UNIVERSE_DOMAIN, + trust_boundary=None, ) # Confirm with_quota_project sets the correct quota project after @@ -706,6 +719,258 @@ def test_refresh_without_client_auth_success( assert not credentials.expired assert credentials.token == response["access_token"] + @mock.patch("google.auth.external_account.Credentials._lookup_trust_boundary") + def test_refresh_skips_trust_boundary_lookup_when_disabled( + self, mock_lookup_trust_boundary + ): + credentials = self.make_credentials() + request = self.make_mock_request( + status=http_client.OK, data=self.SUCCESS_RESPONSE + ) + + credentials.refresh(request) + + assert credentials.valid + assert credentials.token == self.SUCCESS_RESPONSE["access_token"] + mock_lookup_trust_boundary.assert_not_called() + headers_applied = {} + credentials.apply(headers_applied) + assert "x-allowed-locations" not in headers_applied + + def test_refresh_skips_sending_allowed_locations_header_with_trust_boundary(self): + # This test verifies that the x-allowed-locations header is not sent with + # the STS request even if a trust boundary is cached. + trust_boundary_value = {"encodedLocations": "0x12345"} + headers = { + "Content-Type": "application/x-www-form-urlencoded", + "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false", + } + request_data = { + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "audience": self.AUDIENCE, + "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", + "subject_token": "subject_token_0", + "subject_token_type": self.SUBJECT_TOKEN_TYPE, + } + request = self.make_mock_request( + status=http_client.OK, data=self.SUCCESS_RESPONSE + ) + credentials = self.make_credentials() + # Set a cached trust boundary. + credentials._trust_boundary = trust_boundary_value + + with mock.patch( + "google.auth.metrics.python_and_auth_lib_version", + return_value=LANG_LIBRARY_METRICS_HEADER_VALUE, + ): + credentials.refresh(request) + + self.assert_token_request_kwargs(request.call_args[1], headers, request_data) + + def test_refresh_on_impersonated_credential_skips_parent_trust_boundary_lookup( + self, + ): + # This test verifies that the top-level impersonating credential + # does not perform a trust boundary lookup. + request = self.make_mock_request( + status=http_client.OK, + data=self.SUCCESS_RESPONSE, + impersonation_status=http_client.OK, + impersonation_data={ + "accessToken": "SA_ACCESS_TOKEN", + "expireTime": "2025-01-01T00:00:00Z", + }, + ) + credentials = self.make_credentials( + service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL + ) + + with mock.patch.object( + credentials, "_refresh_trust_boundary", autospec=True + ) as mock_refresh_trust_boundary: + credentials.refresh(request) + + mock_refresh_trust_boundary.assert_not_called() + + def test_refresh_fetches_no_op_trust_boundary(self): + request = self.make_mock_request( + status=http_client.OK, data=self.SUCCESS_RESPONSE + ) + credentials = self.make_credentials() + + with mock.patch.object( + credentials, + "_lookup_trust_boundary", + return_value=self.NO_OP_TRUST_BOUNDARY, + ) as mock_lookup, mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + credentials.refresh(request) + + mock_lookup.assert_called_once() + headers = {} + credentials.apply(headers) + assert headers["x-allowed-locations"] == "" + + def test_refresh_skips_lookup_with_cached_no_op_boundary(self): + request = self.make_mock_request( + status=http_client.OK, data=self.SUCCESS_RESPONSE + ) + credentials = self.make_credentials() + credentials._trust_boundary = self.NO_OP_TRUST_BOUNDARY + + with mock.patch.object( + credentials, "_lookup_trust_boundary" + ) as mock_lookup, mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + credentials.refresh(request) + + mock_lookup.assert_not_called() + headers = {} + credentials.apply(headers) + assert headers["x-allowed-locations"] == "" + + def test_refresh_fails_on_lookup_failure_with_no_cache(self): + request = self.make_mock_request( + status=http_client.OK, data=self.SUCCESS_RESPONSE + ) + credentials = self.make_credentials() + + with mock.patch.object( + credentials, + "_lookup_trust_boundary", + side_effect=exceptions.RefreshError("Lookup failed"), + ) as mock_lookup, mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ), pytest.raises( + exceptions.RefreshError, match="Lookup failed" + ): + credentials.refresh(request) + + mock_lookup.assert_called_once() + + def test_refresh_uses_cached_boundary_on_lookup_failure(self): + request = self.make_mock_request( + status=http_client.OK, data=self.SUCCESS_RESPONSE + ) + credentials = self.make_credentials() + credentials._trust_boundary = {"encodedLocations": "0x123"} + + with mock.patch.object( + credentials, + "_lookup_trust_boundary", + side_effect=exceptions.RefreshError("Lookup failed"), + ) as mock_lookup, mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + credentials.refresh(request) + + mock_lookup.assert_called_once() + headers = {} + credentials.apply(headers) + assert headers["x-allowed-locations"] == "0x123" + + def test_refresh_propagates_trust_boundary_to_impersonated_credential(self): + request = self.make_mock_request( + status=http_client.OK, data=self.SUCCESS_RESPONSE + ) + credentials = self.make_credentials( + service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, + trust_boundary=self.VALID_TRUST_BOUNDARY, + ) + impersonated_creds_mock = mock.Mock() + impersonated_creds_mock._trust_boundary = self.VALID_TRUST_BOUNDARY + + with mock.patch( + "google.auth.external_account.impersonated_credentials.Credentials", + return_value=impersonated_creds_mock, + ) as mock_impersonated_creds, mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + credentials.refresh(request) + + mock_impersonated_creds.assert_called_once_with( + source_credentials=mock.ANY, + target_principal=mock.ANY, + target_scopes=mock.ANY, + quota_project_id=mock.ANY, + iam_endpoint_override=mock.ANY, + lifetime=mock.ANY, + trust_boundary=self.VALID_TRUST_BOUNDARY, + ) + assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY + + def test_build_trust_boundary_lookup_url_workload(self): + credentials = self.make_credentials() + expected_url = "https://iamcredentials.googleapis.com/v1/projects/123456/locations/global/workloadIdentityPools/POOL_ID/allowedLocations" + assert credentials._build_trust_boundary_lookup_url() == expected_url + + def test_build_trust_boundary_lookup_url_workforce(self): + credentials = self.make_workforce_pool_credentials() + expected_url = "https://iamcredentials.googleapis.com/v1/locations/global/workforcePools/POOL_ID/allowedLocations" + assert credentials._build_trust_boundary_lookup_url() == expected_url + + @pytest.mark.parametrize( + "audience", + [ + "invalid", + "//iam.googleapis.com/projects/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID", + "//iam.googleapis.com/locations/global/workforcsePools//providers/provider-id", + ], + ) + def test_build_trust_boundary_lookup_url_invalid_audience(self, audience): + credentials = self.make_credentials() + credentials._audience = audience + with pytest.raises(exceptions.InvalidValue, match="Invalid audience format."): + credentials._build_trust_boundary_lookup_url() + + def test_refresh_fetches_trust_boundary_workload(self): + request = self.make_mock_request( + status=http_client.OK, data=self.SUCCESS_RESPONSE + ) + credentials = self.make_credentials() + + with mock.patch.object( + credentials, + "_lookup_trust_boundary", + return_value=self.VALID_TRUST_BOUNDARY, + ) as mock_lookup, mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + credentials.refresh(request) + + mock_lookup.assert_called_once() + headers = {} + credentials.apply(headers) + assert ( + headers["x-allowed-locations"] + == self.VALID_TRUST_BOUNDARY["encodedLocations"] + ) + + def test_refresh_fetches_trust_boundary_workforce(self): + request = self.make_mock_request( + status=http_client.OK, data=self.SUCCESS_RESPONSE + ) + credentials = self.make_workforce_pool_credentials() + + with mock.patch.object( + credentials, + "_lookup_trust_boundary", + return_value=self.VALID_TRUST_BOUNDARY, + ) as mock_lookup, mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + credentials.refresh(request) + + mock_lookup.assert_called_once() + headers = {} + credentials.apply(headers) + assert ( + headers["x-allowed-locations"] + == self.VALID_TRUST_BOUNDARY["encodedLocations"] + ) + @mock.patch( "google.auth.metrics.python_and_auth_lib_version", return_value=LANG_LIBRARY_METRICS_HEADER_VALUE, @@ -920,9 +1185,6 @@ def test_refresh_impersonation_without_client_auth_success( "Content-Type": "application/json", "authorization": "Bearer {}".format(token_response["access_token"]), "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - # TODO(negarb): Uncomment and update when trust boundary is supported - # for external account credentials. - # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, @@ -1012,7 +1274,6 @@ def test_refresh_impersonation_with_mtls_success( "Content-Type": "application/json", "authorization": "Bearer {}".format(token_response["access_token"]), "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, @@ -1099,7 +1360,6 @@ def test_refresh_workforce_impersonation_without_client_auth_success( "Content-Type": "application/json", "authorization": "Bearer {}".format(token_response["access_token"]), "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, @@ -1262,6 +1522,21 @@ def test_refresh_impersonation_invalid_impersonated_url_error(self): assert not credentials.expired assert credentials.token is None + def test_refresh_impersonation_invalid_url_format_error(self): + credentials = self.make_credentials( + service_account_impersonation_url="https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/:generateAccessToken/invalid", + scopes=self.SCOPES, + ) + + with pytest.raises(exceptions.RefreshError) as excinfo: + credentials.refresh(None) + + assert excinfo.match( + r"Unable to determine target principal from service account impersonation URL." + ) + assert not credentials.expired + assert credentials.token is None + @mock.patch( "google.auth.metrics.python_and_auth_lib_version", return_value=LANG_LIBRARY_METRICS_HEADER_VALUE, @@ -1333,7 +1608,6 @@ def test_refresh_impersonation_with_client_auth_success_ignore_default_scopes( "Content-Type": "application/json", "authorization": "Bearer {}".format(token_response["access_token"]), "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, @@ -1417,7 +1691,6 @@ def test_refresh_impersonation_with_client_auth_success_use_default_scopes( "Content-Type": "application/json", "authorization": "Bearer {}".format(token_response["access_token"]), "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, @@ -1472,8 +1745,7 @@ def test_apply_without_quota_project_id(self): credentials.apply(headers) assert headers == { - "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), - # "x-allowed-locations": "0x0", + "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]) } def test_apply_workforce_without_quota_project_id(self): @@ -1489,8 +1761,7 @@ def test_apply_workforce_without_quota_project_id(self): credentials.apply(headers) assert headers == { - "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), - # "x-allowed-locations": "0x0", + "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]) } def test_apply_impersonation_without_quota_project_id(self): @@ -1521,8 +1792,7 @@ def test_apply_impersonation_without_quota_project_id(self): credentials.apply(headers) assert headers == { - "authorization": "Bearer {}".format(impersonation_response["accessToken"]), - # "x-allowed-locations": "0x0", + "authorization": "Bearer {}".format(impersonation_response["accessToken"]) } def test_apply_with_quota_project_id(self): @@ -1539,7 +1809,6 @@ def test_apply_with_quota_project_id(self): "other": "header-value", "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), "x-goog-user-project": self.QUOTA_PROJECT_ID, - # "x-allowed-locations": "0x0", } def test_apply_impersonation_with_quota_project_id(self): @@ -1574,7 +1843,6 @@ def test_apply_impersonation_with_quota_project_id(self): "other": "header-value", "authorization": "Bearer {}".format(impersonation_response["accessToken"]), "x-goog-user-project": self.QUOTA_PROJECT_ID, - # "x-allowed-locations": "0x0", } def test_before_request(self): @@ -1590,7 +1858,6 @@ def test_before_request(self): assert headers == { "other": "header-value", "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), - # "x-allowed-locations": "0x0", } # Second call shouldn't call refresh. @@ -1599,7 +1866,6 @@ def test_before_request(self): assert headers == { "other": "header-value", "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), - # "x-allowed-locations": "0x0", } def test_before_request_workforce(self): @@ -1617,7 +1883,6 @@ def test_before_request_workforce(self): assert headers == { "other": "header-value", "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), - # "x-allowed-locations": "0x0", } # Second call shouldn't call refresh. @@ -1626,7 +1891,6 @@ def test_before_request_workforce(self): assert headers == { "other": "header-value", "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), - # "x-allowed-locations": "0x0", } def test_before_request_impersonation(self): @@ -1657,7 +1921,6 @@ def test_before_request_impersonation(self): assert headers == { "other": "header-value", "authorization": "Bearer {}".format(impersonation_response["accessToken"]), - # "x-allowed-locations": "0x0", } # Second call shouldn't call refresh. @@ -1666,7 +1929,6 @@ def test_before_request_impersonation(self): assert headers == { "other": "header-value", "authorization": "Bearer {}".format(impersonation_response["accessToken"]), - # "x-allowed-locations": "0x0", } @mock.patch("google.auth._helpers.utcnow") @@ -1693,10 +1955,7 @@ def test_before_request_expired(self, utcnow): credentials.before_request(request, "POST", "https://example.com/api", headers) # Cached token should be used. - assert headers == { - "authorization": "Bearer token", - # "x-allowed-locations": "0x0", - } + assert headers == {"authorization": "Bearer token"} # Next call should simulate 1 second passed. utcnow.return_value = datetime.datetime.min + datetime.timedelta(seconds=1) @@ -1710,10 +1969,38 @@ def test_before_request_expired(self, utcnow): # New token should be retrieved. assert headers == { - "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), - # "x-allowed-locations": "0x0", + "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]) } + def test_refresh_impersonation_trust_boundary(self): + request = self.make_mock_request( + status=http_client.OK, + data=self.SUCCESS_RESPONSE, + impersonation_status=http_client.OK, + impersonation_data={ + "accessToken": "SA_ACCESS_TOKEN", + "expireTime": "2025-01-01T00:00:00Z", + }, + ) + credentials = self.make_credentials( + service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL + ) + impersonated_creds_mock = mock.Mock() + impersonated_creds_mock._trust_boundary = self.VALID_TRUST_BOUNDARY + + with mock.patch( + "google.auth.external_account.impersonated_credentials.Credentials", + return_value=impersonated_creds_mock, + ): + credentials.refresh(request) + + assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY + + def test_with_trust_boundary(self): + credentials = self.make_credentials() + new_credentials = credentials.with_trust_boundary(self.VALID_TRUST_BOUNDARY) + assert new_credentials._trust_boundary == self.VALID_TRUST_BOUNDARY + @mock.patch("google.auth._helpers.utcnow") def test_before_request_impersonation_expired(self, utcnow): headers = {} @@ -1754,10 +2041,7 @@ def test_before_request_impersonation_expired(self, utcnow): assert credentials.token_state == TokenState.FRESH # Cached token should be used. - assert headers == { - "authorization": "Bearer token", - # "x-allowed-locations": "0x0", - } + assert headers == {"authorization": "Bearer token"} # Next call should simulate 1 second passed. This will trigger the expiration # threshold. @@ -1774,8 +2058,7 @@ def test_before_request_impersonation_expired(self, utcnow): # New token should be retrieved. assert headers == { - "authorization": "Bearer {}".format(impersonation_response["accessToken"]), - # "x-allowed-locations": "0x0", + "authorization": "Bearer {}".format(impersonation_response["accessToken"]) } @pytest.mark.parametrize( @@ -1874,7 +2157,6 @@ def test_get_project_id_cloud_resource_manager_success( "x-goog-user-project": self.QUOTA_PROJECT_ID, "authorization": "Bearer {}".format(token_response["access_token"]), "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, @@ -1928,7 +2210,6 @@ def test_get_project_id_cloud_resource_manager_success( "authorization": "Bearer {}".format( impersonation_response["accessToken"] ), - # "x-allowed-locations": "0x0", }, ) @@ -2000,7 +2281,6 @@ def test_workforce_pool_get_project_id_cloud_resource_manager_success( "authorization": "Bearer {}".format( self.SUCCESS_RESPONSE["access_token"] ), - # "x-allowed-locations": "0x0", }, ) @@ -2050,7 +2330,6 @@ def test_refresh_impersonation_with_lifetime( "Content-Type": "application/json", "authorization": "Bearer {}".format(token_response["access_token"]), "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, - # "x-allowed-locations": "0x0", } impersonation_request_data = { "delegates": None, @@ -2107,6 +2386,22 @@ def test_get_project_id_cloud_resource_manager_error(self): # Only 2 requests to STS and cloud resource manager should be sent. assert len(request.call_args_list) == 2 + def test_refresh_with_existing_impersonated_credentials(self): + credentials = self.make_credentials( + service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL + ) + credentials._impersonated_credentials = mock.Mock() + request = self.make_mock_request() + + credentials.refresh(request) + + credentials._impersonated_credentials.refresh.assert_called_once_with(request) + + def test_get_mtls_cert_and_key_paths(self): + credentials = self.make_credentials() + with pytest.raises(NotImplementedError): + credentials._get_mtls_cert_and_key_paths() + def test_supplier_context(): context = external_account.SupplierContext("TestTokenType", "TestAudience") diff --git a/tests/test_external_account_authorized_user.py b/tests/test_external_account_authorized_user.py index 93926a131..a4e121781 100644 --- a/tests/test_external_account_authorized_user.py +++ b/tests/test_external_account_authorized_user.py @@ -15,10 +15,12 @@ import datetime import http.client as http_client import json +import os import mock import pytest # type: ignore +from google.auth import environment_vars from google.auth import exceptions from google.auth import external_account_authorized_user from google.auth import transport @@ -27,15 +29,12 @@ TOKEN_URL = "https://sts.googleapis.com/v1/token" TOKEN_INFO_URL = "https://sts.googleapis.com/v1/introspect" REVOKE_URL = "https://sts.googleapis.com/v1/revoke" -PROJECT_NUMBER = "123456" QUOTA_PROJECT_ID = "654321" POOL_ID = "POOL_ID" PROVIDER_ID = "PROVIDER_ID" -AUDIENCE = ( - "//iam.googleapis.com/projects/{}" - "/locations/global/workloadIdentityPools/{}" - "/providers/{}" -).format(PROJECT_NUMBER, POOL_ID, PROVIDER_ID) +AUDIENCE = "//iam.googleapis.com/locations/global/workforcePools/{}/providers/{}".format( + POOL_ID, PROVIDER_ID +) REFRESH_TOKEN = "REFRESH_TOKEN" NEW_REFRESH_TOKEN = "NEW_REFRESH_TOKEN" ACCESS_TOKEN = "ACCESS_TOKEN" @@ -193,7 +192,7 @@ def test_refresh_auth_success(self, utcnow): assert creds.valid assert not creds.requires_scopes assert creds.is_user - assert creds._refresh_token == REFRESH_TOKEN + assert creds._refresh_token_val == REFRESH_TOKEN request.assert_called_once_with( url=TOKEN_URL, @@ -227,7 +226,7 @@ def test_refresh_auth_success_new_refresh_token(self, utcnow): assert creds.valid assert not creds.requires_scopes assert creds.is_user - assert creds._refresh_token == NEW_REFRESH_TOKEN + assert creds._refresh_token_val == NEW_REFRESH_TOKEN request.assert_called_once_with( url=TOKEN_URL, @@ -465,7 +464,7 @@ def test_with_quota_project(self): ) new_creds = creds.with_quota_project(QUOTA_PROJECT_ID) assert new_creds._audience == creds._audience - assert new_creds._refresh_token == creds._refresh_token + assert new_creds._refresh_token_val == creds.refresh_token assert new_creds._token_url == creds._token_url assert new_creds._token_info_url == creds._token_info_url assert new_creds._client_id == creds._client_id @@ -484,7 +483,7 @@ def test_with_token_uri(self): ) new_creds = creds.with_token_uri("https://google.com") assert new_creds._audience == creds._audience - assert new_creds._refresh_token == creds._refresh_token + assert new_creds._refresh_token_val == creds.refresh_token assert new_creds._token_url == "https://google.com" assert new_creds._token_info_url == creds._token_info_url assert new_creds._client_id == creds._client_id @@ -503,7 +502,7 @@ def test_with_universe_domain(self): ) new_creds = creds.with_universe_domain(FAKE_UNIVERSE_DOMAIN) assert new_creds._audience == creds._audience - assert new_creds._refresh_token == creds._refresh_token + assert new_creds._refresh_token_val == creds.refresh_token assert new_creds._token_url == creds._token_url assert new_creds._token_info_url == creds._token_info_url assert new_creds._client_id == creds._client_id @@ -514,6 +513,26 @@ def test_with_universe_domain(self): assert new_creds._quota_project_id == QUOTA_PROJECT_ID assert new_creds.universe_domain == FAKE_UNIVERSE_DOMAIN + def test_with_trust_boundary(self): + creds = self.make_credentials( + token=ACCESS_TOKEN, + expiry=NOW, + revoke_url=REVOKE_URL, + quota_project_id=QUOTA_PROJECT_ID, + ) + new_creds = creds.with_trust_boundary({"encodedLocations": "new_boundary"}) + assert new_creds._audience == creds._audience + assert new_creds._refresh_token_val == creds.refresh_token + assert new_creds._token_url == creds._token_url + assert new_creds._token_info_url == creds._token_info_url + assert new_creds._client_id == creds._client_id + assert new_creds._client_secret == creds._client_secret + assert new_creds.token == creds.token + assert new_creds.expiry == creds.expiry + assert new_creds._revoke_url == creds._revoke_url + assert new_creds._quota_project_id == QUOTA_PROJECT_ID + assert new_creds._trust_boundary == {"encodedLocations": "new_boundary"} + def test_from_file_required_options_only(self, tmpdir): from_creds = self.make_credentials() config_file = tmpdir.join("config.json") @@ -557,3 +576,65 @@ def test_from_file_full_options(self, tmpdir): assert creds.scopes == SCOPES assert creds._revoke_url == REVOKE_URL assert creds._quota_project_id == QUOTA_PROJECT_ID + + def test_refresh_fetches_trust_boundary(self): + request = self.make_mock_request( + status=http_client.OK, + data={"access_token": ACCESS_TOKEN, "expires_in": 3600}, + ) + credentials = self.make_credentials() + + with mock.patch.object( + credentials, + "_lookup_trust_boundary", + return_value={"encodedLocations": "0x123"}, + ) as mock_lookup, mock.patch.dict( + os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} + ): + credentials.refresh(request) + + mock_lookup.assert_called_once() + headers = {} + credentials.apply(headers) + assert headers["x-allowed-locations"] == "0x123" + + def test_refresh_skips_trust_boundary_lookup_when_disabled(self): + request = self.make_mock_request( + status=http_client.OK, + data={"access_token": ACCESS_TOKEN, "expires_in": 3600}, + ) + credentials = self.make_credentials() + + with mock.patch.object( + credentials, "_lookup_trust_boundary" + ) as mock_lookup, mock.patch.dict(os.environ, {}, clear=True): + credentials.refresh(request) + + mock_lookup.assert_not_called() + headers = {} + credentials.apply(headers) + assert "x-allowed-locations" not in headers + + def test_build_trust_boundary_lookup_url(self): + credentials = self.make_credentials() + expected_url = "https://iamcredentials.googleapis.com/v1/locations/global/workforcePools/POOL_ID/allowedLocations" + assert credentials._build_trust_boundary_lookup_url() == expected_url + + @pytest.mark.parametrize( + "audience", + [ + "invalid", + "//iam.googleapis.com/locations/global/workforcePools/", + "//iam.googleapis.com/locations/global/providers/", + "//iam.googleapis.com/workforcePools/POOL_ID/providers/PROVIDER_ID", + ], + ) + def test_build_trust_boundary_lookup_url_invalid_audience(self, audience): + credentials = self.make_credentials(audience=audience) + with pytest.raises(exceptions.InvalidValue): + credentials._build_trust_boundary_lookup_url() + + def test_build_trust_boundary_lookup_url_different_universe(self): + credentials = self.make_credentials(universe_domain=FAKE_UNIVERSE_DOMAIN) + expected_url = "https://iamcredentials.fake-universe-domain/v1/locations/global/workforcePools/POOL_ID/allowedLocations" + assert credentials._build_trust_boundary_lookup_url() == expected_url diff --git a/tests/test_identity_pool.py b/tests/test_identity_pool.py index ec0f5074c..dbbdbf53a 100644 --- a/tests/test_identity_pool.py +++ b/tests/test_identity_pool.py @@ -506,6 +506,7 @@ def test_from_info_full_options(self, mock_init): quota_project_id=QUOTA_PROJECT_ID, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, + trust_boundary=None, ) @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) @@ -535,6 +536,7 @@ def test_from_info_required_options_only(self, mock_init): quota_project_id=None, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, + trust_boundary=None, ) @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) @@ -566,6 +568,7 @@ def test_from_info_supplier(self, mock_init): quota_project_id=None, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, + trust_boundary=None, ) @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) @@ -596,6 +599,7 @@ def test_from_info_workforce_pool(self, mock_init): quota_project_id=None, workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, universe_domain=DEFAULT_UNIVERSE_DOMAIN, + trust_boundary=None, ) @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) @@ -632,6 +636,7 @@ def test_from_file_full_options(self, mock_init, tmpdir): quota_project_id=QUOTA_PROJECT_ID, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, + trust_boundary=None, ) @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) @@ -662,6 +667,7 @@ def test_from_file_required_options_only(self, mock_init, tmpdir): quota_project_id=None, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, + trust_boundary=None, ) @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None) @@ -693,6 +699,7 @@ def test_from_file_workforce_pool(self, mock_init, tmpdir): quota_project_id=None, workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT, universe_domain=DEFAULT_UNIVERSE_DOMAIN, + trust_boundary=None, ) def test_constructor_nonworkforce_with_workforce_pool_user_project(self): diff --git a/tests/test_impersonated_credentials.py b/tests/test_impersonated_credentials.py index 05cdb9dcc..2cfc05bef 100644 --- a/tests/test_impersonated_credentials.py +++ b/tests/test_impersonated_credentials.py @@ -178,6 +178,15 @@ def test_from_impersonated_service_account_info(self): ) assert isinstance(credentials, impersonated_credentials.Credentials) + def test_from_impersonated_service_account_info_with_trust_boundary(self): + info = copy.deepcopy(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO) + info["trust_boundary"] = self.VALID_TRUST_BOUNDARY + credentials = impersonated_credentials.Credentials.from_impersonated_service_account_info( + info + ) + assert isinstance(credentials, impersonated_credentials.Credentials) + assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY + def test_from_impersonated_service_account_info_with_invalid_source_credentials_type( self, ): @@ -204,6 +213,23 @@ def test_from_impersonated_service_account_info_with_invalid_impersonation_url( ) assert excinfo.match(r"Cannot extract target principal from") + def test_from_impersonated_service_account_info_with_scopes(self): + info = copy.deepcopy(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO) + info["scopes"] = ["scope1", "scope2"] + credentials = impersonated_credentials.Credentials.from_impersonated_service_account_info( + info + ) + assert credentials._target_scopes == ["scope1", "scope2"] + + def test_from_impersonated_service_account_info_with_scopes_param(self): + info = copy.deepcopy(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO) + info["scopes"] = ["scope_from_info_1", "scope_from_info_2"] + scopes_param = ["scope_from_param_1", "scope_from_param_2"] + credentials = impersonated_credentials.Credentials.from_impersonated_service_account_info( + info, scopes=scopes_param + ) + assert credentials._target_scopes == scopes_param + def test_get_cred_info(self): credentials = self.make_credentials() assert not credentials.get_cred_info() @@ -646,8 +672,8 @@ def test_refresh_source_credentials(self, time_skew): credentials._source_credentials.token = "Token" with mock.patch( - "google.oauth2.service_account.Credentials.refresh", autospec=True - ) as source_cred_refresh: + "google.oauth2.service_account.Credentials._refresh_token", autospec=True + ) as source_cred_refresh_token: expire_time = ( _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) @@ -659,15 +685,10 @@ def test_refresh_source_credentials(self, time_skew): credentials.refresh(request) - assert credentials.valid - assert not credentials.expired - - # Source credentials is refreshed only if it is expired within - # _helpers.REFRESH_THRESHOLD - if time_skew > 0: - source_cred_refresh.assert_not_called() + if time_skew <= 0: + source_cred_refresh_token.assert_called_once() else: - source_cred_refresh.assert_called_once() + source_cred_refresh_token.assert_not_called() def test_refresh_failure_malformed_expire_time(self, mock_donor_credentials): credentials = self.make_credentials(lifetime=None) diff --git a/tests/test_pluggable.py b/tests/test_pluggable.py index 6bee054c5..066920b22 100644 --- a/tests/test_pluggable.py +++ b/tests/test_pluggable.py @@ -272,6 +272,7 @@ def test_from_info_full_options(self, mock_init): quota_project_id=QUOTA_PROJECT_ID, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, + trust_boundary=None, ) @mock.patch.object(pluggable.Credentials, "__init__", return_value=None) @@ -300,6 +301,7 @@ def test_from_info_required_options_only(self, mock_init): quota_project_id=None, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, + trust_boundary=None, ) @mock.patch.object(pluggable.Credentials, "__init__", return_value=None) @@ -335,6 +337,7 @@ def test_from_file_full_options(self, mock_init, tmpdir): quota_project_id=QUOTA_PROJECT_ID, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, + trust_boundary=None, ) @mock.patch.object(pluggable.Credentials, "__init__", return_value=None) @@ -364,6 +367,7 @@ def test_from_file_required_options_only(self, mock_init, tmpdir): quota_project_id=None, workforce_pool_user_project=None, universe_domain=DEFAULT_UNIVERSE_DOMAIN, + trust_boundary=None, ) def test_constructor_invalid_options(self):