File: WsRemoteEndpointImplServer.java

package info (click to toggle)
tomcat9 9.0.95-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 45,952 kB
  • sloc: java: 371,527; xml: 67,915; jsp: 4,588; sh: 1,225; perl: 314; makefile: 18; ansic: 15
file content (404 lines) | stat: -rw-r--r-- 16,865 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tomcat.websocket.server;

import java.io.EOFException;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import javax.servlet.http.WebConnection;
import javax.websocket.SendHandler;
import javax.websocket.SendResult;

import org.apache.coyote.http11.upgrade.UpgradeInfo;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.net.SocketWrapperBase;
import org.apache.tomcat.util.net.SocketWrapperBase.BlockingMode;
import org.apache.tomcat.util.res.StringManager;
import org.apache.tomcat.websocket.Constants;
import org.apache.tomcat.websocket.Transformation;
import org.apache.tomcat.websocket.WsRemoteEndpointImplBase;

/**
 * This is the server side {@link javax.websocket.RemoteEndpoint} implementation - i.e. what the server uses to send
 * data to the client.
 */
public class WsRemoteEndpointImplServer extends WsRemoteEndpointImplBase {

    private static final StringManager sm = StringManager.getManager(WsRemoteEndpointImplServer.class);
    private final Log log = LogFactory.getLog(WsRemoteEndpointImplServer.class); // must not be static

    private final SocketWrapperBase<?> socketWrapper;
    private final UpgradeInfo upgradeInfo;
    private final WebConnection connection;
    private final WsWriteTimeout wsWriteTimeout;
    private volatile SendHandler handler = null;
    private volatile ByteBuffer[] buffers = null;

    private volatile long timeoutExpiry = -1;

    public WsRemoteEndpointImplServer(SocketWrapperBase<?> socketWrapper, UpgradeInfo upgradeInfo,
            WsServerContainer serverContainer, WebConnection connection) {
        this.socketWrapper = socketWrapper;
        this.upgradeInfo = upgradeInfo;
        this.connection = connection;
        this.wsWriteTimeout = serverContainer.getTimeout();
    }


    @Override
    protected final boolean isMasked() {
        return false;
    }


    /**
     * {@inheritDoc}
     * <p>
     * The close message is a special case. It needs to be blocking else implementing the clean-up that follows the
     * sending of the close message gets a lot more complicated. On the server, this creates additional complications as
     * a dead-lock may occur in the following scenario:
     * <ol>
     * <li>Application thread writes message using non-blocking</li>
     * <li>Write does not complete (write logic holds message pending lock)</li>
     * <li>Socket is added to poller (or equivalent) for write
     * <li>Client sends close message</li>
     * <li>Container processes received close message and tries to send close message in response</li>
     * <li>Container holds socket lock and is blocked waiting for message pending lock</li>
     * <li>Poller fires write possible event for socket</li>
     * <li>Container tries to process write possible event but is blocked waiting for socket lock</li>
     * <li>Processing of the WebSocket connection is dead-locked until the original message write times out</li>
     * </ol>
     * The purpose of this method is to break the above dead-lock. It does this by returning control of the processor to
     * the socket wrapper and releasing the socket lock while waiting for the pending message write to complete.
     * Normally, that would be a terrible idea as it creates the possibility that the processor is returned to the pool
     * more than once under various error conditions. In this instance it is safe because these are upgrade processors
     * (isUpgrade() returns {@code true}) and upgrade processors are never pooled.
     * <p>
     * TODO: Despite the complications it creates, it would be worth exploring the possibility of processing a received
     * close frame in a non-blocking manner.
     */
    @Override
    protected boolean acquireMessagePartInProgressSemaphore(byte opCode, long timeoutExpiry)
            throws InterruptedException {

        /*
         * Special handling is required only when all of the following are true:
         * - A close message is being sent
         * - This thread currently holds the socketWrapper lock (i.e. the thread is current processing a socket event)
         *
         * Special handling is only possible if the socketWrapper lock is a ReentrantLock (it will be by default)
         */
        if (socketWrapper.getLock() instanceof ReentrantLock) {
            ReentrantLock reentrantLock = (ReentrantLock) socketWrapper.getLock();
            if (opCode == Constants.OPCODE_CLOSE && reentrantLock.isHeldByCurrentThread()) {
                int socketWrapperLockCount = reentrantLock.getHoldCount();

                while (!messagePartInProgress.tryAcquire()) {
                    if (timeoutExpiry < System.currentTimeMillis()) {
                        return false;
                    }
                    try {
                        // Release control of the processor
                        socketWrapper.setCurrentProcessor(connection);
                        // Release the per socket lock(s)
                        for (int i = 0; i < socketWrapperLockCount; i++) {
                            socketWrapper.getLock().unlock();
                        }
                        // Provide opportunity for another thread to obtain the socketWrapper lock
                        Thread.yield();
                    } finally {
                        // Re-obtain the per socket lock(s)
                        for (int i = 0; i < socketWrapperLockCount; i++) {
                            socketWrapper.getLock().lock();
                        }
                        // Re-take control of the processor
                        socketWrapper.takeCurrentProcessor();
                    }
                }

                return true;
            }
        }

        // Skip special handling
        return super.acquireMessagePartInProgressSemaphore(opCode, timeoutExpiry);
    }


    @Override
    protected void doWrite(SendHandler handler, long blockingWriteTimeoutExpiry, ByteBuffer... buffers) {
        if (socketWrapper.hasAsyncIO()) {
            final boolean block = (blockingWriteTimeoutExpiry != -1);
            long timeout = -1;
            if (block) {
                timeout = blockingWriteTimeoutExpiry - System.currentTimeMillis();
                if (timeout <= 0) {
                    SendResult sr = new SendResult(new SocketTimeoutException());
                    handler.onResult(sr);
                    return;
                }
            } else {
                this.handler = handler;
                timeout = getSendTimeout();
                if (timeout > 0) {
                    // Register with timeout thread
                    timeoutExpiry = timeout + System.currentTimeMillis();
                    wsWriteTimeout.register(this);
                }
            }
            socketWrapper.write(block ? BlockingMode.BLOCK : BlockingMode.SEMI_BLOCK, timeout, TimeUnit.MILLISECONDS,
                    null, SocketWrapperBase.COMPLETE_WRITE_WITH_COMPLETION, new CompletionHandler<Long, Void>() {
                        @Override
                        public void completed(Long result, Void attachment) {
                            if (block) {
                                long timeout = blockingWriteTimeoutExpiry - System.currentTimeMillis();
                                if (timeout <= 0) {
                                    failed(new SocketTimeoutException(), null);
                                } else {
                                    handler.onResult(SENDRESULT_OK);
                                }
                            } else {
                                wsWriteTimeout.unregister(WsRemoteEndpointImplServer.this);
                                clearHandler(null, true);
                            }
                        }

                        @Override
                        public void failed(Throwable exc, Void attachment) {
                            if (block) {
                                SendResult sr = new SendResult(exc);
                                handler.onResult(sr);
                            } else {
                                wsWriteTimeout.unregister(WsRemoteEndpointImplServer.this);
                                clearHandler(exc, true);
                                close();
                            }
                        }
                    }, buffers);
        } else {
            if (blockingWriteTimeoutExpiry == -1) {
                this.handler = handler;
                this.buffers = buffers;
                // This is definitely the same thread that triggered the write so a
                // dispatch will be required.
                onWritePossible(true);
            } else {
                // Blocking
                try {
                    for (ByteBuffer buffer : buffers) {
                        long timeout = blockingWriteTimeoutExpiry - System.currentTimeMillis();
                        if (timeout <= 0) {
                            SendResult sr = new SendResult(new SocketTimeoutException());
                            handler.onResult(sr);
                            return;
                        }
                        socketWrapper.setWriteTimeout(timeout);
                        socketWrapper.write(true, buffer);
                    }
                    long timeout = blockingWriteTimeoutExpiry - System.currentTimeMillis();
                    if (timeout <= 0) {
                        SendResult sr = new SendResult(new SocketTimeoutException());
                        handler.onResult(sr);
                        return;
                    }
                    socketWrapper.setWriteTimeout(timeout);
                    socketWrapper.flush(true);
                    handler.onResult(SENDRESULT_OK);
                } catch (IOException e) {
                    SendResult sr = new SendResult(e);
                    handler.onResult(sr);
                }
            }
        }
    }


    @Override
    protected void updateStats(long payloadLength) {
        upgradeInfo.addMsgsSent(1);
        upgradeInfo.addBytesSent(payloadLength);
    }


    public void onWritePossible(boolean useDispatch) {
        // Note: Unused for async IO
        ByteBuffer[] buffers = this.buffers;
        if (buffers == null) {
            // Servlet 3.1 will call the write listener once even if nothing
            // was written
            return;
        }
        boolean complete = false;
        try {
            socketWrapper.flush(false);
            // If this is false there will be a call back when it is true
            while (socketWrapper.isReadyForWrite()) {
                complete = true;
                for (ByteBuffer buffer : buffers) {
                    if (buffer.hasRemaining()) {
                        complete = false;
                        socketWrapper.write(false, buffer);
                        break;
                    }
                }
                if (complete) {
                    socketWrapper.flush(false);
                    complete = socketWrapper.isReadyForWrite();
                    if (complete) {
                        wsWriteTimeout.unregister(this);
                        clearHandler(null, useDispatch);
                    }
                    break;
                }
            }
        } catch (IOException | IllegalStateException e) {
            wsWriteTimeout.unregister(this);
            clearHandler(e, useDispatch);
            close();
        }

        if (!complete) {
            // Async write is in progress
            long timeout = getSendTimeout();
            if (timeout > 0) {
                // Register with timeout thread
                timeoutExpiry = timeout + System.currentTimeMillis();
                wsWriteTimeout.register(this);
            }
        }
    }


    @Override
    protected void doClose() {
        if (handler != null) {
            // close() can be triggered by a wide range of scenarios. It is far
            // simpler just to always use a dispatch than it is to try and track
            // whether or not this method was called by the same thread that
            // triggered the write
            clearHandler(new EOFException(), true);
        }
        try {
            socketWrapper.close();
        } catch (Exception e) {
            if (log.isInfoEnabled()) {
                log.info(sm.getString("wsRemoteEndpointServer.closeFailed"), e);
            }
        }
        wsWriteTimeout.unregister(this);
    }


    protected long getTimeoutExpiry() {
        return timeoutExpiry;
    }


    /*
     * Currently this is only called from the background thread so we could just call clearHandler() with useDispatch ==
     * false but the method parameter was added in case other callers started to use this method to make sure that those
     * callers think through what the correct value of useDispatch is for them.
     */
    protected void onTimeout(boolean useDispatch) {
        if (handler != null) {
            clearHandler(new SocketTimeoutException(), useDispatch);
        }
        close();
    }


    @Override
    protected void setTransformation(Transformation transformation) {
        // Overridden purely so it is visible to other classes in this package
        super.setTransformation(transformation);
    }


    /**
     * @param t           The throwable associated with any error that occurred
     * @param useDispatch Should {@link SendHandler#onResult(SendResult)} be called from a new thread, keeping in mind
     *                        the requirements of {@link javax.websocket.RemoteEndpoint.Async}
     */
    void clearHandler(Throwable t, boolean useDispatch) {
        // Setting the result marks this (partial) message as
        // complete which means the next one may be sent which
        // could update the value of the handler. Therefore, keep a
        // local copy before signalling the end of the (partial)
        // message.
        SendHandler sh = handler;
        handler = null;
        buffers = null;
        if (sh != null) {
            if (useDispatch) {
                OnResultRunnable r = new OnResultRunnable(sh, t);
                try {
                    socketWrapper.execute(r);
                } catch (RejectedExecutionException ree) {
                    // Can't use the executor so call the runnable directly.
                    // This may not be strictly specification compliant in all
                    // cases but during shutdown only close messages are going
                    // to be sent so there should not be the issue of nested
                    // calls leading to stack overflow as described in bug
                    // 55715. The issues with nested calls was the reason for
                    // the separate thread requirement in the specification.
                    r.run();
                }
            } else {
                if (t == null) {
                    sh.onResult(new SendResult());
                } else {
                    sh.onResult(new SendResult(t));
                }
            }
        }
    }


    @Override
    protected Lock getLock() {
        return socketWrapper.getLock();
    }


    private static class OnResultRunnable implements Runnable {

        private final SendHandler sh;
        private final Throwable t;

        private OnResultRunnable(SendHandler sh, Throwable t) {
            this.sh = sh;
            this.t = t;
        }

        @Override
        public void run() {
            if (t == null) {
                sh.onResult(new SendResult());
            } else {
                sh.onResult(new SendResult(t));
            }
        }
    }
}