File: local_long.cc

package info (click to toggle)
lam 7.1.4-8
  • links: PTS
  • area: main
  • in suites: forky, sid
  • size: 56,404 kB
  • sloc: ansic: 156,541; sh: 9,991; cpp: 7,699; makefile: 5,621; perl: 488; fortran: 260; asm: 83
file content (236 lines) | stat: -rw-r--r-- 6,792 bytes parent folder | download | duplicates (11)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
/*
 * Copyright (c) 2001-2003 The Trustees of Indiana University.  
 *                         All rights reserved.
 * Copyright (c) 1998-2001 University of Notre Dame. 
 *                         All rights reserved.
 * Copyright (c) 1994-1998 The Ohio State University.  
 *                         All rights reserved.
 * 
 * This file is part of the LAM/MPI software package.  For license
 * information, see the LICENSE file in the top level directory of the
 * LAM/MPI source distribution.
 * 
 * $HEADER$
 *
 * $Id: local_long.cc,v 1.23 2003/02/04 17:51:30 jsquyres Exp $
 *
 *	Function:	- maintains partially received long messages 
 *                        from IMPI hosts
 *                      - indexed on pk_drqid (IMPI_Uint8)
 *                      - when a message is fully received, it is moved to 
 *                        local_req.cc to send to the local LAM
 *                        
 */

#include <lam_config.h>
#if LAM_WANT_IMPI

// IRIX MIPSpro 7.30 compilers require that <new> comes first... @#$@#$!!!
#if LAM_CXX_NOTHROW_NEW
#include <new>
#endif
#include <iostream>
#include <map>

#include <mpi.h>
#include <laminternal.h>
#include <mpisys.h>
#include <impi-defs.h>
#include <impi.h>
#include <impid-cc.h>
#include <longbuf_mgmt.h>
#include <lamdebug-cc.h>

#include <typical.h>

using std::map;
using std::cerr;
using std::endl;


/*
 * local typedef for our map
 * Map indexed on IMPI_Uint8 (drqid)
 * The map contains a longbuf_mgmt which is the packet, some meta info,
 *     and the buffer that is being filled 
 */
typedef map<IMPI_Uint8, longbuf_mgmt*> local_long_map;


/*
 * private variables
 */
static local_long_map long_map;
static Debug debug(false);


/*
 *	local_long_init
 *
 *	Function:	- initialize local long map
 *
 *      Returns:        - 0 for success, LAMERROR on error
 */
int
local_long_init()
{
  // Don't need to do anything yet; just here for symmetry

  return 0;
}

/*
 *	local_expect_long
 *
 *	Function:	- get the drqid for this new long message
 *                      - call local_req_send to the lam node with the negative of
 *                        the tag 
 *                      - make the syncack for it and attach to the longbuf_mgmt
 *                      - put partially received buffer into map so that 
 *                        we can later receive the rest of it
 *                      - indexed on drqid
 *
 *	Accepts:	- ptr to IMPI_Packet header
 *                      - buffer with partially received data
 *                      - how many bytes already received (should always be 
 *                        IMPI_Pk_maxdatalen, but...)
 *                      - drqid to index on
 *      Returns:        - 0 for success, LAMERROR on error
 */
int
local_expect_long(IMPI_Packet* pk, char* buffer, int host, IMPI_Uint4 received)
{
  local_long_map::iterator i;
   
  IMPI_Uint8 *pdrqid = new IMPI_Uint8(get_new_drqid());
  IMPI_Uint8 msglen = pk->pk_msglen;

  // if the drqid is in the map already, it's an error

  i = long_map.find(*pdrqid);

  if (i == long_map.end()) {
    pk->pk_drqid = *pdrqid;
#if LAM_CXX_NOTHROW_NEW || !LAM_CXX_EXCEPTIONS
    longbuf_mgmt *lbm = new LAM_CXX_NOTHROW_NEW_ARG longbuf_mgmt(buffer, 
								 received, 
								 *pk);
    if (lbm == 0) {
      cerr << "LAM IMPI host not able to alloc long message placement holder" 
	   << endl;
      impi_bail(1);
    }
#else
    longbuf_mgmt *lbm;
    try {
      lbm = new longbuf_mgmt(buffer, received, *pk);
    } catch(...) {
      cerr << "LAM IMPI host not able to alloc long message placement holder" 
	   << endl;
      impi_bail(1);
    }
#endif

    IMPI_Packet *syncack = make_syncack(pk, *pdrqid);
    IMPI_Uint8 cid;

    // Post the "wait for ACK" before we send the ping.  When the ping
    // ACK is received from the destination LAM rank, the syncack will
    // be sent back to the remote IMPI host

    debug << "Waiting for long ACK from local LAM for drqid " 
	  << (int) *pdrqid << endl;
    local_expect_ack(*pdrqid, host, syncack);
    long_map[*pdrqid] = lbm;

    // send datasync "ping" to the lam node that a long msg is waiting

    cid = pk->pk_cid;
    pk->pk_cid = (IMPI_Uint8) lam_pt2impidatasync((int) cid);
    pk->pk_msglen = 1;
    debug << "Sending datasync drqid of " << *pdrqid << " to local LAM on cid "
	  << pk->pk_cid 
	  << endl;
    local_enqueue(pk, (char*) pdrqid, IMPI_TYPE_UINT8, 0, true, true);
    pk->pk_msglen = msglen;
    pk->pk_cid = cid;
    debug << "local_expect_long finished" << endl;
  }
  else
    return (LAMERROR);
  
  return 0;
}


/*
 *	local_long_midreceive
 *
 *	Function:	- locate the long msg in the local_long_map
 *                      - do a read on the host socket
 *                      - read the data directly into the msg buffer, 
 *                        appending onto what has already been read
 *                      - increment the bytes read by the packet length
 *                      - indexed on drqid
 *
 *	Accepts:	- drqid to index on
 *                      - host socket fd to read on
 *      Returns:        - 0 for success, LAMERROR on error
 */
int
local_long_midreceive(IMPI_Uint8 drqid, int fd, IMPI_Uint4 pk_len)
{
  int recvd;
  local_long_map::iterator i;
  longbuf_mgmt *ret = 0;
  IMPI_Uint4 bytes_read = 0;
  IMPI_Uint8 rem_len;
  IMPI_Packet *pk;

  // Locate the longbuf_mgmt in the local_long_map

  i = long_map.find(drqid);
  if (i != long_map.end()) {
    ret = (*i).second;
    pk = ret->get_packet();
    recvd =  ret->get_received();
    rem_len = pk->pk_msglen - recvd;  // the size of the remainder of the msg

    if (pk_len > rem_len) {  // reading more than the buff's length
      cerr << "LAM IMPI tried to received a message that exceeded its pk_msglen of " 
	   << (unsigned int)  pk->pk_msglen  <<  " on socket  "  << fd  << endl;
      impi_bail(1);
    }
    if ((pk_len < IMPI_Pk_maxdatalen) && (pk_len != rem_len))
      cerr << "WARNING:  LAM IMPI receiving a middle packet of a long msg that's shorter than IMPI_Pk_maxdatalen "
	   << IMPI_Pk_maxdatalen << endl;

    // Now read in the data from the socket.

    bytes_read = mread(fd, ret->get_buffer() + recvd, pk_len);
    if (bytes_read != pk_len) {  
      cerr << "LAM IMPI got a error reading on socket  " << fd 
	   << endl;
      impi_bail(1);
    }
    ret->inc_received(bytes_read);
    if (ret->get_received() == pk->pk_msglen) {  // End of the long msg

      // send the long msg to lam and delete it from the map.
      // Ownership of the longbug_mgmt now passes to the local send
      // function, who will delete it later.

      local_enqueue_finish_long(ret);
      long_map.erase(i);
    }
  }
  else {
    cerr << "LAM IMPI host received an invalid drqid" << endl;
    return (LAMERROR);
  }
  return 0;
}



#endif