1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
|
# -*- coding: utf-8 -*-
from json import loads
from unittest.mock import MagicMock
from oauthlib.common import urlencode
from oauthlib.oauth2 import IntrospectEndpoint, RequestValidator
from tests.unittest import TestCase
class IntrospectEndpointTest(TestCase):
    """Exercise ``IntrospectEndpoint.create_introspect_response``.

    Each test sends a form-encoded body to the endpoint, backed by a mocked
    ``RequestValidator``, and checks the returned
    ``(headers, body, status)`` triple.
    """

    def setUp(self):
        """Create a permissive mocked validator, the endpoint and fixtures."""
        self.validator = MagicMock(wraps=RequestValidator())
        self.validator.client_authentication_required.return_value = True
        self.validator.authenticate_client.return_value = True
        self.validator.validate_bearer_token.return_value = True
        self.validator.introspect_token.return_value = {}
        self.endpoint = IntrospectEndpoint(self.validator)
        self.uri = 'should_not_matter'
        self.headers = {
            'Content-Type': 'application/x-www-form-urlencoded',
        }
        # Headers expected on every response that is not an
        # authentication failure.
        self.resp_h = {
            'Cache-Control': 'no-store',
            'Content-Type': 'application/json',
            'Pragma': 'no-cache'
        }
        # Body expected when the validator returns an empty claims dict.
        self.resp_b = {
            "active": True
        }

    def test_introspect_token(self):
        """Any ``token_type_hint`` value yields a 200 with an active token."""
        for token_type in ('access_token', 'refresh_token', 'invalid'):
            # subTest reports which hint failed and keeps checking the rest.
            with self.subTest(token_type_hint=token_type):
                body = urlencode([('token', 'foo'),
                                  ('token_type_hint', token_type)])
                h, b, s = self.endpoint.create_introspect_response(
                    self.uri, headers=self.headers, body=body)
                self.assertEqual(h, self.resp_h)
                self.assertEqual(loads(b), self.resp_b)
                self.assertEqual(s, 200)

    def test_introspect_token_nohint(self):
        """Omitting ``token_type_hint`` still yields a 200 active response."""
        # don't specify token_type_hint
        body = urlencode([('token', 'foo')])
        h, b, s = self.endpoint.create_introspect_response(
            self.uri, headers=self.headers, body=body)
        self.assertEqual(h, self.resp_h)
        self.assertEqual(loads(b), self.resp_b)
        self.assertEqual(s, 200)

    def test_introspect_token_false(self):
        """A ``None`` result from the validator is reported as inactive."""
        self.validator.introspect_token.return_value = None
        body = urlencode([('token', 'foo')])
        h, b, s = self.endpoint.create_introspect_response(
            self.uri, headers=self.headers, body=body)
        self.assertEqual(h, self.resp_h)
        self.assertEqual(loads(b), {"active": False})
        self.assertEqual(s, 200)

    def test_introspect_token_claims(self):
        """Claims returned by the validator appear next to ``active``."""
        self.validator.introspect_token.return_value = {"foo": "bar"}
        body = urlencode([('token', 'foo')])
        h, b, s = self.endpoint.create_introspect_response(
            self.uri, headers=self.headers, body=body)
        self.assertEqual(h, self.resp_h)
        self.assertEqual(loads(b), {"active": True, "foo": "bar"})
        self.assertEqual(s, 200)

    def test_introspect_token_claims_spoof_active(self):
        """An ``active: False`` claim from the validator is not echoed back."""
        self.validator.introspect_token.return_value = {
            "foo": "bar", "active": False}
        body = urlencode([('token', 'foo')])
        h, b, s = self.endpoint.create_introspect_response(
            self.uri, headers=self.headers, body=body)
        self.assertEqual(h, self.resp_h)
        # The response must still say active=True despite the claim.
        self.assertEqual(loads(b), {"active": True, "foo": "bar"})
        self.assertEqual(s, 200)

    def test_introspect_token_client_authentication_failed(self):
        """Failed client authentication yields a 401 ``invalid_client``."""
        self.validator.authenticate_client.return_value = False
        body = urlencode([('token', 'foo'),
                          ('token_type_hint', 'access_token')])
        h, b, s = self.endpoint.create_introspect_response(
            self.uri, headers=self.headers, body=body)
        self.assertEqual(h, {
            'Content-Type': 'application/json',
            'Cache-Control': 'no-store',
            'Pragma': 'no-cache',
            "WWW-Authenticate": 'Bearer error="invalid_client"'
        })
        self.assertEqual(loads(b)['error'], 'invalid_client')
        self.assertEqual(s, 401)

    def test_introspect_token_public_client_authentication(self):
        """A public client accepted by ``authenticate_client_id`` gets a 200."""
        self.validator.client_authentication_required.return_value = False
        self.validator.authenticate_client_id.return_value = True
        for token_type in ('access_token', 'refresh_token', 'invalid'):
            with self.subTest(token_type_hint=token_type):
                body = urlencode([('token', 'foo'),
                                  ('token_type_hint', token_type)])
                h, b, s = self.endpoint.create_introspect_response(
                    self.uri, headers=self.headers, body=body)
                self.assertEqual(h, self.resp_h)
                self.assertEqual(loads(b), self.resp_b)
                self.assertEqual(s, 200)

    def test_introspect_token_public_client_authentication_failed(self):
        """A public client rejected by ``authenticate_client_id`` gets a 401."""
        self.validator.client_authentication_required.return_value = False
        self.validator.authenticate_client_id.return_value = False
        body = urlencode([('token', 'foo'),
                          ('token_type_hint', 'access_token')])
        h, b, s = self.endpoint.create_introspect_response(
            self.uri, headers=self.headers, body=body)
        self.assertEqual(h, {
            'Content-Type': 'application/json',
            'Cache-Control': 'no-store',
            'Pragma': 'no-cache',
            "WWW-Authenticate": 'Bearer error="invalid_client"'
        })
        self.assertEqual(loads(b)['error'], 'invalid_client')
        self.assertEqual(s, 401)

    def test_introspect_unsupported_token(self):
        """Unsupported hints and empty bodies are both rejected with a 400."""
        endpoint = IntrospectEndpoint(self.validator,
                                      supported_token_types=['access_token'])

        # A hint outside supported_token_types -> unsupported_token_type.
        body = urlencode([('token', 'foo'),
                          ('token_type_hint', 'refresh_token')])
        h, b, s = endpoint.create_introspect_response(
            self.uri, headers=self.headers, body=body)
        self.assertEqual(h, self.resp_h)
        self.assertEqual(loads(b)['error'], 'unsupported_token_type')
        self.assertEqual(s, 400)

        # An empty body (no token at all) -> invalid_request.
        h, b, s = endpoint.create_introspect_response(
            self.uri, headers=self.headers, body='')
        self.assertEqual(h, self.resp_h)
        self.assertEqual(loads(b)['error'], 'invalid_request')
        self.assertEqual(s, 400)

    def test_introspect_invalid_request_method(self):
        """GET/PUT/DELETE/PATCH in any letter case are rejected with a 400."""
        endpoint = IntrospectEndpoint(self.validator,
                                      supported_token_types=['access_token'])
        mixed_case = ['GET', 'pUt', 'dEleTe', 'paTcH']
        test_methods = (mixed_case
                        + [m.lower() for m in mixed_case]
                        + [m.upper() for m in mixed_case])
        # The request body does not depend on the method; build it once.
        body = urlencode([('token', 'foo'),
                          ('token_type_hint', 'refresh_token')])
        for method in test_methods:
            with self.subTest(http_method=method):
                h, b, s = endpoint.create_introspect_response(
                    self.uri, http_method=method, headers=self.headers,
                    body=body)
                payload = loads(b)
                self.assertEqual(h, self.resp_h)
                self.assertEqual(payload['error'], 'invalid_request')
                self.assertIn('Unsupported request method',
                              payload['error_description'])
                self.assertEqual(s, 400)

    def test_introspect_bad_post_request(self):
        """A URI carrying any query parameter is rejected with a 400."""
        endpoint = IntrospectEndpoint(self.validator,
                                      supported_token_types=['access_token'])
        # The request body does not depend on the query string; build it once.
        body = urlencode([('token', 'foo'),
                          ('token_type_hint', 'access_token')])
        for param in ['token', 'secret', 'code', 'foo']:
            with self.subTest(query_param=param):
                uri = 'http://some.endpoint?' + urlencode([(param, 'secret')])
                h, b, s = endpoint.create_introspect_response(
                    uri, headers=self.headers, body=body)
                payload = loads(b)
                self.assertEqual(h, self.resp_h)
                self.assertEqual(payload['error'], 'invalid_request')
                self.assertIn('query parameters are not allowed',
                              payload['error_description'])
                self.assertEqual(s, 400)
|