File: postgresql.rb

package info (click to toggle)
ruby-sequel 5.63.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 10,408 kB
  • sloc: ruby: 113,747; makefile: 3
file content (236 lines) | stat: -rw-r--r-- 8,246 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# frozen-string-literal: true

Sequel::JDBC.load_driver('org.postgresql.Driver', :Postgres)
require_relative '../shared/postgres'

module Sequel
  module JDBC
    # Setup hook stored under the :postgresql key of DATABASE_SETUP.  When
    # called with a database, it switches the database to the PostgreSQL
    # dataset class, extends it with the PostgreSQL database methods, and
    # returns the JDBC driver class.
    postgresql_setup = proc do |database|
      database.dataset_class = Sequel::JDBC::Postgres::Dataset
      database.extend(Sequel::JDBC::Postgres::DatabaseMethods)
      org.postgresql.Driver
    end
    Sequel.synchronize{DATABASE_SETUP[:postgresql] = postgresql_setup}

    module Postgres
      module DatabaseMethods
        include Sequel::Postgres::DatabaseMethods

        # Hook run when this module is extended onto a database object.  After
        # the inherited hook, call the database's initialize_postgres_adapter
        # so that the primary_keys and primary_key_sequences instance variables
        # exist and inserted rows can report the correct return values.
        def self.extended(db)
          super(db)
          db.send(:initialize_postgres_adapter)
        end

        # Register the conversion proc through the inherited implementation,
        # then remove any current entry for the oid from the oid_convertor_map
        # so a stale cached convertor is not reused.
        def add_conversion_proc(oid, *)
          super
          Sequel.synchronize do
            @oid_convertor_map.delete(oid)
          end
        end

        # Stream data into +table+ through the JDBC driver's CopyManager#copy_in,
        # using the SQL returned by copy_into_sql(table, opts).
        # See Sequel::Postgres::Adapter#copy_into
        #
        # The data must come from exactly one of two sources:
        # opts[:data] :: A String, or an object whose +each+ yields strings;
        #                every string is written to the copy stream.
        # block :: Called repeatedly; each truthy return value is written to the
        #          copy stream, and a nil/false return value ends the input.
        #
        # opts[:server] is passed to synchronize to pick the connection.
        # Raises Error if both or neither of :data and a block are given.
        def copy_into(table, opts=OPTS)
          data = opts[:data]
          # Wrap a lone string so the :data branch below can always use #each.
          data = Array(data) if data.is_a?(String)

          if defined?(yield) && data
            raise Error, "Cannot provide both a :data option and a block to copy_into"
          elsif !defined?(yield) && !data
            raise Error, "Must provide either a :data option or a block to copy_into"
          end

          synchronize(opts[:server]) do |conn|
            begin
              copy_manager = org.postgresql.copy.CopyManager.new(conn)
              copier = copy_manager.copy_in(copy_into_sql(table, opts))
              if defined?(yield)
                while buf = yield
                  java_bytes = buf.to_java_bytes
                  copier.writeToCopy(java_bytes, 0, java_bytes.length)
                end
              else
                data.each do |d|
                  java_bytes = d.to_java_bytes
                  copier.writeToCopy(java_bytes, 0, java_bytes.length)
                end
              end
            rescue Exception => e
              # Any failure while streaming (not only StandardError) cancels the
              # copy if one was started, then propagates unchanged.  Capturing
              # the exception in e also tells the ensure clause to skip endCopy.
              copier.cancelCopy if copier
              raise
            ensure
              # Only finish the copy when nothing was raised above.
              unless e
                begin
                  copier.endCopy
                rescue NativeException => e2
                  # Hand exceptions raised by endCopy to raise_error.
                  raise_error(e2)
                end
              end
            end
          end
        end
        
        # Read data out through the JDBC driver's CopyManager#copy_out, using the
        # SQL returned by copy_table_sql(table, opts).
        # See Sequel::Postgres::Adapter#copy_table
        #
        # With a block, each chunk read from the copy is converted to a ruby
        # String and yielded, and nil is returned.  Without a block, all chunks
        # are appended to a single String, which is returned.
        #
        # opts[:server] is passed to synchronize to pick the connection.
        def copy_table(table, opts=OPTS)
          synchronize(opts[:server]) do |conn|
            copy_manager = org.postgresql.copy.CopyManager.new(conn)
            copier = copy_manager.copy_out(copy_table_sql(table, opts))
            begin
              if defined?(yield)
                # readFromCopy returning nil ends the loop.
                while buf = copier.readFromCopy
                  yield(String.from_java_bytes(buf))
                end
                nil
              else
                b = String.new
                while buf = copier.readFromCopy
                  b << String.from_java_bytes(buf)
                end
                b
              end
            rescue => e
              # StandardErrors raised while reading are passed to raise_error
              # with the :disconnect option set.
              raise_error(e, :disconnect=>true)
            ensure
              # buf is still set here, with no rescued error, only when a read
              # loop was left before readFromCopy returned nil (for example, the
              # caller's block exited early), so the copy was not read to the end.
              if buf && !e
                raise DatabaseDisconnectError, "disconnecting as a partial COPY may leave the connection in an unusable state"
              end
            end
          end
        end

        # Return the result-set convertor for the given type oid, caching it in
        # the oid_convertor_map.  If a conversion proc is registered for the oid,
        # the convertor is a lambda taking a result set and column index that
        # reads the column with getString and passes non-nil values to the
        # conversion proc.  Otherwise false is cached and returned.
        def oid_convertor_proc(oid)
          cached = Sequel.synchronize{@oid_convertor_map[oid]}
          return cached unless cached.nil?

          typecast = conversion_procs[oid]
          convertor = if typecast
            lambda do |r, i|
              value = r.getString(i)
              typecast.call(value) if value
            end
          else
            false
          end

          Sequel.synchronize{@oid_convertor_map[oid] = convertor}
          convertor
        end

        private
        
        # Treat the exception as a disconnect when the inherited check does, or
        # when its message is exactly one of the connection-loss messages
        # matched below.
        def disconnect_error?(exception, opts)
          inherited = super
          return inherited if inherited

          exception.message =~ /\A(This connection has been closed\.|FATAL: terminating connection due to administrator command|An I\/O error occurred while sending to the backend\.)\z/
        end

        # For PostgreSQL-specific types, return the string that should be used
        # as the PGObject value.  Returns nil by default; loading pg_* extensions
        # will override this to add support for specific types.
        #
        # arg :: The bound variable argument being considered.
        # conn :: Not used by this default implementation (set_ps_arg passes nil).
        def bound_variable_arg(arg, conn)
          nil
        end

        # Work around an issue with Sequel's bound variable support, where the
        # same SQL is used in different bound variable calls but the schema has
        # changed between the calls.  jdbc-postgres versions after 9.4.1200
        # violate the JDBC API: they cache separate PreparedStatement instances,
        # which are eventually prepared server side once the prepareThreshold is
        # met, and PreparedStatement#close does not release the server side
        # prepared statement.
        #
        # For statements without a :name option, the prepare threshold is set
        # to 0.  The statement created by the inherited method is returned.
        def prepare_jdbc_statement(conn, sql, opts)
          super.tap do |stmt|
            stmt.prepare_threshold = 0 unless opts[:name]
          end
        end

        # If bound_variable_arg recognizes the argument as a PostgreSQL-specific
        # type, wrap the string it returns in a PGobject of type "unknown" and
        # set that as the prepared statement argument.  All other arguments are
        # handled by the inherited method.
        def set_ps_arg(cps, arg, i)
          pg_value = bound_variable_arg(arg, nil)
          return super unless pg_value

          pg_object = org.postgresql.util.PGobject.new
          pg_object.setType("unknown")
          pg_object.setValue(pg_value)
          cps.setObject(i, pg_object)
        end

        # Bind nil arguments with setNull and the JDBC NULL type, as the default
        # behavior of setString with nil doesn't appear to work correctly on
        # PostgreSQL.
        def set_ps_arg_nil(cps, i)
          null_type = JavaSQL::Types::NULL
          cps.setNull(i, null_type)
        end

        # After the inherited connection setup, execute each of the connection
        # configuration SQL queries on the new connection, logging each one,
        # and return the connection.
        def setup_connection_with_opts(conn, opts)
          connection = super
          statement(connection) do |stmt|
            connection_configuration_sqls(opts).each do |sql|
              log_connection_yield(sql, connection){stmt.execute(sql)}
            end
          end
          connection
        end

        # Run the inherited type convertor setup, then start with an empty
        # oid_convertor_map (the per-oid cache used by oid_convertor_proc).
        def setup_type_convertor_map
          super()
          @oid_convertor_map = {}
        end
      end
      
      class Dataset < JDBC::Dataset
        include Sequel::Postgres::DatasetMethods

        # Emit a warning (attributed to the caller) because the fetch size is
        # currently ignored by the JDBC adapter, then defer to the inherited
        # method.
        def with_fetch_size(size)
          message = "Sequel::JDBC::Postgres::Dataset#with_fetch_size does not currently have an effect."
          warn(message, uplevel: 1)
          super
        end
        
        private
        
        # Literalize strings similar to the native postgres adapter: append the
        # value to the SQL buffer inside single quotes, escaped by the
        # connection's escape_string.
        def literal_string_append(sql, v)
          sql << "'"
          sql << db.synchronize(@opts[:server]){|conn| conn.escape_string(v)}
          sql << "'"
        end

        # SQL fragment for Sequel::SQLTime, containing just the time part with
        # millisecond precision.
        #
        # The microseconds are rounded to the nearest millisecond, but capped
        # at 999.  Without the cap, values of 999.5ms and above round to 1000
        # and produce a four-digit fraction such as ".1000", which reads as a
        # fraction of 0.1 seconds instead of just under a full second.
        #
        # v :: The time value; must respond to +usec+ and +strftime+.
        #
        # Returns the quoted SQL literal String, e.g. '12:34:56.789'.
        def literal_sqltime(v)
          millis = [(v.usec / 1000.0).round, 999].min
          v.strftime("'%H:%M:%S#{sprintf(".%03d", millis)}'")
        end

        # JDBC type code for VARCHAR.  type_convertor uses the convertor stored
        # under this code for UUID columns.
        STRING_TYPE = Java::JavaSQL::Types::VARCHAR
        # JDBC type code for ARRAY.
        ARRAY_TYPE = Java::JavaSQL::Types::ARRAY
        # JDBC type codes for which type_convertor looks at the column's
        # PostgreSQL type oid before falling back to the inherited handling.
        PG_SPECIFIC_TYPES = [Java::JavaSQL::Types::ARRAY, Java::JavaSQL::Types::OTHER, Java::JavaSQL::Types::STRUCT, Java::JavaSQL::Types::TIME_WITH_TIMEZONE, Java::JavaSQL::Types::TIME].freeze

        # Convertor that returns PostgreSQL hstore values as ruby Hashes
        # instead of Java HashMaps.  Only used if the database does not have a
        # conversion proc for the type.  Takes the result set and the column
        # index, and returns nil when the column value is nil.
        HSTORE_METHOD = Object.new
        def HSTORE_METHOD.call(r, i)
          value = r.getObject(i)
          value.to_hash if value
        end

        # Pick the convertor for result set column +i+.  For the JDBC types in
        # PG_SPECIFIC_TYPES, the column's PostgreSQL type oid decides, in order:
        # the database's convertor for that oid, the string convertor for UUID
        # columns, or HSTORE_METHOD for hstore columns.  Everything else is
        # left to the inherited method.
        def type_convertor(map, meta, type, i)
          return super unless PG_SPECIFIC_TYPES.include?(type)

          oid = meta.getField(i).getOID
          oid_convertor = db.oid_convertor_proc(oid)
          return oid_convertor if oid_convertor
          return map[STRING_TYPE] if oid == 2950 # UUID
          return HSTORE_METHOD if meta.getPGType(i) == 'hstore'

          super
        end
      end
    end
  end
end