1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219
|
#!/usr/bin/env python
# encoding: utf-8
##############################################################################
##############################################################################
##############################################################################
###
### oauth/__init__.py
###
### This is for OAUTH authentication.
###
##############################################################################
##############################################################################
##############################################################################
import re
import urllib
import urlparse
import hmac
import binascii
import random
import time
# Presumably the name of the HTTP header meant to carry the OAuth credentials;
# nothing in this module references it -- TODO confirm against callers.
AUTH_HEADER_OAUTH = 'Authorization'
# Protocol version sent as the `oauth_version` parameter by get_oauth_header().
OAUTH_VERSION = '1.0'
def escape(s):
    """Percent-encode `s`, leaving only letters, digits and "-._~" intact.

    Unlike the default behaviour of `urllib.quote`, "/" is NOT treated as
    safe, so path separators are encoded as well.
    """
    unreserved = '-._~'
    return urllib.quote(s, safe=unreserved)
def generate_timestamp():
    """Return the current time as whole seconds since the epoch."""
    now = time.time()
    # Truncate the fractional part; only whole seconds are reported.
    return int(now)
def generate_nonce(length=8):
    """Return a string of `length` pseudorandom decimal digits.

    Digits are drawn one at a time from the module-level `random` generator.
    """
    digits = []
    for _ in range(length):
        digits.append(str(random.randint(0, 9)))
    return ''.join(digits)
def generate_verifier(length=8):
    """Return a string of `length` hard-to-predict decimal digits.

    The digits come from `random.SystemRandom`, which reads the operating
    system's entropy source.  The module-level `random` functions use a
    seedable Mersenne Twister whose output can be reproduced, which makes
    them unsuitable for a value that has to be unguessable.

    :param length: number of digits to generate (default 8).
    :returns: a `str` made only of the characters 0-9; empty if `length`
        is 0.
    """
    secure_rng = random.SystemRandom()
    return ''.join([str(secure_rng.randint(0, 9)) for _ in range(length)])
def normalise_url(url):
    """Return `url` reduced to the form used in the signature base string.

    The result keeps only scheme, network location and path: a default port
    (":80" for http, ":443" for https) is stripped from the network location
    and the params, query and fragment components are dropped.

    :param url: absolute http or https URL.
    :returns: the normalised URL string.
    :raises ValueError: if the URL scheme is neither http nor https.
    """
    scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
    if scheme not in ('http', 'https'):
        # The original message referenced an undefined name (`value`), which
        # turned this error path into a NameError.
        raise ValueError("Unsupported URL %s (%s)." % (url, scheme))
    # Exclude default port numbers.
    if scheme == 'http' and netloc.endswith(':80'):
        netloc = netloc[:-3]
    elif scheme == 'https' and netloc.endswith(':443'):
        netloc = netloc[:-4]
    # Normalized URL excludes params, query, and fragment.
    return urlparse.urlunparse((scheme, netloc, path, '', '', ''))
def split_url_string(param_str):
    """Turn a URL query string into a dict of decoded parameters.

    Only the first value of a repeated parameter is kept, and parameters
    with a blank value are dropped (`keep_blank_values=False`).

    :param param_str: query string such as "a=1&b=x%20y".
    :returns: dict mapping each parameter name to its first decoded value.
    """
    parsed = urlparse.parse_qs(param_str, keep_blank_values=False)
    # parse_qs has already percent-decoded names and values.  Decoding a
    # second time would corrupt values that legitimately contain an encoded
    # percent sign (e.g. "%2541" must stay "%41", not become "A").
    return dict((name, values[0]) for name, values in parsed.items())
def get_normalized_parameters(params, url):
    """Build the encoded, sorted parameter string that gets signed.

    Combines every entry of `params` except 'oauth_signature' with the
    query-string parameters of `url` whose names do not start with 'oauth_',
    sorts the resulting (key, value) pairs and URL-encodes them.
    """
    pairs = []
    for name, value in params.iteritems():
        if name != 'oauth_signature':
            if hasattr(value, '__iter__'):
                # 1.0a/9.1.1 requires sorting by key and then by value, so a
                # sequence value contributes one pair per member.
                for member in value:
                    pairs.append((name, member))
            else:
                pairs.append((name, value))

    # Query-string parameters of the URL are signed too, apart from any
    # oauth_* ones.
    query_string = urlparse.urlparse(url)[4]
    for name, value in split_url_string(query_string).items():
        if not name.startswith('oauth_'):
            pairs.append((name, value))

    pairs.sort()
    encoded = urllib.urlencode(pairs)
    # Oauth Core 1.0 spec draft 7, section 3.6
    # (http://tools.ietf.org/html/draft-hammer-oauth-07#section-3.6):
    # spaces must be encoded as "%20" rather than "+", and "~" stays literal.
    encoded = encoded.replace('+', '%20')
    return encoded.replace('%7E', '~')
class SignatureMethod(object):
    """Interface for a request-signing strategy.

    OAuth lets consumers and service providers choose how requests are
    signed.  This base class names the methods the rest of the module calls;
    subclass it and override `signing_base` and `sign` to add a new
    signature method.
    """

    def signing_base(self, params, url, method, consumer, token):
        """Compute what has to be signed.

        Implementations return a 2-tuple: the key the signing starts from,
        and the message to sign.  The message may be quoted in error
        messages to help clients debug their software.
        """
        raise NotImplementedError

    def sign(self, params, url, method, consumer, token):
        """Return the signature of the request for this consumer and token.

        Implementations should build the message with their own
        `signing_base()`; otherwise it may be less useful for debugging.
        """
        raise NotImplementedError

    def check(self, params, url, method, consumer, token, signature):
        """Tell whether `signature` matches the signature this method
        computes for the given request, consumer and token."""
        expected = self.sign(params, url, method, consumer, token)
        return expected == signature
class SignatureMethod_HMAC_SHA1(SignatureMethod):
    """Signs requests with an HMAC-SHA1 digest, base64 encoded."""

    name = 'HMAC-SHA1'

    def signing_base(self, params, url, method, consumer, token):
        """Return `(key, raw)` for the request.

        `raw` is the escaped HTTP method, escaped normalised URL and escaped
        normalised parameters joined with "&".  `key` is the escaped consumer
        secret followed by "&" and, when a token is given, the escaped token
        secret.
        """
        base_url = normalise_url(url)
        if base_url is None:
            raise ValueError("Base URL for request is not set.")

        pieces = []
        pieces.append(escape(method))
        pieces.append(escape(base_url))
        pieces.append(escape(get_normalized_parameters(params, url)))

        key = escape(consumer.secret) + '&'
        if token:
            key = key + escape(token.secret)

        return key, '&'.join(pieces)

    def sign(self, params, url, method, consumer, token):
        """Return the base64-encoded HMAC-SHA1 signature of the request."""
        key, raw = self.signing_base(params, url, method, consumer, token)

        # Prefer hashlib; fall back to the deprecated `sha` module when
        # hashlib cannot be imported.
        try:
            from hashlib import sha1 as digestmod
        except ImportError:
            import sha as digestmod

        mac = hmac.new(key, raw, digestmod)
        encoded = binascii.b2a_base64(mac.digest())
        # b2a_base64 appends a newline; drop it.
        return encoded[:-1]
class SignatureMethod_PLAINTEXT(SignatureMethod):
    """Uses the escaped secrets themselves as the signature."""

    name = 'PLAINTEXT'

    def signing_base(self, params, url, method, consumer, token):
        """Return the escaped consumer secret, "&" and (if a token is given)
        the escaped token secret, as both the key and the message."""
        consumer_part = escape(consumer.secret)
        token_part = ''
        if token:
            token_part = escape(token.secret)
        sig = '&'.join([consumer_part, token_part])
        return sig, sig

    def sign(self, params, url, method, consumer, token):
        """Return the plaintext signature built by `signing_base`."""
        _, signature = self.signing_base(params, url, method, consumer, token)
        return signature
class KeySecret(object):
    """Plain holder for a key and its matching secret.

    Other code in this module reads `.key` and `.secret` from the consumer
    and token objects it is given.
    """

    def __init__(self, key, secret):
        self.key, self.secret = key, secret
def generate_header(params, realm):
    """Serialise the oauth_* entries of `params` as an OAuth header value.

    The result starts with `OAuth realm="<realm>"` and is followed by one
    `name="escaped value"` pair per parameter whose name starts with
    'oauth_', separated by ", ".  Other parameters are ignored.
    """
    fields = []
    for name, value in params.items():
        if name.startswith('oauth_'):
            fields.append('%s="%s"' % (name, escape(str(value))))
    params_header = ', '.join(fields)

    auth_header = 'OAuth realm="%s"' % realm
    if not params_header:
        return auth_header
    return "%s, %s" % (auth_header, params_header)
def get_oauth_header(req, consumer, token, realm=''):
    """Build the signed OAuth header value for `req`.

    :param req: request object providing `get_full_url()` and
        `get_method()`.
    :param consumer: object with `.key` and `.secret`; required.
    :param token: object with `.key` and `.secret`, or a false value when
        there is no token.
    :param realm: realm string placed in the header (default '').
    :returns: the header string produced by `generate_header`.
    :raises Exception: if `consumer` is false.
    """
    params = {
        # The version of oauth
        'oauth_version': OAUTH_VERSION,
        # Suitable nonce value
        'oauth_nonce': generate_nonce(),
        # Timestamp to stop replay attacks
        'oauth_timestamp': generate_timestamp(),
    }

    if not consumer:
        raise Exception('No consumer key given.')
    params['oauth_consumer_key'] = consumer.key

    if token:
        params['oauth_token'] = token.key

    signer = SignatureMethod_HMAC_SHA1()
    params['oauth_signature_method'] = signer.name

    # Sign last: the signature covers every parameter set above and is keyed
    # with the consumer and token secrets.
    params['oauth_signature'] = signer.sign(
        params, req.get_full_url(), req.get_method(), consumer, token)

    return generate_header(params, realm)
|