1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247
|
// $Id: server.cpp 91671 2010-09-08 18:39:23Z johnnyw $
// server.cpp (written by Tim Harrison)
// Listens to multicast address for client log messages. Prints
// Mbits/sec received from client.
#include "ace/OS_main.h"
#include "ace/SOCK_Dgram.h"
#include "ace/INET_Addr.h"
#include "ace/SOCK_Dgram_Mcast.h"
#include "ace/Reactor.h"
#include "ace/Log_Msg.h"
#include "Log_Wrapper.h"
#include "ace/OS_NS_string.h"
#include "ace/OS_NS_unistd.h"
#include "ace/os_include/os_netdb.h"
#if defined (ACE_HAS_IP_MULTICAST)
class Server_Events : public ACE_Event_Handler
{
public:
Server_Events (u_short port,
const char *mcast_addr,
long time_interval = 0);
~Server_Events (void);
virtual int handle_input (ACE_HANDLE fd);
virtual int handle_timeout (const ACE_Time_Value &tv,
const void *arg);
virtual ACE_HANDLE get_handle (void) const;
ACE_Time_Value *wait_time (void);
private:
char *message_;
Log_Wrapper::Log_Record *log_record_;
char buf_[4 * BUFSIZ];
char hostname_[MAXHOSTNAMELEN];
int initialized_;
int count_;
long interval_;
// time interval to log messages
ACE_Time_Value *how_long_;
ACE_Reactor *reactor_;
ACE_SOCK_Dgram_Mcast mcast_dgram_;
ACE_INET_Addr remote_addr_;
ACE_INET_Addr mcast_addr_;
// = statistics on messages received
double total_bytes_received_;
int total_messages_received_;
int last_sequence_number_;
};
static const char MCAST_ADDR[] = ACE_DEFAULT_MULTICAST_ADDR;
static const int UDP_PORT = ACE_DEFAULT_MULTICAST_PORT;
static const int DURATION = 5;
ACE_HANDLE
Server_Events::get_handle (void) const
{
return this->mcast_dgram_.get_handle ();
}
ACE_Time_Value *
Server_Events::wait_time (void)
{
return this->how_long_;
}
Server_Events::Server_Events (u_short port,
const char *mcast_addr,
long time_interval)
: initialized_ (0),
count_ (1),
interval_ (time_interval),
mcast_addr_ (port, mcast_addr),
total_bytes_received_ (0)
{
// Use ACE_SOCK_Dgram_Mcast factory to subscribe to multicast group.
if (ACE_OS::hostname (this->hostname_,
MAXHOSTNAMELEN) == -1)
ACE_ERROR ((LM_ERROR,
"%p\n",
"hostname"));
else if (this->mcast_dgram_.join (this->mcast_addr_) == -1)
ACE_ERROR ((LM_ERROR,
"%p\n",
"join"));
else
{
// Point to NULL so that we block in the beginning.
this->how_long_ = 0;
this->log_record_ = (Log_Wrapper::Log_Record *) &buf_;
this->message_ = &buf_[sizeof (Log_Wrapper::Log_Record)];
}
}
// A destructor that emacs refuses to color blue ;-)
Server_Events::~Server_Events (void)
{
this->mcast_dgram_.leave (this->mcast_addr_);
ACE_DEBUG ((LM_DEBUG,
"total bytes received = %d after %d second\n",
this->total_bytes_received_,
this->interval_));
ACE_DEBUG ((LM_DEBUG,
"Mbits/sec = %.2f\n",
(float) (total_bytes_received_ * 8 / (float) (1024*1024*interval_))));
ACE_DEBUG ((LM_DEBUG,
"last sequence number = %d\ntotal messages received = %d\ndiff = %d\n",
this->last_sequence_number_,
this->total_messages_received_,
this->last_sequence_number_ - total_messages_received_));
}
int
Server_Events::handle_timeout (const ACE_Time_Value &,
const void *arg)
{
ACE_DEBUG ((LM_DEBUG, "\t%d timeout%s occurred for %s.\n",
this->count_,
this->count_ == 1 ? "" : "s",
(char *) arg));
// Don't let the timeouts continue if there's no activity since
// otherwise we use up a lot of CPU time unnecessarily.
if (this->count_ == 5)
{
reactor ()->cancel_timer (this);
this->initialized_ = 0;
ACE_DEBUG ((LM_DEBUG,
"\tcancelled timeout for %s to avoid busy waiting.\n",
(char *) arg));
}
this->count_++;
return 0;
}
int
Server_Events::handle_input (ACE_HANDLE)
{
// Receive message from multicast group.
iovec iovp[2];
iovp[0].iov_base = buf_;
iovp[0].iov_len = sizeof (log_record_);
iovp[1].iov_base = &buf_[sizeof (log_record_)];
iovp[1].iov_len = 4 * BUFSIZ - sizeof (log_record_);
ssize_t retcode =
this->mcast_dgram_.recv (iovp,
2,
this->remote_addr_);
if (retcode != -1)
{
total_messages_received_++;
total_bytes_received_ += retcode;
last_sequence_number_ =
ACE_NTOHL (log_record_->sequence_number);
for (char *message_end = this->message_ + ACE_OS::strlen (this->message_) - 1;
ACE_OS::strchr ("\r\n \t", *message_end) != 0;
)
{
*message_end-- = '\0';
if (message_end == this->message_)
break;
}
ACE_DEBUG ((LM_DEBUG,
"sequence number = %d\n",
last_sequence_number_));
ACE_DEBUG ((LM_DEBUG,
"message = '%s'\n",
this->message_));
if (this->initialized_ == 0)
{
// Restart the timer since we've received events again.
if (reactor()->schedule_timer (this,
(void *) this->hostname_,
ACE_Time_Value::zero,
ACE_Time_Value (DURATION)) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"schedule_timer"),
-1);
this->initialized_ = 1;
}
this->count_ = 1;
return 0;
}
else
return -1;
}
int
ACE_TMAIN (int, ACE_TCHAR *[])
{
// Instantiate a server which will receive messages for DURATION
// seconds.
Server_Events server_events (UDP_PORT,
MCAST_ADDR,
DURATION);
// Instance of the ACE_Reactor.
ACE_Reactor reactor;
if (reactor.register_handler (&server_events,
ACE_Event_Handler::READ_MASK) == -1)
ACE_ERROR ((LM_ERROR,
"%p\n%a",
"register_handler",
1));
ACE_DEBUG ((LM_DEBUG,
"starting up server\n"));
for (;;)
reactor.handle_events (server_events.wait_time ());
ACE_NOTREACHED (return 0;)
}
#else
int
ACE_TMAIN(int, ACE_TCHAR *argv[])
{
ACE_ERROR_RETURN ((LM_ERROR,
"error: %s must be run on a platform that support IP multicast\n",
argv[0]), -1);
}
#endif /* ACE_HAS_IP_MULTICAST */
|