1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
|
/*-------------------------------------------------------------------------
*
* Copyright (c) 2009, PostgreSQL Global Development Group
*
* IDENTIFICATION
* $PostgreSQL: pgjdbc/org/postgresql/copy/PGCopyInputStream.java,v 1.1 2009/07/01 05:00:39 jurka Exp $
*
*-------------------------------------------------------------------------
*/
package org.postgresql.copy;
import java.io.IOException;
import java.io.InputStream;
import java.sql.SQLException;
import org.postgresql.PGConnection;
import org.postgresql.util.GT;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
/**
* InputStream for reading from a PostgreSQL COPY TO STDOUT operation
*/
public class PGCopyInputStream extends InputStream implements CopyOut {
private CopyOut op;
private byte[] buf;
private int at, len;
/**
* Uses given connection for specified COPY TO STDOUT operation
* @param connection database connection to use for copying (protocol version 3 required)
* @param sql COPY TO STDOUT statement
* @throws SQLException if initializing the operation fails
*/
public PGCopyInputStream(PGConnection connection, String sql) throws SQLException {
this(connection.getCopyAPI().copyOut(sql));
}
/**
* Use given CopyOut operation for reading
* @param op COPY TO STDOUT operation
* @throws SQLException if initializing the operation fails
*/
public PGCopyInputStream(CopyOut op) {
this.op = op;
}
private boolean gotBuf() throws IOException {
if(at >= len) {
try {
buf = op.readFromCopy();
} catch(SQLException sqle) {
throw new IOException(GT.tr("Copying from database failed: {0}", sqle));
}
if(buf == null) {
at = -1;
return false;
} else {
at = 0;
len = buf.length;
return true;
}
}
return buf != null;
}
private void checkClosed() throws IOException {
if (op == null) {
throw new IOException(GT.tr("This copy stream is closed."));
}
}
public int available() throws IOException {
checkClosed();
return ( buf != null ? len - at : 0 );
}
public int read() throws IOException {
checkClosed();
return gotBuf() ? buf[at++] : -1;
}
public int read(byte[] buf) throws IOException {
return read(buf, 0, buf.length);
}
public int read(byte[] buf, int off, int siz) throws IOException {
checkClosed();
int got = 0;
while( got < siz && gotBuf() ) {
buf[off+got++] = this.buf[at++];
}
return got;
}
public byte[] readFromCopy() throws SQLException {
byte[] result = buf;
try {
if(gotBuf()) {
if(at>0 || len < buf.length) {
byte[] ba = new byte[len-at];
for(int i=at; i<len; i++)
ba[i-at] = buf[i];
result = ba;
}
at = len; // either partly or fully returned, buffer is exhausted
}
} catch(IOException ioe) {
throw new PSQLException(GT.tr("Read from copy failed."), PSQLState.CONNECTION_FAILURE);
}
return result;
}
public void close() throws IOException {
// Don't complain about a double close.
if (op == null)
return;
try {
op.cancelCopy();
} catch(SQLException se) {
IOException ioe = new IOException("Failed to close copy reader.");
ioe.initCause(se);
throw ioe;
}
op = null;
}
public void cancelCopy() throws SQLException {
op.cancelCopy();
}
public int getFormat() {
return op.getFormat();
}
public int getFieldFormat(int field) {
return op.getFieldFormat(field);
}
public int getFieldCount() {
return op.getFieldCount();
}
public boolean isActive() {
return op.isActive();
}
public long getHandledRowCount() {
return op.getHandledRowCount();
}
}
|