1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
|
/* Glazed Lists (c) 2003-2006 */
/* http://publicobject.com/glazedlists/ publicobject.com,*/
/* O'Dell Engineering Ltd.*/
package ca.odell.glazedlists.impl.nio;
// NIO is used for CTP
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * An event queue of I/O events and a thread to run them on.
 *
 * <p>Tasks are submitted with {@link #invokeLater(Runnable)} or
 * {@link #invokeAndWait(Runnable)} and executed on a single I/O thread that is
 * created by {@link #start()}. Every pass of the dispatch loop runs the tasks
 * queued so far, followed by one <code>SelectAndHandle</code> task.
 *
 * <p>The task queue, the selector reference and the I/O thread reference are
 * guarded by this object's monitor.
 */
public final class NIODaemon implements Runnable {

    /** logging */
    // NOTE(review): Class.toString() produces a logger name with a "class "
    // prefix. It is kept unchanged so existing logging configuration that
    // refers to this name keeps matching.
    private static final Logger logger = Logger.getLogger(NIODaemon.class.toString());

    /** asynch queue of tasks to execute, guarded by <code>this</code> */
    private List pendingRunnables = new ArrayList();

    /** the only thread that shall access the network resources of this manager */
    private Thread ioThread = null;

    /** the selector to awaken when necessary, <code>null</code> while not started */
    private Selector selector;

    /** whether the dispatch loop shall keep going; cleared to shut down */
    private boolean keepRunning = false;

    /** whom to handle incoming connections */
    private NIOServer server = null;

    /**
     * Starts the NIODaemon: opens a selector and launches the I/O thread.
     *
     * @throws IOException if the selector cannot be opened
     * @throws IllegalStateException if this daemon has already been started
     */
    public synchronized void start() throws IOException {
        // verify we haven't already started
        if(ioThread != null) throw new IllegalStateException();

        // prepare for non-blocking, selectable IO
        selector = Selector.open();

        // start handling connections
        keepRunning = true;
        ioThread = new Thread(this, "GlazedLists nio");
        ioThread.start();
    }

    /**
     * Continuously selects a connection which needs servicing and services it.
     *
     * <p>Runs until <code>keepRunning</code> is cleared. A
     * <code>RuntimeException</code> thrown by a task is logged and the loop
     * continues. When the loop ends, normally or because something other than
     * a <code>RuntimeException</code> escaped, the daemon's state is reset and
     * the selector is closed.
     */
    public void run() {
        // the list of runnables to run this iteration
        List toExecute = new ArrayList();

        try {
            // always run the selector handler
            SelectAndHandle selectAndHandle = new SelectAndHandle(this);

            // continuously select a socket and action on it
            while(keepRunning) {
                // take a snapshot of the queue so tasks run without the lock held
                synchronized(this) {
                    toExecute.addAll(pendingRunnables);
                    toExecute.add(selectAndHandle);
                    pendingRunnables.clear();
                }

                // run the runnables, bailing out early once a task requests a stop
                for(Iterator i = toExecute.iterator(); keepRunning && i.hasNext(); ) {
                    Runnable runnable = (Runnable)i.next();
                    i.remove();
                    try {
                        runnable.run();
                    } catch(RuntimeException e) {
                        logger.log(Level.SEVERE, "Failure processing I/O, continuing", e);
                    }
                }
            }
        } finally {
            // do final clean up of state; in a finally block so that isRunning()
            // does not keep reporting true after the thread has died
            Selector selectorToClose;
            synchronized(this) {
                pendingRunnables.clear();
                selectorToClose = selector;
                selector = null;
                ioThread = null;
                keepRunning = false;
            }

            // release the selector's resources outside of the lock
            closeSelector(selectorToClose);
        }
    }

    /**
     * Closes the specified selector, logging rather than propagating a failure.
     *
     * @param selectorToClose the selector to close, may be <code>null</code>
     */
    private static void closeSelector(Selector selectorToClose) {
        if(selectorToClose == null) return;
        try {
            selectorToClose.close();
        } catch(IOException e) {
            logger.log(Level.WARNING, "Failed to close selector", e);
        }
    }

    /**
     * Tests whether this connection manager has started.
     *
     * @return <code>true</code> if an I/O thread currently exists
     */
    public synchronized boolean isRunning() {
        return (ioThread != null);
    }

    /**
     * Tests whether the current thread is the network thread.
     *
     * @return <code>true</code> if the calling thread is the I/O thread
     */
    public synchronized boolean isNetworkThread() {
        return Thread.currentThread() == ioThread;
    }

    /**
     * Wake up the CTP thread so that it may process pending events.
     *
     * <p>Callers must hold this object's monitor and have verified
     * {@link #isRunning()}, which guarantees the selector is non-null.
     */
    private void wakeUp() {
        selector.wakeup();
    }

    /**
     * Runs the specified task on the NIODaemon thread and returns once it has
     * been run. If called on the NIODaemon thread the task runs immediately.
     *
     * @param runnable the task to execute
     * @throws IllegalStateException if this daemon is not running
     * @throws RuntimeException if the calling thread is interrupted while
     *      waiting, or whatever <code>RuntimeException</code> the blocking
     *      wrapper reports for the task
     */
    public void invokeAndWait(Runnable runnable) {
        // if the server has not yet been started
        if(!isRunning()) throw new IllegalStateException();

        // invoke immediately if possible
        if(isNetworkThread()) {
            runnable.run();
            return;
        }

        // run on the network thread while waiting on the current thread
        BlockingRunnable blockingRunnable = new BlockingRunnable(runnable);
        synchronized(blockingRunnable) {
            // start the event; the running check, the enqueue and the wake up
            // share one critical section so a concurrent shutdown cannot
            // slip in between them
            synchronized(this) {
                if(!isRunning()) throw new IllegalStateException();
                pendingRunnables.add(blockingRunnable);
                wakeUp();
            }

            // wait for it to be completed
            // NOTE(review): this wait is not guarded by a completion condition;
            // whether BlockingRunnable exposes one is not visible here
            try {
                blockingRunnable.wait();
            } catch(InterruptedException e) {
                // restore the interrupt status for callers further up the stack
                Thread.currentThread().interrupt();
                throw new RuntimeException("Wait interrupted " + e.getMessage(), e);
            }

            // propagate any RuntimeExceptions
            RuntimeException problem = blockingRunnable.getInvocationTargetException();
            if(problem != null) throw problem;
        }
    }

    /**
     * Runs the specified task the next time the NIODaemon thread has a chance.
     *
     * @param runnable the task to enqueue
     * @throws IllegalStateException if this daemon is not running
     */
    public void invokeLater(Runnable runnable) {
        synchronized(this) {
            // if the server has not yet been started
            if(!isRunning()) throw new IllegalStateException();

            pendingRunnables.add(runnable);
            wakeUp();
        }
    }

    /**
     * Stops the NIODaemon by running a <code>Shutdown</code> task followed by
     * a {@link Stop} task on the I/O thread.
     *
     * @throws IllegalStateException if this daemon is not running
     */
    public void stop() {
        // shutdown the server
        invokeAndWait(new Shutdown(this));

        // stop the server
        invokeAndWait(new Stop());
    }

    /**
     * Stops the server after it has been shut down.
     */
    private class Stop implements Runnable {
        public void run() {
            // warn if unsatisfied keys remain
            int remainingKeys = selector.keys().size();
            if(remainingKeys != 0) {
                logger.warning("Server stopping with " + remainingKeys + " active connections");
            } else {
                logger.info("Server stopping with " + remainingKeys + " active connections");
            }

            // break out of the server dispatch loop
            keepRunning = false;
        }
    }

    /**
     * Gets the selector that this NIODaemon manages.
     *
     * @return the current selector, or <code>null</code> if this daemon is
     *      not running
     */
    public Selector getSelector() {
        return selector;
    }

    /**
     * Configure this NIODaemon to use the specified server handler for acceptable
     * selection keys.
     *
     * @param server the handler to use
     */
    public void setServer(NIOServer server) {
        this.server = server;
    }

    /**
     * Gets the server handler set with {@link #setServer(NIOServer)}.
     *
     * @return the configured handler, or <code>null</code> if none was set
     */
    public NIOServer getServer() {
        return server;
    }
}
|