File: async_app.py

package info (click to toggle)
python-authlib 1.6.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,016 kB
  • sloc: python: 26,998; makefile: 53; sh: 14
file content (152 lines) | stat: -rw-r--r-- 5,978 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import logging
import time

from authlib.common.urls import urlparse

from .errors import MissingRequestTokenError
from .errors import MissingTokenError
from .sync_app import OAuth1Base
from .sync_app import OAuth2Base

# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)

# Names exported by a star-import of this module.
__all__ = ["AsyncOAuth1Mixin", "AsyncOAuth2Mixin"]


class AsyncOAuth1Mixin(OAuth1Base):
    """Async OAuth 1 operations on top of :class:`OAuth1Base`.

    Each method obtains a client from ``self._get_oauth_client()`` and uses
    it as an async context manager for the duration of the call.
    """

    async def request(self, method, url, token=None, **kwargs):
        """Send a request through an OAuth client session.

        :param method: HTTP method passed on to the session.
        :param url: Target URL; handed to ``_http_request`` together with
                    this instance, which supplies ``api_base_url``.
        :param token: Token to attach to the session, or ``None`` to let
                      ``_http_request`` resolve one.
        :param kwargs: Extra parameters forwarded to ``_http_request``.
        :return: Whatever ``_http_request`` returns.
        """
        async with self._get_oauth_client() as session:
            return await _http_request(self, session, method, url, token, kwargs)

    async def create_authorization_url(self, redirect_uri=None, **kwargs):
        """Generate the authorization url and state for HTTP redirect.

        :param redirect_uri: Callback or redirect URI for authorization.
        :param kwargs: Extra parameters to include.
        :return: dict with the keys ``url``, ``request_token`` and ``state``
                 (``state`` is the request token's ``oauth_token`` value).
        :raises RuntimeError: if ``authorize_url`` is not set.
        """
        if not self.authorize_url:
            raise RuntimeError('Missing "authorize_url" value')

        # Configured authorize_params take precedence over caller kwargs.
        if self.authorize_params:
            kwargs.update(self.authorize_params)

        async with self._get_oauth_client() as client:
            client.redirect_uri = redirect_uri
            params = {}
            if self.request_token_params:
                params.update(self.request_token_params)
            request_token = await client.fetch_request_token(
                self.request_token_url, **params
            )
            # Lazy %-formatting: the token repr is only built when DEBUG
            # logging is actually enabled.
            log.debug("Fetch request token: %r", request_token)
            url = client.create_authorization_url(self.authorize_url, **kwargs)
            state = request_token["oauth_token"]
        return {"url": url, "request_token": request_token, "state": state}

    async def fetch_access_token(self, request_token=None, **kwargs):
        """Fetch access token in one step.

        :param request_token: A previous request token for OAuth 1.
        :param kwargs: Extra parameters to fetch access token.
        :return: A token dict.
        :raises MissingRequestTokenError: if ``request_token`` is ``None``.
        """
        # Validate before opening a client: without a request token there is
        # nothing to exchange, so no session should be created at all.
        if request_token is None:
            raise MissingRequestTokenError()

        async with self._get_oauth_client() as client:
            # merge request token with verifier
            token = {}
            token.update(request_token)
            token.update(kwargs)
            client.token = token
            params = self.access_token_params or {}
            token = await client.fetch_access_token(self.access_token_url, **params)
        return token


class AsyncOAuth2Mixin(OAuth2Base):
    """Coroutine-based OAuth 2 operations built on :class:`OAuth2Base`."""

    async def _on_update_token(self, token, refresh_token=None, access_token=None):
        """Forward a token update to the ``_update_token`` callback, if set.

        :param token: The new token.
        :param refresh_token: Passed through to the callback by keyword.
        :param access_token: Passed through to the callback by keyword.
        """
        callback = self._update_token
        if not callback:
            return
        await callback(
            token,
            refresh_token=refresh_token,
            access_token=access_token,
        )

    async def load_server_metadata(self):
        """Return ``server_metadata``, fetching the remote document if needed.

        The document at ``_server_metadata_url`` is requested only when that
        URL is set and ``server_metadata`` has no ``"_loaded_at"`` key yet;
        the fetched JSON is stamped with ``"_loaded_at"`` and merged in.

        :return: The ``server_metadata`` mapping.
        """
        if not self._server_metadata_url or "_loaded_at" in self.server_metadata:
            return self.server_metadata

        async with self.client_cls(**self.client_kwargs) as http:
            response = await http.request(
                "GET", self._server_metadata_url, withhold_token=True
            )
            response.raise_for_status()
            document = response.json()
            # The timestamp doubles as the "already loaded" marker above.
            document["_loaded_at"] = time.time()
        self.server_metadata.update(document)
        return self.server_metadata

    async def request(self, method, url, token=None, **kwargs):
        """Send a request through an OAuth client built from server metadata.

        :param method: HTTP method passed on to the session.
        :param url: Target URL, handed to ``_http_request``.
        :param token: Token to use, or ``None`` to let ``_http_request``
                      resolve one.
        :param kwargs: Extra parameters forwarded to ``_http_request``.
        :return: Whatever ``_http_request`` returns.
        """
        server_metadata = await self.load_server_metadata()
        async with self._get_oauth_client(**server_metadata) as session:
            return await _http_request(self, session, method, url, token, kwargs)

    async def create_authorization_url(self, redirect_uri=None, **kwargs):
        """Build the authorization URL data used for the HTTP redirect.

        :param redirect_uri: Callback or redirect URI for authorization.
        :param kwargs: Extra parameters to include.
        :return: The result of ``_create_oauth2_authorization_url``.
        :raises RuntimeError: if neither ``authorize_url`` nor the metadata's
                              ``authorization_endpoint`` provides a value.
        """
        server_metadata = await self.load_server_metadata()

        # Explicit configuration wins; the metadata entry is the fallback.
        endpoint = self.authorize_url
        if not endpoint:
            endpoint = server_metadata.get("authorization_endpoint")
        if not endpoint:
            raise RuntimeError('Missing "authorize_url" value')

        if self.authorize_params:
            kwargs.update(self.authorize_params)

        async with self._get_oauth_client(**server_metadata) as client:
            client.redirect_uri = redirect_uri
            return self._create_oauth2_authorization_url(client, endpoint, **kwargs)

    async def fetch_access_token(self, redirect_uri=None, **kwargs):
        """Fetch access token in the final step.

        :param redirect_uri: Callback or Redirect URI that is used in
                             previous :meth:`authorize_redirect`.
        :param kwargs: Extra parameters to fetch access token; these
                       override entries from ``access_token_params``.
        :return: A token dict.
        """
        server_metadata = await self.load_server_metadata()
        endpoint = self.access_token_url or server_metadata.get("token_endpoint")

        async with self._get_oauth_client(**server_metadata) as client:
            # Only override the client's redirect URI when one was given.
            if redirect_uri is not None:
                client.redirect_uri = redirect_uri
            fetch_params = dict(self.access_token_params or {})
            fetch_params.update(kwargs)
            result = await client.fetch_token(endpoint, **fetch_params)
        return result


async def _http_request(ctx, session, method, url, token, kwargs):
    """Issue ``method``/``url`` through ``session`` on behalf of ``ctx``.

    :param ctx: Object providing ``api_base_url`` and ``_fetch_token``.
    :param session: Client whose ``request`` coroutine performs the call and
                    whose ``token`` attribute receives the resolved token.
    :param method: HTTP method passed to ``session.request``.
    :param url: Target URL; joined onto ``ctx.api_base_url`` when that is set
                and ``url`` does not start with ``https://`` or ``http://``.
    :param token: Token to use, or ``None`` to resolve one via
                  ``ctx._fetch_token``.
    :param kwargs: Dict of extra request parameters. The ``"request"`` entry
                   is removed from it; ``"withhold_token"`` is read but left
                   in place and forwarded to ``session.request``.
    :return: The awaited result of ``session.request``.
    :raises MissingTokenError: if a token is needed but none is available.
    """
    # Consumed here so it is never forwarded to the session.
    incoming_request = kwargs.pop("request", None)
    skip_token = kwargs.get("withhold_token")

    base_url = ctx.api_base_url
    if base_url:
        if not url.startswith(("https://", "http://")):
            url = urlparse.urljoin(base_url, url)

    if not skip_token:
        # Ask the fetcher only when no token was given and there is a
        # request object to hand to it.
        if token is None and ctx._fetch_token and incoming_request:
            token = await ctx._fetch_token(incoming_request)
        if token is None:
            raise MissingTokenError()
        session.token = token

    return await session.request(method, url, **kwargs)