File: ad_open.c

package info (click to toggle)
openmpi 5.0.8-3
  • links: PTS, VCS
  • area: main
  • in suites:
  • size: 201,692 kB
  • sloc: ansic: 613,078; makefile: 42,353; sh: 11,194; javascript: 9,244; f90: 7,052; java: 6,404; perl: 5,179; python: 1,859; lex: 740; fortran: 61; cpp: 20; tcl: 12
file content (318 lines) | stat: -rw-r--r-- 11,704 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
/*
 * Copyright (C) by Argonne National Laboratory
 *     See COPYRIGHT in top-level directory
 */

#include "adio.h"
#include "adio_extern.h"
#include "adio_cb_config_list.h"

#include "mpio.h"
static int is_aggregator(int rank, ADIO_File fd);
static int uses_generic_read(ADIO_File fd);
static int uses_generic_write(ADIO_File fd);
static int build_cb_config_list(ADIO_File fd,
                                MPI_Comm orig_comm, MPI_Comm comm,
                                int rank, int procs, int *error_code);

MPI_File ADIO_Open(MPI_Comm orig_comm,
                   MPI_Comm comm, const char *filename, int file_system,
                   ADIOI_Fns * ops,
                   int access_mode, ADIO_Offset disp, MPI_Datatype etype,
                   MPI_Datatype filetype, MPI_Info info, int perm, int *error_code)
{
    MPI_File mpi_fh;
    ADIO_File fd;
    int err, rank, procs;
    static char myname[] = "ADIO_OPEN";
    int max_error_code;
    MPI_Info dupinfo;
    int syshints_processed, can_skip;
    char *p;

    *error_code = MPI_SUCCESS;

    /* obtain MPI_File handle */
    mpi_fh = MPIO_File_create(sizeof(struct ADIOI_FileD));
    if (mpi_fh == MPI_FILE_NULL) {
        fd = MPI_FILE_NULL;
        *error_code = MPIO_Err_create_code(*error_code,
                                           MPIR_ERR_RECOVERABLE,
                                           myname, __LINE__, MPI_ERR_OTHER, "**nomem2", 0);
        goto fn_exit;

    }
    fd = MPIO_File_resolve(mpi_fh);

    fd->cookie = ADIOI_FILE_COOKIE;
    fd->fp_ind = disp;
    fd->fp_sys_posn = 0;
    fd->comm = comm;    /* dup'ed in MPI_File_open */
    fd->filename = ADIOI_Strdup(filename);
    fd->file_system = file_system;
    fd->fs_ptr = NULL;

    fd->fns = ops;

    fd->disp = disp;
    fd->split_coll_count = 0;
    fd->shared_fp_fd = ADIO_FILE_NULL;
    fd->atomicity = 0;
    fd->etype = etype;  /* MPI_BYTE by default */
    fd->filetype = filetype;    /* MPI_BYTE by default */
    fd->etype_size = 1; /* default etype is MPI_BYTE */

    fd->file_realm_st_offs = NULL;
    fd->file_realm_types = NULL;

    fd->perm = perm;

    fd->async_count = 0;

    fd->fortran_handle = -1;

    fd->err_handler = ADIOI_DFLT_ERR_HANDLER;

    fd->io_buf_window = MPI_WIN_NULL;
    fd->io_buf_put_amounts_window = MPI_WIN_NULL;

    MPI_Comm_rank(comm, &rank);
    MPI_Comm_size(comm, &procs);
/* create and initialize info object */
    fd->hints = (ADIOI_Hints *) ADIOI_Calloc(1, sizeof(struct ADIOI_Hints_struct));
    if (fd->hints == NULL) {
        *error_code = MPIO_Err_create_code(*error_code,
                                           MPIR_ERR_RECOVERABLE,
                                           myname, __LINE__, MPI_ERR_OTHER, "**nomem2", 0);
        goto fn_exit;
    }
    fd->hints->cb_config_list = NULL;
    fd->hints->ranklist = NULL;
    fd->hints->initialized = 0;
    fd->info = MPI_INFO_NULL;

    /* move system-wide hint processing *back* into open, but this time the
     * hintfile reader will do a scalable read-and-broadcast.  The global
     * ADIOI_syshints will get initialized at first open.  subsequent open
     * calls will just use result from first open.
     *
     * We have two goals here:
     * 1: avoid processing the hintfile multiple times
     * 2: have all processes participate in hintfile processing (so we can read-and-broadcast)
     *
     * a code might do an "initialize from 0", so we can only skip hint
     * processing once everyone has particpiated in hint processing */
    if (ADIOI_syshints == MPI_INFO_NULL)
        syshints_processed = 0;
    else
        syshints_processed = 1;

    MPI_Allreduce(&syshints_processed, &can_skip, 1, MPI_INT, MPI_MIN, fd->comm);
    if (!can_skip) {
        if (ADIOI_syshints == MPI_INFO_NULL)
            MPI_Info_create(&ADIOI_syshints);
        ADIOI_process_system_hints(fd, ADIOI_syshints);
    }

    ADIOI_incorporate_system_hints(info, ADIOI_syshints, &dupinfo);
    ADIO_SetInfo(fd, dupinfo, &err);
    if (dupinfo != MPI_INFO_NULL) {
        *error_code = MPI_Info_free(&dupinfo);
        if (*error_code != MPI_SUCCESS)
            goto fn_exit;
    }
    ADIOI_Info_set(fd->info, "romio_filesystem_type", fd->fns->fsname);

    /* Instead of repeatedly allocating this buffer in collective read/write,
     * allocating up-front might make memory management on small platforms
     * (e.g. Blue Gene) more efficent */

    fd->io_buf = ADIOI_Malloc(fd->hints->cb_buffer_size);
    /* deferred open:
     * we can only do this optimization if 'fd->hints->deferred_open' is set
     * (which means the user hinted 'no_indep_rw' and collective buffering).
     * Furthermore, we only do this if our collective read/write routines use
     * our generic function, and not an fs-specific routine (we can defer opens
     * only if we use our aggreagation code). */
    if (fd->hints->deferred_open && !(uses_generic_read(fd)
                                      && uses_generic_write(fd))) {
        fd->hints->deferred_open = 0;
    }
    if (ADIO_Feature(fd, ADIO_SCALABLE_OPEN))
        /* disable deferred open on these fs so that scalable broadcast
         * will always use the propper communicator */
        fd->hints->deferred_open = 0;


    /* on BlueGene, the cb_config_list is built when hints are processed. No
     * one else does that right now */
    if (fd->hints->ranklist == NULL) {
        build_cb_config_list(fd, orig_comm, comm, rank, procs, error_code);
        if (*error_code != MPI_SUCCESS)
            goto fn_exit;
    }
    fd->is_open = 0;
    fd->my_cb_nodes_index = -2;
    fd->is_agg = is_aggregator(rank, fd);
    /* deferred open used to split the communicator to create an "aggregator
     * communicator", but we only used it as a way to indicate that deferred
     * open happened.  fd->is_open and fd->is_agg are sufficient */

    /* actual opens start here */
    /* generic open: one process opens to create the file, all others open */
    /* nfs open: everybody opens or else you'll end up with "file not found"
     * due to stupid nfs consistency semantics */
    /* scalable open: one process opens and broadcasts results to everyone */

    ADIOI_OpenColl(fd, rank, access_mode, error_code);

    /* deferred open consideration: if an independent process lied about
     * "no_indep_rw" and opens the file later (example: HDF5 uses independent
     * i/o for metadata), that deferred open will use the access_mode provided
     * by the user.  CREATE|EXCL only makes sense here -- exclusive access in
     * the deferred open case is going to fail and surprise the user.  Turn off
     * the excl amode bit. Save user's ammode for MPI_FILE_GET_AMODE */
    fd->orig_access_mode = access_mode;
    if (fd->access_mode & ADIO_EXCL)
        fd->access_mode ^= ADIO_EXCL;


    /* for debugging, it can be helpful to see the hints selected. Some file
     * systes set up the hints in the open call (e.g. lustre) */
    p = getenv("ROMIO_PRINT_HINTS");
    if (rank == 0 && p != NULL) {
        ADIOI_Info_print_keyvals(fd->info);
    }

  fn_exit:
    MPI_Allreduce(error_code, &max_error_code, 1, MPI_INT, MPI_MAX, comm);
    if (max_error_code != MPI_SUCCESS) {

        /* If the file was successfully opened, close it */
        if (*error_code == MPI_SUCCESS) {

            /* in the deferred open case, only those who have actually
             * opened the file should close it */
            if (fd->hints->deferred_open) {
                if (fd->is_agg) {
                    (*(fd->fns->ADIOI_xxx_Close)) (fd, error_code);
                }
            } else {
                (*(fd->fns->ADIOI_xxx_Close)) (fd, error_code);
            }
        }
        ADIOI_Free(fd->filename);
        ADIOI_Free(fd->hints->ranklist);
        if (fd->hints->cb_config_list != NULL)
            ADIOI_Free(fd->hints->cb_config_list);
        ADIOI_Free(fd->hints);
        if (fd->info != MPI_INFO_NULL)
            MPI_Info_free(&(fd->info));
        ADIOI_Free(fd->io_buf);
        ADIOI_Free(fd);
        fd = ADIO_FILE_NULL;
        if (*error_code == MPI_SUCCESS) {
            *error_code = MPIO_Err_create_code(MPI_SUCCESS,
                                               MPIR_ERR_RECOVERABLE, myname,
                                               __LINE__, MPI_ERR_IO, "**oremote_fail", 0);
        }
    }

    return fd;
}

/* a simple linear search. possible enancement: add a my_cb_nodes_index member
 * (index into cb_nodes, else -1 if not aggregator) for faster lookups
 *
 * fd->hints->cb_nodes is the number of aggregators
 * fd->hints->ranklist[] is an array of the ranks of aggregators
 *
 * might want to move this to adio/common/cb_config_list.c
 */
int is_aggregator(int rank, ADIO_File fd)
{
    int i;

    if (fd->my_cb_nodes_index == -2) {
        for (i = 0; i < fd->hints->cb_nodes; i++) {
            if (rank == fd->hints->ranklist[i]) {
                fd->my_cb_nodes_index = i;
                return 1;
            }
        }
        fd->my_cb_nodes_index = -1;
    } else if (fd->my_cb_nodes_index != -1)
        return 1;

    return 0;
}

/*
 * If file system implements some version of two-phase -- doesn't have to be
 * generic -- we can still carry out the defered open optimization
 */
static int uses_generic_read(ADIO_File fd)
{
    if (ADIO_Feature(fd, ADIO_TWO_PHASE))
        return 1;
    return 0;
}

static int uses_generic_write(ADIO_File fd)
{
    if (ADIO_Feature(fd, ADIO_TWO_PHASE))
        return 1;
    return 0;
}

static int build_cb_config_list(ADIO_File fd,
                                MPI_Comm orig_comm, MPI_Comm comm,
                                int rank, int procs, int *error_code)
{
    ADIO_cb_name_array array;
    int *tmp_ranklist;
    int rank_ct;
    char *value;
    static char myname[] = "ADIO_OPEN cb_config_list";

    /* gather the processor name array if we don't already have it */
    /* this has to be done early in ADIO_Open so that we can cache the name
     * array in both the dup'd communicator (in case we want it later) and the
     * original communicator */
    ADIOI_cb_gather_name_array(orig_comm, comm, &array);

/* parse the cb_config_list and create a rank map on rank 0 */
    if (rank == 0) {
        tmp_ranklist = (int *) ADIOI_Malloc(sizeof(int) * procs);
        if (tmp_ranklist == NULL) {
            *error_code = MPIO_Err_create_code(*error_code,
                                               MPIR_ERR_RECOVERABLE,
                                               myname, __LINE__, MPI_ERR_OTHER, "**nomem2", 0);
            return 0;
        }

        rank_ct = ADIOI_cb_config_list_parse(fd->hints->cb_config_list,
                                             array, tmp_ranklist, fd->hints->cb_nodes);

        /* store the ranklist using the minimum amount of memory */
        if (rank_ct > 0) {
            fd->hints->ranklist = (int *) ADIOI_Malloc(sizeof(int) * rank_ct);
            memcpy(fd->hints->ranklist, tmp_ranklist, sizeof(int) * rank_ct);
        }
        ADIOI_Free(tmp_ranklist);
        fd->hints->cb_nodes = rank_ct;
        /* TEMPORARY -- REMOVE WHEN NO LONGER UPDATING INFO FOR FS-INDEP. */
        value = (char *) ADIOI_Malloc((MPI_MAX_INFO_VAL + 1) * sizeof(char));
        MPL_snprintf(value, MPI_MAX_INFO_VAL + 1, "%d", rank_ct);
        ADIOI_Info_set(fd->info, "cb_nodes", value);
        ADIOI_Free(value);
    }

    ADIOI_cb_bcast_rank_map(fd);
    if (fd->hints->cb_nodes <= 0) {
        *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
                                           myname, __LINE__, MPI_ERR_IO, "**ioagnomatch", 0);
        fd = ADIO_FILE_NULL;
    }
    return 0;
}