File: PeerConnection.java

package info (click to toggle)
libglazedlists-java 1.9.0%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 3,024 kB
  • ctags: 4,252
  • sloc: java: 22,561; xml: 818; sh: 51; makefile: 5
file content (231 lines) | stat: -rw-r--r-- 8,408 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
/* Glazed Lists                                                 (c) 2003-2006 */
/* http://publicobject.com/glazedlists/                      publicobject.com,*/
/*                                                     O'Dell Engineering Ltd.*/
package ca.odell.glazedlists.impl.rbp;

// NIO is used for BRP
import ca.odell.glazedlists.impl.ctp.CTPConnection;
import ca.odell.glazedlists.impl.ctp.CTPHandler;
import ca.odell.glazedlists.impl.io.Bufferlo;

import java.text.ParseException;
import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;


/**
 * Models a single connection that belongs to a {@link Peer}.
 *
 * <p>A connection is multiplexed and serves multiple resources. It contains a map
 * of resources being published and resources being subscribed to.
 *
 * <p>The connection moves through the states {@code AWAITING_CONNECT},
 * {@code READY}, {@code AWAITING_CLOSE} and {@code CLOSED}. Blocks written before
 * the underlying {@link CTPConnection} is ready are queued and sent from
 * {@link #connectionReady}.
 *
 * @author <a href="mailto:jesse@swank.ca">Jesse Wilson</a>
 */
class PeerConnection implements CTPHandler {

    /** logging */
    private static final Logger logger = Logger.getLogger(PeerConnection.class.toString());

    /** the peer that owns all connections */
    private Peer peer;

    /** the lower level connection to this peer, null until connected and after close */
    private CTPConnection connection = null;

    /** the state of this connection */
    private static final int AWAITING_CONNECT = 0;
    private static final int READY = 1;
    private static final int AWAITING_CLOSE = 2;
    private static final int CLOSED = 3;
    private int state = AWAITING_CONNECT;

    /** the incoming bytes pending a full block */
    private Bufferlo currentBlock = new Bufferlo();

    /** the outgoing bytes pending a connection */
    private Bufferlo pendingConnect = new Bufferlo();

    /** locally subscribed resources by resource name */
    Map incomingSubscriptions = new TreeMap();

    /** locally published resources by resource name */
    Map outgoingPublications = new TreeMap();

    /**
     * Creates a new PeerConnection for the specified peer.
     *
     * @param peer the peer that owns this connection
     */
    public PeerConnection(Peer peer) {
        this.peer = peer;
    }

    /**
     * Handles the connection being ready for chunks to be sent.
     *
     * <p>Any bytes queued by {@link #writeBlock} while awaiting the connection
     * are sent first. If {@link #close()} was requested before the connection
     * became ready, the connection is closed afterwards.
     *
     * @param connection the lower level connection that is now ready
     */
    public void connectionReady(CTPConnection connection) {
        // know where we were before
        int priorState = state;

        // now that we're connected
        this.connection = connection;
        this.state = READY;

        // handle any pending operations: data
        // NOTE(review): pendingConnect is not cleared here; presumably sendChunk
        // consumes the buffer - confirm against CTPConnection
        if(pendingConnect.length() > 0) {
            connection.sendChunk(pendingConnect);
        }
        // handle any pending operations: close
        if(priorState == AWAITING_CLOSE) {
            close();
        }
    }

    /**
     * Handles the connection being closed by the remote client. This will also
     * be called if there is a connection error, which is the case when a remote
     * host sends data that cannot be interpreted by CTPConnection.
     *
     * <p>The connection is removed from the owning peer and every subscribed and
     * published resource on this connection is notified of the close.
     *
     * @param source the lower level connection that was closed
     * @param reason An exception if the connection was closed as the result of
     *      a failure. This may be null.
     */
    public void connectionClosed(CTPConnection source, Exception reason) {
        this.connection = null;
        this.state = CLOSED;
        peer.connections.remove(this);

        // notify resources of the close, iterating over a snapshot rather than
        // the live views of the two maps
        List resourcesToNotify = new ArrayList();
        resourcesToNotify.addAll(incomingSubscriptions.values());
        resourcesToNotify.addAll(outgoingPublications.values());
        for(Iterator r = resourcesToNotify.iterator(); r.hasNext(); ) {
            ResourceConnection resource = (ResourceConnection)r.next();
            resource.getResource().connectionClosed(resource, reason);
        }
    }

    /**
     * Handles reception of the specified chunk of data. This chunk should be able
     * to be cleanly concatenated with the previous and following chunks without
     * problem by the reader.
     *
     * <p>The chunk is appended to the working block, then blocks are decoded
     * from the working block and dispatched to their resources until
     * {@code PeerBlock.fromBytes} returns null. A block for an unknown resource
     * closes this connection. A {@link ParseException} or any
     * {@link RuntimeException} closes the source connection with that exception
     * as the reason.
     *
     * @param source the lower level connection the chunk arrived on
     * @param data the bytes for this chunk; they are appended to this
     *      connection's working block immediately
     */
    public void receiveChunk(CTPConnection source, Bufferlo data) {
        // get all the data in the working block
        currentBlock.append(data);

        // handle all blocks
        try {
            PeerBlock block = null;
            while((block = PeerBlock.fromBytes(currentBlock, source.getLocalHost(), source.getLocalPort())) != null) {
                ResourceUri resourceUri = block.getResourceUri();

                // get the resource for this connection
                ResourceConnection resource = null;
                if(block.isSubscribe()) {
                    resource = new ResourceConnection(this, peer.getPublishedResource(resourceUri));
                } else if(block.isUnsubscribe()) {
                    resource = (ResourceConnection)outgoingPublications.get(resourceUri);
                } else if(block.isSubscribeConfirm() || block.isUpdate() || block.isUnpublish()) {
                    resource = (ResourceConnection)incomingSubscriptions.get(resourceUri);
                } else {
                    throw new UnsupportedOperationException();
                }

                // handle an unknown resource name
                if(resource == null) {
                    logger.warning("Unknown resource: \"" + resourceUri + "\"");
                    close();
                    return;
                }

                // handle the block
                resource.getResource().incomingBlock(resource, block);
            }
        // if the data is corrupted, close the connection
        } catch(ParseException e) {
            source.close(e);
        // if any other error happened, close the connection
        } catch(RuntimeException e) {
            // pass the exception itself so that the logger records the stack
            // trace; a String argument would be treated as an unused parameter
            logger.log(Level.WARNING, "Unexpected error handling block", e);
            source.close(e);
        }
    }

    /**
     * Test whether this connection is being used by incoming subscriptions or
     * outgoing publications.
     *
     * @return true if there are neither incoming subscriptions nor outgoing
     *      publications on this connection
     */
    boolean isIdle() {
        return (incomingSubscriptions.isEmpty() && outgoingPublications.isEmpty());
    }

    /**
     * Close this peer connection.
     *
     * <p>If the lower level connection is not yet available, only the state is
     * changed and the actual close is performed by {@link #connectionReady}.
     * Closing an already closed connection logs a warning and does nothing.
     */
    public void close() {
        // if we're already done
        if(state == CLOSED) {
            logger.warning("Closing a closed connection");
            return;
        }

        // close now
        state = AWAITING_CLOSE;
        if(connection != null) {
            connection.close();
            peer.connections.remove(this);
        }
    }

    /**
     * Writes the specified block to this peer.
     *
     * <p>While awaiting a connection the block is queued; once ready it is sent
     * immediately. Writing to a closing or closed connection logs a warning and
     * drops the block.
     *
     * @param resource the resource the block belongs to; not used by this method
     * @param block the block to send
     * @throws IllegalStateException if this connection is in an unknown state
     */
    public void writeBlock(PeerResource resource, PeerBlock block) {
        if(state == AWAITING_CONNECT) {
            // no connection yet, so no local host and port are supplied
            pendingConnect.append(block.toBytes(null, -1));
        } else if(state == READY) {
            connection.sendChunk(block.toBytes(connection.getLocalHost(), connection.getLocalPort()));
        } else if(state == CLOSED || state == AWAITING_CLOSE) {
            logger.warning("Write block to closed connection: " + this);
        } else {
            throw new IllegalStateException();
        }
    }

    /**
     * Gets this connection as a String.
     *
     * @return the lower level connection's String when ready, otherwise a word
     *      naming the current state
     * @throws IllegalStateException if this connection is in an unknown state
     */
    @Override
    public String toString() {
        if(state == AWAITING_CONNECT) {
            return "pending";
        } else if(state == READY) {
            return connection.toString();
        } else if(state == CLOSED) {
            return "closed";
        } else if(state == AWAITING_CLOSE) {
            return "closing";
        } else {
            throw new IllegalStateException();
        }
    }

    /**
     * Prints the current state of this connection to standard out: this
     * connection's String followed by the resource names of its incoming
     * subscriptions and outgoing publications.
     */
    void print() {
        System.out.print(this);
        System.out.print(": ");
        System.out.print("Incoming {");
        for(Iterator s = incomingSubscriptions.keySet().iterator(); s.hasNext(); ) {
            ResourceUri resourceUri = (ResourceUri)s.next();
            System.out.print(resourceUri);
            if(s.hasNext()) {
                System.out.print(", ");
            }
        }
        System.out.print("}, ");
        System.out.print("Outgoing {");
        for(Iterator s = outgoingPublications.keySet().iterator(); s.hasNext(); ) {
            ResourceUri resourceUri = (ResourceUri)s.next();
            System.out.print(resourceUri);
            if(s.hasNext()) {
                System.out.print(", ");
            }
        }
        System.out.println("}");
    }
}