1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
|
# Test for utimeq module which implements task queue with support for
# wraparound time (utime.ticks_ms() style).
try:
    from utime import ticks_add, ticks_diff
    from utimeq import utimeq
except ImportError:
    # Either module is unavailable: report the test as skipped and stop.
    print("SKIP")
    raise SystemExit
# Set to a true value to make dprint() actually print its arguments.
DEBUG = 0
# One tick before 0; with wraparound ticks this is presumably the largest
# representable tick value -- TODO confirm against the port's tick period.
MAX = ticks_add(0, -1)
# Half of the tick range, assuming the full period is MAX + 1.
MODULO_HALF = MAX // 2 + 1
# dprint() is chosen once, at import time, based on DEBUG.
if DEBUG:

    def dprint(*v):
        """Print the given values (debug output enabled)."""
        print(*v)

else:

    def dprint(*v):
        """Accept and ignore the given values (debug output disabled)."""
        pass
# Try not to crash on invalid data
h = utimeq(10)

# push() with a single argument (the valid calls in this file pass three);
# a TypeError is expected.
try:
    h.push(1)
    assert False
except TypeError:
    pass

# pop() before any entry was successfully pushed; an IndexError is expected.
try:
    h.pop(1)
    assert False
except IndexError:
    pass

# unsupported unary op
try:
    ~h
    assert False
except TypeError:
    pass
# pushing on full queue
h = utimeq(1)
h.push(1, 0, 0)
try:
    # The queue was created with room for one entry and already holds one.
    h.push(2, 0, 0)
    assert False
except IndexError:
    pass

# popping into invalid type
try:
    h.pop([])
    assert False
except TypeError:
    pass

# length: the failed push and failed pop above must leave the single entry in place
assert len(h) == 1

# peektime: reports the time value (1) of the only queued entry
assert h.peektime() == 1
# peektime with empty queue
try:
    # A freshly created queue holds nothing, so an IndexError is expected.
    utimeq(1).peektime()
    assert False
except IndexError:
    pass
def pop_all(h):
    """Drain queue *h* and return what was popped, in pop order.

    Entries are popped one at a time, for as long as *h* is truthy, into a
    fresh 3-element list; each result is stored as a tuple. The collected
    list is passed to dprint() before being returned.
    """
    entries = []
    while h:
        # pop() fills the list passed to it in place.
        item = [0, 0, 0]
        h.pop(item)
        # print("!", item)
        entries.append(tuple(item))
    dprint(entries)
    return entries
def add(h, v):
    """Push time value *v* onto queue *h*, with 0 for the other two fields."""
    h.push(v, 0, 0)
    # Debug separators around an optional dump of the queue after each push.
    dprint("-----")
    # h.dump()
    dprint("-----")
# Basic ordering check: push values from both ends of the tick range, in
# scrambled order, then drain the queue.
h = utimeq(10)
add(h, 0)
add(h, MAX)
add(h, MAX - 1)
add(h, 101)
add(h, 100)
add(h, MAX - 2)
dprint(h)
l = pop_all(h)
# Each popped time must be strictly later, per ticks_diff(), than the one
# popped just before it.
for i in range(len(l) - 1):
    diff = ticks_diff(l[i + 1][0], l[i][0])
    assert diff > 0
def edge_case(edge, offset):
    """Queue two times *edge* ticks apart and return their popped distance.

    Both times (0 and *edge*) are shifted by *offset* with ticks_add() before
    being pushed onto a fresh queue. The queue is then drained and the result
    is ticks_diff(second popped time, first popped time).
    """
    h = utimeq(10)
    add(h, ticks_add(0, offset))
    add(h, ticks_add(edge, offset))
    dprint(h)
    popped = pop_all(h)
    # Index 0 of each popped tuple is the time value.
    diff = ticks_diff(popped[1][0], popped[0][0])
    dprint(diff, diff > 0)
    return diff
dprint("===")
# Two entries one tick less than MODULO_HALF apart.
diff = edge_case(MODULO_HALF - 1, 0)
assert diff == MODULO_HALF - 1
# Shifting both entries by the same offset must give the same result.
for offset in (100, -100):
    assert edge_case(MODULO_HALF - 1, offset) == diff
# We expect diff to be always positive, per the definition of heappop() which should return
# the smallest value.
# This is the edge case where this invariant breaks, due to the asymmetry of the two's-complement
# range - there's one more negative integer than positive, so heappushing values like below
# will then make ticks_diff() return the minimum negative value. We could make heappop
# return them in a different order, but ticks_diff() result would be the same. Conclusion:
# never add to a heap values where (a - b) == MODULO_HALF (and which are >= MODULO_HALF
# ticks apart in real time of course).
dprint("===")
# Two entries exactly MODULO_HALF apart: the distance is expected to come out
# as -MODULO_HALF.
diff = edge_case(MODULO_HALF, 0)
assert diff == -MODULO_HALF
# Shifting both entries by the same offset must give the same result.
for offset in (100, -100):
    assert edge_case(MODULO_HALF, offset) == diff
dprint("===")
# Two entries one tick more than MODULO_HALF apart: the expected distance is
# MODULO_HALF - 1.
diff = edge_case(MODULO_HALF + 1, 0)
assert diff == MODULO_HALF - 1
# Shifting both entries by the same offset must give the same result.
for offset in (100, -100):
    assert edge_case(MODULO_HALF + 1, offset) == diff

print("OK")
|