1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
|
/*
* dmucs_msg.cc: code to parse a packet coming into the DMUCS server.
*
* Copyright (C) 2005, 2006 Victor T. Norman
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
* Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "dmucs.h"
#include "dmucs_msg.h"
#include "dmucs_db.h"
#include <exception>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
/* Declared here only; the definition is not in this file.  It is handed to
   DmucsHost::createHost() whenever a handler below creates a host.
   NOTE(review): presumably the path of the hosts-info file -- confirm at the
   definition. */
extern std::string hostsInfoFile;
/* Exception type for a malformed message.  It adds nothing to
   std::exception beyond a distinct type to catch. */
class DmucsBadMsg : public std::exception
{
};
/*
 * Parse one request received on `sock` and build the matching message
 * object.
 *
 * The first word of `buffer` selects the message type:
 *   host <clientIpAddr> [<dprop>]                    -> DmucsHostReqMsg
 *   load <host-IP> <ldavg1> <ldavg5> <ldavg10> [<dprop>] -> DmucsLdAvgMsg
 *   status <host-IP> up|down [<dprop>]               -> DmucsStatusMsg
 *   monitor                                          -> DmucsMonitorReqMsg
 *
 * Returns a message object allocated with new, or NULL when the request is
 * malformed or not recognized (a diagnostic is printed to stderr).
 *
 * The buffer is a packet that came into the server, so every %s conversion
 * below carries an explicit field width.  An unbounded %s would let an
 * over-long token write past the end of the fixed-size local arrays; with
 * the width, an over-long token is cut at the limit instead.
 */
DmucsMsg *
DmucsMsg::parseMsg(Socket *sock, const char *buffer)
{
    struct in_addr clientIp;
    clientIp.s_addr = Speeraddr(sock);

    char dpropstr[DPROP_MAX_STRLEN + 1];
    dpropstr[0] = '\0';		// empty string

    /* The sscanf() formats are assembled at run time so that the width of
       the dprop conversion always tracks DPROP_MAX_STRLEN. */
    const int dpropWidth = static_cast<int>(DPROP_MAX_STRLEN);
    char fmt[64];

    /*
     * The first word in the buffer must be one of: "host", "load",
     * "status", or "monitor".
     */
    if (strncmp(buffer, "host", 4) == 0) {
        /* The string is "host <clientIpAddr> [<typeStr>]" where the
           typeStr is an optional string that is the distinguishing property
           of the host the client wants.  The address token is only checked
           for presence: the message is built with the address obtained
           from the socket (clientIp), not with the token. */
        char cliIpStr[64];
        snprintf(fmt, sizeof(fmt), "host %%63s %%%ds", dpropWidth);
        int res = sscanf(buffer, fmt, cliIpStr, dpropstr);
        if (res != 2 && res != 1) {
            fprintf(stderr, "Got a bad host request message ->%s<--\n",buffer);
            return NULL;
        }
        return new DmucsHostReqMsg(clientIp, dpropstr);

    } else if (strncmp(buffer, "load", 4) == 0) {
        /* The buffer must hold:
         *   load <host-IP-address> <3 floating pt numbers>
         * followed by an optional <dprop>.
         */
        char machname[64];
        float ldavg1 = 0.0f, ldavg5 = 0.0f, ldavg10 = 0.0f;
        snprintf(fmt, sizeof(fmt), "load %%63s %%f %%f %%f %%%ds", dpropWidth);
        /* 5 conversions: dprop present.  4 conversions: dprop omitted and
           dpropstr stays empty.  Anything less is a malformed message. */
        if (sscanf(buffer, fmt, machname, &ldavg1, &ldavg5, &ldavg10,
                   dpropstr) < 4) {
            fprintf(stderr, "Got a bad load avg msg!!!\n");
            return NULL;
        }
        struct in_addr host;
        host.s_addr = inet_addr(machname);
        DMUCS_DEBUG((stderr, "host %s: ldAvg1 %2.2f, ldAvg5 %2.2f, "
                     "ldAvg10 %2.2f, dprop '%s'\n",
                     machname, ldavg1, ldavg5, ldavg10, dpropstr));
        return new DmucsLdAvgMsg(clientIp, host,
                                 ldavg1, ldavg5, ldavg10, dpropstr);

    } else if (strncmp(buffer, "status", 6) == 0) {
        /* The buffer must hold:
         *   status <host-IP-address> up|down [<dprop>]
         * NOTE: the host-IP-address MUST be in "dot-notation".
         */
        char machname[64];
        char state[10];
        snprintf(fmt, sizeof(fmt), "status %%63s %%9s %%%ds", dpropWidth);
        /* 3 conversions: dprop present.  2 conversions: dprop omitted. */
        if (sscanf(buffer, fmt, machname, state, dpropstr) < 2) {
            fprintf(stderr, "Got a bad status msg!!!\n");
            return NULL;
        }
        fprintf(stderr, "machname %s, state %s, dprop '%s'\n",
                machname, state, dpropstr);
        struct in_addr host;
        host.s_addr = inet_addr(machname);
        /* An unrecognized state is reported but still produces a message,
           with the status left at STATUS_UNAVAILABLE. */
        host_status_t status = STATUS_UNAVAILABLE;
        if (strncmp(state, "up", 2) == 0) {
            status = STATUS_AVAILABLE;
        } else if (strncmp(state, "down", 4) == 0) {
            status = STATUS_UNAVAILABLE;
        } else {
            fprintf(stderr, "got unknown state %s\n", state);
        }
        return new DmucsStatusMsg(clientIp, host, status, dpropstr);

    } else if (strncmp(buffer, "monitor", 7) == 0) {
        /* Nothing after the keyword is parsed; dpropstr is still empty. */
        return new DmucsMonitorReqMsg(clientIp, dpropstr);
    }

    fprintf(stderr, "request not recognized: ->%s<-\n", buffer);
    return NULL;
}
/*
 * Handle a "host" request: ask the database for the best available CPU for
 * this message's dprop, record the assignment against the requesting
 * socket, and write the chosen address (dotted notation) back on `sock`.
 *
 * If anything fails -- no hosts left, or any other exception -- the client
 * is sent 0.0.0.0.  The address is explicitly reset in the handlers: a
 * failure after getBestAvailCpu() has already succeeded would otherwise
 * leave the real address in cpuIpAddr, and the client would be handed a CPU
 * whose assignment was not completed.
 *
 * `buf` is the raw request text; it is only used for the debug trace.
 */
void
DmucsHostReqMsg::handle(Socket *sock, const char *buf)
{
    DMUCS_DEBUG((stderr, "Got host request: -->%s<--\n", buf));

    DmucsDb *db = DmucsDb::getInstance();
    unsigned int cpuIpAddr = 0;

    try {
        cpuIpAddr = db->getBestAvailCpu(dprop_);
        std::string resolved_name =
            DmucsHost::resolveIp2Name(cpuIpAddr, dprop_);
        fprintf(stderr, "Giving out %s\n", resolved_name.c_str());
        /* The socket pointer value is what identifies the client here. */
        db->assignCpuToClient(cpuIpAddr, dprop_,
                              reinterpret_cast<intptr_t>(sock));
#if 0
        fprintf(stderr, "The databases are now:\n");
        db->dump();
#endif
    } catch (const DmucsNoMoreHosts &) {
        /* getBestAvailCpu() might return 0, when there are
           no more available CPUs.  We send 0.0.0.0 to the client
           but we don't record it as an assigned cpu. */
        fprintf(stderr, "!!!!! Out of hosts in db \"%s\" !!!!!\n",
                dprop2cstr(dprop_));
        cpuIpAddr = 0;
    } catch (...) {
        fprintf(stderr, "!!!!! Some other error: %s!!!!!\n",
                strerror(errno));
        // Send 0.0.0.0 to the client.
        cpuIpAddr = 0;
    }

    struct in_addr c;
    c.s_addr = cpuIpAddr;
    Sputs(inet_ntoa(c), sock);
}
/*
 * Handle a "load" message: update the tier of the reporting host from its
 * three load averages, creating the host first if the database does not
 * know it yet.  The socket is released with removeFd() afterwards.
 *
 * Unexpected exceptions from the update path are still absorbed, so that
 * removeFd() runs, but they are now reported on stderr instead of vanishing
 * silently.  Exceptions raised while creating a new host are not caught
 * here and propagate to the caller.
 *
 * `buf` is unused.
 */
void
DmucsLdAvgMsg::handle(Socket *sock, const char *buf)
{
    DmucsDb *db = DmucsDb::getInstance();

    DMUCS_DEBUG((stderr, "Got load average mesg\n"));

    try {
        DmucsHost *host = db->getHost(host_, dprop_);
        host->updateTier(ldAvg1_, ldAvg5_, ldAvg10_);
        /* If the host hasn't been explicitly made unavailable,
           then make it available.  If the host is overloaded
           but isn't anymore, then make it available. */
        if (host->isSilent() ||
            (host->isOverloaded() && host->getTier() != 0)) {
            host->avail();	// make sure the host is available
        }
    } catch (const DmucsHostNotFound &) {
        /* First report from this host: register it, then apply the load. */
        DmucsHost *h = DmucsHost::createHost(host_, dprop_, hostsInfoFile);
        h->updateTier(ldAvg1_, ldAvg5_, ldAvg10_);
        fprintf(stderr, "New host available: %s/%d, tier %d, type %s\n",
                h->getName().c_str(), h->getNumCpus(), h->getTier(),
                dprop2cstr(dprop_));
    } catch (...) {
        fprintf(stderr,
                "!!!!! Unexpected error while handling load average msg "
                "!!!!!\n");
    }
    removeFd(sock);
}
/*
 * Handle a "status" message.
 *
 *   - available, host known:     mark the host available.
 *   - available, host unknown:   create the host.
 *   - unavailable, host known:   mark the host unavailable.
 *   - unavailable, host unknown: nothing to change; report it on stderr.
 *
 * The last case is checked explicitly.  DmucsDb::getHost() throws
 * DmucsHostNotFound for a host it does not know, so calling it unguarded
 * would let a "down" (or unrecognized-state) message for an unknown host
 * throw out of this function and skip removeFd().
 *
 * The socket is released with removeFd() afterwards.  `buf` is unused.
 */
void
DmucsStatusMsg::handle(Socket *sock, const char *buf)
{
    DmucsDb *db = DmucsDb::getInstance();
    const bool known = db->haveHost(host_, dprop_);

    if (status_ == STATUS_AVAILABLE) {
        if (known) {
            /* Make it available (if it wasn't). */
            db->getHost(host_, dprop_)->avail();
        } else {
            /* A new host is available! */
            DMUCS_DEBUG((stderr, "Creating new host %s, type %s\n",
                         inet_ntoa(host_), dprop2cstr(dprop_)));
            DmucsHost::createHost(host_, dprop_, hostsInfoFile);
        }
    } else {		// status is unavailable.
        if (known) {
            db->getHost(host_, dprop_)->unavail();
        } else {
            fprintf(stderr,
                    "Ignoring unavailable status for unknown host %s, "
                    "type %s\n",
                    inet_ntoa(host_), dprop2cstr(dprop_));
        }
    }
    removeFd(sock);
}
/*
 * Handle a "monitor" request: write the serialized database to the
 * requesting socket, then release the socket with removeFd().
 *
 * `buf` is unused.
 */
void
DmucsMonitorReqMsg::handle(Socket *sock, const char *buf)
{
    DmucsDb *db = DmucsDb::getInstance();
    std::string snapshot = db->serialize();

    /* Sputs() is given a non-const char pointer, hence the cast. */
    char *text = const_cast<char *>(snapshot.c_str());
    Sputs(text, sock);

    removeFd(sock);
}
|