File: pickled.pxi

package info (click to toggle)
paraview 3.8.0-1
  • links: PTS, VCS
  • area: main
  • in suites: squeeze
  • size: 173,864 kB
  • ctags: 192,754
  • sloc: cpp: 1,497,065; ansic: 607,818; tcl: 47,394; xml: 47,108; python: 29,870; perl: 3,117; java: 2,428; yacc: 1,853; sh: 833; asm: 471; lex: 393; pascal: 228; makefile: 189; fortran: 31
file content (455 lines) | stat: -rw-r--r-- 15,048 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
# --------------------------------------------------------------------

cdef extern from *:
    char*      PyMPIBytes_AsString(object) except NULL
    Py_ssize_t PyMPIBytes_Size(object) except -1
    object     PyMPIBytes_FromStringAndSize(char*,Py_ssize_t)

# --------------------------------------------------------------------

cdef object _Pickle_dumps = None
cdef object _Pickle_loads = None
cdef object _Pickle_PROTOCOL = -1

try:
    from cPickle import dumps as _Pickle_dumps
    from cPickle import loads as _Pickle_loads
    from cPickle import HIGHEST_PROTOCOL as _Pickle_PROTOCOL
except ImportError:
    from pickle import dumps as _Pickle_dumps
    from pickle import loads as _Pickle_loads
    from pickle import HIGHEST_PROTOCOL as _Pickle_PROTOCOL

cdef inline object PyMPI_Dump(object obj):
    return _Pickle_dumps(obj, _Pickle_PROTOCOL)

cdef inline object PyMPI_Load(object obj):
    return _Pickle_loads(obj)

# --------------------------------------------------------------------

cdef inline object _py_reduce(object op, object seq):
    if seq is None: return None
    cdef int i=0, n=len(seq)
    if op is __MAXLOC__ or op is __MINLOC__:
        seq = list(zip(seq, range(n)))
    res = seq[0]
    for i from 1 <= i < n:
        res = op(res, seq[i])
    return res


cdef inline object _py_scan(object op, object seq):
    if seq is None: return None
    cdef int i=0, n=len(seq)
    if op is MAXLOC or op is MINLOC:
        seq = list(zip(seq, range(n)))
    for i from 1 <= i < n:
        seq[i] = op(seq[i-1], seq[i])
    return seq


cdef inline object _py_exscan(object op, object seq):
    if seq is None: return None
    seq = _py_scan(op, seq)
    seq.pop(-1)
    seq.insert(0, None)
    return seq

# --------------------------------------------------------------------

cdef class _p_Pickler:

    cdef object dump(self, object obj, void **p, int *n):
        if obj is None:
            p[0] = NULL
            n[0] = 0
        else:
            obj  = PyMPI_Dump(obj)
            p[0] = PyMPIBytes_AsString(obj)
            n[0] = PyMPIBytes_Size(obj)
        return obj

    cdef object alloc(self, void **p, int n):
        if n == 0:
            obj  = None
            p[0] = NULL
        else:
            obj  = PyMPIBytes_FromStringAndSize(NULL, n)
            p[0] = PyMPIBytes_AsString(obj)
        return obj

    cdef object load(self, obj):
        if obj is None: return None
        return PyMPI_Load(obj)

    cdef object dumpv(self, object obj, void **p,
                      int n, int cnt[], int dsp[]):
        cdef int i=0, d=0, c=0
        cdef Py_ssize_t m = 0
        if obj is None:
            p[0] = NULL
            for i from 0 <= i < n:
                cnt[i] = 0
                dsp[i] = 0
        else:
            obj = list(obj)
            m = len(obj)
            if m != n: raise ValueError(mpistr(
                "expecting %d items, got %d") % (n, m))
            for i from 0 <= i < n:
                obj[i] = self.dump(obj[i], p, &c)
                if c == 0: obj[i] = b''
                cnt[i] = c
                dsp[i] = d
                d += c
            obj = b''.join(obj)
            p[0] = PyMPIBytes_AsString(obj)
        return obj

    cdef object allocv(self, void **p,
                       int n, int cnt[], int dsp[]):
        cdef int i=0, d=0
        for i from 0 <= i < n:
            dsp[i] = d
            d += cnt[i]
        return self.alloc(p, d)

    cdef object loadv(self, object obj,
                      int n, int cnt[], int dsp[]):
        cdef int i=0, d=0, c=0
        cdef object items = [None] * n
        if obj is None: return items
        for i from 0 <= i < n:
            c = cnt[i]
            d = dsp[i]
            if c == 0: continue
            items[i] = self.load(obj[d:d+c])
        return items


cdef _p_Pickler PyMPI_PICKLER = _p_Pickler()

cdef inline _p_Pickler PyMPI_pickler():
    return PyMPI_PICKLER

# --------------------------------------------------------------------

cdef object PyMPI_send(object obj, int dest, int tag,
                       MPI_Comm comm):
    cdef _p_Pickler pickler = PyMPI_pickler()
    #
    cdef void *sbuf = NULL
    cdef int scount = 0
    cdef MPI_Datatype stype = MPI_BYTE
    #
    cdef int dosend = (dest != MPI_PROC_NULL)
    #
    cdef object smsg = None
    if dosend: smsg = pickler.dump(obj, &sbuf, &scount)
    with nogil: CHKERR( MPI_Send(sbuf, scount, stype,
                                 dest, tag, comm) )
    return None


cdef object PyMPI_recv(object obj, int source, int tag,
                       MPI_Comm comm, MPI_Status *status):
    cdef _p_Pickler pickler = PyMPI_pickler()
    #
    cdef void *rbuf = NULL
    cdef int rcount = 0
    cdef MPI_Datatype rtype = MPI_BYTE
    #
    cdef int dorecv = (source != MPI_PROC_NULL)
    #
    cdef MPI_Status rsts
    with nogil: CHKERR( MPI_Probe(source, tag, comm, &rsts) )
    with nogil: CHKERR( MPI_Get_count(&rsts, rtype, &rcount) )
    source = rsts.MPI_SOURCE
    tag = rsts.MPI_TAG
    #
    cdef object rmsg = None
    if dorecv: rmsg = pickler.alloc(&rbuf, rcount)
    with nogil: CHKERR( MPI_Recv(rbuf, rcount, rtype,
                                 source, tag, comm, status) )
    if dorecv: rmsg = pickler.load(rmsg)
    return rmsg


cdef object PyMPI_sendrecv(object sobj, int dest,   int sendtag,
                           object robj, int source, int recvtag,
                           MPI_Comm comm, MPI_Status *status):
    cdef _p_Pickler pickler = PyMPI_pickler()
    #
    cdef void *sbuf = NULL
    cdef int scount = 0
    cdef MPI_Datatype stype = MPI_BYTE
    cdef void *rbuf = NULL
    cdef int rcount = 0
    cdef MPI_Datatype rtype = MPI_BYTE
    #
    cdef int dosend = (dest   != MPI_PROC_NULL)
    cdef int dorecv = (source != MPI_PROC_NULL)
    #
    cdef object smsg = None
    if dosend: smsg = pickler.dump(sobj, &sbuf, &scount)
    cdef MPI_Request sreq = MPI_REQUEST_NULL
    with nogil: CHKERR( MPI_Isend(sbuf, scount, stype,
                                  dest, sendtag, comm, &sreq) )
    #
    cdef MPI_Status rsts
    with nogil: CHKERR( MPI_Probe(source, recvtag, comm, &rsts) )
    with nogil: CHKERR( MPI_Get_count(&rsts, rtype, &rcount) )
    source  = rsts.MPI_SOURCE
    recvtag = rsts.MPI_TAG
    #
    cdef object rmsg = None
    if dorecv: rmsg = pickler.alloc(&rbuf, rcount)
    with nogil: CHKERR( MPI_Recv(rbuf, rcount, rtype,
                                 source, recvtag, comm, status) )
    #
    with nogil: CHKERR( MPI_Wait(&sreq, MPI_STATUS_IGNORE) )
    if dorecv: rmsg = pickler.load(rmsg)
    return rmsg

# --------------------------------------------------------------------

cdef object PyMPI_barrier(MPI_Comm comm):
    with nogil: CHKERR( MPI_Barrier(comm) )
    return None


cdef object PyMPI_bcast(object obj,
                        int root, MPI_Comm comm):
    cdef _p_Pickler pickler = PyMPI_pickler()
    #
    cdef void *buf = NULL
    cdef int count = 0
    cdef MPI_Datatype dtype = MPI_BYTE
    #
    cdef int dosend=0, dorecv=0
    cdef int inter=0, rank=0
    CHKERR( MPI_Comm_test_inter(comm, &inter) )
    if inter:
        if root == <int>MPI_PROC_NULL:
            dosend=0; dorecv=0;
        elif root == <int>MPI_ROOT:
            dosend=1; dorecv=0;
        else:
            dosend=0; dorecv=1;
    else:
        CHKERR( MPI_Comm_rank(comm, &rank) )
        if root == rank:
            dosend=1; dorecv=1;
        else:
            dosend=0; dorecv=1;
    #
    cdef object smsg = None
    if dosend: smsg = pickler.dump(obj, &buf, &count)
    with nogil: CHKERR( MPI_Bcast(&count, 1, MPI_INT,
                                  root, comm) )
    cdef object rmsg = None
    if dorecv and dosend: rmsg = smsg
    elif dorecv: rmsg = pickler.alloc(&buf, count)
    with nogil: CHKERR( MPI_Bcast(buf, count, MPI_BYTE,
                                  root, comm) )
    if dorecv: rmsg = pickler.load(rmsg)
    return rmsg


cdef object PyMPI_gather(object sendobj, object recvobj,
                         int root, MPI_Comm comm):
    cdef _p_Pickler pickler = PyMPI_pickler()
    #
    cdef void *sbuf = NULL
    cdef int scount = 0
    cdef MPI_Datatype stype = MPI_BYTE
    cdef void *rbuf = NULL
    cdef int *rcounts = NULL
    cdef int *rdispls = NULL
    cdef MPI_Datatype rtype = MPI_BYTE
    #
    cdef int dosend=0, dorecv=0
    cdef int inter=0, size=0, rank=0
    CHKERR( MPI_Comm_test_inter(comm, &inter) )
    if inter:
        CHKERR( MPI_Comm_remote_size(comm, &size) )
        if root == <int>MPI_PROC_NULL:
            dosend=0; dorecv=0;
        elif root == <int>MPI_ROOT:
            dosend=0; dorecv=1;
        else:
            dosend=1; dorecv=0;
    else:
        CHKERR( MPI_Comm_size(comm, &size) )
        CHKERR( MPI_Comm_rank(comm, &rank) )
        if root == rank:
            dosend=1; dorecv=1;
        else:
            dosend=1; dorecv=0;
    #
    cdef object tmp1=None, tmp2=None
    if dorecv: tmp1 = newarray_int(size, &rcounts)
    if dorecv: tmp2 = newarray_int(size, &rdispls)
    #
    cdef object smsg = None
    if dosend: smsg = pickler.dump(sendobj, &sbuf, &scount)
    with nogil: CHKERR( MPI_Gather(&scount, 1, MPI_INT,
                                   rcounts, 1, MPI_INT,
                                   root, comm) )
    cdef object rmsg = None
    if dorecv: rmsg = pickler.allocv(&rbuf, size, rcounts, rdispls)
    with nogil: CHKERR( MPI_Gatherv(sbuf, scount,           stype,
                                    rbuf, rcounts, rdispls, rtype,
                                    root, comm) )
    if dorecv: rmsg = pickler.loadv(rmsg, size, rcounts, rdispls)
    return rmsg


cdef object PyMPI_scatter(object sendobj, object recvobj,
                          int root, MPI_Comm comm):
    cdef _p_Pickler pickler = PyMPI_pickler()
    #
    cdef void *sbuf = NULL
    cdef int *scounts = NULL
    cdef int *sdispls = NULL
    cdef MPI_Datatype stype = MPI_BYTE
    cdef void *rbuf = NULL
    cdef int rcount = 0
    cdef MPI_Datatype rtype = MPI_BYTE
    #
    cdef int dosend=0, dorecv=0
    cdef int inter=0, size=0, rank=0
    CHKERR( MPI_Comm_test_inter(comm, &inter) )
    if inter:
        CHKERR( MPI_Comm_remote_size(comm, &size) )
        if root == <int>MPI_PROC_NULL:
            dosend=1; dorecv=0;
        elif root == <int>MPI_ROOT:
            dosend=1; dorecv=0;
        else:
            dosend=0; dorecv=1;
    else:
        CHKERR( MPI_Comm_size(comm, &size) )
        CHKERR( MPI_Comm_rank(comm, &rank) )
        if root == rank:
            dosend=1; dorecv=1;
        else:
            dosend=0; dorecv=1;
    #
    cdef object tmp1=None, tmp2=None
    if dosend: tmp1 = newarray_int(size, &scounts)
    if dosend: tmp2 = newarray_int(size, &sdispls)
    #
    cdef object smsg = None
    if dosend: smsg = pickler.dumpv(sendobj, &sbuf, size, scounts, sdispls)
    with nogil: CHKERR( MPI_Scatter(scounts, 1, MPI_INT,
                                    &rcount, 1, MPI_INT,
                                    root, comm) )
    cdef object rmsg = None
    if dorecv: rmsg = pickler.alloc(&rbuf, rcount)
    with nogil: CHKERR( MPI_Scatterv(sbuf, scounts, sdispls, stype,
                                     rbuf, rcount,           rtype,
                                     root, comm) )
    if dorecv: rmsg = pickler.load(rmsg)
    return rmsg


cdef object PyMPI_allgather(object sendobj, object recvobj,
                            MPI_Comm comm):
    cdef _p_Pickler pickler = PyMPI_pickler()
    #
    cdef void *sbuf = NULL
    cdef int scount = 0
    cdef MPI_Datatype stype = MPI_BYTE
    cdef void *rbuf = NULL
    cdef int *rcounts = NULL
    cdef int *rdispls = NULL
    cdef MPI_Datatype rtype = MPI_BYTE
    #
    cdef int inter=0, size=0
    CHKERR( MPI_Comm_test_inter(comm, &inter) )
    if inter:
        CHKERR( MPI_Comm_remote_size(comm, &size) )
    else:
        CHKERR( MPI_Comm_size(comm, &size) )
    #
    cdef object tmp1 = newarray_int(size, &rcounts)
    cdef object tmp2 = newarray_int(size, &rdispls)
    #
    cdef object smsg = pickler.dump(sendobj, &sbuf, &scount)
    with nogil: CHKERR( MPI_Allgather(&scount, 1, MPI_INT,
                                      rcounts, 1, MPI_INT,
                                      comm) )
    cdef object rmsg = pickler.allocv(&rbuf, size, rcounts, rdispls)
    with nogil: CHKERR( MPI_Allgatherv(sbuf, scount,           stype,
                                       rbuf, rcounts, rdispls, rtype,
                                       comm) )
    rmsg = pickler.loadv(rmsg, size, rcounts, rdispls)
    return rmsg


cdef object PyMPI_alltoall(object sendobj, object recvobj,
                           MPI_Comm comm):
    cdef _p_Pickler pickler = PyMPI_pickler()
    #
    cdef void *sbuf = NULL
    cdef int *scounts = NULL
    cdef int *sdispls = NULL
    cdef MPI_Datatype stype = MPI_BYTE
    cdef void *rbuf = NULL
    cdef int *rcounts = NULL
    cdef int *rdispls = NULL
    cdef MPI_Datatype rtype = MPI_BYTE
    #
    cdef int inter=0, size=0
    CHKERR( MPI_Comm_test_inter(comm, &inter) )
    if inter:
        CHKERR( MPI_Comm_remote_size(comm, &size) )
    else:
        CHKERR( MPI_Comm_size(comm, &size) )
    #
    cdef object stmp1 = newarray_int(size, &scounts)
    cdef object stmp2 = newarray_int(size, &sdispls)
    cdef object rtmp1 = newarray_int(size, &rcounts)
    cdef object rtmp2 = newarray_int(size, &rdispls)
    #
    cdef object smsg = pickler.dumpv(sendobj, &sbuf, size, scounts, sdispls)
    with nogil: CHKERR( MPI_Alltoall(scounts, 1, MPI_INT,
                                     rcounts, 1, MPI_INT,
                                     comm) )
    cdef object rmsg = pickler.allocv(&rbuf, size, rcounts, rdispls)
    with nogil: CHKERR( MPI_Alltoallv(sbuf, scounts, sdispls, stype,
                                      rbuf, rcounts, rdispls, rtype,
                                      comm) )
    rmsg = pickler.loadv(rmsg, size, rcounts, rdispls)
    return rmsg

# --------------------------------------------------------------------

cdef object PyMPI_reduce(object sendobj, object recvobj,
                         object op, int root, MPI_Comm comm):
    items = PyMPI_gather(sendobj, recvobj, root, comm)
    return _py_reduce(op, items)


cdef object PyMPI_allreduce(object sendobj, object recvobj,
                            object op, MPI_Comm comm):
    items = PyMPI_allgather(sendobj, recvobj, comm)
    return _py_reduce(op, items)


cdef object PyMPI_scan(object sendobj, object recvobj,
                       object op, MPI_Comm comm):
    items = PyMPI_gather(sendobj, None, 0, comm)
    items = _py_scan(op, items)
    return PyMPI_scatter(items, None, 0, comm)


cdef object PyMPI_exscan(object sendobj, object recvobj,
                         object op, MPI_Comm comm):
    items = PyMPI_gather(sendobj, None, 0, comm)
    items = _py_exscan(op, items)
    return PyMPI_scatter(items, None, 0, comm)

# --------------------------------------------------------------------