File: test_aiorequest.py

package info (click to toggle)
python-aiopvapi 3.1.1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 684 kB
  • sloc: python: 3,123; xml: 850; makefile: 5
file content (172 lines) | stat: -rw-r--r-- 6,632 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
from json.decoder import JSONDecodeError

from aiopvapi.helpers.aiorequest import PvApiConnectionError, \
    PvApiResponseStatusError
from tests.fake_server import TestFakeServer, make_url


class TestAioRequest(TestFakeServer):
    """Tests for ``self.request`` ``get``/``post`` calls via the fake server.

    Every test starts the fake server, issues a single request to a URL
    built with ``make_url`` and then checks either the returned value or
    the exception that was raised.
    """

    def _run_with_fake_server(self, make_request):
        """Start the fake server, then run one request to completion.

        :param make_request: zero-argument callable returning the awaitable
            request. It is invoked only after ``start_fake_server`` has
            been awaited, so ``self.request`` and ``make_url`` are
            evaluated once the server is up.
        :return: the value produced by awaiting the request.
        """

        async def go():
            await self.start_fake_server()
            return await make_request()

        return self.loop.run_until_complete(go())

    def test_get_status_200(self):
        """Test get with status 200."""
        ret = self._run_with_fake_server(
            lambda: self.request.get(make_url('test_get_status_200')))

        self.assertEqual({'title': 'test'}, ret)

    def test_get_wrong_status(self):
        """Test get with some other status."""
        with self.assertRaises(PvApiResponseStatusError):
            self._run_with_fake_server(
                lambda: self.request.get(make_url('wrong_status')))

    def test_get_invalid_json(self):
        """Test get with invalid Json."""
        with self.assertRaises(JSONDecodeError):
            self._run_with_fake_server(
                lambda: self.request.get(make_url('invalid_json')))

    def test_get_timeout(self):
        """Test get timeout."""
        with self.assertRaises(PvApiConnectionError):
            self._run_with_fake_server(
                lambda: self.request.get(make_url('get_timeout')))

    def test_post_status_200(self):
        """Test good post requests (200)."""
        payload = {'a': 'b', 'c': 'd'}
        ret = self._run_with_fake_server(
            lambda: self.request.post(make_url('post_status_200'), payload))

        # The test expects the posted data to come back unchanged.
        self.assertEqual({'a': 'b', 'c': 'd'}, ret)

    def test_post_status_201(self):
        """Test good post requests (201)."""
        payload = {'a': 'b', 'c': 'd'}
        ret = self._run_with_fake_server(
            lambda: self.request.post(make_url('post_status_201'), payload))

        # The test expects the posted data to come back unchanged.
        self.assertEqual({'a': 'b', 'c': 'd'}, ret)

    #
    # def test_post_wrong_status(self):
    #     """Test post request with wrong status."""
    #     mocked.post('http://127.0.0.1/3', body='{"title": "test"}',
    #                 status=202,
    #                 headers={'content-type': 'application/json'})
    #     with self.assertRaises(PvApiResponseStatusError):
    #         ret = self.loop.run_until_complete(
    #             self.request.post('http://127.0.0.1/3', {'a': 'b', 'c': 'd'}))
    #
    #
    # def test_post_broken_json(self):
    #     """Test post request with broken JSON."""
    #     mocked.post('http://127.0.0.1/4', body='{"title": "test}',
    #                 status=200,
    #                 headers={'content-type': 'application/json'})
    #     with self.assertRaises(JSONDecodeError):
    #         ret = self.loop.run_until_complete(
    #             self.request.post('http://127.0.0.1/4', {'a': 'b', 'c': 'd'}))
    #
    #
    # def test_post_timeout(self):
    #     """Test post timeout."""
    #     with self.assertRaises(PvApiConnectionError):
    #         ret = self.loop.run_until_complete(
    #             self.request.post('http://127.0.0.1/5', {'a': 'b', 'c': 'd'}))
    #
    #
    # def test_put_status_200(self):
    #     """Test good put requests (200)."""
    #     mocked.put('http://127.0.0.1/1', body='{"title": "test"}',
    #                status=200,
    #                headers={'content-type': 'application/json'})
    #     ret = self.loop.run_until_complete(
    #         self.request.put('http://127.0.0.1/1', {'a': 'b', 'c': 'd'}))
    #     self.assertEqual({'title': 'test'}, ret)
    #
    #
    # def test_put_wrong_status(self):
    #     """Test put request with wrong status."""
    #     mocked.put('http://127.0.0.1/2', body='{"title": "test"}',
    #                status=201,
    #                headers={'content-type': 'application/json'})
    #     with self.assertRaises(PvApiResponseStatusError):
    #         ret = self.loop.run_until_complete(
    #             self.request.put('http://127.0.0.1/2', {'a': 'b', 'c': 'd'}))
    #
    #
    # def test_put_broken_json(self):
    #     """Test put request with broken JSON."""
    #     mocked.put('http://127.0.0.1/4', body='{"title": "test}',
    #                status=200,
    #                headers={'content-type': 'application/json'})
    #     with self.assertRaises(JSONDecodeError):
    #         ret = self.loop.run_until_complete(
    #             self.request.put('http://127.0.0.1/4', {'a': 'b', 'c': 'd'}))
    #
    #
    # def test_put_timeout(self):
    #     """Test put request with timeout."""
    #     with self.assertRaises(PvApiConnectionError):
    #         ret = self.loop.run_until_complete(
    #             self.request.put('http://127.0.0.1/5', {'a': 'b', 'c': 'd'}))
    #
    #
    # def test_delete_status_200(self):
    #     """Test delete request with status 200"""
    #     mocked.delete('http://127.0.0.1/1',
    #                   status=200,
    #                   headers={'content-type': 'application/json'})
    #     ret = self.loop.run_until_complete(
    #         self.request.delete('http://127.0.0.1/1'))
    #     self.assertIsNone(ret)
    #
    #
    # def test_delete_status_204(self):
    #     """Test delete request with status 204"""
    #     mocked.delete('http://127.0.0.1/2', body='{"title": "test"}',
    #                   status=204,
    #                   headers={'content-type': 'application/json'})
    #     ret = self.loop.run_until_complete(
    #         self.request.delete('http://127.0.0.1/2'))
    #     self.assertTrue(ret)
    #
    #
    # def test_delete_wrong_status(self):
    #     """Test delete request with wrong status"""
    #     mocked.delete('http://127.0.0.1/3', body='{"title": "test"}',
    #                   status=202,
    #                   headers={'content-type': 'application/json'})
    #     with self.assertRaises(PvApiResponseStatusError):
    #         ret = self.loop.run_until_complete(
    #             self.request.delete('http://127.0.0.1/3'))
    #
    #
    # def test_delete_timeout(self):
    #     """Test delete request with timeout"""
    #     with self.assertRaises(PvApiConnectionError):
    #         ret = self.loop.run_until_complete(
    #             self.request.delete('http://127.0.0.1/5'))