File: PGCopyInputStream.java

package info (click to toggle)
libpgjava 8.4-701-1
  • links: PTS, VCS
  • area: main
  • in suites: squeeze
  • size: 3,532 kB
  • ctags: 4,162
  • sloc: java: 33,948; xml: 3,158; makefile: 14; sh: 10
file content (153 lines) | stat: -rw-r--r-- 4,286 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
/*-------------------------------------------------------------------------
*
* Copyright (c) 2009, PostgreSQL Global Development Group
*
* IDENTIFICATION
*   $PostgreSQL: pgjdbc/org/postgresql/copy/PGCopyInputStream.java,v 1.1 2009/07/01 05:00:39 jurka Exp $
*
*-------------------------------------------------------------------------
*/
package org.postgresql.copy;

import java.io.IOException;
import java.io.InputStream;
import java.sql.SQLException;

import org.postgresql.PGConnection;
import org.postgresql.util.GT;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;

/**
 * InputStream for reading from a PostgreSQL COPY TO STDOUT operation
 */
public class PGCopyInputStream extends InputStream implements CopyOut {
    private CopyOut op;
    private byte[] buf;
    private int at, len;

    /**
     * Uses given connection for specified COPY TO STDOUT operation
     * @param connection database connection to use for copying (protocol version 3 required)
     * @param sql COPY TO STDOUT statement
     * @throws SQLException if initializing the operation fails
     */
    public PGCopyInputStream(PGConnection connection, String sql) throws SQLException {
        this(connection.getCopyAPI().copyOut(sql));
    }

    /**
     * Use given CopyOut operation for reading
     * @param op COPY TO STDOUT operation
     * @throws SQLException if initializing the operation fails
     */
    public PGCopyInputStream(CopyOut op) {
        this.op = op;
    }

    private boolean gotBuf() throws IOException {
        if(at >= len) {
            try {
                buf = op.readFromCopy();
            } catch(SQLException sqle) {
                throw new IOException(GT.tr("Copying from database failed: {0}", sqle));
            }
            if(buf == null) {
                at = -1;
                return false;
            } else {
                at = 0;
                len = buf.length;
                return true;
            }
        }
        return buf != null;
    }

    private void checkClosed() throws IOException {
        if (op == null) {
            throw new IOException(GT.tr("This copy stream is closed."));
        }
    }

    
    public int available() throws IOException {
        checkClosed();
        return ( buf != null ? len - at : 0 );
    }
    
    public int read() throws IOException {
        checkClosed();
        return gotBuf() ? buf[at++] : -1;
    }

    public int read(byte[] buf) throws IOException {
        return read(buf, 0, buf.length); 
    }
    
    public int read(byte[] buf, int off, int siz) throws IOException {
        checkClosed();
        int got = 0;
        while( got < siz && gotBuf() ) {
            buf[off+got++] = this.buf[at++];
        }
        return got;
    }

    public byte[] readFromCopy() throws SQLException {
        byte[] result = buf;
        try {
            if(gotBuf()) {
                if(at>0 || len < buf.length) {
                    byte[] ba = new byte[len-at];
                    for(int i=at; i<len; i++)
                        ba[i-at] = buf[i];
                    result = ba;
                }
                at = len; // either partly or fully returned, buffer is exhausted
            }
        } catch(IOException ioe) {
            throw new PSQLException(GT.tr("Read from copy failed."), PSQLState.CONNECTION_FAILURE);
        }
        return result;
    }

    public void close() throws IOException {
        // Don't complain about a double close.
        if (op == null)
            return;

        try {
            op.cancelCopy();
        } catch(SQLException se) {
            IOException ioe = new IOException("Failed to close copy reader.");
            ioe.initCause(se);
            throw ioe;
        }
        op = null;
    }

    public void cancelCopy() throws SQLException {
        op.cancelCopy();
    }

    public int getFormat() {
        return op.getFormat();
    }
    
    public int getFieldFormat(int field) {
        return op.getFieldFormat(field);
    }

    public int getFieldCount() {
        return op.getFieldCount();
    }

    public boolean isActive() {
        return op.isActive();
    }

    public long getHandledRowCount() {
        return op.getHandledRowCount();
    }
}