1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
|
#!/usr/bin/env python
"""
Benchmarks and sanity-checks the pqueue and pq3 priority queue implementations.
"""
import pqueue
import pq3
from time import clock
import whrandom
# Number of (priority, data) pairs generated when no size is requested.
benchdefault = 100000


def generatetestset(num=benchdefault):
    """Build a list of ``num`` random ``(priority, data)`` pairs.

    Each priority is a random integer in ``[0, num]`` (both ends included)
    and each data value is the pair's position in the list, so the data
    values are unique.  A progress message and the ``clock()`` time spent
    generating are written to standard output.

    Returns the list of pairs; it is empty when ``num`` is zero or negative.
    """
    # Imported locally: the ``whrandom`` module this function used to call
    # was deprecated in Python 2.1 and later removed from the standard
    # library; ``random.randint`` offers the same inclusive-range call.
    import random
    import sys

    # Written and flushed explicitly so the message is visible while the
    # pairs are being generated, not only once the line is completed.
    sys.stdout.write(
        "Generating %s random (priority, data) pairs.... " % (num,))
    sys.stdout.flush()

    start = clock()
    testset = [(random.randint(0, num), x) for x in range(num)]
    stop = clock()

    print("done -- %s seconds." % (stop - start,))
    return testset
def testinsert(pq, testset):
print "Inserting into the priority queue....",
insert = 0
for priority,data in testset:
start = clock()
pq.insert(priority, data)
stop = clock()
insert = insert + stop-start
print "done --", insert, "seconds."
return
def testextract(pq, testset, reduceset=None):
if reduceset:
print "Reducing/extracting from the queue....",
else:
print "Extracting from the priority queue....",
extract = 0
reduce = 0
reducecount = 0
copyt = testset[:]
out = []
maxtest = len(testset)-1
for x in testset:
start = clock()
try:
out.append(pq.pop())
except IndexError: break
stop = clock()
extract = extract + stop-start
if reduceset:
index = whrandom.randint(0,maxtest)
data = copyt[index][1]
try:
newpriority = pq[data] / 2
except KeyError: continue
reducecount = reducecount + 1
copyt[index] = (newpriority,data)
start = clock()
pq[data] = newpriority
stop = clock()
reduce = reduce + stop-start
if reduceset:
print "done *", reducecount, "--", reduce, "/", extract, "seconds."
else:
print "done --", extract, "seconds."
print "Checking the results....",
copyt.sort()
if reduceset:
out.sort()
if copyt == out:
print "hrm... data probably ok."
else:
print "doh... it looks screwed."
else:
f = lambda x:x[0]
if map(f,copyt) == map(f,out):
print "oooh... looking good...",
if copyt == out:
print "and stable!"
else:
print "unstable...",
out.sort()
if copyt == out:
print "but data ok."
else:
print "and data buggered."
else:
print "doh... they look screwed:"
def testpq(pq, nums, reduce=None):
    """Run the insert/extract benchmark on ``pq`` using the pairs in ``nums``.

    The queue is always filled with ``testinsert`` and drained with
    ``testextract``.  When ``reduce`` is true a second fill/drain round
    follows, this time passing ``reduce`` on to ``testextract`` so that
    priorities are reduced while the queue is being drained.
    """
    # First round: plain fill and drain.
    testinsert(pq, nums)
    testextract(pq, nums)

    if not reduce:
        return

    # Second round: fill again and drain with priority reduction.
    testinsert(pq, nums)
    testextract(pq, nums, reduce)
def main(argv):
    """Benchmark each priority queue implementation on one random test set.

    ``argv`` is a ``sys.argv``-style list.  ``argv[1]``, when present and
    parseable as an integer, is the number of pairs to generate; otherwise
    ``benchdefault`` is used.

    ``pqueue.PQueue`` is run with the extra reduce round; the ``pq3``
    classes (``PQ0``, ``PQueue`` and ``PQEquivBig``) are run without it.
    """
    benchsize = benchdefault
    if len(argv) > 1:
        try:
            benchsize = int(argv[1])
        except ValueError:
            # Deliberate best effort: an unparsable size silently falls
            # back to the default.
            benchsize = benchdefault

    p = pqueue.PQueue()
    testset = generatetestset(benchsize)
    print("-" * 20)
    print("Testing pqueue.PQueue:")
    testpq(p, testset, 1)

    for name in ("PQ0", "PQueue", "PQEquivBig"):
        p = getattr(pq3, name)()
        # Kludge: alias the pq3 method names to the insert/pop names the
        # test functions call.
        p.insert = p.addelt
        p.pop = p.popsmallest
        print("-" * 20)
        print("Testing pq3.%s:" % (name,))
        testpq(p, testset)
# Run the benchmark only when this file is executed as a script; the command
# line (including the optional test-set size) is handed to main() unchanged.
if __name__ == "__main__":
    import sys
    main(sys.argv)
# End.
|