File: test_catalog.py

package info (click to toggle)
python-consul 1.6.0-1~exp1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 484 kB
  • sloc: python: 2,858; makefile: 197
file content (63 lines) | stat: -rw-r--r-- 2,790 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
class TestCatalog:
    pass
    # def test_catalog(self, consul_obj):
    #     c, _consul_version = consul_obj
    #
    #     # grab the node our server created, so we can ignore it
    #     _, nodes = c.catalog.nodes()
    #     assert len(nodes) == 1
    #     current = nodes[0]
    #
    #     # test catalog.datacenters
    #     assert c.catalog.datacenters() == ["dc1"]
    #
    #     # test catalog.register
    #     pytest.raises(consul.ConsulException, c.catalog.register, "foo", "10.1.10.11", dc="dc2")
    #
    #     assert c.catalog.register("n1", "10.1.10.11", service={"service": "s1"}, check={"name": "c1"}) is True
    #     assert c.catalog.register("n1", "10.1.10.11", service={"service": "s2"}) is True
    #     assert c.catalog.register("n2", "10.1.10.12", service={"service": "s1", "tags": ["master"]}) is True
    #
    #     # test catalog.nodes
    #     pytest.raises(consul.ConsulException, c.catalog.nodes, dc="dc2")
    #     _, nodes = c.catalog.nodes()
    #     nodes.remove(current)
    #     assert [x["Node"] for x in nodes] == ["n1", "n2"]
    #
    #     # test catalog.services
    #     pytest.raises(consul.ConsulException, c.catalog.services, dc="dc2")
    #     _, services = c.catalog.services()
    #     assert services == {"s1": ["master"], "s2": [], "consul": []}
    #
    #     # test catalog.node
    #     pytest.raises(consul.ConsulException, c.catalog.node, "n1", dc="dc2")
    #     _, node = c.catalog.node("n1")
    #     assert set(node["Services"].keys()) == {"s1", "s2"}
    #     _, node = c.catalog.node("n3")
    #     assert node is None
    #
    #     # test catalog.service
    #     pytest.raises(consul.ConsulException, c.catalog.service, "s1", dc="dc2")
    #     _, nodes = c.catalog.service("s1")
    #     assert {x["Node"] for x in nodes} == {"n1", "n2"}
    #     _, nodes = c.catalog.service("s1", tag="master")
    #     assert {x["Node"] for x in nodes} == {"n2"}
    #
    #     # test catalog.deregister
    #     pytest.raises(consul.ConsulException, c.catalog.deregister, "n2", dc="dc2")
    #     assert c.catalog.deregister("n1", check_id="c1") is True
    #     assert c.catalog.deregister("n2", service_id="s1") is True
    #     # check the nodes weren't removed
    #     _, nodes = c.catalog.nodes()
    #     nodes.remove(current)
    #     assert [x["Node"] for x in nodes] == ["n1", "n2"]
    #     # check n2's s1 service was removed though
    #     _, nodes = c.catalog.service("s1")
    #     assert {x["Node"] for x in nodes} == {"n1"}
    #
    #     # cleanup
    #     assert c.catalog.deregister("n1") is True
    #     assert c.catalog.deregister("n2") is True
    #     _, nodes = c.catalog.nodes()
    #     nodes.remove(current)
    #     assert [x["Node"] for x in nodes] == []