File: sequel.rb

package info (click to toggle)
ruby-moneta 1.6.0-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,776 kB
  • sloc: ruby: 13,201; sh: 178; makefile: 7
file content (325 lines) | stat: -rw-r--r-- 10,471 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
require 'sequel'

module Moneta
  module Adapters
    # Sequel backend
    # @api public
    class Sequel < Adapter
      # Database-specific modules, loaded lazily on first reference.  They are
      # mixed into individual adapter instances with `extend` (see #initialize
      # and #add_optimizations).
      autoload :MySQL, 'moneta/adapters/sequel/mysql'
      autoload :Postgres, 'moneta/adapters/sequel/postgres'
      autoload :PostgresHStore, 'moneta/adapters/sequel/postgres_hstore'
      autoload :SQLite, 'moneta/adapters/sequel/sqlite'

      # Features implemented by this adapter (see #create, #increment and
      # #each_key below).
      supports :create, :increment, :each_key

      # Adapter settings; each one is described under the option of the same
      # name on #initialize.  `:table` is coerced with `to_sym` and `:hstore`
      # with `to_s`.
      config :table, default: :moneta, coerce: :to_sym
      config :optimize, default: true
      config :create_table, default: true
      config :key_column, default: :k
      config :value_column, default: :v
      config :hstore, coerce: :to_s
      config :each_key_server

      # Builds the Sequel database used as the backend.  `db` and any options
      # not named in the block signature are handed to `Sequel.connect`; the
      # requested extensions are then loaded and, when given, the pool's
      # connection validation timeout is set.
      backend do |db:, extensions: [], connection_validation_timeout: nil, **options|
        database = ::Sequel.connect(db, options)

        extension_names = extensions.map(&:to_sym)
        extension_names.each { |name| database.extension(name) }

        if connection_validation_timeout
          database.pool.connection_validation_timeout = connection_validation_timeout
        end

        database
      end

      # @param [Hash] options
      # @option options [String] :db Sequel database
      # @option options [String, Symbol] :table (:moneta) Table name
      # @option options [Array] :extensions ([]) List of Sequel extensions
      # @option options [Integer] :connection_validation_timeout (nil) Sequel connection_validation_timeout
      # @option options [Sequel::Database] :backend Use existing backend instance
      # @option options [Boolean] :optimize (true) Set to false to prevent database-specific optimisations
      # @option options [Proc, Boolean] :create_table (true) Provide a Proc for creating the table, or
      #   set to false to disable table creation altogether.  If a Proc is given, it will be
      #   called regardless of whether the table exists already.
      # @option options [Symbol] :key_column (:k) The name of the key column
      # @option options [Symbol] :value_column (:v) The name of the value column
      # @option options [String] :hstore If using Postgres, keys and values are stored in a single
      #   row of the table in the value_column using the hstore format.  The row to use is
      #   the one where the value_column is equal to the value of this option, and will be created
      #   if it doesn't exist.
      # @option options [Symbol] :each_key_server Some adapters are unable to do
      #   multiple operations with a single connection. For these, it is
      #   possible to specify a separate connection to use for `#each_key`.  Use
      #   in conjunction with Sequel's `:servers` option
      # @option options All other options passed to `Sequel#connect`
      def initialize(options = {})
        super

        # hstore mode takes precedence over the database-specific
        # optimisations; at most one of the two is applied.
        if config.hstore
          extend Sequel::PostgresHStore
        elsif config.optimize
          add_optimizations
        end

        # A callable is always invoked with the backend; any other truthy
        # value falls back to the built-in #create_table.
        table_setup = config.create_table
        if table_setup.respond_to?(:call)
          table_setup.call(backend)
        elsif table_setup
          create_table
        end

        @table = backend[config.table]
        prepare_statements
      end

      # (see Proxy#key?)
      # True when the prepared key lookup returns a row for +key+.
      def key?(key, options = {})
        !@key.call(key: key).nil?
      end

      # (see Proxy#load)
      # Returns the value column of the row stored under +key+, or nil when
      # the prepared lookup finds no row.
      def load(key, options = {})
        row = @load.call(key: key)
        return unless row

        row[config.value_column]
      end

      # (see Proxy#store)
      # Updates the row for +key+, inserting it when the update does not
      # report exactly one affected row.  Database errors restart the whole
      # attempt; the error is re-raised on the tenth failure.
      def store(key, value, options = {})
        payload = blob(value)
        updated = @store_update.call(key: key, value: payload)
        @create.call(key: key, value: payload) if updated != 1
        value
      rescue ::Sequel::DatabaseError
        # Locals survive `retry`, so this counts failures across attempts.
        attempts = (attempts || 0) + 1
        raise unless attempts < 10

        retry
      end

      # (see Proxy#create)
      # Inserts +key+ with +value+.  Returns true on success and false when
      # the insert raises a constraint violation.
      def create(key, value, options = {})
        @create.call(key: key, value: blob(value))
      rescue ::Sequel::ConstraintViolation
        false
      else
        true
      end

      # (see Proxy#increment)
      # Adds +amount+ to the integer stored under +key+ inside a transaction
      # and returns the new total.  When no row exists, one is created holding
      # +amount+.
      #
      # Any Sequel::DatabaseError (including IncrementError, raised when the
      # conditional update does not touch exactly one row) restarts the whole
      # attempt; the error is re-raised on the tenth failure.
      #
      # @raise [ArgumentError] if the stored value cannot be parsed by Integer()
      def increment(key, amount = 1, options = {})
        backend.transaction do
          existing = @load_for_update.call(key: key)

          if existing
            existing_value = existing[config.value_column]
            # Do not modify `amount` here: `retry` re-runs this body with the
            # same locals, so accumulating into `amount` would add the stored
            # value again on every retried attempt.
            new_amount = amount + Integer(existing_value)
            updated = @increment_update.call(
              key: key,
              value: existing_value,
              new_value: blob(new_amount.to_s)
            )
            raise IncrementError, "no update" unless updated == 1

            new_amount
          else
            @create.call(key: key, value: blob(amount.to_s))
            amount
          end
        end
      rescue ::Sequel::DatabaseError
        # Concurrent modification might throw a bunch of different errors
        tries ||= 0
        (tries += 1) < 10 ? retry : raise
      end

      # (see Proxy#delete)
      # Reads the current value for +key+, runs the prepared delete, and
      # returns the value that was read (nil when there was none).
      def delete(key, options = {})
        load(key, options).tap { @delete.call(key: key) }
      end

      # (see Proxy#clear)
      # Issues a delete on the whole table dataset and returns the adapter.
      def clear(options = {})
        tap { @table.delete }
      end

      # (see Proxy#close)
      # Disconnects the backend database.  Always returns nil, whatever the
      # disconnect call itself returns.
      def close
        database = backend
        database.disconnect
        nil
      end

      # (see Proxy#slice)
      # Runs the slice loader for +keys+ and returns one [key, value] pair per
      # row it yields.
      def slice(*keys, **options)
        key_col = config.key_column
        value_col = config.value_column
        @slice.all(keys).map! { |row| [row[key_col], row[value_col]] }
      end

      # (see Proxy#values_at)
      # Returns the values for +keys+ in the order given, with nil for every
      # key that #slice did not return.
      def values_at(*keys, **options)
        slice(*keys, **options).to_h.values_at(*keys)
      end

      # (see Proxy#fetch_values)
      # Like #values_at, but when a block is given it is called with each key
      # that #slice did not return, and its result is used in that position.
      def fetch_values(*keys, **options)
        return values_at(*keys, **options) unless block_given?

        found = slice(*keys, **options).to_h
        keys.map { |key| found.fetch(key) { yield key } }
      end

      # (see Proxy#merge!)
      # Stores every pair in +pairs+ within one transaction: keys without an
      # existing row are bulk-imported, the rest are updated one by one.  When
      # a block is given, each existing key's stored value is the block's
      # result for (key, existing value, new value).
      def merge!(pairs, options = {})
        backend.transaction do
          current = slice_for_update(pairs).to_h
          to_update, to_insert = pairs.partition { |key, _| current.key?(key) }
          @table.import([config.key_column, config.value_column], blob_pairs(to_insert))

          if block_given?
            # Resolve every conflict before issuing any update.
            to_update = to_update.map do |key, incoming|
              [key, yield(key, current[key], incoming)]
            end
          end

          to_update.each { |key, value| @store_update.call(key: key, value: blob(value)) }
        end

        self
      end

      # (see Proxy#each_key)
      # Yields every key in key order and returns the adapter.  Without a
      # block, returns an enumerator whose size is the table's row count.
      # When `each_key_server` is configured the query is run against that
      # server; otherwise paging is requested with `stream: false`.
      def each_key
        return enum_for(:each_key) { @table.count } unless block_given?

        column = config.key_column
        emit = proc { |row| yield row[column] }
        server = config.each_key_server

        if server
          @table.server(server).order(column).select(column).paged_each(&emit)
        else
          @table.select(column).order(column).paged_each(stream: false, &emit)
        end
        self
      end

      protected

      # Mixes the module matching the backend's database type into this
      # instance.  Unrecognised database types, and Postgres servers older
      # than 9.5, are left without optimisations.
      #
      # @api private
      def add_optimizations
        case backend.database_type
        when :mysql
          extend Sequel::MySQL
        when :postgres
          # `to_s` guards against a backend that reports no version at all.
          version = backend.get(::Sequel[:version].function).to_s

          # The minor part is optional so that builds reporting no ".minor"
          # (e.g. "PostgreSQL 16beta1", "PostgreSQL 17devel") still match.
          if (matches = version.match(/PostgreSQL (\d+)(?:\.(\d+))?/))
            # Our optimisations only work on Postgres 9.5+
            major = matches[1].to_i
            minor = matches[2].to_i # nil.to_i is 0 when the minor part is absent
            extend Sequel::Postgres if major > 9 || (major == 9 && minor >= 5)
          end
        when :sqlite
          extend Sequel::SQLite
        end
      end

      # Wraps +str+ with Sequel.blob; nil is passed through as nil.
      def blob(str)
        return if str.nil?

        ::Sequel.blob(str)
      end

      # Returns +pairs+ as an array of [key, value] with each value passed
      # through #blob.
      def blob_pairs(pairs)
        pairs.map { |name, raw| [name, blob(raw)] }
      end

      # Creates the configured table unless it already exists, with a
      # non-null String primary key column and a File (blob) value column.
      def create_table
        # Column names are copied into locals before the block; presumably
        # Sequel evaluates the block against its schema generator, where
        # `config` would not be reachable - confirm against Sequel's docs.
        key_name = config.key_column
        value_name = config.value_column

        backend.create_table?(config.table) do
          String key_name, null: false, primary_key: true
          File value_name
        end
      end

      # Runs the locking slice loader for the keys of +pairs+ and returns one
      # [key, value] pair per row it yields.
      def slice_for_update(pairs)
        wanted = pairs.map { |key, _| key }.to_a
        key_col = config.key_column
        value_col = config.value_column

        @slice_for_update.all(wanted).map! { |row| [row[key_col], row[value_col]] }
      end

      # Returns +pairs+ as [key, value] arrays where, for every key that
      # already has a row, the value is replaced by the block's result for
      # (key, existing value, new value).
      def yield_merge_pairs(pairs)
        current = slice_for_update(pairs).to_h
        pairs.map do |key, incoming|
          next [key, incoming] unless current.key?(key)

          [key, yield(key, current[key], incoming)]
        end
      end

      # Builds the name for a prepared statement as a Symbol of the form
      # moneta_<table>_<id>.
      def statement_id(id)
        :"moneta_#{config.table}_#{id}"
      end

      # Builds every prepared statement and loader that the public methods
      # call through instance variables (@key, @load, @store_update, @create,
      # @load_for_update, @increment_update, @delete, @slice,
      # @slice_for_update).  Each step is a separate method, run in the order
      # listed here.
      def prepare_statements
        prepare_key
        prepare_load
        prepare_store
        prepare_create
        prepare_increment
        prepare_delete
        prepare_slice
      end

      # Prepares @key: fetches the first row matching the :$key placeholder,
      # selecting only the literal 1.
      def prepare_key
        matching = @table.where(config.key_column => :$key)
        @key = matching.select(1).prepare(:first, statement_id(:key))
      end

      # Prepares @load: fetches the value column of the first row matching
      # the :$key placeholder.
      def prepare_load
        matching = @table.where(config.key_column => :$key)
        @load = matching.select(config.value_column).prepare(:first, statement_id(:load))
      end

      # Prepares @store_update: sets the value column to :$value on the row
      # matching the :$key placeholder.
      def prepare_store
        matching = @table.where(config.key_column => :$key)
        @store_update = matching.prepare(
          :update,
          statement_id(:store_update),
          config.value_column => :$value
        )
      end

      # Prepares @create: inserts a row from the :$key and :$value
      # placeholders.
      def prepare_create
        name = statement_id(:create)
        key_col = config.key_column
        value_col = config.value_column

        @create = @table.prepare(:insert, name, key_col => :$key, value_col => :$value)
      end

      # Prepares the two statements used by #increment: a locking read of the
      # value column (@load_for_update) and a conditional update that only
      # matches while the row still holds the value that was read
      # (@increment_update).
      def prepare_increment
        key_col = config.key_column
        value_col = config.value_column

        locked_row = @table.where(key_col => :$key).for_update.select(value_col)
        @load_for_update = locked_row.prepare(:first, statement_id(:load_for_update))

        # `||=` keeps an @increment_update that was assigned before this
        # method ran.  NOTE(review): presumably so a database-specific module
        # can set its own statement and then call super - confirm.
        @increment_update ||= @table
          .where(key_col => :$key, value_col => :$value)
          .prepare(:update, statement_id(:increment_update), value_col => :$new_value)
      end

      # Prepares @delete: deletes the row matching the :$key placeholder.
      def prepare_delete
        matching = @table.where(config.key_column => :$key)
        @delete = matching.prepare(:delete, statement_id(:delete))
      end

      # Builds the two placeholder loaders used for multi-key reads.  Both
      # filter on the key column and select the key and value columns;
      # @slice_for_update additionally applies `for_update`.
      def prepare_slice
        literalizer = ::Sequel::Dataset::PlaceholderLiteralizer
        key_col = config.key_column
        value_col = config.value_column

        @slice_for_update = literalizer.loader(@table) do |placeholder, dataset|
          dataset.filter(key_col => placeholder.arg).select(key_col, value_col).for_update
        end

        @slice = literalizer.loader(@table) do |placeholder, dataset|
          dataset.filter(key_col => placeholder.arg).select(key_col, value_col)
        end
      end

      # Raised by #increment when its conditional update does not affect
      # exactly one row.  Being a Sequel::DatabaseError, it is caught by
      # #increment's own rescue clause and triggers a retry.
      #
      # @api private
      class IncrementError < ::Sequel::DatabaseError
      end
    end
  end
end