1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229
|
package compbio.util;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import org.apache.log4j.Logger;
/**
 * A utility to access the content of a file from a specific position while
 * the file is being written to by external processes.
 * <p>
 * The watcher keeps a byte cursor into the file. Every read returns at most
 * one chunk of UTF-8 decoded text and moves the cursor past the bytes that
 * were actually decoded. The configured chunk size is never modified by a
 * read, so a watcher keeps reading full chunks as the file grows.
 * <p>
 * This class performs no synchronisation; callers sharing an instance between
 * threads must provide their own. {@link #disconnect()} must be called
 * explicitly to release the underlying file.
 *
 * @author pvtroshin
 * @version 1.0 December 2010
 */
public class FileWatcher {

    public static final int MIN_CHUNK_SIZE_BYTES = 256; // 256 bytes
    public static final int MAX_CHUNK_SIZE_BYTES = 1024 * 1024 * 10; // 10Mb

    /** Number of bytes examined by {@link #getProgress()}. */
    private static final int PROGRESS_CHUNK_SIZE_BYTES = 3;

    private static final String DISCONNECTED_MESSAGE = "This FileWatcher has disconnected from the file. Please construct a new one.";

    private static final Charset charset = Charset.forName("UTF-8");
    private static final Logger log = Logger.getLogger(FileWatcher.class);

    // CharsetDecoder is stateful and not thread-safe, so every watcher owns
    // its decoder instead of sharing a static one.
    private final CharsetDecoder decoder = charset.newDecoder();

    /** Upper bound for the number of bytes fetched by a single read. */
    private final int chunkSizeBytes;

    /** Offset (in bytes) of the first byte that has not been returned yet. */
    private long cursor = 0;

    /** Channel of the watched file; {@code null} once disconnected. */
    private FileChannel fc;

    private FileWatcher(FileInputStream input, int chunkSizeBytes) {
        assert input != null : "Input must be provided!";
        assert chunkSizeBytes >= 1;
        this.chunkSizeBytes = chunkSizeBytes;
        fc = input.getChannel();
    }

    /**
     * Creates a watcher that reads the file in chunks of the given size.
     *
     * @param input
     *            path to an existing file
     * @param chunkSizeBytes
     *            maximum number of bytes read per pull, must be within
     *            {@link #MIN_CHUNK_SIZE_BYTES} and
     *            {@link #MAX_CHUNK_SIZE_BYTES}
     * @return a watcher positioned at the beginning of the file
     * @throws IllegalArgumentException
     *             if the chunk size is out of range or the file does not
     *             exist
     * @throws NullPointerException
     *             if input is null
     */
    public static FileWatcher newFileWatcher(String input, int chunkSizeBytes) {
        validateInput(input, chunkSizeBytes);
        if (input == null) {
            throw new NullPointerException("Input must be provided!");
        }
        return new FileWatcher(openStream(input), chunkSizeBytes);
    }

    /**
     * Creates a watcher intended for {@link #getProgress()}: it reads at most
     * 3 bytes at a time.
     *
     * @param input
     *            path to an existing file
     * @return a watcher positioned at the beginning of the file
     * @throws NullPointerException
     *             if input is reported empty by {@code Util.isEmpty}
     * @throws IllegalArgumentException
     *             if the file does not exist
     */
    public static FileWatcher newProgressWatcher(String input) {
        if (Util.isEmpty(input)) {
            throw new NullPointerException("Input must be provided!");
        }
        return new FileWatcher(openStream(input), PROGRESS_CHUNK_SIZE_BYTES);
    }

    /**
     * Opens the file for reading, translating a missing file into an
     * IllegalArgumentException that keeps the original cause.
     */
    private static FileInputStream openStream(String input) {
        File file = new File(input);
        try {
            return new FileInputStream(file);
        } catch (FileNotFoundException e) {
            log.error(e.getMessage(), e);
            throw new IllegalArgumentException("File " + file.getAbsolutePath()
                    + " must exist!", e);
        }
    }

    /**
     * @return true if {@link #disconnect()} has been called on this watcher
     */
    public boolean disconnected() {
        return fc == null;
    }

    /**
     * Returns the next chunk of the file starting at the current cursor
     * position and advances the cursor.
     * <p>
     * It is legal to pull even if there is no new data in the file yet, given
     * the knowledge that it will be there later; an empty string is returned
     * in that case.
     *
     * @return the decoded chunk, or an empty string if there is no new data
     * @throws IOException
     *             if the file cannot be read or contains malformed UTF-8
     * @throws IllegalStateException
     *             if the watcher is disconnected, or the file has become
     *             shorter than the number of bytes already read
     */
    public String pull() throws IOException {
        ensureConnected();
        long sz = fc.size();
        // Truncation must be tested before the "no new data" case, otherwise
        // a shrunken file is silently reported as "nothing to read".
        if (sz < cursor) {
            throw new IllegalStateException(
                    "File has been reduced during pooling! File size is " + sz
                            + ". While " + cursor
                            + " bytes has been already read from this file!");
        }
        if (sz == cursor) {
            return "";
        }
        return readChunk(cursor, chunkSizeBytes);
    }

    /**
     * Returns one chunk of the file starting at the given position and moves
     * the cursor to the end of the returned data.
     *
     * @param position
     *            byte offset to read from, within 0 and the file size
     * @return the decoded chunk; empty if position is at the end of the file
     * @throws IOException
     *             if the file cannot be read or contains malformed UTF-8
     * @throws IndexOutOfBoundsException
     *             if position is negative or beyond the end of the file
     * @throws IllegalStateException
     *             if the watcher is disconnected
     */
    public String pull(long position) throws IOException {
        ensureConnected();
        long size = fc.size();
        if (position == size) {
            log.debug("Cursor is at the end of the file! "
                    + "Nothing is going to be returned as a result of pull oparation!"
                    + "Consider checking whether the file has more data (FileWatcher.hasMore()) "
                    + "before pulling to avoid waist of resources!");
        }
        if (position < 0 || position > size) {
            throw new IndexOutOfBoundsException(
                    "Position must be within 0 and file size (" + size
                            + ") but given value is " + position);
        }
        return readChunk(position, chunkSizeBytes);
    }

    /**
     * Reads up to maxBytes bytes starting at fromPosition, decodes them as
     * UTF-8 and sets the cursor just past the bytes that were decoded.
     * <p>
     * A multi-byte character cut in half by the end of the chunk (or by the
     * current end of the file) is not decoded and not skipped: the cursor
     * stops in front of it so that the next read picks it up in full.
     */
    private String readChunk(long fromPosition, int maxBytes)
            throws IOException {
        cursor = fromPosition;
        long available = fc.size() - fromPosition;
        // Size the buffer for this read only; the configured chunk size stays
        // untouched so later reads are not limited by a short remainder.
        int toRead = (int) Math.max(0L, Math.min((long) maxBytes, available));
        if (toRead == 0) {
            return "";
        }
        ByteBuffer bytes = ByteBuffer.allocate(toRead);
        long offset = fromPosition;
        while (bytes.hasRemaining()) {
            int count = fc.read(bytes, offset);
            if (count <= 0) {
                // End of file reached earlier than expected (file shrank)
                break;
            }
            offset += count;
        }
        // flip (not rewind) so that only the bytes actually read are decoded
        bytes.flip();

        int maxChars = (int) Math.ceil(bytes.remaining()
                * (double) decoder.maxCharsPerByte());
        CharBuffer chars = CharBuffer.allocate(maxChars);
        decoder.reset();
        // endOfInput=false: an incomplete trailing sequence is left in the
        // byte buffer instead of being reported as malformed input
        CoderResult result = decoder.decode(bytes, chars, false);
        if (!result.isUnderflow()) {
            result.throwException();
        }
        chars.flip();
        cursor = fromPosition + bytes.position();
        return chars.toString();
    }

    /**
     * Checks that the chunk size is within the supported range. The input
     * argument is currently not inspected.
     *
     * @param input
     *            path to the file (not validated here)
     * @param chunkSizeBytes
     *            requested chunk size in bytes
     * @throws IllegalArgumentException
     *             if chunkSizeBytes is below {@link #MIN_CHUNK_SIZE_BYTES} or
     *             above {@link #MAX_CHUNK_SIZE_BYTES}
     */
    public static void validateInput(String input, int chunkSizeBytes) {
        if (chunkSizeBytes > MAX_CHUNK_SIZE_BYTES
                || chunkSizeBytes < MIN_CHUNK_SIZE_BYTES) {
            throw new IllegalArgumentException(
                    "chunkSize must be between 256 bytes and 10Mb. But given value is: "
                            + chunkSizeBytes);
        }
    }

    /**
     * @return true if the file currently holds bytes beyond the cursor
     * @throws IOException
     *             if the file size cannot be determined
     * @throws IllegalStateException
     *             if the watcher is disconnected
     */
    public boolean hasMore() throws IOException {
        ensureConnected();
        // Ask the channel about the size every time, as the file could have
        // been modified since the previous read
        return cursor < fc.size();
    }

    /**
     * @return the byte offset the next {@link #pull()} will read from
     * @throws IllegalStateException
     *             if the watcher is disconnected
     */
    public long getCursorPosition() {
        ensureConnected();
        return cursor;
    }

    /**
     * Disconnect method must be called explicitly, as this class is used to
     * read incomplete files (e.g. when writing continues) it is impossible to
     * know from within this class where the file was read completely or not.
     * Thus it is the responsibility of the caller to call this method.
     * Calling it more than once has no further effect.
     */
    public void disconnect() {
        if (fc != null) {
            // Close the channel and the stream
            try {
                fc.close();
            } catch (IOException e) {
                log.error(e.getMessage(), e);
            } finally {
                fc = null;
            }
        }
    }

    /**
     * Parses the number stored in the first 3 bytes of the file.
     *
     * @return the parsed value, or 0 if the file holds no text or only
     *         whitespace in its first 3 bytes
     * @throws IOException
     *             if the file cannot be read
     * @throws NumberFormatException
     *             if the text is not a number in the byte range
     * @throws IllegalStateException
     *             if the watcher is disconnected
     */
    public byte getProgress() throws IOException {
        String progress = get3Chars();
        if (Util.isEmpty(progress)) {
            return 0;
        }
        progress = progress.trim();
        if (progress.length() == 0) {
            // Only whitespace has been written so far
            return 0;
        }
        assert progress.length() <= 3;
        return Byte.parseByte(progress);
    }

    /**
     * Reads at most 3 bytes from the beginning of the file. As with every
     * read, the cursor is left just past the bytes that were decoded.
     */
    String get3Chars() throws IOException {
        ensureConnected();
        return readChunk(0, Math.min(chunkSizeBytes, PROGRESS_CHUNK_SIZE_BYTES));
    }

    /** Fails with IllegalStateException once the watcher is disconnected. */
    private void ensureConnected() {
        if (disconnected()) {
            throw new IllegalStateException(DISCONNECTED_MESSAGE);
        }
    }
}
|