1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217
|
package org.jgroups.blocks;
import org.jgroups.Address;
import org.jgroups.Global;
import org.jgroups.stack.IpAddress;
import org.jgroups.util.DefaultThreadFactory;
import org.jgroups.util.ResourceManager;
import org.jgroups.util.StackType;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* Tests ConnectionMap
* @author Bela Ban
*/
@Test(groups=Global.FUNCTIONAL,sequential=true)
public class ConnectionMapTest {
private TCPConnectionMap ct1, ct2;
static final InetAddress loopback_addr;
static {
try {
StackType type=Util.getIpStackType();
String tmp=type == StackType.IPv6? "::1" : "127.0.0.1";
loopback_addr=InetAddress.getByName(tmp);
}
catch(UnknownHostException e) {
throw new RuntimeException("failed initializing loopback_addr", e);
}
}
static byte[] data=new byte[]{'b', 'e', 'l', 'a'};
protected int PORT1, PORT2;
protected Address addr1, addr2;
@BeforeMethod
protected void init() throws Exception {
List<Short> ports=ResourceManager.getNextTcpPorts(loopback_addr, 2);
PORT1=ports.get(0);
PORT2=ports.get(1);
addr1=new IpAddress(loopback_addr, PORT1);
addr2=new IpAddress(loopback_addr, PORT2);
}
@AfterMethod
protected void tearDown() throws Exception {
if(ct2 != null) {
ct2.stop();
ct2=null;
}
if(ct1 != null) {
ct1.stop();
ct1=null;
}
}
/**
* A connects to B and B connects to A at the same time. This test makes sure we only have <em>one</em> connection,
* not two, e.g. a spurious connection. Tests http://jira.jboss.com/jira/browse/JGRP-549.<p/>
* Turned concurrent test into a simple sequential test. We're going to replace this code with NIO2 soon anyway...
*/
public void testReuseOfConnection() throws Exception {
TCPConnectionMap.Receiver dummy=new TCPConnectionMap.Receiver() {
public void receive(Address sender, byte[] data, int offset, int length) {}
};
ct1=new TCPConnectionMap("ConnectionMapTest1",
new DefaultThreadFactory(Util.getGlobalThreadGroup(), "ConnectionMapTest", true),
null, dummy, loopback_addr, null, PORT1, PORT1);
ct1.start();
ct2=new TCPConnectionMap("ConnectionMapTest2",
new DefaultThreadFactory(Util.getGlobalThreadGroup(), "ConnectionMapTest", true),
null, dummy, loopback_addr, null, PORT2, PORT2);
ct2.start();
int num_conns;
num_conns=ct1.getNumConnections();
assert num_conns == 0;
num_conns=ct2.getNumConnections();
assert num_conns == 0;
ct1.send(addr2, data, 0, data.length);
ct2.send(addr1, data, 0, data.length);
String msg="ct1: " + ct1 + "\nct2: " + ct2;
System.out.println(msg);
num_conns=ct1.getNumConnections();
assert num_conns == 1 : "num_conns for ct1 is " + num_conns + ", " + msg;
num_conns=ct2.getNumConnections();
assert num_conns == 1 : "num_conns for ct2 is " + num_conns + ", " + msg;
assert ct1.connectionEstablishedTo(addr2) : "valid connection to peer";
assert ct2.connectionEstablishedTo(addr1) : "valid connection to peer";
}
public static void testBlockingQueue() {
final BlockingQueue queue=new LinkedBlockingQueue();
Thread taker=new Thread() {
public void run() {
try {
System.out.println("taking an element from the queue");
queue.take();
System.out.println("clear");
}
catch(InterruptedException e) {
}
}
};
taker.start();
Util.sleep(500);
queue.clear(); // does this release the taker thread ?
Util.interruptAndWaitToDie(taker);
assert !(taker.isAlive()) : "taker: " + taker;
}
public void testStopConnectionMapNoSendQueues() throws Exception {
ct1=new TCPConnectionMap("ConnectionMapTest1",
new DefaultThreadFactory(Util.getGlobalThreadGroup(), "ConnectionMapTest", true),
new DummyReceiver(), loopback_addr, null, PORT1, PORT1, 60000, 120000);
ct1.setUseSendQueues(false);
ct1.start();
ct2=new TCPConnectionMap("ConnectionMapTest2",
new DefaultThreadFactory(Util.getGlobalThreadGroup(), "ConnectionMapTest", true),
new DummyReceiver(), loopback_addr, null, PORT2, PORT2, 60000, 120000);
ct2.setUseSendQueues(false);
ct2.start();
_testStop(ct1, ct2);
}
public void testStopConnectionMapWithSendQueues() throws Exception {
ct1=new TCPConnectionMap("ConnectionMapTest1",
new DefaultThreadFactory(Util.getGlobalThreadGroup(), "ConnectionMapTest", true),
new DummyReceiver(), loopback_addr, null, PORT1, PORT1, 60000, 120000);
ct1.start();
ct2=new TCPConnectionMap("ConnectionMapTest2",
new DefaultThreadFactory(Util.getGlobalThreadGroup(), "ConnectionMapTest", true),
new DummyReceiver(), loopback_addr, null, PORT2, PORT2, 60000, 120000);
ct2.start();
_testStop(ct1, ct2);
}
/* public void testStopConnectionMapNIONoSendQueues() throws Exception {
ct1=new ConnectionTableNIO(new DummyReceiver(), loopback_addr, null, PORT1, PORT1, 60000, 120000, false);
ct1.setUseSendQueues(false);
ct2=new ConnectionTableNIO(new DummyReceiver(), loopback_addr, null, PORT2, PORT2, 60000, 120000, false);
ct2.setUseSendQueues(false);
ct1.start();
ct2.start();
_testStop(ct1, ct2);
}
public void testStopConnectionMapNIOWithSendQueues() throws Exception {
ct1=new ConnectionTableNIO(new DummyReceiver(), loopback_addr, null, PORT1, PORT1, 60000, 120000, false);
ct2=new ConnectionTableNIO(new DummyReceiver(), loopback_addr, null, PORT2, PORT2, 60000, 120000, false);
ct1.start();
ct2.start();
_testStop(ct1, ct2);
}*/
private void _testStop(TCPConnectionMap table1, TCPConnectionMap table2) throws Exception {
table1.send(addr1, data, 0, data.length); // send to self
assert table1.getNumConnections() == 0;
table1.send(addr2, data, 0, data.length); // send to other
table2.send(addr2, data, 0, data.length); // send to self
table2.send(addr1, data, 0, data.length); // send to other
System.out.println("table1:\n" + table1 + "\ntable2:\n" + table2);
int num_conns_table1=table1.getNumConnections(), num_conns_table2=table2.getNumConnections();
assert num_conns_table1 == 1 : "table1 should have 1 connection, but has " + num_conns_table1 + ": " + table1;
assert num_conns_table2 == 1 : "table2 should have 1 connection, but has " + num_conns_table2 + ": " + table2;
table2.stop();
table1.stop();
assert table1.getNumConnections() == 0 : "table1 should have 0 connections: " + table1;
assert table2.getNumConnections() == 0 : "table2 should have 0 connections: " + table2;
}
static class DummyReceiver implements TCPConnectionMap.Receiver {
public void receive(Address sender, byte[] data, int offset, int length) {
System.out.println("-- received " + length + " bytes from " + sender);
}
}
}
|