File: gmpy_test_thr.py

package info (click to toggle)
python-gmpy 1.11-1
  • links: PTS
  • area: main
  • in suites: squeeze
  • size: 752 kB
  • ctags: 756
  • sloc: ansic: 10,001; python: 5,514; makefile: 36
file content (129 lines) | stat: -rw-r--r-- 3,583 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# partial unit test for gmpy 1.11 threaded mpz functionality
# relies on Tim Peters' "doctest.py" test-driver
# test-version 1.11

import gmpy as _g, doctest, sys, operator, gc, Queue, threading

__test__={}
def _tf(N=2, _K=1234**5678):
    """Takes about 100ms on a first-generation Macbook Pro"""
    for i in range(N): assert (_g.mpz(1234)**5678)==_K
a=_g.mpz(123)
b=_g.mpz(456)
c=_g.mpz(123456789123456789)

def factorize(x=c):
    r'''
    (Takes about 25ms, on c, on a first-generation Macbook Pro)
    >>> factorize(a)
    [3, 41]
    >>> factorize(b)
    [2, 2, 2, 3, 19]
    >>>
    '''
    import gmpy as _g
    savex=x
    prime=2
    x=_g.mpz(x)
    factors=[]
    while x>=prime:
        newx,mult=x.remove(prime)
        if mult:
            factors.extend([int(prime)]*mult)
            x=newx
        prime=_g.next_prime(prime)
    for factor in factors: assert _g.is_prime(factor)
    from operator import mul
    assert reduce(mul, factors)==savex
    return factors

def elemop(N=1000):
    r'''
    (Takes about 40ms on a first-generation Macbook Pro)
    '''
    for i in range(N):
        assert a+b == 579
        assert a-b == -333
        assert b*a == a*b == 56088
        assert b%a == 87
        assert divmod(a, b) == (0, 123)
        assert divmod(b, a) == (3, 87)
        assert -a == -123
        assert pow(a, 10) == 792594609605189126649
        assert pow(a, 7, b) == 99
        assert cmp(a, b) == -1
        assert '7' in str(c)
        assert '0' not in str(c)
        assert a.sqrt() == 11
        assert _g.lcm(a, b) == 18696
        assert _g.fac(7) == 5040
        assert _g.fib(17) == 1597
        assert _g.divm(b, a, 20) == 12
        assert _g.divm(4, 8, 20) == 3
        assert _g.divm(4, 8, 20) == 3
        assert _g.mpz(20) == 20
        assert _g.mpz(8) == 8
        assert _g.mpz(4) == 4
        assert a.invert(100) == 87

def _test(chat=None):
    if chat:
        print "Unit tests for gmpy 1.11 (threading)"
        print "    running on Python", sys.version
        print
        if _g.gmp_version():
            print "Testing gmpy %s (GMP %s) with default caching (%s, %s)" % (
                (_g.version(), _g.gmp_version(), _g.get_cache()[0],
                _g.get_cache()[1]))
        else:
            print "Testing gmpy %s (MPIR %s) with default caching (%s, %s)" % (
                (_g.version(), _g.mpir_version(), _g.get_cache()[0],
                _g.get_cache()[1]))

    thismod = sys.modules.get(__name__)
    doctest.testmod(thismod, report=0)

    if chat: print "Repeating tests, with caching disabled"
    _g.set_zcache(0)

    sav = sys.stdout
    class _Dummy:
        def write(self,*whatever):
            pass
    try:
        sys.stdout = _Dummy()
        doctest.testmod(thismod, report=0)
    finally:
        sys.stdout = sav

    if chat:
        print
        print "Overall results for thr:"
    return doctest.master.summarize(chat)

class DoOne(threading.Thread):
    def __init__(self, q):
        threading.Thread.__init__(self)
        self.q = q
    def run(self):
        while True:
            task = self.q.get()
            if task is None: break
            task()

def _test_thr(Ntasks=5, Nthreads=1):
    q = Queue.Queue()
    funcs = (_tf, 1), (factorize, 4), (elemop, 2)
    for i in range(Ntasks):
        for f, n in funcs:
            for x in range(n):
                q.put(f)
    for i in range(Nthreads):
        q.put(None)
    thrs = [DoOne(q) for i in range(Nthreads)]
    for t in thrs: t.start()
    for t in thrs: t.join()

if __name__=='__main__':
    _test(1)