1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
|
/*-------------------------------------------------------------------------
*
* Copyright (c) 2009, PostgreSQL Global Development Group
*
* IDENTIFICATION
* $PostgreSQL: pgjdbc/org/postgresql/copy/PGCopyOutputStream.java,v 1.1 2009/07/01 05:00:39 jurka Exp $
*
*-------------------------------------------------------------------------
*/
package org.postgresql.copy;
import java.io.IOException;
import java.io.OutputStream;
import java.sql.SQLException;
import org.postgresql.PGConnection;
import org.postgresql.util.GT;
/**
 * OutputStream for buffered input into a PostgreSQL COPY FROM STDIN operation.
 *
 * Bytes written to the stream are collected in an internal buffer and handed
 * to the wrapped {@link CopyIn} operation in larger pieces. The class also
 * implements {@link CopyIn} itself; those methods delegate to the wrapped
 * operation, taking the internal buffer into account where necessary.
 */
public class PGCopyOutputStream extends OutputStream implements CopyIn {
    /** Wrapped copy operation; set to null once the stream has been closed. */
    private CopyIn op;
    /** Staging area for bytes that have not yet been handed to {@link #op}. */
    private final byte[] copyBuffer;
    /** Reusable one-element array backing {@link #write(int)}. */
    private final byte[] singleByteBuffer = new byte[1];
    /** Number of valid bytes currently held in {@link #copyBuffer}. */
    private int at = 0;

    /**
     * Uses given connection for specified COPY FROM STDIN operation
     * @param connection database connection to use for copying (protocol version 3 required)
     * @param sql COPY FROM STDIN statement
     * @throws SQLException if initializing the operation fails
     */
    public PGCopyOutputStream(PGConnection connection, String sql) throws SQLException {
        this(connection, sql, CopyManager.DEFAULT_BUFFER_SIZE);
    }

    /**
     * Uses given connection for specified COPY FROM STDIN operation
     * @param connection database connection to use for copying (protocol version 3 required)
     * @param sql COPY FROM STDIN statement
     * @param bufferSize try to send this many bytes at a time
     * @throws SQLException if initializing the operation fails
     */
    public PGCopyOutputStream(PGConnection connection, String sql, int bufferSize) throws SQLException {
        this(connection.getCopyAPI().copyIn(sql), bufferSize);
    }

    /**
     * Use given CopyIn operation for writing
     * @param op COPY FROM STDIN operation
     */
    public PGCopyOutputStream(CopyIn op) {
        this(op, CopyManager.DEFAULT_BUFFER_SIZE);
    }

    /**
     * Use given CopyIn operation for writing
     * @param op COPY FROM STDIN operation
     * @param bufferSize try to send this many bytes at a time
     */
    public PGCopyOutputStream(CopyIn op, int bufferSize) {
        this.op = op;
        copyBuffer = new byte[bufferSize];
    }

    /**
     * Writes a single byte to the copy operation (through the buffer).
     * @param b byte value to write; must be in the range 0..255
     * @throws IOException if the stream is closed, if b is outside 0..255,
     *         or if handing data to the copy operation fails
     */
    public void write(int b) throws IOException {
        checkClosed();
        if (b < 0 || b > 255) {
            throw new IOException(GT.tr("Cannot write to copy a byte of value {0}", new Integer(b)));
        }
        singleByteBuffer[0] = (byte) b;
        write(singleByteBuffer, 0, 1);
    }

    /**
     * Writes the whole array to the copy operation (through the buffer).
     * @param buf bytes to write
     * @throws IOException if the stream is closed or the write fails
     */
    public void write(byte[] buf) throws IOException {
        write(buf, 0, buf.length);
    }

    /**
     * Writes part of an array to the copy operation (through the buffer).
     * @param buf source array
     * @param off index of the first byte to write
     * @param siz number of bytes to write
     * @throws IOException if the stream is closed, or wrapping the SQLException
     *         raised while handing data to the copy operation
     */
    public void write(byte[] buf, int off, int siz) throws IOException {
        checkClosed();
        try {
            writeToCopy(buf, off, siz);
        } catch (SQLException se) {
            IOException ioe = new IOException("Write to copy failed.");
            ioe.initCause(se);
            throw ioe;
        }
    }

    /**
     * Rejects stream operations once {@link #close()} has completed.
     * @throws IOException if the stream has been closed
     */
    private void checkClosed() throws IOException {
        if (op == null) {
            throw new IOException(GT.tr("This copy stream is closed."));
        }
    }

    /**
     * Sends any buffered bytes to the wrapped operation and empties the buffer.
     * Does nothing when the buffer is empty.
     * @throws SQLException if the wrapped operation rejects the data
     */
    private void flushBuffer() throws SQLException {
        if (at > 0) {
            op.writeToCopy(copyBuffer, 0, at);
            at = 0;
        }
    }

    /**
     * Ends the copy operation (sending any buffered bytes first) and marks
     * this stream as closed. Closing an already closed stream is a no-op.
     * @throws IOException wrapping the SQLException raised while ending the copy
     */
    public void close() throws IOException {
        // Don't complain about a double close.
        if (op == null) {
            return;
        }
        // The caller may already have finished or cancelled the copy through
        // the CopyIn interface; presumably isActive() is false in that case,
        // and ending the operation a second time must be avoided.
        if (op.isActive()) {
            try {
                endCopy();
            } catch (SQLException se) {
                IOException ioe = new IOException("Ending write to copy failed.");
                ioe.initCause(se);
                throw ioe;
            }
        }
        op = null;
    }

    /**
     * Hands all buffered bytes to the wrapped operation and flushes it.
     * @throws IOException if the stream is closed, or wrapping the
     *         SQLException raised by the wrapped operation
     */
    public void flush() throws IOException {
        // Fail with the same IOException as write() instead of a
        // NullPointerException when the stream has already been closed.
        checkClosed();
        try {
            flushBuffer();
            op.flushCopy();
        } catch (SQLException e) {
            IOException ioe = new IOException("Unable to flush stream");
            ioe.initCause(e);
            throw ioe;
        }
    }

    /**
     * Buffers the given bytes, or passes them straight to the wrapped
     * operation when they are larger than the whole buffer.
     * @param buf source array
     * @param off index of the first byte to write
     * @param siz number of bytes to write
     * @throws SQLException if the wrapped operation rejects the data
     */
    public void writeToCopy(byte[] buf, int off, int siz) throws SQLException {
        if (at > 0 && siz > copyBuffer.length - at) { // would not fit into rest of our buf, so flush buf
            flushBuffer();
        }
        if (siz > copyBuffer.length) { // would still not fit into buf, so just pass it through
            op.writeToCopy(buf, off, siz);
        } else { // fits into our buf, so save it there
            System.arraycopy(buf, off, copyBuffer, at, siz);
            at += siz;
        }
    }

    /** @return the value reported by the wrapped operation's getFormat() */
    public int getFormat() {
        return op.getFormat();
    }

    /**
     * @param field field number, passed through unchanged
     * @return the value reported by the wrapped operation's getFieldFormat(int)
     */
    public int getFieldFormat(int field) {
        return op.getFieldFormat(field);
    }

    /**
     * Cancels the wrapped operation and drops any bytes still held in the
     * buffer, since they can no longer be sent.
     * @throws SQLException if the wrapped operation fails to cancel
     */
    public void cancelCopy() throws SQLException {
        op.cancelCopy();
        at = 0;
    }

    /** @return the value reported by the wrapped operation's getFieldCount() */
    public int getFieldCount() {
        return op.getFieldCount();
    }

    /** @return the value reported by the wrapped operation's isActive() */
    public boolean isActive() {
        return op.isActive();
    }

    /**
     * Flushes the wrapped operation only; bytes still held in this stream's
     * buffer are not sent. Use {@link #flush()} to send those as well.
     * @throws SQLException if the wrapped operation fails to flush
     */
    public void flushCopy() throws SQLException {
        op.flushCopy();
    }

    /**
     * Sends any buffered bytes and ends the wrapped operation.
     * @return the handled row count reported by the wrapped operation
     * @throws SQLException if sending the remaining data or ending the copy fails
     */
    public long endCopy() throws SQLException {
        // flushBuffer() also resets the fill level so the same bytes can
        // never be sent a second time.
        flushBuffer();
        op.endCopy();
        return getHandledRowCount();
    }

    /** @return the value reported by the wrapped operation's getHandledRowCount() */
    public long getHandledRowCount() {
        return op.getHandledRowCount();
    }
}
|