File: capabilities.py

package info (click to toggle)
python-twilio 6.51.0%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 12,260 kB
  • sloc: python: 128,982; makefile: 51
file content (112 lines) | stat: -rw-r--r-- 4,281 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from twilio.jwt.taskrouter import TaskRouterCapabilityToken


class WorkerCapabilityToken(TaskRouterCapabilityToken):
    def __init__(self, account_sid, auth_token, workspace_sid, worker_sid, ttl=3600, **kwargs):
        """
        :param kwargs:
            All kwarg parameters supported by TaskRouterCapabilityToken
            :param bool allow_fetch_activities: shortcut to calling allow_fetch_activities,
                                                defaults to True
            :param bool allow_fetch_reservations: shortcut to calling allow_fetch_reservations,
                                                  defaults to True
            :param bool allow_fetch_worker_reservations: shortcut to calling allow_fetch_worker_reservations,
                                                         defaults to True
            :param bool allow_update_activities: shortcut to calling allow_update_activities,
                                                 defaults to False
            :param bool allow_update_reservations: shortcut to calling allow_update_reservations,
                                                   defaults to False
        """
        super(WorkerCapabilityToken, self).__init__(
            account_sid=account_sid,
            auth_token=auth_token,
            workspace_sid=workspace_sid,
            channel_id=worker_sid,
            ttl=ttl,
            **kwargs
        )

        if kwargs.get('allow_fetch_activities', True):
            self.allow_fetch_activities()
        if kwargs.get('allow_fetch_reservations', True):
            self.allow_fetch_reservations()
        if kwargs.get('allow_fetch_worker_reservations', True):
            self.allow_fetch_worker_reservations()
        if kwargs.get('allow_update_activities', False):
            self.allow_update_activities()
        if kwargs.get('allow_update_reservations', False):
            self.allow_update_reservations()

    @property
    def resource_url(self):
        return '{}/Workers/{}'.format(self.workspace_url, self.channel_id)

    @property
    def channel_prefix(self):
        return 'WK'

    def allow_fetch_activities(self):
        self._make_policy(self.workspace_url + '/Activities', 'GET', True)

    def allow_fetch_reservations(self):
        self._make_policy(self.workspace_url + '/Tasks/**', 'GET', True)

    def allow_fetch_worker_reservations(self):
        self._make_policy(self.resource_url + '/Reservations/**', 'GET', True)

    def allow_update_activities(self):
        post_filter = {'ActivitySid': {'required': True}}
        self._make_policy(self.resource_url, 'POST', True, post_filter=post_filter)

    def allow_update_reservations(self):
        self._make_policy(self.workspace_url + '/Tasks/**', 'POST', True)
        self._make_policy(self.resource_url + '/Reservations/**', 'POST', True)

    def __str__(self):
        return '<WorkerCapabilityToken {}>'.format(self.to_jwt())


class TaskQueueCapabilityToken(TaskRouterCapabilityToken):
    def __init__(self, account_sid, auth_token, workspace_sid, task_queue_sid, ttl=3600, **kwargs):
        super(TaskQueueCapabilityToken, self).__init__(
            account_sid=account_sid,
            auth_token=auth_token,
            workspace_sid=workspace_sid,
            channel_id=task_queue_sid,
            ttl=ttl,
            **kwargs
        )

    @property
    def resource_url(self):
        return '{}/TaskQueues/{}'.format(self.workspace_url, self.channel_id)

    @property
    def channel_prefix(self):
        return 'WQ'

    def __str__(self):
        return '<TaskQueueCapabilityToken {}>'.format(self.to_jwt())


class WorkspaceCapabilityToken(TaskRouterCapabilityToken):
    def __init__(self, account_sid, auth_token, workspace_sid, ttl=3600, **kwargs):
        super(WorkspaceCapabilityToken, self).__init__(
            account_sid=account_sid,
            auth_token=auth_token,
            workspace_sid=workspace_sid,
            channel_id=workspace_sid,
            ttl=ttl,
            **kwargs
        )

    @property
    def resource_url(self):
        return self.workspace_url

    @property
    def channel_prefix(self):
        return 'WS'

    def __str__(self):
        return '<WorkspaceCapabilityToken {}>'.format(self.to_jwt())