1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
|
import hmac
import sys
from twilio.jwt import compat
# hmac.compare_digest only exists in the standard library from Python 3.3 on,
# but PyJWT calls it unconditionally; on 3.2 borrow the compat implementation.
if sys.version_info[:2] == (3, 2):
    hmac.compare_digest = compat.compare_digest
import jwt as jwt_lib
try:
import json
except ImportError:
import simplejson as json
import time
__all__ = ['Jwt', 'JwtDecodeError']
class JwtDecodeError(Exception):
    """Raised when a JWT string cannot be decoded into a Jwt object."""
class Jwt(object):
    """Base class for building a Json Web Token.

    Subclasses provide the token-specific claims through ``_generate_payload``
    (and, optionally, extra headers through ``_generate_headers``). This class
    adds the ``iss``, ``exp``, ``nbf`` and ``sub`` claims plus the ``typ`` and
    ``alg`` headers, and encodes/decodes the token through PyJWT.
    """

    # Sentinel default for ``nbf``: stamp the current time whenever the payload
    # is built. ``None`` is a distinct, valid value meaning "omit nbf".
    GENERATE = object()

    def __init__(self, secret_key, issuer, subject=None, algorithm='HS256', nbf=GENERATE,
                 ttl=3600, valid_until=None):
        """
        :param str secret_key: The secret used to encode the JWT
        :param str issuer: The issuer of this JWT
        :param str subject: The subject of this JWT, omitted from payload by default
        :param str algorithm: The algorithm used to encode the JWT, defaults to 'HS256'
        :param int nbf: Time in secs since epoch before which this JWT is invalid.
                        Defaults to now; pass None to leave the claim out.
        :param int ttl: Time to live of the JWT in seconds, defaults to 1 hour
        :param int valid_until: Time in secs since epoch this JWT is valid for.
                                Overrides ttl if provided.
        """
        self.secret_key = secret_key
        self.issuer = issuer
        self.subject = subject
        self.algorithm = algorithm
        self.nbf = nbf
        self.ttl = ttl
        self.valid_until = valid_until

        # Set only by ``_from_jwt``; when present the ``payload``/``headers``
        # properties return these instead of generating fresh values.
        self.__decoded_payload = None
        self.__decoded_headers = None

    def _generate_payload(self):
        """:rtype: dict the payload of the JWT to send"""
        raise NotImplementedError('Subclass must provide a payload.')

    def _generate_headers(self):
        """:rtype dict: Additional headers to include in the JWT, defaults to an empty dict"""
        return {}

    @classmethod
    def _from_jwt(cls, headers, payload, key=None):
        """
        Class specific implementation of from_jwt which should take jwt components and return
        an instance of this Class with jwt information loaded.

        :param dict headers: decoded JWT headers
        :param dict payload: decoded JWT payload
        :param key: key the JWT was verified with, stored as the secret key
        :return: Jwt object containing the headers, payload and key
        """
        instance = Jwt(
            secret_key=key,
            issuer=payload.get('iss', None),
            subject=payload.get('sub', None),
            algorithm=headers.get('alg', None),
            valid_until=payload.get('exp', None),
            nbf=payload.get('nbf', None),
        )
        instance.__decoded_payload = payload
        instance.__decoded_headers = headers
        return instance

    @property
    def payload(self):
        """
        :rtype: dict the decoded payload if this object came from ``_from_jwt``, otherwise
                a copy of the generated payload with iss/exp/nbf/sub claims added.
        """
        # Compare against None rather than truthiness so that a decoded token
        # with an empty payload is returned as-is instead of being regenerated.
        if self.__decoded_payload is not None:
            return self.__decoded_payload

        payload = self._generate_payload().copy()
        payload['iss'] = self.issuer
        payload['exp'] = int(time.time()) + self.ttl
        if self.nbf is not None:
            if self.nbf is self.GENERATE:
                payload['nbf'] = int(time.time())
            else:
                payload['nbf'] = self.nbf
        # An explicit expiry wins over the ttl-derived one set above.
        if self.valid_until:
            payload['exp'] = self.valid_until
        if self.subject:
            payload['sub'] = self.subject

        return payload

    @property
    def headers(self):
        """
        :rtype: dict the decoded headers if this object came from ``_from_jwt``, otherwise
                a copy of the generated headers with ``typ`` and ``alg`` set.
        """
        if self.__decoded_headers is not None:
            return self.__decoded_headers

        headers = self._generate_headers().copy()
        headers['typ'] = 'JWT'
        headers['alg'] = self.algorithm
        return headers

    def to_jwt(self, algorithm=None, ttl=None):
        """
        Encode this JWT object into a JWT string

        :param str algorithm: override the algorithm used to encode the JWT
        :param int ttl: override the ttl configured in the constructor
        :raises ValueError: if no signing key is configured
        :rtype: str The JWT string
        """
        if not self.secret_key:
            raise ValueError('JWT does not have a signing key configured.')

        headers = self.headers.copy()
        if algorithm:
            headers['alg'] = algorithm
        algorithm = algorithm or self.algorithm

        payload = self.payload.copy()
        if ttl:
            payload['exp'] = int(time.time()) + ttl

        return jwt_lib.encode(payload, self.secret_key, algorithm=algorithm, headers=headers)

    @classmethod
    def from_jwt(cls, jwt, key=''):
        """
        Decode a JWT string into a Jwt object

        :param str jwt: JWT string (text or bytes)
        :param Optional[str] key: key used to verify JWT signature, if not provided then
                                  validation is skipped.
        :raises JwtDecodeError if decoding JWT fails for any reason.
        :return: A DecodedJwt object containing the jwt information.
        """
        verify = bool(key)

        try:
            # bytes(jwt) raises TypeError for a text string on Python 3, so
            # only encode when the token is not already bytes.
            token = jwt if isinstance(jwt, bytes) else jwt.encode('utf-8')
            # NOTE(review): PyJWT 2.x requires an explicit ``algorithms`` list
            # when verifying signatures - confirm against the pinned version.
            payload = jwt_lib.decode(token, key, options={
                'verify_signature': verify,
                'verify_exp': True,
                'verify_nbf': True,
            })
            headers = jwt_lib.get_unverified_header(token)
        except Exception as e:
            raise JwtDecodeError(getattr(e, 'message', str(e)))

        return cls._from_jwt(headers, payload, key)

    def __str__(self):
        return '<JWT {}>'.format(self.to_jwt())
|