File: authorizations_api.py

package info (click to toggle)
python-influxdb-client 1.40.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 7,216 kB
  • sloc: python: 60,236; sh: 64; makefile: 53
file content (134 lines) | stat: -rw-r--r-- 4,702 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
"""Authorization is about managing the security of your InfluxDB instance."""

from influxdb_client import Authorization, AuthorizationsService, User, Organization


class AuthorizationsApi(object):
    """Implementation for '/api/v2/authorizations' endpoint."""

    def __init__(self, influxdb_client):
        """Initialize defaults."""
        self._influxdb_client = influxdb_client
        self._authorizations_service = AuthorizationsService(influxdb_client.api_client)

    def create_authorization(self, org_id=None, permissions: list = None,
                             authorization: Authorization = None) -> Authorization:
        """
        Create an authorization.

        :type permissions: list of Permission
        :param org_id: organization id
        :param permissions: list of permissions
        :type authorization: authorization object

        """
        if authorization is not None:
            return self._authorizations_service.post_authorizations(authorization_post_request=authorization)

            # if org_id is not None and permissions is not None:
        authorization = Authorization(org_id=org_id, permissions=permissions)
        return self._authorizations_service.post_authorizations(authorization_post_request=authorization)

    def find_authorization_by_id(self, auth_id: str) -> Authorization:
        """
        Find authorization by id.

        :param auth_id: authorization id
        :return: Authorization
        """
        return self._authorizations_service.get_authorizations_id(auth_id=auth_id)

    def find_authorizations(self, **kwargs):
        """
        Get a list of all authorizations.

        :key str user_id: filter authorizations belonging to a user id
        :key str user: filter authorizations belonging to a user name
        :key str org_id: filter authorizations belonging to a org id
        :key str org: filter authorizations belonging to a org name
        :return: Authorizations
        """
        authorizations = self._authorizations_service.get_authorizations(**kwargs)

        return authorizations.authorizations

    def find_authorizations_by_user(self, user: User):
        """
        Find authorization by User.

        :return: Authorization list
        """
        return self.find_authorizations(user_id=user.id)

    def find_authorizations_by_user_id(self, user_id: str):
        """
        Find authorization by user id.

        :return: Authorization list
        """
        return self.find_authorizations(user_id=user_id)

    def find_authorizations_by_user_name(self, user_name: str):
        """
        Find authorization by user name.

        :return: Authorization list
        """
        return self.find_authorizations(user=user_name)

    def find_authorizations_by_org(self, org: Organization):
        """
        Find authorization by user name.

        :return: Authorization list
        """
        if isinstance(org, Organization):
            return self.find_authorizations(org_id=org.id)

    def find_authorizations_by_org_name(self, org_name: str):
        """
        Find authorization by org name.

        :return: Authorization list
        """
        return self.find_authorizations(org=org_name)

    def find_authorizations_by_org_id(self, org_id: str):
        """
        Find authorization by org id.

        :return: Authorization list
        """
        return self.find_authorizations(org_id=org_id)

    def update_authorization(self, auth):
        """
        Update authorization object.

        :param auth:
        :return:
        """
        return self._authorizations_service.patch_authorizations_id(auth_id=auth.id, authorization_update_request=auth)

    def clone_authorization(self, auth) -> Authorization:
        """Clone an authorization."""
        if isinstance(auth, Authorization):
            cloned = Authorization(org_id=auth.org_id, permissions=auth.permissions)
            # cloned.description = auth.description
            # cloned.status = auth.status
            return self.create_authorization(authorization=cloned)

        if isinstance(auth, str):
            authorization = self.find_authorization_by_id(auth)
            return self.clone_authorization(auth=authorization)

        raise ValueError("Invalid argument")

    def delete_authorization(self, auth):
        """Delete a authorization."""
        if isinstance(auth, Authorization):
            return self._authorizations_service.delete_authorizations_id(auth_id=auth.id)

        if isinstance(auth, str):
            return self._authorizations_service.delete_authorizations_id(auth_id=auth)
        raise ValueError("Invalid argument")