File: __main__.py

package info (click to toggle)
microsoft-authentication-library-for-python 1.34.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,320 kB
  • sloc: python: 8,613; xml: 2,783; sh: 27; makefile: 19
file content (345 lines) | stat: -rw-r--r-- 16,516 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
# It is currently shipped inside msal library.
# Pros: It is always available wherever msal is installed.
# Cons: Its 3rd-party dependencies (if any) may become msal's dependency.
"""MSAL Python Tester

Usage 1: Run it on the fly.
    python -m msal
    Note: We choose to not define a console script to avoid name conflict.

Usage 2: Build an all-in-one executable file for bug bash.
    shiv -e msal.__main__._main -o msaltest-on-os-name.pyz .
"""
import base64, getpass, json, logging, sys, os, atexit, msal

_token_cache_filename = "msal_cache.bin"
global_cache = msal.SerializableTokenCache()
atexit.register(lambda:
    open(_token_cache_filename, "w").write(global_cache.serialize())
    # Hint: The following optional line persists only when state changed
    if global_cache.has_state_changed else None
    )

_AZURE_CLI = "04b07795-8ddb-461a-bbee-02f9e1bf7b46"
_VISUAL_STUDIO = "04f0c124-f2bc-4f59-8241-bf6df9866bbd"
placeholder_auth_scheme = msal.PopAuthScheme(
    http_method=msal.PopAuthScheme.HTTP_GET,
    url="https://example.com/endpoint",
    nonce="placeholder",
    )

def print_json(blob):
    print(json.dumps(blob, indent=2, sort_keys=True))

def _input_boolean(message):
    return input(
        "{} (N/n/F/f or empty means False, otherwise it is True): ".format(message)
        ) not in ('N', 'n', 'F', 'f', '')

def _input(message, default=None):
    return input(message.format(default=default)).strip() or default

def _select_options(
        options, header="Your options:", footer="    Your choice? ", option_renderer=str,
        accept_nonempty_string=False,
        ):
    assert options, "options must not be empty"
    if header:
        print(header)
    for i, o in enumerate(options, start=1):
        print("    {}: {}".format(i, option_renderer(o)))
    if accept_nonempty_string:
        print("    Or you can just type in your input.")
    while True:
        raw_data = input(footer)
        try:
            choice = int(raw_data)
            if 1 <= choice <= len(options):
                return options[choice - 1]
        except ValueError:
            if raw_data and accept_nonempty_string:
                return raw_data

enable_debug_log = _input_boolean("Enable MSAL Python's DEBUG log?")
logging.basicConfig(level=logging.DEBUG if enable_debug_log else logging.INFO)
try:
    from dotenv import load_dotenv
    load_dotenv()
    logging.info("Loaded environment variables from .env file")
except ImportError:
    logging.warning(
        "python-dotenv is not installed. "
        "You may need to set environment variables manually.")

def _input_scopes():
    scopes = _select_options([
        "https://graph.microsoft.com/.default",
        "https://management.azure.com/.default",
        "User.Read",
        "User.ReadBasic.All",
        ],
        header="Select a scope (multiple scopes can only be input by manually typing them, delimited by space):",
        accept_nonempty_string=True,
        ).split()  # It also converts the input string(s) into a list
    if "https://pas.windows.net/CheckMyAccess/Linux/.default" in scopes:
        raise ValueError("SSH Cert scope shall be tested by its dedicated functions")
    return scopes

def _select_account(app):
    accounts = app.get_accounts()
    if accounts:
        return _select_options(
            accounts,
            option_renderer=lambda a: "{}, came from {}".format(a["username"], a["account_source"]),
            header="Account(s) already signed in inside MSAL Python:",
            )
    else:
        print("No account available inside MSAL Python. Use other methods to acquire token first.")

def _acquire_token_silent(app):
    """acquire_token_silent() - with an account already signed into MSAL Python."""
    account = _select_account(app)
    if account:
        print_json(app.acquire_token_silent_with_error(
            _input_scopes(),
            account=account,
            force_refresh=_input_boolean("Bypass MSAL Python's token cache?"),
            auth_scheme=placeholder_auth_scheme
                if app.is_pop_supported() and _input_boolean("Acquire AT POP via Broker?")
                else None,
            ))

def _acquire_token_interactive(app, scopes=None, data=None):
    """acquire_token_interactive() - User will be prompted if app opts to do select_account."""
    assert isinstance(app, msal.PublicClientApplication)
    scopes = scopes or _input_scopes()  # Let user input scope param before less important prompt and login_hint
    prompt = _select_options([
        {"value": None, "description": "Unspecified. Proceed silently with a default account (if any), fallback to prompt."},
        {"value": "none", "description": "none. Proceed silently with a default account (if any), or error out."},
        {"value": "select_account", "description": "select_account. Prompt with an account picker."},
        ],
        option_renderer=lambda o: o["description"],
        header="Prompt behavior?")["value"]
    if prompt == "select_account":
        login_hint = None  # login_hint is unnecessary when prompt=select_account
    else:
        raw_login_hint = _select_options(
            [None] + [a["username"] for a in app.get_accounts()],
            header="login_hint? (If you have multiple signed-in sessions in browser/broker, and you specify a login_hint to match one of them, you will bypass the account picker.)",
            accept_nonempty_string=True,
            )
        login_hint = raw_login_hint["username"] if isinstance(raw_login_hint, dict) else raw_login_hint
    result = app.acquire_token_interactive(
        scopes,
        parent_window_handle=app.CONSOLE_WINDOW_HANDLE,  # This test app is a console app
        enable_msa_passthrough=app.client_id in [  # Apps are expected to set this right
            _AZURE_CLI, _VISUAL_STUDIO,
            ],  # Here this test app mimics the setting for some known MSA-PT apps
        port=1234,  # Hard coded for testing. Real app typically uses default value.
        prompt=prompt, login_hint=login_hint, data=data or {},
        auth_scheme=placeholder_auth_scheme
            if app.is_pop_supported() and _input_boolean("Acquire AT POP via Broker?")
            else None,
        )
    if login_hint and "id_token_claims" in result:
        signed_in_user = result.get("id_token_claims", {}).get("preferred_username")
        if signed_in_user != login_hint:
            logging.warning('Signed-in user "%s" does not match login_hint', signed_in_user)
    print_json(result)
    return result

def _acquire_token_by_username_password(app):
    """acquire_token_by_username_password() - See constraints here: https://docs.microsoft.com/en-us/azure/active-directory/develop/msal-authentication-flows#constraints-for-ropc"""
    print_json(app.acquire_token_by_username_password(
        _input("username: "), getpass.getpass("password: "), scopes=_input_scopes()))

def _acquire_token_by_device_flow(app):
    """acquire_token_by_device_flow() - Note that this one does not go through broker"""
    assert isinstance(app, msal.PublicClientApplication)
    flow = app.initiate_device_flow(scopes=_input_scopes())
    print(flow["message"])
    sys.stdout.flush()  # Some terminal needs this to ensure the message is shown
    input("After you completed the step above, press ENTER in this console to continue...")
    result = app.acquire_token_by_device_flow(flow)  # By default it will block
    print_json(result)

_JWK1 = """{"kty":"RSA", "n":"2tNr73xwcj6lH7bqRZrFzgSLj7OeLfbn8216uOMDHuaZ6TEUBDN8Uz0ve8jAlKsP9CQFCSVoSNovdE-fs7c15MxEGHjDcNKLWonznximj8pDGZQjVdfK-7mG6P6z-lgVcLuYu5JcWU_PeEqIKg5llOaz-qeQ4LEDS4T1D2qWRGpAra4rJX1-kmrWmX_XIamq30C9EIO0gGuT4rc2hJBWQ-4-FnE1NXmy125wfT3NdotAJGq5lMIfhjfglDbJCwhc8Oe17ORjO3FsB5CLuBRpYmP7Nzn66lRY3Fe11Xz8AEBl3anKFSJcTvlMnFtu3EpD-eiaHfTgRBU7CztGQqVbiQ", "e":"AQAB"}"""
_SSH_CERT_DATA = {"token_type": "ssh-cert", "key_id": "key1", "req_cnf": _JWK1}
_SSH_CERT_SCOPE = ["https://pas.windows.net/CheckMyAccess/Linux/.default"]

def _acquire_ssh_cert_silently(app):
    """Acquire an SSH Cert silently- This typically only works with Azure CLI"""
    assert isinstance(app, msal.PublicClientApplication)
    account = _select_account(app)
    if account:
        result = app.acquire_token_silent(
            _SSH_CERT_SCOPE,
            account,
            data=_SSH_CERT_DATA,
            force_refresh=_input_boolean("Bypass MSAL Python's token cache?"),
            )
        print_json(result)
        if result and result.get("token_type") != "ssh-cert":
            logging.error("Unable to acquire an ssh-cert.")

def _acquire_ssh_cert_interactive(app):
    """Acquire an SSH Cert interactively - This typically only works with Azure CLI"""
    assert isinstance(app, msal.PublicClientApplication)
    result = _acquire_token_interactive(app, scopes=_SSH_CERT_SCOPE, data=_SSH_CERT_DATA)
    if result.get("token_type") != "ssh-cert":
        logging.error("Unable to acquire an ssh-cert")

_POP_KEY_ID = 'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA-AAAAAAAA'  # Fake key with a certain format and length
_RAW_REQ_CNF = json.dumps({"kid": _POP_KEY_ID, "xms_ksl": "sw"})
_POP_DATA = {  # Sampled from Azure CLI's plugin connectedk8s
    'token_type': 'pop',
    'key_id': _POP_KEY_ID,
    "req_cnf": base64.urlsafe_b64encode(_RAW_REQ_CNF.encode('utf-8')).decode('utf-8').rstrip('='),
        # Note: Sending _RAW_REQ_CNF without base64 encoding would result in an http 500 error
}  # See also https://github.com/Azure/azure-cli-extensions/blob/main/src/connectedk8s/azext_connectedk8s/_clientproxyutils.py#L86-L92

def _acquire_pop_token_interactive(app):
    """Acquire a POP token interactively - This typically only works with Azure CLI"""
    assert isinstance(app, msal.PublicClientApplication)
    POP_SCOPE = ['6256c85f-0aad-4d50-b960-e6e9b21efe35/.default']  # KAP 1P Server App Scope, obtained from https://github.com/Azure/azure-cli-extensions/pull/4468/files#diff-a47efa3186c7eb4f1176e07d0b858ead0bf4a58bfd51e448ee3607a5b4ef47f6R116
    result = _acquire_token_interactive(app, scopes=POP_SCOPE, data=_POP_DATA)
    print_json(result)
    if result.get("token_type") != "pop":
        logging.error("Unable to acquire a pop token")

def _remove_account(app):
    """remove_account() - Invalidate account and/or token(s) from cache, so that acquire_token_silent() would be reset"""
    account = _select_account(app)
    if account:
        app.remove_account(account)
        print('Account "{}" and/or its token(s) are signed out from MSAL Python'.format(account["username"]))

def _acquire_token_for_client(app):
    """CCA.acquire_token_for_client() - Rerun this will get same token from cache."""
    assert isinstance(app, msal.ConfidentialClientApplication)
    print_json(app.acquire_token_for_client(scopes=_input_scopes()))

def _remove_tokens_for_client(app):
    """CCA.remove_tokens_for_client() - Run this to evict tokens from cache."""
    assert isinstance(app, msal.ConfidentialClientApplication)
    app.remove_tokens_for_client()

def _exit(app):
    """Exit"""
    bug_link = (
        "https://identitydivision.visualstudio.com/Engineering/_queries/query/79b3a352-a775-406f-87cd-a487c382a8ed/"
        if app._enable_broker else
        "https://github.com/AzureAD/microsoft-authentication-library-for-python/issues/new/choose"
        )
    print("Bye. If you found a bug, please report it here: {}".format(bug_link))
    sys.exit()

def _main():
    print("Welcome to the Msal Python {} Tester (Experimental)\n".format(msal.__version__))
    cache_choice = _select_options([
            {
                "choice": "empty",
                "desc": "Start with an empty token cache. Suitable for one-off tests.",
            },
            {
                "choice": "reuse",
                "desc": "Reuse the previous token cache {} (if any) "
                    "which was created during last test app exit. "
                    "Useful for testing acquire_token_silent() repeatedly".format(
                        _token_cache_filename),
            },
        ],
        option_renderer=lambda o: o["desc"],
        header="What token cache state do you want to begin with?",
        accept_nonempty_string=False)
    if cache_choice["choice"] == "reuse" and os.path.exists(_token_cache_filename):
        try:
            global_cache.deserialize(open(_token_cache_filename, "r").read())
        except IOError:
            pass  # Use empty token cache
    chosen_app = _select_options([
        {"client_id": _AZURE_CLI, "name": "Azure CLI (Correctly configured for MSA-PT)"},
        {"client_id": _VISUAL_STUDIO, "name": "Visual Studio (Correctly configured for MSA-PT)"},
        {"client_id": "95de633a-083e-42f5-b444-a4295d8e9314", "name": "Whiteboard Services (Non MSA-PT app. Accepts AAD & MSA accounts.)"},
        {
            "client_id": os.getenv("CLIENT_ID"),
            "client_secret": os.getenv("CLIENT_SECRET"),
            "name": "A confidential client app (CCA) whose settings are defined "
                "in environment variables CLIENT_ID and CLIENT_SECRET",
        },
        ],
        option_renderer=lambda a: a["name"],
        header="Impersonate this app "
            "(or you can type in the client_id of your own public client app)",
        accept_nonempty_string=True)
    is_cca = isinstance(chosen_app, dict) and "client_secret" in chosen_app
    if is_cca and not (chosen_app["client_id"] and chosen_app["client_secret"]):
        raise ValueError("You need to set environment variables CLIENT_ID and CLIENT_SECRET")
    enable_broker = (not is_cca) and _input_boolean("Enable broker? "
        "(It will error out later if your app has not registered some redirect URI)"
        )
    enable_pii_log = _input_boolean("Enable PII in broker's log?") if enable_broker and enable_debug_log else False
    authority = _select_options([
        "https://login.microsoftonline.com/common",
        "https://login.microsoftonline.com/organizations",
        "https://login.microsoftonline.com/microsoft.onmicrosoft.com",
        "https://login.microsoftonline.com/msidlab4.onmicrosoft.com",
        "https://login.microsoftonline.com/consumers",
        ],
        header="Input authority (Note that MSA-PT apps would NOT use the /common authority)",
        accept_nonempty_string=True,
        )
    instance_discovery = _input_boolean(
        "You input an unusual authority which might fail the Instance Discovery. "
        "Now, do you want to perform Instance Discovery on your input authority?"
        ) if authority and not authority.startswith(
            "https://login.microsoftonline.com") else None
    app = msal.PublicClientApplication(
        chosen_app["client_id"] if isinstance(chosen_app, dict) else chosen_app,
        authority=authority,
        instance_discovery=instance_discovery,
        enable_broker_on_windows=enable_broker,
        enable_broker_on_mac=enable_broker,
        enable_broker_on_linux=enable_broker,
        enable_broker_on_wsl=enable_broker,
        enable_pii_log=enable_pii_log,
        token_cache=global_cache,
        ) if not is_cca else msal.ConfidentialClientApplication(
        chosen_app["client_id"],
        client_credential=chosen_app["client_secret"],
        authority=authority,
        instance_discovery=instance_discovery,
        enable_pii_log=enable_pii_log,
        token_cache=global_cache,
        )
    methods_to_be_tested = [
            _acquire_token_silent,
        ] + ([
            _acquire_token_interactive,
            _acquire_token_by_device_flow,
            _acquire_ssh_cert_silently,
            _acquire_ssh_cert_interactive,
            _acquire_pop_token_interactive,
            ] if isinstance(app, msal.PublicClientApplication) else []
        ) + [
            _acquire_token_by_username_password,
            _remove_account,
        ] + ([
            _acquire_token_for_client,
            _remove_tokens_for_client,
            ] if isinstance(app, msal.ConfidentialClientApplication) else []
        )
    while True:
        func = _select_options(
            methods_to_be_tested + [_exit],
            option_renderer=lambda f: f.__doc__, header="MSAL Python APIs:")
        try:
            func(app)
        except ValueError as e:
            logging.error("Invalid input: %s", e)
        except KeyboardInterrupt:  # Useful for bailing out a stuck interactive flow
            print("Aborted")

if __name__ == "__main__":
    _main()