File: authority.py

package info (click to toggle)
microsoft-authentication-library-for-python 1.34.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,320 kB
  • sloc: python: 8,613; xml: 2,783; sh: 27; makefile: 19
file content (225 lines) | stat: -rw-r--r-- 11,314 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
import json
try:
    from urllib.parse import urlparse
except ImportError:  # Fall back to Python 2
    from urlparse import urlparse
import logging


logger = logging.getLogger(__name__)

# Endpoints were copied from here
# https://docs.microsoft.com/en-us/azure/active-directory/develop/authentication-national-cloud#azure-ad-authentication-endpoints
AZURE_US_GOVERNMENT = "login.microsoftonline.us"
AZURE_CHINA = "login.chinacloudapi.cn"
AZURE_PUBLIC = "login.microsoftonline.com"

WORLD_WIDE = 'login.microsoftonline.com'  # There was an alias login.windows.net
WELL_KNOWN_AUTHORITY_HOSTS = set([
    WORLD_WIDE,
    AZURE_CHINA,
    'login-us.microsoftonline.com',
    AZURE_US_GOVERNMENT,
    ])
WELL_KNOWN_B2C_HOSTS = [
    "b2clogin.com",
    "b2clogin.cn",
    "b2clogin.us",
    "b2clogin.de",
    "ciamlogin.com",
    ]
_CIAM_DOMAIN_SUFFIX = ".ciamlogin.com"


class AuthorityBuilder(object):
    def __init__(self, instance, tenant):
        """A helper to save caller from doing string concatenation.

        Usage is documented in :func:`application.ClientApplication.__init__`.
        """
        self._instance = instance.rstrip("/")
        self._tenant = tenant.strip("/")

    def __str__(self):
        return "https://{}/{}".format(self._instance, self._tenant)


class Authority(object):
    """This class represents an (already-validated) authority.

    Once constructed, it contains members named "*_endpoint" for this instance.
    TODO: It will also cache the previously-validated authority instances.
    """
    _domains_without_user_realm_discovery = set([])

    def __init__(
            self, authority_url, http_client,
            validate_authority=True,
            instance_discovery=None,
            oidc_authority_url=None,
            ):
        """Creates an authority instance, and also validates it.

        :param validate_authority:
            The Authority validation process actually checks two parts:
            instance (a.k.a. host) and tenant. We always do a tenant discovery.
            This parameter only controls whether an instance discovery will be
            performed.
        """
        self._http_client = http_client
        if oidc_authority_url:
            logger.debug("Initializing with OIDC authority: %s", oidc_authority_url)
            tenant_discovery_endpoint = self._initialize_oidc_authority(
                oidc_authority_url)
        else:
            logger.debug("Initializing with Entra authority: %s", authority_url)
            tenant_discovery_endpoint = self._initialize_entra_authority(
                authority_url, validate_authority, instance_discovery)
        try:
            openid_config = tenant_discovery(
                tenant_discovery_endpoint,
                self._http_client)
        except ValueError:
            error_message = (
                "Unable to get OIDC authority configuration for {url} "
                "because its OIDC Discovery endpoint is unavailable at "
                "{url}/.well-known/openid-configuration ".format(url=oidc_authority_url)
                if oidc_authority_url else
                "Unable to get authority configuration for {}. "
                "Authority would typically be in a format of "
                "https://login.microsoftonline.com/your_tenant "
                "or https://tenant_name.ciamlogin.com "
                "or https://tenant_name.b2clogin.com/tenant.onmicrosoft.com/policy. "
                .format(authority_url)
                ) + " Also please double check your tenant name or GUID is correct."
            raise ValueError(error_message)
        logger.debug(
            'openid_config("%s") = %s', tenant_discovery_endpoint, openid_config)
        self.authorization_endpoint = openid_config['authorization_endpoint']
        self.token_endpoint = openid_config['token_endpoint']
        self.device_authorization_endpoint = openid_config.get('device_authorization_endpoint')
        _, _, self.tenant = canonicalize(self.token_endpoint)  # Usually a GUID

    def _initialize_oidc_authority(self, oidc_authority_url):
        authority, self.instance, tenant = canonicalize(oidc_authority_url)
        self.is_adfs = tenant.lower() == 'adfs'  # As a convention
        self._is_b2c = True  # Not exactly true, but
            # OIDC Authority was designed for CIAM which is the next gen of B2C.
            # Besides, application.py uses this to bypass broker.
        self._is_known_to_developer = True  # Not really relevant, but application.py uses this to bypass authority validation
        return oidc_authority_url + "/.well-known/openid-configuration"

    def _initialize_entra_authority(
            self, authority_url, validate_authority, instance_discovery):
        # :param instance_discovery:
        #    By default, the known-to-Microsoft validation will use an
        #    instance discovery endpoint located at ``login.microsoftonline.com``.
        #    You can customize the endpoint by providing a url as a string.
        #    Or you can turn this behavior off by passing in a False here.
        if isinstance(authority_url, AuthorityBuilder):
            authority_url = str(authority_url)
        authority, self.instance, tenant = canonicalize(authority_url)
        is_ciam = self.instance.endswith(_CIAM_DOMAIN_SUFFIX)
        self.is_adfs = tenant.lower() == 'adfs' and not is_ciam
        parts = authority.path.split('/')
        self._is_b2c = any(
            self.instance.endswith("." + d) for d in WELL_KNOWN_B2C_HOSTS
            ) or (len(parts) == 3 and parts[2].lower().startswith("b2c_"))
        self._is_known_to_developer = self.is_adfs or self._is_b2c or not validate_authority
        is_known_to_microsoft = self.instance in WELL_KNOWN_AUTHORITY_HOSTS
        instance_discovery_endpoint = 'https://{}/common/discovery/instance'.format(  # Note: This URL seemingly returns V1 endpoint only
            WORLD_WIDE  # Historically using WORLD_WIDE. Could use self.instance too
                # See https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.0.0/src/Microsoft.Identity.Client/Instance/AadInstanceDiscovery.cs#L101-L103
                # and https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.0.0/src/Microsoft.Identity.Client/Instance/AadAuthority.cs#L19-L33
            ) if instance_discovery in (None, True) else instance_discovery
        if instance_discovery_endpoint and not (
                is_known_to_microsoft or self._is_known_to_developer):
            payload = _instance_discovery(
                "https://{}{}/oauth2/v2.0/authorize".format(
                    self.instance, authority.path),
                self._http_client,
                instance_discovery_endpoint)
            if payload.get("error") == "invalid_instance":
                raise ValueError(
                    "invalid_instance: "
                    "The authority you provided, %s, is not whitelisted. "
                    "If it is indeed your legit customized domain name, "
                    "you can turn off this check by passing in "
                    "instance_discovery=False"
                    % authority_url)
            tenant_discovery_endpoint = payload['tenant_discovery_endpoint']
        else:
            tenant_discovery_endpoint = authority._replace(
                path="{prefix}{version}/.well-known/openid-configuration".format(
                    prefix=tenant if is_ciam and len(authority.path) <= 1  # Path-less CIAM
                        else authority.path,  # In B2C, it is "/tenant/policy"
                    version="" if self.is_adfs else "/v2.0",
                    )
                ).geturl()  # Keeping original port and query. Query is useful for test.
        return tenant_discovery_endpoint

    def user_realm_discovery(self, username, correlation_id=None, response=None):
        # It will typically return a dict containing "ver", "account_type",
        # "federation_protocol", "cloud_audience_urn",
        # "federation_metadata_url", "federation_active_auth_url", etc.
        if self.instance not in self.__class__._domains_without_user_realm_discovery:
            resp = response or self._http_client.get(
                "https://{netloc}/common/userrealm/{username}?api-version=1.0".format(
                    netloc=self.instance, username=username),
                headers={'Accept': 'application/json',
                         'client-request-id': correlation_id},)
            if resp.status_code != 404:
                resp.raise_for_status()
                return json.loads(resp.text)
            self.__class__._domains_without_user_realm_discovery.add(self.instance)
        return {}  # This can guide the caller to fall back normal ROPC flow


def canonicalize(authority_or_auth_endpoint):
    # Returns (url_parsed_result, hostname_in_lowercase, tenant)
    authority = urlparse(authority_or_auth_endpoint)
    if authority.scheme == "https":
        parts = authority.path.split("/")
        first_part = parts[1] if len(parts) >= 2 and parts[1] else None
        if authority.hostname.endswith(_CIAM_DOMAIN_SUFFIX):  # CIAM
            # Use path in CIAM authority. It will be validated by OIDC Discovery soon
            tenant = first_part if first_part else "{}.onmicrosoft.com".format(
                # Fallback to sub domain name. This variation may not be advertised
                authority.hostname.rsplit(_CIAM_DOMAIN_SUFFIX, 1)[0])
            return authority, authority.hostname, tenant
        # AAD
        if len(parts) >= 2 and parts[1]:
            return authority, authority.hostname, parts[1]
    raise ValueError(
        "Your given address (%s) should consist of "
        "an https url with a minimum of one segment in a path: e.g. "
        "https://login.microsoftonline.com/{tenant} "
        "or https://{tenant_name}.ciamlogin.com/{tenant} "
        "or https://{tenant_name}.b2clogin.com/{tenant_name}.onmicrosoft.com/policy"
        % authority_or_auth_endpoint)

def _instance_discovery(url, http_client, instance_discovery_endpoint, **kwargs):
    resp = http_client.get(
        instance_discovery_endpoint,
        params={'authorization_endpoint': url, 'api-version': '1.0'},
        **kwargs)
    return json.loads(resp.text)

def tenant_discovery(tenant_discovery_endpoint, http_client, **kwargs):
    # Returns Openid Configuration
    resp = http_client.get(tenant_discovery_endpoint, **kwargs)
    if resp.status_code == 200:
        return json.loads(resp.text)  # It could raise ValueError
    if 400 <= resp.status_code < 500:
        # Nonexist tenant would hit this path
        # e.g. https://login.microsoftonline.com/nonexist_tenant/v2.0/.well-known/openid-configuration
        raise ValueError("OIDC Discovery failed on {}. HTTP status: {}, Error: {}".format(
            tenant_discovery_endpoint,
            resp.status_code,
            resp.text,  # Expose it as-is b/c OIDC defines no error response format
            ))
    # Transient network error would hit this path
    resp.raise_for_status()
    raise RuntimeError(  # A fallback here, in case resp.raise_for_status() is no-op
        "Unable to complete OIDC Discovery: %d, %s" % (resp.status_code, resp.text))