1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469
|
import types
import unittest
from itertools import chain, product
from multiprocessing import cpu_count
from unittest.mock import patch
import numpy as np
from mpire.utils import apply_numpy_chunking, chunk_tasks, format_seconds, get_n_chunks, make_single_arguments, TimeIt
class ChunkTasksTest(unittest.TestCase):
    """
    Tests for chunk_tasks. Most tests run twice: once with a sized input (range, which implements __len__) and once
    with a plain iterator, which has no length and therefore needs iterable_len.
    """

    def test_no_chunk_size_no_n_splits_provided(self):
        """
        Without chunk_size and without n_splits the chunker cannot decide how to split, so a ValueError is expected
        """
        with self.assertRaises(ValueError):
            next(chunk_tasks([]))

    def test_generator_without_iterable_len(self):
        """
        An iterator has no length, so leaving out iterable_len must raise a ValueError
        """
        with self.assertRaises(ValueError):
            next(chunk_tasks(iter([]), n_splits=1))

    def test_chunk_size_has_priority_over_n_splits(self):
        """
        When both are given, chunk_size wins: 4 tasks with chunk_size=4 form one chunk even though n_splits=4
        """
        result = list(chunk_tasks(range(4), chunk_size=4, n_splits=4))
        self.assertEqual(len(result), 1)
        self.assertEqual(len(result[0]), 4)
        self.assertEqual(list(range(4)), list(chain.from_iterable(result)))

    def test_empty_input(self):
        """
        An empty input produces no chunks at all, both for a list and for an iterator (with iterable_len=0)
        """
        cases = (
            ('list input', lambda: chunk_tasks([], n_splits=5)),
            ('generator/iterator input', lambda: chunk_tasks(iter([]), iterable_len=0, n_splits=5)),
        )
        for label, make_chunker in cases:
            with self.subTest(label):
                produced = list(make_chunker())
                self.assertEqual(len(produced), 0)

    def test_iterable_len_doesnt_match_input_size(self):
        """
        iterable_len may be smaller than, equal to, or larger than the real number of tasks (all are valid). Only the
        first min(iterable_len, number of tasks) tasks should end up in the chunks
        """
        n_tasks = 10
        for iter_len in [5, 10, 20]:
            n_expected = min(iter_len, n_tasks)
            # range counts as a sized input (it implements __len__ and such); iter(range(...)) is a true iterator
            inputs = (('list', range(n_tasks)), ('generator/iterator', iter(range(n_tasks))))
            for input_kind, tasks in inputs:
                with self.subTest(iter_len=iter_len, input=input_kind):
                    produced = list(chunk_tasks(tasks, iterable_len=iter_len, n_splits=1))
                    self.assertEqual(sum(len(chunk) for chunk in produced), n_expected)
                    self.assertEqual(list(range(n_expected)), list(chain.from_iterable(produced)))

    def test_n_splits(self):
        """
        Compare n_splits against fewer, equally many, and more tasks: we expect min(n_splits, number of tasks)
        chunks, which together hold every task in its original order
        """
        n_splits = 5
        for n_tasks in [n_splits - 1, n_splits, n_splits + 1]:
            n_expected_chunks = min(n_splits, n_tasks)
            # The iterator has no __len__, so it gets iterable_len explicitly
            inputs = (('list', range(n_tasks), {}),
                      ('generator/iterator', iter(range(n_tasks)), {'iterable_len': n_tasks}))
            for input_kind, tasks, extra_kwargs in inputs:
                with self.subTest(num_args=n_tasks, input=input_kind):
                    produced = list(chunk_tasks(tasks, n_splits=n_splits, **extra_kwargs))
                    self.assertEqual(len(produced), n_expected_chunks)
                    self.assertEqual(list(range(n_tasks)), list(chain.from_iterable(produced)))

    def test_chunk_size(self):
        """
        With chunk_size given, every chunk except the last holds exactly chunk_size tasks and the last holds at most
        chunk_size. Checked for fewer, equally many, and more tasks than chunk_size
        """
        chunk_size = 3
        for n_tasks in [chunk_size - 1, chunk_size, chunk_size + 1]:
            inputs = (('list', range(n_tasks)), ('generator/iterator', iter(range(n_tasks))))
            for input_kind, tasks in inputs:
                with self.subTest(num_args=n_tasks, input=input_kind):
                    produced = list(chunk_tasks(tasks, chunk_size=chunk_size))
                    for full_chunk in produced[:-1]:
                        self.assertEqual(len(full_chunk), chunk_size)
                    self.assertLessEqual(len(produced[-1]), chunk_size)
                    self.assertEqual(list(range(n_tasks)), list(chain.from_iterable(produced)))
class ApplyNumpyChunkingTest(unittest.TestCase):
    """
    apply_numpy_chunking simply calls other, already tested, functions in succession. We test the influence of each
    individual parameter, but skip the interactions between them
    """

    def setUp(self):
        self.test_data_numpy = np.random.rand(100, 2)

    def _assert_other_output(self, iterable_len, chunk_size, n_splits, expected_n_chunks):
        """
        Check the non-data outputs of apply_numpy_chunking: iterable_len should equal the number of chunks produced,
        chunk_size should be 1 and n_splits should be None

        :param iterable_len: iterable_len returned by apply_numpy_chunking
        :param chunk_size: chunk_size returned by apply_numpy_chunking
        :param n_splits: n_splits returned by apply_numpy_chunking
        :param expected_n_chunks: Number of chunks the test expects
        """
        self.assertEqual(iterable_len, expected_n_chunks)
        self.assertEqual(chunk_size, 1)
        self.assertIsNone(n_splits)

    def _assert_chunks_cover_input(self, iterable_of_args, expected_n_chunks):
        """
        Materialize the chunk generator and check that it yields expected_n_chunks argument tuples whose first
        element is a numpy array, and that these arrays, laid end to end, reproduce every row of the test data in
        order

        :param iterable_of_args: Chunk generator returned by apply_numpy_chunking
        :param expected_n_chunks: Number of chunks the test expects
        """
        iterable_of_args = list(iterable_of_args)
        self.assertEqual(len(iterable_of_args), expected_n_chunks)
        offset = 0
        for chunk in iterable_of_args:
            self.assertIsInstance(chunk[0], np.ndarray)
            np.testing.assert_array_equal(chunk[0], self.test_data_numpy[offset:offset + len(chunk[0])])
            offset += len(chunk[0])
        self.assertEqual(offset, len(self.test_data_numpy))

    def test_iterable_len(self):
        """
        Test that iterable_len is adhered to. When iterable_len < len(input) it should reduce the input size. If higher
        or None it should take the entire input
        """
        for iterable_len, expected_size in [(5, 5), (150, 100), (None, 100)]:
            with self.subTest(iterable_len=iterable_len):
                iterable_of_args, iterable_len_, chunk_size, n_splits = apply_numpy_chunking(
                    self.test_data_numpy, iterable_len=iterable_len, n_splits=1
                )

                # Materialize generator and test contents: n_splits=1, so a single chunk holding the first
                # expected_size rows
                iterable_of_args = list(iterable_of_args)
                self.assertEqual(len(iterable_of_args), 1)
                self.assertIsInstance(iterable_of_args[0][0], np.ndarray)
                np.testing.assert_array_equal(iterable_of_args[0][0], self.test_data_numpy[:expected_size])

                self._assert_other_output(iterable_len_, chunk_size, n_splits, expected_n_chunks=1)

    def test_chunk_size(self):
        """
        Test that chunk_size works as expected. Note that chunk_size trumps n_splits
        """
        for chunk_size, expected_n_chunks in [(1, 100), (3, 34), (200, 1), (None, 1)]:
            with self.subTest(chunk_size=chunk_size):
                iterable_of_args, iterable_len, chunk_size_, n_splits = apply_numpy_chunking(
                    self.test_data_numpy, chunk_size=chunk_size, n_splits=1
                )

                # Materialize generator and test contents. The chunks should be of size chunk_size (except for the last
                # chunk, which can be smaller). With chunk_size=None we expect a single chunk (n_splits=1) holding all
                # rows. A separate name is used so the loop variable isn't rebound
                iterable_of_args = list(iterable_of_args)
                self.assertEqual(len(iterable_of_args), expected_n_chunks)
                rows_per_chunk = chunk_size or len(self.test_data_numpy)
                for chunk_idx, chunk in enumerate(iterable_of_args):
                    self.assertIsInstance(chunk[0], np.ndarray)
                    np.testing.assert_array_equal(chunk[0], self.test_data_numpy[chunk_idx * rows_per_chunk:
                                                                                 (chunk_idx + 1) * rows_per_chunk])

                self._assert_other_output(iterable_len, chunk_size_, n_splits, expected_n_chunks)

    def test_n_splits(self):
        """
        Test that n_splits works as expected. The number of chunks can't exceed the number of rows
        """
        for n_splits, expected_n_chunks in [(1, 1), (3, 3), (150, 100)]:
            with self.subTest(n_splits=n_splits):
                iterable_of_args, iterable_len, chunk_size, n_splits_ = apply_numpy_chunking(
                    self.test_data_numpy, n_splits=n_splits
                )
                self._assert_chunks_cover_input(iterable_of_args, expected_n_chunks)
                self._assert_other_output(iterable_len, chunk_size, n_splits_, expected_n_chunks)

        # chunk_size and n_splits can't be both None
        with self.subTest(n_splits=None), self.assertRaises(ValueError):
            iterable_of_args, *_ = apply_numpy_chunking(self.test_data_numpy, n_splits=None)
            list(iterable_of_args)

    def test_n_jobs(self):
        """
        Test that n_jobs works as expected. When chunk_size and n_splits are both None, n_jobs * 4 is passed on as
        n_splits. The number of chunks can't exceed the number of rows
        """
        for n_jobs, expected_n_chunks in [(1, 4), (3, 12), (40, 100), (150, 100)]:
            with self.subTest(n_jobs=n_jobs):
                iterable_of_args, iterable_len, chunk_size, n_splits_ = apply_numpy_chunking(
                    self.test_data_numpy, n_jobs=n_jobs
                )
                self._assert_chunks_cover_input(iterable_of_args, expected_n_chunks)
                self._assert_other_output(iterable_len, chunk_size, n_splits_, expected_n_chunks)
class GetNChunksTest(unittest.TestCase):

    def setUp(self):
        # 13 tasks in a plain list, 100 rows in a numpy array
        self.test_data = [1, 2, 3, 5, 6, 9, 37, 42, 1337, 0, 3, 5, 0]
        self.test_data_numpy = np.random.rand(100, 2)

    def test_everything_none(self):
        """
        When everything is None we should use cpu_count * 4 as number of splits. The number of chunks can't exceed
        the number of tasks, so we take that into account
        """
        with self.subTest(input='list'):
            self.assertEqual(get_n_chunks(self.test_data, iterable_len=None, chunk_size=None, n_splits=None,
                                          n_jobs=None), min(13, cpu_count() * 4))
        with self.subTest(input='numpy'):
            self.assertEqual(get_n_chunks(self.test_data_numpy, iterable_len=None, chunk_size=None, n_splits=None,
                                          n_jobs=None), min(100, cpu_count() * 4))

    def test_smaller_iterable_len(self):
        """
        Test iterable_len, where iterable_len < len(input). iterable_len then caps the number of tasks
        """
        with self.subTest(input='list'):
            self.assertEqual(get_n_chunks(self.test_data, iterable_len=5, chunk_size=None, n_splits=None, n_jobs=None),
                             min(5, cpu_count() * 4))
        with self.subTest(input='numpy'):
            self.assertEqual(get_n_chunks(self.test_data_numpy, iterable_len=5, chunk_size=None, n_splits=None,
                                          n_jobs=None), min(5, cpu_count() * 4))
        with self.subTest(input='generator/iterator'):
            self.assertEqual(get_n_chunks(iter(self.test_data), iterable_len=5, chunk_size=None, n_splits=None,
                                          n_jobs=None), min(5, cpu_count() * 4))

    def test_larger_iterable_len(self):
        """
        Test iterable_len, where iterable_len > len(input). iterable_len should be ignored when the actual number of
        tasks is smaller, except for generator/iterator input, in which case the actual number of elements cannot be
        determined
        """
        with self.subTest(input='list'):
            self.assertEqual(get_n_chunks(self.test_data, iterable_len=25, chunk_size=None, n_splits=None, n_jobs=None),
                             min(13, cpu_count() * 4))
        with self.subTest(input='numpy'):
            self.assertEqual(get_n_chunks(self.test_data_numpy, iterable_len=125, chunk_size=None, n_splits=None,
                                          n_jobs=None), min(100, cpu_count() * 4))
        with self.subTest(input='generator/iterator'):
            self.assertEqual(get_n_chunks(iter(self.test_data), iterable_len=25, chunk_size=None, n_splits=None,
                                          n_jobs=None), min(25, cpu_count() * 4))

    def test_chunk_size(self):
        """
        Test chunk_size: the number of chunks should be ceil(number of tasks / chunk_size)
        """
        for chunk_size, expected_n_chunks in [(1, 13), (3, 5)]:
            with self.subTest(input='list', chunk_size=chunk_size, expected_n_chunks=expected_n_chunks):
                self.assertEqual(get_n_chunks(self.test_data, iterable_len=None, chunk_size=chunk_size, n_splits=None,
                                              n_jobs=None), expected_n_chunks)
        for chunk_size, expected_n_chunks in [(1, 100), (3, 34)]:
            # Label these as numpy so failures point at the right input
            with self.subTest(input='numpy', chunk_size=chunk_size, expected_n_chunks=expected_n_chunks):
                self.assertEqual(get_n_chunks(self.test_data_numpy, iterable_len=None, chunk_size=chunk_size,
                                              n_splits=None, n_jobs=None), expected_n_chunks)

    def test_n_splits(self):
        """
        Test n_splits. n_jobs shouldn't have any influence
        """
        for n_splits, n_jobs in product([1, 6], [None, 2, 8]):
            with self.subTest(input='list', n_splits=n_splits, n_jobs=n_jobs):
                self.assertEqual(get_n_chunks(self.test_data, iterable_len=None, chunk_size=None, n_splits=n_splits,
                                              n_jobs=n_jobs), n_splits)
            with self.subTest(input='numpy', n_splits=n_splits, n_jobs=n_jobs):
                self.assertEqual(get_n_chunks(self.test_data_numpy, iterable_len=None, chunk_size=None,
                                              n_splits=n_splits, n_jobs=n_jobs), n_splits)

    def test_n_jobs(self):
        """
        When everything is None except n_jobs we should use n_jobs * 4 as number of splits. Again, taking into account
        the number of tasks
        """
        for n_jobs in [1, 6]:
            with self.subTest(input='list', n_jobs=n_jobs):
                self.assertEqual(get_n_chunks(self.test_data, iterable_len=None, chunk_size=None, n_splits=None,
                                              n_jobs=n_jobs), min(4 * n_jobs, len(self.test_data)))
            with self.subTest(input='numpy', n_jobs=n_jobs):
                self.assertEqual(get_n_chunks(self.test_data_numpy, iterable_len=None, chunk_size=None, n_splits=None,
                                              n_jobs=n_jobs), min(4 * n_jobs, len(self.test_data_numpy)))

    def test_chunk_size_priority_over_n_splits(self):
        """
        chunk_size should have priority over n_splits
        """
        with self.subTest(input='list', chunk_size=1, n_splits=6):
            self.assertEqual(get_n_chunks(self.test_data, iterable_len=None, chunk_size=1, n_splits=6, n_jobs=None), 13)
        with self.subTest(input='numpy', chunk_size=1, n_splits=6):
            self.assertEqual(get_n_chunks(self.test_data_numpy, iterable_len=None, chunk_size=1, n_splits=6,
                                          n_jobs=None), 100)
        with self.subTest(input='list', chunk_size=3, n_splits=3):
            self.assertEqual(get_n_chunks(self.test_data, iterable_len=None, chunk_size=3, n_splits=3, n_jobs=None), 5)
        with self.subTest(input='numpy', chunk_size=3, n_splits=3):
            self.assertEqual(get_n_chunks(self.test_data_numpy, iterable_len=None, chunk_size=3, n_splits=3,
                                          n_jobs=None), 34)

    def test_generator_input_with_no_iterable_len_raises(self):
        """
        When working with generators the iterable_len should be provided (the working examples are already tested
        above). Without it a ValueError is expected for every combination of chunk_size, n_splits and n_jobs
        """
        for chunk_size, n_splits, n_jobs in product([None, 1, 3], [None, 1, 3], [None, 1, 3]):
            with self.subTest(chunk_size=chunk_size, n_splits=n_splits, n_jobs=n_jobs), self.assertRaises(ValueError):
                get_n_chunks(iter(self.test_data), iterable_len=None, chunk_size=chunk_size, n_splits=n_splits,
                             n_jobs=n_jobs)
class MakeSingleArgumentsTest(unittest.TestCase):

    def test_make_single_arguments(self):
        """
        Tests the make_single_arguments function for different inputs. Every element should be wrapped in a 1-tuple.
        With generator=True the result should be a generator, otherwise a list
        """
        for (args_in, args_out), generator in product(
                [(['a', 'c', 'b', 'd'], [('a',), ('c',), ('b',), ('d',)]),
                 ([1, 2, 3, 4, 5], [(1,), (2,), (3,), (4,), (5,)]),
                 ([(True,), (False,), (None,)], [((True,),), ((False,),), ((None,),)])],
                [False, True]
        ):
            # Run each combination as its own subtest so one failure doesn't hide the others
            with self.subTest(args_in=args_in, generator=generator):
                # Transform. With generator=True the input is passed as a generator expression, else as the list
                args_transformed = make_single_arguments((arg for arg in args_in) if generator else args_in,
                                                         generator=generator)

                # Check type (assertIsInstance reports the actual type on failure)
                self.assertIsInstance(args_transformed, types.GeneratorType if generator else list)

                # Check contents
                self.assertEqual(list(args_transformed), args_out)
class FormatSecondsTest(unittest.TestCase):
    """
    Tests for format_seconds, with and without millisecond precision
    """

    def test_none_input(self):
        """
        A None duration is rendered as an empty string, whatever with_milliseconds is set to
        """
        for show_ms in (False, True):
            with self.subTest(with_milliseconds=show_ms):
                rendered = format_seconds(None, with_milliseconds=show_ms)
                self.assertEqual(rendered, '')

    def test_without_milliseconds(self):
        """
        Without milliseconds the fractional part is dropped, not rounded (2.9 seconds becomes 0:00:02)
        """
        expectations = {
            0: '0:00:00',
            1: '0:00:01',
            1.337: '0:00:01',
            2.9: '0:00:02',
            123456.78901234: '1 day, 10:17:36',
        }
        for seconds, wanted in expectations.items():
            with self.subTest(seconds=seconds):
                rendered = format_seconds(seconds, with_milliseconds=False)
                self.assertEqual(rendered, wanted)

    def test_with_milliseconds(self):
        """
        With milliseconds enabled, the fraction is shown only when there is one, padded to three digits
        (2.9 seconds becomes 0:00:02.900); whole seconds are printed without it
        """
        expectations = {
            0: '0:00:00',
            1: '0:00:01',
            1.337: '0:00:01.337',
            2.9: '0:00:02.900',
            123456.78901234: '1 day, 10:17:36.789',
        }
        for seconds, wanted in expectations.items():
            with self.subTest(seconds=seconds):
                rendered = format_seconds(seconds, with_milliseconds=True)
                self.assertEqual(rendered, wanted)
class TimeItTest(unittest.TestCase):

    def test_array_storage(self):
        """
        TimeIt should write to the correct idx in the cum_time_array container. The max_time_array is a min-heap
        container, so the lowest value is stored at index 0. When a single larger duration enters a heap of five equal
        zero entries, it ends up at index 2
        """
        for array_idx in range(5):
            cum_time_array = [0.0, 0.0, 0.0, 0.0, 0.0]
            max_time_array = [(0.0, ''), (0.0, ''), (0.0, ''), (0.0, ''), (0.0, '')]
            # The assertions live inside the subTest so a failure is attributed to the right array_idx
            with self.subTest(array_idx=array_idx):
                # time.time() values used by TimeIt in order: start (0.0), end (4.2) -> a duration of 4.2 seconds
                with patch('mpire.utils.time.time', side_effect=[0.0, 4.2]), \
                        TimeIt(cum_time_array, array_idx, max_time_array):
                    pass

                # Only the selected index of cum_time_array changes
                self.assertListEqual([t for idx, t in enumerate(cum_time_array) if idx != array_idx],
                                     [0.0, 0.0, 0.0, 0.0])
                self.assertEqual(cum_time_array[array_idx], 4.2)

                # One zero entry is replaced by the new duration (no format function, so None), at index 2
                self.assertListEqual([t for idx, t in enumerate(max_time_array) if idx != 2],
                                     [(0.0, ''), (0.0, ''), (0.0, ''), (0.0, '')])
                self.assertEqual(max_time_array[2], (4.2, None))

    def test_cum_time(self):
        """
        Using TimeIt multiple times should increase the cum_time_array
        """
        # These return values are used by TimeIt in order: start, end, start, end, ... So the first time the duration
        # will be 1 second, then 2 seconds, and 3 seconds.
        cum_time_array = [0.0]
        with patch('mpire.utils.time.time', side_effect=[0.0, 1.0, 0.0, 2.0, 0.0, 3.0]):
            with TimeIt(cum_time_array, 0):
                pass
            self.assertEqual(cum_time_array[0], 1.0)
            with TimeIt(cum_time_array, 0):
                pass
            self.assertEqual(cum_time_array[0], 3.0)
            with TimeIt(cum_time_array, 0):
                pass
            self.assertEqual(cum_time_array[0], 6.0)

    def test_max_time(self):
        """
        Using TimeIt multiple times should store the max duration value in the max_time_array using heapq. There's only
        room for the highest 5 values, while it is called 6 times. The smallest duration shouldn't be present.
        """
        # These return values are used by TimeIt in order: start, end, start, end, ... So the first time the duration
        # will be 1 second, then 2 seconds, 3 seconds, 3 seconds again, 0.5 seconds, and 10 seconds.
        cum_time_array = [0.0]
        max_time_array = [(0.0, ''), (0.0, ''), (0.0, ''), (0.0, ''), (0.0, '')]
        with patch('mpire.utils.time.time', side_effect=[0.0, 1.0, 0.0, 2.0, 0.0, 3.0, 0.0, 3.0, 0.0, 0.5, 0.0, 10.0]):
            for _ in range(6):
                with TimeIt(cum_time_array, 0, max_time_array):
                    pass
        # Heap order, not sorted order: the 0.5 second duration has been pushed out
        self.assertListEqual(max_time_array, [(1.0, None), (2.0, None), (10.0, None), (3.0, None), (3.0, None)])

    def test_format_args(self):
        """
        The format args func should be called when provided, and its result stored next to the duration
        """
        for format_func, formatted in [(lambda: "1", "1"), (lambda: 2, 2), (lambda: "foo", "foo")]:
            # These return values are used by TimeIt in order: start, end, start, end, ... So the first time the
            # duration will be 1 second, then 2 seconds, and 3 seconds.
            with self.subTest(format_func=format_func), \
                    patch('mpire.utils.time.time', side_effect=[0.0, 1.0, 0.0, 2.0, 0.0, 3.0]):
                cum_time_array = [0.0]
                max_time_array = [(0.0, ''), (0.0, '')]
                for _ in range(3):
                    with TimeIt(cum_time_array, 0, max_time_array, format_func):
                        pass

                # The heapq only had room for two entries. The highest durations should be kept
                self.assertListEqual(max_time_array, [(2.0, formatted), (3.0, formatted)])
|