1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
|
/**
*
*/
package nokogiri.internals;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import nokogiri.XmlSaxPushParser;
/**
 * An input stream fed by a queue of in-memory chunks that signals the producer
 * when a chunk has been consumed. The main use of this stream is to
 * synchronize the {@link XmlSaxPushParser} and the {@code XmlSaxParser} which
 * runs in a different thread.
 *
 * <p>{@link #addChunk(ByteArrayInputStream)} and {@link #close()} are
 * synchronized on this instance; the {@code read} methods are not.
 *
 * @author John Shahid <jvshahid@gmail.com>
 */
public class NokogiriBlockingQueueInputStream extends InputStream {
  /** Chunks that were added but not yet picked up by a read. */
  private final LinkedBlockingQueue<Task> queue;
  /** The chunk currently being read, or {@code null} before the first read. */
  protected Task currentTask;
  /** The data of {@link #currentTask}. */
  protected ByteArrayInputStream currentStream;
  protected int position;
  /** Set by {@link #close()}; once set, {@link #addChunk} rejects new data. */
  protected boolean closed = false;

  /** Marker chunk: an empty stream that makes the read methods report EOF. */
  public static final ByteArrayInputStream END = new ByteArrayInputStream(new byte[0]);

  /**
   * A future that is tied to one chunk of data. It is never actually run; it is
   * completed explicitly through {@link #set(Void)} once its chunk has been
   * consumed (or the stream has been closed).
   */
  private static class Task extends FutureTask<Void> {
    private final ByteArrayInputStream stream;

    public Task(ByteArrayInputStream stream) {
      // FutureTask requires a callable; it is never invoked because run() and
      // runAndReset() are overridden to do nothing.
      super(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          return null;
        }
      });
      this.stream = stream;
    }

    public ByteArrayInputStream getStream() {
      return stream;
    }

    @Override
    public void run() {
      // don't do anything
    }

    @Override
    public boolean runAndReset() {
      // don't do anything
      return true;
    }

    /** Widens the visibility of {@link FutureTask#set(Object)} to the outer class. */
    @Override
    public void set(Void v) {
      super.set(v);
    }
  }

  public NokogiriBlockingQueueInputStream() {
    queue = new LinkedBlockingQueue<Task>();
  }

  /**
   * Marks the stream as closed and completes the futures of every chunk that is
   * still queued, as well as the chunk currently being read (if any).
   *
   * <p>This method shouldn't be called unless the parser has finished parsing or
   * threw an exception while doing so, otherwise, there'll be the potential
   * that the read method will block indefinitely.
   */
  @Override
  public synchronized void close() {
    closed = true;
    List<Task> tasks = new LinkedList<Task>();
    queue.drainTo(tasks);
    // currentTask is null when the stream is closed before anything was read.
    if (currentTask != null) {
      tasks.add(currentTask);
    }
    for (Task task : tasks) {
      task.set(null);
    }
  }

  /**
   * Adds {@code stream} to the end of the queue of data that will be returned
   * by {@link #read()} and its variants.
   *
   * <p>Passing the special stream {@link #END} to this method will cause the
   * read methods to return an EOF indicator (i.e. -1) to the caller, after all
   * the data inserted before {@link #END} is processed. Note that any chunk
   * with no available bytes is reported as EOF in the same way.
   *
   * @param stream the chunk of data to enqueue
   * @return a future whose {@link Future#get()} blocks until the chunk has been
   *         exhausted and the reader asked for more data, EOF was reported for
   *         it, or the stream was closed
   * @throws ClosedStreamException if {@link #close()} has already been called
   */
  public synchronized Future<Void> addChunk(ByteArrayInputStream stream) throws ClosedStreamException {
    if (closed) {
      throw new ClosedStreamException("Cannot add a chunk to a closed stream");
    }
    Task task = new Task(stream);
    queue.add(task);
    return task;
  }

  /**
   * Reads a single byte, blocking until a chunk is available if the current
   * one is exhausted.
   *
   * @return the next byte, or -1 when an empty chunk (e.g. {@link #END}) is reached
   * @see java.io.InputStream#read()
   */
  @Override
  public int read() throws IOException {
    if (reachedEndOfStream()) {
      return -1;
    }
    return currentStream.read();
  }

  /**
   * Reads up to {@code len} bytes from the current chunk, blocking until a
   * chunk is available if the current one is exhausted. A single call never
   * spans two chunks.
   *
   * @return the number of bytes read, or -1 when an empty chunk (e.g.
   *         {@link #END}) is reached
   * @see java.io.InputStream#read(byte[], int, int)
   */
  @Override
  public int read(byte[] bytes, int off, int len) {
    if (reachedEndOfStream()) {
      return -1;
    }
    return currentStream.read(bytes, off, len);
  }

  /**
   * Moves on to the next chunk when the current one is exhausted and reports
   * whether that chunk signals EOF. On EOF the chunk's future is completed so
   * that a producer waiting on it is released regardless of which read variant
   * the consumer uses.
   */
  private boolean reachedEndOfStream() {
    if (currentTask != null && currentStream.available() != 0) {
      return false;
    }
    if (getNextTask() != -1) {
      return false;
    }
    if (currentTask != null) {
      currentTask.set(null);
    }
    return true;
  }

  /**
   * Completes the future of the current chunk (if any) and blocks until the
   * next chunk can be taken from the queue.
   *
   * @return the number of bytes available in the new chunk, or -1 if it is empty
   */
  protected int getNextTask() {
    while (true) {
      try {
        if (currentTask != null) {
          currentTask.set(null);
        }
        currentTask = queue.take();
        currentStream = currentTask.getStream();
        int available = currentStream.available();
        return available == 0 ? -1 : available;
      } catch (InterruptedException ignored) {
        // Deliberately ignored: keep retrying to read until a chunk arrives.
      }
    }
  }
}
|