1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
|
"""Test various api functions."""
from unittest import mock, IsolatedAsyncioTestCase
import time
import aiofiles
from io import BufferedIOBase
from blinkpy.helpers.util import (
json_load,
json_save,
Throttle,
time_to_seconds,
gen_uid,
get_time,
merge_dicts,
backoff_seconds,
BlinkException,
)
from blinkpy.helpers import constants as const
class TestUtil(IsolatedAsyncioTestCase):
    """Test the helpers/util module.

    The throttle tests measure real wall-clock time (whole seconds, via
    ``int(time.time())``) and therefore take several seconds to run.
    """

    def setUp(self):
        """Initialize the blink module."""

    def tearDown(self):
        """Tear down blink module."""

    @staticmethod
    def _register_mock_file_wrapper():
        """Register a ``MagicMock`` handler with ``aiofiles.threadpool.wrap``.

        The handler builds an ``aiofiles.threadpool.AsyncBufferedIOBase``
        from whatever arguments it receives, so a mocked file object returned
        by a patched ``aiofiles.threadpool.sync_open`` can be wrapped.
        Calling this more than once simply registers the same handler again.
        """
        aiofiles.threadpool.wrap.register(mock.MagicMock)(
            lambda *args, **kwargs: aiofiles.threadpool.AsyncBufferedIOBase(
                *args, **kwargs
            )
        )

    async def test_throttle(self):
        """Test the throttle decorator."""
        calls = []

        @Throttle(seconds=5)
        async def throttled_call(force=False):
            calls.append(1)

        start = int(time.time())
        # First call should fire
        await throttled_call()
        self.assertEqual(1, len(calls))
        # Call again, still should fire with delay
        await throttled_call()
        self.assertEqual(2, len(calls))
        self.assertGreaterEqual(int(time.time()) - start, 5)
        # Call with force
        await throttled_call(force=True)
        self.assertEqual(3, len(calls))
        # Call without throttle, fire with delay
        start = int(time.time())
        await throttled_call()
        self.assertEqual(4, len(calls))
        self.assertGreaterEqual(int(time.time()) - start, 5)

    async def test_throttle_per_instance(self):
        """Test that throttle is done once per instance of class."""

        class Tester:
            """A tester class for throttling."""

            async def test(self):
                """Test the throttle."""
                return True

        tester = Tester()
        # Wrap the bound method directly instead of decorating the class body.
        throttled = Throttle(seconds=1)(tester.test)
        start = int(time.time())
        self.assertEqual(await throttled(), True)
        self.assertEqual(await throttled(), True)
        self.assertGreaterEqual(int(time.time()) - start, 1)

    async def test_throttle_multiple_objects(self):
        """Test that function is throttled even if called by multiple objects."""

        @Throttle(seconds=5)
        async def test_throttle_method():
            return True

        class Tester:
            """A tester class for throttling."""

            def test(self):
                """Return the awaitable of the shared throttled function."""
                return test_throttle_method()

        tester1 = Tester()
        tester2 = Tester()
        start = int(time.time())
        self.assertEqual(await tester1.test(), True)
        self.assertEqual(await tester2.test(), True)
        self.assertGreaterEqual(int(time.time()) - start, 5)

    async def test_throttle_on_two_methods(self):
        """Test that throttle works for multiple methods."""

        class Tester:
            """A tester class for throttling."""

            @Throttle(seconds=3)
            async def test1(self):
                """Test function for throttle."""
                return True

            @Throttle(seconds=5)
            async def test2(self):
                """Test function for throttle."""
                return True

        tester = Tester()
        start = int(time.time())
        self.assertEqual(await tester.test1(), True)
        self.assertEqual(await tester.test2(), True)
        # Second round: each method is expected to honour its own interval.
        self.assertEqual(await tester.test1(), True)
        self.assertGreaterEqual(int(time.time()) - start, 3)
        self.assertEqual(await tester.test2(), True)
        self.assertGreaterEqual(int(time.time()) - start, 5)

    def test_time_to_seconds(self):
        """Test time to seconds conversion."""
        correct_time = "1970-01-01T00:00:05+00:00"
        wrong_time = "1/1/1970 00:00:03"
        self.assertEqual(time_to_seconds(correct_time), 5)
        self.assertFalse(time_to_seconds(wrong_time))

    async def test_json_save(self):
        """Check that the file is saved."""
        mock_file = mock.MagicMock()
        self._register_mock_file_wrapper()
        with mock.patch(
            "aiofiles.threadpool.sync_open", return_value=mock_file
        ) as mock_open:
            await json_save('{"test":1,"test2":2}', "face.file")
            mock_open.assert_called_once()

    async def test_json_load_data(self):
        """Check that data is returned when the file holds valid JSON."""
        filename = "fake.file"
        self._register_mock_file_wrapper()
        # Without the patch below the load yields None.
        self.assertIsNone(await json_load(filename))
        mock_file = mock.MagicMock(spec=BufferedIOBase)
        mock_file.name = filename
        mock_file.read.return_value = '{"some data":"more"}'
        with mock.patch("aiofiles.threadpool.sync_open", return_value=mock_file):
            self.assertIsNotNone(await json_load(filename))

    async def test_json_load_bad_data(self):
        """Check that bad file is handled."""
        filename = "fake.file"
        self.assertIsNone(await json_load(filename))
        self._register_mock_file_wrapper()
        self.assertIsNone(await json_load(filename))
        mock_file = mock.MagicMock(spec=BufferedIOBase)
        mock_file.name = filename
        # An empty payload is not valid JSON, so the load must yield None.
        mock_file.read.return_value = ""
        with mock.patch("aiofiles.threadpool.sync_open", return_value=mock_file):
            self.assertIsNone(await json_load(filename))

    def test_gen_uid(self):
        """Test gen_uid formatting."""
        plain_uid = gen_uid(8)
        formatted_uid = gen_uid(8, uid_format=True)
        self.assertEqual(len(plain_uid), 16)
        self.assertTrue(formatted_uid.startswith("BlinkCamera_"))
        # After the prefix, the first five dash-separated groups must be
        # 8-4-4-4-12 characters long.
        uid_groups = formatted_uid.split("_")[1].split("-")
        self.assertEqual(len(uid_groups[0]), 8)
        self.assertEqual(len(uid_groups[1]), 4)
        self.assertEqual(len(uid_groups[2]), 4)
        self.assertEqual(len(uid_groups[3]), 4)
        self.assertEqual(len(uid_groups[4]), 12)

    def test_get_time(self):
        """Test the get time util."""

        def expected_now():
            """Format the current UTC time with the library's timestamp format."""
            return time.strftime(const.TIMESTAMP_FORMAT, time.gmtime(time.time()))

        # Sample the expected value on both sides of the call so the test
        # cannot fail merely because the clock ticks over to the next second
        # between the two reads.
        before = expected_now()
        actual = get_time()
        after = expected_now()
        self.assertIn(actual, (before, after))

    def test_merge_dicts(self):
        """Test for duplicates message in merge dicts."""
        dict_a = {"key1": "value1", "key2": "value2"}
        dict_b = {"key1": "value1"}
        expected_log = [
            "WARNING:blinkpy.helpers.util:Duplicates found during merge: ['key1']. "
            "Renaming is recommended."
        ]
        with self.assertLogs(level="DEBUG") as merge_log:
            merge_dicts(dict_a, dict_b)
        self.assertListEqual(merge_log.output, expected_log)

    def test_backoff_seconds(self):
        """Test the backoff seconds function."""
        self.assertIsNotNone(backoff_seconds())

    def test_blink_exception(self):
        """Test the Blink Exception class."""
        test_exception = BlinkException([1, "No good"])
        self.assertEqual(test_exception.errid, 1)
        self.assertEqual(test_exception.message, "No good")
|