File: normStreamRecv.cpp

package info (click to toggle)
norm 1.5.9%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 9,680 kB
  • sloc: cpp: 123,494; xml: 7,536; tcl: 5,460; makefile: 3,442; python: 1,898; java: 1,750; ansic: 642; sh: 21; csh: 8
file content (238 lines) | stat: -rw-r--r-- 10,182 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
/******************************************************************************
 Simple NORM_OBJECT_STREAM receiver example app using the NORM API
 (expects NORM message stream of text messages with 2-byte length header (network byte order))

 USAGE: 
 
 normStreamRecv

 BUILD (Unix): 
 
 g++ -o normStreamRecv normStreamRecv.cpp -D_FILE_OFFSET_BITS=64 -I../include/ \
     ../lib/libnorm.a ../protolib/lib/libProtokit.a -lpthread
     
     (for MacOS/BSD, add "-lresolv")
     (for Solaris, add "-lnsl -lsocket -lresolv")

******************************************************************************/            


// Notes:
//  1) The program also will exit on <CTRL-C> from user
//  2) "normStreamRecv" should be started before "normStreamSend" but can join stream in progress.
//  3) This example is designed to receive a single stream from a single sender (could be modified
//     to support multiple streams and/or senders)

#include "normApi.h"     // for NORM API

#include <stdio.h>       // for printf(), etc
#include <stdlib.h>      // for srand()
#include <string.h>      // for strrchr()
#include <sys/time.h>    // for gettimeofday()
#include <arpa/inet.h>   // for ntohs()

int main(int argc, char* argv[])
{
    // 0) Some default params
    const unsigned int MSG_LENGTH_MAX = 5000;
    
    
    // 1) Create a NORM API "NormInstance"
    NormInstanceHandle instance = NormCreateInstance();
    
    // 2) Create a NormSession using default "automatic" local node id
    NormSessionHandle session = NormCreateSession(instance,
                                                  "224.1.2.3", 
                                                   6003,
                                                   NORM_NODE_ANY);
    
    // NOTE: These are debugging routines available 
    //       (not necessary for normal app use)
    // (Need to include "common/protoDebug.h" for this
    NormSetDebugLevel(3);
    // Uncomment to turn on debug NORM message tracing
    NormSetMessageTrace(session, true);
    // Uncomment to write debug output to file "normLog.txt"
    //NormOpenDebugLog(instance, "normLog.txt");
    
    
    // Uncomment to turn on some random packet loss for testing
    //NormSetRxLoss(session, 10.0);  // 10% packet loss
    struct timeval currentTime;
    gettimeofday(&currentTime, NULL);
    // Uncomment to get different packet loss patterns from run to run
    srand(currentTime.tv_sec);  // seed random number generator
    
    
    // Uncomment to enable rx port reuse (this plus unique NormNodeId's enables same-machine send/recv)
    NormSetRxPortReuse(session, true);
    
    // 3) Start the receiver with 1 Mbyte buffer per sender
    NormStartReceiver(session, 8*1024*1024);
    
    NormSetSilentReceiver(session, true);
    
    if (!NormSetRxSocketBuffer(session, 8*1024*1024))
        perror("normStreamRecv error: unable to set requested socket buffer size");
    
    // We use these variables to keep track of our recv stream
    // and to buffer reading from the recv stream
    NormObjectHandle stream = NORM_OBJECT_INVALID;  // we use this to make sure we're handling the correct (only) stream
    bool msgSync = false;
    char msgBuffer[MSG_LENGTH_MAX];
    UINT16 msgLen = 0;
    UINT16 msgIndex = 0;
  
    // 4) Enter NORM event loop
    bool keepGoing = true;
    while (keepGoing)
    {
        NormEvent theEvent;
        if (!NormGetNextEvent(instance, &theEvent)) continue;
        switch (theEvent.type)
        {
           case NORM_RX_OBJECT_NEW:
                fprintf(stderr, "normStreamRecv: NORM_RX_OBJECT_NEW event ...\n");
                if (NORM_OBJECT_INVALID == stream)
                {
                    if (NORM_OBJECT_STREAM == NormObjectGetType(theEvent.object))
                    {
                        stream = theEvent.object;
                        msgLen = msgIndex = 0;  // init stream reading state
                        msgSync = false;
                    }
                    else
                    {
                        fprintf(stderr, "normStreamRecv error: received NORM_RX_OBJECT_NEW for non-stream object?!\n");
                    }
                }
                else
                {
                    fprintf(stderr, "normStreamRecv error: received NORM_RX_OBJECT_NEW while already receiving stream?!\n");
                }                 
                break;

            case NORM_RX_OBJECT_INFO:
            {
                // Assume info contains NULL-terminated string
                //fprintf(stderr, "normStreamRecv: NORM_RX_OBJECT_INFO event ...\n");
                if (theEvent.object != stream)
                {
                    fprintf(stderr, "normStreamRecv error: received NORM_RX_OBJECT_UPDATED for unhandled object?!\n");
                    break;
                }
                char streamInfo[8192];
                unsigned int infoLen = NormObjectGetInfo(theEvent.object, streamInfo, 8191);
                streamInfo[infoLen] = '\0';
                fprintf(stderr, "normStreamRecv: NORM_RX_OBJECT_INFO event, info = \"%s\"\n", streamInfo);
                break;
            }
            case NORM_RX_OBJECT_UPDATED:
            {
                //fprintf(stderr, "normStreamRecv: NORM_RX_OBJECT_UPDATED event ...\n");
                if (theEvent.object != stream)
                {
                    fprintf(stderr, "normStreamRecv error: received NORM_RX_OBJECT_UPDATED for unhandled object?!\n");
                    break;
                }
                while (1)
                {
                    // If we're not "in sync", seek message start
                    if (!msgSync)
                    {
                        msgSync = NormStreamSeekMsgStart(stream);
                        if (!msgSync) break;  // wait for next NORM_RX_OBJECT_UPDATED  to re-sync
                    }
                    if (msgIndex < 2)
                    {
                        // We still need to read the 2-byte message header for the next message
                        unsigned int numBytes = 2 - msgIndex;
                        if (!NormStreamRead(stream, msgBuffer+msgIndex, &numBytes))
                        {
                            fprintf(stderr, "normStreamRecv error: broken stream detected, re-syncing ...\n");
                            msgLen = msgIndex = 0;
                            msgSync = false;
                            continue;  // try to re-sync and read again
                        }
                        msgIndex += numBytes;
                        if (msgIndex < 2) break; // wait for next NORM_RX_OBJECT_UPDATED to read more
                        memcpy(&msgLen, msgBuffer, 2);
                        msgLen = ntohs(msgLen);
                        if ((msgLen < 2) || (msgLen > MSG_LENGTH_MAX))
                        {
                            fprintf(stderr, "normStreamRecv error: message received with invalid length?!\n");
                            msgLen = msgIndex = 0;
                            msgSync = false;
                            continue;  // try to re-sync and read again
                        }
                    }
                    // Read "content" portion of message (note "msgIndex" accounts for length "header"
                    unsigned int numBytes = msgLen - msgIndex;
                    if (!NormStreamRead(stream, msgBuffer+msgIndex, &numBytes))
                    {
                        fprintf(stderr, "normStreamRecv error: broken stream detected, re-syncing ...\n");
                        msgLen = msgIndex = 0;
                        msgSync = false;
                        continue;  // try to re-sync and read again
                    }
                    fprintf(stderr, "read %u bytes from stream ...\n", numBytes);
                    msgIndex += numBytes;
                    if (msgIndex == msgLen)
                    {
                        // Complete message read
                        fprintf(stderr, "normStreamRecv msg: %s\n", msgBuffer+2);
                        msgLen = msgIndex = 0; // reset state variables for next message
                    }
                    else
                    {
                        break; //  wait for next NORM_RX_OBJECT_UPDATED to read more
                    }
                }  // end while(1) (NormStreamRead() loop)
                break;                 
            }
            case NORM_RX_OBJECT_COMPLETED:
            {
                fprintf(stderr, "normStreamRecv: NORM_RX_OBJECT_COMPLETED event ...\n");
                if (stream == theEvent.object)
                {
                    fprintf(stderr, "normStreamRecv: current stream completed ...\n");
                    stream = NORM_OBJECT_INVALID;
                }
                break;
            }
            case NORM_RX_OBJECT_ABORTED:
                fprintf(stderr, "normStreamRecv: NORM_RX_OBJECT_ABORTED event ...\n");
                if (stream == theEvent.object)
                {
                    fprintf(stderr, "normStreamRecv error: current stream aborted ...\n");
                    stream = NORM_OBJECT_INVALID;
                }
                break;

            case NORM_REMOTE_SENDER_NEW:
                fprintf(stderr, "normStreamRecv: NORM_REMOTE_SENDER_NEW event ...\n");
                break;

            case NORM_REMOTE_SENDER_ACTIVE:
                fprintf(stderr, "normStreamRecv: NORM_REMOTE_SENDER_ACTIVE event ...\n");
                break;

            case NORM_REMOTE_SENDER_INACTIVE:
                fprintf(stderr, "normStreamRecv: NORM_REMOTE_SENDER_INACTIVE event ...\n");
                break;
                
            case NORM_GRTT_UPDATED:
                fprintf(stderr, "normStreamRecv: NORM_GRTT_UPDATED event ...\n");
                break;

            default:
                fprintf(stderr, "normStreamRecv: Got event type: %d\n", theEvent.type); 
        }  // end switch(theEvent.type)
    }
    NormStopReceiver(session);
    NormDestroySession(session);
    NormDestroyInstance(instance);
    
    fprintf(stderr, "normStreamRecv: Done.\n");
    return 0;
}  // end main()