File: manager.py

package info (click to toggle)
python-xbox-webapi 2.1.0-1.2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,900 kB
  • sloc: python: 4,973; makefile: 79
file content (161 lines) | stat: -rw-r--r-- 5,743 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
"""
Authentication Manager

Authenticate with Windows Live Server and Xbox Live.
"""
import logging
from typing import List, Optional

import httpx

from xbox.webapi.authentication.models import (
    OAuth2TokenResponse,
    XAUResponse,
    XSTSResponse,
)
from xbox.webapi.common.exceptions import AuthenticationException
from xbox.webapi.common.signed_session import SignedSession

# Module-wide logger. Note that it is registered under the fixed name
# "authentication" rather than under the module's dotted import path.
log = logging.getLogger("authentication")

# OAuth2 scopes used by AuthenticationManager when the caller supplies none.
DEFAULT_SCOPES = ["Xboxlive.signin", "Xboxlive.offline_access"]


class AuthenticationManager:
    """Drive the Windows Live OAuth2 and Xbox Live token flow.

    The manager keeps the three tokens produced by the flow as public
    attributes: ``oauth`` (OAuth2 token response), ``user_token`` (response
    of the user-authenticate endpoint) and ``xsts_token`` (response of the
    XSTS-authorize endpoint). All three start out as ``None``.
    """

    def __init__(
        self,
        client_session: SignedSession,
        client_id: str,
        client_secret: str,
        redirect_uri: str,
        scopes: Optional[List[str]] = None,
    ):
        """Store the client credentials and the session used for requests.

        Args:
            client_session: Session used for every HTTP request; must be a
                ``SignedSession`` or an ``httpx.AsyncClient``.
            client_id: OAuth2 client id sent with every token request.
            client_secret: OAuth2 client secret; when empty it is left out
                of token requests.
            redirect_uri: Redirect URI sent with authorization requests.
            scopes: OAuth2 scopes to request; ``DEFAULT_SCOPES`` is used
                when omitted or empty.

        Raises:
            DeprecationWarning: If ``client_session`` is neither a
                ``SignedSession`` nor an ``httpx.AsyncClient``.
        """
        if not isinstance(client_session, (SignedSession, httpx.AsyncClient)):
            raise DeprecationWarning(
                """Xbox WebAPI changed to use SignedSession (wrapped httpx.AsyncClient).
                Please check the documentation"""
            )

        self.session: SignedSession = client_session
        self._client_id: str = client_id
        self._client_secret: str = client_secret
        self._redirect_uri: str = redirect_uri
        # Copy, so that this instance never shares a list object with the
        # caller or with the module-level DEFAULT_SCOPES.
        self._scopes: List[str] = list(scopes or DEFAULT_SCOPES)

        # Tokens are unknown until requested/refreshed or assigned by the caller.
        self.oauth: Optional[OAuth2TokenResponse] = None
        self.user_token: Optional[XAUResponse] = None
        self.xsts_token: Optional[XSTSResponse] = None

    def generate_authorization_url(self, state: Optional[str] = None) -> str:
        """Generate Windows Live Authorization URL.

        Args:
            state: Optional value added to the query string as ``state``;
                left out when ``None`` or empty.

        Returns:
            The ``https://login.live.com/oauth20_authorize.srf`` URL with
            the client id, response type ``code``, approval prompt ``auto``,
            the space-joined scopes and the redirect URI as query parameters.
        """
        query_params = {
            "client_id": self._client_id,
            "response_type": "code",
            "approval_prompt": "auto",
            "scope": " ".join(self._scopes),
            "redirect_uri": self._redirect_uri,
        }

        if state:
            query_params["state"] = state

        return str(
            httpx.URL(
                "https://login.live.com/oauth20_authorize.srf", params=query_params
            )
        )

    async def request_tokens(self, authorization_code: str) -> None:
        """Request all tokens.

        Exchanges the authorization code for an OAuth2 token, then requests
        the user token and the XSTS token, in that order, storing each
        result on the instance.

        Args:
            authorization_code: Code to exchange for the OAuth2 token.
        """
        self.oauth = await self.request_oauth_token(authorization_code)
        self.user_token = await self.request_user_token()
        self.xsts_token = await self.request_xsts_token()

    async def refresh_tokens(self) -> None:
        """Refresh all tokens.

        Each token is renewed only when it is missing or its ``is_valid()``
        reports false; tokens that are still valid are left untouched.

        Raises:
            AuthenticationException: If no OAuth2 token is stored, so there
                is no refresh token to use.
        """
        if not (self.oauth and self.oauth.is_valid()):
            self.oauth = await self.refresh_oauth_token()
        if not (self.user_token and self.user_token.is_valid()):
            self.user_token = await self.request_user_token()
        if not (self.xsts_token and self.xsts_token.is_valid()):
            self.xsts_token = await self.request_xsts_token()

    async def request_oauth_token(self, authorization_code: str) -> OAuth2TokenResponse:
        """Request OAuth2 token.

        Args:
            authorization_code: Code to exchange for the token.

        Returns:
            The parsed token response.
        """
        return await self._oauth2_token_request(
            {
                "grant_type": "authorization_code",
                "code": authorization_code,
                "scope": " ".join(self._scopes),
                "redirect_uri": self._redirect_uri,
            }
        )

    async def refresh_oauth_token(self) -> OAuth2TokenResponse:
        """Refresh OAuth2 token.

        Returns:
            The parsed token response.

        Raises:
            AuthenticationException: If ``self.oauth`` is not set, so no
                refresh token is available.
        """
        if self.oauth is None:
            raise AuthenticationException(
                "Cannot refresh the OAuth2 token: no OAuth2 token is available"
            )
        return await self._oauth2_token_request(
            {
                "grant_type": "refresh_token",
                "scope": " ".join(self._scopes),
                "refresh_token": self.oauth.refresh_token,
            }
        )

    async def _oauth2_token_request(self, data: dict) -> OAuth2TokenResponse:
        """Execute token requests.

        Args:
            data: Form fields of the token request. The dict is not
                modified; the client credentials are added to a copy.

        Returns:
            The parsed token response.

        Raises:
            Whatever ``raise_for_status()`` of the session's response raises
            for an error status.
        """
        # Build a fresh payload so the caller's dict is never altered and
        # never ends up holding the client secret.
        payload = {**data, "client_id": self._client_id}
        if self._client_secret:
            payload["client_secret"] = self._client_secret
        resp = await self.session.post(
            "https://login.live.com/oauth20_token.srf", data=payload
        )
        resp.raise_for_status()
        return OAuth2TokenResponse(**resp.json())

    async def request_user_token(
        self,
        relying_party: str = "http://auth.xboxlive.com",
        use_compact_ticket: bool = False,
    ) -> XAUResponse:
        """Authenticate via access token and receive user token.

        Args:
            relying_party: Value sent as ``RelyingParty``.
            use_compact_ticket: When true the access token is sent as-is as
                the ``RpsTicket``; otherwise it is prefixed with ``d=``.

        Returns:
            The parsed user token response.

        Raises:
            AuthenticationException: If ``self.oauth`` is not set, so there
                is no access token to authenticate with.
        """
        if self.oauth is None:
            raise AuthenticationException(
                "Cannot request the user token: no OAuth2 token is available"
            )

        access_token = self.oauth.access_token
        rps_ticket = access_token if use_compact_ticket else f"d={access_token}"

        url = "https://user.auth.xboxlive.com/user/authenticate"
        headers = {"x-xbl-contract-version": "1"}
        data = {
            "RelyingParty": relying_party,
            "TokenType": "JWT",
            "Properties": {
                "AuthMethod": "RPS",
                "SiteName": "user.auth.xboxlive.com",
                "RpsTicket": rps_ticket,
            },
        }

        resp = await self.session.post(url, json=data, headers=headers)
        resp.raise_for_status()
        return XAUResponse(**resp.json())

    async def request_xsts_token(
        self, relying_party: str = "http://xboxlive.com"
    ) -> XSTSResponse:
        """Authorize via user token and receive final X token.

        Args:
            relying_party: Value sent as ``RelyingParty``.

        Returns:
            The parsed XSTS token response.

        Raises:
            AuthenticationException: If ``self.user_token`` is not set, or
                if the server answers with HTTP 401.
        """
        if self.user_token is None:
            raise AuthenticationException(
                "Cannot request the XSTS token: no user token is available"
            )

        url = "https://xsts.auth.xboxlive.com/xsts/authorize"
        headers = {"x-xbl-contract-version": "1"}
        data = {
            "RelyingParty": relying_party,
            "TokenType": "JWT",
            "Properties": {
                "UserTokens": [self.user_token.token],
                "SandboxId": "RETAIL",
            },
        }

        resp = await self.session.post(url, json=data, headers=headers)
        if resp.status_code == 401:  # if unauthorized
            message = "Failed to authorize you! Your password or username may be wrong or you are trying to use child account (< 18 years old)"
            # Report through the module logger instead of stdout and carry
            # the explanation in the exception itself.
            log.error(message)
            raise AuthenticationException(message)
        resp.raise_for_status()
        return XSTSResponse(**resp.json())