File: assertion_session.py

package info (click to toggle)
python-authlib 1.6.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,016 kB
  • sloc: python: 26,998; makefile: 53; sh: 14
file content (70 lines) | stat: -rw-r--r-- 2,073 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
from requests import Session

from authlib.oauth2.rfc7521 import AssertionClient
from authlib.oauth2.rfc7523 import JWTBearerGrant

from .oauth2_session import OAuth2Auth
from .utils import update_session_configure


class AssertionAuth(OAuth2Auth):
    """Auth handler that asks its client for a new token when needed."""

    def ensure_active_token(self):
        """Refresh the token through the client if it is missing or expired.

        Nothing happens (and ``None`` is returned) when no client is attached
        or when the current token is present and not reported as expired.
        Otherwise the result of ``client.refresh_token()`` is returned.
        """
        client = self.client
        if not client:
            return None

        current = self.token
        # Expiry is evaluated with the client's configured leeway.
        if current and not current.is_expired(client.leeway):
            return None

        return client.refresh_token()


class AssertionSession(AssertionClient, Session):
    """A ``requests`` session that doubles as an RFC7521_ assertion client.

    The class combines :class:`AssertionClient` with
    :class:`requests.Session` and uses the JWT bearer grant as its default
    grant type.

    .. _RFC7521: https://tools.ietf.org/html/rfc7521
    """

    # NOTE(review): presumably instantiated by AssertionClient to build
    # ``token_auth`` -- confirm against the base class.
    token_auth_class = AssertionAuth
    JWT_BEARER_GRANT_TYPE = JWTBearerGrant.GRANT_TYPE
    # Grant type -> callable registered for producing its assertion.
    ASSERTION_METHODS = {JWT_BEARER_GRANT_TYPE: JWTBearerGrant.sign}
    DEFAULT_GRANT_TYPE = JWT_BEARER_GRANT_TYPE

    def __init__(
        self,
        token_endpoint,
        issuer,
        subject,
        audience=None,
        grant_type=None,
        claims=None,
        token_placement="header",
        scope=None,
        default_timeout=None,
        leeway=60,
        **kwargs,
    ):
        """Initialise the underlying session, then the assertion client.

        :param token_endpoint: forwarded to ``AssertionClient.__init__``.
        :param issuer: forwarded to ``AssertionClient.__init__``.
        :param subject: forwarded to ``AssertionClient.__init__``.
        :param audience: forwarded to ``AssertionClient.__init__``.
        :param grant_type: forwarded to ``AssertionClient.__init__``.
        :param claims: forwarded to ``AssertionClient.__init__``.
        :param token_placement: forwarded to ``AssertionClient.__init__``.
        :param scope: forwarded to ``AssertionClient.__init__``.
        :param default_timeout: stored on the session; when truthy it is
            used by :meth:`request` as the ``timeout`` for calls that do not
            pass one.
        :param leeway: forwarded to ``AssertionClient.__init__``.
        :param kwargs: handed to ``update_session_configure`` first, then
            forwarded to ``AssertionClient.__init__``.
        """
        Session.__init__(self)
        self.default_timeout = default_timeout
        # Must run before the client init because it receives the same
        # ``kwargs`` dict (presumably consuming session-level settings from
        # it -- verify against ``.utils``).
        update_session_configure(self, kwargs)

        client_options = {
            "token_endpoint": token_endpoint,
            "issuer": issuer,
            "subject": subject,
            "audience": audience,
            "grant_type": grant_type,
            "claims": claims,
            "token_placement": token_placement,
            "scope": scope,
            "leeway": leeway,
        }
        # The session acts as its own HTTP transport for the client.
        AssertionClient.__init__(self, session=self, **client_options, **kwargs)

    def request(self, method, url, withhold_token=False, auth=None, **kwargs):
        """Send a request, attaching the session's token auth by default.

        :param method: passed through to the parent ``request``.
        :param url: passed through to the parent ``request``.
        :param withhold_token: when true, ``self.token_auth`` is not
            attached automatically.
        :param auth: explicit auth object; when given it is used as-is.
        :param kwargs: extra keyword arguments for the parent ``request``;
            ``timeout`` is filled from ``default_timeout`` if absent and a
            truthy default is configured.
        :return: whatever the parent ``request`` returns.
        """
        fallback_timeout = self.default_timeout
        if fallback_timeout and "timeout" not in kwargs:
            kwargs["timeout"] = fallback_timeout

        attach_token = not withhold_token and auth is None
        if attach_token:
            auth = self.token_auth
        return super().request(method, url, auth=auth, **kwargs)