File: flooder.c

package info (click to toggle)
spread 3.17.4-2
  • links: PTS
  • area: main
  • in suites: lenny, squeeze
  • size: 1,800 kB
  • ctags: 2,322
  • sloc: ansic: 15,666; sh: 2,611; java: 2,291; perl: 556; yacc: 523; makefile: 255; lex: 204; xml: 77
file content (319 lines) | stat: -rw-r--r-- 12,272 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
/*
 * The Spread Toolkit.
 *     
 * The contents of this file are subject to the Spread Open-Source
 * License, Version 1.0 (the ``License''); you may not use
 * this file except in compliance with the License.  You may obtain a
 * copy of the License at:
 *
 * http://www.spread.org/license/
 *
 * or in the file ``license.txt'' found in this distribution.
 *
 * Software distributed under the License is distributed on an AS IS basis, 
 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License 
 * for the specific language governing rights and limitations under the 
 * License.
 *
 * The Creators of Spread are:
 *  Yair Amir, Michal Miskin-Amir, Jonathan Stanton.
 *
 *  Copyright (C) 1993-2004 Spread Concepts LLC <spread@spreadconcepts.com>
 *
 *  All Rights Reserved.
 *
 * Major Contributor(s):
 * ---------------
 *    Cristina Nita-Rotaru crisn@cs.purdue.edu - group communication security.
 *    Theo Schlossnagle    jesus@omniti.com - Perl, skiplists, autoconf.
 *    Dan Schoenblum       dansch@cnds.jhu.edu - Java interface.
 *    John Schultz         jschultz@cnds.jhu.edu - contribution to process group membership.
 *
 */


#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Not positive if portable JRS 5/2004 */
#include <limits.h>

#include "sp.h"

#define	MAX_MESSLEN	100000
static	char            mess[MAX_MESSLEN] ;
static	char            recv_mess[MAX_MESSLEN] ;

#define FLOODER_MAX_GROUPS      100
static	char	User[80];
static	char	Spread_name[80];
static	char	Private_group[MAX_GROUP_NAME];
static	mailbox	Mbox;
static	int	Num_bytes;
static	int	Num_messages;
static	int	Num_members;
static	int	Read_only, Write_only;

static  char	ret_groups[FLOODER_MAX_GROUPS][MAX_GROUP_NAME];

static  int     Send_Counter;
static  int     Recv_Counters[FLOODER_MAX_GROUPS];
static  int     Lowest_Recv_Counter;
static  int     My_Counter_index;

static	void	Usage( int argc, char *argv[] );
static  void    Print_help();

int main( int argc, char *argv[] )
{
	int		ret;
	int		service_type, num_groups;
	char		sender[MAX_GROUP_NAME];
	int16		mess_type;
	int		dummy_endian_mismatch;
        int             joined_members;
        int             sender_index;
	int		i,j;

	Usage( argc, argv );

        if (Num_members == 0) {
            /* connecting to the relevant Spread daemon, no need for group info */
            printf("flooder: connecting to %s\n", Spread_name );
            ret = SP_connect( Spread_name, User, 0, 0, &Mbox, Private_group ) ;
        } else {
            /* connecting to the relevant Spread daemon, DO need group info */
            printf("flooder: connecting to %s with group membership\n", Spread_name );
            ret = SP_connect( Spread_name, User, 0, 1, &Mbox, Private_group ) ;
        }
        if(ret < 0) 
	{
		SP_error( ret );
                exit(1) ;
        }
	/* 
	 * Joining the process group.
	 *
	 * Note that this is not necessary in order to multicast the
	 * messages, but only to demonstrate end-to-end behaviour.
	 */
	if( Read_only )
	{
		printf("flooder: Only receiving messages\n");
		SP_join( Mbox, "flooder" );
	}else if( Write_only ) {
		printf("flooder: starting  multicast of %d messages, %d bytes each (self discarding).\n", Num_messages, Num_bytes);
	}else{
		SP_join( Mbox, "flooder" );
		printf("flooder: starting  multicast of %d messages, %d bytes each.\n", Num_messages, Num_bytes);
	}

        /* Wait for all members to join */
        joined_members = 0;
        while( joined_members < Num_members)
        {
            service_type = 0;
            ret = SP_receive( Mbox, &service_type, sender, FLOODER_MAX_GROUPS, &num_groups, ret_groups, 
                              &mess_type, &dummy_endian_mismatch, sizeof(recv_mess), recv_mess );
            if( ret < 0 ) 
            {
                if ( (ret == GROUPS_TOO_SHORT) || (ret == BUFFER_TOO_SHORT) ) {
                    printf("\n========Buffers or Groups too Short=======\n");
                    printf("Should NOT happen in wait for members! Program quitting\n");
                    exit(1);
                }       
            }

            if( Is_regular_mess( service_type ) )
            {
		mess[ret] = 0;
		if     ( Is_unreliable_mess( service_type ) ) printf("received UNRELIABLE ");
		else if( Is_reliable_mess(   service_type ) ) printf("received RELIABLE ");
		else if( Is_fifo_mess(       service_type ) ) printf("received FIFO ");
		else if( Is_causal_mess(     service_type ) ) printf("received CAUSAL ");
		else if( Is_agreed_mess(     service_type ) ) printf("received AGREED ");
		else if( Is_safe_mess(       service_type ) ) printf("received SAFE ");
		printf("message during wait for members, from %s, of type %d, (endian %d) to %d groups \n(%d bytes): %s\n",
			sender, mess_type, dummy_endian_mismatch, num_groups, ret, recv_mess );
            }else if( Is_membership_mess( service_type ) )
            {
		if     ( Is_reg_memb_mess( service_type ) )
		{
			printf("Received REGULAR membership for group %s with %d members, where I am member %d:\n",
				sender, num_groups, mess_type );
			for( i=0; i < num_groups; i++ )
				printf("\t%s\n", &ret_groups[i][0] );
                        /* update count of joined members */
                        joined_members = num_groups;

		}else if( Is_transition_mess(   service_type ) ) {
			printf("received TRANSITIONAL membership for group %s\n", sender );
		}else if( Is_caused_leave_mess( service_type ) ){
			printf("received membership message that left group %s\n", sender );
		}else printf("received incorrecty membership message of type 0x%x\n", service_type );
            } else if ( Is_reject_mess( service_type ) )
            {
		printf("REJECTED message from %s, of servicetype 0x%x messtype %d, (endian %d) to %d groups \n(%d bytes): %s\n",
			sender, service_type, mess_type, dummy_endian_mismatch, num_groups, ret, recv_mess );
            }else printf("received message of unknown message type 0x%x with ret %d\n", service_type, ret);
           
        } /* joined_members < Num_members */

        /* Update My_Counter_index field based on location of my name in last membership message */
        if (Num_members)
        {
            My_Counter_index = mess_type;
            memcpy(&mess[0], &My_Counter_index, sizeof(int));
        }

	for( i=1; i <= Num_messages; i++ )
	{
		/* multicast a message unless Read_only */
		if( !Read_only )
		{
                    if (Num_members) 
                    {
                        ret = SP_multicast( Mbox, FIFO_MESS, "flooder", 0, Num_bytes, mess );
                        Send_Counter++;
                    } else {
                        ret = SP_multicast( Mbox, RELIABLE_MESS, "flooder", 0, Num_bytes, mess );
                    }
		    if( ret != Num_bytes ) 
		    {
			if( ret < 0 )
			{
				SP_error( ret );
				exit(1);
			}
			printf("sent a different message %d -> %d\n", Num_bytes, ret );
		    }
		}

		/* receive a message (Read_only) or one of my messages */
		if( Read_only || ( i > 200 && !Write_only ) )
		{
                    int notdone;

		    do{
                        service_type = 0;

			ret = SP_receive( Mbox, &service_type, sender, FLOODER_MAX_GROUPS, &num_groups, ret_groups, 
					  &mess_type, &dummy_endian_mismatch, sizeof(recv_mess), recv_mess );
                        if( ret < 0 ) 
                        {
                                if ( (ret == GROUPS_TOO_SHORT) || (ret == BUFFER_TOO_SHORT) ) {
                                        service_type = DROP_RECV;
                                        printf("\n========Buffers or Groups too Short=======\n");
                                        ret = SP_receive( Mbox, &service_type, sender, FLOODER_MAX_GROUPS, &num_groups, ret_groups, 
                                                          &mess_type, &dummy_endian_mismatch, sizeof(recv_mess), recv_mess );
                                }
                        }
			if( ret < 0 )
			{
				SP_error( ret );
				exit(1);
			}
                        if (Num_members) {
                            /* update counters of received messages */
                            memcpy(&sender_index, &recv_mess[0], sizeof(int));
                            Recv_Counters[sender_index]++;
                            /* 
                               printf("DEBUG: updated counter %d to value %d\n", sender_index, Recv_Counters[sender_index]);
                            */
                            if (Recv_Counters[sender_index] == (Lowest_Recv_Counter + 1)) {
                                /* Update Lowest_Recv_Counter */
                                Lowest_Recv_Counter = INT_MAX;
                                for (j=0; j < Num_members; j++) {
                                    if (Recv_Counters[j] == 0) continue;
                                    if (Recv_Counters[j] < Lowest_Recv_Counter)
                                        Lowest_Recv_Counter = Recv_Counters[j];
                                }
                                if (Lowest_Recv_Counter == INT_MAX) 
                                    Lowest_Recv_Counter = 0;
                            }
                            /* Read loop is done if we send messages and we have received all 
                             * other senders messages upto 100 less then our current send count 
                             */
                            notdone = ( Lowest_Recv_Counter < (Send_Counter - 200) && !Read_only );
                        } else {
                            notdone = (strcmp( sender, Private_group ) != 0 && !Read_only);
                        }

		    } while( notdone );
		}

		/* report some progress... */
		if( i%1000 == 0 ) printf("flooder: completed %6d messages of %d bytes\n",i, ret);
	}
	printf("flooder: completed multicast of %d messages, %d bytes each.\n", Num_messages, Num_bytes);

	return 0;
}

static  void    Usage(int argc, char *argv[])
{

	/* Setting defaults */
        sprintf( User, "flooder" );
        sprintf( Spread_name, "4803@localhost");
	Num_bytes    =  1000;
	Num_messages = 10000;
        Num_members = 0;
	Read_only    = 0;
	Write_only   = 0;

        while( --argc > 0 )
        {
                argv++;

                if( !strncmp( *argv, "-u", 2 ) )
                {
                        if (argc < 2) Print_help();
                        strcpy( User, argv[1] );
                        argc--; argv++;
                }else if( !strncmp( *argv, "-b", 2 ) ){
                        if (argc < 2) Print_help();
			sscanf(argv[1], "%d", &Num_bytes );
                        argc--; argv++;
                }else if( !strncmp( *argv, "-m", 2 ) ){
                        if (argc < 2) Print_help();
			sscanf(argv[1], "%d", &Num_messages );
                        argc--; argv++;
                }else if( !strncmp( *argv, "-n", 2 ) ){
                        if (argc < 2) Print_help();
			sscanf(argv[1], "%d", &Num_members );
                        argc--; argv++;
                }else if( !strncmp( *argv, "-s", 2 ) ){
                        if (argc < 2) Print_help();
                        strcpy( Spread_name, argv[1] ); 
                        argc--; argv++;
                }else if( !strncmp( *argv, "-ro", 3 ) ){
				Read_only  = 1;
				Write_only = 0;
                }else if( !strncmp( *argv, "-wo", 3 ) ){
				Write_only = 1;
				Read_only  = 0;
                }else{
                    Print_help();
                }
	}

        if (Num_members > FLOODER_MAX_GROUPS)
        {
            printf("Too many members. The max is %d\n", FLOODER_MAX_GROUPS);
            Print_help();
        }
}
static  void    Print_help()
{
    printf( "Usage: spflooder\n%s\n%s\n%s\n%s\n%s\n%s\n%s\n",
            "\t[-u <user name>]     : unique (in this machine) user name",
            "\t[-m <num messages>]  : number of messages",
            "\t[-n <num members>]   : number of group members to wait for (also turn on multi-sender FC)",
            "\t[-b <num bytes>]     : number of bytes per message 1-100,000",
            "\t[-s <spread name>]   : either port or port@machine",
            "\t[-ro]   		: read  only (no multicast)",
            "\t[-wo]   		: write only (no receive)");
    exit(1);
}