1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
|
from twilio.jwt import Jwt
class TaskRouterCapabilityToken(Jwt):
VERSION = 'v1'
DOMAIN = 'https://taskrouter.twilio.com'
EVENTS_BASE_URL = 'https://event-bridge.twilio.com/v1/wschannels'
def __init__(self, account_sid, auth_token, workspace_sid, channel_id, **kwargs):
"""
:param str account_sid: Twilio account sid
:param str auth_token: Twilio auth token used to sign the JWT
:param str workspace_sid: TaskRouter workspace sid
:param str channel_id: TaskRouter channel sid
:param kwargs:
:param bool allow_web_sockets: shortcut to calling allow_web_sockets, defaults to True
:param bool allow_fetch_self: shortcut to calling allow_fetch_self, defaults to True
:param bool allow_update_self: shortcut to calling allow_update_self, defaults to False
:param bool allow_delete_self: shortcut to calling allow_delete_self, defaults to False
:param bool allow_fetch_subresources: shortcut to calling allow_fetch_subresources,
defaults to False
:param bool allow_update_subresources: shortcut to calling allow_update_subresources,
defaults to False
:param bool allow_delete_subresources: shortcut to calling allow_delete_subresources,
defaults to False
:returns a new TaskRouterCapabilityToken with capabilities set depending on kwargs.
"""
super(TaskRouterCapabilityToken, self).__init__(
secret_key=auth_token,
issuer=account_sid,
algorithm='HS256',
nbf=kwargs.get('nbf', Jwt.GENERATE),
ttl=kwargs.get('ttl', 3600),
valid_until=kwargs.get('valid_until', None),
)
self._validate_inputs(account_sid, workspace_sid, channel_id)
self.account_sid = account_sid
self.auth_token = auth_token
self.workspace_sid = workspace_sid
self.channel_id = channel_id
self.policies = []
if kwargs.get('allow_web_sockets', True):
self.allow_web_sockets()
if kwargs.get('allow_fetch_self', True):
self.allow_fetch_self()
if kwargs.get('allow_update_self', False):
self.allow_update_self()
if kwargs.get('allow_delete_self', False):
self.allow_delete_self()
if kwargs.get('allow_fetch_subresources', False):
self.allow_fetch_subresources()
if kwargs.get('allow_delete_subresources', False):
self.allow_delete_subresources()
if kwargs.get('allow_update_subresources', False):
self.allow_update_subresources()
@property
def workspace_url(self):
return '{}/{}/Workspaces/{}'.format(self.DOMAIN, self.VERSION, self.workspace_sid)
@property
def resource_url(self):
raise NotImplementedError('Subclass must set its specific resource_url.')
@property
def channel_prefix(self):
raise NotImplementedError('Subclass must set its specific channel_id sid prefix.')
def allow_fetch_self(self):
self._make_policy(self.resource_url, 'GET', True)
def allow_update_self(self):
self._make_policy(self.resource_url, 'POST', True)
def allow_delete_self(self):
self._make_policy(self.resource_url, 'DELETE', True)
def allow_fetch_subresources(self):
self._make_policy(self.resource_url + '/**', 'GET', True)
def allow_update_subresources(self):
self._make_policy(self.resource_url + '/**', 'POST', True)
def allow_delete_subresources(self):
self._make_policy(self.resource_url + '/**', 'DELETE', True)
def allow_web_sockets(self, channel_id=None):
channel_id = channel_id or self.channel_id
web_socket_url = '{}/{}/{}'.format(self.EVENTS_BASE_URL, self.account_sid, channel_id)
self._make_policy(web_socket_url, 'GET', True)
self._make_policy(web_socket_url, 'POST', True)
def _generate_payload(self):
payload = {
'account_sid': self.account_sid,
'workspace_sid': self.workspace_sid,
'channel': self.channel_id,
'version': self.VERSION,
'friendly_name': self.channel_id,
'policies': self.policies,
}
if self.channel_id.startswith('WK'):
payload['worker_sid'] = self.channel_id
elif self.channel_id.startswith('WQ'):
payload['taskqueue_sid'] = self.channel_id
return payload
def _make_policy(self, url, method, allowed, query_filter=None, post_filter=None):
self.policies.append({
'url': url,
'method': method.upper(),
'allow': allowed,
'query_filter': query_filter or {},
'post_filter': post_filter or {},
})
def _validate_inputs(self, account_sid, workspace_sid, channel_id):
if not account_sid or not account_sid.startswith('AC'):
raise ValueError('Invalid account sid provided {}'.format(account_sid))
if not workspace_sid or not workspace_sid.startswith('WS'):
raise ValueError('Invalid workspace sid provided {}'.format(workspace_sid))
if not channel_id or not channel_id.startswith(self.channel_prefix):
raise ValueError('Invalid channel id provided {}'.format(channel_id))
def __str__(self):
return '<TaskRouterCapabilityToken {}>'.format(self.to_jwt())
|