# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import http.client as http_client
import json
import os
import urllib.parse

import mock
import pytest  # type: ignore

from google.auth import _helpers
from google.auth import aws
from google.auth import environment_vars
from google.auth import exceptions
from google.auth import transport
from google.auth.credentials import DEFAULT_UNIVERSE_DOMAIN

IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE = (
    "gl-python/3.7 auth/1.1 auth-request-type/at cred-type/imp"
)

LANG_LIBRARY_METRICS_HEADER_VALUE = "gl-python/3.7 auth/1.1"

CLIENT_ID = "username"
CLIENT_SECRET = "password"
# Base64 encoding of "username:password".
BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"
SERVICE_ACCOUNT_IMPERSONATION_URL_BASE = (
    "https://us-east1-iamcredentials.googleapis.com"
)
SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE = "/v1/projects/-/serviceAccounts/{}:generateAccessToken".format(
    SERVICE_ACCOUNT_EMAIL
)
SERVICE_ACCOUNT_IMPERSONATION_URL = (
    SERVICE_ACCOUNT_IMPERSONATION_URL_BASE + SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE
)
QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
SCOPES = ["scope1", "scope2"]
TOKEN_URL = "https://sts.googleapis.com/v1/token"
TOKEN_INFO_URL = "https://sts.googleapis.com/v1/introspect"
SUBJECT_TOKEN_TYPE = "urn:ietf:params:aws:token-type:aws4_request"
AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID"
REGION_URL = "http://169.254.169.254/latest/meta-data/placement/availability-zone"
IMDSV2_SESSION_TOKEN_URL = "http://169.254.169.254/latest/api/token"
SECURITY_CREDS_URL = "http://169.254.169.254/latest/meta-data/iam/security-credentials"
REGION_URL_IPV6 = "http://[fd00:ec2::254]/latest/meta-data/placement/availability-zone"
IMDSV2_SESSION_TOKEN_URL_IPV6 = "http://[fd00:ec2::254]/latest/api/token"
SECURITY_CREDS_URL_IPV6 = (
    "http://[fd00:ec2::254]/latest/meta-data/iam/security-credentials"
)
CRED_VERIFICATION_URL = (
    "https://sts.{region}.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15"
)
# Sample fictitious AWS security credentials to be used with tests that require a session token.
ACCESS_KEY_ID = "AKIAIOSFODNN7EXAMPLE"
SECRET_ACCESS_KEY = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
TOKEN = "AQoEXAMPLEH4aoAH0gNCAPyJxz4BlCFFxWNE1OPTgk5TthT+FvwqnKwRcOIfrRh3c/LTo6UDdyJwOOvEVPvLXCrrrUtdnniCEXAMPLE/IvU1dYUg2RVAJBanLiHb4IgRmpRV3zrkuWJOgQs8IZZaIv2BXIa2R4OlgkBN9bkUDNCJiBeb/AXlzBBko7b15fjrBs2+cTQtpZ3CYWFXG8C5zqx37wnOE49mRl/+OtkIKGO7fAE"
# To avoid json.dumps() differing behavior from one version to other,
# the JSON payload is hardcoded.
REQUEST_PARAMS = '{"KeySchema":[{"KeyType":"HASH","AttributeName":"Id"}],"TableName":"TestTable","AttributeDefinitions":[{"AttributeName":"Id","AttributeType":"S"}],"ProvisionedThroughput":{"WriteCapacityUnits":5,"ReadCapacityUnits":5}}'
# Each tuple contains the following entries:
# region, time, credentials, original_request, signed_request

VALID_TOKEN_URLS = [
    "https://sts.googleapis.com",
    "https://us-east-1.sts.googleapis.com",
    "https://US-EAST-1.sts.googleapis.com",
    "https://sts.us-east-1.googleapis.com",
    "https://sts.US-WEST-1.googleapis.com",
    "https://us-east-1-sts.googleapis.com",
    "https://US-WEST-1-sts.googleapis.com",
    "https://us-west-1-sts.googleapis.com/path?query",
    "https://sts-us-east-1.p.googleapis.com",
]
INVALID_TOKEN_URLS = [
    "https://iamcredentials.googleapis.com",
    "sts.googleapis.com",
    "https://",
    "http://sts.googleapis.com",
    "https://st.s.googleapis.com",
    "https://us-eas\t-1.sts.googleapis.com",
    "https:/us-east-1.sts.googleapis.com",
    "https://US-WE/ST-1-sts.googleapis.com",
    "https://sts-us-east-1.googleapis.com",
    "https://sts-US-WEST-1.googleapis.com",
    "testhttps://us-east-1.sts.googleapis.com",
    "https://us-east-1.sts.googleapis.comevil.com",
    "https://us-east-1.us-east-1.sts.googleapis.com",
    "https://us-ea.s.t.sts.googleapis.com",
    "https://sts.googleapis.comevil.com",
    "hhttps://us-east-1.sts.googleapis.com",
    "https://us- -1.sts.googleapis.com",
    "https://-sts.googleapis.com",
    "https://us-east-1.sts.googleapis.com.evil.com",
    "https://sts.pgoogleapis.com",
    "https://p.googleapis.com",
    "https://sts.p.com",
    "http://sts.p.googleapis.com",
    "https://xyz-sts.p.googleapis.com",
    "https://sts-xyz.123.p.googleapis.com",
    "https://sts-xyz.p1.googleapis.com",
    "https://sts-xyz.p.foo.com",
    "https://sts-xyz.p.foo.googleapis.com",
]
VALID_SERVICE_ACCOUNT_IMPERSONATION_URLS = [
    "https://iamcredentials.googleapis.com",
    "https://us-east-1.iamcredentials.googleapis.com",
    "https://US-EAST-1.iamcredentials.googleapis.com",
    "https://iamcredentials.us-east-1.googleapis.com",
    "https://iamcredentials.US-WEST-1.googleapis.com",
    "https://us-east-1-iamcredentials.googleapis.com",
    "https://US-WEST-1-iamcredentials.googleapis.com",
    "https://us-west-1-iamcredentials.googleapis.com/path?query",
    "https://iamcredentials-us-east-1.p.googleapis.com",
]
INVALID_SERVICE_ACCOUNT_IMPERSONATION_URLS = [
    "https://sts.googleapis.com",
    "iamcredentials.googleapis.com",
    "https://",
    "http://iamcredentials.googleapis.com",
    "https://iamcre.dentials.googleapis.com",
    "https://us-eas\t-1.iamcredentials.googleapis.com",
    "https:/us-east-1.iamcredentials.googleapis.com",
    "https://US-WE/ST-1-iamcredentials.googleapis.com",
    "https://iamcredentials-us-east-1.googleapis.com",
    "https://iamcredentials-US-WEST-1.googleapis.com",
    "testhttps://us-east-1.iamcredentials.googleapis.com",
    "https://us-east-1.iamcredentials.googleapis.comevil.com",
    "https://us-east-1.us-east-1.iamcredentials.googleapis.com",
    "https://us-ea.s.t.iamcredentials.googleapis.com",
    "https://iamcredentials.googleapis.comevil.com",
    "hhttps://us-east-1.iamcredentials.googleapis.com",
    "https://us- -1.iamcredentials.googleapis.com",
    "https://-iamcredentials.googleapis.com",
    "https://us-east-1.iamcredentials.googleapis.com.evil.com",
    "https://iamcredentials.pgoogleapis.com",
    "https://p.googleapis.com",
    "https://iamcredentials.p.com",
    "http://iamcredentials.p.googleapis.com",
    "https://xyz-iamcredentials.p.googleapis.com",
    "https://iamcredentials-xyz.123.p.googleapis.com",
    "https://iamcredentials-xyz.p1.googleapis.com",
    "https://iamcredentials-xyz.p.foo.com",
    "https://iamcredentials-xyz.p.foo.googleapis.com",
]
TEST_FIXTURES = [
    # GET request (AWS botocore tests).
    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla.req
    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla.sreq
    (
        "us-east-1",
        "2011-09-09T23:36:00Z",
        {
            "access_key_id": "AKIDEXAMPLE",
            "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
        },
        {
            "method": "GET",
            "url": "https://host.foo.com",
            "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
        },
        {
            "url": "https://host.foo.com",
            "method": "GET",
            "headers": {
                "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=b27ccfbfa7df52a200ff74193ca6e32d4b48b8856fab7ebf1c595d0670a7e470",
                "host": "host.foo.com",
                "date": "Mon, 09 Sep 2011 23:36:00 GMT",
            },
        },
    ),
    # GET request with relative path (AWS botocore tests).
    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-relative-relative.req
    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-relative-relative.sreq
    (
        "us-east-1",
        "2011-09-09T23:36:00Z",
        {
            "access_key_id": "AKIDEXAMPLE",
            "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
        },
        {
            "method": "GET",
            "url": "https://host.foo.com/foo/bar/../..",
            "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
        },
        {
            "url": "https://host.foo.com/foo/bar/../..",
            "method": "GET",
            "headers": {
                "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=b27ccfbfa7df52a200ff74193ca6e32d4b48b8856fab7ebf1c595d0670a7e470",
                "host": "host.foo.com",
                "date": "Mon, 09 Sep 2011 23:36:00 GMT",
            },
        },
    ),
    # GET request with /./ path (AWS botocore tests).
    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-slash-dot-slash.req
    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-slash-dot-slash.sreq
    (
        "us-east-1",
        "2011-09-09T23:36:00Z",
        {
            "access_key_id": "AKIDEXAMPLE",
            "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
        },
        {
            "method": "GET",
            "url": "https://host.foo.com/./",
            "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
        },
        {
            "url": "https://host.foo.com/./",
            "method": "GET",
            "headers": {
                "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=b27ccfbfa7df52a200ff74193ca6e32d4b48b8856fab7ebf1c595d0670a7e470",
                "host": "host.foo.com",
                "date": "Mon, 09 Sep 2011 23:36:00 GMT",
            },
        },
    ),
    # GET request with pointless dot path (AWS botocore tests).
    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-slash-pointless-dot.req
    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-slash-pointless-dot.sreq
    (
        "us-east-1",
        "2011-09-09T23:36:00Z",
        {
            "access_key_id": "AKIDEXAMPLE",
            "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
        },
        {
            "method": "GET",
            "url": "https://host.foo.com/./foo",
            "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
        },
        {
            "url": "https://host.foo.com/./foo",
            "method": "GET",
            "headers": {
                "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=910e4d6c9abafaf87898e1eb4c929135782ea25bb0279703146455745391e63a",
                "host": "host.foo.com",
                "date": "Mon, 09 Sep 2011 23:36:00 GMT",
            },
        },
    ),
    # GET request with utf8 path (AWS botocore tests).
    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-utf8.req
    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-utf8.sreq
    (
        "us-east-1",
        "2011-09-09T23:36:00Z",
        {
            "access_key_id": "AKIDEXAMPLE",
            "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
        },
        {
            "method": "GET",
            "url": "https://host.foo.com/%E1%88%B4",
            "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
        },
        {
            "url": "https://host.foo.com/%E1%88%B4",
            "method": "GET",
            "headers": {
                "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=8d6634c189aa8c75c2e51e106b6b5121bed103fdb351f7d7d4381c738823af74",
                "host": "host.foo.com",
                "date": "Mon, 09 Sep 2011 23:36:00 GMT",
            },
        },
    ),
    # GET request with duplicate query key (AWS botocore tests).
    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-query-order-key-case.req
    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-query-order-key-case.sreq
    (
        "us-east-1",
        "2011-09-09T23:36:00Z",
        {
            "access_key_id": "AKIDEXAMPLE",
            "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
        },
        {
            "method": "GET",
            "url": "https://host.foo.com/?foo=Zoo&foo=aha",
            "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
        },
        {
            "url": "https://host.foo.com/?foo=Zoo&foo=aha",
            "method": "GET",
            "headers": {
                "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=be7148d34ebccdc6423b19085378aa0bee970bdc61d144bd1a8c48c33079ab09",
                "host": "host.foo.com",
                "date": "Mon, 09 Sep 2011 23:36:00 GMT",
            },
        },
    ),
    # GET request with duplicate out of order query key (AWS botocore tests).
    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-query-order-value.req
    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-query-order-value.sreq
    (
        "us-east-1",
        "2011-09-09T23:36:00Z",
        {
            "access_key_id": "AKIDEXAMPLE",
            "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
        },
        {
            "method": "GET",
            "url": "https://host.foo.com/?foo=b&foo=a",
            "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
        },
        {
            "url": "https://host.foo.com/?foo=b&foo=a",
            "method": "GET",
            "headers": {
                "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=feb926e49e382bec75c9d7dcb2a1b6dc8aa50ca43c25d2bc51143768c0875acc",
                "host": "host.foo.com",
                "date": "Mon, 09 Sep 2011 23:36:00 GMT",
            },
        },
    ),
    # GET request with utf8 query (AWS botocore tests).
    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-ut8-query.req
    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-ut8-query.sreq
    (
        "us-east-1",
        "2011-09-09T23:36:00Z",
        {
            "access_key_id": "AKIDEXAMPLE",
            "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
        },
        {
            "method": "GET",
            "url": "https://host.foo.com/?{}=bar".format(
                urllib.parse.unquote("%E1%88%B4")
            ),
            "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
        },
        {
            "url": "https://host.foo.com/?{}=bar".format(
                urllib.parse.unquote("%E1%88%B4")
            ),
            "method": "GET",
            "headers": {
                "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=6fb359e9a05394cc7074e0feb42573a2601abc0c869a953e8c5c12e4e01f1a8c",
                "host": "host.foo.com",
                "date": "Mon, 09 Sep 2011 23:36:00 GMT",
            },
        },
    ),
    # POST request with sorted headers (AWS botocore tests).
    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-header-key-sort.req
    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-header-key-sort.sreq
    (
        "us-east-1",
        "2011-09-09T23:36:00Z",
        {
            "access_key_id": "AKIDEXAMPLE",
            "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
        },
        {
            "method": "POST",
            "url": "https://host.foo.com/",
            "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT", "ZOO": "zoobar"},
        },
        {
            "url": "https://host.foo.com/",
            "method": "POST",
            "headers": {
                "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host;zoo, Signature=b7a95a52518abbca0964a999a880429ab734f35ebbf1235bd79a5de87756dc4a",
                "host": "host.foo.com",
                "date": "Mon, 09 Sep 2011 23:36:00 GMT",
                "ZOO": "zoobar",
            },
        },
    ),
    # POST request with upper case header value from AWS Python test harness.
    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-header-value-case.req
    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-header-value-case.sreq
    (
        "us-east-1",
        "2011-09-09T23:36:00Z",
        {
            "access_key_id": "AKIDEXAMPLE",
            "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
        },
        {
            "method": "POST",
            "url": "https://host.foo.com/",
            "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT", "zoo": "ZOOBAR"},
        },
        {
            "url": "https://host.foo.com/",
            "method": "POST",
            "headers": {
                "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host;zoo, Signature=273313af9d0c265c531e11db70bbd653f3ba074c1009239e8559d3987039cad7",
                "host": "host.foo.com",
                "date": "Mon, 09 Sep 2011 23:36:00 GMT",
                "zoo": "ZOOBAR",
            },
        },
    ),
    # POST request with header and no body (AWS botocore tests).
    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-header-value-trim.req
    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-header-value-trim.sreq
    (
        "us-east-1",
        "2011-09-09T23:36:00Z",
        {
            "access_key_id": "AKIDEXAMPLE",
            "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
        },
        {
            "method": "POST",
            "url": "https://host.foo.com/",
            "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT", "p": "phfft"},
        },
        {
            "url": "https://host.foo.com/",
            "method": "POST",
            "headers": {
                "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host;p, Signature=debf546796015d6f6ded8626f5ce98597c33b47b9164cf6b17b4642036fcb592",
                "host": "host.foo.com",
                "date": "Mon, 09 Sep 2011 23:36:00 GMT",
                "p": "phfft",
            },
        },
    ),
    # POST request with body and no header (AWS botocore tests).
    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-x-www-form-urlencoded.req
    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-x-www-form-urlencoded.sreq
    (
        "us-east-1",
        "2011-09-09T23:36:00Z",
        {
            "access_key_id": "AKIDEXAMPLE",
            "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
        },
        {
            "method": "POST",
            "url": "https://host.foo.com/",
            "headers": {
                "Content-Type": "application/x-www-form-urlencoded",
                "date": "Mon, 09 Sep 2011 23:36:00 GMT",
            },
            "data": "foo=bar",
        },
        {
            "url": "https://host.foo.com/",
            "method": "POST",
            "headers": {
                "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=content-type;date;host, Signature=5a15b22cf462f047318703b92e6f4f38884e4a7ab7b1d6426ca46a8bd1c26cbc",
                "host": "host.foo.com",
                "Content-Type": "application/x-www-form-urlencoded",
                "date": "Mon, 09 Sep 2011 23:36:00 GMT",
            },
            "data": "foo=bar",
        },
    ),
    # POST request with querystring (AWS botocore tests).
    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-vanilla-query.req
    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-vanilla-query.sreq
    (
        "us-east-1",
        "2011-09-09T23:36:00Z",
        {
            "access_key_id": "AKIDEXAMPLE",
            "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
        },
        {
            "method": "POST",
            "url": "https://host.foo.com/?foo=bar",
            "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
        },
        {
            "url": "https://host.foo.com/?foo=bar",
            "method": "POST",
            "headers": {
                "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=b6e3b79003ce0743a491606ba1035a804593b0efb1e20a11cba83f8c25a57a92",
                "host": "host.foo.com",
                "date": "Mon, 09 Sep 2011 23:36:00 GMT",
            },
        },
    ),
    # GET request with session token credentials.
    (
        "us-east-2",
        "2020-08-11T06:55:22Z",
        {
            "access_key_id": ACCESS_KEY_ID,
            "secret_access_key": SECRET_ACCESS_KEY,
            "security_token": TOKEN,
        },
        {
            "method": "GET",
            "url": "https://ec2.us-east-2.amazonaws.com?Action=DescribeRegions&Version=2013-10-15",
        },
        {
            "url": "https://ec2.us-east-2.amazonaws.com?Action=DescribeRegions&Version=2013-10-15",
            "method": "GET",
            "headers": {
                "Authorization": "AWS4-HMAC-SHA256 Credential="
                + ACCESS_KEY_ID
                + "/20200811/us-east-2/ec2/aws4_request, SignedHeaders=host;x-amz-date;x-amz-security-token, Signature=41e226f997bf917ec6c9b2b14218df0874225f13bb153236c247881e614fafc9",
                "host": "ec2.us-east-2.amazonaws.com",
                "x-amz-date": "20200811T065522Z",
                "x-amz-security-token": TOKEN,
            },
        },
    ),
    # POST request with session token credentials.
    (
        "us-east-2",
        "2020-08-11T06:55:22Z",
        {
            "access_key_id": ACCESS_KEY_ID,
            "secret_access_key": SECRET_ACCESS_KEY,
            "security_token": TOKEN,
        },
        {
            "method": "POST",
            "url": "https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
        },
        {
            "url": "https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
            "method": "POST",
            "headers": {
                "Authorization": "AWS4-HMAC-SHA256 Credential="
                + ACCESS_KEY_ID
                + "/20200811/us-east-2/sts/aws4_request, SignedHeaders=host;x-amz-date;x-amz-security-token, Signature=596aa990b792d763465d73703e684ca273c45536c6d322c31be01a41d02e5b60",
                "host": "sts.us-east-2.amazonaws.com",
                "x-amz-date": "20200811T065522Z",
                "x-amz-security-token": TOKEN,
            },
        },
    ),
    # POST request with computed x-amz-date and no data.
    (
        "us-east-2",
        "2020-08-11T06:55:22Z",
        {"access_key_id": ACCESS_KEY_ID, "secret_access_key": SECRET_ACCESS_KEY},
        {
            "method": "POST",
            "url": "https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
        },
        {
            "url": "https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
            "method": "POST",
            "headers": {
                "Authorization": "AWS4-HMAC-SHA256 Credential="
                + ACCESS_KEY_ID
                + "/20200811/us-east-2/sts/aws4_request, SignedHeaders=host;x-amz-date, Signature=9e722e5b7bfa163447e2a14df118b45ebd283c5aea72019bdf921d6e7dc01a9a",
                "host": "sts.us-east-2.amazonaws.com",
                "x-amz-date": "20200811T065522Z",
            },
        },
    ),
    # POST request with session token and additional headers/data.
    (
        "us-east-2",
        "2020-08-11T06:55:22Z",
        {
            "access_key_id": ACCESS_KEY_ID,
            "secret_access_key": SECRET_ACCESS_KEY,
            "security_token": TOKEN,
        },
        {
            "method": "POST",
            "url": "https://dynamodb.us-east-2.amazonaws.com/",
            "headers": {
                "Content-Type": "application/x-amz-json-1.0",
                "x-amz-target": "DynamoDB_20120810.CreateTable",
            },
            "data": REQUEST_PARAMS,
        },
        {
            "url": "https://dynamodb.us-east-2.amazonaws.com/",
            "method": "POST",
            "headers": {
                "Authorization": "AWS4-HMAC-SHA256 Credential="
                + ACCESS_KEY_ID
                + "/20200811/us-east-2/dynamodb/aws4_request, SignedHeaders=content-type;host;x-amz-date;x-amz-security-token;x-amz-target, Signature=eb8bce0e63654bba672d4a8acb07e72d69210c1797d56ce024dbbc31beb2a2c7",
                "host": "dynamodb.us-east-2.amazonaws.com",
                "x-amz-date": "20200811T065522Z",
                "Content-Type": "application/x-amz-json-1.0",
                "x-amz-target": "DynamoDB_20120810.CreateTable",
                "x-amz-security-token": TOKEN,
            },
            "data": REQUEST_PARAMS,
        },
    ),
]


class TestRequestSigner(object):
    @pytest.mark.parametrize(
        "region, time, credentials, original_request, signed_request", TEST_FIXTURES
    )
    @mock.patch("google.auth._helpers.utcnow")
    def test_get_request_options(
        self, utcnow, region, time, credentials, original_request, signed_request
    ):
        utcnow.return_value = datetime.datetime.strptime(time, "%Y-%m-%dT%H:%M:%SZ")
        request_signer = aws.RequestSigner(region)
        actual_signed_request = request_signer.get_request_options(
            credentials,
            original_request.get("url"),
            original_request.get("method"),
            original_request.get("data"),
            original_request.get("headers"),
        )

        assert actual_signed_request == signed_request

    def test_get_request_options_with_missing_scheme_url(self):
        request_signer = aws.RequestSigner("us-east-2")

        with pytest.raises(ValueError) as excinfo:
            request_signer.get_request_options(
                {
                    "access_key_id": ACCESS_KEY_ID,
                    "secret_access_key": SECRET_ACCESS_KEY,
                },
                "invalid",
                "POST",
            )

        assert excinfo.match(r"Invalid AWS service URL")

    def test_get_request_options_with_invalid_scheme_url(self):
        request_signer = aws.RequestSigner("us-east-2")

        with pytest.raises(ValueError) as excinfo:
            request_signer.get_request_options(
                {
                    "access_key_id": ACCESS_KEY_ID,
                    "secret_access_key": SECRET_ACCESS_KEY,
                },
                "http://invalid",
                "POST",
            )

        assert excinfo.match(r"Invalid AWS service URL")

    def test_get_request_options_with_missing_hostname_url(self):
        request_signer = aws.RequestSigner("us-east-2")

        with pytest.raises(ValueError) as excinfo:
            request_signer.get_request_options(
                {
                    "access_key_id": ACCESS_KEY_ID,
                    "secret_access_key": SECRET_ACCESS_KEY,
                },
                "https://",
                "POST",
            )

        assert excinfo.match(r"Invalid AWS service URL")


class TestCredentials(object):
    AWS_REGION = "us-east-2"
    AWS_ROLE = "gcp-aws-role"
    AWS_SECURITY_CREDENTIALS_RESPONSE = {
        "AccessKeyId": ACCESS_KEY_ID,
        "SecretAccessKey": SECRET_ACCESS_KEY,
        "Token": TOKEN,
    }
    AWS_IMDSV2_SESSION_TOKEN = "awsimdsv2sessiontoken"
    AWS_SIGNATURE_TIME = "2020-08-11T06:55:22Z"
    CREDENTIAL_SOURCE = {
        "environment_id": "aws1",
        "region_url": REGION_URL,
        "url": SECURITY_CREDS_URL,
        "regional_cred_verification_url": CRED_VERIFICATION_URL,
    }
    CREDENTIAL_SOURCE_IPV6 = {
        "environment_id": "aws1",
        "region_url": REGION_URL_IPV6,
        "url": SECURITY_CREDS_URL_IPV6,
        "regional_cred_verification_url": CRED_VERIFICATION_URL,
        "imdsv2_session_token_url": IMDSV2_SESSION_TOKEN_URL_IPV6,
    }
    SUCCESS_RESPONSE = {
        "access_token": "ACCESS_TOKEN",
        "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
        "token_type": "Bearer",
        "expires_in": 3600,
        "scope": " ".join(SCOPES),
    }

    @classmethod
    def make_serialized_aws_signed_request(
        cls,
        aws_security_credentials,
        region_name="us-east-2",
        url="https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
    ):
        """Utility to generate serialize AWS signed requests.
        This makes it easy to assert generated subject tokens based on the
        provided AWS security credentials, regions and AWS STS endpoint.
        """
        request_signer = aws.RequestSigner(region_name)
        signed_request = request_signer.get_request_options(
            aws_security_credentials, url, "POST"
        )
        reformatted_signed_request = {
            "url": signed_request.get("url"),
            "method": signed_request.get("method"),
            "headers": [
                {
                    "key": "Authorization",
                    "value": signed_request.get("headers").get("Authorization"),
                },
                {"key": "host", "value": signed_request.get("headers").get("host")},
                {
                    "key": "x-amz-date",
                    "value": signed_request.get("headers").get("x-amz-date"),
                },
            ],
        }
        # Include security token if available.
        if "security_token" in aws_security_credentials:
            reformatted_signed_request.get("headers").append(
                {
                    "key": "x-amz-security-token",
                    "value": signed_request.get("headers").get("x-amz-security-token"),
                }
            )
        # Append x-goog-cloud-target-resource header.
        reformatted_signed_request.get("headers").append(
            {"key": "x-goog-cloud-target-resource", "value": AUDIENCE}
        ),
        return urllib.parse.quote(
            json.dumps(
                reformatted_signed_request, separators=(",", ":"), sort_keys=True
            )
        )

    @classmethod
    def make_mock_request(
        cls,
        region_status=None,
        region_name=None,
        role_status=None,
        role_name=None,
        security_credentials_status=None,
        security_credentials_data=None,
        token_status=None,
        token_data=None,
        impersonation_status=None,
        impersonation_data=None,
        imdsv2_session_token_status=None,
        imdsv2_session_token_data=None,
    ):
        """Utility function to generate a mock HTTP request object.
        This will facilitate testing various edge cases by specify how the
        various endpoints will respond while generating a Google Access token
        in an AWS environment.
        """
        responses = []
        if imdsv2_session_token_status:
            # AWS session token request
            imdsv2_session_response = mock.create_autospec(
                transport.Response, instance=True
            )
            imdsv2_session_response.status = imdsv2_session_token_status
            imdsv2_session_response.data = imdsv2_session_token_data
            responses.append(imdsv2_session_response)

        if region_status:
            # AWS region request.
            region_response = mock.create_autospec(transport.Response, instance=True)
            region_response.status = region_status
            if region_name:
                region_response.data = "{}b".format(region_name).encode("utf-8")
            responses.append(region_response)

        if role_status:
            # AWS role name request.
            role_response = mock.create_autospec(transport.Response, instance=True)
            role_response.status = role_status
            if role_name:
                role_response.data = role_name.encode("utf-8")
            responses.append(role_response)

        if security_credentials_status:
            # AWS security credentials request.
            security_credentials_response = mock.create_autospec(
                transport.Response, instance=True
            )
            security_credentials_response.status = security_credentials_status
            if security_credentials_data:
                security_credentials_response.data = json.dumps(
                    security_credentials_data
                ).encode("utf-8")
            responses.append(security_credentials_response)

        if token_status:
            # GCP token exchange request.
            token_response = mock.create_autospec(transport.Response, instance=True)
            token_response.status = token_status
            token_response.data = json.dumps(token_data).encode("utf-8")
            responses.append(token_response)

        if impersonation_status:
            # Service account impersonation request.
            impersonation_response = mock.create_autospec(
                transport.Response, instance=True
            )
            impersonation_response.status = impersonation_status
            impersonation_response.data = json.dumps(impersonation_data).encode("utf-8")
            responses.append(impersonation_response)

        request = mock.create_autospec(transport.Request)
        request.side_effect = responses

        return request

    @classmethod
    def make_credentials(
        cls,
        credential_source,
        token_url=TOKEN_URL,
        token_info_url=TOKEN_INFO_URL,
        client_id=None,
        client_secret=None,
        quota_project_id=None,
        scopes=None,
        default_scopes=None,
        service_account_impersonation_url=None,
    ):
        return aws.Credentials(
            audience=AUDIENCE,
            subject_token_type=SUBJECT_TOKEN_TYPE,
            token_url=token_url,
            token_info_url=token_info_url,
            service_account_impersonation_url=service_account_impersonation_url,
            credential_source=credential_source,
            client_id=client_id,
            client_secret=client_secret,
            quota_project_id=quota_project_id,
            scopes=scopes,
            default_scopes=default_scopes,
        )

    @classmethod
    def assert_aws_metadata_request_kwargs(
        cls, request_kwargs, url, headers=None, method="GET"
    ):
        assert request_kwargs["url"] == url
        # All used AWS metadata server endpoints use GET HTTP method.
        assert request_kwargs["method"] == method
        if headers:
            assert request_kwargs["headers"] == headers
        else:
            assert "headers" not in request_kwargs or request_kwargs["headers"] is None
        # None of the endpoints used require any data in request.
        assert "body" not in request_kwargs

    @classmethod
    def assert_token_request_kwargs(
        cls, request_kwargs, headers, request_data, token_url=TOKEN_URL
    ):
        assert request_kwargs["url"] == token_url
        assert request_kwargs["method"] == "POST"
        assert request_kwargs["headers"] == headers
        assert request_kwargs["body"] is not None
        body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
        assert len(body_tuples) == len(request_data.keys())
        for (k, v) in body_tuples:
            assert v.decode("utf-8") == request_data[k.decode("utf-8")]

    @classmethod
    def assert_impersonation_request_kwargs(
        cls,
        request_kwargs,
        headers,
        request_data,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
    ):
        assert request_kwargs["url"] == service_account_impersonation_url
        assert request_kwargs["method"] == "POST"
        assert request_kwargs["headers"] == headers
        assert request_kwargs["body"] is not None
        body_json = json.loads(request_kwargs["body"].decode("utf-8"))
        assert body_json == request_data

    @mock.patch.object(aws.Credentials, "__init__", return_value=None)
    def test_from_info_full_options(self, mock_init):
        credentials = aws.Credentials.from_info(
            {
                "audience": AUDIENCE,
                "subject_token_type": SUBJECT_TOKEN_TYPE,
                "token_url": TOKEN_URL,
                "token_info_url": TOKEN_INFO_URL,
                "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
                "service_account_impersonation": {"token_lifetime_seconds": 2800},
                "client_id": CLIENT_ID,
                "client_secret": CLIENT_SECRET,
                "quota_project_id": QUOTA_PROJECT_ID,
                "credential_source": self.CREDENTIAL_SOURCE,
            }
        )

        # Confirm aws.Credentials instance initialized with the expected parameters.
        assert isinstance(credentials, aws.Credentials)
        mock_init.assert_called_once_with(
            audience=AUDIENCE,
            subject_token_type=SUBJECT_TOKEN_TYPE,
            token_url=TOKEN_URL,
            token_info_url=TOKEN_INFO_URL,
            service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
            service_account_impersonation_options={"token_lifetime_seconds": 2800},
            client_id=CLIENT_ID,
            client_secret=CLIENT_SECRET,
            credential_source=self.CREDENTIAL_SOURCE,
            quota_project_id=QUOTA_PROJECT_ID,
            workforce_pool_user_project=None,
            universe_domain=DEFAULT_UNIVERSE_DOMAIN,
        )

    @mock.patch.object(aws.Credentials, "__init__", return_value=None)
    def test_from_info_required_options_only(self, mock_init):
        credentials = aws.Credentials.from_info(
            {
                "audience": AUDIENCE,
                "subject_token_type": SUBJECT_TOKEN_TYPE,
                "token_url": TOKEN_URL,
                "credential_source": self.CREDENTIAL_SOURCE,
            }
        )

        # Confirm aws.Credentials instance initialized with the expected parameters.
        assert isinstance(credentials, aws.Credentials)
        mock_init.assert_called_once_with(
            audience=AUDIENCE,
            subject_token_type=SUBJECT_TOKEN_TYPE,
            token_url=TOKEN_URL,
            token_info_url=None,
            service_account_impersonation_url=None,
            service_account_impersonation_options={},
            client_id=None,
            client_secret=None,
            credential_source=self.CREDENTIAL_SOURCE,
            quota_project_id=None,
            workforce_pool_user_project=None,
            universe_domain=DEFAULT_UNIVERSE_DOMAIN,
        )

    @mock.patch.object(aws.Credentials, "__init__", return_value=None)
    def test_from_file_full_options(self, mock_init, tmpdir):
        info = {
            "audience": AUDIENCE,
            "subject_token_type": SUBJECT_TOKEN_TYPE,
            "token_url": TOKEN_URL,
            "token_info_url": TOKEN_INFO_URL,
            "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
            "service_account_impersonation": {"token_lifetime_seconds": 2800},
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
            "quota_project_id": QUOTA_PROJECT_ID,
            "credential_source": self.CREDENTIAL_SOURCE,
            "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
        }
        config_file = tmpdir.join("config.json")
        config_file.write(json.dumps(info))
        credentials = aws.Credentials.from_file(str(config_file))

        # Confirm aws.Credentials instance initialized with the expected parameters.
        assert isinstance(credentials, aws.Credentials)
        mock_init.assert_called_once_with(
            audience=AUDIENCE,
            subject_token_type=SUBJECT_TOKEN_TYPE,
            token_url=TOKEN_URL,
            token_info_url=TOKEN_INFO_URL,
            service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
            service_account_impersonation_options={"token_lifetime_seconds": 2800},
            client_id=CLIENT_ID,
            client_secret=CLIENT_SECRET,
            credential_source=self.CREDENTIAL_SOURCE,
            quota_project_id=QUOTA_PROJECT_ID,
            workforce_pool_user_project=None,
            universe_domain=DEFAULT_UNIVERSE_DOMAIN,
        )

    @mock.patch.object(aws.Credentials, "__init__", return_value=None)
    def test_from_file_required_options_only(self, mock_init, tmpdir):
        info = {
            "audience": AUDIENCE,
            "subject_token_type": SUBJECT_TOKEN_TYPE,
            "token_url": TOKEN_URL,
            "credential_source": self.CREDENTIAL_SOURCE,
        }
        config_file = tmpdir.join("config.json")
        config_file.write(json.dumps(info))
        credentials = aws.Credentials.from_file(str(config_file))

        # Confirm aws.Credentials instance initialized with the expected parameters.
        assert isinstance(credentials, aws.Credentials)
        mock_init.assert_called_once_with(
            audience=AUDIENCE,
            subject_token_type=SUBJECT_TOKEN_TYPE,
            token_url=TOKEN_URL,
            token_info_url=None,
            service_account_impersonation_url=None,
            service_account_impersonation_options={},
            client_id=None,
            client_secret=None,
            credential_source=self.CREDENTIAL_SOURCE,
            quota_project_id=None,
            workforce_pool_user_project=None,
            universe_domain=DEFAULT_UNIVERSE_DOMAIN,
        )

    def test_constructor_invalid_credential_source(self):
        # Provide invalid credential source.
        credential_source = {"unsupported": "value"}

        with pytest.raises(ValueError) as excinfo:
            self.make_credentials(credential_source=credential_source)

        assert excinfo.match(r"No valid AWS 'credential_source' provided")

    def test_constructor_invalid_environment_id(self):
        # Provide invalid environment_id.
        credential_source = self.CREDENTIAL_SOURCE.copy()
        credential_source["environment_id"] = "azure1"

        with pytest.raises(ValueError) as excinfo:
            self.make_credentials(credential_source=credential_source)

        assert excinfo.match(r"No valid AWS 'credential_source' provided")

    def test_constructor_missing_cred_verification_url(self):
        # regional_cred_verification_url is a required field.
        credential_source = self.CREDENTIAL_SOURCE.copy()
        credential_source.pop("regional_cred_verification_url")

        with pytest.raises(ValueError) as excinfo:
            self.make_credentials(credential_source=credential_source)

        assert excinfo.match(r"No valid AWS 'credential_source' provided")

    def test_constructor_invalid_environment_id_version(self):
        # Provide an unsupported version.
        credential_source = self.CREDENTIAL_SOURCE.copy()
        credential_source["environment_id"] = "aws3"

        with pytest.raises(ValueError) as excinfo:
            self.make_credentials(credential_source=credential_source)

        assert excinfo.match(r"aws version '3' is not supported in the current build.")

    def test_info(self):
        credentials = self.make_credentials(
            credential_source=self.CREDENTIAL_SOURCE.copy()
        )

        assert credentials.info == {
            "type": "external_account",
            "audience": AUDIENCE,
            "subject_token_type": SUBJECT_TOKEN_TYPE,
            "token_url": TOKEN_URL,
            "token_info_url": TOKEN_INFO_URL,
            "credential_source": self.CREDENTIAL_SOURCE,
            "universe_domain": DEFAULT_UNIVERSE_DOMAIN,
        }

    def test_token_info_url(self):
        credentials = self.make_credentials(
            credential_source=self.CREDENTIAL_SOURCE.copy()
        )

        assert credentials.token_info_url == TOKEN_INFO_URL

    def test_token_info_url_custom(self):
        for url in VALID_TOKEN_URLS:
            credentials = self.make_credentials(
                credential_source=self.CREDENTIAL_SOURCE.copy(),
                token_info_url=(url + "/introspect"),
            )

            assert credentials.token_info_url == (url + "/introspect")

    def test_token_info_url_negative(self):
        credentials = self.make_credentials(
            credential_source=self.CREDENTIAL_SOURCE.copy(), token_info_url=None
        )

        assert not credentials.token_info_url

    def test_token_url_custom(self):
        for url in VALID_TOKEN_URLS:
            credentials = self.make_credentials(
                credential_source=self.CREDENTIAL_SOURCE.copy(),
                token_url=(url + "/token"),
            )

            assert credentials._token_url == (url + "/token")

    def test_service_account_impersonation_url_custom(self):
        for url in VALID_SERVICE_ACCOUNT_IMPERSONATION_URLS:
            credentials = self.make_credentials(
                credential_source=self.CREDENTIAL_SOURCE.copy(),
                service_account_impersonation_url=(
                    url + SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE
                ),
            )

            assert credentials._service_account_impersonation_url == (
                url + SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE
            )

    def test_retrieve_subject_token_missing_region_url(self):
        # When AWS_REGION envvar is not available, region_url is required for
        # determining the current AWS region.
        credential_source = self.CREDENTIAL_SOURCE.copy()
        credential_source.pop("region_url")
        credentials = self.make_credentials(credential_source=credential_source)

        with pytest.raises(exceptions.RefreshError) as excinfo:
            credentials.retrieve_subject_token(None)

        assert excinfo.match(r"Unable to determine AWS region")

    @mock.patch("google.auth._helpers.utcnow")
    def test_retrieve_subject_token_success_temp_creds_no_environment_vars(
        self, utcnow
    ):
        utcnow.return_value = datetime.datetime.strptime(
            self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
        )
        request = self.make_mock_request(
            region_status=http_client.OK,
            region_name=self.AWS_REGION,
            role_status=http_client.OK,
            role_name=self.AWS_ROLE,
            security_credentials_status=http_client.OK,
            security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
        )
        credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)

        subject_token = credentials.retrieve_subject_token(request)

        assert subject_token == self.make_serialized_aws_signed_request(
            {
                "access_key_id": ACCESS_KEY_ID,
                "secret_access_key": SECRET_ACCESS_KEY,
                "security_token": TOKEN,
            }
        )
        # Assert region request.
        self.assert_aws_metadata_request_kwargs(
            request.call_args_list[0][1], REGION_URL
        )
        # Assert role request.
        self.assert_aws_metadata_request_kwargs(
            request.call_args_list[1][1], SECURITY_CREDS_URL
        )
        # Assert security credentials request.
        self.assert_aws_metadata_request_kwargs(
            request.call_args_list[2][1],
            "{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
            {"Content-Type": "application/json"},
        )

        # Retrieve subject_token again. Region should not be queried again.
        new_request = self.make_mock_request(
            role_status=http_client.OK,
            role_name=self.AWS_ROLE,
            security_credentials_status=http_client.OK,
            security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
        )

        credentials.retrieve_subject_token(new_request)

        # Only 3 requests should be sent as the region is cached.
        assert len(new_request.call_args_list) == 2
        # Assert role request.
        self.assert_aws_metadata_request_kwargs(
            new_request.call_args_list[0][1], SECURITY_CREDS_URL
        )
        # Assert security credentials request.
        self.assert_aws_metadata_request_kwargs(
            new_request.call_args_list[1][1],
            "{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
            {"Content-Type": "application/json"},
        )

    @mock.patch("google.auth._helpers.utcnow")
    @mock.patch.dict(os.environ, {})
    def test_retrieve_subject_token_success_temp_creds_no_environment_vars_idmsv2(
        self, utcnow
    ):
        utcnow.return_value = datetime.datetime.strptime(
            self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
        )
        request = self.make_mock_request(
            region_status=http_client.OK,
            region_name=self.AWS_REGION,
            role_status=http_client.OK,
            role_name=self.AWS_ROLE,
            security_credentials_status=http_client.OK,
            security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
            imdsv2_session_token_status=http_client.OK,
            imdsv2_session_token_data=self.AWS_IMDSV2_SESSION_TOKEN,
        )
        credential_source_token_url = self.CREDENTIAL_SOURCE.copy()
        credential_source_token_url[
            "imdsv2_session_token_url"
        ] = IMDSV2_SESSION_TOKEN_URL
        credentials = self.make_credentials(
            credential_source=credential_source_token_url
        )

        subject_token = credentials.retrieve_subject_token(request)

        assert subject_token == self.make_serialized_aws_signed_request(
            {
                "access_key_id": ACCESS_KEY_ID,
                "secret_access_key": SECRET_ACCESS_KEY,
                "security_token": TOKEN,
            }
        )
        # Assert session token request
        self.assert_aws_metadata_request_kwargs(
            request.call_args_list[0][1],
            IMDSV2_SESSION_TOKEN_URL,
            {"X-aws-ec2-metadata-token-ttl-seconds": "300"},
            "PUT",
        )
        # Assert region request.
        self.assert_aws_metadata_request_kwargs(
            request.call_args_list[1][1],
            REGION_URL,
            {"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN},
        )
        # Assert role request.
        self.assert_aws_metadata_request_kwargs(
            request.call_args_list[2][1],
            SECURITY_CREDS_URL,
            {"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN},
        )
        # Assert security credentials request.
        self.assert_aws_metadata_request_kwargs(
            request.call_args_list[3][1],
            "{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
            {
                "Content-Type": "application/json",
                "X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN,
            },
        )

        # Retrieve subject_token again. Region should not be queried again.
        new_request = self.make_mock_request(
            role_status=http_client.OK,
            role_name=self.AWS_ROLE,
            security_credentials_status=http_client.OK,
            security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
            imdsv2_session_token_status=http_client.OK,
            imdsv2_session_token_data=self.AWS_IMDSV2_SESSION_TOKEN,
        )

        credentials.retrieve_subject_token(new_request)

        # Only 3 requests should be sent as the region is cached.
        assert len(new_request.call_args_list) == 3
        # Assert session token request.
        self.assert_aws_metadata_request_kwargs(
            request.call_args_list[0][1],
            IMDSV2_SESSION_TOKEN_URL,
            {"X-aws-ec2-metadata-token-ttl-seconds": "300"},
            "PUT",
        )
        # Assert role request.
        self.assert_aws_metadata_request_kwargs(
            new_request.call_args_list[1][1],
            SECURITY_CREDS_URL,
            {"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN},
        )
        # Assert security credentials request.
        self.assert_aws_metadata_request_kwargs(
            new_request.call_args_list[2][1],
            "{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
            {
                "Content-Type": "application/json",
                "X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN,
            },
        )

    @mock.patch("google.auth._helpers.utcnow")
    @mock.patch.dict(
        os.environ,
        {
            environment_vars.AWS_REGION: AWS_REGION,
            environment_vars.AWS_ACCESS_KEY_ID: ACCESS_KEY_ID,
        },
    )
    def test_retrieve_subject_token_success_temp_creds_environment_vars_missing_secret_access_key_idmsv2(
        self, utcnow
    ):
        utcnow.return_value = datetime.datetime.strptime(
            self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
        )
        request = self.make_mock_request(
            role_status=http_client.OK,
            role_name=self.AWS_ROLE,
            security_credentials_status=http_client.OK,
            security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
            imdsv2_session_token_status=http_client.OK,
            imdsv2_session_token_data=self.AWS_IMDSV2_SESSION_TOKEN,
        )
        credential_source_token_url = self.CREDENTIAL_SOURCE.copy()
        credential_source_token_url[
            "imdsv2_session_token_url"
        ] = IMDSV2_SESSION_TOKEN_URL
        credentials = self.make_credentials(
            credential_source=credential_source_token_url
        )

        subject_token = credentials.retrieve_subject_token(request)
        assert subject_token == self.make_serialized_aws_signed_request(
            {
                "access_key_id": ACCESS_KEY_ID,
                "secret_access_key": SECRET_ACCESS_KEY,
                "security_token": TOKEN,
            }
        )
        # Assert session token request.
        self.assert_aws_metadata_request_kwargs(
            request.call_args_list[0][1],
            IMDSV2_SESSION_TOKEN_URL,
            {"X-aws-ec2-metadata-token-ttl-seconds": "300"},
            "PUT",
        )
        # Assert role request.
        self.assert_aws_metadata_request_kwargs(
            request.call_args_list[1][1],
            SECURITY_CREDS_URL,
            {"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN},
        )
        # Assert security credentials request.
        self.assert_aws_metadata_request_kwargs(
            request.call_args_list[2][1],
            "{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
            {
                "Content-Type": "application/json",
                "X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN,
            },
        )

    @mock.patch("google.auth._helpers.utcnow")
    @mock.patch.dict(
        os.environ,
        {
            environment_vars.AWS_REGION: AWS_REGION,
            environment_vars.AWS_SECRET_ACCESS_KEY: SECRET_ACCESS_KEY,
        },
    )
    def test_retrieve_subject_token_success_temp_creds_environment_vars_missing_access_key_id_idmsv2(
        self, utcnow
    ):
        utcnow.return_value = datetime.datetime.strptime(
            self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
        )
        request = self.make_mock_request(
            role_status=http_client.OK,
            role_name=self.AWS_ROLE,
            security_credentials_status=http_client.OK,
            security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
            imdsv2_session_token_status=http_client.OK,
            imdsv2_session_token_data=self.AWS_IMDSV2_SESSION_TOKEN,
        )
        credential_source_token_url = self.CREDENTIAL_SOURCE.copy()
        credential_source_token_url[
            "imdsv2_session_token_url"
        ] = IMDSV2_SESSION_TOKEN_URL
        credentials = self.make_credentials(
            credential_source=credential_source_token_url
        )

        subject_token = credentials.retrieve_subject_token(request)
        assert subject_token == self.make_serialized_aws_signed_request(
            {
                "access_key_id": ACCESS_KEY_ID,
                "secret_access_key": SECRET_ACCESS_KEY,
                "security_token": TOKEN,
            }
        )
        # Assert session token request.
        self.assert_aws_metadata_request_kwargs(
            request.call_args_list[0][1],
            IMDSV2_SESSION_TOKEN_URL,
            {"X-aws-ec2-metadata-token-ttl-seconds": "300"},
            "PUT",
        )
        # Assert role request.
        self.assert_aws_metadata_request_kwargs(
            request.call_args_list[1][1],
            SECURITY_CREDS_URL,
            {"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN},
        )
        # Assert security credentials request.
        self.assert_aws_metadata_request_kwargs(
            request.call_args_list[2][1],
            "{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
            {
                "Content-Type": "application/json",
                "X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN,
            },
        )

    @mock.patch("google.auth._helpers.utcnow")
    @mock.patch.dict(os.environ, {environment_vars.AWS_REGION: AWS_REGION})
    def test_retrieve_subject_token_success_temp_creds_environment_vars_missing_creds_idmsv2(
        self, utcnow
    ):
        utcnow.return_value = datetime.datetime.strptime(
            self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
        )
        request = self.make_mock_request(
            role_status=http_client.OK,
            role_name=self.AWS_ROLE,
            security_credentials_status=http_client.OK,
            security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
            imdsv2_session_token_status=http_client.OK,
            imdsv2_session_token_data=self.AWS_IMDSV2_SESSION_TOKEN,
        )
        credential_source_token_url = self.CREDENTIAL_SOURCE.copy()
        credential_source_token_url[
            "imdsv2_session_token_url"
        ] = IMDSV2_SESSION_TOKEN_URL
        credentials = self.make_credentials(
            credential_source=credential_source_token_url
        )

        subject_token = credentials.retrieve_subject_token(request)
        assert subject_token == self.make_serialized_aws_signed_request(
            {
                "access_key_id": ACCESS_KEY_ID,
                "secret_access_key": SECRET_ACCESS_KEY,
                "security_token": TOKEN,
            }
        )
        # Assert session token request.
        self.assert_aws_metadata_request_kwargs(
            request.call_args_list[0][1],
            IMDSV2_SESSION_TOKEN_URL,
            {"X-aws-ec2-metadata-token-ttl-seconds": "300"},
            "PUT",
        )
        # Assert role request.
        self.assert_aws_metadata_request_kwargs(
            request.call_args_list[1][1],
            SECURITY_CREDS_URL,
            {"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN},
        )
        # Assert security credentials request.
        self.assert_aws_metadata_request_kwargs(
            request.call_args_list[2][1],
            "{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
            {
                "Content-Type": "application/json",
                "X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN,
            },
        )

    @mock.patch("google.auth._helpers.utcnow")
    @mock.patch.dict(
        os.environ,
        {
            environment_vars.AWS_REGION: AWS_REGION,
            environment_vars.AWS_ACCESS_KEY_ID: ACCESS_KEY_ID,
            environment_vars.AWS_SECRET_ACCESS_KEY: SECRET_ACCESS_KEY,
        },
    )
    def test_retrieve_subject_token_success_temp_creds_idmsv2(self, utcnow):
        utcnow.return_value = datetime.datetime.strptime(
            self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
        )
        request = self.make_mock_request(
            role_status=http_client.OK, role_name=self.AWS_ROLE
        )
        credential_source_token_url = self.CREDENTIAL_SOURCE.copy()
        credential_source_token_url[
            "imdsv2_session_token_url"
        ] = IMDSV2_SESSION_TOKEN_URL
        credentials = self.make_credentials(
            credential_source=credential_source_token_url
        )

        credentials.retrieve_subject_token(request)
        assert not request.called

    @mock.patch("google.auth._helpers.utcnow")
    def test_retrieve_subject_token_success_ipv6(self, utcnow):
        utcnow.return_value = datetime.datetime.strptime(
            self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
        )
        request = self.make_mock_request(
            region_status=http_client.OK,
            region_name=self.AWS_REGION,
            role_status=http_client.OK,
            role_name=self.AWS_ROLE,
            security_credentials_status=http_client.OK,
            security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
            imdsv2_session_token_status=http_client.OK,
            imdsv2_session_token_data=self.AWS_IMDSV2_SESSION_TOKEN,
        )
        credential_source_token_url = self.CREDENTIAL_SOURCE_IPV6.copy()
        credentials = self.make_credentials(
            credential_source=credential_source_token_url
        )

        subject_token = credentials.retrieve_subject_token(request)

        assert subject_token == self.make_serialized_aws_signed_request(
            {
                "access_key_id": ACCESS_KEY_ID,
                "secret_access_key": SECRET_ACCESS_KEY,
                "security_token": TOKEN,
            }
        )
        # Assert session token request.
        self.assert_aws_metadata_request_kwargs(
            request.call_args_list[0][1],
            IMDSV2_SESSION_TOKEN_URL_IPV6,
            {"X-aws-ec2-metadata-token-ttl-seconds": "300"},
            "PUT",
        )
        # Assert region request.
        self.assert_aws_metadata_request_kwargs(
            request.call_args_list[1][1],
            REGION_URL_IPV6,
            {"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN},
        )
        # Assert role request.
        self.assert_aws_metadata_request_kwargs(
            request.call_args_list[2][1],
            SECURITY_CREDS_URL_IPV6,
            {"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN},
        )
        # Assert security credentials request.
        self.assert_aws_metadata_request_kwargs(
            request.call_args_list[3][1],
            "{}/{}".format(SECURITY_CREDS_URL_IPV6, self.AWS_ROLE),
            {
                "Content-Type": "application/json",
                "X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN,
            },
        )

    @mock.patch("google.auth._helpers.utcnow")
    def test_retrieve_subject_token_session_error_idmsv2(self, utcnow):
        utcnow.return_value = datetime.datetime.strptime(
            self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
        )
        request = self.make_mock_request(
            imdsv2_session_token_status=http_client.UNAUTHORIZED,
            imdsv2_session_token_data="unauthorized",
        )
        credential_source_token_url = self.CREDENTIAL_SOURCE.copy()
        credential_source_token_url[
            "imdsv2_session_token_url"
        ] = IMDSV2_SESSION_TOKEN_URL
        credentials = self.make_credentials(
            credential_source=credential_source_token_url
        )

        with pytest.raises(exceptions.RefreshError) as excinfo:
            credentials.retrieve_subject_token(request)

        assert excinfo.match(r"Unable to retrieve AWS Session Token")

        # Assert session token request
        self.assert_aws_metadata_request_kwargs(
            request.call_args_list[0][1],
            IMDSV2_SESSION_TOKEN_URL,
            {"X-aws-ec2-metadata-token-ttl-seconds": "300"},
            "PUT",
        )

    @mock.patch("google.auth._helpers.utcnow")
    def test_retrieve_subject_token_success_permanent_creds_no_environment_vars(
        self, utcnow
    ):
        # Simualte a permanent credential without a session token is
        # returned by the security-credentials endpoint.
        security_creds_response = self.AWS_SECURITY_CREDENTIALS_RESPONSE.copy()
        security_creds_response.pop("Token")
        utcnow.return_value = datetime.datetime.strptime(
            self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
        )
        request = self.make_mock_request(
            region_status=http_client.OK,
            region_name=self.AWS_REGION,
            role_status=http_client.OK,
            role_name=self.AWS_ROLE,
            security_credentials_status=http_client.OK,
            security_credentials_data=security_creds_response,
        )
        credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)

        subject_token = credentials.retrieve_subject_token(request)

        assert subject_token == self.make_serialized_aws_signed_request(
            {"access_key_id": ACCESS_KEY_ID, "secret_access_key": SECRET_ACCESS_KEY}
        )

    @mock.patch("google.auth._helpers.utcnow")
    def test_retrieve_subject_token_success_environment_vars(self, utcnow, monkeypatch):
        monkeypatch.setenv(environment_vars.AWS_ACCESS_KEY_ID, ACCESS_KEY_ID)
        monkeypatch.setenv(environment_vars.AWS_SECRET_ACCESS_KEY, SECRET_ACCESS_KEY)
        monkeypatch.setenv(environment_vars.AWS_SESSION_TOKEN, TOKEN)
        monkeypatch.setenv(environment_vars.AWS_REGION, self.AWS_REGION)
        utcnow.return_value = datetime.datetime.strptime(
            self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
        )
        credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)

        subject_token = credentials.retrieve_subject_token(None)

        assert subject_token == self.make_serialized_aws_signed_request(
            {
                "access_key_id": ACCESS_KEY_ID,
                "secret_access_key": SECRET_ACCESS_KEY,
                "security_token": TOKEN,
            }
        )

    @mock.patch("google.auth._helpers.utcnow")
    def test_retrieve_subject_token_success_environment_vars_with_default_region(
        self, utcnow, monkeypatch
    ):
        monkeypatch.setenv(environment_vars.AWS_ACCESS_KEY_ID, ACCESS_KEY_ID)
        monkeypatch.setenv(environment_vars.AWS_SECRET_ACCESS_KEY, SECRET_ACCESS_KEY)
        monkeypatch.setenv(environment_vars.AWS_SESSION_TOKEN, TOKEN)
        monkeypatch.setenv(environment_vars.AWS_DEFAULT_REGION, self.AWS_REGION)
        utcnow.return_value = datetime.datetime.strptime(
            self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
        )
        credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)

        subject_token = credentials.retrieve_subject_token(None)

        assert subject_token == self.make_serialized_aws_signed_request(
            {
                "access_key_id": ACCESS_KEY_ID,
                "secret_access_key": SECRET_ACCESS_KEY,
                "security_token": TOKEN,
            }
        )

    @mock.patch("google.auth._helpers.utcnow")
    def test_retrieve_subject_token_success_environment_vars_with_both_regions_set(
        self, utcnow, monkeypatch
    ):
        monkeypatch.setenv(environment_vars.AWS_ACCESS_KEY_ID, ACCESS_KEY_ID)
        monkeypatch.setenv(environment_vars.AWS_SECRET_ACCESS_KEY, SECRET_ACCESS_KEY)
        monkeypatch.setenv(environment_vars.AWS_SESSION_TOKEN, TOKEN)
        monkeypatch.setenv(environment_vars.AWS_DEFAULT_REGION, "Malformed AWS Region")
        # This test makes sure that the AWS_REGION gets used over AWS_DEFAULT_REGION,
        # So, AWS_DEFAULT_REGION is set to something that would cause the test to fail,
        # And AWS_REGION is set to the a valid value, and it should succeed
        monkeypatch.setenv(environment_vars.AWS_REGION, self.AWS_REGION)
        utcnow.return_value = datetime.datetime.strptime(
            self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
        )
        credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)

        subject_token = credentials.retrieve_subject_token(None)

        assert subject_token == self.make_serialized_aws_signed_request(
            {
                "access_key_id": ACCESS_KEY_ID,
                "secret_access_key": SECRET_ACCESS_KEY,
                "security_token": TOKEN,
            }
        )

    @mock.patch("google.auth._helpers.utcnow")
    def test_retrieve_subject_token_success_environment_vars_no_session_token(
        self, utcnow, monkeypatch
    ):
        monkeypatch.setenv(environment_vars.AWS_ACCESS_KEY_ID, ACCESS_KEY_ID)
        monkeypatch.setenv(environment_vars.AWS_SECRET_ACCESS_KEY, SECRET_ACCESS_KEY)
        monkeypatch.setenv(environment_vars.AWS_REGION, self.AWS_REGION)
        utcnow.return_value = datetime.datetime.strptime(
            self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
        )
        credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)

        subject_token = credentials.retrieve_subject_token(None)

        assert subject_token == self.make_serialized_aws_signed_request(
            {"access_key_id": ACCESS_KEY_ID, "secret_access_key": SECRET_ACCESS_KEY}
        )

    @mock.patch("google.auth._helpers.utcnow")
    def test_retrieve_subject_token_success_environment_vars_except_region(
        self, utcnow, monkeypatch
    ):
        monkeypatch.setenv(environment_vars.AWS_ACCESS_KEY_ID, ACCESS_KEY_ID)
        monkeypatch.setenv(environment_vars.AWS_SECRET_ACCESS_KEY, SECRET_ACCESS_KEY)
        monkeypatch.setenv(environment_vars.AWS_SESSION_TOKEN, TOKEN)
        utcnow.return_value = datetime.datetime.strptime(
            self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
        )
        # Region will be queried since it is not found in envvars.
        request = self.make_mock_request(
            region_status=http_client.OK, region_name=self.AWS_REGION
        )
        credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)

        subject_token = credentials.retrieve_subject_token(request)

        assert subject_token == self.make_serialized_aws_signed_request(
            {
                "access_key_id": ACCESS_KEY_ID,
                "secret_access_key": SECRET_ACCESS_KEY,
                "security_token": TOKEN,
            }
        )

    def test_retrieve_subject_token_error_determining_aws_region(self):
        # Simulate error in retrieving the AWS region.
        request = self.make_mock_request(region_status=http_client.BAD_REQUEST)
        credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)

        with pytest.raises(exceptions.RefreshError) as excinfo:
            credentials.retrieve_subject_token(request)

        assert excinfo.match(r"Unable to retrieve AWS region")

    def test_retrieve_subject_token_error_determining_aws_role(self):
        # Simulate error in retrieving the AWS role name.
        request = self.make_mock_request(
            region_status=http_client.OK,
            region_name=self.AWS_REGION,
            role_status=http_client.BAD_REQUEST,
        )
        credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)

        with pytest.raises(exceptions.RefreshError) as excinfo:
            credentials.retrieve_subject_token(request)

        assert excinfo.match(r"Unable to retrieve AWS role name")

    def test_retrieve_subject_token_error_determining_security_creds_url(self):
        # Simulate the security-credentials url is missing. This is needed for
        # determining the AWS security credentials when not found in envvars.
        credential_source = self.CREDENTIAL_SOURCE.copy()
        credential_source.pop("url")
        request = self.make_mock_request(
            region_status=http_client.OK, region_name=self.AWS_REGION
        )
        credentials = self.make_credentials(credential_source=credential_source)

        with pytest.raises(exceptions.RefreshError) as excinfo:
            credentials.retrieve_subject_token(request)

        assert excinfo.match(
            r"Unable to determine the AWS metadata server security credentials endpoint"
        )

    def test_retrieve_subject_token_error_determining_aws_security_creds(self):
        # Simulate error in retrieving the AWS security credentials.
        request = self.make_mock_request(
            region_status=http_client.OK,
            region_name=self.AWS_REGION,
            role_status=http_client.OK,
            role_name=self.AWS_ROLE,
            security_credentials_status=http_client.BAD_REQUEST,
        )
        credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)

        with pytest.raises(exceptions.RefreshError) as excinfo:
            credentials.retrieve_subject_token(request)

        assert excinfo.match(r"Unable to retrieve AWS security credentials")

    @mock.patch(
        "google.auth.metrics.python_and_auth_lib_version",
        return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
    )
    @mock.patch("google.auth._helpers.utcnow")
    def test_refresh_success_without_impersonation_ignore_default_scopes(
        self, utcnow, mock_auth_lib_value
    ):
        utcnow.return_value = datetime.datetime.strptime(
            self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
        )
        expected_subject_token = self.make_serialized_aws_signed_request(
            {
                "access_key_id": ACCESS_KEY_ID,
                "secret_access_key": SECRET_ACCESS_KEY,
                "security_token": TOKEN,
            }
        )
        token_headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            "Authorization": "Basic " + BASIC_AUTH_ENCODING,
            "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false source/aws",
        }
        token_request_data = {
            "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
            "audience": AUDIENCE,
            "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
            "scope": " ".join(SCOPES),
            "subject_token": expected_subject_token,
            "subject_token_type": SUBJECT_TOKEN_TYPE,
        }
        request = self.make_mock_request(
            region_status=http_client.OK,
            region_name=self.AWS_REGION,
            role_status=http_client.OK,
            role_name=self.AWS_ROLE,
            security_credentials_status=http_client.OK,
            security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
            token_status=http_client.OK,
            token_data=self.SUCCESS_RESPONSE,
        )
        credentials = self.make_credentials(
            client_id=CLIENT_ID,
            client_secret=CLIENT_SECRET,
            credential_source=self.CREDENTIAL_SOURCE,
            quota_project_id=QUOTA_PROJECT_ID,
            scopes=SCOPES,
            # Default scopes should be ignored.
            default_scopes=["ignored"],
        )

        credentials.refresh(request)

        assert len(request.call_args_list) == 4
        # Fourth request should be sent to GCP STS endpoint.
        self.assert_token_request_kwargs(
            request.call_args_list[3][1], token_headers, token_request_data
        )
        assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
        assert credentials.quota_project_id == QUOTA_PROJECT_ID
        assert credentials.scopes == SCOPES
        assert credentials.default_scopes == ["ignored"]

    @mock.patch(
        "google.auth.metrics.python_and_auth_lib_version",
        return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
    )
    @mock.patch("google.auth._helpers.utcnow")
    def test_refresh_success_without_impersonation_use_default_scopes(
        self, utcnow, mock_auth_lib_value
    ):
        utcnow.return_value = datetime.datetime.strptime(
            self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
        )
        expected_subject_token = self.make_serialized_aws_signed_request(
            {
                "access_key_id": ACCESS_KEY_ID,
                "secret_access_key": SECRET_ACCESS_KEY,
                "security_token": TOKEN,
            }
        )
        token_headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            "Authorization": "Basic " + BASIC_AUTH_ENCODING,
            "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false source/aws",
        }
        token_request_data = {
            "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
            "audience": AUDIENCE,
            "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
            "scope": " ".join(SCOPES),
            "subject_token": expected_subject_token,
            "subject_token_type": SUBJECT_TOKEN_TYPE,
        }
        request = self.make_mock_request(
            region_status=http_client.OK,
            region_name=self.AWS_REGION,
            role_status=http_client.OK,
            role_name=self.AWS_ROLE,
            security_credentials_status=http_client.OK,
            security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
            token_status=http_client.OK,
            token_data=self.SUCCESS_RESPONSE,
        )
        credentials = self.make_credentials(
            client_id=CLIENT_ID,
            client_secret=CLIENT_SECRET,
            credential_source=self.CREDENTIAL_SOURCE,
            quota_project_id=QUOTA_PROJECT_ID,
            scopes=None,
            # Default scopes should be used since user specified scopes are none.
            default_scopes=SCOPES,
        )

        credentials.refresh(request)

        assert len(request.call_args_list) == 4
        # Fourth request should be sent to GCP STS endpoint.
        self.assert_token_request_kwargs(
            request.call_args_list[3][1], token_headers, token_request_data
        )
        assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
        assert credentials.quota_project_id == QUOTA_PROJECT_ID
        assert credentials.scopes is None
        assert credentials.default_scopes == SCOPES

    @mock.patch(
        "google.auth.metrics.token_request_access_token_impersonate",
        return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
    )
    @mock.patch(
        "google.auth.metrics.python_and_auth_lib_version",
        return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
    )
    @mock.patch("google.auth._helpers.utcnow")
    def test_refresh_success_with_impersonation_ignore_default_scopes(
        self, utcnow, mock_metrics_header_value, mock_auth_lib_value
    ):
        utcnow.return_value = datetime.datetime.strptime(
            self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
        )
        expire_time = (
            _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
        ).isoformat("T") + "Z"
        expected_subject_token = self.make_serialized_aws_signed_request(
            {
                "access_key_id": ACCESS_KEY_ID,
                "secret_access_key": SECRET_ACCESS_KEY,
                "security_token": TOKEN,
            }
        )
        token_headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            "Authorization": "Basic " + BASIC_AUTH_ENCODING,
            "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false source/aws",
        }
        token_request_data = {
            "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
            "audience": AUDIENCE,
            "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
            "scope": "https://www.googleapis.com/auth/iam",
            "subject_token": expected_subject_token,
            "subject_token_type": SUBJECT_TOKEN_TYPE,
        }
        # Service account impersonation request/response.
        impersonation_response = {
            "accessToken": "SA_ACCESS_TOKEN",
            "expireTime": expire_time,
        }
        impersonation_headers = {
            "Content-Type": "application/json",
            "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
            "x-goog-user-project": QUOTA_PROJECT_ID,
            "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
            "x-allowed-locations": "0x0",
        }
        impersonation_request_data = {
            "delegates": None,
            "scope": SCOPES,
            "lifetime": "3600s",
        }
        request = self.make_mock_request(
            region_status=http_client.OK,
            region_name=self.AWS_REGION,
            role_status=http_client.OK,
            role_name=self.AWS_ROLE,
            security_credentials_status=http_client.OK,
            security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
            token_status=http_client.OK,
            token_data=self.SUCCESS_RESPONSE,
            impersonation_status=http_client.OK,
            impersonation_data=impersonation_response,
        )
        credentials = self.make_credentials(
            client_id=CLIENT_ID,
            client_secret=CLIENT_SECRET,
            credential_source=self.CREDENTIAL_SOURCE,
            service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
            quota_project_id=QUOTA_PROJECT_ID,
            scopes=SCOPES,
            # Default scopes should be ignored.
            default_scopes=["ignored"],
        )

        credentials.refresh(request)

        assert len(request.call_args_list) == 5
        # Fourth request should be sent to GCP STS endpoint.
        self.assert_token_request_kwargs(
            request.call_args_list[3][1], token_headers, token_request_data
        )
        # Fifth request should be sent to iamcredentials endpoint for service
        # account impersonation.
        self.assert_impersonation_request_kwargs(
            request.call_args_list[4][1],
            impersonation_headers,
            impersonation_request_data,
        )
        assert credentials.token == impersonation_response["accessToken"]
        assert credentials.quota_project_id == QUOTA_PROJECT_ID
        assert credentials.scopes == SCOPES
        assert credentials.default_scopes == ["ignored"]

    @mock.patch(
        "google.auth.metrics.token_request_access_token_impersonate",
        return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
    )
    @mock.patch(
        "google.auth.metrics.python_and_auth_lib_version",
        return_value=LANG_LIBRARY_METRICS_HEADER_VALUE,
    )
    @mock.patch("google.auth._helpers.utcnow")
    def test_refresh_success_with_impersonation_use_default_scopes(
        self, utcnow, mock_metrics_header_value, mock_auth_lib_value
    ):
        utcnow.return_value = datetime.datetime.strptime(
            self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
        )
        expire_time = (
            _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
        ).isoformat("T") + "Z"
        expected_subject_token = self.make_serialized_aws_signed_request(
            {
                "access_key_id": ACCESS_KEY_ID,
                "secret_access_key": SECRET_ACCESS_KEY,
                "security_token": TOKEN,
            }
        )
        token_headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            "Authorization": "Basic " + BASIC_AUTH_ENCODING,
            "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false source/aws",
        }
        token_request_data = {
            "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
            "audience": AUDIENCE,
            "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
            "scope": "https://www.googleapis.com/auth/iam",
            "subject_token": expected_subject_token,
            "subject_token_type": SUBJECT_TOKEN_TYPE,
        }
        # Service account impersonation request/response.
        impersonation_response = {
            "accessToken": "SA_ACCESS_TOKEN",
            "expireTime": expire_time,
        }
        impersonation_headers = {
            "Content-Type": "application/json",
            "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
            "x-goog-user-project": QUOTA_PROJECT_ID,
            "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
            "x-allowed-locations": "0x0",
        }
        impersonation_request_data = {
            "delegates": None,
            "scope": SCOPES,
            "lifetime": "3600s",
        }
        request = self.make_mock_request(
            region_status=http_client.OK,
            region_name=self.AWS_REGION,
            role_status=http_client.OK,
            role_name=self.AWS_ROLE,
            security_credentials_status=http_client.OK,
            security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
            token_status=http_client.OK,
            token_data=self.SUCCESS_RESPONSE,
            impersonation_status=http_client.OK,
            impersonation_data=impersonation_response,
        )
        credentials = self.make_credentials(
            client_id=CLIENT_ID,
            client_secret=CLIENT_SECRET,
            credential_source=self.CREDENTIAL_SOURCE,
            service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
            quota_project_id=QUOTA_PROJECT_ID,
            scopes=None,
            # Default scopes should be used since user specified scopes are none.
            default_scopes=SCOPES,
        )

        credentials.refresh(request)

        assert len(request.call_args_list) == 5
        # Fourth request should be sent to GCP STS endpoint.
        self.assert_token_request_kwargs(
            request.call_args_list[3][1], token_headers, token_request_data
        )
        # Fifth request should be sent to iamcredentials endpoint for service
        # account impersonation.
        self.assert_impersonation_request_kwargs(
            request.call_args_list[4][1],
            impersonation_headers,
            impersonation_request_data,
        )
        assert credentials.token == impersonation_response["accessToken"]
        assert credentials.quota_project_id == QUOTA_PROJECT_ID
        assert credentials.scopes is None
        assert credentials.default_scopes == SCOPES

    def test_refresh_with_retrieve_subject_token_error(self):
        request = self.make_mock_request(region_status=http_client.BAD_REQUEST)
        credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)

        with pytest.raises(exceptions.RefreshError) as excinfo:
            credentials.refresh(request)

        assert excinfo.match(r"Unable to retrieve AWS region")
