File: authenticate.py

package info (click to toggle)
python-xbox-webapi 2.1.0-1.2
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 2,900 kB
  • sloc: python: 4,973; makefile: 79
file content (158 lines) | stat: -rw-r--r-- 5,002 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
"""
Example scripts that performs XBL authentication
"""
import argparse
import asyncio
import http.server
import os
import queue
import socketserver
import threading
from urllib.parse import parse_qs, urlparse
import webbrowser

from xbox.webapi.authentication.manager import AuthenticationManager
from xbox.webapi.authentication.models import OAuth2TokenResponse
from xbox.webapi.common.signed_session import SignedSession
from xbox.webapi.scripts import CLIENT_ID, CLIENT_SECRET, REDIRECT_URI, TOKENS_FILE

QUEUE = queue.Queue(1)


class AuthCallbackRequestHandler(http.server.BaseHTTPRequestHandler):
    """
    Handles the auth callback that's received when Windows Live auth flow completed
    """

    def do_GET(self):
        try:
            url_path = self.requestline.split(" ")[1]
            query_params = parse_qs(urlparse(url_path).query)
        except Exception as e:
            self.send_error(
                400,
                explain=f"Invalid request='{self.requestline}' - Failed to parse URL Path, error={e}",
            )
            self.end_headers()
            return

        if query_params.get("error"):
            error_description = query_params.get("error_description")
            self.send_error(
                400, explain=f"Auth callback failed - Error: {error_description}"
            )
            self.end_headers()
            return

        auth_code = query_params.get("code")
        if not auth_code:
            self.send_error(
                400,
                explain=f"Auth callback failed - No code received - Original request: {self.requestline}",
            )
            self.end_headers()
            return

        if isinstance(auth_code, list):
            auth_code = auth_code[0]
        elif isinstance(auth_code, str):
            pass
        else:
            raise Exception(f"Invalid code query param: {auth_code}")

        # Put auth_code into queue for do_auth to receive
        QUEUE.put(auth_code)
        response_body = b"<script>window.close()</script>"
        self.send_response(200)
        self.send_header("Content-Type", "text/html")
        self.send_header("Content-Length", str(len(response_body)))
        self.end_headers()
        self.wfile.write(response_body)


async def do_auth(
    client_id: str, client_secret: str, redirect_uri: str, token_filepath: str
):
    async with SignedSession() as session:
        auth_mgr = AuthenticationManager(
            session, client_id, client_secret, redirect_uri
        )

        # Refresh tokens if we have them
        if os.path.exists(token_filepath):
            with open(token_filepath) as f:
                tokens = f.read()
            auth_mgr.oauth = OAuth2TokenResponse.model_validate_json(tokens)
            await auth_mgr.refresh_tokens()

        # Request new ones if they are not valid
        if not (auth_mgr.xsts_token and auth_mgr.xsts_token.is_valid()):
            auth_url = auth_mgr.generate_authorization_url()
            webbrowser.open(auth_url)
            # Wait for auth code from http server thread
            code = QUEUE.get()
            await auth_mgr.request_tokens(code)

        with open(token_filepath, mode="w") as f:
            print(f"Finished authentication, writing tokens to {token_filepath}")
            f.write(auth_mgr.oauth.json())


async def async_main():
    parser = argparse.ArgumentParser(description="Authenticate with XBL")
    parser.add_argument(
        "--tokens",
        "-t",
        default=TOKENS_FILE,
        help=f"Token filepath. Default: '{TOKENS_FILE}'",
    )
    parser.add_argument(
        "--client-id",
        "-cid",
        default=os.environ.get("CLIENT_ID", CLIENT_ID),
        help="OAuth2 Client ID",
    )
    parser.add_argument(
        "--client-secret",
        "-cs",
        default=os.environ.get("CLIENT_SECRET", CLIENT_SECRET),
        help="OAuth2 Client Secret",
    )
    parser.add_argument(
        "--redirect-uri",
        "-ru",
        default=os.environ.get("REDIRECT_URI", REDIRECT_URI),
        help="OAuth2 Redirect URI",
    )
    parser.add_argument(
        "--port",
        "-p",
        default=8080,
        type=int,
        help="""
        HTTP Server port for awaiting auth callback
        * NOTE: Changing this will break default auth flow and requires providing own OAUTH parameters
        """,
    )
    args = parser.parse_args()

    with socketserver.TCPServer(
        ("0.0.0.0", args.port), AuthCallbackRequestHandler
    ) as httpd:
        print(f"Serving HTTP Server for auth callback at port {args.port}")
        server_thread = threading.Thread(target=httpd.serve_forever)
        # Exit the server thread when the main thread terminates
        server_thread.daemon = True
        server_thread.start()

        await do_auth(
            args.client_id, args.client_secret, args.redirect_uri, args.tokens
        )


def main():
    asyncio.run(async_main())


if __name__ == "__main__":
    main()