1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217
|
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file:
*
* contains the data structure we will use to describe a message
*/
#ifndef _MCA_OOB_TCP_MESSAGE_H_
#define _MCA_OOB_TCP_MESSAGE_H_
#include "opal/class/opal_list.h"
#include "orte/mca/oob/oob.h"
#include "oob_tcp_peer.h"
#include "oob_tcp_hdr.h"
#include <errno.h>
#include "opal/util/output.h"
#include "orte/mca/ns/ns_types.h"
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
/* Forward declaration: the prototypes below only need pointers to the peer type. */
struct mca_oob_tcp_peer_t;

/** Number of iovec entries preallocated inside every message (see msg_iov). */
#define MCA_OOB_TCP_IOV_MAX 16

/**
 * Kind of message descriptor, stored in mca_oob_tcp_msg_t::msg_type.
 */
typedef enum {
    MCA_OOB_TCP_POSTED,     /**< posted receive */
    MCA_OOB_TCP_UNEXPECTED  /**< unexpected message */
} mca_oob_tcp_type_t;
/**
 * Describes each message being progressed.
 *
 * One descriptor tracks a single send or receive: the user supplied iovec
 * array, a working copy of that array that is advanced as data is
 * read/written, the completion callback, and the lock/condition pair used
 * to signal completion. Descriptors are list items so they can be queued.
 */
struct mca_oob_tcp_msg_t {
opal_list_item_t super; /**< allow this item to be put on a list */
mca_oob_tcp_type_t msg_type; /**< posted receive or unexpected */
int msg_flags; /**< flags to send/recv */
int msg_rc; /**< the return code for the send/recv (amount sent/recvd or errno) */
mca_oob_tcp_hdr_t msg_hdr; /**< header used to convey message properties to peer */
struct iovec* msg_uiov; /**< the user supplied iovec array */
int msg_ucnt; /**< the number of items in the user iovec array */
struct iovec * msg_rwiov; /**< copy of iovec array - not data; released through mca_oob_tcp_msg_iov_return() by MCA_OOB_TCP_MSG_RETURN */
struct iovec * msg_rwptr; /**< current read/write pointer into the iovec array being progressed */
int msg_rwnum; /**< number of iovecs left for read/write */
int msg_rwcnt; /**< total number of iovecs for read/write */
void* msg_rwbuf; /**< optional buffer for send/recv; free()d by MCA_OOB_TCP_MSG_RETURN when non-NULL */
mca_oob_callback_fn_t msg_cbfunc; /**< the callback function for the send/receive */
void * msg_cbdata; /**< the data for the callback function */
bool msg_complete; /**< whether the message is done sending or not */
orte_process_name_t msg_peer; /**< the name of the peer */
opal_mutex_t msg_lock; /**< lock for the condition variable */
opal_condition_t msg_condition; /**< condition variable for completion */
struct iovec msg_iov[MCA_OOB_TCP_IOV_MAX]; /**< preallocated space for iovec array; handed out by mca_oob_tcp_msg_iov_alloc() when count <= MCA_OOB_TCP_IOV_MAX */
};
/**
 * Convenience typedef so the message descriptor can be named without the
 * struct keyword.
 */
typedef struct mca_oob_tcp_msg_t mca_oob_tcp_msg_t;
/* NOTE(review): OBJ_CLASS_DECLARATION is defined outside this file; presumably
 * it declares the OPAL class descriptor for mca_oob_tcp_msg_t - confirm. */
OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_t);
/**
 * Get a new structure for use with a message.
 *
 * Takes an item from the mca_oob_tcp_component.tcp_msgs free list via
 * OPAL_FREE_LIST_GET and stores it, cast to mca_oob_tcp_msg_t*, in msg.
 *
 * @param msg (OUT) lvalue that receives the mca_oob_tcp_msg_t pointer.
 * @param rc  (OUT) lvalue that receives the status set by OPAL_FREE_LIST_GET.
 *
 * The temporary carries a component-prefixed name so that a caller argument
 * that happens to mention "item" is not captured by the macro's own local,
 * and the msg argument is parenthesized so any lvalue expression is safe.
 * The brace-block form is kept so existing call sites keep compiling.
 */
#define MCA_OOB_TCP_MSG_ALLOC(msg, rc) \
{ \
    opal_list_item_t* mca_oob_tcp_alloc_item = NULL; \
    OPAL_FREE_LIST_GET(&mca_oob_tcp_component.tcp_msgs, mca_oob_tcp_alloc_item, rc); \
    (msg) = (mca_oob_tcp_msg_t*)mca_oob_tcp_alloc_item; \
}
/**
 * Return a message structure that is no longer needed.
 *
 * Releases the working iovec array (only if it was heap allocated - see
 * mca_oob_tcp_msg_iov_return) and the optional read/write buffer, then hands
 * the descriptor back to the mca_oob_tcp_component.tcp_msgs free list.
 *
 * @param msg (IN) Pointer to the mca_oob_tcp_msg_t to release. The argument
 *                 is evaluated exactly once.
 *
 * Both pointers are reset to NULL after being released: the descriptor goes
 * back to a free list for reuse, so stale values must not survive to be
 * released a second time. The brace-block form is kept so existing call
 * sites keep compiling.
 */
#define MCA_OOB_TCP_MSG_RETURN(msg) \
{ \
    mca_oob_tcp_msg_t* mca_oob_tcp_return_msg = (msg); \
    /* frees the iovec allocated during the send/receive */ \
    if(NULL != mca_oob_tcp_return_msg->msg_rwiov) { \
        mca_oob_tcp_msg_iov_return(mca_oob_tcp_return_msg, mca_oob_tcp_return_msg->msg_rwiov); \
        mca_oob_tcp_return_msg->msg_rwiov = NULL; \
    } \
    if(NULL != mca_oob_tcp_return_msg->msg_rwbuf) { \
        free(mca_oob_tcp_return_msg->msg_rwbuf); \
        mca_oob_tcp_return_msg->msg_rwbuf = NULL; \
    } \
    OPAL_FREE_LIST_RETURN(&mca_oob_tcp_component.tcp_msgs, (opal_list_item_t*)mca_oob_tcp_return_msg); \
}
/**
 * Wait for a msg to complete.
 *
 * NOTE(review): presumably blocks on msg_condition until msg_complete is
 * set - confirm against the implementation.
 *
 * @param msg (IN) Message to wait on.
 * @param size (OUT) Number of bytes delivered.
 * @retval ORTE_SUCCESS or error code on failure.
 */
int mca_oob_tcp_msg_wait(mca_oob_tcp_msg_t* msg, int* size);
/**
 * Wait - up to a timeout - for a msg to complete.
 *
 * @param msg (IN) Message to wait on.
 * @param size (OUT) Number of bytes delivered.
 * @param ts (IN) Timeout for the wait. NOTE(review): whether this is an
 *                absolute or a relative time is not visible here - confirm.
 * @retval ORTE_SUCCESS or error code on failure.
 */
int mca_oob_tcp_msg_timedwait(mca_oob_tcp_msg_t* msg, int* size, struct timespec* ts);
/**
 * Signal that a message has completed. Wakes up any pending threads (for
 * the blocking case) or invokes callbacks for the non-blocking case.
 *
 * @param msg (IN) Message send/recv that has completed.
 * @param peer (IN) Name of the peer the send/receive was to/from.
 * @retval ORTE_SUCCESS or error code on failure.
 */
int mca_oob_tcp_msg_complete(mca_oob_tcp_msg_t* msg, orte_process_name_t * peer);
/**
 * Called to copy the results of a message into user supplied iovec array.
 *
 * @param msg (IN) Message whose data is to be copied.
 * @param iov (IN) Iovec array of user supplied buffers.
 * @param count (IN) Number of elements in iovec array.
 * @retval NOTE(review): not documented in this header; presumably the number
 *         of bytes copied - confirm against the implementation.
 */
int mca_oob_tcp_msg_copy(mca_oob_tcp_msg_t* msg, struct iovec* iov, int count);
/**
 * Called asynchronously to progress sending a message from the event library thread.
 *
 * @param msg (IN) Message send that is in progress.
 * @param peer (IN) Peer we are sending to.
 * @retval bool NOTE(review): presumably a flag indicating whether the send
 *         has completed, mirroring mca_oob_tcp_msg_recv_handler - confirm.
 */
bool mca_oob_tcp_msg_send_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_peer_t * peer);
/**
 * Called asynchronously to progress receiving a message from the event library thread.
 *
 * @param msg (IN) Message receive that is in progress.
 * @param peer (IN) Peer that we are receiving from.
 * @retval bool Flag indicating whether the operation has completed.
 */
bool mca_oob_tcp_msg_recv_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_peer_t * peer);
/**
 * The message has been completely received - so attempt to match
 * against posted recvs.
 *
 * @param msg (IN) Message that has been completely received.
 * @param peer (IN) Peer the message was received from (presumably - confirm
 *                  against the implementation).
 */
void mca_oob_tcp_msg_recv_complete(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_peer_t* peer);
/**
 * Match name to a message that has been received asynchronously (unexpected).
 *
 * @param name (IN) Name associated with peer, or wildcard.
 * @param tag (IN) Message tag.
 * @return msg Matched message or NULL.
 *
 * Note - this routine requires the caller to be holding the module lock.
 */
mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_recv(orte_process_name_t* name, int tag);
/**
 * Match name to a posted recv request.
 *
 * @param name (IN) Name associated with peer, or wildcard to match the first posted recv.
 * @param tag (IN) Message tag.
 * @return msg Matched message or NULL.
 *
 * Note - this routine requires the caller to be holding the module lock.
 */
mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_post(orte_process_name_t* name, int tag);
/**
 * Obtain an iovec array with room for count elements.
 *
 * Requests of up to (and including) MCA_OOB_TCP_IOV_MAX elements are served
 * from the array embedded in the message; larger requests are malloc()ed.
 * Release the result with mca_oob_tcp_msg_iov_return().
 *
 * @param msg (IN) Message the array is for.
 * @param count (IN) The number of iovecs required.
 * @return Array of iovec elements; for the malloc()ed case this is whatever
 *         malloc() returned, which may be NULL.
 *
 */
static inline struct iovec* mca_oob_tcp_msg_iov_alloc(mca_oob_tcp_msg_t* msg, int count)
{
    struct iovec* vec;

    if (count > MCA_OOB_TCP_IOV_MAX) {
        /* too large for the embedded array - go to the heap */
        vec = (struct iovec *)malloc(sizeof(struct iovec) * count);
    } else {
        vec = msg->msg_iov;
    }
    return vec;
}
/**
 * Give back an iovec array obtained from mca_oob_tcp_msg_iov_alloc().
 *
 * The array is free()d unless it is the one embedded in the message.
 *
 * @param msg (IN) Message the array was allocated for.
 * @param iov (IN) Iovec array to return.
 *
 */
static inline void mca_oob_tcp_msg_iov_return(mca_oob_tcp_msg_t* msg, struct iovec* iov)
{
    /* the embedded array is part of the message itself - nothing to release */
    if (msg->msg_iov == iov) {
        return;
    }
    free(iov);
}
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#endif /* _MCA_OOB_TCP_MESSAGE_H_ */
|