File: broker.py

package info (click to toggle)
microsoft-authentication-library-for-python 1.34.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,320 kB
  • sloc: python: 8,613; xml: 2,783; sh: 27; makefile: 19
file content (282 lines) | stat: -rw-r--r-- 12,956 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
"""This module is an adaptor to the underlying broker.
It relies on PyMsalRuntime which is the package providing broker's functionality.
"""
import json
import logging
import sys
import time
import uuid

from .sku import __version__, SKU

logger = logging.getLogger(__name__)
try:
    import pymsalruntime  # Its API description is available in site-packages/pymsalruntime/PyMsalRuntime.pyi
    pymsalruntime.register_logging_callback(lambda message, level: {  # New in pymsalruntime 0.7
        pymsalruntime.LogLevel.TRACE: logger.debug,  # Python has no TRACE level
        pymsalruntime.LogLevel.DEBUG: logger.debug,
        # Let broker's excess info, warning and error logs map into default DEBUG, for now
        #pymsalruntime.LogLevel.INFO: logger.info,
        #pymsalruntime.LogLevel.WARNING: logger.warning,
        #pymsalruntime.LogLevel.ERROR: logger.error,
        pymsalruntime.LogLevel.FATAL: logger.critical,
        }.get(level, logger.debug)(message))
except (ImportError, AttributeError):  # AttributeError happens when a prior pymsalruntime uninstallation somehow leaved an empty folder behind
    # PyMsalRuntime currently supports these Windows versions, listed in this MSFT internal link
    # https://github.com/AzureAD/microsoft-authentication-library-for-cpp/pull/2406/files
    min_ver = {
        "win32": "1.20",
        "darwin": "1.31",
        "linux": "1.33",
    }.get(sys.platform)
    if min_ver:
        raise ImportError(
            f'You must install dependency by: pip install "msal[broker]>={min_ver},<2"')
    else:  # Unsupported platform
        raise ImportError("Dependency pymsalruntime unavailable on current platform")
# It could throw RuntimeError when running on ancient versions of Windows


class RedirectUriError(ValueError):
    pass


class TokenTypeError(ValueError):
    pass


_redirect_uri_on_mac = "msauth.com.msauth.unsignedapp://auth"  # Note:
    # On Mac, the native Python has a team_id which links to bundle id
    # com.apple.python3 however it won't give Python scripts better security.
    # Besides, the homebrew-installed Pythons have no team_id
    # so they have to use a generic placeholder anyway.
    # The v-team chose to combine two situations into using same placeholder.


def _convert_error(error, client_id):
    context = error.get_context()  # Available since pymsalruntime 0.0.4
    if (
            "AADSTS50011" in context  # In WAM, this could happen on both interactive and silent flows
            or "AADSTS7000218" in context  # This "request body must contain ... client_secret" is just a symptom of current app has no WAM redirect_uri
            ):
        raise RedirectUriError(  # This would be seen by either the app developer or end user
            """MsalRuntime needs the current app to register these redirect_uri
(1) ms-appx-web://Microsoft.AAD.BrokerPlugin/{}
(2) {}
(3) https://login.microsoftonline.com/common/oauth2/nativeclient""".format(
            client_id, _redirect_uri_on_mac))
        # OTOH, AAD would emit other errors when other error handling branch was hit first,
        # so, the AADSTS50011/RedirectUriError is not guaranteed to happen.
    return {
        "error": "broker_error",  # Note: Broker implies your device needs to be compliant.
            # You may use "dsregcmd /status" to check your device state
            # https://docs.microsoft.com/en-us/azure/active-directory/devices/troubleshoot-device-dsregcmd
        "error_description": "{}. Status: {}, Error code: {}, Tag: {}".format(
            context,
            error.get_status(), error.get_error_code(), error.get_tag()),
        "_broker_status": error.get_status(),
        "_broker_error_code": error.get_error_code(),
        "_broker_tag": error.get_tag(),
        }


def _read_account_by_id(account_id, correlation_id):
    """Return an instance of MSALRuntimeError or MSALRuntimeAccount, or None"""
    callback_data = pymsalruntime.CallbackData()
    pymsalruntime.read_account_by_id(
        account_id,
        correlation_id,
        lambda result, callback_data=callback_data: callback_data.complete(result)
        )
    callback_data.signal.wait()
    error = callback_data.result.get_error()
    if error:
        logger.debug("read_account_by_id() error: %s", _convert_error(error, None))
        return None
    account = callback_data.result.get_account()
    if account:
        return account
    return None  # None happens when the account was not created by broker


def _convert_result(result, client_id, expected_token_type=None):  # Mimic an on-the-wire response from AAD
    telemetry = result.get_telemetry_data()
    telemetry.pop("wam_telemetry", None)  # In pymsalruntime 0.13, it contains PII "account_id"
    error = result.get_error()
    if error:
        return dict(_convert_error(error, client_id), _msalruntime_telemetry=telemetry)
    id_token_claims = json.loads(result.get_id_token()) if result.get_id_token() else {}
    account = result.get_account()
    assert account, "Account is expected to be always available"
    # Note: There are more account attribute getters available in pymsalruntime 0.13+
    return_value = {k: v for k, v in {
        "access_token":
            result.get_authorization_header()  # It returns "pop SignedHttpRequest"
                .split()[1]
            if result.is_pop_authorization() else result.get_access_token(),
        "expires_in": result.get_access_token_expiry_time() - int(time.time()),  # Convert epoch to count-down
        "id_token": result.get_raw_id_token(),  # New in pymsalruntime 0.8.1
        "id_token_claims": id_token_claims,
        "client_info": account.get_client_info(),
        "_account_id": account.get_account_id(),
		"token_type": "pop" if result.is_pop_authorization() else (
            expected_token_type or "bearer"),  # Workaround "ssh-cert"'s absence from broker
        }.items() if v}
    likely_a_cert = return_value["access_token"].startswith("AAAA")  # Empirical observation
    if return_value["token_type"].lower() == "ssh-cert" and not likely_a_cert:
        raise TokenTypeError("Broker could not get an SSH Cert: {}...".format(
            return_value["access_token"][:8]))
    granted_scopes = result.get_granted_scopes()  # New in pymsalruntime 0.3.x
    if granted_scopes:
        return_value["scope"] = " ".join(granted_scopes)  # Mimic the on-the-wire data format
    return dict(return_value, _msalruntime_telemetry=telemetry)


def _get_new_correlation_id():
    return str(uuid.uuid4())


def _enable_msa_pt(params):
    params.set_additional_parameter("msal_request_type", "consumer_passthrough")  # PyMsalRuntime 0.8+

def _build_msal_runtime_auth_params(client_id, authority):
    params = pymsalruntime.MSALRuntimeAuthParameters(client_id, authority)
    params.set_additional_parameter("msal_client_sku", SKU)
    params.set_additional_parameter("msal_client_ver", __version__)
    return params

def _signin_silently(
        authority, client_id, scopes, correlation_id=None, claims=None,
        enable_msa_pt=False,
        auth_scheme=None,
        **kwargs):
    params = _build_msal_runtime_auth_params(client_id, authority)
    params.set_requested_scopes(scopes)
    if claims:
        params.set_decoded_claims(claims)
    if auth_scheme:
        params.set_pop_params(
            auth_scheme._http_method, auth_scheme._url.netloc, auth_scheme._url.path,
            auth_scheme._nonce)
    callback_data = pymsalruntime.CallbackData()
    for k, v in kwargs.items():  # This can be used to support domain_hint, max_age, etc.
        if v is not None:
            params.set_additional_parameter(k, str(v))
    if enable_msa_pt:
        _enable_msa_pt(params)
    pymsalruntime.signin_silently(
        params,
        correlation_id or _get_new_correlation_id(),
        lambda result, callback_data=callback_data: callback_data.complete(result))
    callback_data.signal.wait()
    return _convert_result(
        callback_data.result, client_id, expected_token_type=kwargs.get("token_type"))


def _signin_interactively(
        authority, client_id, scopes,
        parent_window_handle,  # None means auto-detect for console apps
        prompt=None,  # Note: This function does not really use this parameter
        login_hint=None,
        claims=None,
        correlation_id=None,
        enable_msa_pt=False,
        auth_scheme=None,
        **kwargs):
    params = _build_msal_runtime_auth_params(client_id, authority)
    params.set_requested_scopes(scopes)
    params.set_redirect_uri(
        _redirect_uri_on_mac if sys.platform == "darwin" else
        "https://login.microsoftonline.com/common/oauth2/nativeclient"
        # This default redirect_uri value is not currently used by WAM
        # but it is required by the MSAL.cpp to be set to a non-empty valid URI.
    )
    if prompt:
        if prompt == "select_account":
            if login_hint:
                # FWIW, AAD's browser interactive flow would honor select_account
                # and ignore login_hint in such a case.
                # But pymsalruntime 0.3.x would pop up a meaningless account picker
                # and then force the account_hint user to re-input password. Not what we want.
                # https://identitydivision.visualstudio.com/Engineering/_workitems/edit/1744492
                login_hint = None  # Mimicing the AAD behavior
                logger.warning("Using both select_account and login_hint is ambiguous. Ignoring login_hint.")
        else:
            logger.warning("prompt=%s is not supported by this module", prompt)
    if parent_window_handle is None:
        # This fixes account picker hanging in IDE debug mode on some machines
        params.set_additional_parameter("msal_gui_thread", "true")  # Since pymsalruntime 0.8.1
    if enable_msa_pt:
        _enable_msa_pt(params)
    if auth_scheme:
        params.set_pop_params(
            auth_scheme._http_method, auth_scheme._url.netloc, auth_scheme._url.path,
            auth_scheme._nonce)
    for k, v in kwargs.items():  # This can be used to support domain_hint, max_age, etc.
        if v is not None:
            params.set_additional_parameter(k, str(v))
    if claims:
        params.set_decoded_claims(claims)
    callback_data = pymsalruntime.CallbackData(is_interactive=True)
    pymsalruntime.signin_interactively(
        parent_window_handle or pymsalruntime.get_console_window() or pymsalruntime.get_desktop_window(),  # Since pymsalruntime 0.2+
        params,
        correlation_id or _get_new_correlation_id(),
        login_hint,  # None value will be accepted since pymsalruntime 0.3+
        lambda result, callback_data=callback_data: callback_data.complete(result))
    callback_data.signal.wait()
    return _convert_result(
        callback_data.result, client_id, expected_token_type=kwargs.get("token_type"))


def _acquire_token_silently(
        authority, client_id, account_id, scopes, claims=None, correlation_id=None,
        auth_scheme=None,
        **kwargs):
    # For MSA PT scenario where you use the /organizations, yes,
    # acquireTokenSilently is expected to fail.  - Sam Wilson
    correlation_id = correlation_id or _get_new_correlation_id()
    account = _read_account_by_id(account_id, correlation_id)
    if account is None:
        return
    params = _build_msal_runtime_auth_params(client_id, authority)
    params.set_requested_scopes(scopes)
    if claims:
        params.set_decoded_claims(claims)
    if auth_scheme:
        params.set_pop_params(
            auth_scheme._http_method, auth_scheme._url.netloc, auth_scheme._url.path,
            auth_scheme._nonce)
    for k, v in kwargs.items():  # This can be used to support domain_hint, max_age, etc.
        if v is not None:
            params.set_additional_parameter(k, str(v))
    callback_data = pymsalruntime.CallbackData()
    pymsalruntime.acquire_token_silently(
        params,
        correlation_id,
        account,
        lambda result, callback_data=callback_data: callback_data.complete(result))
    callback_data.signal.wait()
    return _convert_result(
        callback_data.result, client_id, expected_token_type=kwargs.get("token_type"))


def _signout_silently(client_id, account_id, correlation_id=None):
    correlation_id = correlation_id or _get_new_correlation_id()
    account = _read_account_by_id(account_id, correlation_id)
    if account is None:
        return
    callback_data = pymsalruntime.CallbackData()
    pymsalruntime.signout_silently(  # New in PyMsalRuntime 0.7
        client_id,
        correlation_id,
        account,
        lambda result, callback_data=callback_data: callback_data.complete(result))
    callback_data.signal.wait()
    error = callback_data.result.get_error()
    if error:
        return _convert_error(error, client_id)

def _enable_pii_log():
    pymsalruntime.set_is_pii_enabled(1)  # New in PyMsalRuntime 0.13.0