File: capabilities.py

package info (click to toggle)
python-twilio 9.4.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 13,756 kB
  • sloc: python: 8,281; makefile: 65
file content (116 lines) | stat: -rw-r--r-- 4,309 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
from twilio.jwt.taskrouter import TaskRouterCapabilityToken


class WorkerCapabilityToken(TaskRouterCapabilityToken):
    def __init__(
        self, account_sid, auth_token, workspace_sid, worker_sid, ttl=3600, **kwargs
    ):
        """
        :param kwargs:
            All kwarg parameters supported by TaskRouterCapabilityToken
            :param bool allow_fetch_activities: shortcut to calling allow_fetch_activities,
                                                defaults to True
            :param bool allow_fetch_reservations: shortcut to calling allow_fetch_reservations,
                                                  defaults to True
            :param bool allow_fetch_worker_reservations: shortcut to calling allow_fetch_worker_reservations,
                                                         defaults to True
            :param bool allow_update_activities: shortcut to calling allow_update_activities,
                                                 defaults to False
            :param bool allow_update_reservations: shortcut to calling allow_update_reservations,
                                                   defaults to False
        """
        super(WorkerCapabilityToken, self).__init__(
            account_sid=account_sid,
            auth_token=auth_token,
            workspace_sid=workspace_sid,
            channel_id=worker_sid,
            ttl=ttl,
            **kwargs
        )

        if kwargs.get("allow_fetch_activities", True):
            self.allow_fetch_activities()
        if kwargs.get("allow_fetch_reservations", True):
            self.allow_fetch_reservations()
        if kwargs.get("allow_fetch_worker_reservations", True):
            self.allow_fetch_worker_reservations()
        if kwargs.get("allow_update_activities", False):
            self.allow_update_activities()
        if kwargs.get("allow_update_reservations", False):
            self.allow_update_reservations()

    @property
    def resource_url(self):
        return "{}/Workers/{}".format(self.workspace_url, self.channel_id)

    @property
    def channel_prefix(self):
        return "WK"

    def allow_fetch_activities(self):
        self._make_policy(self.workspace_url + "/Activities", "GET", True)

    def allow_fetch_reservations(self):
        self._make_policy(self.workspace_url + "/Tasks/**", "GET", True)

    def allow_fetch_worker_reservations(self):
        self._make_policy(self.resource_url + "/Reservations/**", "GET", True)

    def allow_update_activities(self):
        post_filter = {"ActivitySid": {"required": True}}
        self._make_policy(self.resource_url, "POST", True, post_filter=post_filter)

    def allow_update_reservations(self):
        self._make_policy(self.workspace_url + "/Tasks/**", "POST", True)
        self._make_policy(self.resource_url + "/Reservations/**", "POST", True)

    def __str__(self):
        return "<WorkerCapabilityToken {}>".format(self.to_jwt())


class TaskQueueCapabilityToken(TaskRouterCapabilityToken):
    def __init__(
        self, account_sid, auth_token, workspace_sid, task_queue_sid, ttl=3600, **kwargs
    ):
        super(TaskQueueCapabilityToken, self).__init__(
            account_sid=account_sid,
            auth_token=auth_token,
            workspace_sid=workspace_sid,
            channel_id=task_queue_sid,
            ttl=ttl,
            **kwargs
        )

    @property
    def resource_url(self):
        return "{}/TaskQueues/{}".format(self.workspace_url, self.channel_id)

    @property
    def channel_prefix(self):
        return "WQ"

    def __str__(self):
        return "<TaskQueueCapabilityToken {}>".format(self.to_jwt())


class WorkspaceCapabilityToken(TaskRouterCapabilityToken):
    def __init__(self, account_sid, auth_token, workspace_sid, ttl=3600, **kwargs):
        super(WorkspaceCapabilityToken, self).__init__(
            account_sid=account_sid,
            auth_token=auth_token,
            workspace_sid=workspace_sid,
            channel_id=workspace_sid,
            ttl=ttl,
            **kwargs
        )

    @property
    def resource_url(self):
        return self.workspace_url

    @property
    def channel_prefix(self):
        return "WS"

    def __str__(self):
        return "<WorkspaceCapabilityToken {}>".format(self.to_jwt())