File: token.py

package info (click to toggle)
python-ytmusicapi 1.10.2-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 3,412 kB
  • sloc: python: 4,324; sh: 14; makefile: 12
file content (146 lines) | stat: -rw-r--r-- 5,033 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import json
import time
import webbrowser
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

from requests.structures import CaseInsensitiveDict

from ytmusicapi.auth.oauth.credentials import Credentials
from ytmusicapi.auth.oauth.models import BaseTokenDict, Bearer, DefaultScope, RefreshableTokenDict


@dataclass
class Token:
    """Base class representation of the YouTubeMusicAPI OAuth token."""

    scope: DefaultScope
    token_type: Bearer

    access_token: str
    refresh_token: str
    expires_at: int = 0
    expires_in: int = 0

    @staticmethod
    def members():
        return Token.__annotations__.keys()

    def __repr__(self) -> str:
        """Readable version."""
        return f"{self.__class__.__name__}: {self.as_dict()}"

    def as_dict(self) -> RefreshableTokenDict:
        """Returns dictionary containing underlying token values."""
        return {key: self.__dict__[key] for key in Token.members()}  # type: ignore

    def as_json(self) -> str:
        return json.dumps(self.as_dict())

    def as_auth(self) -> str:
        """Returns Authorization header ready str of token_type and access_token."""
        return f"{self.token_type} {self.access_token}"

    @property
    def is_expiring(self) -> bool:
        return self.expires_in < 60


class OAuthToken(Token):
    """Wrapper for an OAuth token implementing expiration methods."""

    @staticmethod
    def is_oauth(headers: CaseInsensitiveDict) -> bool:
        return all(key in headers for key in Token.members())

    def update(self, fresh_access: BaseTokenDict):
        """
        Update access_token and expiration attributes with a BaseTokenDict inplace.
        expires_at attribute set using current epoch, avoid expiration desync
        by passing only recently requested tokens dicts or updating values to compensate.
        """
        self.access_token = fresh_access["access_token"]
        self.expires_at = int(time.time()) + fresh_access["expires_in"]

    @property
    def is_expiring(self) -> bool:
        return self.expires_at - int(time.time()) < 60

    @classmethod
    def from_json(cls, file_path: Path) -> "OAuthToken":
        if file_path.is_file():
            with open(file_path) as json_file:
                file_pack = json.load(json_file)

        return cls(**file_pack)


@dataclass
class RefreshingToken(OAuthToken):
    """
    Compositional implementation of Token that automatically refreshes
    an underlying OAuthToken when required (credential expiration <= 1 min)
    upon access_token attribute access.
    """

    #: credentials used for access_token refreshing
    credentials: Optional[Credentials] = None

    #: protected/property attribute enables auto writing token values to new file location via setter
    _local_cache: Optional[Path] = None

    def __getattribute__(self, item):
        """access token setter to auto-refresh if it is expiring"""
        if item == "access_token" and self.is_expiring:
            fresh = self.credentials.refresh_token(self.refresh_token)
            self.update(fresh)
            self.store_token()

        return super().__getattribute__(item)

    @property
    def local_cache(self) -> Optional[Path]:
        return self._local_cache

    @local_cache.setter
    def local_cache(self, path: Path):
        """Update attribute and dump token to new path."""
        self._local_cache = path
        self.store_token()

    @classmethod
    def prompt_for_token(
        cls, credentials: Credentials, open_browser: bool = False, to_file: Optional[str] = None
    ) -> "RefreshingToken":
        """
        Method for CLI token creation via user inputs.

        :param credentials: Client credentials
        :param open_browser: Optional. Open browser to OAuth consent url automatically. (Default: ``False``).
        :param to_file: Optional. Path to store/sync json version of resulting token. (Default: ``None``).
        """

        code = credentials.get_code()
        url = f"{code['verification_url']}?user_code={code['user_code']}"
        if open_browser:
            webbrowser.open(url)
        input(f"Go to {url}, finish the login flow and press Enter when done, Ctrl-C to abort")
        raw_token = credentials.token_from_code(code["device_code"])
        ref_token = cls(credentials=credentials, **raw_token)
        ref_token.update(ref_token.as_dict())
        if to_file:
            ref_token.local_cache = Path(to_file)
        return ref_token

    def store_token(self, path: Optional[str] = None) -> None:
        """
        Write token values to json file at specified path, defaulting to self.local_cache.
        Operation does not update instance local_cache attribute.
        Automatically called when local_cache is set post init.
        """
        file_path = path if path else self.local_cache

        if file_path:
            with open(file_path, encoding="utf8", mode="w") as file:
                json.dump(self.as_dict(), file, indent=True)