1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
|
import random
import unittest
import requests
from .fixtures import APITestCase
class NetworkTestCase(APITestCase):
# TODO Need to support Docker-py order of network/container creates
def test_connect(self):
"""Create network and container then connect to network"""
net_default = requests.post(
self.podman_url + "/v1.40/networks/create", json={"Name": "TestDefaultNetwork"}
)
self.assertEqual(net_default.status_code, 201, net_default.text)
net_id = net_default.json()["Id"]
create = requests.post(
self.podman_url + "/v1.40/containers/create?name=postCreateConnect",
json={
"Cmd": ["top"],
"Image": "alpine:latest",
"NetworkDisabled": False,
# FIXME adding these 2 lines cause: (This is sampled from docker-py)
# "network already exists","message":"container
# 01306e499df5441560d70071a54342611e422a94de20865add50a9565fd79fb9 is already connected to CNI
# network \"TestDefaultNetwork\": network already exists"
# "HostConfig": {"NetworkMode": "TestDefaultNetwork"},
# "NetworkingConfig": {"EndpointsConfig": {"TestDefaultNetwork": None}},
# FIXME These two lines cause:
# CNI network \"TestNetwork\" not found","message":"error configuring network namespace for container
# 369ddfa7d3211ebf1fbd5ddbff91bd33fa948858cea2985c133d6b6507546dff: CNI network \"TestNetwork\" not
# found"
# "HostConfig": {"NetworkMode": "TestNetwork"},
# "NetworkingConfig": {"EndpointsConfig": {"TestNetwork": None}},
# FIXME no networking defined cause: (note this error is from the container inspect below)
# "internal libpod error","message":"network inspection mismatch: asked to join 2 CNI network(s) [
# TestDefaultNetwork podman], but have information on 1 network(s): internal libpod error"
},
)
self.assertEqual(create.status_code, 201, create.text)
self.assertId(create.content)
payload = create.json()
start = requests.post(self.podman_url + f"/v1.40/containers/{payload['Id']}/start")
self.assertEqual(start.status_code, 204, start.text)
connect = requests.post(
self.podman_url + "/v1.40/networks/TestDefaultNetwork/connect",
json={"Container": payload["Id"]},
)
self.assertEqual(connect.status_code, 200, connect.text)
self.assertEqual(connect.text, "OK\n")
inspect = requests.get(f"{self.podman_url}/v1.40/containers/{payload['Id']}/json")
self.assertEqual(inspect.status_code, 200, inspect.text)
payload = inspect.json()
self.assertFalse(payload["Config"].get("NetworkDisabled", False))
self.assertEqual(
net_id,
payload["NetworkSettings"]["Networks"]["TestDefaultNetwork"]["NetworkID"],
)
# TODO restore this to test, when joining multiple networks possible
# self.assertEqual(
# "TestNetwork",
# payload["NetworkSettings"]["Networks"]["TestNetwork"]["NetworkID"],
# )
# TODO Need to support network aliases
# self.assertIn(
# "test_post_create",
# payload["NetworkSettings"]["Networks"]["TestNetwork"]["Aliases"],
# )
def test_create(self):
"""Create network and connect container during create"""
net = requests.post(
self.podman_url + "/v1.40/networks/create", json={"Name": "TestNetwork"}
)
self.assertEqual(net.status_code, 201, net.text)
net_id = net.json()["Id"]
create = requests.post(
self.podman_url + "/v1.40/containers/create?name=postCreate",
json={
"Cmd": ["date"],
"Image": "alpine:latest",
"NetworkDisabled": False,
"HostConfig": {"NetworkMode": "TestNetwork"},
},
)
self.assertEqual(create.status_code, 201, create.text)
self.assertId(create.content)
payload = create.json()
inspect = requests.get(f"{self.podman_url}/v1.40/containers/{payload['Id']}/json")
self.assertEqual(inspect.status_code, 200, inspect.text)
payload = inspect.json()
self.assertFalse(payload["Config"].get("NetworkDisabled", False))
self.assertEqual(
net_id,
payload["NetworkSettings"]["Networks"]["TestNetwork"]["NetworkID"],
)
def test_inspect(self):
name = f"Network_{random.getrandbits(160):x}"
create = requests.post(self.podman_url + "/v1.40/networks/create", json={"Name": name})
self.assertEqual(create.status_code, 201, create.text)
self.assertId(create.content)
net = create.json()
self.assertIsInstance(net, dict)
self.assertNotEqual(net["Id"], name)
ident = net["Id"]
ls = requests.get(self.podman_url + "/v1.40/networks")
self.assertEqual(ls.status_code, 200, ls.text)
networks = ls.json()
self.assertIsInstance(networks, list)
found = False
for net in networks:
if net["Name"] == name:
found = True
break
self.assertTrue(found, f"Network '{name}' not found")
inspect = requests.get(self.podman_url + f"/v1.40/networks/{ident}?verbose=false&scope=local")
self.assertEqual(inspect.status_code, 200, inspect.text)
def test_crud(self):
name = f"Network_{random.getrandbits(160):x}"
# Cannot test for 0 existing networks because default "podman" network always exists
create = requests.post(self.podman_url + "/v1.40/networks/create", json={"Name": name})
self.assertEqual(create.status_code, 201, create.text)
self.assertId(create.content)
net = create.json()
self.assertIsInstance(net, dict)
self.assertNotEqual(net["Id"], name)
ident = net["Id"]
ls = requests.get(self.podman_url + "/v1.40/networks")
self.assertEqual(ls.status_code, 200, ls.text)
networks = ls.json()
self.assertIsInstance(networks, list)
found = False
for net in networks:
if net["Name"] == name:
found = True
break
self.assertTrue(found, f"Network '{name}' not found")
inspect = requests.get(self.podman_url + f"/v1.40/networks/{ident}")
self.assertEqual(inspect.status_code, 200, inspect.text)
self.assertIsInstance(inspect.json(), dict)
inspect = requests.delete(self.podman_url + f"/v1.40/networks/{ident}")
self.assertEqual(inspect.status_code, 204, inspect.text)
inspect = requests.get(self.podman_url + f"/v1.40/networks/{ident}")
self.assertEqual(inspect.status_code, 404, inspect.text)
# network prune
prune_name = f"Network_{random.getrandbits(160):x}"
prune_create = requests.post(
self.podman_url + "/v1.40/networks/create", json={"Name": prune_name}
)
self.assertEqual(create.status_code, 201, prune_create.text)
prune = requests.post(self.podman_url + "/v1.40/networks/prune")
self.assertEqual(prune.status_code, 200, prune.text)
self.assertTrue(prune_name in prune.json()["NetworksDeleted"])
if __name__ == "__main__":
unittest.main()
|