1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
|
from json.decoder import JSONDecodeError
from aiopvapi.helpers.aiorequest import PvApiConnectionError, \
PvApiResponseStatusError
from tests.fake_server import TestFakeServer, make_url
class TestAioRequest(TestFakeServer):
def test_get_status_200(self):
"""Test get with status 200."""
async def go():
await self.start_fake_server()
return await self.request.get(make_url('test_get_status_200'))
ret = self.loop.run_until_complete(go())
self.assertEqual({'title': 'test'}, ret)
def test_get_wrong_status(self):
"""Test get with some other status."""
async def go():
await self.start_fake_server()
return await self.request.get(make_url('wrong_status'))
with self.assertRaises(PvApiResponseStatusError):
ret = self.loop.run_until_complete(go())
def test_get_invalid_json(self):
"""Test get with invalid Json."""
async def go():
await self.start_fake_server()
return await self.request.get(make_url('invalid_json'))
with self.assertRaises(JSONDecodeError):
ret = self.loop.run_until_complete(go())
def test_get_timeout(self):
"""Test get timeout."""
async def go():
await self.start_fake_server()
return await self.request.get(make_url('get_timeout'))
with self.assertRaises(PvApiConnectionError):
ret = self.loop.run_until_complete(go())
def test_post_status_200(self):
"""Test good post requests (200)."""
async def go():
await self.start_fake_server()
return await self.request.post(make_url('post_status_200'),
{'a': 'b', 'c': 'd'})
ret = self.loop.run_until_complete(go())
self.assertEqual({'a': 'b', 'c': 'd'}, ret)
def test_post_status_201(self):
"""Test good post requests (201)."""
async def go():
await self.start_fake_server()
return await self.request.post(make_url('post_status_201'),
{'a': 'b', 'c': 'd'})
ret = self.loop.run_until_complete(go())
self.assertEqual({'a': 'b', 'c': 'd'}, ret)
#
# def test_post_wrong_status(self):
# """Test post request with wrong status."""
# mocked.post('http://127.0.0.1/3', body='{"title": "test"}',
# status=202,
# headers={'content-type': 'application/json'})
# with self.assertRaises(PvApiResponseStatusError):
# ret = self.loop.run_until_complete(
# self.request.post('http://127.0.0.1/3', {'a': 'b', 'c': 'd'}))
#
#
# def test_post_broken_json(self):
# """Test post request with broken JSON."""
# mocked.post('http://127.0.0.1/4', body='{"title": "test}',
# status=200,
# headers={'content-type': 'application/json'})
# with self.assertRaises(JSONDecodeError):
# ret = self.loop.run_until_complete(
# self.request.post('http://127.0.0.1/4', {'a': 'b', 'c': 'd'}))
#
#
# def test_post_timeot(self):
# """Test post timeout."""
# with self.assertRaises(PvApiConnectionError):
# ret = self.loop.run_until_complete(
# self.request.post('http://127.0.0.1/5', {'a': 'b', 'c': 'd'}))
#
#
# def test_put_status_200(self):
# """Test good put requests (200)."""
# mocked.put('http://127.0.0.1/1', body='{"title": "test"}',
# status=200,
# headers={'content-type': 'application/json'})
# ret = self.loop.run_until_complete(
# self.request.put('http://127.0.0.1/1', {'a': 'b', 'c': 'd'}))
# self.assertEqual({'title': 'test'}, ret)
#
#
# def test_put_wrong_status(self):
# """Test put request with wrong status."""
# mocked.put('http://127.0.0.1/2', body='{"title": "test"}',
# status=201,
# headers={'content-type': 'application/json'})
# with self.assertRaises(PvApiResponseStatusError):
# ret = self.loop.run_until_complete(
# self.request.put('http://127.0.0.1/2', {'a': 'b', 'c': 'd'}))
#
#
# def test_put_broken_json(self):
# """Test put request with broken JSON."""
# mocked.put('http://127.0.0.1/4', body='{"title": "test}',
# status=200,
# headers={'content-type': 'application/json'})
# with self.assertRaises(JSONDecodeError):
# ret = self.loop.run_until_complete(
# self.request.put('http://127.0.0.1/4', {'a': 'b', 'c': 'd'}))
#
#
# def test_put_timeout(self):
# """Test put request with timeout."""
# with self.assertRaises(PvApiConnectionError):
# ret = self.loop.run_until_complete(
# self.request.put('http://127.0.0.1/5', {'a': 'b', 'c': 'd'}))
#
#
# def test_delete_status_200(self):
# """Test delete request with status 200"""
# mocked.delete('http://127.0.0.1/1',
# status=200,
# headers={'content-type': 'application/json'})
# ret = self.loop.run_until_complete(
# self.request.delete('http://127.0.0.1/1'))
# self.assertIsNone(ret)
#
#
# def test_delete_status_204(self):
# """Test delete request with status 204"""
# mocked.delete('http://127.0.0.1/2', body='{"title": "test"}',
# status=204,
# headers={'content-type': 'application/json'})
# ret = self.loop.run_until_complete(
# self.request.delete('http://127.0.0.1/2'))
# self.assertTrue(ret)
#
#
# def test_delete_wrong_status(self):
# """Test delete request with wrong status"""
# mocked.delete('http://127.0.0.1/3', body='{"title": "test"}',
# status=202,
# headers={'content-type': 'application/json'})
# with self.assertRaises(PvApiResponseStatusError):
# ret = self.loop.run_until_complete(
# self.request.delete('http://127.0.0.1/3'))
#
#
# def test_delete_timeout(self):
# """Test delete request with timeout"""
# with self.assertRaises(PvApiConnectionError):
# ret = self.loop.run_until_complete(
# self.request.delete('http://127.0.0.1/5'))
|