1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
|
/**
*
*/
package nokogiri.internals;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import nokogiri.XmlSaxPushParser;
/**
* A smart input stream that signals the caller when a chunk of data is consumed
* from the stream. The main use of this stream is to synchronize the
* {@link XmlSaxPushParser} and the {@link XmlSaxParser} which runs in a
* different thread.
*
* @author John Shahid <jvshahid@gmail.com>
*/
public class NokogiriBlockingQueueInputStream extends InputStream
{
private final LinkedBlockingQueue<Task> queue;
protected Task currentTask;
protected ByteArrayInputStream currentStream;
protected int position;
protected boolean closed = false;
public static final ByteArrayInputStream END = new ByteArrayInputStream(new byte[0]);
private static class Task extends FutureTask<Void>
{
private final ByteArrayInputStream stream;
public
Task(ByteArrayInputStream stream)
{
super(new Callable<Void>() {
@Override
public Void call() throws Exception {
// TODO Auto-generated method stub
return null;
}
});
this.stream = stream;
}
public ByteArrayInputStream
getStream()
{
return stream;
}
@Override
public void
run()
{
// don't do anything
}
@Override
public boolean
runAndReset()
{
// don't do anything
return true;
}
@Override
public void
set(Void v)
{
super.set(v);
}
}
public
NokogiriBlockingQueueInputStream()
{
queue = new LinkedBlockingQueue<Task>();
}
/**
* This method shouldn't be called unless the parser has finished parsing or
* threw an exception while doing so, otherwise, there'll be the protential
* that the read method will block indefinitely.
*/
@Override
public synchronized void
close()
{
closed = true;
List<Task> tasks = new LinkedList<Task>();
queue.drainTo(tasks);
tasks.add(currentTask);
for (Task task : tasks) {
task.set(null);
}
}
/**
* Add @param stream to the end of the queue of data that will be returned by
* {@link #read()} and its variants. The method will @return a future whose
* {@link Future#get()} will block until the data in @param stream is read.
*
* Passing the special stream {@link #END} to this method, will cause
* {@link #read()} to return an eof indicator (i.e. -1) to the caller, after
* all the data inserted before {@link #END} is processed.
*
* @return
*/
public synchronized Future<Void>
addChunk(ByteArrayInputStream stream) throws ClosedStreamException
{
if (closed) {
throw new ClosedStreamException("Cannot add a chunk to a closed stream");
}
Task task = new Task(stream);
queue.add(task);
return task;
}
/*
* (non-Javadoc)
*
* @see java.io.InputStream#read()
*/
@Override
public int
read() throws IOException
{
if (currentTask == null || currentStream.available() == 0)
if (getNextTask() == -1) {
return -1;
}
return currentStream.read();
}
/*
* (non-Javadoc)
*
* @see java.io.InputStream#read(byte[], int, int)
*/
@Override
public int
read(byte[] bytes, int off, int len)
{
if (currentTask == null || currentStream.available() == 0) {
if (getNextTask() == -1) {
currentTask.set(null);
return -1;
}
}
return currentStream.read(bytes, off, len);
}
protected int
getNextTask()
{
while (true) {
try {
if (currentTask != null) {
currentTask.set(null);
}
currentTask = queue.take();
currentStream = currentTask.getStream();
return currentStream.available() == 0 ? -1 : currentStream.available();
} catch (InterruptedException ex) {
// keep retrying to read
}
}
}
}
|