File: database.rb

package info (click to toggle)
libsequel-core-ruby 1.5.1-1
  • links: PTS
  • area: main
  • in suites: lenny
  • size: 648 kB
  • ctags: 840
  • sloc: ruby: 10,949; makefile: 36
file content (463 lines) | stat: -rw-r--r-- 14,480 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
require 'uri'

module Sequel
  DATABASES = []
  # A Database object represents a virtual connection to a database.
  # The Database class is meant to be subclassed by database adapters in order
  # to provide the functionality needed for executing queries.
  class Database
    ADAPTERS = %w'ado db2 dbi informix jdbc mysql odbc odbc_mssql openbase oracle postgres sqlite'.collect{|x| x.to_sym}
    attr_reader :opts, :pool
    attr_accessor :logger

    # Constructs a new instance of a database connection with the specified
    # options hash.
    #
    # Sequel::Database is an abstract class that is not useful by itself.
    def initialize(opts = {}, &block)
      @opts = opts
      
      # Determine if the DB is single threaded or multi threaded
      @single_threaded = opts[:single_threaded] || @@single_threaded
      # Construct pool
      if @single_threaded
        @pool = SingleThreadedPool.new(&block)
      else
        @pool = ConnectionPool.new(opts[:max_connections] || 4, &block)
      end
      @pool.connection_proc = block || proc {connect}

      @logger = opts[:logger]
      ::Sequel::DATABASES.push(self)
    end
    
    # Connects to the database. This method should be overriden by descendants.
    def connect
      raise NotImplementedError, "#connect should be overriden by adapters"
    end
    
    # Disconnects from the database. This method should be overriden by 
    # descendants.
    def disconnect
      raise NotImplementedError, "#disconnect should be overriden by adapters"
    end
    
    # Returns true if the database is using a multi-threaded connection pool.
    def multi_threaded?
      !@single_threaded
    end
    
    # Returns true if the database is using a single-threaded connection pool.
    def single_threaded?
      @single_threaded
    end
    
    # Returns the URI identifying the database.
    def uri
      uri = URI::Generic.new(
        self.class.adapter_scheme.to_s,
        nil,
        @opts[:host],
        @opts[:port],
        nil,
        "/#{@opts[:database]}",
        nil,
        nil,
        nil
      )
      uri.user = @opts[:user]
      uri.password = @opts[:password] if uri.user
      uri.to_s
    end
    alias url uri # Because I don't care much for the semantic difference.
    
    # Returns a blank dataset
    def dataset
      ds = Sequel::Dataset.new(self)
    end
    
    # Fetches records for an arbitrary SQL statement. If a block is given,
    # it is used to iterate over the records:
    #
    #   DB.fetch('SELECT * FROM items') {|r| p r}
    #
    # If a block is not given, the method returns a dataset instance:
    #
    #   DB.fetch('SELECT * FROM items').print
    #
    # Fetch can also perform parameterized queries for protection against SQL
    # injection:
    #
    #   DB.fetch('SELECT * FROM items WHERE name = ?', my_name).print
    #
    # A short-hand form for Database#fetch is Database#[]:
    #
    #   DB['SELECT * FROM items'].each {|r| p r}
    #
    def fetch(sql, *args, &block)
      ds = dataset
      sql = sql.gsub('?') {|m|  ds.literal(args.shift)}
      if block
        ds.fetch_rows(sql, &block)
      else
        ds.opts[:sql] = sql
        ds
      end
    end
    alias_method :>>, :fetch
    
    # Converts a query block into a dataset. For more information see 
    # Dataset#query.
    def query(&block)
      dataset.query(&block)
    end
    
    # Returns a new dataset with the from method invoked. If a block is given,
    # it is used as a filter on the dataset.
    def from(*args, &block)
      ds = dataset.from(*args)
      block ? ds.filter(&block) : ds
    end
    
    # Returns a new dataset with the select method invoked.
    def select(*args); dataset.select(*args); end
    
    # Returns a dataset from the database. If the first argument is a string,
    # the method acts as an alias for Database#fetch, returning a dataset for
    # arbitrary SQL:
    #
    #   DB['SELECT * FROM items WHERE name = ?', my_name].print
    #
    # Otherwise, the dataset returned has its from option set to the given
    # arguments:
    #
    #   DB[:items].sql #=> "SELECT * FROM items"
    #
    def [](*args)
      (String === args.first) ? fetch(*args) : from(*args)
    end
    
    # Returns a single value from the database, e.g.:
    #
    #   # SELECT 1
    #   DB.get(1) #=> 1 
    #
    #   # SELECT version()
    #   DB.get(:version[]) #=> ...
    def get(expr)
      dataset.get(expr)
    end
    
    # Raises a Sequel::Error::NotImplemented. This method is overriden in descendants.
    def execute(sql)
      raise NotImplementedError, "#execute should be overriden by adapters"
    end
    
    # Executes the supplied SQL statement. The SQL can be supplied as a string
    # or as an array of strings. If an array is give, comments and excessive 
    # white space are removed. See also Array#to_sql.
    def <<(sql); execute((Array === sql) ? sql.to_sql : sql); end
    
    # Acquires a database connection, yielding it to the passed block.
    def synchronize(&block)
      @pool.hold(&block)
    end

    # Returns true if there is a database connection
    def test_connection
      @pool.hold {|conn|}
      true
    end
    
    include Schema::SQL
    
    # default serial primary key definition. this should be overriden for each adapter.
    def serial_primary_key_options
      {:primary_key => true, :type => :integer, :auto_increment => true}
    end
    
    # Creates a table. The easiest way to use this method is to provide a
    # block:
    #   DB.create_table :posts do
    #     primary_key :id, :serial
    #     column :title, :text
    #     column :content, :text
    #     index :title
    #   end
    def create_table(name, &block)
      g = Schema::Generator.new(self, &block)
      create_table_sql_list(name, *g.create_info).each {|sql| execute(sql)}
    end
    
    # Forcibly creates a table. If the table already exists it is dropped.
    def create_table!(name, &block)
      drop_table(name) rescue nil
      create_table(name, &block)
    end
    
    # Drops one or more tables corresponding to the given table names.
    def drop_table(*names)
      names.each {|n| execute(drop_table_sql(n))}
    end
    
    # Renames a table:
    #
    #   DB.tables #=> [:items]
    #   DB.rename_table :items, :old_items
    #   DB.tables #=> [:old_items]
    def rename_table(*args)
      execute(rename_table_sql(*args))
    end
    
    # Alters the given table with the specified block. Here are the currently
    # available operations:
    #
    #   DB.alter_table :items do
    #     add_column :category, :text, :default => 'ruby'
    #     drop_column :category
    #     rename_column :cntr, :counter
    #     set_column_type :value, :float
    #     set_column_default :value, :float
    #     add_index [:group, :category]
    #     drop_index [:group, :category]
    #   end
    #
    # Note that #add_column accepts all the options available for column
    # definitions using create_table, and #add_index accepts all the options
    # available for index definition.
    def alter_table(name, &block)
      g = Schema::AlterTableGenerator.new(self, &block)
      alter_table_sql_list(name, g.operations).each {|sql| execute(sql)}
    end
    
    # Adds a column to the specified table. This method expects a column name,
    # a datatype and optionally a hash with additional constraints and options:
    #
    #   DB.add_column :items, :name, :text, :unique => true, :null => false
    #   DB.add_column :items, :category, :text, :default => 'ruby'
    def add_column(table, *args)
      alter_table(table) {add_column(*args)}
    end
    
    # Removes a column from the specified table:
    #
    #   DB.drop_column :items, :category
    def drop_column(table, *args)
      alter_table(table) {drop_column(*args)}
    end
    
    # Renames a column in the specified table. This method expects the current
    # column name and the new column name:
    #
    #   DB.rename_column :items, :cntr, :counter
    def rename_column(table, *args)
      alter_table(table) {rename_column(*args)}
    end
    
    # Set the data type for the given column in the given table:
    #
    #   DB.set_column_type :items, :price, :float
    def set_column_type(table, *args)
      alter_table(table) {set_column_type(*args)}
    end
    
    # Sets the default value for the given column in the given table:
    #
    #   DB.set_column_default :items, :category, 'perl!'
    def set_column_default(table, *args)
      alter_table(table) {set_column_default(*args)}
    end
    
    # Adds an index to a table for the given columns:
    # 
    #   DB.add_index :posts, :title
    #   DB.add_index :posts, [:author, :title], :unique => true
    def add_index(table, *args)
      alter_table(table) {add_index(*args)}
    end
    
    # Removes an index for the given table and column/s:
    #
    #   DB.drop_index :posts, :title
    #   DB.drop_index :posts, [:author, :title]
    def drop_index(table, columns)
      alter_table(table) {drop_index(columns)}
    end
    
    # Returns true if the given table exists.
    def table_exists?(name)
      if respond_to?(:tables)
        tables.include?(name.to_sym)
      else
        from(name).first
        true
      end
    rescue
      false
    end
    
    # Creates a view based on a dataset or an SQL string:
    #
    #   DB.create_view(:cheap_items, "SELECT * FROM items WHERE price < 100")
    #   DB.create_view(:ruby_items, DB[:items].filter(:category => 'ruby'))
    def create_view(name, source)
      source = source.sql if source.is_a?(Dataset)
      execute("CREATE VIEW #{name} AS #{source}")
    end
    
    # Creates a view, replacing it if it already exists:
    #
    #   DB.create_or_replace_view(:cheap_items, "SELECT * FROM items WHERE price < 100")
    #   DB.create_or_replace_view(:ruby_items, DB[:items].filter(:category => 'ruby'))
    def create_or_replace_view(name, source)
      source = source.sql if source.is_a?(Dataset)
      execute("CREATE OR REPLACE VIEW #{name} AS #{source}")
    end
    
    # Drops a view:
    #
    #   DB.drop_view(:cheap_items)
    def drop_view(name)
      execute("DROP VIEW #{name}")
    end
    
    SQL_BEGIN = 'BEGIN'.freeze
    SQL_COMMIT = 'COMMIT'.freeze
    SQL_ROLLBACK = 'ROLLBACK'.freeze

    # A simple implementation of SQL transactions. Nested transactions are not 
    # supported - calling #transaction within a transaction will reuse the 
    # current transaction. May be overridden for databases that support nested 
    # transactions.
    def transaction
      @pool.hold do |conn|
        @transactions ||= []
        if @transactions.include? Thread.current
          return yield(conn)
        end
        conn.execute(SQL_BEGIN)
        begin
          @transactions << Thread.current
          result = yield(conn)
          conn.execute(SQL_COMMIT)
          result
        rescue => e
          conn.execute(SQL_ROLLBACK)
          raise e unless Error::Rollback === e
        ensure
          @transactions.delete(Thread.current)
        end
      end
    end
    
    # Returns a string representation of the database object including the
    # class name and the connection URI.
    def inspect
      "#<#{self.class}: #{(uri rescue opts).inspect}>" 
    end

    @@adapters = Hash.new
    
    class << self
    private
      # Sets the adapter scheme for the Database class. Call this method in
      # descendnants of Database to allow connection using a URL. For example the
      # following:
      #   class DB2::Database < Sequel::Database
      #     set_adapter_scheme :db2
      #     ...
      #   end
      # would allow connection using:
      #   Sequel.open('db2://user:password@dbserver/mydb')
      def set_adapter_scheme(scheme)
        @scheme = scheme
        @@adapters[scheme.to_sym] = self
      end
    end
    
    # Returns the scheme for the Database class.
    def self.adapter_scheme
      @scheme
    end
    
    # Converts a uri to an options hash. These options are then passed
    # to a newly created database object.
    def self.uri_to_options(uri)
      if uri.is_a?(String)
        uri = URI.parse(uri)
      end
      {
        :user => uri.user,
        :password => uri.password,
        :host => uri.host,
        :port => uri.port,
        :database => (uri.path =~ /\/(.*)/) && ($1)
      }
    end
    
    def self.adapter_class(scheme)
      scheme = scheme.to_sym
      
      if (klass = @@adapters[scheme]).nil?
        # attempt to load the adapter file
        begin
          require File.join(File.dirname(__FILE__), "adapters/#{scheme}")
        rescue LoadError => e
          raise Error::AdapterNotFound, "Could not load #{scheme} adapter:\n  #{e.message}"
        end
        
        # make sure we actually loaded the adapter
        if (klass = @@adapters[scheme]).nil?
          raise Error::AdapterNotFound, "Could not load #{scheme} adapter"
        end
      end
      return klass
    end
        
    # call-seq:
    #   Sequel::Database.connect(conn_string)
    #   Sequel::Database.connect(opts)
    #   Sequel.connect(conn_string)
    #   Sequel.connect(opts)
    #   Sequel.open(conn_string)
    #   Sequel.open(opts)
    #
    # Creates a new database object based on the supplied connection string
    # and or options. If a URI is used, the URI scheme determines the database
    # class used, and the rest of the string specifies the connection options. 
    # For example:
    #
    #   DB = Sequel.open 'sqlite:///blog.db'
    #
    # The second form of this method takes an options:
    #
    #   DB = Sequel.open :adapter => :sqlite, :database => 'blog.db'
    def self.connect(conn_string, opts = nil)
      if conn_string.is_a?(String)
        uri = URI.parse(conn_string)
        scheme = uri.scheme
        scheme = :dbi if scheme =~ /^dbi-(.+)/
        c = adapter_class(scheme)
        opts = c.uri_to_options(uri).merge(opts || {})
      else
        opts = conn_string.merge(opts || {})
        c = adapter_class(opts[:adapter] || opts['adapter'])
      end
      # process opts a bit
      opts = opts.inject({}) do |m, kv| k, v = *kv
        k = :user if k == 'username'
        m[k.to_sym] = v
        m
      end
      c.new(opts)
    end
    
    @@single_threaded = false
    
    # Sets the default single_threaded mode for new databases.
    def self.single_threaded=(value)
      @@single_threaded = value
    end
  end
end