# Integration tests for the asyncio Consul client (consul.aio): KV store,
# transactions, agent service registration and sessions.
import asyncio
import base64
import struct
import pytest
from packaging import version
import consul
import consul.aio
import consul.check
# Short module-level alias for the health-check helper class.
# NOTE(review): no test visible in this module references ``Check`` —
# confirm it is still needed before removing it.
Check = consul.check.Check
@pytest.fixture
async def consul_obj(consul_port):
    """Yield an asyncio Consul client together with the server version.

    The value of the ``consul_port`` fixture is unpacked as a
    ``(port, version)`` pair. A client bound to that port is handed to the
    test as ``(client, version)`` and closed once the test has finished.
    """
    port, server_version = consul_port
    client = consul.aio.Consul(port=port)
    yield client, server_version
    await client.close()
@pytest.fixture
async def consul_acl_obj(acl_consul):
    """Yield a token-carrying Consul client plus the server version.

    The value of the ``acl_consul`` fixture is unpacked as a
    ``(client, token, version)`` triple. The token is stored on the client
    before the test runs, the test receives ``(client, version)``, and the
    client is closed once the test has finished.
    """
    # Named ``client`` rather than ``consul`` so that the imported ``consul``
    # package is not shadowed inside this fixture.
    client, token, consul_version = acl_consul
    client.token = token
    yield client, consul_version
    await client.close()
class TestAsyncioConsul:
    """End-to-end tests for the asyncio Consul client.

    Every test receives the ``consul_obj`` fixture, a ``(client, version)``
    pair. The fixture closes the client after the test, so the tests never
    close it themselves.
    """

    async def test_kv(self, consul_obj) -> None:
        """A missing key reads as ``None``; after a put its value is bytes."""
        c, _consul_version = consul_obj
        _index, data = await c.kv.get("foo")
        assert data is None
        response = await c.kv.put("foo", "bar")
        assert response is True
        _index, data = await c.kv.get("foo")
        assert data["Value"] == b"bar"

    async def test_consul_ctor(self, consul_obj) -> None:
        """Round-trip a packed integer through the KV store.

        NOTE(review): the body is identical to ``test_kv_binary`` and does
        not exercise the constructor beyond what the fixture already does —
        confirm whether a constructor-specific assertion is missing.
        """
        c, _consul_version = consul_obj
        await c.kv.put("foo", struct.pack("i", 1000))
        _index, data = await c.kv.get("foo")
        assert struct.unpack("i", data["Value"]) == (1000,)

    async def test_kv_binary(self, consul_obj) -> None:
        """Binary values survive a put/get round trip unchanged."""
        c, _consul_version = consul_obj
        await c.kv.put("foo", struct.pack("i", 1000))
        _index, data = await c.kv.get("foo")
        assert struct.unpack("i", data["Value"]) == (1000,)

    async def test_kv_missing(self, consul_obj) -> None:
        """A get that passes the previous index sees a key written later."""
        c, _consul_version = consul_obj

        async def put() -> None:
            await asyncio.sleep(0.02)
            await c.kv.put("foo", "bar")

        writer = asyncio.create_task(put())
        # Presumably writes an unrelated key so the first get returns a
        # usable index even though "foo" does not exist yet — TODO confirm.
        await c.kv.put("index", "bump")
        index, data = await c.kv.get("foo")
        assert data is None
        # Passing the previous index makes this get return only after the
        # background task has written "foo".
        _index, data = await c.kv.get("foo", index=index)
        assert data["Value"] == b"bar"
        await writer
        # The client is closed by the ``consul_obj`` fixture, not here.

    async def test_kv_put_flags(self, consul_obj) -> None:
        """Flags default to 0 and can be overwritten by a later put."""
        c, _consul_version = consul_obj
        await c.kv.put("foo", "bar")
        _index, data = await c.kv.get("foo")
        assert data["Flags"] == 0

        response = await c.kv.put("foo", "bar", flags=50)
        assert response is True
        _index, data = await c.kv.get("foo")
        assert data["Flags"] == 50

    async def test_kv_delete(self, consul_obj) -> None:
        """Delete a single key, then the remaining keys recursively."""
        c, _consul_version = consul_obj
        await c.kv.put("foo1", "1")
        await c.kv.put("foo2", "2")
        await c.kv.put("foo3", "3")
        _index, data = await c.kv.get("foo", recurse=True)
        assert [x["Key"] for x in data] == ["foo1", "foo2", "foo3"]

        response = await c.kv.delete("foo2")
        assert response is True
        _index, data = await c.kv.get("foo", recurse=True)
        assert [x["Key"] for x in data] == ["foo1", "foo3"]

        response = await c.kv.delete("foo", recurse=True)
        assert response is True
        _index, data = await c.kv.get("foo", recurse=True)
        assert data is None

    async def test_kv_subscribe(self, consul_obj) -> None:
        """A get that passes the previous index observes a concurrent put."""
        c, _consul_version = consul_obj

        async def put() -> None:
            await asyncio.sleep(0.01)
            response = await c.kv.put("foo", "bar")
            assert response is True

        writer = asyncio.create_task(put())
        index, data = await c.kv.get("foo")
        assert data is None
        _index, data = await c.kv.get("foo", index=index)
        assert data["Value"] == b"bar"
        # Awaiting the task re-raises any assertion failure from ``put``.
        await writer

    async def test_transaction(self, consul_obj) -> None:
        """Set a key through the transaction endpoint and read it back."""
        c, _consul_version = consul_obj
        # The value is sent and compared in base64-encoded text form.
        value = base64.b64encode(b"1").decode("utf8")
        d = {"KV": {"Verb": "set", "Key": "asdf", "Value": value}}
        r = await c.txn.put([d])
        assert r["Errors"] is None

        d = {"KV": {"Verb": "get", "Key": "asdf"}}
        r = await c.txn.put([d])
        assert r["Results"][0]["KV"]["Value"] == value

    async def test_agent_services(self, consul_obj) -> None:
        """Register and deregister a service, checking the agent's listing.

        The expected service description depends on the server version:
        the "v2" shape is used from Consul 1.13.8 onwards, "v1" before.
        """
        c, consul_version = consul_obj
        expected_by_schema = {
            "v1": {
                "foo": {
                    "Port": 0,
                    "ID": "foo",
                    "CreateIndex": 0,
                    "ModifyIndex": 0,
                    "EnableTagOverride": False,
                    "Service": "foo",
                    "Tags": [],
                    "Meta": {},
                    "Address": "",
                }
            },
            "v2": {
                "foo": {
                    "Address": "",
                    "Datacenter": "dc1",
                    "EnableTagOverride": False,
                    "ID": "foo",
                    "Meta": {},
                    "Port": 0,
                    "Service": "foo",
                    "Tags": [],
                    "Weights": {"Passing": 1, "Warning": 1},
                }
            },
        }
        uses_v2 = version.parse(consul_version) >= version.parse("1.13.8")
        expected = expected_by_schema["v2" if uses_v2 else "v1"]

        services = await c.agent.services()
        assert services == {}

        response = await c.agent.service.register("foo")
        assert response is True
        services = await c.agent.services()
        assert services == expected

        response = await c.agent.service.deregister("foo")
        assert response is True
        services = await c.agent.services()
        assert services == {}

    # NOTE(review): disabled test kept for reference. It calls a
    # ``should_skip`` helper that is not imported in the visible part of this
    # module and expects ``acl_consul`` to yield a port — confirm before
    # re-enabling.
    #
    # async def test_catalog(self, acl_consul):
    #     port, token, _consul_version = acl_consul
    #     if should_skip(_consul_version, "<", "1.11.0"):
    #         pytest.skip("Endpoint /v1/acl/create for the legacy ACL system was removed in Consul 1.11.")
    #
    #     c = consul.aio.Consul(port=port, token=token)
    #
    #     async def register():
    #         await asyncio.sleep(1.0 / 100)
    #         response = await c.catalog.register("n1", "10.1.10.11")
    #         assert response is True
    #         await asyncio.sleep(500 / 1000.0)
    #         response = await c.catalog.deregister("n1")
    #         assert response is True
    #
    #     fut = asyncio.ensure_future(register())
    #     index, nodes = await c.catalog.nodes()
    #     assert len(nodes) == 1
    #     current = nodes[0]
    #
    #     index, nodes = await c.catalog.nodes(index=index)
    #     nodes.remove(current)
    #     assert [x["Node"] for x in nodes] == ["n1"]
    #
    #     index, nodes = await c.catalog.nodes(index=index)
    #     nodes.remove(current)
    #     assert [x["Node"] for x in nodes] == []
    #     await fut

    async def test_session(self, consul_obj) -> None:
        """Session listings follow a session being created and destroyed."""
        c, _consul_version = consul_obj

        async def register() -> None:
            await asyncio.sleep(0.01)
            session_id = await c.session.create()
            await asyncio.sleep(0.05)
            response = await c.session.destroy(session_id)
            assert response is True

        lifecycle = asyncio.create_task(register())
        index, services = await c.session.list()
        assert services == []

        # Give the background task time to create the session.
        await asyncio.sleep(0.02)

        index, services = await c.session.list(index=index)
        assert services

        # Listing again from the newer index should show the session gone.
        _index, services = await c.session.list(index=index)
        assert services == []
        # Awaiting the task re-raises any assertion failure from ``register``.
        await lifecycle

    # NOTE(review): disabled test kept for reference. Its own skip message
    # says the legacy ``/v1/acl/create`` endpoint was removed in Consul 1.11;
    # it also calls a ``should_skip`` helper that is not imported in the
    # visible part of this module — confirm before re-enabling.
    #
    # async def test_acl_old(self, acl_consul):
    #     port, token, _consul_version = acl_consul
    #     if should_skip(_consul_version, "<", "1.11.0"):
    #         pytest.skip("Endpoint /v1/acl/create for the legacy ACL system was removed in Consul 1.11.")
    #
    #     c = consul.aio.Consul(port=port, token=token)
    #
    #     rules = """
    #         key "" {
    #             policy = "read"
    #         }
    #         key "private/" {
    #             policy = "deny"
    #         }
    #     """
    #
    #     token = await c.acl.create(rules=rules)
    #
    #     with pytest.raises(consul.ACLPermissionDenied):
    #         await c.acl.list(token=token)
    #
    #     destroyed = await c.acl.destroy(token)
    #     assert destroyed is True
|