/*
* Copyright (C) by Argonne National Laboratory
* See COPYRIGHT in top-level directory
*/
#include "adio.h"
#include "adio_extern.h"
#ifdef AGGREGATION_PROFILE
#include "mpe.h"
#endif
#undef AGG_DEBUG
/* This file contains four functions:
*
* ADIOI_Calc_aggregator()
* ADIOI_Calc_file_domains()
* ADIOI_Calc_my_req()
* ADIOI_Calc_others_req()
*
* The last three of these were originally in ad_read_coll.c, but they are
* also shared with ad_write_coll.c. I felt that they were better kept with
* the rest of the shared aggregation code.
*/
/* Discussion of values available from above:
*
* ADIO_Offset st_offsets[0..nprocs-1]
* ADIO_Offset end_offsets[0..nprocs-1]
* These contain a list of start and end offsets for each process in
* the communicator. For example, an access at loc 10, size 10 would
* have a start offset of 10 and end offset of 19.
* int nprocs
* number of processors in the collective I/O communicator
* ADIO_Offset min_st_offset
* ADIO_Offset fd_start[0..nprocs_for_coll-1]
* starting location of "file domain"; region that a given process will
* perform aggregation for (i.e. actually do I/O)
* ADIO_Offset fd_end[0..nprocs_for_coll-1]
* start + size - 1 roughly, but it can be less, or 0, in the case of
* uneven distributions
*/
/* ADIOI_Calc_aggregator()
*
* The intention here is to implement a function which provides basically
* the same functionality as in Rajeev's original version of
* ADIOI_Calc_my_req(). He used a ceiling division approach to assign the
* file domains, and we use the same approach here when calculating the
* location of an offset/len in a specific file domain. Further we assume
* this same distribution when calculating the rank_index, which is later
* used to map to a specific process rank in charge of the file domain.
*
* A better (i.e. more general) approach would be to use the list of file
* domains only. This would be slower in the case where the
* original ceiling division was used, but it would allow for arbitrary
* distributions of regions to aggregators. We'd need to know the
* nprocs_for_coll in that case though, which we don't have now.
*
* Note a significant difference between this function and Rajeev's old code:
* this code doesn't necessarily return a rank in the range
* 0..nprocs_for_coll; instead you get something in 0..nprocs. This is a
* result of the rank mapping; any set of ranks in the communicator could be
* used now.
*
* Returns an integer representing a rank in the collective I/O communicator.
*
* The "len" parameter is also modified to indicate the amount of data
* actually available in this file domain.
*/
int ADIOI_Calc_aggregator(ADIO_File fd,
ADIO_Offset off,
ADIO_Offset min_off,
ADIO_Offset * len,
ADIO_Offset fd_size, ADIO_Offset * fd_start, ADIO_Offset * fd_end)
{
int rank_index, rank;
ADIO_Offset avail_bytes;
MPL_UNREFERENCED_ARG(fd_start);
/* get an index into our array of aggregators */
rank_index = (int) ((off - min_off + fd_size) / fd_size - 1);
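/* Worked example with illustrative values: if min_off = 0 and fd_size = 100,
* offsets 0..99 map to rank_index 0, 100..199 to 1, and so on; e.g.
* off = 250 gives (250 - 0 + 100) / 100 - 1 = 2. */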
if (fd->hints->striping_unit > 0) {
/* wkliao: implementation for file domain alignment
* fd_start[] and fd_end[] have been aligned with file lock
* boundaries when returned from ADIOI_Calc_file_domains() so cannot
* just use simple arithmetic as above */
rank_index = 0;
while (off > fd_end[rank_index])
rank_index++;
}
/* we index into fd_end with rank_index, and fd_end was allocated to be no
* bigger than fd->hints->cb_nodes. If we ever violate that, we're
* overrunning arrays. Obviously, we should never ever hit this abort */
if (rank_index >= fd->hints->cb_nodes || rank_index < 0) {
FPRINTF(stderr,
"Error in ADIOI_Calc_aggregator(): rank_index(%d) >= fd->hints->cb_nodes (%d) fd_size=%lld off=%lld\n",
rank_index, fd->hints->cb_nodes, (long long) fd_size, (long long) off);
MPI_Abort(MPI_COMM_WORLD, 1);
}
/* remember here that even in Rajeev's original code it was the case that
* different aggregators could end up with different amounts of data to
* aggregate. here we use fd_end[] to make sure that we know how much
* data this aggregator is working with.
*
* the +1 is to take into account the end vs. length issue.
*/
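/* e.g. (illustrative): a domain with fd_end = 99 and off = 90 yields
* avail_bytes = 99 + 1 - 90 = 10 */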
avail_bytes = fd_end[rank_index] + 1 - off;
if (avail_bytes < *len) {
/* this file domain only has part of the requested contig. region */
*len = avail_bytes;
}
/* map our index to an actual rank via the hints' aggregator rank list */
rank = fd->hints->ranklist[rank_index];
return rank;
}
void ADIOI_Calc_file_domains(ADIO_Offset * st_offsets, ADIO_Offset
* end_offsets, int nprocs, int nprocs_for_coll,
ADIO_Offset * min_st_offset_ptr,
ADIO_Offset ** fd_start_ptr, ADIO_Offset
** fd_end_ptr, int min_fd_size,
ADIO_Offset * fd_size_ptr, int striping_unit)
{
/* Divide the I/O workload among "nprocs_for_coll" processes. This is
done by (logically) dividing the file into file domains (FDs); each
process may directly access only its own file domain. */
ADIO_Offset min_st_offset, max_end_offset, *fd_start, *fd_end, fd_size;
int i;
#ifdef AGGREGATION_PROFILE
MPE_Log_event(5004, 0, NULL);
#endif
#ifdef AGG_DEBUG
FPRINTF(stderr, "ADIOI_Calc_file_domains: %d aggregator(s)\n", nprocs_for_coll);
#endif
/* find min of start offsets and max of end offsets of all processes */
min_st_offset = st_offsets[0];
max_end_offset = end_offsets[0];
for (i = 1; i < nprocs; i++) {
min_st_offset = MPL_MIN(min_st_offset, st_offsets[i]);
max_end_offset = MPL_MAX(max_end_offset, end_offsets[i]);
}
/* determine the "file domain (FD)" of each process, i.e., the portion of
the file that will be "owned" by each process */
/* partition the total file access range equally among nprocs_for_coll
processes */
fd_size = ((max_end_offset - min_st_offset + 1) + nprocs_for_coll - 1) / nprocs_for_coll;
/* ceiling division as in HPF block distribution */
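/* e.g. (illustrative): a 1000-byte access range split over 3 aggregators gives
* fd_size = (1000 + 3 - 1) / 3 = 334, so the last domain covers only 332 bytes */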
/* Tweak the file domains so that no fd is smaller than a threshold. We
* have to strike a balance between efficiency and parallelism: somewhere
* between 10k processes sending 32-byte requests and one process sending a
* 320k request is a (system-dependent) sweet spot */
if (fd_size < min_fd_size)
fd_size = min_fd_size;
*fd_start_ptr = (ADIO_Offset *) ADIOI_Malloc(nprocs_for_coll * 2 * sizeof(ADIO_Offset));
*fd_end_ptr = *fd_start_ptr + nprocs_for_coll;
fd_start = *fd_start_ptr;
fd_end = *fd_end_ptr;
/* Wei-keng Liao: implementation for file domain alignment to nearest file
* lock boundary (as specified by striping_unit hint). Could also
* experiment with other alignment strategies here */
if (striping_unit > 0) {
ADIO_Offset end_off;
int rem_front, rem_back;
/* align fd_end[0] to the nearest file lock boundary */
fd_start[0] = min_st_offset;
end_off = fd_start[0] + fd_size;
rem_front = end_off % striping_unit;
rem_back = striping_unit - rem_front;
if (rem_front < rem_back)
end_off -= rem_front;
else
end_off += rem_back;
fd_end[0] = end_off - 1;
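/* e.g. (illustrative): with striping_unit = 100 and an unaligned end_off of
* 237, rem_front = 37 < rem_back = 63, so end_off is rounded down to 200 and
* fd_end[0] = 199; an end_off of 263 would instead round up to 300 */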
/* align fd_end[i] to the nearest file lock boundary */
for (i = 1; i < nprocs_for_coll; i++) {
fd_start[i] = fd_end[i - 1] + 1;
end_off = min_st_offset + fd_size * (i + 1);
rem_front = end_off % striping_unit;
rem_back = striping_unit - rem_front;
if (rem_front < rem_back)
end_off -= rem_front;
else
end_off += rem_back;
fd_end[i] = end_off - 1;
}
fd_end[nprocs_for_coll - 1] = max_end_offset;
} else { /* no hints set: do things the 'old' way */
fd_start[0] = min_st_offset;
fd_end[0] = min_st_offset + fd_size - 1;
for (i = 1; i < nprocs_for_coll; i++) {
fd_start[i] = fd_end[i - 1] + 1;
fd_end[i] = fd_start[i] + fd_size - 1;
}
}
/* take care of cases in which the total file access range is not
divisible by the number of processes. In such cases, the last
process, or the last few processes, may have unequal load (even 0).
For example, a range of 97 divided among 16 processes.
Note that the division is ceiling division. */
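/* Working through that example: fd_size = (97 + 15) / 16 = 7, so domains
* 0..12 hold 7 bytes each, domain 13 is clipped to the 6 remaining bytes,
* and domains 14 and 15 start past max_end_offset and are marked empty (-1). */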
for (i = 0; i < nprocs_for_coll; i++) {
if (fd_start[i] > max_end_offset)
fd_start[i] = fd_end[i] = -1;
if (fd_end[i] > max_end_offset)
fd_end[i] = max_end_offset;
}
*fd_size_ptr = fd_size;
*min_st_offset_ptr = min_st_offset;
#ifdef AGGREGATION_PROFILE
MPE_Log_event(5005, 0, NULL);
#endif
}
/* ADIOI_Calc_my_req() - calculate what portions of the access requests
* of this process are located in the file domains of various processes
* (including this one)
*/
void ADIOI_Calc_my_req(ADIO_File fd, ADIO_Offset * offset_list, ADIO_Offset * len_list,
int contig_access_count, ADIO_Offset
min_st_offset, ADIO_Offset * fd_start,
ADIO_Offset * fd_end, ADIO_Offset fd_size,
int nprocs,
int *count_my_req_procs_ptr,
int **count_my_req_per_proc_ptr,
ADIOI_Access ** my_req_ptr, MPI_Aint ** buf_idx_ptr)
/* buf_idx values are used as memory buffer indices, so they are kept as
MPI_Aint rather than plain int to avoid the 2 GB limit */
{
int *count_my_req_per_proc, count_my_req_procs;
MPI_Aint *buf_idx;
int i, l, proc;
size_t memLen;
ADIO_Offset fd_len, rem_len, curr_idx, off, *ptr;
ADIOI_Access *my_req;
#ifdef AGGREGATION_PROFILE
MPE_Log_event(5024, 0, NULL);
#endif
*count_my_req_per_proc_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
count_my_req_per_proc = *count_my_req_per_proc_ptr;
/* count_my_req_per_proc[i] gives the no. of contig. requests of this
process in process i's file domain. calloc initializes to zero.
I'm allocating memory of size nprocs, so that I can do an
MPI_Alltoall later on.*/
buf_idx = (MPI_Aint *) ADIOI_Malloc(nprocs * sizeof(MPI_Aint));
/* buf_idx is relevant only if buftype_is_contig.
buf_idx[i] gives the index into user_buf where data received
from proc. i should be placed. This allows receives to be done
without extra buffer. This can't be done if buftype is not contig. */
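/* e.g. (illustrative): if the first 40 bytes of this process's request fall in
proc 2's file domain and the next 60 bytes in proc 5's, then buf_idx[2] = 0
and buf_idx[5] = 40 */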
/* initialize buf_idx to -1 */
for (i = 0; i < nprocs; i++)
buf_idx[i] = -1;
/* one pass just to calculate how much space to allocate for my_req;
* contig_access_count was calculated way back in ADIOI_Calc_my_off_len()
*/
for (i = 0; i < contig_access_count; i++) {
/* short circuit offset/len processing if len == 0
* (zero-byte read/write) */
if (len_list[i] == 0)
continue;
off = offset_list[i];
fd_len = len_list[i];
/* note: we set fd_len to be the total size of the access. then
* ADIOI_Calc_aggregator() will modify the value to return the
* amount that was available from the file domain that holds the
* first part of the access.
*/
proc = ADIOI_Calc_aggregator(fd, off, min_st_offset, &fd_len, fd_size, fd_start, fd_end);
count_my_req_per_proc[proc]++;
/* figure out how much data is remaining in the access (i.e. wasn't
* part of the file domain that had the starting byte); we'll take
* care of this data (if there is any) in the while loop below.
*/
rem_len = len_list[i] - fd_len;
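/* e.g. (illustrative): with fd_size = 100, a request at off = 90 of length 30
* gets fd_len = 10 (bytes 90..99) from the first domain, leaving rem_len = 20
* for the next aggregator(s) in the loop below */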
while (rem_len != 0) {
off += fd_len; /* point to first remaining byte */
fd_len = rem_len; /* save remaining size, pass to calc */
proc = ADIOI_Calc_aggregator(fd, off, min_st_offset, &fd_len,
fd_size, fd_start, fd_end);
count_my_req_per_proc[proc]++;
rem_len -= fd_len; /* reduce remaining length by amount from fd */
}
}
/* now allocate space for my_req, offset, and len */
*my_req_ptr = (ADIOI_Access *) ADIOI_Malloc(nprocs * sizeof(ADIOI_Access));
my_req = *my_req_ptr;
/* combine offsets and lens into a single region so we can make one
* exchange instead of two later on. Over-allocate the 'offsets' array and
* make 'lens' point to the over-allocated part
*/
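/* the resulting layout of the single allocation is
* [proc A offsets][proc A lens][proc B offsets][proc B lens]...
* for each proc A, B, ... with a nonzero request count */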
memLen = 0;
for (i = 0; i < nprocs; i++)
memLen += count_my_req_per_proc[i];
ptr = (ADIO_Offset *) ADIOI_Malloc(memLen * 2 * sizeof(ADIO_Offset));
my_req[0].offsets = ptr;
count_my_req_procs = 0;
for (i = 0; i < nprocs; i++) {
if (count_my_req_per_proc[i]) {
my_req[i].offsets = ptr;
ptr += count_my_req_per_proc[i];
my_req[i].lens = ptr;
ptr += count_my_req_per_proc[i];
count_my_req_procs++;
}
my_req[i].count = 0; /* will be incremented where needed
* later */
}
/* now fill in my_req */
curr_idx = 0;
for (i = 0; i < contig_access_count; i++) {
/* short circuit offset/len processing if len == 0
* (zero-byte read/write) */
if (len_list[i] == 0)
continue;
off = offset_list[i];
fd_len = len_list[i];
proc = ADIOI_Calc_aggregator(fd, off, min_st_offset, &fd_len, fd_size, fd_start, fd_end);
/* for each separate contiguous access from this process */
if (buf_idx[proc] == -1) {
ADIOI_Assert(curr_idx == (MPI_Aint) curr_idx);
buf_idx[proc] = (MPI_Aint) curr_idx;
}
l = my_req[proc].count;
curr_idx += fd_len;
rem_len = len_list[i] - fd_len;
/* store the proc, offset, and len information in an array
* of structures, my_req. Each structure contains the
* offsets and lengths located in that process's FD,
* and the associated count.
*/
my_req[proc].offsets[l] = off;
my_req[proc].lens[l] = fd_len;
my_req[proc].count++;
while (rem_len != 0) {
off += fd_len;
fd_len = rem_len;
proc = ADIOI_Calc_aggregator(fd, off, min_st_offset, &fd_len,
fd_size, fd_start, fd_end);
if (buf_idx[proc] == -1) {
ADIOI_Assert(curr_idx == (MPI_Aint) curr_idx);
buf_idx[proc] = (MPI_Aint) curr_idx;
}
l = my_req[proc].count;
curr_idx += fd_len;
rem_len -= fd_len;
my_req[proc].offsets[l] = off;
my_req[proc].lens[l] = fd_len;
my_req[proc].count++;
}
}
#ifdef AGG_DEBUG
for (i = 0; i < nprocs; i++) {
if (count_my_req_per_proc[i] > 0) {
FPRINTF(stdout, "data needed from %d (count = %d):\n", i, my_req[i].count);
for (l = 0; l < my_req[i].count; l++) {
FPRINTF(stdout, " off[%d] = %lld, len[%d] = %d\n", l,
(long long) my_req[i].offsets[l], l, (long long) my_req[i].lens[l]);
}
FPRINTF(stdout, "buf_idx[%d] = 0x%x\n", i, buf_idx[i]);
}
}
#endif
*count_my_req_procs_ptr = count_my_req_procs;
*buf_idx_ptr = buf_idx;
#ifdef AGGREGATION_PROFILE
MPE_Log_event(5025, 0, NULL);
#endif
}
void ADIOI_Calc_others_req(ADIO_File fd, int count_my_req_procs,
int *count_my_req_per_proc,
ADIOI_Access * my_req,
int nprocs, int myrank,
int *count_others_req_procs_ptr, ADIOI_Access ** others_req_ptr)
{
/* determine what requests of other processes lie in this process's
file domain */
/* count_others_req_procs = number of processes whose requests lie in
this process's file domain (including this process itself)
count_others_req_per_proc[i] indicates how many separate contiguous
requests of proc. i lie in this process's file domain. */
int *count_others_req_per_proc, count_others_req_procs;
int i, j;
MPI_Request *requests;
ADIOI_Access *others_req;
size_t memLen;
ADIO_Offset *ptr;
MPI_Aint *mem_ptrs;
/* first find out how much to send/recv and from/to whom */
#ifdef AGGREGATION_PROFILE
MPE_Log_event(5026, 0, NULL);
#endif
count_others_req_per_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));
MPI_Alltoall(count_my_req_per_proc, 1, MPI_INT,
count_others_req_per_proc, 1, MPI_INT, fd->comm);
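/* the all-to-all transposes the counts: afterwards, count_others_req_per_proc[i]
* on this rank equals count_my_req_per_proc[myrank] as computed on rank i */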
*others_req_ptr = (ADIOI_Access *) ADIOI_Malloc(nprocs * sizeof(ADIOI_Access));
others_req = *others_req_ptr;
memLen = 0;
for (i = 0; i < nprocs; i++)
memLen += count_others_req_per_proc[i];
ptr = (ADIO_Offset *) ADIOI_Malloc(memLen * 2 * sizeof(ADIO_Offset));
mem_ptrs = (MPI_Aint *) ADIOI_Malloc(memLen * sizeof(MPI_Aint));
others_req[0].offsets = ptr;
others_req[0].mem_ptrs = mem_ptrs;
count_others_req_procs = 0;
for (i = 0; i < nprocs; i++) {
if (count_others_req_per_proc[i]) {
others_req[i].count = count_others_req_per_proc[i];
others_req[i].offsets = ptr;
ptr += count_others_req_per_proc[i];
others_req[i].lens = ptr;
ptr += count_others_req_per_proc[i];
others_req[i].mem_ptrs = mem_ptrs;
mem_ptrs += count_others_req_per_proc[i];
count_others_req_procs++;
} else
others_req[i].count = 0;
}
ADIOI_Free(count_others_req_per_proc);
/* now send the calculated offsets and lengths to respective processes */
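/* each message carries 2*count offsets because a process's offsets[] and
* lens[] are adjacent in the single allocations made above, so one
* send/receive moves both arrays at once */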
requests = (MPI_Request *)
ADIOI_Malloc(1 + (count_my_req_procs + count_others_req_procs) * sizeof(MPI_Request));
/* +1 to avoid a 0-size malloc */
j = 0;
for (i = 0; i < nprocs; i++) {
if (others_req[i].count) {
MPI_Irecv(others_req[i].offsets, 2 * others_req[i].count,
ADIO_OFFSET, i, i + myrank, fd->comm, &requests[j++]);
}
}
for (i = 0; i < nprocs; i++) {
if (my_req[i].count) {
MPI_Isend(my_req[i].offsets, 2 * my_req[i].count,
ADIO_OFFSET, i, i + myrank, fd->comm, &requests[j++]);
}
}
if (j) {
#ifdef MPI_STATUSES_IGNORE
MPI_Waitall(j, requests, MPI_STATUSES_IGNORE);
#else
MPI_Status *statuses = (MPI_Status *) ADIOI_Malloc(j * sizeof(MPI_Status));
MPI_Waitall(j, requests, statuses);
ADIOI_Free(statuses);
#endif
}
ADIOI_Free(requests);
*count_others_req_procs_ptr = count_others_req_procs;
#ifdef AGGREGATION_PROFILE
MPE_Log_event(5027, 0, NULL);
#endif
}
/* Nonblocking version of ADIOI_Calc_others_req().
It consists of three functions - ADIOI_Icalc_others_req(),
ADIOI_Icalc_others_req_main(), and ADIOI_Icalc_others_req_fini(). */
void ADIOI_Icalc_others_req(ADIOI_NBC_Request * nbc_req, int *error_code)
{
ADIOI_Icalc_others_req_vars *vars = nbc_req->cor_vars;
/* count_others_req_per_proc[i] indicates how many separate contiguous
* requests of proc. i lie in this process's file domain. */
/* first find out how much to send/recv and from/to whom */
#ifdef AGGREGATION_PROFILE
MPE_Log_event(5026, 0, NULL);
#endif
vars->count_others_req_per_proc = (int *) ADIOI_Malloc(vars->nprocs * sizeof(int));
*error_code = MPI_Ialltoall(vars->count_my_req_per_proc, 1, MPI_INT,
vars->count_others_req_per_proc, 1, MPI_INT, vars->fd->comm,
&vars->req1);
if (nbc_req->rdwr == ADIOI_READ) {
nbc_req->data.rd.state = ADIOI_IRC_STATE_ICALC_OTHERS_REQ;
} else {
ADIOI_Assert(nbc_req->rdwr == ADIOI_WRITE);
nbc_req->data.wr.state = ADIOI_IWC_STATE_ICALC_OTHERS_REQ;
}
}
void ADIOI_Icalc_others_req_main(ADIOI_NBC_Request * nbc_req, int *error_code)
{
ADIOI_Icalc_others_req_vars *vars = nbc_req->cor_vars;
ADIO_File fd = vars->fd;
int count_my_req_procs = vars->count_my_req_procs;
ADIOI_Access *my_req = vars->my_req;
int nprocs = vars->nprocs;
int myrank = vars->myrank;
ADIOI_Access **others_req_ptr = vars->others_req_ptr;
/* determine what requests of other processes lie in this process's
* file domain */
/* count_others_req_procs = number of processes whose requests lie in
* this process's file domain (including this process itself)
* count_others_req_per_proc[i] indicates how many separate contiguous
* requests of proc. i lie in this process's file domain. */
int *count_others_req_per_proc = vars->count_others_req_per_proc;
int count_others_req_procs;
int i, j;
ADIOI_Access *others_req;
size_t memLen;
ADIO_Offset *ptr;
MPI_Aint *mem_ptrs;
*others_req_ptr = (ADIOI_Access *) ADIOI_Malloc(nprocs * sizeof(ADIOI_Access));
others_req = *others_req_ptr;
memLen = 0;
for (i = 0; i < nprocs; i++)
memLen += count_others_req_per_proc[i];
ptr = (ADIO_Offset *) ADIOI_Malloc(memLen * 2 * sizeof(ADIO_Offset));
mem_ptrs = (MPI_Aint *) ADIOI_Malloc(memLen * sizeof(MPI_Aint));
others_req[0].offsets = ptr;
others_req[0].mem_ptrs = mem_ptrs;
count_others_req_procs = 0;
for (i = 0; i < nprocs; i++) {
if (count_others_req_per_proc[i]) {
others_req[i].count = count_others_req_per_proc[i];
others_req[i].offsets = ptr;
ptr += count_others_req_per_proc[i];
others_req[i].lens = ptr;
ptr += count_others_req_per_proc[i];
others_req[i].mem_ptrs = mem_ptrs;
mem_ptrs += count_others_req_per_proc[i];
count_others_req_procs++;
} else
others_req[i].count = 0;
}
vars->count_others_req_procs = count_others_req_procs;
/* now send the calculated offsets and lengths to respective processes */
vars->req2 = (MPI_Request *)
ADIOI_Malloc(1 + 2 * (count_my_req_procs + count_others_req_procs)
* sizeof(MPI_Request));
/* +1 to avoid a 0-size malloc */
j = 0;
for (i = 0; i < nprocs; i++) {
if (others_req[i].count) {
MPI_Irecv(others_req[i].offsets, 2 * others_req[i].count,
ADIO_OFFSET, i, i + myrank, fd->comm, &vars->req2[j++]);
}
}
for (i = 0; i < nprocs; i++) {
if (my_req[i].count) {
MPI_Isend(my_req[i].offsets, 2 * my_req[i].count,
ADIO_OFFSET, i, i + myrank, fd->comm, &vars->req2[j++]);
}
}
/* keep the number of requests */
vars->num_req2 = j;
if (nbc_req->rdwr == ADIOI_READ) {
nbc_req->data.rd.state = ADIOI_IRC_STATE_ICALC_OTHERS_REQ_MAIN;
} else {
ADIOI_Assert(nbc_req->rdwr == ADIOI_WRITE);
nbc_req->data.wr.state = ADIOI_IWC_STATE_ICALC_OTHERS_REQ_MAIN;
}
}
void ADIOI_Icalc_others_req_fini(ADIOI_NBC_Request * nbc_req, int *error_code)
{
ADIOI_Icalc_others_req_vars *vars = nbc_req->cor_vars;
void (*next_fn) (ADIOI_NBC_Request *, int *);
ADIOI_Free(vars->req2);
ADIOI_Free(vars->count_others_req_per_proc);
*vars->count_others_req_procs_ptr = vars->count_others_req_procs;
#ifdef AGGREGATION_PROFILE
MPE_Log_event(5027, 0, NULL);
#endif
/* end of the calculation */
next_fn = vars->next_fn;
/* free the struct for parameters and variables */
ADIOI_Free(vars);
nbc_req->cor_vars = NULL;
/* move to the next function */
next_fn(nbc_req, error_code);
}