File: test_renewing_authorizer.py

package info (click to toggle)
python-globus-sdk 4.3.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 5,172 kB
  • sloc: python: 35,227; sh: 44; makefile: 35
file content (192 lines) | stat: -rw-r--r-- 5,739 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import time
from unittest import mock

import pytest

from globus_sdk import exc
from globus_sdk.authorizers.renewing import EXPIRES_ADJUST_SECONDS, RenewingAuthorizer


class MockRenewer(RenewingAuthorizer):
    """
    Class that implements RenewingAuthorizer so that _get_token_response and
    _extract_token_data can return known values for testing
    """

    def __init__(self, token_data, **kwargs) -> None:
        self.token_data = token_data
        self.token_response = mock.Mock()
        super().__init__(**kwargs)

    def _get_token_response(self):
        return self.token_response

    def _extract_token_data(self, res):
        return self.token_data


ACCESS_TOKEN = "access_token_1"


@pytest.fixture
def expires_at():
    return int(time.time()) + EXPIRES_ADJUST_SECONDS + 10


@pytest.fixture
def token_data():
    return {
        "expires_at_seconds": int(time.time()) + 1000,
        "access_token": "access_token_2",
    }


@pytest.fixture
def on_refresh():
    return mock.Mock()


@pytest.fixture
def authorizer(on_refresh, token_data, expires_at):
    return MockRenewer(
        token_data,
        access_token=ACCESS_TOKEN,
        expires_at=expires_at,
        on_refresh=on_refresh,
    )


@pytest.fixture
def expired_authorizer(on_refresh, token_data, expires_at):
    return MockRenewer(
        token_data,
        access_token=ACCESS_TOKEN,
        expires_at=expires_at - 11,
        on_refresh=on_refresh,
    )


def test_init(token_data, expires_at):
    """
    Creating a MockRenewer with partial data (expires_at, access_token) results in
    errors.
    Either complete data or no partial data works.
    """
    authorizer = MockRenewer(
        token_data, access_token=ACCESS_TOKEN, expires_at=expires_at
    )
    assert authorizer.access_token == ACCESS_TOKEN
    assert authorizer.access_token != token_data["access_token"]

    # with no args, an automatic "refresh" is triggered on init
    authorizer = MockRenewer(token_data)
    assert authorizer.access_token == token_data["access_token"]
    assert authorizer.expires_at == token_data["expires_at_seconds"]

    with pytest.raises(exc.GlobusSDKUsageError):
        MockRenewer(token_data, access_token=ACCESS_TOKEN)

    with pytest.raises(exc.GlobusSDKUsageError):
        MockRenewer(token_data, expires_at=expires_at)


def test_get_new_access_token(authorizer, token_data, on_refresh):
    """
    Calls get_new_access token, confirms that the mock _get_token_data
    is used and that the mock on_refresh function is called.
    """
    # take note of original access_token_hash
    original_hash = authorizer._access_token_hash

    # get new_access_token
    authorizer._get_new_access_token()
    # confirm side effects
    assert authorizer.access_token == token_data["access_token"]
    assert authorizer.expires_at == token_data["expires_at_seconds"]
    assert authorizer._access_token_hash != original_hash
    on_refresh.assert_called_once()


def test_ensure_valid_token_ok(authorizer):
    """
    Confirms nothing is done before the access_token expires,
    """
    authorizer.ensure_valid_token()
    assert authorizer.access_token == ACCESS_TOKEN


def test_ensure_valid_token_expired(expired_authorizer, token_data):
    """
    Confirms a new access_token is gotten after expiration
    """
    expired_authorizer.ensure_valid_token()
    assert expired_authorizer.access_token == token_data["access_token"]
    assert expired_authorizer.expires_at == token_data["expires_at_seconds"]


def test_ensure_valid_token_no_token(authorizer, token_data):
    """
    Confirms a new access_token is gotten if the old one is set to None
    """
    authorizer.access_token = None
    authorizer.ensure_valid_token()
    assert authorizer.access_token == token_data["access_token"]
    assert authorizer.expires_at == token_data["expires_at_seconds"]


def test_ensure_valid_token_no_expiration(authorizer, token_data):
    """
    Confirms a new access_token is gotten if expires_at is set to None
    """
    authorizer.expires_at = None
    authorizer.ensure_valid_token()
    assert authorizer.access_token == token_data["access_token"]
    assert authorizer.expires_at == token_data["expires_at_seconds"]


def test_get_authorization_header(authorizer):
    """
    Gets authorization header, confirms expected value
    """
    assert authorizer.get_authorization_header() == "Bearer " + ACCESS_TOKEN


def test_get_authorization_header_expired(expired_authorizer, token_data):
    """
    Sets the access_token to be expired, then gets authorization header
    Confirms header value uses the new access_token.
    """
    assert expired_authorizer.get_authorization_header() == (
        "Bearer " + token_data["access_token"]
    )


def test_get_authorization_header_no_token(authorizer, token_data):
    """
    Sets the access_token to None, then gets authorization header
    Confirms header value uses the new access_token.
    """
    authorizer.access_token = None
    assert authorizer.get_authorization_header() == (
        "Bearer " + token_data["access_token"]
    )


def test_get_authorization_header_no_expires(authorizer, token_data):
    """
    Sets expires_at to None, then gets authorization header
    Confirms header value uses the new access_token.
    """
    authorizer.expires_at = None
    assert authorizer.get_authorization_header() == (
        "Bearer " + token_data["access_token"]
    )


def test_handle_missing_authorization(authorizer):
    """
    Confirms that RenewingAuthorizers will attempt to fix 401s
    by treating their existing access_token as expired
    """
    assert authorizer.handle_missing_authorization()
    assert authorizer.expires_at is None