File: test_util.py

package info (click to toggle)
blinkpy 0.25.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 600 kB
  • sloc: python: 6,179; sh: 25; makefile: 2
file content (222 lines) | stat: -rw-r--r-- 7,282 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
"""Test various api functions."""

from unittest import mock, IsolatedAsyncioTestCase
import time
import aiofiles
from io import BufferedIOBase
from blinkpy.helpers.util import (
    json_load,
    json_save,
    Throttle,
    time_to_seconds,
    gen_uid,
    get_time,
    merge_dicts,
    backoff_seconds,
    BlinkException,
)
from blinkpy.helpers import constants as const


class TestUtil(IsolatedAsyncioTestCase):
    """Test the helpers/util module."""

    def setUp(self):
        """Initialize the blink module."""

    def tearDown(self):
        """Tear down blink module."""

    async def test_throttle(self):
        """Test the throttle decorator."""
        calls = []

        @Throttle(seconds=5)
        async def test_throttle(force=False):
            calls.append(1)

        now = int(time.time())

        # First call should fire
        await test_throttle()
        self.assertEqual(1, len(calls))

        # Call again, still should fire with delay
        await test_throttle()
        self.assertEqual(2, len(calls))
        assert int(time.time()) - now >= 5

        # Call with force
        await test_throttle(force=True)
        self.assertEqual(3, len(calls))

        # Call without throttle, fire with delay
        now = int(time.time())

        await test_throttle()
        self.assertEqual(4, len(calls))
        assert int(time.time()) - now >= 5

    async def test_throttle_per_instance(self):
        """Test that throttle is done once per instance of class."""

        class Tester:
            """A tester class for throttling."""

            async def test(self):
                """Test the throttle."""
                return True

        tester = Tester()
        throttled = Throttle(seconds=1)(tester.test)
        now = int(time.time())
        self.assertEqual(await throttled(), True)
        self.assertEqual(await throttled(), True)
        assert int(time.time()) - now >= 1

    async def test_throttle_multiple_objects(self):
        """Test that function is throttled even if called by multiple objects."""

        @Throttle(seconds=5)
        async def test_throttle_method():
            return True

        class Tester:
            """A tester class for throttling."""

            def test(self):
                """Test function for throttle."""
                return test_throttle_method()

        tester1 = Tester()
        tester2 = Tester()
        now = int(time.time())
        self.assertEqual(await tester1.test(), True)
        self.assertEqual(await tester2.test(), True)
        assert int(time.time()) - now >= 5

    async def test_throttle_on_two_methods(self):
        """Test that throttle works for multiple methods."""

        class Tester:
            """A tester class for throttling."""

            @Throttle(seconds=3)
            async def test1(self):
                """Test function for throttle."""
                return True

            @Throttle(seconds=5)
            async def test2(self):
                """Test function for throttle."""
                return True

        tester = Tester()
        now = int(time.time())

        self.assertEqual(await tester.test1(), True)
        self.assertEqual(await tester.test2(), True)
        self.assertEqual(await tester.test1(), True)
        assert int(time.time()) - now >= 3
        self.assertEqual(await tester.test2(), True)
        assert int(time.time()) - now >= 5

    def test_time_to_seconds(self):
        """Test time to seconds conversion."""
        correct_time = "1970-01-01T00:00:05+00:00"
        wrong_time = "1/1/1970 00:00:03"
        self.assertEqual(time_to_seconds(correct_time), 5)
        self.assertFalse(time_to_seconds(wrong_time))

    async def test_json_save(self):
        """Check that the file is saved."""
        mock_file = mock.MagicMock()
        aiofiles.threadpool.wrap.register(mock.MagicMock)(
            lambda *args, **kwargs: aiofiles.threadpool.AsyncBufferedIOBase(
                *args, **kwargs
            )
        )
        with mock.patch(
            "aiofiles.threadpool.sync_open", return_value=mock_file
        ) as mock_open:
            await json_save('{"test":1,"test2":2}', "face.file")
            mock_open.assert_called_once()

    async def test_json_load_data(self):
        """Check that bad file is handled."""
        filename = "fake.file"
        aiofiles.threadpool.wrap.register(mock.MagicMock)(
            lambda *args, **kwargs: aiofiles.threadpool.AsyncBufferedIOBase(
                *args, **kwargs
            )
        )
        self.assertEqual(await json_load(filename), None)

        mock_file = mock.MagicMock(spec=BufferedIOBase)
        mock_file.name = filename
        mock_file.read.return_value = '{"some data":"more"}'
        with mock.patch("aiofiles.threadpool.sync_open", return_value=mock_file):
            self.assertNotEqual(await json_load(filename), None)

    async def test_json_load_bad_data(self):
        """Check that bad file is handled."""
        self.assertEqual(await json_load("fake.file"), None)
        filename = "fake.file"
        aiofiles.threadpool.wrap.register(mock.MagicMock)(
            lambda *args, **kwargs: aiofiles.threadpool.AsyncBufferedIOBase(
                *args, **kwargs
            )
        )
        self.assertEqual(await json_load(filename), None)

        mock_file = mock.MagicMock(spec=BufferedIOBase)
        mock_file.name = filename
        mock_file.read.return_value = ""
        with mock.patch("aiofiles.threadpool.sync_open", return_value=mock_file):
            self.assertEqual(await json_load("fake.file"), None)

    def test_gen_uid(self):
        """Test gen_uid formatting."""
        val1 = gen_uid(8)
        val2 = gen_uid(8, uid_format=True)

        self.assertEqual(len(val1), 16)

        self.assertTrue(val2.startswith("BlinkCamera_"))
        val2_cut = val2.split("_")
        val2_split = val2_cut[1].split("-")
        self.assertEqual(len(val2_split[0]), 8)
        self.assertEqual(len(val2_split[1]), 4)
        self.assertEqual(len(val2_split[2]), 4)
        self.assertEqual(len(val2_split[3]), 4)
        self.assertEqual(len(val2_split[4]), 12)

    def test_get_time(self):
        """Test the get time util."""
        self.assertEqual(
            get_time(), time.strftime(const.TIMESTAMP_FORMAT, time.gmtime(time.time()))
        )

    def test_merge_dicts(self):
        """Test for duplicates message in merge dicts."""
        dict_A = {"key1": "value1", "key2": "value2"}
        dict_B = {"key1": "value1"}

        expected_log = [
            "WARNING:blinkpy.helpers.util:Duplicates found during merge: ['key1']. "
            "Renaming is recommended."
        ]

        with self.assertLogs(level="DEBUG") as merge_log:
            merge_dicts(dict_A, dict_B)
        self.assertListEqual(merge_log.output, expected_log)

    def test_backoff_seconds(self):
        """Test the backoff seconds function."""
        self.assertNotEqual(backoff_seconds(), None)

    def test_blink_exception(self):
        """Test the Blink Exception class."""
        test_exception = BlinkException([1, "No good"])
        self.assertEqual(test_exception.errid, 1)
        self.assertEqual(test_exception.message, "No good")