1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
|
package org.jgroups.protocols;
import static java.util.concurrent.TimeUnit.SECONDS;
import org.jgroups.*;
import org.jgroups.tests.ChannelTestBase;
import org.jgroups.util.Util;
import org.testng.annotations.Test;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.AfterMethod;
/**
* It's an attemp to setup Junit test case template for Protocol regression. <p>
* Two "processes" are started, and the coord. keeps sending msg of a counter. The 2nd
* process joins the grp and get the state from the coordinator. The subsequent msgs
* after the setState will be validated to ensure the total ordering of msg delivery. <p>
* This should cover the fix introduced by rev. 1.12
* @author Wenbo Zhu
*/
@Test(groups={Global.STACK_DEPENDENT,"known-failures"})
public class STATE_TRANSFER_Test extends ChannelTestBase {
public static final String GROUP_NAME="STATE_TRANSFER_Test";
private Coordinator coord;
@BeforeMethod
protected void setUp() throws Exception {
coord=new Coordinator();
coord.recvLoop();
coord.sendLoop();
}
@AfterMethod
protected void tearDown() throws Exception {
coord.stop();
coord=null;
}
class Coordinator extends ChannelListenerAdapter {
private JChannel channel=null;
private int cnt=0; // the state
private volatile boolean closed=false;
String getProps() {
return channel.getProperties();
}
public JChannel getChannel() {
return channel;
}
protected Coordinator() throws Exception {
channel=createChannel(true);
channel.setOpt(Channel.LOCAL, Boolean.FALSE);
channel.addChannelListener(this);
channel.connect(GROUP_NAME);
}
public void recvLoop() throws Exception {
Thread task=new Thread(new Runnable() {
public void run() {
Object tmp;
while(!closed) {
try {
tmp=channel.receive(0);
if(tmp instanceof GetStateEvent) {
synchronized(Coordinator.this) {
// System.err.println("-- GetStateEvent, cnt=" + cnt);
channel.returnState(Util.objectToByteBuffer(new Integer(cnt)));
}
}
}
catch(ChannelNotConnectedException not) {
break;
}
catch(ChannelClosedException closed) {
break;
}
catch(Exception e) {
System.err.println(e);
}
}
}
});
task.start();
}
public void sendLoop() throws Exception {
Thread task=new Thread(new Runnable() {
public void run() {
while(!closed) {
try {
synchronized(Coordinator.this) {
channel.send(null, null, new Integer(++cnt));
// System.err.println("send cnt=" + cnt);
}
Thread.sleep(1000);
}
catch(ChannelNotConnectedException not) {
break;
}
catch(ChannelClosedException closed) {
break;
}
catch(Exception e) {
System.err.println(e);
}
}
}
});
task.start();
}
public void stop() {
closed=true;
channel.close();
}
}
public void testBasicStateSync() throws Exception {
Channel channel= null;
int timeout=60; //seconds
int counter=0;
try {
channel = createChannel(coord.getChannel());
channel.setOpt(Channel.LOCAL, Boolean.FALSE);
channel.connect(GROUP_NAME);
Thread.sleep(1000);
boolean join=false;
join=channel.getState(null, 100000l);
assertTrue(join);
Object tmp;
int cnt=-1;
for(;counter < timeout;SECONDS.sleep(1),counter++) {
try {
tmp=channel.receive(0);
if(tmp instanceof SetStateEvent) {
cnt=((Integer)Util.objectFromByteBuffer(((SetStateEvent)tmp).getArg())).intValue();
// System.err.println("-- SetStateEvent, cnt=" + cnt);
continue;
}
if(tmp instanceof Message) {
if(cnt != -1) {
int msg=((Integer)((Message)tmp).getObject()).intValue();
assertEquals(cnt, msg - 1);
break; // done
}
}
}
catch(ChannelNotConnectedException not) {
break;
}
catch(ChannelClosedException closed) {
break;
}
catch(Exception e) {
System.err.println(e);
}
}
}
finally {
channel.close();
assertTrue("Timeout reached", counter < timeout);
}
}
}
|