1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
|
# partial unit test for gmpy 1.11 threaded mpz functionality
# relies on Tim Peters' "doctest.py" test-driver
# test-version 1.11
import gmpy as _g, doctest, sys, operator, gc, queue, threading
from functools import reduce
# doctest consults a module-level ``__test__`` mapping for extra tests;
# this module registers none, so the mapping is left empty.
__test__ = {}
def _tf(N=2, _K=1234**5678):
    """Compute mpz(1234)**5678 N times and check every result against _K.

    N  -- number of repetitions.
    _K -- expected value; its default is evaluated once, with plain Python
          integers, when the function is defined.

    Raises AssertionError as soon as one gmpy result differs from _K.
    (Takes about 100ms on a first-generation Macbook Pro)
    """
    for _ in range(N):
        power = _g.mpz(1234) ** 5678
        assert power == _K
# Shared mpz operands: a and b feed elemop() and the factorize() doctests,
# c is the default argument of factorize().
a = _g.mpz(123)
b = _g.mpz(456)
c = _g.mpz(123456789123456789)
def factorize(x=c):
    r'''
    Return the prime factors of x as a list of Python ints, in increasing
    order, each prime repeated according to its multiplicity.

    The result is self-checked before it is returned: every factor must
    pass _g.is_prime and the product of all factors must equal x, otherwise
    AssertionError is raised.  An input with no prime factors (x == 1)
    yields an empty list.

    (Takes about 25ms, on c, on a first-generation Macbook Pro)
    >>> factorize(a)
    [3, 41]
    >>> factorize(b)
    [2, 2, 2, 3, 19]
    >>>
    '''
    import gmpy as _g
    from operator import mul

    original = x
    remaining = _g.mpz(x)
    prime = 2
    factors = []
    while remaining >= prime:
        # remove() strips every power of `prime` and reports how many it took.
        reduced, mult = remaining.remove(prime)
        if mult:
            factors.extend([int(prime)] * mult)
            remaining = reduced
        prime = _g.next_prime(prime)

    for factor in factors:
        assert _g.is_prime(factor)
    # Start the product at 1 so an empty factor list (x == 1) is checked
    # normally instead of making reduce() raise TypeError.
    assert reduce(mul, factors, 1) == original
    return factors
def elemop(N=1000):
    r'''
    Run a fixed battery of elementary mpz checks N times.

    Each pass exercises arithmetic operators, divmod, pow, comparison,
    string conversion and several gmpy functions on the module-level
    operands a, b and c, comparing every result with a known constant.
    Raises AssertionError on the first mismatch; returns None.

    (Takes about 40ms on a first-generation Macbook Pro)
    '''
    for _ in range(N):
        assert a+b == 579
        assert a-b == -333
        assert b*a == a*b == 56088
        assert b%a == 87
        assert divmod(a, b) == (0, 123)
        assert divmod(b, a) == (3, 87)
        assert -a == -123
        assert pow(a, 10) == 792594609605189126649
        assert pow(a, 7, b) == 99
        # Python 3 has no cmp() builtin (this file already relies on
        # print() and the queue module), so spell out the three-way
        # comparison explicitly.
        assert (a > b) - (a < b) == -1
        assert '7' in str(c)
        assert '0' not in str(c)
        assert a.sqrt() == 11
        assert _g.lcm(a, b) == 18696
        assert _g.fac(7) == 5040
        assert _g.fib(17) == 1597
        assert _g.divm(b, a, 20) == 12
        # NOTE(review): the repeated divm call followed by the mpz(20),
        # mpz(8) and mpz(4) checks looks like a guard that divm leaves its
        # small-integer arguments intact -- confirm before de-duplicating.
        assert _g.divm(4, 8, 20) == 3
        assert _g.divm(4, 8, 20) == 3
        assert _g.mpz(20) == 20
        assert _g.mpz(8) == 8
        assert _g.mpz(4) == 4
        assert a.invert(100) == 87
def _test(chat=None):
    """Run this module's doctests twice and return the combined summary.

    The first pass uses gmpy's default caching.  The second pass runs after
    _g.set_cache(0, 128), with sys.stdout temporarily replaced by a sink
    that discards everything written to it.  The cache setting is not
    restored afterwards.

    chat -- when true, print a version banner and progress messages, and
            pass the flag on to doctest.master.summarize().

    Returns whatever doctest.master.summarize(chat) returns.
    """
    if chat:
        print("Unit tests for gmpy 1.11 (threading)")
        print(" running on Python", sys.version)
        print()
        cache = _g.get_cache()
        gmp_release = _g.gmp_version()
        # An empty GMP version string means gmpy was built against MPIR.
        if gmp_release:
            banner = "Testing gmpy %s (GMP %s) with default caching (%s, %s)"
            backend_release = gmp_release
        else:
            banner = "Testing gmpy %s (MPIR %s) with default caching (%s, %s)"
            backend_release = _g.mpir_version()
        print(banner % (_g.version(), backend_release, cache[0], cache[1]))

    thismod = sys.modules.get(__name__)
    doctest.testmod(thismod, report=0)

    if chat:
        print("Repeating tests, with caching disabled")
    _g.set_cache(0, 128)

    class _NullWriter:
        # Minimal stdout stand-in: accepts writes and drops them.
        def write(self, *whatever):
            pass

    real_stdout = sys.stdout
    try:
        sys.stdout = _NullWriter()
        doctest.testmod(thismod, report=0)
    finally:
        # Always give the real stdout back, even if the second pass blows up.
        sys.stdout = real_stdout

    if chat:
        print()
        print("Overall results for thr:")
    return doctest.master.summarize(chat)
class DoOne(threading.Thread):
    """Worker thread that runs callables pulled from a shared queue.

    The thread keeps calling q.get() and invoking the returned task until
    it receives None, which acts as the stop marker.
    """

    def __init__(self, q):
        super().__init__()
        self.q = q

    def run(self):
        tasks = self.q
        task = tasks.get()
        while task is not None:
            task()
            task = tasks.get()
def _test_thr(Ntasks=5, Nthreads=1):
    """Push the gmpy workloads through a pool of DoOne worker threads.

    Ntasks   -- number of rounds; each round enqueues _tf once, factorize
                four times and elemop twice, in that order.
    Nthreads -- number of DoOne workers; one None stop marker is enqueued
                per worker after all the tasks.

    Returns None once every worker thread has been joined.
    """
    workload = ((_tf, 1), (factorize, 4), (elemop, 2))
    # Flatten one round up front; every round enqueues the same sequence.
    one_round = [func for func, copies in workload for _ in range(copies)]

    q = queue.Queue()
    for _ in range(Ntasks):
        for job in one_round:
            q.put(job)
    for _ in range(Nthreads):
        q.put(None)

    workers = []
    for _ in range(Nthreads):
        workers.append(DoOne(q))
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
# Script entry point: run the doctest driver in chatty mode.
if __name__ == '__main__':
    _test(chat=1)
|