File: normDataSend.cpp

package info (click to toggle)
norm 1.5.9%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 9,680 kB
  • sloc: cpp: 123,494; xml: 7,536; tcl: 5,460; makefile: 3,442; python: 1,898; java: 1,750; ansic: 642; sh: 21; csh: 8
file content (227 lines) | stat: -rw-r--r-- 8,416 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
/******************************************************************************
 Simple NORM_OBJECT_DATA sender example app using the NORM API

 USAGE: 
 
 normSendData

 BUILD (Unix): 
 
 g++ -o normDataSend normDataSend.cpp -D_FILE_OFFSET_BITS=64 -I../common/ \
     -I../protolib/include ../lib/libnorm.a ../protolib/lib/libProtokit.a \
     -lpthread
     
     (for MacOS/BSD, add "-lresolv")
     (for Solaris, add "-lnsl -lsocket -lresolv")

******************************************************************************/            


// Notes:
//  1) A single file is sent.
//  2) The program exits upon NORM_TX_FLUSH_COMPLETED notification (or user <CTRL-C>)
//  3) NORM receiver should be started first (before sender starts)


#include "normApi.h"     // for NORM API

#include "protoDefs.h"   // for ProtoSystemTime()       
#include "protoDebug.h"  // for SetDebugLevel(), etc   
#include "protoAddress.h"  // for ProtoAddress for easy mcast test

#include <stdio.h>       // for printf(), etc
#include <stdlib.h>      // for srand()
#include <string.h>      // for strrchr()



// Usage: normDataSend [addr <addr>/<port>]

void Usage()
{
    fprintf(stderr, "Usage: normDataSend [addr <addr>/<port>]\n");
}

int main(int argc, char* argv[])
{
    // Initialize default parameters.
    char sessionAddr[256];
    strcpy(sessionAddr, "224.1.2.3");
    UINT16 sessionPort = 6003;
    
    // Parse command-line for any parameters
    int i = 1;
    while (i < argc)
    {
        if (0 == strcmp("addr", argv[1]))
        {
            i++;
            if (i == argc)
            {
                fprintf(stderr, "normDataSend error: missing \"addr\" arguments\n");
                Usage();
                return -1;
            }
            strcpy(sessionAddr, argv[i++]);
            char* ptr = strchr(sessionAddr, '/');
            if (NULL != ptr)
            {
                *ptr = '\0';
                sessionPort = atoi(ptr+1);
            }           
        }
        else
        {
            fprintf(stderr, "normDataSend error: invalid command \"%s\"\n", argv[i]);
            Usage();
            return -1;
        }
    }
    
    // Is the session an mcast addr?
    ProtoAddress theAddr;
    theAddr.ResolveFromString(sessionAddr);
    bool isMulticastSession = theAddr.IsMulticast();
    
    
    // 1) Create a NORM API "NormInstance"
    NormInstanceHandle instance = NormCreateInstance();
    
    // 2) Create a NormSession using default "automatic" local node id
    NormSessionHandle session = NormCreateSession(instance,
                                                  sessionAddr, 
                                                  sessionPort,
                                                  NORM_NODE_ANY);
    
    // NOTE: These are some debugging routines available 
    //       (not necessary for normal app use)
    NormSetDebugLevel(3);
    // Uncomment to turn on debug NORM message tracing
    //NormSetMessageTrace(session, true);
    // Uncomment to turn on some random packet loss
    //NormSetTxLoss(session, 10.0);  // 10% packet loss
    struct timeval currentTime;
    ProtoSystemTime(currentTime);
    // Uncomment to get different packet loss patterns from run to run
    // (and a different sender sessionId)
    srand(currentTime.tv_sec);  // seed random number generator
    
    // 3) Set transmission rate
    NormSetTxRate(session, 2.0e+06);  // in bits/second
    
    // Uncomment to enable TCP-friendly congestion control
    //NormSetCongestionControl(session, true);
    
    // Uncomment to use a _specific_ transmit port number
    // (Can be the same as session port (rx port), but this
    // is _not_ recommended for mcast sessions when unicast feedback may be
    // possible! - must be called _before_ NormStartSender())
    if (!isMulticastSession) 
    {
        NormSetTxPort(session, sessionPort+1, true); 
    
        // Uncomment to set the session to only open 
        // a single socket for transmission. 
        // This also "connects" the sender to session (receiver) addr/port
        // (unicast nacking _and_ "txPort == rxPort" MUST be used by receivers)
        NormSetTxOnly(session, true, true);
    }
    
    // Uncomment to allow multiple NORM processes on same session port number
    // Note that port reuse only works well for mcast.
    // if (isMulticastSession) NormSetRxPortReuse(session, true);
    
    // 4) Start the sender using a random "sessionId"
    NormSessionId sessionId = (NormSessionId)rand();
    TRACE("starting NORM sender ...\n");
    NormStartSender(session, sessionId, 1024*1024, 1400, 64, 16);

    // Uncomment to set large tx socket buffer size
    // (might be needed for high rate sessions)
    //NormSetTxSocketBuffer(session, 512000);
    
    
    // 5) Enqueue the first data message
    //    (we enqueue text strings of random length as object content)
    unsigned int MAX_COUNT = 1;     // number of objects to send, zero means unlimited
    const int MIN_LENGTH = 460000;  // min object size (in bytes)
    const int MAX_LENGTH = 460000;  // max object size (in bytes)
    
    unsigned int dataCount = 0;
    int dataLen = MIN_LENGTH + rand() % (MAX_LENGTH - MIN_LENGTH + 1);
    char* dataMsg = new char[dataLen];
    ASSERT(NULL != dataMsg);
    char data = 'a';
    memset(dataMsg, data, dataLen);  // set message content with 'dummy' data
    // Provide some "info" about this message
    char dataInfo[256];
    sprintf(dataInfo, "NORM_OBJECT_DATA count>%d size>%d", dataCount, dataLen);
    // Enqueue the data object
    NormObjectHandle  dataObj = 
        NormDataEnqueue(session, dataMsg, dataLen, dataInfo, strlen(dataInfo));
    ASSERT(NORM_OBJECT_INVALID != dataObj);
    dataCount++;
    
    // 6) Enter NORM event loop
    bool keepGoing = true;
    while (keepGoing)
    {
        NormEvent theEvent;
        if (!NormGetNextEvent(instance, &theEvent)) continue;
        switch (theEvent.type)
        {
            case NORM_TX_QUEUE_VACANCY:
                //fprintf(stderr, "normDataSend: NORM_TX_QUEUE_VACANCY event...\n");
                break;
            case NORM_TX_QUEUE_EMPTY:
            {
                if ((0 == MAX_COUNT) || (dataCount < MAX_COUNT))
                {
                    // Enqueue another data object when norm tx queue goes empty
                    //fprintf(stderr, "normDataSend: NORM_TX_QUEUE_EMPTY event...\n");
                    dataLen = MIN_LENGTH + rand() % (MAX_LENGTH - MIN_LENGTH + 1);
                    dataMsg = new char[dataLen];
                    ASSERT(NULL != dataMsg);
                    memset(dataMsg, data, dataLen);
                    sprintf(dataInfo, "NORM_OBJECT_DATA count>%d size>%d data>%.64s ...", dataCount, dataLen, dataMsg);
                    NormObjectHandle  dataObj = 
                        NormDataEnqueue(session, dataMsg, dataLen, dataInfo, strlen(dataInfo));
                    // Note that flow control timer may have prevented NormDataEnqueue()
                    // from succeeding even though we got a NORM_TX_QUEUE_EMPTY notification
                    // (The underlying NORM code could be tightened up a bit here!)
                    if (NORM_OBJECT_INVALID != dataObj) 
                    {
                        dataCount++;
                        if (++data > 'z') data = 'a';
                    }
                    else
                    {
                        TRACE("normDataSend: FLOW CONTROL CONDITION?\n");
                    } 
                }
                break;
            }
            case NORM_TX_OBJECT_PURGED:
            {
                //fprintf(stderr, "normDataSend: NORM_TX_OBJECT_PURGED event ...\n");
                char* dataPtr = NormDataDetachData(theEvent.object);
                delete[] dataPtr;
                break;
            }   
            case NORM_TX_FLUSH_COMPLETED:
                fprintf(stderr, "normDataSend: NORM_TX_FLUSH_COMPLETED event ...\n");
                break;
                
            default:
                //TRACE("normDataSend: Got event type: %d\n", theEvent.type); 
                break;
        }  // end switch(theEvent.type)
    }  // end while (NormGetNextEvent())
    
    NormStopSender(session);
    NormDestroySession(session);
    NormDestroyInstance(instance);
    
    fprintf(stderr, "normDataSend: Done.\n");
    return 0;
}  // end main()