1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645
|
package uk.ac.bristol.star.cdf.record;
import java.io.EOFException;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.SequenceInputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.util.Arrays;
import java.util.Collections;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Abstract Buf implementation that divides the byte sequence into one
* or more contiguous data banks.
* Each bank contains a run of bytes short enough to be indexed by
* a 4-byte integer.
*
* @author Mark Taylor
* @since 18 Jun 2013
*/
public abstract class BankBuf implements Buf {
private final long size_;
private boolean isBit64_;
private boolean isBigendian_;
private static final Logger logger_ =
Logger.getLogger( BankBuf.class.getName() );
/**
* Constructor.
*
* @param size total size of buffer
* @param isBit64 64bit-ness of buf
* @param isBigendian true for big-endian data, false for little-endian
*/
protected BankBuf( long size, boolean isBit64, boolean isBigendian ) {
size_ = size;
isBit64_ = isBit64;
isBigendian_ = isBigendian;
}
/**
* Returns the bank which can read a given number of bytes starting
* at the given offset.
*
* <p>Implementation: in most cases this will return one of the
* large banks that this object has allocated.
* However, in the case that the requested run straddles a bank
* boundary it may be necessary to generate a short-lived bank
* just to return from this method.
*
* @param offset start of required sequence
* @param count number of bytes in required sequence
* @return bank
*/
protected abstract Bank getBank( long offset, int count )
throws IOException;
/**
* Returns a list of active banks. Banks which have not been
* created yet do not need to be included.
*/
protected abstract List<Bank> getExistingBanks();
/**
* Returns an iterator over banks starting with the one containing
* the given offset.
* If followed to the end, the returned sequence
* will go all the way to the end of the buf.
* The first bank does not need to start at the
* given offset, only to contain it.
*
* @param offset starting byte offset into buf
* @return iterator over data banks
*/
protected abstract Iterator<Bank> getBankIterator( long offset );
public long getLength() {
return size_;
}
public int readUnsignedByte( Pointer ptr ) throws IOException {
long pos = ptr.getAndIncrement( 1 );
Bank bank = getBank( pos, 1 );
return bank.byteBuffer_.get( bank.adjust( pos ) ) & 0xff;
}
public int readInt( Pointer ptr ) throws IOException {
long pos = ptr.getAndIncrement( 4 );
Bank bank = getBank( pos, 4 );
return bank.byteBuffer_.getInt( bank.adjust( pos ) );
}
public long readOffset( Pointer ptr ) throws IOException {
int nbyte = isBit64_ ? 8 : 4;
long pos = ptr.getAndIncrement( nbyte );
Bank bank = getBank( pos, nbyte );
int apos = bank.adjust( pos );
return isBit64_ ? bank.byteBuffer_.getLong( apos )
: (long) bank.byteBuffer_.getInt( apos );
}
public String readAsciiString( Pointer ptr, int nbyte ) throws IOException {
long offset = ptr.getAndIncrement( nbyte );
Bank bank = getBank( offset, nbyte );
return Bufs.readAsciiString( bank.byteBuffer_, bank.adjust( offset ),
nbyte );
}
public synchronized void setBit64( boolean isBit64 ) {
isBit64_ = isBit64;
}
public boolean isBit64() {
return isBit64_;
}
public synchronized void setEncoding( boolean bigend ) {
isBigendian_ = bigend;
for ( Bank bank : getExistingBanks() ) {
bank.setEncoding( isBigendian_ );
}
}
public boolean isBigendian() {
return isBigendian_;
}
public void readDataBytes( long offset, int count, byte[] array )
throws IOException {
Bank bank = getBank( offset, count );
Bufs.readBytes( bank.dataBuffer_, bank.adjust( offset ), count, array );
}
public void readDataShorts( long offset, int count, short[] array )
throws IOException {
Bank bank = getBank( offset, count * 2 );
Bufs.readShorts( bank.dataBuffer_, bank.adjust( offset ),
count, array );
}
public void readDataInts( long offset, int count, int[] array )
throws IOException {
Bank bank = getBank( offset, count * 4 );
Bufs.readInts( bank.dataBuffer_, bank.adjust( offset ), count, array );
}
public void readDataLongs( long offset, int count, long[] array )
throws IOException {
Bank bank = getBank( offset, count * 8 );
Bufs.readLongs( bank.dataBuffer_, bank.adjust( offset ), count, array );
}
public void readDataFloats( long offset, int count, float[] array )
throws IOException {
Bank bank = getBank( offset, count * 4 );
Bufs.readFloats( bank.dataBuffer_, bank.adjust( offset ),
count, array );
}
public void readDataDoubles( long offset, int count, double[] array )
throws IOException {
Bank bank = getBank( offset, count * 8 );
Bufs.readDoubles( bank.dataBuffer_, bank.adjust( offset ),
count, array );
}
public InputStream createInputStream( final long offset ) {
final Iterator<Bank> bankIt = getBankIterator( offset );
Enumeration<InputStream> inEn = new Enumeration<InputStream>() {
boolean isFirst = true;
public boolean hasMoreElements() {
return bankIt.hasNext();
}
public InputStream nextElement() {
Bank bank = bankIt.next();
ByteBuffer bbuf = bank.byteBuffer_.duplicate();
bbuf.position( isFirst ? bank.adjust( offset ) : 0 );
isFirst = false;
return Bufs.createByteBufferInputStream( bbuf );
}
};
return new SequenceInputStream( inEn );
}
public Buf fillNewBuf( long count, InputStream in ) throws IOException {
return count <= Integer.MAX_VALUE
? fillNewSingleBuf( (int) count, in )
: fillNewMultiBuf( count, in );
}
/**
* Implementation of fillNewBuf that works for small (<2^31-byte)
* byte sequences.
*
* @param count size of new buffer in bytes
* @param in input stream containing byte sequence
* @return buffer containing stream content
*/
private Buf fillNewSingleBuf( int count, InputStream in )
throws IOException {
// Memory is allocated outside of the JVM heap.
ByteBuffer bbuf = ByteBuffer.allocateDirect( count );
ReadableByteChannel chan = Channels.newChannel( in );
while ( count > 0 ) {
int nr = chan.read( bbuf );
if ( nr < 0 ) {
throw new EOFException();
}
else {
count -= nr;
}
}
return Bufs.createBuf( bbuf, isBit64_, isBigendian_ );
}
/**
* Implementation of fillNewBuf that uses multiple ByteBuffers to
* cope with large (>2^31-byte) byte sequences.
*
* @param count size of new buffer in bytes
* @param in input stream containing byte sequence
* @return buffer containing stream content
*/
private Buf fillNewMultiBuf( long count, InputStream in )
throws IOException {
// Writes data to a temporary file.
File file = File.createTempFile( "cdfbuf", ".bin" );
file.deleteOnExit();
int bufsiz = 64 * 1024;
byte[] buf = new byte[ bufsiz ];
OutputStream out = new FileOutputStream( file );
while ( count > 0 ) {
int nr = in.read( buf );
out.write( buf, 0, nr );
count -= nr;
}
out.close();
return Bufs.createBuf( file, isBit64_, isBigendian_ );
}
/**
* Returns a BankBuf based on a single supplied ByteBuffer.
*
* @param byteBuffer NIO buffer containing data
* @param isBit64 64bit-ness of buf
* @param isBigendian true for big-endian data, false for little-endian
* @return new buf
*/
public static BankBuf createSingleBankBuf( ByteBuffer byteBuffer,
boolean isBit64,
boolean isBigendian ) {
return new SingleBankBuf( byteBuffer, isBit64, isBigendian );
}
/**
* Returns a BankBuf based on an array of supplied ByteBuffers.
*
* @param byteBuffers NIO buffers containing data (when concatenated)
* @param isBit64 64bit-ness of buf
* @param isBigendian true for big-endian data, false for little-endian
* @return new buf
*/
public static BankBuf createMultiBankBuf( ByteBuffer[] byteBuffers,
boolean isBit64,
boolean isBigendian ) {
return new PreMultiBankBuf( byteBuffers, isBit64, isBigendian );
}
/**
* Returns a BankBuf based on supplied file channel.
*
* @param channel readable file containing data
* @param size number of bytes in channel
* @param bankSize maximum size for individual data banks
* @param isBit64 64bit-ness of buf
* @param isBigendian true for big-endian data, false for little-endian
* @return new buf
*/
public static BankBuf createMultiBankBuf( FileChannel channel, long size,
int bankSize, boolean isBit64,
boolean isBigendian ) {
return new LazyMultiBankBuf( channel, size, bankSize,
isBit64, isBigendian );
}
/**
* BankBuf implementation based on a single NIO buffer.
*/
private static class SingleBankBuf extends BankBuf {
private final Bank bank_;
/**
* Constructor.
*
* @param byteBuffer NIO buffer containing data
* @param isBit64 64bit-ness of buf
* @param isBigendian true for big-endian data,
* false for little-endian
*/
SingleBankBuf( ByteBuffer byteBuffer, boolean isBit64,
boolean isBigendian ) {
super( byteBuffer.capacity(), isBit64, isBigendian );
bank_ = new Bank( byteBuffer, 0, isBigendian );
}
public Bank getBank( long offset, int count ) {
return bank_;
}
public List<Bank> getExistingBanks() {
return Collections.singletonList( bank_ );
}
public Iterator<Bank> getBankIterator( long offset ) {
return Collections.singletonList( bank_ ).iterator();
}
}
/**
* BankBuf implementation based on a supplied array of NIO buffers
* representing contiguous subsequences of the data.
*/
private static class PreMultiBankBuf extends BankBuf {
private final Bank[] banks_;
private final long[] starts_;
private final long[] ends_;
private int iCurrentBank_;
/**
* Constructor.
*
* @param byteBuffers NIO buffers containing data (when concatenated)
* @param isBit64 64bit-ness of buf
* @param isBigendian true for big-endian data,
* false for little-endian
*/
PreMultiBankBuf( ByteBuffer[] byteBuffers,
boolean isBit64, boolean isBigendian ) {
super( sumSizes( byteBuffers ), isBit64, isBigendian );
int nbank = byteBuffers.length;
banks_ = new Bank[ nbank ];
starts_ = new long[ nbank ];
ends_ = new long[ nbank ];
long pos = 0L;
for ( int ibank = 0; ibank < nbank; ibank++ ) {
ByteBuffer byteBuffer = byteBuffers[ ibank ];
banks_[ ibank ] = new Bank( byteBuffer, pos, isBigendian );
starts_[ ibank ] = pos;
pos += byteBuffer.capacity();
ends_[ ibank ] = pos;
}
iCurrentBank_ = 0;
}
protected Bank getBank( long offset, int count ) {
// This is not synchronized, which means that the value of
// iCurrentBank_ might be out of date (have been updated by
// another thread). It's OK not to defend against that,
// since the out-of-date value would effectively just give
// us a thread-local cached value, which is in fact an
// advantage rather than otherwise.
int ibank = iCurrentBank_;
// Test if the most recently-used value is still correct
// (usually it will be) and return it if so.
if ( offset >= starts_[ ibank ] &&
offset + count <= ends_[ ibank ] ) {
return banks_[ ibank ];
}
// Otherwise, find the bank corresponding to the requested offset.
else {
ibank = -1;
for ( int ib = 0; ib < banks_.length; ib++ ) {
if ( offset >= starts_[ ib ] && offset < ends_[ ib ] ) {
ibank = ib;
break;
}
}
// Update the cached value.
iCurrentBank_ = ibank;
// If it contains the whole requested run, return it.
if ( offset + count <= ends_[ ibank ] ) {
return banks_[ ibank ];
}
// Otherwise, the requested region straddles multiple banks.
// This should be a fairly unusual occurrence.
// Build a temporary bank to satisfy the request and return it.
else {
byte[] tmp = new byte[ count ];
int bankOff = (int) ( offset - starts_[ ibank ] );
int tmpOff = 0;
int n = (int) ( ends_[ ibank ] - offset );
while ( count > 0 ) {
ByteBuffer bbuf = banks_[ ibank ].byteBuffer_;
synchronized ( bbuf ) {
bbuf.position( bankOff );
bbuf.get( tmp, tmpOff, n );
}
count -= n;
tmpOff += n;
bankOff = 0;
ibank++;
n = (int) Math.min( count,
ends_[ ibank ] - starts_[ ibank ] );
}
return new Bank( ByteBuffer.wrap( tmp ), offset,
isBigendian() );
}
}
}
public List<Bank> getExistingBanks() {
return Arrays.asList( banks_ );
}
public Iterator<Bank> getBankIterator( final long offset ) {
Iterator<Bank> it = Arrays.asList( banks_ ).iterator();
for ( int ib = 0; ib < banks_.length; ib++ ) {
if ( offset >= starts_[ ib ] ) {
return it;
}
it.next();
}
return it; // empty
}
/**
* Returns the sum of the sizes of all the elements of a supplied array
* of NIO buffers.
*
* @param byteBuffers buffer array
* @return number of bytes in concatenation of all buffers
*/
private static long sumSizes( ByteBuffer[] byteBuffers ) {
long size = 0;
for ( int i = 0; i < byteBuffers.length; i++ ) {
size += byteBuffers[ i ].capacity();
}
return size;
}
}
/**
* BankBuf implementation that uses multiple data banks,
* but constructs (maps) them lazily as required.
* The original data is supplied in a FileChannel.
* All banks except (probably) the final one are the same size,
* supplied at construction time.
*/
private static class LazyMultiBankBuf extends BankBuf {
private final FileChannel channel_;
private final long size_;
private final long bankSize_;
private final Bank[] banks_;
/**
* Constructor.
*
* @param channel readable file containing data
* @param size number of bytes in channel
* @param bankSize maximum size for individual data banks
* @param isBit64 64bit-ness of buf
* @param isBigendian true for big-endian data,
* false for little-endian
*/
LazyMultiBankBuf( FileChannel channel, long size, int bankSize,
boolean isBit64, boolean isBigendian ) {
super( size, isBit64, isBigendian );
channel_ = channel;
size_ = size;
bankSize_ = bankSize;
int nbank = (int) ( ( ( size - 1 ) / bankSize ) + 1 );
banks_ = new Bank[ nbank ];
}
public Bank getBank( long offset, int count ) throws IOException {
// Find out the index of the bank containing the starting offset.
int ibank = (int) ( offset / bankSize_ );
// If the requested read amount is fully contained in that bank,
// lazily obtain and return it.
int over = (int) ( offset + count - ( ibank + 1 ) * bankSize_ );
if ( over <= 0 ) {
return getBankByIndex( ibank );
}
// Otherwise, the requested region straddles multiple banks.
// This should be a fairly unusual occurrence.
// Build a temporary bank to satisfy the request and return it.
else {
byte[] tmp = new byte[ count ];
int bankOff = (int) ( bankSize_ - count + over );
int tmpOff = 0;
int n = count - over;
while ( count > 0 ){
ByteBuffer bbuf = getBankByIndex( ibank ).byteBuffer_;
synchronized ( bbuf ){
bbuf.position( bankOff );
bbuf.get( tmp, tmpOff, n );
}
count -= n;
tmpOff += n;
bankOff = 0;
ibank++;
n = (int) Math.min( count, bankSize_ );
}
return new Bank( ByteBuffer.wrap( tmp ), offset,
isBigendian() );
}
}
public List<Bank> getExistingBanks() {
List<Bank> list = new ArrayList<Bank>();
for ( int ib = 0; ib < banks_.length; ib++ ) {
Bank bank = banks_[ ib ];
if ( bank != null ) {
list.add( bank );
}
}
return list;
}
public Iterator<Bank> getBankIterator( final long offset ) {
return new Iterator<Bank>() {
int ibank = (int) ( offset / bankSize_ );
public boolean hasNext() {
return ibank < banks_.length;
}
public Bank next() {
try {
return getBankByIndex( ibank++ );
}
catch ( IOException e ) {
logger_.log( Level.WARNING, "Error acquiring bank", e );
return null;
}
}
public void remove() {
throw new UnsupportedOperationException();
}
};
}
/**
* Lazily obtains and returns a numbered bank. Will not return null.
*
* @param ibank bank index
*/
private Bank getBankByIndex( int ibank ) throws IOException {
if ( banks_[ ibank ] == null ) {
long start = ibank * bankSize_;
long end = Math.min( ( ( ibank + 1 ) * bankSize_ ), size_ );
int leng = (int) ( end - start );
ByteBuffer bbuf =
channel_.map( FileChannel.MapMode.READ_ONLY, start, leng );
banks_[ ibank ] = new Bank( bbuf, start, isBigendian() );
}
return banks_[ ibank ];
}
}
/**
* Data bank for use within BankBuf class and its subclasses.
* This stores a subsequence of bytes for the Buf, and records
* its position within the whole sequence.
*/
protected static class Bank {
/** Raw buffer. */
private final ByteBuffer byteBuffer_;
/** Buffer adjusted for endianness. */
private final ByteBuffer dataBuffer_;
private final long start_;
private final int size_;
/**
* Constructor.
*
* @param byteBuffer NIO buffer containing data
* @param start offset into the full sequence at which this bank
* is considered to start
* @param isBigendian true for big-endian, false for little-endian
*/
public Bank( ByteBuffer byteBuffer, long start, boolean isBigendian ) {
byteBuffer_ = byteBuffer;
dataBuffer_ = byteBuffer.duplicate();
start_ = start;
size_ = byteBuffer.capacity();
setEncoding( isBigendian );
}
/**
* Returns the position within this bank's buffer that corresponds
* to an offset into the full byte sequence.
*
* @param pos offset into Buf
* @return pos - start
* @throws IllegalArgumentException pos is not between start and
* start+size
*/
private int adjust( long pos ) {
long offset = pos - start_;
if ( offset >= 0 && offset < size_ ) {
return (int) offset;
}
else {
throw new IllegalArgumentException( "Out of range: " + pos
+ " for bank at " + start_ );
}
}
/**
* Resets the endianness for the data buffer of this bank.
*
* @param isBigendian true for big-endian, false for little-endian
*/
private void setEncoding( boolean isBigendian ) {
dataBuffer_.order( isBigendian ? ByteOrder.BIG_ENDIAN
: ByteOrder.LITTLE_ENDIAN );
}
}
}
|