File: SimulatorTest.java

package info (click to toggle)
libjgroups-java 2.12.2.Final-6
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 8,712 kB
  • sloc: java: 109,098; xml: 9,423; sh: 149; makefile: 2
file content (454 lines) | stat: -rw-r--r-- 13,178 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454


package org.jgroups.tests;

import org.testng.annotations.Test ;
import org.testng.annotations.BeforeMethod ;
import org.testng.annotations.AfterMethod ;
import org.testng.Assert;

import org.jgroups.*;
import org.jgroups.debug.Simulator;
import org.jgroups.protocols.DELAY;
import org.jgroups.stack.IpAddress;
import org.jgroups.stack.Protocol;

import java.util.Properties;
import java.util.Vector;

/**
 * Tests for the fault-injection features of Simulator.
 * 
 * @author Richard Achmatowicz 
 */
@Test(groups=Global.FUNCTIONAL, sequential=true)
public class SimulatorTest {

	// test dimensions: number of peers, number of messages sent by each sender,
	// and how long (in seconds) a waiter blocks before giving up
	final static int NUM_PEERS = 3; 
	final static int NUM_MSGS = 5; 
	final static int WAIT_TIMEOUT = 5; // secs
	// a status line is printed every MSGS_PER_STATUS_LINE messages
	final static int MSGS_PER_STATUS_LINE = 1; 

	// convey "all messages received" from the receiver callback to the test method.
	// volatile: the flag is written by whichever thread runs the callback
	// (presumably a Simulator-internal thread -- confirm) and read by the test
	// thread, which shares no lock with the writer when its wait simply times out
	static volatile boolean allMsgsReceived = false ;

	// per-peer state, indexed by peer number
	IpAddress[] addresses = new IpAddress[NUM_PEERS] ;
	Vector<Address> members = null ;
	View view = null ;
	Simulator[] simulators = new Simulator[NUM_PEERS] ;
	DELAY[] layers = new DELAY[NUM_PEERS] ;
	Protocol[][] stacks = new Protocol[NUM_PEERS][] ;
	Thread[] threads = new Thread[NUM_PEERS] ;

	// define senders and receivers
	boolean[] isSender = new boolean[NUM_PEERS] ;

	// monitor used to wait for the signal that all messages were received;
	// final so that every waiter and notifier always locks the same object
	static final Object all_msgs_recd = new Object() ;


	/**
	 * Builds the three-peer fixture used by every test: peer 0 is the receiver,
	 * peers 1 and 2 are senders. Each peer gets its own Simulator (configured by
	 * createSimulator()), is told about every peer, receives a callback matching
	 * its role, and is finally started.
	 *
	 * @throws Exception declared so that any failure while building or starting
	 *                   the fixture propagates to the test framework
	 */
	@BeforeMethod(alwaysRun=true)
	public void setUp() throws Exception {

		System.out.println("calling setUp()") ;

		// roles: peer 0 only receives, peers 1 and 2 send
		isSender[0] = false ;
		isSender[1] = true ;
		isSender[2] = true ;

		// dummy addresses, one port per peer
		addresses[0] = new IpAddress(1111);
		addresses[1] = new IpAddress(2222);
		addresses[2] = new IpAddress(3333);

		// one membership list shared by all three simulators
		members = new Vector<Address>();
		for (IpAddress peerAddress : addresses) {
			members.add(peerAddress);
		}

		// dummy View(creator, timestamp, member set)
		view = new View(addresses[0], 1, members);

		// one simulator per peer
		for (int peer = 0; peer < NUM_PEERS; peer++) {
			createSimulator(simulators, view, addresses, layers, stacks, peer) ;
		}

		// tell every simulator about all peers: its own address is registered
		// alone, every other address together with the simulator serving it
		for (int local = 0; local < NUM_PEERS; local++) {
			for (int other = 0; other < NUM_PEERS; other++) {
				if (local != other)
					simulators[local].addMember(addresses[other], simulators[other]) ;
				else
					simulators[local].addMember(addresses[other]) ;
			}
		}

		// install the callback that matches each peer's role
		for (int peer = 0; peer < NUM_PEERS; peer++) {
			Simulator.Receiver callback ;
			if (!isSender[peer])
				callback = new ReceiverPeer(simulators[peer]) ;
			else
				callback = new SenderPeer(simulators[peer]) ;

			simulators[peer].setReceiver(callback);
		}

		// start the simulators, in peer order
		for (Simulator simulator : simulators)
			simulator.start();

		System.out.println("Ending setUp()") ;
	}

	/**
	 * Resets the shared completion flag and stops the simulators.
	 *
	 * Because of alwaysRun=true this method may also run after a setUp() that
	 * failed before every simulator was created; slots that are still null are
	 * skipped so that a NullPointerException here does not hide the original failure.
	 *
	 * @throws Exception declared so that a failure while stopping a simulator
	 *                   propagates to the test framework
	 */
	@AfterMethod(alwaysRun=true)
	public void tearDown() throws Exception {

		System.out.println("Calling tearDown()") ;
		// reset messages received flag
		allMsgsReceived = false ;
		
		// stop the simulators that were actually created
		for (int i = 0; i < NUM_PEERS; i++) {
			if (simulators[i] != null)
				simulators[i].stop();
		}
		System.out.println("Ending tearDown()") ;
	}

	/**
	 * Creates and configures the simulator for peer i and records it, together
	 * with its DELAY layer and protocol stack, in slot i of the given arrays.
	 *
	 * @param simulators receives the new Simulator at index i
	 * @param view       view handed to the new simulator
	 * @param addresses  addresses[i] becomes the simulator's local address
	 * @param layers     receives the new DELAY layer at index i
	 * @param stacks     receives the one-element protocol stack at index i
	 * @param i          index of the peer being set up
	 */
	private void createSimulator(Simulator[] simulators, View view, Address[] addresses, DELAY[] layers, Protocol[][] stacks, int i) {

		// the simulator itself, bound to this peer's address and the shared view
		final Simulator peerSimulator = new Simulator();
		simulators[i] = peerSimulator;
		peerSimulator.setLocalAddress(addresses[i]);
		peerSimulator.setView(view);

		// DELAY with both delays set to zero serves as a dummy passthrough layer
		final DELAY passthrough = new DELAY();
		layers[i] = passthrough;
		passthrough.setInDelay(0);
		passthrough.setOutDelay(0);

		// the protocol stack under test consists of that single layer
		final Protocol[] stack = {passthrough};
		stacks[i] = stack;
		peerSimulator.setProtocolStack(stack);
	}


	/*
	 * Drop function that discards every message instance headed for one
	 * particular address (the address passed to the constructor).
	 */
	class MyDropMessage implements Simulator.DropMessage {
		Address address ;

		MyDropMessage(Address a) {
			address = a ;
		}

		/*
		 * Returns true if this instance of msg must be dropped.
		 *
		 * A message without a destination is a multicast; for those, drop() gets
		 * called |view| times, each with a different dest parameter, so the
		 * decision is taken on dest. Otherwise the message's own destination decides.
		 */
		public boolean drop(Message msg, Address dest) {
			final Address explicitDest = msg.getDest() ;

			// multicast: drop only the copy which is headed for our target
			if (explicitDest == null) {
				return dest.equals(address) ;
			}

			// unicast: drop when sent specifically to our target
			return explicitDest.equals(address) ;
		}
	}


	/**
	 * Test dropped messages modelling.
	 *
	 * Peer 2 registers a drop function that discards everything headed for peer 0.
	 * Expected outcome: the receiver counts only NUM_MSGS messages (those of peer 1)
	 * and never signals completion, while each sender counts 2 * NUM_MSGS messages
	 * (its own and the other sender's).
	 */
	public void testDroppedMessages() {

		System.out.println("Starting testDroppedMessages") ;

		// set up a drop function which drops all messages from 2 to 0
		// (even those message instances which are part of a multicast) 
		Simulator.DropMessage d = new MyDropMessage(addresses[0]) ;
		simulators[2].registerDropMessage(d) ;

		// start the peer threads and let them exchange messages
		for (int i = 0; i < NUM_PEERS; i++) {
			threads[i] = new MyPeer(simulators[i], isSender[i]) ;
			threads[i].start() ;
		}

		// remember an interrupt instead of losing it; it is restored once the
		// peer threads have been joined so that joining is not cut short
		boolean interrupted = false ;

		// wait for the receiver peer to signal that it has received messages, or timeout
		synchronized(all_msgs_recd) {
			try {
				all_msgs_recd.wait(WAIT_TIMEOUT * 1000) ;
			}
			catch(InterruptedException e) {
				System.out.println("main thread interrupted") ;
				interrupted = true ;
			}
		}

		// wait for the threads to terminate
		try {
			for (int i = 0; i < NUM_PEERS; i++) {
				threads[i].join() ;
			}
		}
		catch(InterruptedException e) {
			interrupted = true ;
		}

		if (interrupted)
			Thread.currentThread().interrupt() ;

		int receiver = ((ReceiverPeer)simulators[0].getReceiver()).getNumberOfReceivedMessages() ;
		int sender1 = ((SenderPeer)simulators[1].getReceiver()).getNumberOfReceivedMessages() ;
		int sender2 = ((SenderPeer)simulators[2].getReceiver()).getNumberOfReceivedMessages() ;

		// assertEquals reports the actual and the expected count on failure
		Assert.assertFalse(allMsgsReceived, "receiver received all messages from both peers") ;
		Assert.assertEquals(receiver, NUM_MSGS, "receiver did not receive all messages from single peer") ;
		Assert.assertEquals(sender1, 2 * NUM_MSGS, "sender1 did not receive messages from itself and other sender") ;
		Assert.assertEquals(sender2, 2 * NUM_MSGS, "sender2 did not receive messages from itself and other sender") ;
	}

	/**
	 * Test crash failure modelling.
	 *
	 * A crash failure is simulated on peer 1 before any message is sent.
	 * Expected outcome: peer 1 counts no messages at all, while the receiver and
	 * peer 2 each count exactly NUM_MSGS messages (those sent by peer 2), and the
	 * receiver never signals completion.
	 */
	public void testCrashFailure() {

		// simulate crash failure on sender
		System.out.println("Starting testCrashfailure") ;
		simulators[1].simulateCrashFailure() ;

		// start the peer threads and let them exchange messages
		for (int i = 0; i < NUM_PEERS; i++) {
			threads[i] = new MyPeer(simulators[i], isSender[i]) ;
			threads[i].start() ;
		}

		// remember an interrupt instead of losing it; it is restored once the
		// peer threads have been joined so that joining is not cut short
		boolean interrupted = false ;

		// wait for the receiver peer to signal that it has received messages, or timeout
		synchronized(all_msgs_recd) {
			try {
				all_msgs_recd.wait(WAIT_TIMEOUT * 1000) ;
			}
			catch(InterruptedException e) {
				System.out.println("main thread interrupted") ;
				interrupted = true ;
			}
		}

		// wait for the threads to terminate
		try {
			for (int i = 0; i < NUM_PEERS; i++) {
				threads[i].join() ;
			}
		}
		catch(InterruptedException e) {
			interrupted = true ;
		}

		if (interrupted)
			Thread.currentThread().interrupt() ;

		int receiver = ((ReceiverPeer)simulators[0].getReceiver()).getNumberOfReceivedMessages() ;
		int sender1 = ((SenderPeer)simulators[1].getReceiver()).getNumberOfReceivedMessages() ;
		int sender2 = ((SenderPeer)simulators[2].getReceiver()).getNumberOfReceivedMessages() ;

		// assertEquals reports the actual and the expected count on failure
		Assert.assertFalse(allMsgsReceived, "receiver received all messages from both peers") ;
		Assert.assertEquals(receiver, NUM_MSGS, "receiver did not receive all messages from single peer") ;
		Assert.assertEquals(sender1, 0, "sender1 received messages") ;
		Assert.assertEquals(sender2, NUM_MSGS, "sender2 did not receive messages only from itself") ;
	}

	/**
	 * Test network partition modelling.
	 *
	 * Peers 0 and 1 are placed in one partition and peer 2 in another.
	 * Expected outcome: every peer counts exactly NUM_MSGS messages (the receiver
	 * those of peer 1, each sender only its own), and the receiver never signals
	 * completion.
	 */
	public void testNetworkPartition() {

		System.out.println("Starting testNetworkPartition") ;

		Address[] part1 = {addresses[0], addresses[1]} ;
		Address[] part2 = {addresses[2]} ;

		simulators[0].simulatePartition(part1) ;
		simulators[1].simulatePartition(part1) ;
		simulators[2].simulatePartition(part2) ;

		// start the peer threads and let them exchange messages
		for (int i = 0; i < NUM_PEERS; i++) {
			threads[i] = new MyPeer(simulators[i], isSender[i]) ;
			threads[i].start() ;
		}

		// remember an interrupt instead of losing it; it is restored once the
		// peer threads have been joined so that joining is not cut short
		boolean interrupted = false ;

		// wait for the receiver peer to signal that it has received messages, or timeout
		synchronized(all_msgs_recd) {
			try {
				all_msgs_recd.wait(WAIT_TIMEOUT * 1000) ;
			}
			catch(InterruptedException e) {
				System.out.println("main thread interrupted") ;
				interrupted = true ;
			}
		}

		// wait for the threads to terminate
		try {
			for (int i = 0; i < NUM_PEERS; i++) {
				threads[i].join() ;
			}
		}
		catch(InterruptedException e) {
			interrupted = true ;
		}

		if (interrupted)
			Thread.currentThread().interrupt() ;

		int receiver = ((ReceiverPeer)simulators[0].getReceiver()).getNumberOfReceivedMessages() ;
		int sender1 = ((SenderPeer)simulators[1].getReceiver()).getNumberOfReceivedMessages() ;
		int sender2 = ((SenderPeer)simulators[2].getReceiver()).getNumberOfReceivedMessages() ;

		// assertEquals reports the actual and the expected count on failure
		Assert.assertFalse(allMsgsReceived, "receiver received all messages from both peers") ;
		Assert.assertEquals(receiver, NUM_MSGS, "receiver did not receive all messages from single peer") ;
		Assert.assertEquals(sender1, NUM_MSGS, "sender1 did not receive messages only from itself") ;
		Assert.assertEquals(sender2, NUM_MSGS, "sender2 did not receive messages only from itself") ;
	}


	/**
	 * This is called by the Simulator when a message comes back up the stack.
	 * Used by message senders to count and display the messages they receive.
	 *
	 * Both methods are synchronized: the counter is updated by the thread that
	 * runs the callback (presumably a Simulator-internal thread -- confirm) and
	 * read by the test thread, so unsynchronized access could return a stale count.
	 */
	class SenderPeer implements Simulator.Receiver {
		Simulator simulator = null ;
		// only accessed while holding this object's monitor
		int num_mgs_received=0;

		SenderPeer(Simulator s) {
			this.simulator = s ;
		}

		// keep track of how many messages were received by this peer;
		// events other than Event.MSG are ignored
		public synchronized void receive(Event evt) {
			if(evt.getType() == Event.MSG) {
				num_mgs_received++;
				if(num_mgs_received % MSGS_PER_STATUS_LINE == 0)
					System.out.println("<" + simulator.getLocalAddress() + ">:" + "<== " + num_mgs_received);
			}
		}

		// number of Event.MSG events counted so far
		public synchronized int getNumberOfReceivedMessages() {
			return num_mgs_received;
		}
	}

	/**
	 * This is called by the Simulator when a message comes back up the stack.
	 * The callback does the following:
	 * - counts the messages received from the senders
	 * - once NUM_MSGS * (NUM_PEERS-1) messages have arrived, sets allMsgsReceived
	 *   and wakes up every thread waiting on all_msgs_recd
	 *
	 * receive() and getNumberOfReceivedMessages() are both synchronized so that a
	 * reader on another thread sees the count written by the callback thread.
	 */
	class ReceiverPeer implements Simulator.Receiver {
		Simulator simulator = null ;
		// only accessed while holding this object's monitor
		int num_mgs_received=0;
		// most recently received message and its source address
		Message msg ;
		Address sender ;

		ReceiverPeer(Simulator s) {
			this.simulator = s ;
		}

		// counts Event.MSG events; all other event types are ignored
		public synchronized void receive(Event evt) {

			if (evt.getType() == Event.MSG) {

				// remember the message and who sent it (used for the status line)
				msg=(Message)evt.getArg();
				sender=msg.getSrc();

				try {
					num_mgs_received++ ;

					Address address = simulator.getLocalAddress() ;

					if(num_mgs_received % MSGS_PER_STATUS_LINE == 0)
						System.out.println("<" + address + ">:" + "PASS: received msg #" + num_mgs_received + " from " + sender);


					// condition to terminate the test - all messages received (whether in 
					// correct order or not)
					if(num_mgs_received >= SimulatorTest.NUM_MSGS * (NUM_PEERS-1)) {

						// indicate that we have received the required number of messages
						// to differentiate between timeout and notifyAll cases on monitor
						allMsgsReceived = true ;

						// signal that all messages have been received - this will allow the receiver
						// thread to terminate normally
						synchronized(all_msgs_recd) {
							all_msgs_recd.notifyAll() ;
						}
					}   
				}
				catch(Exception ex) {
					// best effort: report the problem and keep the callback alive
					System.out.println("SimulatorTest.receive()" + ex.toString());
				}
			}	
		}

		// number of Event.MSG events counted so far
		public synchronized int getNumberOfReceivedMessages() {
			return num_mgs_received;
		}
	}


	/**
	 * Thread body of one peer. A sender multicasts NUM_MSGS messages through its
	 * simulator; a non-sender just blocks on all_msgs_recd until it is notified
	 * or WAIT_TIMEOUT seconds have passed.
	 */
	static class MyPeer extends Thread {

		final Simulator s ;
		final boolean sender ;

		/**
		 * @param s      simulator this peer sends through
		 * @param sender true if this peer sends messages, false if it only waits
		 */
		public MyPeer(Simulator s, boolean sender) {
			this.s = s ;
			this.sender = sender ;
		}

		@Override
		public void run() {

			// senders send NUM_MSGS messages to all peers, beginning with seqno 1
			if (sender) {

				Address address = s.getLocalAddress() ;

				// send a collection of dummy messages by mcast (null destination)
				// to the stack under test
				for(int i=1; i <= NUM_MSGS; i++) {

					// Long.valueOf instead of the deprecated Long constructor
					Message msg=new Message(null, address, Long.valueOf(i));
					Event evt=new Event(Event.MSG, msg);

					// call Simulator.send() to introduce the event into the stack under test
					s.send(evt);

					// status indicator
					if(i % MSGS_PER_STATUS_LINE == 0)
						System.out.println("<" + address + ">:" + " ==> " + i);
				}	   
			}
			else {
				// wait for the receiver callback to signal that it has received messages, or timeout
				// this just causes this thread to block until its receiver has finished 
				synchronized(all_msgs_recd) {
					try {
						all_msgs_recd.wait(WAIT_TIMEOUT * 1000) ;
					}
					catch(InterruptedException e) {
						System.out.println("thread interrupted") ;
						// keep the interrupt visible to whoever inspects this thread
						Thread.currentThread().interrupt() ;
					}
				}
			}
		}
	}

}