File: nova_api.py

package info (click to toggle)
cfengine3 3.24.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 37,552 kB
  • sloc: ansic: 163,161; sh: 10,296; python: 2,950; makefile: 1,744; lex: 784; yacc: 633; perl: 211; pascal: 157; xml: 21; sed: 13
file content (173 lines) | stat: -rwxr-xr-x 6,020 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
"""
NovaApi - module to interact with CFEngine Mission Portal Enterprise API

https://docs.cfengine.com/docs/3.18/enterprise-cfengine-guide-enterprise-api.html

Examples of usage:

```python
api = NovaApi() # defaults to CFE_ROBOT user and local hub certificate
print(api.fr_remote_hubs()) # should fail with message
api = NovaApi(api_user='admin', api_password='password')
response = api.fr_remote_hubs()
for hub in response['hubs']:
  print(hub)
  print(hub['ui_name'])
print(api.query("select * from hosts"))
print(api.query("select * from __hubs")["rows"])
print(api.status())
print(api.fr_hub_status())
print(api.get("user","admin"))
print(api.put('user',"yj",{"password":"quijibo"}))
print(api.put("role","yj",{}))
print(api.put_role_permissions("yj", ["query.post"]))
```
"""

import json
import os
import socket
import sys
import urllib3

# Root of the CFEngine installation; overridable through the WORKDIR
# environment variable.
_WORKDIR = os.getenv("WORKDIR", "/var/cfengine")
# Location of the secrets file that _get_robot_password() reads.
_DEFAULT_SECRETS_PATH = "%s/httpd/secrets.ini" % _WORKDIR


class NovaApi:
    """Small client for the CFEngine Mission Portal Enterprise REST API.

    Every request is sent over HTTPS to ``https://<hostname>/api/<path>``
    with HTTP basic authentication and a JSON content type. Each public
    method returns a ``dict`` that always carries the HTTP status code
    under the ``"status"`` key.
    """

    def __init__(
        self,
        hostname=None,
        api_user="CFE_ROBOT",
        api_password=None,
        cert_path=None,
        ca_cert_dir=None,
    ):
        """Create a client.

        Args:
            hostname: host to talk to; defaults to ``socket.getfqdn()``.
            api_user: user name for HTTP basic authentication.
            api_password: password for ``api_user``. When ``None`` the
                password is read from the secrets file via
                ``_get_robot_password()``.
            cert_path: certificate file handed to urllib3 as ``ca_certs``.
                Defaults to
                ``<WORKDIR>/httpd/ssl/certs/<local fqdn>.cert``.
            ca_cert_dir: directory handed to urllib3 as ``ca_cert_dir``.
                Defaults to the ``SSL_CERT_DIR`` environment variable
                (which may be unset, i.e. ``None``).

        Raises:
            Exception: if ``api_password`` is ``None`` and the password
                cannot be parsed from the secrets file.
        """
        self._hostname = hostname or str(socket.getfqdn())
        self._api_user = api_user
        if api_password is None:
            self._api_password = self._get_robot_password()
        else:
            self._api_password = api_password
        if cert_path is None:
            # NOTE: the default certificate is named after the *local* fqdn,
            # not after `hostname`.
            self._cert_path = "{}/httpd/ssl/certs/{}.cert".format(
                _WORKDIR, socket.getfqdn()
            )
        else:
            self._cert_path = cert_path
        if ca_cert_dir is None:
            self._ca_cert_dir = os.environ.get("SSL_CERT_DIR")
        else:
            self._ca_cert_dir = ca_cert_dir

        self._http = urllib3.PoolManager(
            cert_reqs="CERT_REQUIRED",
            ca_certs=self._cert_path,
            ca_cert_dir=self._ca_cert_dir,
        )
        self._headers = urllib3.make_headers(
            basic_auth="{}:{}".format(self._api_user, self._api_password)
        )
        self._headers["Content-Type"] = "application/json"
        # urllib3 v2.0 removed SubjectAltNameWarning and instead throws an
        # error if no SubjectAltName is present in a certificate.
        if hasattr(urllib3.exceptions, "SubjectAltNameWarning"):
            # urllib3 < v2.0: the warning exists and should be silenced,
            # unless the user configured warnings on the command line.
            if not sys.warnoptions:
                import warnings

                warnings.simplefilter(
                    "ignore", category=urllib3.exceptions.SubjectAltNameWarning
                )

    def __str__(self):
        """Return the class and instance attributes with credentials masked.

        The password and any ``Authorization`` header are replaced by
        asterisks so that printing or logging the client does not leak
        them.
        """
        visible = dict(self.__dict__)
        if "_api_password" in visible:
            visible["_api_password"] = "********"
        headers = visible.get("_headers")
        if isinstance(headers, dict):
            visible["_headers"] = {
                name: ("********" if str(name).lower() == "authorization" else value)
                for name, value in headers.items()
            }
        return str(self.__class__) + ":" + str(visible)

    def _get_robot_password(self):
        """Read the CFE_ROBOT password from the secrets file.

        Returns the stripped text after the first ``=`` on the first line
        that contains ``cf_robot_password`` and an ``=``.

        Raises:
            Exception: if no such line is found.
        """
        with open(_DEFAULT_SECRETS_PATH) as file:
            for line in file:
                if "cf_robot_password" in line:
                    # Split on the first "=" only, so that passwords which
                    # themselves contain "=" are not rejected.
                    tokens = line.split("=", 1)
                    if len(tokens) == 2:
                        return tokens[1].strip()
        raise Exception(
            "Could not parse CFE_ROBOT password from {} file".format(
                _DEFAULT_SECRETS_PATH
            )
        )

    def _request(self, method, path, body=None):
        """Send one API request and return the normalized response dict.

        Args:
            method: HTTP method name, e.g. ``"GET"``.
            path: path below ``/api/`` on the configured host.
            body: a ``str`` is sent verbatim; anything else is JSON
                encoded first (so the default ``None`` is sent as the JSON
                literal ``null``).
        """
        url = "https://{}/api/{}".format(self._hostname, path)
        if isinstance(body, str):
            payload = body
        else:
            payload = json.dumps(body)
        response = self._http.request(method, url, headers=self._headers, body=payload)
        return self._build_response(response)

    def _build_response(self, response):
        """Convert an HTTP response into a plain ``dict``.

        * Status other than 200: ``{"message": <body text>, "status": <code>}``;
          an empty body with status 201 gets the message ``"Created"``.
        * Status 200: the body is parsed as JSON. If it is an object whose
          ``"data"`` value is a non-empty list starting with an object
          (as returned by e.g. the query API), that first entry is
          returned with the top-level ``"meta"`` (when present) attached.
          Any other JSON object is returned as is; non-object JSON yields
          an empty dict. ``"status"`` is always added.
        """
        text = response.data.decode("utf-8")
        if response.status != 200:
            message = text.strip()
            if not message and response.status == 201:
                message = "Created"
            return {"message": message, "status": response.status}

        data = json.loads(text)
        if not isinstance(data, dict):
            # Top-level strings, lists, numbers, ... carry no named fields.
            value = {}
        else:
            # some APIs like query API return a top-level data key which we
            # want to skip for ease of use, see
            # https://docs.cfengine.com/docs/master/reference-enterprise-api-ref-query.html#execute-sql-query
            entries = data.get("data")
            if isinstance(entries, list) and entries and isinstance(entries[0], dict):
                value = entries[0]
                if "meta" in data:
                    value["meta"] = data["meta"]
            else:
                value = data
        value["status"] = response.status
        return value

    def query(self, sql):
        """Run an SQL query through the query API.

        Newlines in ``sql`` are replaced by spaces and the statement is
        stripped before it is sent. The request body is built with the
        JSON encoder so quotes and backslashes in ``sql`` are escaped.
        """
        clean_sql = sql.replace("\n", " ").strip()
        return self._request("POST", "query", body=json.dumps({"query": clean_sql}))

    def status(self):
        """GET the API root and return the response."""
        return self._request("GET", "")

    def fr_remote_hubs(self):
        """GET ``fr/remote-hub`` and collect the hub entries.

        Returns a dict with ``"hubs"`` (every dict-valued entry of the
        response), ``"status"`` and, when the response carried one,
        ``"message"`` (e.g. the error text of a failed request).
        """
        response = self._request("GET", "fr/remote-hub")
        values = {
            "hubs": [entry for entry in response.values() if isinstance(entry, dict)],
            "status": response["status"],
        }
        if "message" in response:
            values["message"] = response["message"]
        return values

    def fr_hub_status(self):
        """GET ``fr/hub-status`` and return the response."""
        return self._request("GET", "fr/hub-status")

    def fr_enable_as_superhub(self):
        """POST to ``fr/setup-hub/superhub`` and return the response."""
        return self._request("POST", "fr/setup-hub/superhub")

    def fr_enable_as_feeder(self):
        """POST to ``fr/setup-hub/feeder`` and return the response."""
        return self._request("POST", "fr/setup-hub/feeder")

    def get(self, entity, identifier):
        """GET ``<entity>/<identifier>`` and return the response."""
        return self._request("GET", "{}/{}".format(entity, identifier))

    def put(self, entity, identifier, data):
        """PUT ``data`` (JSON encoded unless a str) to ``<entity>/<identifier>``."""
        return self._request("PUT", "{}/{}".format(entity, identifier), data)

    def delete(self, entity, identifier):
        """DELETE ``<entity>/<identifier>`` and return the response."""
        return self._request("DELETE", "{}/{}".format(entity, identifier))

    def put_role_permissions(self, identifier, data):
        """PUT ``data`` to ``role/<identifier>/permissions``."""
        return self._request("PUT", "role/{}/permissions".format(identifier), data)