1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238
|
/******************************************************************************
Simple NORM_OBJECT_STREAM receiver example app using the NORM API
(expects NORM message stream of text messages with 2-byte length header (network byte order))
USAGE:
normStreamRecv
BUILD (Unix):
g++ -o normStreamRecv normStreamRecv.cpp -D_FILE_OFFSET_BITS=64 -I../include/ \
./lib/libnorm.a ../protolib/lib/libProtokit.a -lpthread
(for MacOS/BSD, add "-lresolv")
(for Solaris, add "-lnsl -lsocket -lresolv")
******************************************************************************/
// Notes:
// 1) The program also will exit on <CTRL-C> from user
// 2) "normStreamRecv" should be started before "normStreamSend" but can join stream in progress.
// 3) This example is designed to receive a single stream from a single sender (could be modified
// to support multiple streams and/or senders)
#include "normApi.h" // for NORM API
#include <stdio.h> // for printf(), etc
#include <stdlib.h> // for srand()
#include <string.h> // for strrchr()
#include <sys/time.h> // for gettimeofday()
#include <arpa/inet.h> // for ntohs()
int main(int argc, char* argv[])
{
// 0) Some default params
const unsigned int MSG_LENGTH_MAX = 5000;
// 1) Create a NORM API "NormInstance"
NormInstanceHandle instance = NormCreateInstance();
// 2) Create a NormSession using default "automatic" local node id
NormSessionHandle session = NormCreateSession(instance,
"224.1.2.3",
6003,
NORM_NODE_ANY);
// NOTE: These are debugging routines available
// (not necessary for normal app use)
// (Need to include "common/protoDebug.h" for this
NormSetDebugLevel(3);
// Uncomment to turn on debug NORM message tracing
NormSetMessageTrace(session, true);
// Uncomment to write debug output to file "normLog.txt"
//NormOpenDebugLog(instance, "normLog.txt");
// Uncomment to turn on some random packet loss for testing
//NormSetRxLoss(session, 10.0); // 10% packet loss
struct timeval currentTime;
gettimeofday(¤tTime, NULL);
// Uncomment to get different packet loss patterns from run to run
srand(currentTime.tv_sec); // seed random number generator
// Uncomment to enable rx port reuse (this plus unique NormNodeId's enables same-machine send/recv)
NormSetRxPortReuse(session, true);
// 3) Start the receiver with 1 Mbyte buffer per sender
NormStartReceiver(session, 8*1024*1024);
NormSetSilentReceiver(session, true);
if (!NormSetRxSocketBuffer(session, 8*1024*1024))
perror("normStreamRecv error: unable to set requested socket buffer size");
// We use these variables to keep track of our recv stream
// and to buffer reading from the recv stream
NormObjectHandle stream = NORM_OBJECT_INVALID; // we use this to make sure we're handling the correct (only) stream
bool msgSync = false;
char msgBuffer[MSG_LENGTH_MAX];
UINT16 msgLen = 0;
UINT16 msgIndex = 0;
// 4) Enter NORM event loop
bool keepGoing = true;
while (keepGoing)
{
NormEvent theEvent;
if (!NormGetNextEvent(instance, &theEvent)) continue;
switch (theEvent.type)
{
case NORM_RX_OBJECT_NEW:
fprintf(stderr, "normStreamRecv: NORM_RX_OBJECT_NEW event ...\n");
if (NORM_OBJECT_INVALID == stream)
{
if (NORM_OBJECT_STREAM == NormObjectGetType(theEvent.object))
{
stream = theEvent.object;
msgLen = msgIndex = 0; // init stream reading state
msgSync = false;
}
else
{
fprintf(stderr, "normStreamRecv error: received NORM_RX_OBJECT_NEW for non-stream object?!\n");
}
}
else
{
fprintf(stderr, "normStreamRecv error: received NORM_RX_OBJECT_NEW while already receiving stream?!\n");
}
break;
case NORM_RX_OBJECT_INFO:
{
// Assume info contains NULL-terminated string
//fprintf(stderr, "normStreamRecv: NORM_RX_OBJECT_INFO event ...\n");
if (theEvent.object != stream)
{
fprintf(stderr, "normStreamRecv error: received NORM_RX_OBJECT_UPDATED for unhandled object?!\n");
break;
}
char streamInfo[8192];
unsigned int infoLen = NormObjectGetInfo(theEvent.object, streamInfo, 8191);
streamInfo[infoLen] = '\0';
fprintf(stderr, "normStreamRecv: NORM_RX_OBJECT_INFO event, info = \"%s\"\n", streamInfo);
break;
}
case NORM_RX_OBJECT_UPDATED:
{
//fprintf(stderr, "normStreamRecv: NORM_RX_OBJECT_UPDATED event ...\n");
if (theEvent.object != stream)
{
fprintf(stderr, "normStreamRecv error: received NORM_RX_OBJECT_UPDATED for unhandled object?!\n");
break;
}
while (1)
{
// If we're not "in sync", seek message start
if (!msgSync)
{
msgSync = NormStreamSeekMsgStart(stream);
if (!msgSync) break; // wait for next NORM_RX_OBJECT_UPDATED to re-sync
}
if (msgIndex < 2)
{
// We still need to read the 2-byte message header for the next message
unsigned int numBytes = 2 - msgIndex;
if (!NormStreamRead(stream, msgBuffer+msgIndex, &numBytes))
{
fprintf(stderr, "normStreamRecv error: broken stream detected, re-syncing ...\n");
msgLen = msgIndex = 0;
msgSync = false;
continue; // try to re-sync and read again
}
msgIndex += numBytes;
if (msgIndex < 2) break; // wait for next NORM_RX_OBJECT_UPDATED to read more
memcpy(&msgLen, msgBuffer, 2);
msgLen = ntohs(msgLen);
if ((msgLen < 2) || (msgLen > MSG_LENGTH_MAX))
{
fprintf(stderr, "normStreamRecv error: message received with invalid length?!\n");
msgLen = msgIndex = 0;
msgSync = false;
continue; // try to re-sync and read again
}
}
// Read "content" portion of message (note "msgIndex" accounts for length "header"
unsigned int numBytes = msgLen - msgIndex;
if (!NormStreamRead(stream, msgBuffer+msgIndex, &numBytes))
{
fprintf(stderr, "normStreamRecv error: broken stream detected, re-syncing ...\n");
msgLen = msgIndex = 0;
msgSync = false;
continue; // try to re-sync and read again
}
fprintf(stderr, "read %u bytes from stream ...\n", numBytes);
msgIndex += numBytes;
if (msgIndex == msgLen)
{
// Complete message read
fprintf(stderr, "normStreamRecv msg: %s\n", msgBuffer+2);
msgLen = msgIndex = 0; // reset state variables for next message
}
else
{
break; // wait for next NORM_RX_OBJECT_UPDATED to read more
}
} // end while(1) (NormStreamRead() loop)
break;
}
case NORM_RX_OBJECT_COMPLETED:
{
fprintf(stderr, "normStreamRecv: NORM_RX_OBJECT_COMPLETED event ...\n");
if (stream == theEvent.object)
{
fprintf(stderr, "normStreamRecv: current stream completed ...\n");
stream = NORM_OBJECT_INVALID;
}
break;
}
case NORM_RX_OBJECT_ABORTED:
fprintf(stderr, "normStreamRecv: NORM_RX_OBJECT_ABORTED event ...\n");
if (stream == theEvent.object)
{
fprintf(stderr, "normStreamRecv error: current stream aborted ...\n");
stream = NORM_OBJECT_INVALID;
}
break;
case NORM_REMOTE_SENDER_NEW:
fprintf(stderr, "normStreamRecv: NORM_REMOTE_SENDER_NEW event ...\n");
break;
case NORM_REMOTE_SENDER_ACTIVE:
fprintf(stderr, "normStreamRecv: NORM_REMOTE_SENDER_ACTIVE event ...\n");
break;
case NORM_REMOTE_SENDER_INACTIVE:
fprintf(stderr, "normStreamRecv: NORM_REMOTE_SENDER_INACTIVE event ...\n");
break;
case NORM_GRTT_UPDATED:
fprintf(stderr, "normStreamRecv: NORM_GRTT_UPDATED event ...\n");
break;
default:
fprintf(stderr, "normStreamRecv: Got event type: %d\n", theEvent.type);
} // end switch(theEvent.type)
}
NormStopReceiver(session);
NormDestroySession(session);
NormDestroyInstance(instance);
fprintf(stderr, "normStreamRecv: Done.\n");
return 0;
} // end main()
|