1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
|
from urllib.request import parse_http_list
from urllib.request import parse_keqv_list
from authlib.common.urls import extract_params
from authlib.common.urls import url_decode
from authlib.common.urls import urlparse
from .errors import DuplicatedOAuthProtocolParameterError
from .errors import InsecureTransportError
from .signature import SIGNATURE_TYPE_BODY
from .signature import SIGNATURE_TYPE_HEADER
from .signature import SIGNATURE_TYPE_QUERY
from .util import unescape
class OAuth1Request:
def __init__(self, method, uri, body=None, headers=None):
InsecureTransportError.check(uri)
self.method = method
self.uri = uri
self.body = body
self.headers = headers or {}
# states namespaces
self.client = None
self.credential = None
self.user = None
self.query = urlparse.urlparse(uri).query
self.query_params = url_decode(self.query)
self.body_params = extract_params(body) or []
self.auth_params, self.realm = _parse_authorization_header(headers)
self.signature_type, self.oauth_params = _parse_oauth_params(
self.query_params, self.body_params, self.auth_params
)
params = []
params.extend(self.query_params)
params.extend(self.body_params)
params.extend(self.auth_params)
self.params = params
@property
def client_id(self):
return self.oauth_params.get("oauth_consumer_key")
@property
def client_secret(self):
if self.client:
return self.client.get_client_secret()
@property
def rsa_public_key(self):
if self.client:
return self.client.get_rsa_public_key()
@property
def timestamp(self):
return self.oauth_params.get("oauth_timestamp")
@property
def redirect_uri(self):
return self.oauth_params.get("oauth_callback")
@property
def signature(self):
return self.oauth_params.get("oauth_signature")
@property
def signature_method(self):
return self.oauth_params.get("oauth_signature_method")
@property
def token(self):
return self.oauth_params.get("oauth_token")
@property
def token_secret(self):
if self.credential:
return self.credential.get_oauth_token_secret()
def _filter_oauth(params):
for k, v in params:
if k.startswith("oauth_"):
yield (k, v)
def _parse_authorization_header(headers):
"""Parse an OAuth authorization header into a list of 2-tuples."""
authorization_header = headers.get("Authorization")
if not authorization_header:
return [], None
auth_scheme = "oauth "
if authorization_header.lower().startswith(auth_scheme):
items = parse_http_list(authorization_header[len(auth_scheme) :])
try:
items = parse_keqv_list(items).items()
auth_params = [(unescape(k), unescape(v)) for k, v in items]
realm = dict(auth_params).get("realm")
return auth_params, realm
except (IndexError, ValueError):
pass
raise ValueError("Malformed authorization header")
def _parse_oauth_params(query_params, body_params, auth_params):
oauth_params_set = [
(SIGNATURE_TYPE_QUERY, list(_filter_oauth(query_params))),
(SIGNATURE_TYPE_BODY, list(_filter_oauth(body_params))),
(SIGNATURE_TYPE_HEADER, list(_filter_oauth(auth_params))),
]
oauth_params_set = [params for params in oauth_params_set if params[1]]
if len(oauth_params_set) > 1:
found_types = [p[0] for p in oauth_params_set]
raise DuplicatedOAuthProtocolParameterError(
'"oauth_" params must come from only 1 signature type '
"but were found in {}".format(",".join(found_types))
)
if oauth_params_set:
signature_type = oauth_params_set[0][0]
oauth_params = dict(oauth_params_set[0][1])
else:
signature_type = None
oauth_params = {}
return signature_type, oauth_params
|