1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
|
/*-------------------------------------------------------------------------
*
* Copyright (c) 2009, PostgreSQL Global Development Group
*
* IDENTIFICATION
* $PostgreSQL: pgjdbc/org/postgresql/copy/CopyManager.java,v 1.1 2009/07/01 05:00:39 jurka Exp $
*
*-------------------------------------------------------------------------
*/
/**
* Bulk data copy for PostgreSQL
*/
package org.postgresql.copy;
import java.io.IOException;
import java.io.OutputStream;
import java.io.InputStream;
import java.io.Reader;
import java.io.Writer;
import java.sql.SQLException;
import org.postgresql.core.Encoding;
import org.postgresql.core.QueryExecutor;
import org.postgresql.core.BaseConnection;
import org.postgresql.util.GT;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
/**
* API for PostgreSQL COPY bulk data transfer
*/
public class CopyManager {
// I don't know what the best buffer size is, so we let people specify it if
// they want, and if they don't know, we don't make them guess, so that if we
// do figure it out we can just set it here and they reap the rewards.
// Note that this is currently being used for both a number of bytes and a number
// of characters.
final static int DEFAULT_BUFFER_SIZE = 65536;
private final Encoding encoding;
private final QueryExecutor queryExecutor;
public CopyManager(BaseConnection connection) throws SQLException {
this.encoding = connection.getEncoding();
this.queryExecutor = connection.getQueryExecutor();
}
public CopyIn copyIn(String sql) throws SQLException {
CopyOperation op = null;
try {
op = queryExecutor.startCopy(sql);
return (CopyIn) op;
} catch(ClassCastException cce) {
op.cancelCopy();
throw new PSQLException(GT.tr("Requested CopyIn but got {0}", op.getClass().getName()), PSQLState.WRONG_OBJECT_TYPE, cce);
}
}
public CopyOut copyOut(String sql) throws SQLException {
CopyOperation op = null;
try {
op = queryExecutor.startCopy(sql);
return (CopyOut) op;
} catch(ClassCastException cce) {
op.cancelCopy();
throw new PSQLException(GT.tr("Requested CopyOut but got {0}", op.getClass().getName()), PSQLState.WRONG_OBJECT_TYPE, cce);
}
}
/**
* Pass results of a COPY TO STDOUT query from database into a Writer.
* @param sql COPY TO STDOUT statement
* @param to the stream to write the results to (row by row)
* @return number of rows updated for server 8.2 or newer; -1 for older
* @throws SQLException on database usage errors
* @throws IOException upon writer or database connection failure
*/
public long copyOut(final String sql, Writer to) throws SQLException, IOException {
byte[] buf;
CopyOut cp = copyOut(sql);
try {
while ( (buf = cp.readFromCopy()) != null ) {
to.write(encoding.decode(buf));
}
return cp.getHandledRowCount();
} finally { // see to it that we do not leave the connection locked
if(cp.isActive())
cp.cancelCopy();
}
}
/**
* Pass results of a COPY TO STDOUT query from database into an OutputStream.
* @param sql COPY TO STDOUT statement
* @param to the stream to write the results to (row by row)
* @return number of rows updated for server 8.2 or newer; -1 for older
* @throws SQLException on database usage errors
* @throws IOException upon output stream or database connection failure
*/
public long copyOut(final String sql, OutputStream to) throws SQLException, IOException {
byte[] buf;
CopyOut cp = copyOut(sql);
try {
while( (buf = cp.readFromCopy()) != null ) {
to.write(buf);
}
return cp.getHandledRowCount();
} finally { // see to it that we do not leave the connection locked
if(cp.isActive())
cp.cancelCopy();
}
}
/**
* Use COPY FROM STDIN for very fast copying from a Reader into a database table.
* @param sql COPY FROM STDIN statement
* @param from a CSV file or such
* @return number of rows updated for server 8.2 or newer; -1 for older
* @throws SQLException on database usage issues
* @throws IOException upon reader or database connection failure
*/
public long copyIn(final String sql, Reader from) throws SQLException, IOException {
return copyIn(sql, from, DEFAULT_BUFFER_SIZE);
}
/**
* Use COPY FROM STDIN for very fast copying from a Reader into a database table.
* @param sql COPY FROM STDIN statement
* @param from a CSV file or such
* @param bufferSize number of characters to buffer and push over network to server at once
* @return number of rows updated for server 8.2 or newer; -1 for older
* @throws SQLException on database usage issues
* @throws IOException upon reader or database connection failure
*/
public long copyIn(final String sql, Reader from, int bufferSize) throws SQLException, IOException {
char[] cbuf = new char[bufferSize];
int len;
CopyIn cp = copyIn(sql);
try {
while ( (len = from.read(cbuf)) > 0) {
byte[] buf = encoding.encode(new String(cbuf));
cp.writeToCopy(buf, 0, buf.length);
}
return cp.endCopy();
} finally { // see to it that we do not leave the connection locked
if(cp.isActive())
cp.cancelCopy();
}
}
/**
* Use COPY FROM STDIN for very fast copying from an InputStream into a database table.
* @param sql COPY FROM STDIN statement
* @param from a CSV file or such
* @return number of rows updated for server 8.2 or newer; -1 for older
* @throws SQLException on database usage issues
* @throws IOException upon input stream or database connection failure
*/
public long copyIn(final String sql, InputStream from) throws SQLException, IOException {
return copyIn(sql, from, DEFAULT_BUFFER_SIZE);
}
/**
* Use COPY FROM STDIN for very fast copying from an InputStream into a database table.
* @param sql COPY FROM STDIN statement
* @param from a CSV file or such
* @param bufferSize number of bytes to buffer and push over network to server at once
* @return number of rows updated for server 8.2 or newer; -1 for older
* @throws SQLException on database usage issues
* @throws IOException upon input stream or database connection failure
*/
public long copyIn(final String sql, InputStream from, int bufferSize) throws SQLException, IOException {
byte[] buf = new byte[bufferSize];
int len;
CopyIn cp = copyIn(sql);
try {
while( (len = from.read(buf)) > 0 ) {
cp.writeToCopy(buf, 0, len);
}
return cp.endCopy();
} finally { // see to it that we do not leave the connection locked
if(cp.isActive())
cp.cancelCopy();
}
}
}
|