1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
|
from base64 import urlsafe_b64encode, b64decode
from collections import namedtuple
import logging
import re
import six
from .base import Resource
from .util import (calculate_mac,
strings_match,
utc_now,
validate_header_attr)
from .exc import (CredentialsLookupError,
InvalidBewit,
MacMismatch,
TokenExpired)
log = logging.getLogger(__name__)
def get_bewit(resource):
    """
    Build the encoded bewit string for a resource.

    The bewit is the URL-safe base64 encoding of the backslash separated
    fields ``id``, ``exp``, ``mac`` and ``ext``, returned as text.

    :param resource:
        Resource to generate a bewit for
    :type resource: `mohawk.base.Resource`
    :raises ValueError:
        If the resource method is not ``GET`` or its nonce is not empty.
    """
    # Guard clauses: reject anything that is not a nonce-less GET.
    if resource.method != 'GET':
        raise ValueError('bewits can only be generated for GET requests')
    if resource.nonce != '':
        raise ValueError('bewits must use an empty nonce')

    signature = calculate_mac('bewit', resource, None)
    if isinstance(signature, six.binary_type):
        signature = signature.decode('ascii')

    # A missing ext is serialised as an empty field; anything else has to
    # pass header attribute validation first.
    ext_field = resource.ext
    if ext_field is None:
        ext_field = ''
    else:
        validate_header_attr(ext_field, name='ext')

    # base64 needs bytes on python 3 while every field here is text, so the
    # payload is assembled as a unicode string, encoded to ASCII bytes for
    # base64, and the encoded result is turned back into text for the caller.
    payload = u"{id}\\{exp}\\{mac}\\{ext}".format(
        id=resource.credentials['id'],
        exp=resource.timestamp,
        mac=signature,
        ext=ext_field,
    )
    return urlsafe_b64encode(payload.encode('ascii')).decode('ascii')
bewittuple = namedtuple('bewittuple', 'id expiration mac ext')


def parse_bewit(bewit):
    """
    Returns a `bewittuple` representing the parts of an encoded bewit string.
    This has the following named attributes:
        (id, expiration, mac, ext)

    The bewit is decoded with the URL-safe base64 alphabet (``-`` and ``_``),
    which is what `get_bewit` produces. Input using the standard ``+`` and
    ``/`` characters is still accepted, and missing trailing ``=`` padding
    is tolerated. All four parts are returned as strings.

    :param bewit:
        A base64 encoded bewit string
    :type bewit: str
    :raises InvalidBewit:
        If the bewit cannot be decoded as base64 encoded ASCII, or if the
        decoded value does not consist of exactly 4 backslash separated
        parts.
    """
    # Imported locally because the module header only brings in b64decode,
    # which uses the standard alphabet and silently discards '-' and '_'.
    from base64 import urlsafe_b64decode

    try:
        if not isinstance(bewit, bytes):
            bewit = bewit.encode('ascii')
        # Restore any '=' padding that was stripped from the encoded value.
        padded = bewit + b'=' * (-len(bewit) % 4)
        decoded_bewit = urlsafe_b64decode(padded).decode('ascii')
    except (TypeError, ValueError):
        # TypeError: bad base64 on python 2. ValueError covers binascii.Error
        # on python 3 and the unicode encode/decode errors.
        raise InvalidBewit('bewit is not valid base64 encoded ASCII')

    bewit_parts = decoded_bewit.split("\\")
    if len(bewit_parts) != 4:
        raise InvalidBewit('Expected 4 parts to bewit: %s' % decoded_bewit)
    return bewittuple(*bewit_parts)
def strip_bewit(url):
    """
    Strips the bewit parameter out of a url.

    Returns (encoded_bewit, stripped_url)

    Raises InvalidBewit if no bewit found.

    The first ``bewit=`` query parameter found is removed. Any parameters
    that follow it are kept, together with the separator (``?`` or ``&``)
    that originally preceded the bewit, so the stripped url stays well
    formed wherever the bewit appeared in the query string.

    :param url:
        The url containing a bewit parameter
    :type url: str
    """
    m = re.search('[?&]bewit=([^&]+)', url)
    if not m:
        raise InvalidBewit('no bewit data found')
    bewit = m.group(1)

    head = url[:m.start()]
    tail = url[m.end():]
    if tail:
        # The value pattern stops only at '&' or end of string, so a
        # non-empty tail starts with '&'. Swap that for the separator the
        # match consumed: '/p?bewit=x&a=1' must become '/p?a=1', not '/p&a=1'.
        tail = url[m.start()] + tail[1:]
    return bewit, head + tail
def check_bewit(url, credential_lookup, now=None):
    """
    Validates the given bewit.

    Returns True if the resource has a valid bewit parameter attached,
    or raises a subclass of HawkFail otherwise.

    :param url:
        The url containing the bewit parameter to validate.
    :type url: str

    :param credential_lookup:
        Callable to look up the credentials dict by sender ID.
        The credentials dict must have the keys:
        ``id``, ``key``, and ``algorithm``.
        See :ref:`receiving-request` for an example.
    :type credential_lookup: callable

    :param now=None:
        Unix epoch time for the current time to determine if bewit has expired.
        If None, then the current time as given by utc_now() is used.
    :type now=None: integer

    :raises InvalidBewit:
        If the url carries no bewit or the bewit cannot be parsed.
    :raises CredentialsLookupError:
        If ``credential_lookup`` raises a LookupError for the bewit's ID.
    :raises MacMismatch:
        If the MAC computed for the stripped url differs from the bewit's.
    :raises TokenExpired:
        If the bewit expiration is earlier than ``now``.
    """
    raw_bewit, stripped_url = strip_bewit(url)
    bewit = parse_bewit(raw_bewit)
    try:
        credentials = credential_lookup(bewit.id)
    except LookupError:
        raise CredentialsLookupError('Could not find credentials for ID {0}'
                                     .format(bewit.id))

    # Rebuild the resource with the url minus its bewit parameter and
    # recompute the MAC over it.
    res = Resource(url=stripped_url,
                   method='GET',
                   credentials=credentials,
                   timestamp=bewit.expiration,
                   nonce='',
                   ext=bewit.ext,
                   )
    mac = calculate_mac('bewit', res, None)
    # Same guard as get_bewit: only decode when the MAC came back as bytes.
    if isinstance(mac, six.binary_type):
        mac = mac.decode('ascii')

    if not strings_match(mac, bewit.mac):
        # NOTE(review): this message contains the expected MAC; callers
        # should not echo it back to the requesting client.
        raise MacMismatch('bewit with mac {bewit_mac} did not match expected mac {expected_mac}'
                          .format(bewit_mac=bewit.mac,
                                  expected_mac=mac))

    # Check that the timestamp isn't expired
    if now is None:
        # TODO: Add offset/skew
        now = utc_now()
    if int(bewit.expiration) < now:
        # TODO: Refactor TokenExpired to handle this better
        raise TokenExpired('bewit with UTC timestamp {ts} has expired; '
                           'it was compared to {now}'
                           .format(ts=bewit.expiration, now=now),
                           localtime_in_seconds=now,
                           www_authenticate=''
                           )

    return True
|