1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231
|
/* Glazed Lists (c) 2003-2006 */
/* http://publicobject.com/glazedlists/ publicobject.com,*/
/* O'Dell Engineering Ltd.*/
package ca.odell.glazedlists.impl.rbp;
// NIO is used for BRP
import ca.odell.glazedlists.impl.ctp.CTPConnection;
import ca.odell.glazedlists.impl.ctp.CTPHandler;
import ca.odell.glazedlists.impl.io.Bufferlo;
import java.text.ParseException;
import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Models a connection to a local peer.
*
* <p>A connection is multiplexed and serves multiple resources. It contains a map
* of resources being published and resources being subscribed to.
*
* @author <a href="mailto:jesse@swank.ca">Jesse Wilson</a>
*/
class PeerConnection implements CTPHandler {
/** logging */
private static Logger logger = Logger.getLogger(PeerConnection.class.toString());
/** the peer that owns all connections */
private Peer peer;
/** the lower level connection to this peer */
private CTPConnection connection = null;
/** the state of this connection */
private static final int AWAITING_CONNECT = 0;
private static final int READY = 1;
private static final int AWAITING_CLOSE = 2;
private static final int CLOSED = 3;
private int state = AWAITING_CONNECT;
/** the incoming bytes pending a full block */
private Bufferlo currentBlock = new Bufferlo();
/** the outgoing bytes pending a connection */
private Bufferlo pendingConnect = new Bufferlo();
/** locally subscribed resources by resource name */
Map incomingSubscriptions = new TreeMap();
/** locally published resources by resource name */
Map outgoingPublications = new TreeMap();
/**
* Creates a new PeerConnection for the specified peer.
*/
public PeerConnection(Peer peer) {
this.peer = peer;
}
/**
* Handles the connection being ready for chunks to be sent.
*/
public void connectionReady(CTPConnection connection) {
// know where we were before
int priorState = state;
// now that we're connected
this.connection = connection;
this.state = READY;
// handle any pending operations: data
if(pendingConnect.length() > 0) {
connection.sendChunk(pendingConnect);
}
// handle any pending operations: close
if(priorState == AWAITING_CLOSE) {
close();
}
}
/**
* Handles the connection being closed by the remote client. This will also
* be called if there is a connection error, which is the case when a remote
* host sends data that cannot be interpretted by CTPConnection.
*
* @param reason An exception if the connection was closed as the result of
* a failure. This may be null.
*/
public void connectionClosed(CTPConnection source, Exception reason) {
this.connection = null;
this.state = CLOSED;
peer.connections.remove(this);
// notify resources of the close
List resourcesToNotify = new ArrayList();
resourcesToNotify.addAll(incomingSubscriptions.values());
resourcesToNotify.addAll(outgoingPublications.values());
for(Iterator r = resourcesToNotify.iterator(); r.hasNext(); ) {
ResourceConnection resource = (ResourceConnection)r.next();
resource.getResource().connectionClosed(resource, reason);
}
}
/**
* Handles reception of the specified chunk of data. This chunk should be able
* to be cleanly concatenated with the previous and following chunks without
* problem by the reader.
*
* @param data A non-empty ByteBuffer containing the bytes for this chunk. The
* relevant bytes start at data.position() and end at data.limit(). This
* buffer is only valid for the duration of this method call.
*/
public void receiveChunk(CTPConnection source, Bufferlo data) {
// get all the data in the working block
currentBlock.append(data);
// handle all blocks
try {
PeerBlock block = null;
while((block = PeerBlock.fromBytes(currentBlock, source.getLocalHost(), source.getLocalPort())) != null) {
ResourceUri resourceUri = block.getResourceUri();
// get the resource for this connection
ResourceConnection resource = null;
if(block.isSubscribe()) {
resource = new ResourceConnection(this, peer.getPublishedResource(resourceUri));
} else if(block.isUnsubscribe()) {
resource = (ResourceConnection)outgoingPublications.get(resourceUri);
} else if(block.isSubscribeConfirm() || block.isUpdate() || block.isUnpublish()) {
resource = (ResourceConnection)incomingSubscriptions.get(resourceUri);
} else {
throw new UnsupportedOperationException();
}
// handle an unknown resource name
if(resource == null) {
logger.warning("Unknown resource: \"" + resourceUri + "\"");
close();
return;
}
// handle the block
resource.getResource().incomingBlock(resource, block);
}
// if the data is corrupted, close the connection
} catch(ParseException e) {
source.close(e);
// if any other error happened, close the connection
} catch(RuntimeException e) {
logger.log(Level.WARNING, "Unexpected error handling block", e.getMessage());
source.close(e);
}
}
/**
* Test whether this connection is being used by incoming subscriptions or
* outgoing publications.
*/
boolean isIdle() {
return (incomingSubscriptions.isEmpty() && outgoingPublications.isEmpty());
}
/**
* Close this peer connection.
*/
public void close() {
// if we're already done
if(state == CLOSED) {
logger.warning("Closing a closed connection");
return;
}
// close now
state = AWAITING_CLOSE;
if(connection != null) {
connection.close();
peer.connections.remove(this);
}
}
/**
* Writes the specified block to this peer.
*/
public void writeBlock(PeerResource resource, PeerBlock block) {
if(state == AWAITING_CONNECT) {
pendingConnect.append(block.toBytes(null, -1));
} else if(state == READY) {
connection.sendChunk(block.toBytes(connection.getLocalHost(), connection.getLocalPort()));
} else if(state == CLOSED || state == AWAITING_CLOSE) {
logger.warning("Write block to closed connection: " + this);
} else {
throw new IllegalStateException();
}
}
/**
* Gets this connection as a String.
*/
@Override
public String toString() {
if(state == AWAITING_CONNECT) return "pending";
else if(state == READY) return connection.toString();
else if(state == CLOSED) return "closed";
else if(state == AWAITING_CLOSE) return "closing";
else throw new IllegalStateException();
}
/**
* Prints the current state of this connection.
*/
void print() {
System.out.print(this);
System.out.print(": ");
System.out.print("Incoming {");
for(Iterator s = incomingSubscriptions.keySet().iterator(); s.hasNext(); ) {
ResourceUri resourceUri = (ResourceUri)s.next();
System.out.print(resourceUri);
if(s.hasNext()) System.out.print(", ");
}
System.out.print("}, ");
System.out.print("Outgoing {");
for(Iterator s = outgoingPublications.keySet().iterator(); s.hasNext(); ) {
ResourceUri resourceUri = (ResourceUri)s.next();
System.out.print(resourceUri);
if(s.hasNext()) System.out.print(", ");
}
System.out.println("}");
}
}
|