File: oob_tcp_msg.h

package info (click to toggle)
openmpi 1.1-2.3
  • links: PTS
  • area: main
  • in suites: etch, etch-m68k
  • size: 39,124 kB
  • ctags: 22,534
  • sloc: ansic: 216,698; sh: 22,541; makefile: 6,921; cpp: 5,562; asm: 3,160; lex: 375; objc: 365; perl: 347; csh: 89; tcl: 12; f90: 5
file content (217 lines) | stat: -rw-r--r-- 7,825 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2005 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, 
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * $COPYRIGHT$
 * 
 * Additional copyrights may follow
 * 
 * $HEADER$
 */
/** @file:
 * 
 * contains the data structure we will use to describe a message
 */

#ifndef _MCA_OOB_TCP_MESSAGE_H_
#define _MCA_OOB_TCP_MESSAGE_H_

#include "opal/class/opal_list.h"
#include "orte/mca/oob/oob.h"
#include "oob_tcp_peer.h"
#include "oob_tcp_hdr.h"
#include <errno.h>
#include "opal/util/output.h"
#include "orte/mca/ns/ns_types.h"

#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
struct mca_oob_tcp_peer_t;

#define MCA_OOB_TCP_IOV_MAX  16

typedef enum { MCA_OOB_TCP_POSTED, MCA_OOB_TCP_UNEXPECTED } mca_oob_tcp_type_t;


/**
 * describes each message being progressed.
 */
struct mca_oob_tcp_msg_t {
    opal_list_item_t      super;         /**< allow this item to be put on a list */
    mca_oob_tcp_type_t    msg_type;      /**< posted receive or unexpected */
    int                   msg_flags;     /**< flags to send/recv */
    int                   msg_rc;        /**< the return code for the send/recv (amount sent/recvd or errno) */
    mca_oob_tcp_hdr_t     msg_hdr;       /**< header used to convey message properties to peer */
    struct iovec*         msg_uiov;      /**< the user supplied iovec array */
    int                   msg_ucnt;      /**< the number of items in the user iovec array */
    struct iovec *        msg_rwiov;     /**< copy of iovec array - not data */
    struct iovec *        msg_rwptr;     /**< current read/write pointer into msg_iov */
    int                   msg_rwnum;     /**< number of iovecs left for read/write */
    int                   msg_rwcnt;     /**< total number of iovecs for read/write */
    void*                 msg_rwbuf;     /**< optional buffer for send/recv */
    mca_oob_callback_fn_t msg_cbfunc;    /**< the callback function for the send/receive */    
    void *                msg_cbdata;    /**< the data for the callback fnuction */
    bool                  msg_complete;  /**< whether the message is done sending or not */
    orte_process_name_t   msg_peer;      /**< the name of the peer */
    opal_mutex_t          msg_lock;      /**< lock for the condition variable */
    opal_condition_t      msg_condition; /**< condition variable for completion */
    struct iovec          msg_iov[MCA_OOB_TCP_IOV_MAX];  /** preallocate space for iovec array */
};

/**
 * Convenience typedef
 */
typedef struct mca_oob_tcp_msg_t mca_oob_tcp_msg_t;

OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_t);

/**
 * Get a new structure for use with a message
 */
#define MCA_OOB_TCP_MSG_ALLOC(msg, rc) \
    { \
    opal_list_item_t* item; \
    OPAL_FREE_LIST_GET(&mca_oob_tcp_component.tcp_msgs, item, rc); \
    msg = (mca_oob_tcp_msg_t*)item; \
    }

/**
 * return a message structure that is no longer needed
 */
#define MCA_OOB_TCP_MSG_RETURN(msg) \
    { \
    /* frees the iovec allocated during the send/receive */ \
    if(NULL != msg->msg_rwiov) \
        mca_oob_tcp_msg_iov_return(msg,msg->msg_rwiov); \
    if(NULL != msg->msg_rwbuf) \
        free(msg->msg_rwbuf); \
    OPAL_FREE_LIST_RETURN(&mca_oob_tcp_component.tcp_msgs, (opal_list_item_t*)msg); \
    }

/**
 *  Wait for a msg to complete.
 *  @param  msg (IN)     Message to wait on.
 *  @param  size (OUT)   Number of bytes delivered.
 *  @retval ORTE_SUCCESS or error code on failure.
 */
int mca_oob_tcp_msg_wait(mca_oob_tcp_msg_t* msg, int* size);

/**
 *  Wait - up to a timeout - for a msg to complete.
 *  @param  msg (IN)     Message to wait on.
 *  @param  size (OUT)   Number of bytes delivered.
 *  @retval ORTE_SUCCESS or error code on failure.
 */
int mca_oob_tcp_msg_timedwait(mca_oob_tcp_msg_t* msg, int* size, struct timespec* ts);

/**
 *  Signal that a message has completed. Wakes up any pending threads (for blocking send)
 *  or invokes callbacks for non-blocking case.
 *  @param  msg (IN)   Message send/recv that has completed.
 *  @param  peer (IN)  The peer the send/receive was from
 *  @retval ORTE_SUCCESS or error code on failure.
 */
int mca_oob_tcp_msg_complete(mca_oob_tcp_msg_t* msg, orte_process_name_t * peer);

/**
 *  Called to copy the results of a message into user supplied iovec array.
 *  @param  msg (IN)   Message send that is in progress. 
 *  @param  iov (IN)   Iovec array of user supplied buffers.
 *  @retval count      Number of elements in iovec array.
 */

int mca_oob_tcp_msg_copy(mca_oob_tcp_msg_t* msg, struct iovec* iov, int count);

/**
 *  Called asynchronously to progress sending a message from the event library thread.
 *  @param  msg (IN)   Message send that is in progress. 
 *  @param  peer (IN)  Peer we are sending to.
 *  @retval            Number of bytes copied.
 */
bool mca_oob_tcp_msg_send_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_peer_t * peer);

/**
 *  Called asynchronously to progress sending a message from the event library thread.
 *  @param  msg (IN)   Message send that is in progress. 
 *  @param  peer (IN)  Peer theat we are recieving from.
 *  @retval bool       Bool flag indicating wether operation has completed.
 */

bool mca_oob_tcp_msg_recv_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_peer_t * peer);

/**
 * The message has been completely received - so attempt to match
 * against posted recvs.
 */

void mca_oob_tcp_msg_recv_complete(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_peer_t* peer);

/**
 *  Match name to a message that has been received asynchronously (unexpected).
 *
 *  @param  name (IN)  Name associated with peer or wildcard to match first posted recv.
 *  @param  tag (IN)   Message tag. 
 *  @return msg        Matched message or NULL.
 *
 *  Note - this routine requires the caller to be holding the module lock.
 */

mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_recv(orte_process_name_t* name, int tag);

/**
 *  Match name to a posted recv request.
 *
 *  @param  name (IN)  Name associated with peer or wildcard to match first posted recv.
 *  @param  tag (IN)   Message tag. 
 *  @return msg        Matched message or NULL.
 *
 *  Note - this routine requires the caller to be holding the module lock.
 */

mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_post(orte_process_name_t* name, int tag);

/**
 *  Allocate space for iovec array - if the request number of elements is less than
 *  MCA_OOB_TCP_IOV_MAX then use the array allocated along w/ the message - otherwise
 *  allocate count elements.
 *
 *  @param  msg (IN)  Message to allocate array.
 *  @param  count (IN) the number of iovecs
 *  @return           Array of iovec elements.
 *
 */
static inline struct iovec* mca_oob_tcp_msg_iov_alloc(mca_oob_tcp_msg_t* msg, int count)
{
    if(count <= MCA_OOB_TCP_IOV_MAX) 
        return msg->msg_iov;
    return (struct iovec *)malloc(sizeof(struct iovec) * count);
}


/**
 *  Release resource held by iovec array if this is not part of the message.
 *
 *  @param  msg (IN)  Message to allocate array.
 *  @param  iov (IN)  Iovec array to return.
 *
 */

static inline void mca_oob_tcp_msg_iov_return(mca_oob_tcp_msg_t* msg, struct iovec* iov)
{
    if(iov != msg->msg_iov)
        free(iov);
}

#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#endif /* _MCA_OOB_TCP_MESSAGE_H_ */