1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
|
from twilio.jwt import Jwt
from six import iteritems
from twilio.compat import urlencode
class ClientCapabilityToken(Jwt):
"""A token to control permissions with Twilio Client"""
def __init__(self, account_sid, auth_token, nbf=Jwt.GENERATE, ttl=3600, valid_until=None,
**kwargs):
"""
:param str account_sid: The account sid to which this token is granted access.
:param str auth_token: The secret key used to sign the token. Note, this auth token is not
visible to the user of the token.
:param int nbf: Time in secs from epic before which this token is considered invalid.
:param int ttl: the amount of time in seconds from generation that this token is valid for.
:param kwargs:
:returns: A new CapabilityToken with zero permissions
"""
super(ClientCapabilityToken, self).__init__(
algorithm='HS256',
secret_key=auth_token,
issuer=account_sid,
nbf=nbf,
ttl=ttl,
valid_until=None,
)
self.account_sid = account_sid
self.auth_token = auth_token
self.client_name = None
self.capabilities = {}
if 'allow_client_outgoing' in kwargs:
self.allow_client_outgoing(**kwargs['allow_client_outgoing'])
if 'allow_client_incoming' in kwargs:
self.allow_client_incoming(**kwargs['allow_client_incoming'])
if 'allow_event_stream' in kwargs:
self.allow_event_stream(**kwargs['allow_event_stream'])
def allow_client_outgoing(self, application_sid, **kwargs):
"""
Allow the user of this token to make outgoing connections. Keyword arguments are passed
to the application.
:param str application_sid: Application to contact
"""
scope = ScopeURI('client', 'outgoing', {'appSid': application_sid})
if kwargs:
scope.add_param('appParams', urlencode(kwargs, doseq=True))
self.capabilities['outgoing'] = scope
def allow_client_incoming(self, client_name):
"""
Allow the user of this token to accept incoming connections.
:param str client_name: Client name to accept calls from
"""
self.client_name = client_name
self.capabilities['incoming'] = ScopeURI('client', 'incoming', {'clientName': client_name})
def allow_event_stream(self, **kwargs):
"""
Allow the user of this token to access their event stream.
"""
scope = ScopeURI('stream', 'subscribe', {'path': '/2010-04-01/Events'})
if kwargs:
scope.add_param('params', urlencode(kwargs, doseq=True))
self.capabilities["events"] = scope
def _generate_payload(self):
if 'outgoing' in self.capabilities and self.client_name is not None:
self.capabilities['outgoing'].add_param('clientName', self.client_name)
scope_uris = [scope_uri.to_payload() for scope_uri in self.capabilities.values()]
return {'scope': ' '.join(scope_uris)}
class ScopeURI(object):
"""A single capability granted to Twilio Client and scoped to a service"""
def __init__(self, service, privilege, params=None):
self.service = service
self.privilege = privilege
self.params = params or {}
def add_param(self, key, value):
self.params[key] = value
def to_payload(self):
if self.params:
sorted_params = sorted([(k, v) for k, v in iteritems(self.params)])
encoded_params = urlencode(sorted_params)
param_string = '?{}'.format(encoded_params)
else:
param_string = ''
return 'scope:{}:{}{}'.format(self.service, self.privilege, param_string)
def __str__(self):
return '<ScopeURI {}>'.format(self.to_payload())
|