1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
|
# partial unit test for gmpy threaded mpz functionality
# relies on Tim Peters' "doctest.py" test-driver
import gmpy as _g, doctest, sys, operator, gc, queue, threading
from functools import reduce
__test__={}
def _tf(N=2, _K=1234**5678):
"""Takes about 100ms on a first-generation Macbook Pro"""
for i in range(N): assert (_g.mpz(1234)**5678)==_K
a=_g.mpz(123)
b=_g.mpz(456)
c=_g.mpz(123456789123456789)
def factorize(x=c):
r'''
(Takes about 25ms, on c, on a first-generation Macbook Pro)
>>> factorize(a)
[3, 41]
>>> factorize(b)
[2, 2, 2, 3, 19]
>>>
'''
import gmpy as _g
savex=x
prime=2
x=_g.mpz(x)
factors=[]
while x>=prime:
newx,mult=x.remove(prime)
if mult:
factors.extend([int(prime)]*mult)
x=newx
prime=_g.next_prime(prime)
for factor in factors: assert _g.is_prime(factor)
from operator import mul
assert reduce(mul, factors)==savex
return factors
def elemop(N=1000):
r'''
(Takes about 40ms on a first-generation Macbook Pro)
'''
for i in range(N):
assert a+b == 579
assert a-b == -333
assert b*a == a*b == 56088
assert b%a == 87
assert divmod(a, b) == (0, 123)
assert divmod(b, a) == (3, 87)
assert -a == -123
assert pow(a, 10) == 792594609605189126649
assert pow(a, 7, b) == 99
assert cmp(a, b) == -1
assert '7' in str(c)
assert '0' not in str(c)
assert a.sqrt() == 11
assert _g.lcm(a, b) == 18696
assert _g.fac(7) == 5040
assert _g.fib(17) == 1597
assert _g.divm(b, a, 20) == 12
assert _g.divm(4, 8, 20) == 3
assert _g.divm(4, 8, 20) == 3
assert _g.mpz(20) == 20
assert _g.mpz(8) == 8
assert _g.mpz(4) == 4
assert a.invert(100) == 87
def _test(chat=None):
if chat:
print("Unit tests for gmpy 1.15 (threading)")
print(" running on Python", sys.version)
print()
if _g.gmp_version():
print("Testing gmpy %s (GMP %s) with default caching (%s, %s)" % (
(_g.version(), _g.gmp_version(), _g.get_cache()[0],
_g.get_cache()[1])))
else:
print("Testing gmpy %s (MPIR %s) with default caching (%s, %s)" % (
(_g.version(), _g.mpir_version(), _g.get_cache()[0],
_g.get_cache()[1])))
thismod = sys.modules.get(__name__)
doctest.testmod(thismod, report=0)
if chat: print("Repeating tests, with caching disabled")
_g.set_cache(0,128)
sav = sys.stdout
class _Dummy:
def write(self,*whatever):
pass
try:
sys.stdout = _Dummy()
doctest.testmod(thismod, report=0)
finally:
sys.stdout = sav
if chat:
print()
print("Overall results for thr:")
return doctest.master.summarize(chat)
class DoOne(threading.Thread):
def __init__(self, q):
threading.Thread.__init__(self)
self.q = q
def run(self):
while True:
task = self.q.get()
if task is None: break
task()
def _test_thr(Ntasks=5, Nthreads=1):
q = queue.Queue()
funcs = (_tf, 1), (factorize, 4), (elemop, 2)
for i in range(Ntasks):
for f, n in funcs:
for x in range(n):
q.put(f)
for i in range(Nthreads):
q.put(None)
thrs = [DoOne(q) for i in range(Nthreads)]
for t in thrs: t.start()
for t in thrs: t.join()
if __name__=='__main__':
_test(1)
|