1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
|
# -*- coding: utf-8 -*-
import os
from time import time
from unittest.mock import patch
import jwt
from oauthlib.common import Request
from oauthlib.oauth2 import ServiceApplicationClient
from tests.unittest import TestCase
class ServiceApplicationClientTest(TestCase):
    """Tests for ``ServiceApplicationClient``.

    Covers building the JWT-bearer assertion request body (with the private
    key supplied either to the constructor or per call) and parsing a token
    response, including the relaxed-scope behaviour.
    """

    gt = ServiceApplicationClient.grant_type

    # RSA key pair used by the tests: assertions are signed with
    # ``private_key`` and verified with ``public_key``.
    private_key = """
-----BEGIN RSA PRIVATE KEY-----
MIICXgIBAAKBgQDk1/bxyS8Q8jiheHeYYp/4rEKJopeQRRKKpZI4s5i+UPwVpupG
AlwXWfzXwSMaKPAoKJNdu7tqKRniqst5uoHXw98gj0x7zamu0Ck1LtQ4c7pFMVah
5IYGhBi2E9ycNS329W27nJPWNCbESTu7snVlG8V8mfvGGg3xNjTMO7IdrwIDAQAB
AoGBAOQ2KuH8S5+OrsL4K+wfjoCi6MfxCUyqVU9GxocdM1m30WyWRFMEz2nKJ8fR
p3vTD4w8yplTOhcoXdQZl0kRoaDzrcYkm2VvJtQRrX7dKFT8dR8D/Tr7dNQLOXfC
DY6xveQczE7qt7Vk7lp4FqmxBsaaEuokt78pOOjywZoInjZhAkEA9wz3zoZNT0/i
rf6qv2qTIeieUB035N3dyw6f1BGSWYaXSuerDCD/J1qZbAPKKhyHZbVawFt3UMhe
542UftBaxQJBAO0iJy1I8GQjGnS7B3yvyH3CcLYGy296+XO/2xKp/d/ty1OIeovx
C60pLNwuFNF3z9d2GVQAdoQ89hUkOtjZLeMCQQD0JO6oPHUeUjYT+T7ImAv7UKVT
Suy30sKjLzqoGw1kR+wv7C5PeDRvscs4wa4CW9s6mjSrMDkDrmCLuJDtmf55AkEA
kmaMg2PNrjUR51F0zOEFycaaqXbGcFwe1/xx9zLmHzMDXd4bsnwt9kk+fe0hQzVS
JzatanQit3+feev1PN3QewJAWv4RZeavEUhKv+kLe95Yd0su7lTLVduVgh4v5yLT
Ga6FHdjGPcfajt+nrpB1n8UQBEH9ZxniokR/IPvdMlxqXA==
-----END RSA PRIVATE KEY-----
"""

    public_key = """
-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDk1/bxyS8Q8jiheHeYYp/4rEKJ
opeQRRKKpZI4s5i+UPwVpupGAlwXWfzXwSMaKPAoKJNdu7tqKRniqst5uoHXw98g
j0x7zamu0Ck1LtQ4c7pFMVah5IYGhBi2E9ycNS329W27nJPWNCbESTu7snVlG8V8
mfvGGg3xNjTMO7IdrwIDAQAB
-----END PUBLIC KEY-----
"""

    # Claim values passed to prepare_request_body().
    subject = 'resource-owner@provider.com'
    issuer = 'the-client@provider.com'
    audience = 'https://provider.com/token'

    client_id = "someclientid"
    scope = ["/profile"]
    kwargs = {
        "some": "providers",
        "require": "extra arguments"
    }

    body = "isnot=empty"
    body_up = "not=empty&grant_type=%s" % gt
    body_kwargs = body_up + "&some=providers&require=extra+arguments"

    # Raw token response and the parsed form expected from it. The parsed
    # form additionally gains an ``expires_at`` entry, which the parse test
    # computes locally because it depends on the mocked clock.
    token_json = ('{ "access_token":"2YotnFZFEjr1zCsicMWpAA",'
                  ' "token_type":"example",'
                  ' "expires_in":3600,'
                  ' "scope":"/profile",'
                  ' "example_parameter":"example_value"}')
    token = {
        "access_token": "2YotnFZFEjr1zCsicMWpAA",
        "token_type": "example",
        "expires_in": 3600,
        "scope": ["/profile"],
        "example_parameter": "example_value"
    }

    def _assert_common_claims(self, body, issued_at):
        """Check the form fields and core JWT claims of a prepared body.

        :param body: Request body returned by ``prepare_request_body``.
        :param issued_at: Timestamp the ``iat`` claim must equal once
            truncated to an integer.
        :returns: The decoded JWT claims, for further assertions.
        """
        r = Request('https://a.b', body=body)
        self.assertEqual(r.isnot, 'empty')
        self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type)

        # Audience verification is handled by jwt.decode itself, so there
        # is no separate assertion on the 'aud' claim.
        claim = jwt.decode(r.assertion, self.public_key,
                           audience=self.audience, algorithms=['RS256'])
        self.assertEqual(claim['iss'], self.issuer)
        self.assertEqual(claim['sub'], self.subject)
        self.assertEqual(claim['iat'], int(issued_at))
        return claim

    @patch('time.time')
    def test_request_body(self, t):
        """Request body carries a signed assertion with the expected claims.

        ``t`` is the mock replacing ``time.time``.
        """
        # ``time`` was imported by name before patching, so this is the
        # real clock; the mock then pins that value.
        t.return_value = time()

        client = ServiceApplicationClient(
            self.client_id, private_key=self.private_key)

        # Basic with min required params
        body = client.prepare_request_body(issuer=self.issuer,
                                           subject=self.subject,
                                           audience=self.audience,
                                           body=self.body)
        claim = self._assert_common_claims(body, t.return_value)
        self.assertNotIn('nbf', claim)
        self.assertNotIn('jti', claim)

        # Missing issuer parameter
        self.assertRaises(ValueError, client.prepare_request_body,
                          issuer=None, subject=self.subject,
                          audience=self.audience, body=self.body)

        # Missing subject parameter
        self.assertRaises(ValueError, client.prepare_request_body,
                          issuer=self.issuer, subject=None,
                          audience=self.audience, body=self.body)

        # Missing audience parameter
        self.assertRaises(ValueError, client.prepare_request_body,
                          issuer=self.issuer, subject=self.subject,
                          audience=None, body=self.body)

        # Optional kwargs
        not_before = time() - 3600
        jwt_id = '8zd15df4s35f43sd'
        body = client.prepare_request_body(issuer=self.issuer,
                                           subject=self.subject,
                                           audience=self.audience,
                                           body=self.body,
                                           not_before=not_before,
                                           jwt_id=jwt_id)
        claim = self._assert_common_claims(body, t.return_value)
        self.assertEqual(claim['nbf'], not_before)
        self.assertEqual(claim['jti'], jwt_id)

    @patch('time.time')
    def test_request_body_no_initial_private_key(self, t):
        """A private key may be supplied per call instead of at construction.

        ``t`` is the mock replacing ``time.time``.
        """
        t.return_value = time()

        client = ServiceApplicationClient(
            self.client_id, private_key=None)

        # Basic with private key provided
        body = client.prepare_request_body(issuer=self.issuer,
                                           subject=self.subject,
                                           audience=self.audience,
                                           body=self.body,
                                           private_key=self.private_key)
        self._assert_common_claims(body, t.return_value)

        # No private key provided
        self.assertRaises(ValueError, client.prepare_request_body,
                          issuer=self.issuer, subject=self.subject,
                          audience=self.audience, body=self.body)

    @patch('time.time')
    def test_parse_token_response(self, t):
        """Token responses are parsed, stored on the client and scope-checked.

        ``t`` is the mock replacing ``time.time``.
        """
        t.return_value = time()
        # Build the expected token locally rather than writing into the
        # class-level ``token`` dict, which is shared between tests.
        expected = dict(self.token)
        expected['expires_at'] = self.token['expires_in'] + t.return_value

        client = ServiceApplicationClient(self.client_id)

        # Parse the token response
        response = client.parse_request_body_response(self.token_json, scope=self.scope)
        self.assertEqual(response, expected)
        self.assertEqual(client.access_token, response.get("access_token"))
        self.assertEqual(client.refresh_token, response.get("refresh_token"))
        self.assertEqual(client.token_type, response.get("token_type"))

        # Mismatching scope
        self.assertRaises(Warning, client.parse_request_body_response, self.token_json, scope="invalid")

        # patch.dict restores os.environ on exit even if an assertion
        # fails, so the relaxed-scope flag cannot leak into other tests.
        with patch.dict(os.environ, {'OAUTHLIB_RELAX_TOKEN_SCOPE': '2'}):
            token = client.parse_request_body_response(self.token_json, scope="invalid")
            self.assertTrue(token.scope_changed)
|