1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
|
import json
import httpretty
from pypuppetdb.types import Node, Inventory, Fact
class TestPqlAPI:
def test_pql_casting(self, api):
pql_query = """
nodes {
facts {
name = "operatingsystem" and
value = "Debian"
}
}
"""
pql_body = [
{
"cached_catalog_status": "not_used",
"catalog_environment": "production",
"catalog_timestamp": "2016-08-15T11:06:26.275Z",
"certname": "greenserver.vm",
"deactivated": None,
"expired": None,
"facts_environment": "production",
"facts_timestamp": "2016-08-15T11:06:26.140Z",
"latest_report_hash": "4a956674b016d95a7b77c99513ba26e4a744f8d1",
"latest_report_noop": False,
"latest_report_noop_pending": None,
"latest_report_status": "changed",
"report_environment": "production",
"report_timestamp": "2016-08-15T11:06:18.393Z",
},
{
"cached_catalog_status": "not_used",
"catalog_environment": "production",
"catalog_timestamp": "2016-08-15T11:06:26.275Z",
"certname": "blueserver.vm",
"deactivated": None,
"expired": None,
"facts_environment": "production",
"facts_timestamp": "2016-08-15T11:06:26.140Z",
"latest_report_hash": "4a956674b016d95a7b77c99513ba26e4a744f8d1",
"latest_report_noop": False,
"latest_report_noop_pending": None,
"latest_report_status": "changed",
"report_environment": "production",
"report_timestamp": "2016-08-15T11:06:18.393Z",
},
]
pql_url = "http://localhost:8080/pdb/query/v4"
httpretty.enable()
httpretty.register_uri(httpretty.GET, pql_url, body=json.dumps(pql_body))
nodes = list(api.pql(pql_query))
assert type(nodes[0]) == Node
assert nodes[0].name == "greenserver.vm"
httpretty.disable()
httpretty.reset()
def test_pql_no_casting(self, api):
pql_query = """
nodes[certname] {
facts {
name = "operatingsystem" and
value = "Debian"
}
}
"""
pql_body = [
{"certname": "foo.example.com"},
{"certname": "bar.example.com"},
]
pql_url = "http://localhost:8080/pdb/query/v4"
httpretty.enable()
httpretty.register_uri(httpretty.GET, pql_url, body=json.dumps(pql_body))
elements = list(api.pql(pql_query))
assert httpretty.last_request().path.startswith("/pdb/query/v4")
assert elements[0]["certname"] == "foo.example.com"
httpretty.disable()
httpretty.reset()
def test_get_type_from_query_matching(self, api):
pql = """
nodes {
facts {
name = "operatingsystem" and
value = "Debian"
}
}
"""
type = api._get_type_from_query(pql)
assert type == Node
def test_get_type_from_query_matching_empty_projection(self, api):
pql = """
inventory[] {}
"""
type = api._get_type_from_query(pql)
assert type == Inventory
def test_get_type_from_query_matching_no_whitespace(self, api):
pql = """
facts{}
"""
type = api._get_type_from_query(pql)
assert type == Fact
def test_get_type_from_query_matching_but_unsupported(self, api):
pql = """
producers {}
"""
type = api._get_type_from_query(pql)
assert type is None
def test_get_type_from_query_not_matching_projection(self, api):
pql = """
nodes[certname, count()] {}
"""
type = api._get_type_from_query(pql)
assert type is None
|