File: NIODaemon.java

package info (click to toggle)
libglazedlists-java 1.9.1-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bullseye, forky, sid, trixie
  • size: 3,012 kB
  • sloc: java: 22,561; xml: 940; makefile: 5
file content (208 lines) | stat: -rw-r--r-- 6,440 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
/* Glazed Lists                                                 (c) 2003-2006 */
/* http://publicobject.com/glazedlists/                      publicobject.com,*/
/*                                                     O'Dell Engineering Ltd.*/
package ca.odell.glazedlists.impl.nio;

// NIO is used for CTP
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * An event queue of I/O events and a thread to run them on.
 */
public final class NIODaemon implements Runnable {
    
    /** logging */
    private static Logger logger = Logger.getLogger(NIODaemon.class.toString());
    
    /** asynch queue of tasks to execute */
    private List pendingRunnables = new ArrayList();
    
    /** the only thread that shall access the network resources of this manager */
    private Thread ioThread = null;
    
    /** the selector to awaken when necessary */
    private Selector selector;
    
    /** whether the connection manager shall shut down */
    private boolean keepRunning = false;
    
    /** whom to handle incoming connections */
    private NIOServer server = null;
    
    /**
     * Starts the NIODaemon.
     */
    public synchronized void start() throws IOException {
        // verify we haven't already started
        if(ioThread != null) throw new IllegalStateException();
        
        // prepare for non-blocking, selectable IO
        selector = Selector.open();

        // start handling connections
        keepRunning = true;
        ioThread = new Thread(this, "GlazedLists nio");
        ioThread.start();
    }
        
    /**
     * Continuously selects a connection which needs servicing and services it.
     */
    public void run() {
        // the list of runnables to run this iteration
        List toExecute = new ArrayList();
            
        // always run the selector handler
        SelectAndHandle selectAndHandle = new SelectAndHandle(this);

        // continuously select a socket and action on it
        while(keepRunning) {

            // get the list of runnables to run
            synchronized(this) {
                toExecute.addAll(pendingRunnables);
                toExecute.add(selectAndHandle);
                pendingRunnables.clear();
            }
            
            // run the runnables
            for(Iterator i = toExecute.iterator(); keepRunning && i.hasNext(); ) {
                Runnable runnable = (Runnable)i.next();
                i.remove();
                try {
                    runnable.run();
                } catch(RuntimeException e) {
                    logger.log(Level.SEVERE, "Failure processing I/O, continuing", e);
                }
            }
        }
        
        // do final clean up of state
        synchronized(this) {
            pendingRunnables.clear();
            selector = null;
            ioThread = null;
            keepRunning = false;
        }
    }
    
    /**
     * Tests whether this connection manager has started.
     */
    public synchronized boolean isRunning() {
        return (ioThread != null);
    }

    /**
     * Tests whether the current thread is the network thread.
     */
    public synchronized boolean isNetworkThread() {
        return Thread.currentThread() == ioThread;
    }
    
    /**
     * Wake up the CTP thread so that it may process pending events.
     */
    private void wakeUp() {
        selector.wakeup();
    }
    
    /**
     * Runs the specified task on the NIODaemon thread.
     */
    public void invokeAndWait(Runnable runnable) {
        // if the server has not yet been started
        if(!isRunning()) throw new IllegalStateException(); 

        // invoke immediately if possible
        if(isNetworkThread()) {
            runnable.run();

        // run on the network thread while waiting on the current thread
        } else {
            BlockingRunnable blockingRunnable = new BlockingRunnable(runnable);
            synchronized(blockingRunnable) {
                // start the event
                synchronized(this) {
                    pendingRunnables.add(blockingRunnable);
                }
                wakeUp();
                
                // wait for it to be completed
                try {
                    blockingRunnable.wait();
                } catch(InterruptedException e) {
                    throw new RuntimeException("Wait interrupted " + e.getMessage());
                }
                
                // propagate any RuntimeExceptions
                RuntimeException problem = blockingRunnable.getInvocationTargetException();
                if(problem != null) throw problem;
            }
        }
    }
    
    /**
     * Runs the specified task the next time the NIODaemon thread has a chance.
     */
    public void invokeLater(Runnable runnable) {
        synchronized(this) {
            // if the server has not yet been started
            if(!isRunning()) throw new IllegalStateException(); 
            
            pendingRunnables.add(runnable);
            wakeUp();
        }
    }
    
    /**
     * Stops the NIODaemon.
     */
    public void stop() {
        // shutdown the server
        invokeAndWait(new Shutdown(this));
        
        // stop the server
        invokeAndWait(new Stop());
    }
    /**
     * Stops the server after it has been shut down.
     */
    private class Stop implements Runnable {
        public void run() {
            // warn if unsatisfied keys remain
            if(selector.keys().size() != 0) {
                logger.warning("Server stopping with " + selector.keys().size() + " active connections");
            } else {
                logger.info("Server stopping with " + selector.keys().size() + " active connections");
            }
    
            // break out of the server dispatch loop
            keepRunning = false;
        }
    }
    
    /**
     * Gets the selector that this NIODaemon manages.
     */
    public Selector getSelector() {
        return selector;
    }
    
    /**
     * Configure this NIODaemon to use the specified server handler for acceptable
     * selection keys.
     */
    public void setServer(NIOServer server) {
        this.server = server;
    }
    public NIOServer getServer() {
        return server;
    }
}