1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301
|
/******************************************************************************
Simple NORM_OBJECT_STREAM sender example app using the NORM API
USAGE:
normSendStream
BUILD (Unix):
g++ -o normStreamSend normStreamSend.cpp -D_FILE_OFFSET_BITS=64 -I../include/ \
../lib/libnorm.a ../protolib/lib/libProtokit.a -lpthread
(for MacOS/BSD, add "-lresolv")
(for Solaris, add "-lnsl -lsocket -lresolv")
******************************************************************************/
// Notes:
// 1) A series of text messages are sent over a stream
#include "normApi.h" // for NORM API
#include <stdio.h> // for printf(), etc
#include <stdlib.h> // for srand()
#include <string.h> // for strrchr()
#include <sys/time.h> // for gettimeofday()
#include <arpa/inet.h> // for htons()
int main(int argc, char* argv[])
{
// 0) Default parameter values
const int MSG_COUNT_MAX = 10;
const unsigned int MSG_LENGTH_MIN = 40;
const unsigned int MSG_LENGTH_MAX = 40;
UINT32 streamBufferSize = 4*1024*1024; // 1 Mbyte stream buffer size
double normRate = 1.0e+07; // 10 Mbps default NORM tx rate for fixed rate operation (bits/sec units here)
double msgRate = -1.0; //1e+06; // 32 kbits/sec default message rate
// 1) Create a NORM API "NormInstance"
NormInstanceHandle instance = NormCreateInstance();
// 2) Create a NormSession using default "automatic" local node id (based on IP addr)
// TBD - add an option to set a specific NormNodeId
NormSessionHandle session = NormCreateSession(instance,
"224.1.2.3",
6003,
1);//NORM_NODE_ANY);
// NOTE: These are some debugging routines available
// (not necessary for normal app use)
NormSetDebugLevel(3);
// Uncomment to turn on debug NORM message tracing
NormSetMessageTrace(session, true);
// Uncomment to turn on some random packet loss
//NormSetTxLoss(session, 25.0); // 25% packet loss for testing purposes
struct timeval currentTime;
gettimeofday(¤tTime, NULL);
// Uncomment to get different packet loss patterns from run to run
// (and a different sender sessionId)
srand(currentTime.tv_sec); // seed random number generator
// 3) Set transmission rate
NormSetTxRate(session, normRate); // in bits/second
//NormSetFlowControl(session, 0.0);
// Init GRTT to low value (3 msec)
//NormSetGrttEstimate(session, 1.0e-03);
// Disable receiver backoffs (for lower latency, high speed performance)
// (For large group sizes, the default backoff factor is RECOMMENDED)
NormSetBackoffFactor(session, 2.0);
// Uncomment to use a _specific_ transmit port number
// (Can be the same as session port (rx port), but this
// is _not_ recommended when unicast feedback may be
// possible! - must be called _before_ NormStartSender())
//NormSetTxPort(session, 6001);
// Uncomment to enable TCP-friendly congestion control
//NormSetCongestionControl(session, true);
// Uncomment to enable rx port reuse (this plus unique NormNodeId's enables same-machine send/recv)
NormSetRxPortReuse(session, true);
// 4) Start the sender using a random "sessionId"
NormSessionId sessionId = (NormSessionId)rand();
NormStartSender(session, sessionId, 4*1024*1024, 1300, 64, 16);
// Uncomment to set large tx socket buffer size
// (may be needed to achieve very high packet output rates)
//NormSetTxSocketBuffer(session, 512000);
// 5) Enqueue the NORM_OBJECT_STREAM object
// Provide some "info" about this stream (the info is OPTIONAL)
char dataInfo[256];
sprintf(dataInfo, "NORM_OBJECT_STREAM message stream ...");
NormObjectHandle stream = NormStreamOpen(session, streamBufferSize, dataInfo, strlen(dataInfo) + 1);
if (NORM_OBJECT_INVALID == stream)
{
fprintf(stderr, "normStreamSend NormStreamOpen() error!\n");
return -1;
}
// 6) Write the first stream message
// (we enqueue text strings of random length as messages)
// ( a 2-byte network byte order length "header" is in each message)
unsigned int msgCount = 0;
char data = 'a';
char msgData[MSG_LENGTH_MAX];
UINT16 msgLen = MSG_LENGTH_MIN + (rand() % (MSG_LENGTH_MAX - MSG_LENGTH_MIN + 1));
// set 2 byte message header (length in network byte order)
UINT16 msgHeader = htons(msgLen);
memcpy(msgData, &msgHeader, 2); // 2-byte message length "header"
memset(msgData + 2, data, msgLen - 3); // n-byte message content
msgData[msgLen - 1] = '\0'; // 1-byte NULL-termination
// Write the message (as much as stream buffer will accept)
unsigned int bytesWritten = NormStreamWrite(stream, msgData, msgLen);
bool vacancy = (bytesWritten == msgLen);
// Initialize the "delayTime" used in "select()" loop below
// based on whether message was competely written to stream
// (i.e. wait according to "msgRate" (configured bytes per second))
double delayTime;
if (vacancy)
{
// Complete message was written, wait msg interval time
NormStreamMarkEom(stream);
msgCount++;
delayTime = (msgRate > 0.0) ? ((double)msgLen / msgRate) : 0.0;
}
else
{
// wait indefinitely for NORM_TX_QUEUE_VACANCY event
// to finish writing current message to stream
delayTime = -1.0;
}
// 6) We keep a running "timeAccumulator" value to maintain the proper
// _average_ message transmission rate. (TBD - impose "max" accumulation limit)
double timeAccumulator = 0.0;
struct timeval lastTime;
gettimeofday(&lastTime, NULL);
// 7) We use a "select()" call to wait for NORM events or message interval timeout
int normfd = NormGetDescriptor(instance);
fd_set fdset;
struct timeval timeout;
// 6) Enter NORM event loop
bool keepGoing = true;
while (keepGoing)
{
FD_SET(normfd, &fdset);
struct timeval* timeoutPtr;
if (delayTime < 0.0)
{
timeoutPtr = NULL; // wait indefinitely (i.e. for queue vacancy)
}
else
{
if (delayTime > timeAccumulator)
delayTime -= timeAccumulator;
else
delayTime = 0.0;
timeout.tv_sec = (unsigned long)delayTime;
timeout.tv_usec = (unsigned long)(1.0e+06 * (delayTime - (double)timeout.tv_sec));
timeoutPtr = &timeout;
}
int result = select(normfd+1, &fdset, NULL, NULL, timeoutPtr);
bool keepSending = true;
if ((MSG_COUNT_MAX > 0) && (msgCount >= (unsigned int)MSG_COUNT_MAX))
keepSending = false;
if (result > 0)
{
// Get and handle NORM API event
NormEvent theEvent;
if (NormGetNextEvent(instance, &theEvent))
{
switch (theEvent.type)
{
case NORM_TX_QUEUE_EMPTY:
case NORM_TX_QUEUE_VACANCY:
{
/*
if (NORM_TX_QUEUE_VACANCY == theEvent.type)
fprintf(stderr, "normStreamSend: NORM_TX_QUEUE_VACANCY event ...\n");
else
fprintf(stderr, "normStreamSend: NORM_TX_QUEUE_EMPTY event ...\n");
*/
if (keepSending && (bytesWritten < msgLen))
{
// Finish writing remaining pending message content (as much as can be written)
bytesWritten += NormStreamWrite(stream, msgData + bytesWritten, msgLen - bytesWritten);
if (bytesWritten == msgLen)
{
// Complete message was written, wait msg interval time
NormStreamMarkEom(stream);
msgCount++;
delayTime = (msgRate > 0.0) ? ((double)msgLen / msgRate) : 0.0;
vacancy = true;
}
}
break;
}
case NORM_TX_OBJECT_PURGED:
fprintf(stderr, "normStreamSend: NORM_TX_OBJECT_PURGED event ...\n");
break;
case NORM_TX_FLUSH_COMPLETED:
fprintf(stderr, "normStreamSend: NORM_TX_FLUSH_COMPLETED event ...\n");
break;
case NORM_GRTT_UPDATED:
fprintf(stderr, "normStreamSend: NORM_GRTT_UPDATED event ...\n");
break;
default:
fprintf(stderr, "normStreamSend: Got event type: %d\n", theEvent.type);
} // end switch(theEvent.type)
} // end if (NormGetNextEvent())
}
else if (result < 0)
{
// select() error
perror("normStreamSend: select() error");
break;
}
// This code writes _new_ message(s) to the stream _if_ there is "vacancy"
// and it is time based on "msgRate" and how much time has passed since "lastTime"
struct timeval currentTime;
gettimeofday(¤tTime, NULL);
double timeDelta = (double)(currentTime.tv_sec - lastTime.tv_sec);
if (currentTime.tv_usec > lastTime.tv_usec)
timeDelta += 1.0e-06 * (currentTime.tv_usec - lastTime.tv_usec);
else
timeDelta -= 1.0e-06 * (lastTime.tv_usec - currentTime.tv_usec);
timeAccumulator += timeDelta;
while (keepSending && vacancy && (timeAccumulator > delayTime))
{
timeAccumulator -= delayTime; // subtract last message tx duration from accumulator
// Fill buffer with new message "data" text character (a-z)
if (++data > 'z') data = 'a';
msgLen = MSG_LENGTH_MIN + (rand() % (MSG_LENGTH_MAX - MSG_LENGTH_MIN + 1));
// set 2 byte message header (length in network byte order)
msgHeader = htons(msgLen);
memcpy(msgData, &msgHeader, 2); // 2-byte message length "header"
memset(msgData+2, data, msgLen-3); // n-byte message content
msgData[msgLen - 1] = '\0'; // 1-byte NULL-termination
bytesWritten = NormStreamWrite(stream, msgData, msgLen);
if (bytesWritten < msgLen)
{
// wait indefinitely for NORM_TX_QUEUE_VACANCY event
// to finish writing current message to stream
vacancy = false;
delayTime = -1.0;
//fprintf(stderr, "norm tx stream buffer full, time accumulator = %lf\n", timeAccumulator);
}
else
{
// Complete message was written, wait msg interval time
NormStreamMarkEom(stream);
msgCount++;
delayTime = (msgRate > 0.0) ? ((double)msgLen / msgRate) : 0.0;
if ((MSG_COUNT_MAX > 0) && ((unsigned int)msgCount >= MSG_COUNT_MAX))
{
fprintf(stderr, "closing stream after %u messages ...\n", msgCount);
NormStreamClose(stream, true); // gracefully close stream
keepSending = false;
}
}
}
if (timeAccumulator <= delayTime)
{
delayTime -= timeAccumulator;
timeAccumulator = 0.0;
}
lastTime = currentTime;
} // end while (keepGoing)
NormStopSender(session);
NormDestroySession(session);
NormDestroyInstance(instance);
fprintf(stderr, "normDataSend: Done.\n");
return 0;
} // end main()
|