1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
|
from requests import Session
from requests.auth import AuthBase
from authlib.common.encoding import to_native
from authlib.oauth1 import SIGNATURE_HMAC_SHA1
from authlib.oauth1 import SIGNATURE_TYPE_HEADER
from authlib.oauth1 import ClientAuth
from authlib.oauth1.client import OAuth1Client
from ..base_client import OAuthError
from .utils import update_session_configure
class OAuth1Auth(AuthBase, ClientAuth):
    """Auth callable that applies an OAuth 1 (RFC5849) signature to a request."""

    def __call__(self, req):
        """Sign ``req`` in place and return it.

        The request's method, URL, headers and body are passed to
        ``self.prepare`` (not defined in this class; presumably inherited
        from ``ClientAuth``), and the URL, headers and body it returns are
        written back onto the same request object.

        :param req: the outgoing request; it must expose ``method``, ``url``,
            ``headers``, ``body`` and ``prepare_headers``.
        :return: the same ``req`` object, updated with the signed values.
        """
        signed_url, signed_headers, signed_body = self.prepare(
            req.method,
            req.url,
            req.headers,
            req.body,
        )

        req.url = to_native(signed_url)
        req.prepare_headers(signed_headers)

        # A falsy body (None, empty) leaves the request's body untouched.
        if not signed_body:
            return req

        req.body = signed_body
        return req
class OAuth1Session(OAuth1Client, Session):
    """A ``requests`` ``Session`` that is also an ``OAuth1Client``.

    Requests are signed with :class:`OAuth1Auth`, which is exposed to the
    client base class through ``auth_class``.
    """

    auth_class = OAuth1Auth

    def __init__(
        self,
        client_id,
        client_secret=None,
        token=None,
        token_secret=None,
        redirect_uri=None,
        rsa_key=None,
        verifier=None,
        signature_method=SIGNATURE_HMAC_SHA1,
        signature_type=SIGNATURE_TYPE_HEADER,
        force_include_body=False,
        **kwargs,
    ):
        """Initialise the session half first, then the OAuth 1 client half.

        Every named parameter is forwarded unchanged to
        ``OAuth1Client.__init__``, with this object passed as its ``session``.

        :param kwargs: extra options. They are handed to
            ``update_session_configure`` first and then forwarded to
            ``OAuth1Client.__init__``.
        """
        Session.__init__(self)

        # NOTE(review): ``update_session_configure`` lives in ``.utils`` and is
        # not visible here; presumably it applies session-level settings found
        # in ``kwargs`` -- confirm. It must run before ``kwargs`` is forwarded.
        update_session_configure(self, kwargs)

        client_options = {
            "session": self,
            "client_id": client_id,
            "client_secret": client_secret,
            "token": token,
            "token_secret": token_secret,
            "redirect_uri": redirect_uri,
            "rsa_key": rsa_key,
            "verifier": verifier,
            "signature_method": signature_method,
            "signature_type": signature_type,
            "force_include_body": force_include_body,
        }
        OAuth1Client.__init__(self, **client_options, **kwargs)

    def rebuild_auth(self, prepared_request, response):
        """Replace the ``Authorization`` header on a redirected request.

        The old header is always removed on redirect because, as per the
        OAuth spec, a nonce may not be reused; the request is then prepared
        again with ``self.auth``.

        :param prepared_request: the request about to be re-sent.
        :param response: the redirect response (not used here).
        """
        request_headers = prepared_request.headers
        if "Authorization" not in request_headers:
            return

        # Discard the stale header, then have the configured auth attach a
        # fresh one to the request.
        request_headers.pop("Authorization", True)
        prepared_request.prepare_auth(self.auth)

    @staticmethod
    def handle_error(error_type, error_description):
        """Raise :class:`OAuthError` built from the given type and description.

        :raises OAuthError: always.
        """
        raise OAuthError(error_type, error_description)
|