1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
|
/*
* Copyright (c) 2018 DataDirect Networks. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "fbtl_ime.h"
#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/mca/fbtl/fbtl.h"
static ssize_t mca_fbtl_ime_nonblocking_op (ompio_file_t *fh,
ompi_request_t *request, int io_op);
ssize_t mca_fbtl_ime_ipreadv (ompio_file_t *fh, ompi_request_t *request)
{
return mca_fbtl_ime_nonblocking_op(fh, request, FBTL_IME_READ);
}
ssize_t mca_fbtl_ime_ipwritev (ompio_file_t *fh, ompi_request_t *request)
{
return mca_fbtl_ime_nonblocking_op(fh, request, FBTL_IME_WRITE);
}
static ssize_t mca_fbtl_ime_nonblocking_op (ompio_file_t *fh,
ompi_request_t *request, int io_op)
{
mca_fbtl_ime_request_data_t *data;
mca_ompio_request_t *req = (mca_ompio_request_t *) request;
int i=0, req_index = 0, ret;
data = (mca_fbtl_ime_request_data_t *) malloc ( sizeof (mca_fbtl_ime_request_data_t));
if ( NULL == data ) {
opal_output (1,"could not allocate memory\n");
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* We might allocate too much memory here because we don't know
how many IME requests will be necessary.
We will use all the iovec "slots" in the array,
but maybe not all the request and request status slots.
That is, because an IME request can handle several iovecs,
not just one. */
data->allocated_data = (void*) malloc( fh->f_num_of_io_entries *
(sizeof(struct iovec) +
sizeof(struct ime_aiocb) +
sizeof(ssize_t)) );
if (NULL == data->allocated_data) {
opal_output(1, "OUT OF MEMORY\n");
free(data);
return OMPI_ERR_OUT_OF_RESOURCE;
}
data->aio_iovecs = (struct iovec *) data->allocated_data;
data->aio_reqs = (struct ime_aiocb *) (data->aio_iovecs +
fh->f_num_of_io_entries);
data->aio_req_status = (ssize_t *) (data->aio_reqs +
fh->f_num_of_io_entries);
/* Fill some attributes of the OMPIO request data */
data->aio_req_type = io_op; /* The correctness of io_op will be checked later */
data->aio_req_chunks = mca_fbtl_ime_aio_reqs_max;
data->aio_req_fail_count = 0;
data->aio_total_len = 0;
data->aio_fh = fh;
data->aio_reqs[0].iovcnt = 0;
/* Go through all IO entries and try to aggregate them. */
for ( i=0; i<fh->f_num_of_io_entries; i++ ) {
data->aio_iovecs[i].iov_base = fh->f_io_array[i].memory_address;
data->aio_iovecs[i].iov_len = fh->f_io_array[i].length;
/* If the processed iovec will be the first in our ime_aiocb request,
then we initialize this aio request for IME. */
if (data->aio_reqs[req_index].iovcnt == 0) {
data->aio_reqs[req_index].iov = &data->aio_iovecs[i];
data->aio_reqs[req_index].iovcnt = 1;
data->aio_reqs[req_index].file_offset = (off_t)
fh->f_io_array[i].offset;
data->aio_reqs[req_index].fd = fh->fd;
data->aio_reqs[req_index].complete_cb = &mca_fbtl_ime_complete_cb;
data->aio_reqs[req_index].user_context = (intptr_t)
&data->aio_req_status[req_index];
data->aio_req_status[req_index] = FBTL_IME_IN_PROGRESS;
}
/* Here we check if the next iovec will be appended to
the current ime_aiocb request.
ie: if data is contiguous
AND we don't exceed the advised number of iovecs for IME
In that case, the next iovec will be appended to the IME req. */
if (i+1 != fh->f_num_of_io_entries &&
((OMPI_MPI_OFFSET_TYPE)(intptr_t)fh->f_io_array[i].offset +
(ptrdiff_t)fh->f_io_array[i].length) ==
(OMPI_MPI_OFFSET_TYPE)(intptr_t)fh->f_io_array[i+1].offset &&
data->aio_reqs[req_index].iovcnt < mca_fbtl_ime_iov_max ) {
data->aio_reqs[req_index].iovcnt++;
}
/* Otherwise, we need to create a new request
(except if there is no next iovec to process) */
else if ( i+1 != fh->f_num_of_io_entries ) {
req_index++;
data->aio_reqs[req_index].iovcnt = 0;
}
}
/* Fill the missing attributes of the OMPI request */
data->aio_req_count = req_index + 1;
data->aio_open_reqs = req_index + 1;
data->aio_first_active_req = 0;
if ( data->aio_req_count > data->aio_req_chunks ) {
data->aio_last_active_req = data->aio_req_chunks;
}
else {
data->aio_last_active_req = data->aio_req_count;
}
/* Actually start the requests (or at least the first batch).
In case an error happened when one request is started, we
don't send the next ones and mark the failing request as
the last active one. Finally we exit as if no error happened,
because some other requests might have already been started
and they need to be finalized properly (via the progress function).
*/
for (i=0; i < data->aio_last_active_req; i++) {
switch(io_op) {
case FBTL_IME_READ:
ret = ime_native_aio_read(&data->aio_reqs[i]);
if (ret < 0) {
opal_output(1, "mca_fbtl_ime_nonblocking_op: error in "
"ime_native_aio_read() error ret=%d %s",
ret, strerror(errno));
data->aio_req_status[i] = FBTL_IME_REQ_ERROR;
data->aio_last_active_req = i + 1;
goto standard_exit;
}
break;
case FBTL_IME_WRITE:
ret = ime_native_aio_write(&data->aio_reqs[i]);
if (ret < 0) {
opal_output(1, "mca_fbtl_ime_nonblocking_op: error in "
"ime_native_aio_write() error ret=%d %s",
ret, strerror(errno));
data->aio_req_status[i] = FBTL_IME_REQ_ERROR;
data->aio_last_active_req = i + 1;
goto standard_exit;
}
break;
default:
opal_output(1, "mca_fbtl_ime_nonblocking_op: an unsupported "
"IO operation was requested. io_op=%d", io_op);
goto error_exit;
}
}
standard_exit:
req->req_data = data;
req->req_progress_fn = mca_fbtl_ime_progress;
req->req_free_fn = mca_fbtl_ime_request_free;
return OMPI_SUCCESS;
error_exit:
free(data->allocated_data);
free(data);
return OMPI_ERROR;
}
|