File: WindowAllocationManager.java

package info (click to toggle)
tomcat11 11.0.11-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 47,028 kB
  • sloc: java: 366,244; xml: 55,681; jsp: 4,783; sh: 1,304; perl: 324; makefile: 25; ansic: 14
file content (241 lines) | stat: -rw-r--r-- 9,338 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
/*
 *  Licensed to the Apache Software Foundation (ASF) under one or more
 *  contributor license agreements.  See the NOTICE file distributed with
 *  this work for additional information regarding copyright ownership.
 *  The ASF licenses this file to You under the Apache License, Version 2.0
 *  (the "License"); you may not use this file except in compliance with
 *  the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package org.apache.coyote.http2;

import java.util.concurrent.TimeUnit;

import org.apache.coyote.ActionCode;
import org.apache.coyote.Response;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.res.StringManager;

/**
 * Tracks whether the stream is waiting for an allocation to the stream flow control window, to the connection flow
 * control window or not waiting for an allocation and only issues allocation notifications when the stream is known to
 * be waiting for the notification.
 * <p>
 * It is possible for a stream to be waiting for a connection allocation when a stream allocation is made. Therefore
 * this class tracks the type of allocation that the stream is waiting for to ensure that notifications are correctly
 * triggered.
 * <p>
 * With the implementation at the time of writing, it is not possible for a stream to receive an unexpected connection
 * notification as these are only issues to streams in the backlog and a stream must be waiting for a connection
 * allocation in order to be placed on the backlog. However, as a precaution, this class protects against unexpected
 * connection notifications.
 * <p>
 * It is important for asynchronous processing not to notify unless a notification is expected else a dispatch will be
 * performed unnecessarily which may lead to unexpected results.
 * <p>
 * A previous implementation used separate locks for the stream and connection notifications. However, correct handling
 * of allocation waiting requires holding the stream lock when making the decision to wait. Therefore both allocations
 * need to wait on the Stream.
 */
class WindowAllocationManager {

    private static final Log log = LogFactory.getLog(WindowAllocationManager.class);
    private static final StringManager sm = StringManager.getManager(WindowAllocationManager.class);

    private static final int NONE = 0;
    private static final int STREAM = 1;
    private static final int CONNECTION = 2;

    private final Stream stream;

    private int waitingFor = NONE;

    WindowAllocationManager(Stream stream) {
        this.stream = stream;
    }

    void waitForStream(long timeout) throws InterruptedException {
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("windowAllocationManager.waitFor.stream", stream.getConnectionId(),
                    stream.getIdAsString(), Long.toString(timeout)));
        }

        waitFor(STREAM, timeout);
    }


    void waitForConnection(long timeout) throws InterruptedException {
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("windowAllocationManager.waitFor.connection", stream.getConnectionId(),
                    stream.getIdAsString(), Integer.toString(stream.getConnectionAllocationRequested()),
                    Long.toString(timeout)));
        }

        waitFor(CONNECTION, timeout);
    }


    void waitForStreamNonBlocking() {
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("windowAllocationManager.waitForNonBlocking.stream", stream.getConnectionId(),
                    stream.getIdAsString()));
        }

        waitForNonBlocking(STREAM);
    }


    void waitForConnectionNonBlocking() {
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("windowAllocationManager.waitForNonBlocking.connection", stream.getConnectionId(),
                    stream.getIdAsString()));
        }

        waitForNonBlocking(CONNECTION);
    }


    void notifyStream() {
        notify(STREAM);
    }


    void notifyConnection() {
        notify(CONNECTION);
    }


    void notifyAny() {
        notify(STREAM | CONNECTION);
    }


    boolean isWaitingForStream() {
        return isWaitingFor(STREAM);
    }


    boolean isWaitingForConnection() {
        return isWaitingFor(CONNECTION);
    }


    private boolean isWaitingFor(int waitTarget) {
        stream.windowAllocationLock.lock();
        try {
            return (waitingFor & waitTarget) > 0;
        } finally {
            stream.windowAllocationLock.unlock();
        }
    }


    private void waitFor(int waitTarget, final long timeout) throws InterruptedException {
        stream.windowAllocationLock.lock();
        try {
            if (waitingFor != NONE) {
                throw new IllegalStateException(sm.getString("windowAllocationManager.waitFor.ise",
                        stream.getConnectionId(), stream.getIdAsString()));
            }

            waitingFor = waitTarget;
            long startNanos = -1;

            // Loop to handle spurious wake-ups
            do {
                if (timeout < 0) {
                    stream.windowAllocationAvailable.await();
                } else {
                    long timeoutRemaining;
                    if (startNanos == -1) {
                        startNanos = System.nanoTime();
                        timeoutRemaining = timeout;
                    } else {
                        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                        if (elapsedMillis == 0) {
                            elapsedMillis = 1;
                        }
                        timeoutRemaining = timeout - elapsedMillis;
                        if (timeoutRemaining <= 0) {
                            return;
                        }
                    }
                    stream.windowAllocationAvailable.await(timeoutRemaining, TimeUnit.MILLISECONDS);
                }
            } while (waitingFor != NONE);
        } finally {
            stream.windowAllocationLock.unlock();
        }
    }


    private void waitForNonBlocking(int waitTarget) {
        stream.windowAllocationLock.lock();
        try {
            if (waitingFor == NONE) {
                waitingFor = waitTarget;
            } else if (waitingFor == waitTarget) {
                // NO-OP
                // Non-blocking post-processing may attempt to flush
            } else {
                throw new IllegalStateException(sm.getString("windowAllocationManager.waitFor.ise",
                        stream.getConnectionId(), stream.getIdAsString()));
            }
        } finally {
            stream.windowAllocationLock.unlock();
        }
    }


    private void notify(int notifyTarget) {

        stream.windowAllocationLock.lock();
        try {
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("windowAllocationManager.notify", stream.getConnectionId(),
                        stream.getIdAsString(), Integer.toString(waitingFor), Integer.toString(notifyTarget)));
            }

            if ((notifyTarget & waitingFor) > NONE) {
                // Reset this here so multiple notifies (possible with a
                // backlog containing multiple streams and small window updates)
                // are handled correctly (only the first should trigger a call
                // to stream.notify()). Additional notify() calls may trigger
                // unexpected timeouts.
                waitingFor = NONE;
                Response response = stream.getCoyoteResponse();
                if (response != null) {
                    if (response.getWriteListener() == null) {
                        // Blocking, so use notify to release StreamOutputBuffer
                        if (log.isDebugEnabled()) {
                            log.debug(sm.getString("windowAllocationManager.notified", stream.getConnectionId(),
                                    stream.getIdAsString()));
                        }
                        stream.windowAllocationAvailable.signal();
                    } else {
                        // Non-blocking so dispatch
                        if (log.isDebugEnabled()) {
                            log.debug(sm.getString("windowAllocationManager.dispatched", stream.getConnectionId(),
                                    stream.getIdAsString()));
                        }
                        response.action(ActionCode.DISPATCH_WRITE, null);
                        // Need to explicitly execute dispatches on the StreamProcessor
                        // as this thread is being processed by an UpgradeProcessor
                        // which won't see this dispatch
                        response.action(ActionCode.DISPATCH_EXECUTE, null);
                    }
                }
            }
        } finally {
            stream.windowAllocationLock.unlock();
        }
    }
}