1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
|
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nonblocking;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
import jakarta.servlet.AsyncContext;
import jakarta.servlet.ReadListener;
import jakarta.servlet.ServletException;
import jakarta.servlet.ServletInputStream;
import jakarta.servlet.ServletOutputStream;
import jakarta.servlet.WriteListener;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
/**
* This doesn't do anything particularly useful - it just writes a series of
* numbers to the response body while demonstrating how to perform non-blocking
* writes.
*/
public class NumberWriter extends HttpServlet {
private static final long serialVersionUID = 1L;
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
resp.setContentType("text/plain");
resp.setCharacterEncoding("UTF-8");
// Non-blocking IO requires async
AsyncContext ac = req.startAsync();
// Use a single listener for read and write. Listeners often need to
// share state to coordinate reads and writes and this is much easier as
// a single object.
@SuppressWarnings("unused")
NumberWriterListener listener = new NumberWriterListener(
ac, req.getInputStream(), resp.getOutputStream());
}
/**
* Keep in mind that each call may well be on a different thread to the
* previous call. Ensure that changes in values will be visible across
* threads. There should only ever be one container thread at a time calling
* the listener.
*/
private static class NumberWriterListener implements ReadListener,
WriteListener {
private static final int LIMIT = 10000;
private final AsyncContext ac;
private final ServletInputStream sis;
private final ServletOutputStream sos;
private final AtomicInteger counter = new AtomicInteger(0);
private volatile boolean readFinished = false;
private byte[] buffer = new byte[8192];
private NumberWriterListener(AsyncContext ac, ServletInputStream sis,
ServletOutputStream sos) {
this.ac = ac;
this.sis = sis;
this.sos = sos;
// In Tomcat, the order the listeners are set controls the order
// that the first calls are made. In this case, the read listener
// will be called before the write listener.
sis.setReadListener(this);
sos.setWriteListener(this);
}
@Override
public void onDataAvailable() throws IOException {
// There should be no data to read
int read = 0;
// Loop as long as there is data to read. If isReady() returns false
// the socket will be added to the poller and onDataAvailable() will
// be called again as soon as there is more data to read.
while (sis.isReady() && read > -1) {
read = sis.read(buffer);
if (read > 0) {
throw new IOException("Data was present in input stream");
}
}
}
@Override
public void onAllDataRead() throws IOException {
readFinished = true;
// If sos is not ready to write data, the call to isReady() will
// register the socket with the poller which will trigger a call to
// onWritePossible() when the socket is ready to have data written
// to it.
if (sos.isReady()) {
onWritePossible();
}
}
@Override
public void onWritePossible() throws IOException {
if (readFinished) {
int i = counter.get();
boolean ready = true;
while (i < LIMIT && ready) {
i = counter.incrementAndGet();
String msg = String.format("%1$020d\n", Integer.valueOf(i));
sos.write(msg.getBytes(StandardCharsets.UTF_8));
ready = sos.isReady();
}
if (i == LIMIT) {
ac.complete();
}
}
}
@Override
public void onError(Throwable throwable) {
// Should probably log the throwable
ac.complete();
}
}
}
|