File: ad_aggregate.c

From the openmpi 5.0.8-3 source package.
/*
 * Copyright (C) by Argonne National Laboratory
 *     See COPYRIGHT in top-level directory
 */

#include "adio.h"
#include "adio_extern.h"

#ifdef AGGREGATION_PROFILE
#include "mpe.h"
#endif

#undef AGG_DEBUG

/* This file contains four functions:
 *
 * ADIOI_Calc_aggregator()
 * ADIOI_Calc_file_domains()
 * ADIOI_Calc_my_req()
 * ADIOI_Calc_others_req()
 *
 * The last three of these were originally in ad_read_coll.c, but they are
 * also shared with ad_write_coll.c.  I felt that they were better kept with
 * the rest of the shared aggregation code.
 */

/* Discussion of values available from above:
 *
 * ADIO_Offset st_offsets[0..nprocs-1]
 * ADIO_Offset end_offsets[0..nprocs-1]
 *    These contain a list of start and end offsets for each process in
 *    the communicator.  For example, an access at loc 10, size 10 would
 *    have a start offset of 10 and end offset of 19.
 * int nprocs
 *    number of processors in the collective I/O communicator
 * ADIO_Offset min_st_offset
 * ADIO_Offset fd_start[0..nprocs_for_coll-1]
 *    starting location of "file domain"; region that a given process will
 *    perform aggregation for (i.e. actually do I/O)
 * ADIO_Offset fd_end[0..nprocs_for_coll-1]
 *    roughly start + size - 1, but it can be less (or -1 for an empty
 *    file domain) in the case of uneven distributions
 */
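
/* Worked example (illustrative values only, unaligned case): with
 * min_st_offset = 0, fd_size = 100, and two aggregators,
 * ADIOI_Calc_file_domains() would produce fd_start[] = {0, 100} and
 * fd_end[] = {99, 199}; an access at offset 150 then falls in the second
 * aggregator's file domain.
 */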

/* ADIOI_Calc_aggregator()
 *
 * The intention here is to implement a function which provides basically
 * the same functionality as in Rajeev's original version of
 * ADIOI_Calc_my_req().  He used a ceiling division approach to assign the
 * file domains, and we use the same approach here when calculating the
 * location of an offset/len in a specific file domain.  Further we assume
 * this same distribution when calculating the rank_index, which is later
 * used to map to a specific process rank in charge of the file domain.
 *
 * A better (i.e. more general) approach would be to use the list of file
 * domains only.  This would be slower in the case where the
 * original ceiling division was used, but it would allow for arbitrary
 * distributions of regions to aggregators.  We'd need to know the
 * nprocs_for_coll in that case though, which we don't have now.
 *
 * Note a significant difference between this function and Rajeev's old code:
 * this code doesn't necessarily return a rank in the range
 * 0..nprocs_for_coll-1; instead you get something in 0..nprocs-1.  This is a
 * result of the rank mapping; any set of ranks in the communicator could be
 * used now.
 *
 * Returns an integer representing a rank in the collective I/O communicator.
 *
 * The "len" parameter is also modified to indicate the amount of data
 * actually available in this file domain.
 */
int ADIOI_Calc_aggregator(ADIO_File fd,
                          ADIO_Offset off,
                          ADIO_Offset min_off,
                          ADIO_Offset * len,
                          ADIO_Offset fd_size, ADIO_Offset * fd_start, ADIO_Offset * fd_end)
{
    int rank_index, rank;
    ADIO_Offset avail_bytes;

    MPL_UNREFERENCED_ARG(fd_start);

    /* get an index into our array of aggregators */
    rank_index = (int) ((off - min_off + fd_size) / fd_size - 1);
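    /* this is floor((off - min_off) / fd_size) for off >= min_off; e.g.
     * off = 250, min_off = 0, fd_size = 100 gives (250 + 100) / 100 - 1 = 2,
     * i.e. the file domain covering offsets [200, 299] */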

    if (fd->hints->striping_unit > 0) {
        /* wkliao: implementation for file domain alignment
         * fd_start[] and fd_end[] have been aligned with file lock
         * boundaries when returned from ADIOI_Calc_file_domains() so cannot
         * just use simple arithmetic as above */
        rank_index = 0;
        while (off > fd_end[rank_index])
            rank_index++;
    }

    /* we index into fd_end with rank_index, and fd_end was allocated to be no
     * bigger than fd->hints->cb_nodes.   If we ever violate that, we're
     * overrunning arrays.  Obviously, we should never ever hit this abort */
    if (rank_index >= fd->hints->cb_nodes || rank_index < 0) {
        FPRINTF(stderr,
                "Error in ADIOI_Calc_aggregator(): rank_index(%d) >= fd->hints->cb_nodes (%d) fd_size=%lld off=%lld\n",
                rank_index, fd->hints->cb_nodes, (long long) fd_size, (long long) off);
        MPI_Abort(MPI_COMM_WORLD, 1);
    }

    /* remember here that even in Rajeev's original code it was the case that
     * different aggregators could end up with different amounts of data to
     * aggregate.  here we use fd_end[] to make sure that we know how much
     * data this aggregator is working with.
     *
     * the +1 is to take into account the end vs. length issue.
     */
    avail_bytes = fd_end[rank_index] + 1 - off;
    if (avail_bytes < *len) {
        /* this file domain only has part of the requested contig. region */
        *len = avail_bytes;
    }

    /* map our index to a rank */
    /* NOTE: FOR NOW WE DON'T HAVE A MAPPING...JUST DO 0..NPROCS_FOR_COLL */
    rank = fd->hints->ranklist[rank_index];

    return rank;
}
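
/* A minimal usage sketch (mirroring the loops in ADIOI_Calc_my_req() below):
 * a caller walks one contiguous request across file domains by repeatedly
 * asking which aggregator owns the first remaining byte and how many bytes
 * that domain can absorb.  req_off and req_len are placeholder names for the
 * caller's request:
 *
 *     ADIO_Offset off = req_off, len = req_len, piece;
 *     while (len > 0) {
 *         piece = len;
 *         int agg = ADIOI_Calc_aggregator(fd, off, min_st_offset, &piece,
 *                                         fd_size, fd_start, fd_end);
 *         ... record (agg, off, piece) ...
 *         off += piece;
 *         len -= piece;
 *     }
 */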

void ADIOI_Calc_file_domains(ADIO_Offset * st_offsets, ADIO_Offset
                             * end_offsets, int nprocs, int nprocs_for_coll,
                             ADIO_Offset * min_st_offset_ptr,
                             ADIO_Offset ** fd_start_ptr, ADIO_Offset
                             ** fd_end_ptr, int min_fd_size,
                             ADIO_Offset * fd_size_ptr, int striping_unit)
{
/* Divide the I/O workload among "nprocs_for_coll" processes. This is
   done by (logically) dividing the file into file domains (FDs); each
   process may directly access only its own file domain. */

    ADIO_Offset min_st_offset, max_end_offset, *fd_start, *fd_end, fd_size;
    int i;

#ifdef AGGREGATION_PROFILE
    MPE_Log_event(5004, 0, NULL);
#endif

#ifdef AGG_DEBUG
    FPRINTF(stderr, "ADIOI_Calc_file_domains: %d aggregator(s)\n", nprocs_for_coll);
#endif

/* find min of start offsets and max of end offsets of all processes */

    min_st_offset = st_offsets[0];
    max_end_offset = end_offsets[0];

    for (i = 1; i < nprocs; i++) {
        min_st_offset = MPL_MIN(min_st_offset, st_offsets[i]);
        max_end_offset = MPL_MAX(max_end_offset, end_offsets[i]);
    }

/* determine the "file domain (FD)" of each process, i.e., the portion of
   the file that will be "owned" by each process */

/* partition the total file access range equally among nprocs_for_coll
   processes */
    fd_size = ((max_end_offset - min_st_offset + 1) + nprocs_for_coll - 1) / nprocs_for_coll;
    /* ceiling division as in HPF block distribution */
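    /* e.g. a 97-byte access range split among 16 aggregators gives
     * fd_size = (97 + 15) / 16 = 7; the trimming loop at the end of this
     * function handles the resulting over-coverage (16 * 7 = 112 > 97) */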

    /* Tweak the file domains so that no fd is smaller than a threshold.  We
     * have to strike a balance between efficiency and parallelism: somewhere
     * between 10k processes sending 32-byte requests and one process sending a
     * 320k request is a (system-dependent) sweet spot */

    if (fd_size < min_fd_size)
        fd_size = min_fd_size;

    *fd_start_ptr = (ADIO_Offset *) ADIOI_Malloc(nprocs_for_coll * 2 * sizeof(ADIO_Offset));
    *fd_end_ptr = *fd_start_ptr + nprocs_for_coll;

    fd_start = *fd_start_ptr;
    fd_end = *fd_end_ptr;

    /* Wei-keng Liao: implementation for file domain alignment to the nearest
     * file lock boundary (as specified by the striping_unit hint).  Could also
     * experiment with other alignment strategies here */
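    /* illustrative numbers: with striping_unit = 65536 and a tentative
     * end_off = 100000, rem_front = 100000 % 65536 = 34464 and
     * rem_back = 65536 - 34464 = 31072; since rem_front > rem_back the
     * boundary moves forward to end_off = 131072, so fd_end = 131071 ends
     * exactly at a stripe boundary */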
    if (striping_unit > 0) {
        ADIO_Offset end_off;
        int rem_front, rem_back;

        /* align fd_end[0] to the nearest file lock boundary */
        fd_start[0] = min_st_offset;
        end_off = fd_start[0] + fd_size;
        rem_front = end_off % striping_unit;
        rem_back = striping_unit - rem_front;
        if (rem_front < rem_back)
            end_off -= rem_front;
        else
            end_off += rem_back;
        fd_end[0] = end_off - 1;

        /* align fd_end[i] to the nearest file lock boundary */
        for (i = 1; i < nprocs_for_coll; i++) {
            fd_start[i] = fd_end[i - 1] + 1;
            end_off = min_st_offset + fd_size * (i + 1);
            rem_front = end_off % striping_unit;
            rem_back = striping_unit - rem_front;
            if (rem_front < rem_back)
                end_off -= rem_front;
            else
                end_off += rem_back;
            fd_end[i] = end_off - 1;
        }
        fd_end[nprocs_for_coll - 1] = max_end_offset;
    } else {    /* no hints set: do things the 'old' way */
        fd_start[0] = min_st_offset;
        fd_end[0] = min_st_offset + fd_size - 1;

        for (i = 1; i < nprocs_for_coll; i++) {
            fd_start[i] = fd_end[i - 1] + 1;
            fd_end[i] = fd_start[i] + fd_size - 1;
        }
    }

/* take care of cases in which the total file access range is not
   divisible by the number of processes. In such cases, the last
   process, or the last few processes, may have unequal load (even 0).
   For example, a range of 97 divided among 16 processes.
   Note that the division is ceiling division. */
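
/* Continuing the 97-byte / 16-aggregator example with min_st_offset = 0 and
   fd_size = 7 (unaligned case): fd_end[13] = 97 exceeds max_end_offset = 96
   and is trimmed to 96, while fd_start[14] and fd_start[15] lie past the end
   of the range, so those two domains are marked empty with -1. */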

    for (i = 0; i < nprocs_for_coll; i++) {
        if (fd_start[i] > max_end_offset)
            fd_start[i] = fd_end[i] = -1;
        if (fd_end[i] > max_end_offset)
            fd_end[i] = max_end_offset;
    }

    *fd_size_ptr = fd_size;
    *min_st_offset_ptr = min_st_offset;

#ifdef AGGREGATION_PROFILE
    MPE_Log_event(5005, 0, NULL);
#endif
}


/* ADIOI_Calc_my_req() - calculate what portions of the access requests
 * of this process are located in the file domains of various processes
 * (including this one)
 */
void ADIOI_Calc_my_req(ADIO_File fd, ADIO_Offset * offset_list, ADIO_Offset * len_list,
                       int contig_access_count, ADIO_Offset
                       min_st_offset, ADIO_Offset * fd_start,
                       ADIO_Offset * fd_end, ADIO_Offset fd_size,
                       int nprocs,
                       int *count_my_req_procs_ptr,
                       int **count_my_req_per_proc_ptr,
                       ADIOI_Access ** my_req_ptr, MPI_Aint ** buf_idx_ptr)
/* buf_idx values are carried as MPI_Aint here: they are used as memory
   buffer indices, and plain ints would impose a 2 GB limit on them */
{
    int *count_my_req_per_proc, count_my_req_procs;
    MPI_Aint *buf_idx;
    int i, l, proc;
    size_t memLen;
    ADIO_Offset fd_len, rem_len, curr_idx, off, *ptr;
    ADIOI_Access *my_req;

#ifdef AGGREGATION_PROFILE
    MPE_Log_event(5024, 0, NULL);
#endif

    *count_my_req_per_proc_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
    count_my_req_per_proc = *count_my_req_per_proc_ptr;
/* count_my_req_per_proc[i] gives the no. of contig. requests of this
   process in process i's file domain. calloc initializes to zero.
   I'm allocating memory of size nprocs, so that I can do an
   MPI_Alltoall later on.*/

    buf_idx = (MPI_Aint *) ADIOI_Malloc(nprocs * sizeof(MPI_Aint));
/* buf_idx is relevant only if buftype_is_contig.
   buf_idx[i] gives the index into user_buf where data received
   from proc. i should be placed. This allows receives to be done
   without extra buffer. This can't be done if buftype is not contig. */
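
/* e.g. if this rank's first contiguous piece (length 40) falls in
   aggregator 3's file domain and its second (length 60) in aggregator 5's,
   the fill loop below sets buf_idx[3] = 0 and buf_idx[5] = 40, since
   curr_idx accumulates the lengths of all pieces in access order. */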

    /* initialize buf_idx to -1 */
    for (i = 0; i < nprocs; i++)
        buf_idx[i] = -1;

    /* one pass just to calculate how much space to allocate for my_req;
     * contig_access_count was calculated way back in ADIOI_Calc_my_off_len()
     */
    for (i = 0; i < contig_access_count; i++) {
        /* short circuit offset/len processing if len == 0
         * (zero-byte read/write) */
        if (len_list[i] == 0)
            continue;
        off = offset_list[i];
        fd_len = len_list[i];
        /* note: we set fd_len to be the total size of the access.  then
         * ADIOI_Calc_aggregator() will modify the value to return the
         * amount that was available from the file domain that holds the
         * first part of the access.
         */
        proc = ADIOI_Calc_aggregator(fd, off, min_st_offset, &fd_len, fd_size, fd_start, fd_end);
        count_my_req_per_proc[proc]++;

        /* figure out how much data is remaining in the access (i.e. wasn't
         * part of the file domain that had the starting byte); we'll take
         * care of this data (if there is any) in the while loop below.
         */
        rem_len = len_list[i] - fd_len;

        while (rem_len != 0) {
            off += fd_len;      /* point to first remaining byte */
            fd_len = rem_len;   /* save remaining size, pass to calc */
            proc = ADIOI_Calc_aggregator(fd, off, min_st_offset, &fd_len,
                                         fd_size, fd_start, fd_end);

            count_my_req_per_proc[proc]++;
            rem_len -= fd_len;  /* reduce remaining length by amount from fd */
        }
    }

/* now allocate space for my_req, offset, and len */

    *my_req_ptr = (ADIOI_Access *) ADIOI_Malloc(nprocs * sizeof(ADIOI_Access));
    my_req = *my_req_ptr;

    /* combine offsets and lens into a single region so we can make one
     * exchange instead of two later on.  Over-allocate the 'offsets' array and
     * make 'lens' point to the over-allocated part
     */
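    /* resulting layout of the single allocation, for each rank p that has
     * requests, packed in rank order:
     *
     *     | p's offsets (count_my_req_per_proc[p]) | p's lens (same count) |
     *
     * my_req[p].offsets and my_req[p].lens are therefore adjacent, which is
     * what lets them travel as one message of 2 * count offsets later on */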
    memLen = 0;
    for (i = 0; i < nprocs; i++)
        memLen += count_my_req_per_proc[i];
    ptr = (ADIO_Offset *) ADIOI_Malloc(memLen * 2 * sizeof(ADIO_Offset));
    my_req[0].offsets = ptr;

    count_my_req_procs = 0;
    for (i = 0; i < nprocs; i++) {
        if (count_my_req_per_proc[i]) {
            my_req[i].offsets = ptr;
            ptr += count_my_req_per_proc[i];
            my_req[i].lens = ptr;
            ptr += count_my_req_per_proc[i];
            count_my_req_procs++;
        }
        my_req[i].count = 0;    /* will be incremented where needed
                                 * later */
    }

/* now fill in my_req */
    curr_idx = 0;
    for (i = 0; i < contig_access_count; i++) {
        /* short circuit offset/len processing if len == 0
         * (zero-byte read/write) */
        if (len_list[i] == 0)
            continue;
        off = offset_list[i];
        fd_len = len_list[i];
        proc = ADIOI_Calc_aggregator(fd, off, min_st_offset, &fd_len, fd_size, fd_start, fd_end);

        /* for each separate contiguous access from this process */
        if (buf_idx[proc] == -1) {
            ADIOI_Assert(curr_idx == (MPI_Aint) curr_idx);
            buf_idx[proc] = (MPI_Aint) curr_idx;
        }

        l = my_req[proc].count;
        curr_idx += fd_len;

        rem_len = len_list[i] - fd_len;

        /* store the proc, offset, and len information in an array
         * of structures, my_req. Each structure contains the
         * offsets and lengths located in that process's FD,
         * and the associated count.
         */
        my_req[proc].offsets[l] = off;
        my_req[proc].lens[l] = fd_len;
        my_req[proc].count++;

        while (rem_len != 0) {
            off += fd_len;
            fd_len = rem_len;
            proc = ADIOI_Calc_aggregator(fd, off, min_st_offset, &fd_len,
                                         fd_size, fd_start, fd_end);

            if (buf_idx[proc] == -1) {
                ADIOI_Assert(curr_idx == (MPI_Aint) curr_idx);
                buf_idx[proc] = (MPI_Aint) curr_idx;
            }

            l = my_req[proc].count;
            curr_idx += fd_len;
            rem_len -= fd_len;

            my_req[proc].offsets[l] = off;
            my_req[proc].lens[l] = fd_len;
            my_req[proc].count++;
        }
    }

#ifdef AGG_DEBUG
    for (i = 0; i < nprocs; i++) {
        if (count_my_req_per_proc[i] > 0) {
            FPRINTF(stdout, "data needed from %d (count = %d):\n", i, my_req[i].count);
            for (l = 0; l < my_req[i].count; l++) {
                FPRINTF(stdout, "   off[%d] = %lld, len[%d] = %lld\n", l,
                        (long long) my_req[i].offsets[l], l, (long long) my_req[i].lens[l]);
            }
            FPRINTF(stdout, "buf_idx[%d] = 0x%x\n", i, buf_idx[i]);
        }
    }
#endif

    *count_my_req_procs_ptr = count_my_req_procs;
    *buf_idx_ptr = buf_idx;
#ifdef AGGREGATION_PROFILE
    MPE_Log_event(5025, 0, NULL);
#endif
}



void ADIOI_Calc_others_req(ADIO_File fd, int count_my_req_procs,
                           int *count_my_req_per_proc,
                           ADIOI_Access * my_req,
                           int nprocs, int myrank,
                           int *count_others_req_procs_ptr, ADIOI_Access ** others_req_ptr)
{
/* determine what requests of other processes lie in this process's
   file domain */

/* count_others_req_procs = number of processes whose requests lie in
   this process's file domain (including this process itself)
   count_others_req_per_proc[i] indicates how many separate contiguous
   requests of proc. i lie in this process's file domain. */

    int *count_others_req_per_proc, count_others_req_procs;
    int i, j;
    MPI_Request *requests;
    ADIOI_Access *others_req;
    size_t memLen;
    ADIO_Offset *ptr;
    MPI_Aint *mem_ptrs;

/* first find out how much to send/recv and from/to whom */
#ifdef AGGREGATION_PROFILE
    MPE_Log_event(5026, 0, NULL);
#endif
    count_others_req_per_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));

    MPI_Alltoall(count_my_req_per_proc, 1, MPI_INT,
                 count_others_req_per_proc, 1, MPI_INT, fd->comm);
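
    /* after the alltoall, count_others_req_per_proc[i] holds what rank i
     * computed as its count_my_req_per_proc[] entry for this rank, i.e. how
     * many of rank i's contiguous pieces fall in our file domain */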

    *others_req_ptr = (ADIOI_Access *) ADIOI_Malloc(nprocs * sizeof(ADIOI_Access));
    others_req = *others_req_ptr;

    memLen = 0;
    for (i = 0; i < nprocs; i++)
        memLen += count_others_req_per_proc[i];
    ptr = (ADIO_Offset *) ADIOI_Malloc(memLen * 2 * sizeof(ADIO_Offset));
    mem_ptrs = (MPI_Aint *) ADIOI_Malloc(memLen * sizeof(MPI_Aint));
    others_req[0].offsets = ptr;
    others_req[0].mem_ptrs = mem_ptrs;

    count_others_req_procs = 0;
    for (i = 0; i < nprocs; i++) {
        if (count_others_req_per_proc[i]) {
            others_req[i].count = count_others_req_per_proc[i];
            others_req[i].offsets = ptr;
            ptr += count_others_req_per_proc[i];
            others_req[i].lens = ptr;
            ptr += count_others_req_per_proc[i];
            others_req[i].mem_ptrs = mem_ptrs;
            mem_ptrs += count_others_req_per_proc[i];
            count_others_req_procs++;
        } else
            others_req[i].count = 0;
    }
    ADIOI_Free(count_others_req_per_proc);

/* now send the calculated offsets and lengths to respective processes */

    requests = (MPI_Request *)
        ADIOI_Malloc(1 + (count_my_req_procs + count_others_req_procs) * sizeof(MPI_Request));
/* +1 to avoid a 0-size malloc */

    j = 0;
    for (i = 0; i < nprocs; i++) {
        if (others_req[i].count) {
            MPI_Irecv(others_req[i].offsets, 2 * others_req[i].count,
                      ADIO_OFFSET, i, i + myrank, fd->comm, &requests[j++]);
        }
    }

    for (i = 0; i < nprocs; i++) {
        if (my_req[i].count) {
            MPI_Isend(my_req[i].offsets, 2 * my_req[i].count,
                      ADIO_OFFSET, i, i + myrank, fd->comm, &requests[j++]);
        }
    }
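
    /* tags match by construction: a receiver posts tag (source + myrank)
     * and a sender posts tag (dest + myrank), so both sides compute
     * source rank + destination rank for any given message.  Offsets and
     * lens travel as a single message of 2 * count ADIO_OFFSETs because the
     * two arrays are contiguous in memory (see the layout note above) */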

    if (j) {
#ifdef MPI_STATUSES_IGNORE
        MPI_Waitall(j, requests, MPI_STATUSES_IGNORE);
#else
        MPI_Status *statuses = (MPI_Status *) ADIOI_Malloc(j * sizeof(MPI_Status));
        MPI_Waitall(j, requests, statuses);
        ADIOI_Free(statuses);
#endif
    }

    ADIOI_Free(requests);

    *count_others_req_procs_ptr = count_others_req_procs;
#ifdef AGGREGATION_PROFILE
    MPE_Log_event(5027, 0, NULL);
#endif
}


/* Nonblocking version of ADIOI_Calc_others_req().
   It consists of three functions - ADIOI_Icalc_others_req(),
   ADIOI_Icalc_others_req_main(), and ADIOI_Icalc_others_req_fini(). */
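
/* Rough control flow, as implied by the state fields set in these functions:
   the caller's progress engine is expected to invoke
   ADIOI_Icalc_others_req_main() once the MPI_Ialltoall request (req1)
   completes, and ADIOI_Icalc_others_req_fini() once every point-to-point
   request stored in req2 has completed. */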
void ADIOI_Icalc_others_req(ADIOI_NBC_Request * nbc_req, int *error_code)
{
    ADIOI_Icalc_others_req_vars *vars = nbc_req->cor_vars;

    /* count_others_req_per_proc[i] indicates how many separate contiguous
     * requests of proc. i lie in this process's file domain. */

    /* first find out how much to send/recv and from/to whom */
#ifdef AGGREGATION_PROFILE
    MPE_Log_event(5026, 0, NULL);
#endif
    vars->count_others_req_per_proc = (int *) ADIOI_Malloc(vars->nprocs * sizeof(int));

    *error_code = MPI_Ialltoall(vars->count_my_req_per_proc, 1, MPI_INT,
                                vars->count_others_req_per_proc, 1, MPI_INT, vars->fd->comm,
                                &vars->req1);

    if (nbc_req->rdwr == ADIOI_READ) {
        nbc_req->data.rd.state = ADIOI_IRC_STATE_ICALC_OTHERS_REQ;
    } else {
        ADIOI_Assert(nbc_req->rdwr == ADIOI_WRITE);
        nbc_req->data.wr.state = ADIOI_IWC_STATE_ICALC_OTHERS_REQ;
    }
}

void ADIOI_Icalc_others_req_main(ADIOI_NBC_Request * nbc_req, int *error_code)
{
    ADIOI_Icalc_others_req_vars *vars = nbc_req->cor_vars;
    ADIO_File fd = vars->fd;
    int count_my_req_procs = vars->count_my_req_procs;
    ADIOI_Access *my_req = vars->my_req;
    int nprocs = vars->nprocs;
    int myrank = vars->myrank;
    ADIOI_Access **others_req_ptr = vars->others_req_ptr;

    /* determine what requests of other processes lie in this process's
     * file domain */

    /* count_others_req_procs = number of processes whose requests lie in
     * this process's file domain (including this process itself)
     * count_others_req_per_proc[i] indicates how many separate contiguous
     * requests of proc. i lie in this process's file domain. */

    int *count_others_req_per_proc = vars->count_others_req_per_proc;
    int count_others_req_procs;
    int i, j;
    ADIOI_Access *others_req;
    size_t memLen;
    ADIO_Offset *ptr;
    MPI_Aint *mem_ptrs;

    *others_req_ptr = (ADIOI_Access *) ADIOI_Malloc(nprocs * sizeof(ADIOI_Access));
    others_req = *others_req_ptr;

    memLen = 0;
    for (i = 0; i < nprocs; i++)
        memLen += count_others_req_per_proc[i];
    ptr = (ADIO_Offset *) ADIOI_Malloc(memLen * 2 * sizeof(ADIO_Offset));
    mem_ptrs = (MPI_Aint *) ADIOI_Malloc(memLen * sizeof(MPI_Aint));
    others_req[0].offsets = ptr;
    others_req[0].mem_ptrs = mem_ptrs;

    count_others_req_procs = 0;
    for (i = 0; i < nprocs; i++) {
        if (count_others_req_per_proc[i]) {
            others_req[i].count = count_others_req_per_proc[i];
            others_req[i].offsets = ptr;
            ptr += count_others_req_per_proc[i];
            others_req[i].lens = ptr;
            ptr += count_others_req_per_proc[i];
            others_req[i].mem_ptrs = mem_ptrs;
            mem_ptrs += count_others_req_per_proc[i];
            count_others_req_procs++;
        } else
            others_req[i].count = 0;
    }
    vars->count_others_req_procs = count_others_req_procs;

    /* now send the calculated offsets and lengths to respective processes */

    vars->req2 = (MPI_Request *)
        ADIOI_Malloc(1 + 2 * (count_my_req_procs + count_others_req_procs)
                     * sizeof(MPI_Request));
    /* +1 to avoid a 0-size malloc */

    j = 0;
    for (i = 0; i < nprocs; i++) {
        if (others_req[i].count) {
            MPI_Irecv(others_req[i].offsets, 2 * others_req[i].count,
                      ADIO_OFFSET, i, i + myrank, fd->comm, &vars->req2[j++]);
        }
    }

    for (i = 0; i < nprocs; i++) {
        if (my_req[i].count) {
            MPI_Isend(my_req[i].offsets, 2 * my_req[i].count,
                      ADIO_OFFSET, i, i + myrank, fd->comm, &vars->req2[j++]);
        }
    }

    /* keep the number of requests */
    vars->num_req2 = j;

    if (nbc_req->rdwr == ADIOI_READ) {
        nbc_req->data.rd.state = ADIOI_IRC_STATE_ICALC_OTHERS_REQ_MAIN;
    } else {
        ADIOI_Assert(nbc_req->rdwr == ADIOI_WRITE);
        nbc_req->data.wr.state = ADIOI_IWC_STATE_ICALC_OTHERS_REQ_MAIN;
    }
}

void ADIOI_Icalc_others_req_fini(ADIOI_NBC_Request * nbc_req, int *error_code)
{
    ADIOI_Icalc_others_req_vars *vars = nbc_req->cor_vars;
    void (*next_fn) (ADIOI_NBC_Request *, int *);

    ADIOI_Free(vars->req2);
    ADIOI_Free(vars->count_others_req_per_proc);

    *vars->count_others_req_procs_ptr = vars->count_others_req_procs;
#ifdef AGGREGATION_PROFILE
    MPE_Log_event(5027, 0, NULL);
#endif
    /* end of the calculation */

    next_fn = vars->next_fn;

    /* free the struct for parameters and variables */
    ADIOI_Free(vars);
    nbc_req->cor_vars = NULL;

    /* move to the next function */
    next_fn(nbc_req, error_code);
}