File: test_api_other.py

package info (click to toggle)
pypuppetdb 3.2.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 508 kB
  • sloc: python: 3,810; makefile: 127; sh: 9
file content (81 lines) | stat: -rw-r--r-- 2,822 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import json

import httpretty
import pytest

import pypuppetdb


def stub_request(url, data=None, method=httpretty.GET, status=200, **kwargs):
    """Register a canned httpretty response for ``url``.

    :param url: URL to stub.
    :param data: Optional path to a JSON file whose content is used as the
        response body. When ``None`` the body is the empty JSON list ``[]``.
    :param method: HTTP method to match (defaults to ``httpretty.GET``).
    :param status: HTTP status code of the stubbed response.
    :param kwargs: Extra keyword arguments passed through to
        ``httpretty.register_uri``.
    :returns: The return value of ``httpretty.register_uri``.
    :raises ValueError: If ``data`` points at a file that is not valid JSON
        (``json.JSONDecodeError`` is a ``ValueError`` subclass).
    """
    if data is None:
        body = "[]"
    else:
        with open(data) as d:
            # json.load() takes the file object itself; handing it the
            # already-read text raised AttributeError. The parsed document is
            # re-serialised because the response body has to be text.
            body = json.dumps(json.load(d))
    return httpretty.register_uri(method, url, body=body, status=status, **kwargs)


@pytest.fixture(params=["string", "QueryBuilder"])
def query(request):
    """Provide the same ``certname = node1`` query in two representations.

    The fixture is parametrized: once it yields the raw query string and once
    the equivalent ``pypuppetdb.QueryBuilder.EqualsOperator`` object.
    """
    key, value = "certname", "node1"
    flavour = request.param

    if flavour == "QueryBuilder":
        return pypuppetdb.QueryBuilder.EqualsOperator(key, value)
    if flavour == "string":
        return f'["{key}", "=", "{value}"]'
    # Any other parameter falls through and yields None.
    return None


class TestCommandAPI:
    """Tests for submitting commands to the ``/pdb/cmd/v1`` endpoint.

    Every test enables httpretty and tears it down in a ``finally`` clause so
    that a failing assertion cannot leave the HTTP interception active for
    the tests that run afterwards.
    """

    def test_command(self, api):
        """``api.command`` sends its request to the command endpoint."""
        httpretty.enable()
        try:
            stub_request("http://localhost:8080/pdb/cmd/v1", method=httpretty.POST)
            api.command("deactivate node", {"certname": "testnode"})
            assert httpretty.last_request().path.startswith("/pdb/cmd/v1")
        finally:
            httpretty.disable()
            httpretty.reset()

    def test_cmd(self, api, query):
        """``api._cmd`` sends the expected query string, headers and body.

        The ``query`` fixture is not used in the body; it is kept so the test
        keeps running once per fixture parameter, as before.
        """
        httpretty.reset()
        httpretty.enable()
        try:
            stub_request("http://localhost:8080/pdb/cmd/v1", method=httpretty.POST)
            node_name = "testnode"
            api._cmd("deactivate node", {"certname": node_name})
            last_request = httpretty.last_request()
            assert last_request.querystring == {
                "certname": [node_name],
                "command": ["deactivate node"],
                "version": ["3"],
                "checksum": ["b93d474970e54943aec050ee399dfb85d21e143a"],
            }
            assert last_request.headers["Content-Type"] == "application/json"
            assert last_request.method == "POST"
            assert last_request.body == json.dumps({"certname": node_name}).encode(
                "latin-1"
            )
        finally:
            httpretty.disable()
            httpretty.reset()

    def test_cmd_bad_command(self, api):
        """An unknown command name makes ``api._cmd`` raise ``APIError``."""
        httpretty.enable()
        try:
            stub_request("http://localhost:8080/pdb/cmd/v1")
            with pytest.raises(pypuppetdb.errors.APIError):
                api._cmd("incorrect command", {})
        finally:
            httpretty.disable()
            httpretty.reset()

    def test_cmd_with_token_authorization(self, token_api):
        """A token-configured API sends the token in ``X-Authentication``."""
        httpretty.enable()
        try:
            stub_request("https://localhost:8080/pdb/cmd/v1", method=httpretty.POST)
            token_api._cmd("deactivate node", {"certname": ""})
            assert httpretty.last_request().path.startswith("/pdb/cmd/v1")
            assert (
                httpretty.last_request().headers["X-Authentication"] == "tokenstring"
            )
        finally:
            # This test previously never disabled/reset httpretty, leaving the
            # interception switched on after it finished.
            httpretty.disable()
            httpretty.reset()


class TestStatusAPI:
    """Tests for the PuppetDB status endpoint."""

    def test_status(self, api):
        """``api.status`` requests ``/status/v1/services/puppetdb-status``."""
        httpretty.enable()
        try:
            stub_request("http://localhost:8080/status/v1/services/puppetdb-status")
            api.status()
            assert (
                httpretty.last_request().path == "/status/v1/services/puppetdb-status"
            )
        finally:
            # Tear down even when the call or the assertion fails so the
            # interception does not stay enabled for later tests.
            httpretty.disable()
            httpretty.reset()