File: test_message.py

package info (click to toggle)
mpi4py 1.3.1%2Bhg20131106-2
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 2,224 kB
  • ctags: 6,415
  • sloc: python: 12,056; ansic: 7,022; makefile: 697; f90: 158; cpp: 103; sh: 60
file content (124 lines) | stat: -rw-r--r-- 4,229 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
from mpi4py import MPI
import mpiunittest as unittest
from arrayimpl import allclose

try:
    import array
    HAVE_ARRAY = True
except ImportError:
    HAVE_ARRAY = False
try:
    import numpy
    HAVE_NUMPY = True
except ImportError:
    HAVE_NUMPY = False

def Sendrecv(smsg, rmsg):
    comm = MPI.COMM_SELF
    sts = MPI.Status()
    comm.Sendrecv(sendbuf=smsg, recvbuf=rmsg, status=sts)

class TestMessage(unittest.TestCase):

    TYPECODES = "hil"+"HIL"+"fd"

    def _test1(self, equal, zero, s, r, t):
        r[:] = zero
        Sendrecv(s, r)
        self.assertTrue(equal(s, r))

    def _test2(self, equal, zero, s, r, typecode):
        datatype = MPI.__TypeDict__[typecode]
        for type in (None, typecode, datatype):
            r[:] = zero
            Sendrecv([s, type],
                     [r, type])
            self.assertTrue(equal(s, r))

    def _test31(self, equal, z, s, r, typecode):
        datatype = MPI.__TypeDict__[typecode]
        for count in (None, len(s)):
            for type in (None, typecode, datatype):
                r[:] = z
                Sendrecv([s, count, type],
                         [r, count, type])
                self.assertTrue(equal(s, r))

    def _test32(self, equal, z, s, r, typecode):
        datatype = MPI.__TypeDict__[typecode]
        for type in (None, typecode, datatype):
            for p in range(0, len(s)):
                r[:] = z
                Sendrecv([s, (p, None), type],
                         [r, (p, None), type])
                self.assertTrue(equal(s[:p], r[:p]))
                for q in range(p, len(s)):
                    count, displ = q-p, p
                    r[:] = z
                    Sendrecv([s, (count, displ), type],
                             [r, (count, displ), type])
                    self.assertTrue(equal(s[p:q], r[p:q]))
                    self.assertTrue(equal(z[:p], r[:p]))
                    self.assertTrue(equal(z[q:], r[q:]))

    def _test4(self, equal, z, s, r, typecode):
        datatype = MPI.__TypeDict__[typecode]
        for type in (None, typecode, datatype):
            for p in range(0, len(s)):
                r[:] = z
                Sendrecv([s, p, None, type],
                         [r, p, None, type])
                self.assertTrue(equal(s[:p], r[:p]))
                for q in range(p, len(s)):
                    count, displ = q-p, p
                    r[:] = z
                    Sendrecv([s, count, displ, type],
                             [r, count, displ, type])
                    self.assertTrue(equal(s[p:q], r[p:q]))
                    self.assertTrue(equal(z[:p], r[:p]))
                    self.assertTrue(equal(z[q:], r[q:]))

    if HAVE_ARRAY:
        def _testArray(self, test):
            from array import array
            from operator import eq as equal
            for t in tuple(self.TYPECODES):
                for n in range(1, 10):
                    z = array(t, [0]*n)
                    s = array(t, list(range(n)))
                    r = array(t, [0]*n)
                    test(equal, z, s, r, t)
        def testArray1(self):
            self._testArray(self._test1)
        def testArray2(self):
            self._testArray(self._test2)
        def testArray31(self):
            self._testArray(self._test31)
        def testArray32(self):
            self._testArray(self._test32)
        def testArray4(self):
            self._testArray(self._test4)

    if HAVE_NUMPY:
        def _testNumPy(self, test):
            from numpy import zeros, arange, empty
            for t in tuple(self.TYPECODES):
                for n in range(10):
                    z = zeros (n, dtype=t)
                    s = arange(n, dtype=t)
                    r = empty (n, dtype=t)
                    test(allclose, z, s, r, t)
        def testNumPy1(self):
            self._testNumPy(self._test1)
        def testNumPy2(self):
            self._testNumPy(self._test2)
        def testNumPy31(self):
            self._testNumPy(self._test31)
        def testNumPy32(self):
            self._testNumPy(self._test32)
        def testNumPy4(self):
            self._testNumPy(self._test4)


if __name__ == '__main__':
    unittest.main()