File: gmpy_test_thr.py

package info (click to toggle)
python-gmpy 1.11-1
  • links: PTS
  • area: main
  • in suites: squeeze
  • size: 752 kB
  • ctags: 756
  • sloc: ansic: 10,001; python: 5,514; makefile: 36
file content (130 lines) | stat: -rw-r--r-- 3,625 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# partial unit test for gmpy 1.11 threaded mpz functionality
# relies on Tim Peters' "doctest.py" test-driver
# test-version 1.11

import gmpy as _g, doctest, sys, operator, gc, queue, threading
from functools import reduce
__test__={}

def _tf(N=2, _K=1234**5678):
    """Takes about 100ms on a first-generation Macbook Pro"""
    for i in range(N): assert (_g.mpz(1234)**5678)==_K
a=_g.mpz(123)
b=_g.mpz(456)
c=_g.mpz(123456789123456789)

def factorize(x=c):
    r'''
    (Takes about 25ms, on c, on a first-generation Macbook Pro)
    >>> factorize(a)
    [3, 41]
    >>> factorize(b)
    [2, 2, 2, 3, 19]
    >>>
    '''
    import gmpy as _g
    savex=x
    prime=2
    x=_g.mpz(x)
    factors=[]
    while x>=prime:
        newx,mult=x.remove(prime)
        if mult:
            factors.extend([int(prime)]*mult)
            x=newx
        prime=_g.next_prime(prime)
    for factor in factors: assert _g.is_prime(factor)
    from operator import mul
    assert reduce(mul, factors)==savex
    return factors

def elemop(N=1000):
    r'''
    (Takes about 40ms on a first-generation Macbook Pro)
    '''
    for i in range(N):
        assert a+b == 579
        assert a-b == -333
        assert b*a == a*b == 56088
        assert b%a == 87
        assert divmod(a, b) == (0, 123)
        assert divmod(b, a) == (3, 87)
        assert -a == -123
        assert pow(a, 10) == 792594609605189126649
        assert pow(a, 7, b) == 99
        assert cmp(a, b) == -1
        assert '7' in str(c)
        assert '0' not in str(c)
        assert a.sqrt() == 11
        assert _g.lcm(a, b) == 18696
        assert _g.fac(7) == 5040
        assert _g.fib(17) == 1597
        assert _g.divm(b, a, 20) == 12
        assert _g.divm(4, 8, 20) == 3
        assert _g.divm(4, 8, 20) == 3
        assert _g.mpz(20) == 20
        assert _g.mpz(8) == 8
        assert _g.mpz(4) == 4
        assert a.invert(100) == 87

def _test(chat=None):
    if chat:
        print("Unit tests for gmpy 1.11 (threading)")
        print("    running on Python", sys.version)
        print()
        if _g.gmp_version():
            print("Testing gmpy %s (GMP %s) with default caching (%s, %s)" % (
                (_g.version(), _g.gmp_version(), _g.get_cache()[0],
                _g.get_cache()[1])))
        else:
            print("Testing gmpy %s (MPIR %s) with default caching (%s, %s)" % (
                (_g.version(), _g.mpir_version(), _g.get_cache()[0],
                _g.get_cache()[1])))

    thismod = sys.modules.get(__name__)
    doctest.testmod(thismod, report=0)

    if chat: print("Repeating tests, with caching disabled")
    _g.set_cache(0,128)

    sav = sys.stdout
    class _Dummy:
        def write(self,*whatever):
            pass
    try:
        sys.stdout = _Dummy()
        doctest.testmod(thismod, report=0)
    finally:
        sys.stdout = sav

    if chat:
        print()
        print("Overall results for thr:")
    return doctest.master.summarize(chat)

class DoOne(threading.Thread):
    def __init__(self, q):
        threading.Thread.__init__(self)
        self.q = q
    def run(self):
        while True:
            task = self.q.get()
            if task is None: break
            task()

def _test_thr(Ntasks=5, Nthreads=1):
    q = queue.Queue()
    funcs = (_tf, 1), (factorize, 4), (elemop, 2)
    for i in range(Ntasks):
        for f, n in funcs:
            for x in range(n):
                q.put(f)
    for i in range(Nthreads):
        q.put(None)
    thrs = [DoOne(q) for i in range(Nthreads)]
    for t in thrs: t.start()
    for t in thrs: t.join()

if __name__=='__main__':
    _test(1)