1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
|
"""Stress test diskcache.persistent.Deque."""
import itertools as it
import multiprocessing as mp
import random
import time
import diskcache as dc
OPERATIONS = 1000
SEED = 0
SIZE = 10
functions = []
def register(function):
functions.append(function)
return function
@register
def stress_get(deque):
index = random.randrange(max(1, len(deque)))
try:
deque[index]
except IndexError:
pass
@register
def stress_set(deque):
index = random.randrange(max(1, len(deque)))
value = random.random()
try:
deque[index] = value
except IndexError:
pass
@register
def stress_del(deque):
index = random.randrange(max(1, len(deque)))
try:
del deque[index]
except IndexError:
pass
@register
def stress_iadd(deque):
values = [random.random() for _ in range(5)]
deque += values
@register
def stress_append(deque):
value = random.random()
deque.append(value)
@register
def stress_appendleft(deque):
value = random.random()
deque.appendleft(value)
@register
def stress_pop(deque):
try:
deque.pop()
except IndexError:
pass
@register
def stress_popleft(deque):
try:
deque.popleft()
except IndexError:
pass
@register
def stress_reverse(deque):
deque.reverse()
@register
def stress_rotate(deque):
steps = random.randrange(max(1, len(deque)))
deque.rotate(steps)
def stress(seed, deque):
random.seed(seed)
for count in range(OPERATIONS):
if len(deque) > 100:
function = random.choice([stress_pop, stress_popleft])
else:
function = random.choice(functions)
function(deque)
def test(status=False):
random.seed(SEED)
deque = dc.Deque(range(SIZE))
processes = []
for count in range(8):
process = mp.Process(target=stress, args=(SEED + count, deque))
process.start()
processes.append(process)
for value in it.count():
time.sleep(1)
if status:
print('\r', value, 's', len(deque), 'items', ' ' * 20, end='')
if all(not process.is_alive() for process in processes):
break
if status:
print('')
assert all(process.exitcode == 0 for process in processes)
if __name__ == '__main__':
test(status=True)
|