1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236
|
# frozen-string-literal: true
Sequel::JDBC.load_driver('org.postgresql.Driver', :Postgres)
require_relative '../shared/postgres'
module Sequel
module JDBC
Sequel.synchronize do
DATABASE_SETUP[:postgresql] = proc do |db|
db.dataset_class = Sequel::JDBC::Postgres::Dataset
db.extend(Sequel::JDBC::Postgres::DatabaseMethods)
org.postgresql.Driver
end
end
module Postgres
module DatabaseMethods
include Sequel::Postgres::DatabaseMethods
# Add the primary_keys and primary_key_sequences instance variables,
# so we can get the correct return values for inserted rows.
def self.extended(db)
super
db.send(:initialize_postgres_adapter)
end
# Remove any current entry for the oid in the oid_convertor_map.
def add_conversion_proc(oid, *)
super
Sequel.synchronize{@oid_convertor_map.delete(oid)}
end
# See Sequel::Postgres::Adapter#copy_into
def copy_into(table, opts=OPTS)
data = opts[:data]
data = Array(data) if data.is_a?(String)
if defined?(yield) && data
raise Error, "Cannot provide both a :data option and a block to copy_into"
elsif !defined?(yield) && !data
raise Error, "Must provide either a :data option or a block to copy_into"
end
synchronize(opts[:server]) do |conn|
begin
copy_manager = org.postgresql.copy.CopyManager.new(conn)
copier = copy_manager.copy_in(copy_into_sql(table, opts))
if defined?(yield)
while buf = yield
java_bytes = buf.to_java_bytes
copier.writeToCopy(java_bytes, 0, java_bytes.length)
end
else
data.each do |d|
java_bytes = d.to_java_bytes
copier.writeToCopy(java_bytes, 0, java_bytes.length)
end
end
rescue Exception => e
copier.cancelCopy if copier
raise
ensure
unless e
begin
copier.endCopy
rescue NativeException => e2
raise_error(e2)
end
end
end
end
end
# See Sequel::Postgres::Adapter#copy_table
def copy_table(table, opts=OPTS)
synchronize(opts[:server]) do |conn|
copy_manager = org.postgresql.copy.CopyManager.new(conn)
copier = copy_manager.copy_out(copy_table_sql(table, opts))
begin
if defined?(yield)
while buf = copier.readFromCopy
yield(String.from_java_bytes(buf))
end
nil
else
b = String.new
while buf = copier.readFromCopy
b << String.from_java_bytes(buf)
end
b
end
rescue => e
raise_error(e, :disconnect=>true)
ensure
if buf && !e
raise DatabaseDisconnectError, "disconnecting as a partial COPY may leave the connection in an unusable state"
end
end
end
end
def oid_convertor_proc(oid)
if (conv = Sequel.synchronize{@oid_convertor_map[oid]}).nil?
conv = if pr = conversion_procs[oid]
lambda do |r, i|
if v = r.getString(i)
pr.call(v)
end
end
else
false
end
Sequel.synchronize{@oid_convertor_map[oid] = conv}
end
conv
end
private
def disconnect_error?(exception, opts)
super || exception.message =~ /\A(This connection has been closed\.|FATAL: terminating connection due to administrator command|An I\/O error occurred while sending to the backend\.)\z/
end
# For PostgreSQL-specific types, return the string that should be used
# as the PGObject value. Returns nil by default, loading pg_* extensions
# will override this to add support for specific types.
def bound_variable_arg(arg, conn)
nil
end
# Work around issue when using Sequel's bound variable support where the
# same SQL is used in different bound variable calls, but the schema has
# changed between the calls. This is necessary as jdbc-postgres versions
# after 9.4.1200 violate the JDBC API. These versions cache separate
# PreparedStatement instances, which are eventually prepared server side after the
# prepareThreshold is met. The JDBC API violation is that PreparedStatement#close
# does not release the server side prepared statement.
def prepare_jdbc_statement(conn, sql, opts)
ps = super
unless opts[:name]
ps.prepare_threshold = 0
end
ps
end
# If the given argument is a recognized PostgreSQL-specific type, create
# a PGObject instance with unknown type and the bound argument string value,
# and set that as the prepared statement argument.
def set_ps_arg(cps, arg, i)
if v = bound_variable_arg(arg, nil)
obj = org.postgresql.util.PGobject.new
obj.setType("unknown")
obj.setValue(v)
cps.setObject(i, obj)
else
super
end
end
# Use setNull for nil arguments as the default behavior of setString
# with nil doesn't appear to work correctly on PostgreSQL.
def set_ps_arg_nil(cps, i)
cps.setNull(i, JavaSQL::Types::NULL)
end
# Execute the connection configuration SQL queries on the connection.
def setup_connection_with_opts(conn, opts)
conn = super
statement(conn) do |stmt|
connection_configuration_sqls(opts).each{|sql| log_connection_yield(sql, conn){stmt.execute(sql)}}
end
conn
end
def setup_type_convertor_map
super
@oid_convertor_map = {}
end
end
class Dataset < JDBC::Dataset
include Sequel::Postgres::DatasetMethods
# Warn when calling as the fetch size is ignored by the JDBC adapter currently.
def with_fetch_size(size)
warn("Sequel::JDBC::Postgres::Dataset#with_fetch_size does not currently have an effect.", :uplevel=>1)
super
end
private
# Literalize strings similar to the native postgres adapter
def literal_string_append(sql, v)
sql << "'" << db.synchronize(@opts[:server]){|c| c.escape_string(v)} << "'"
end
# SQL fragment for Sequel::SQLTime, containing just the time part
def literal_sqltime(v)
v.strftime("'%H:%M:%S#{sprintf(".%03d", (v.usec/1000.0).round)}'")
end
STRING_TYPE = Java::JavaSQL::Types::VARCHAR
ARRAY_TYPE = Java::JavaSQL::Types::ARRAY
PG_SPECIFIC_TYPES = [Java::JavaSQL::Types::ARRAY, Java::JavaSQL::Types::OTHER, Java::JavaSQL::Types::STRUCT, Java::JavaSQL::Types::TIME_WITH_TIMEZONE, Java::JavaSQL::Types::TIME].freeze
# Return PostgreSQL hstore types as ruby Hashes instead of
# Java HashMaps. Only used if the database does not have a
# conversion proc for the type.
HSTORE_METHOD = Object.new
def HSTORE_METHOD.call(r, i)
if v = r.getObject(i)
v.to_hash
end
end
def type_convertor(map, meta, type, i)
case type
when *PG_SPECIFIC_TYPES
oid = meta.getField(i).getOID
if pr = db.oid_convertor_proc(oid)
pr
elsif oid == 2950 # UUID
map[STRING_TYPE]
elsif meta.getPGType(i) == 'hstore'
HSTORE_METHOD
else
super
end
else
super
end
end
end
end
end
end
|