File: pg_streaming.rb

package info (click to toggle)
ruby-sequel-pg 1.18.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 204 kB
  • sloc: ansic: 1,938; ruby: 348; makefile: 2
file content (169 lines) | stat: -rw-r--r-- 4,654 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# :nocov:
# Refuse to load this extension unless Sequel::Postgres answers
# supports_streaming? and reports that streaming is available.
if !Sequel::Postgres.respond_to?(:supports_streaming?)
  raise LoadError, "either sequel_pg not loaded, or an old version of sequel_pg loaded"
elsif !Sequel::Postgres.supports_streaming?
  raise LoadError, "streaming is not supported by the version of libpq in use"
end
# :nocov:

# Database methods necessary to support streaming.  You should load this extension
# into your database object:
#
#   DB.extension(:pg_streaming)
#
# Then you can call #stream on your datasets to use the streaming support:
#
#   DB[:table].stream.each{|row| ...}
#
# Or change a setting so that all dataset queries use streaming:
#
#   DB.stream_all_queries = true
module Sequel::Postgres::Streaming
  # Database-wide switch.  When true, streamable dataset queries are
  # streamed even if #stream was not called on the dataset.
  attr_accessor :stream_all_queries

  # Hook run when a database object is extended with this module.
  # Adds the streaming dataset methods, starts with
  # stream_all_queries turned off, and disconnects last so that
  # connections made afterwards go through #connect below and
  # receive the adapter methods.
  def self.extended(db)
    db.extend_datasets(DatasetMethods)
    db.stream_all_queries = false
    db.disconnect
  end

  # Build the connection as usual, mix AdapterMethods into it, and
  # return that same connection.
  def connect(server)
    super.tap{|conn| conn.extend(AdapterMethods)}
  end

  private

  # Flag the connection for single row mode when streaming was
  # requested and sql is not a Symbol.  A Symbol sql is treated as a
  # prepared statement, whose streaming is handled by
  # _execute_prepared_statement.
  def _execute(conn, sql, opts={}, &block)
    conn.single_row_mode = true if opts[:stream] && !sql.is_a?(Symbol)
    super
  end

  # When streaming was requested, send the prepared statement on the
  # connection rather than executing it and blocking.  Without the
  # :stream option the default implementation is used.
  def _execute_prepared_statement(conn, ps_name, args, opts)
    return super unless opts[:stream]

    conn.send_prepared_statement(ps_name, args)
  end

  # Methods mixed into every new connection (see #connect).
  module AdapterMethods
    # Flag telling execute_query that the next query on this
    # connection should be sent using single row mode.
    attr_accessor :single_row_mode

    # Send the named prepared statement, switch the connection to
    # single row mode, and return the connection itself.
    # The send / set_single_row_mode / block order is kept exactly
    # as is; presumably single row mode has to be selected right
    # after the query is sent (confirm against the libpq docs).
    def send_prepared_statement(ps_name, args)
      send_query_prepared(ps_name, args)
      set_single_row_mode
      block
      self
    end

    private

    # :nocov:
    unless Sequel::Postgres::Adapter.method_defined?(:send_query_params)
      # Fallback used when the adapter has no send_query_params:
      # forward all arguments to send_query.
      def send_query_params(*args)
        send_query(*args)
      end
    end
    # :nocov:

    if Sequel::Database.method_defined?(:log_connection_yield)
      # When the single row flag is set, clear it and send the query
      # (logged through the database) instead of executing it, then
      # enable single row mode and return the connection.  When the
      # flag is not set, execute the query the default way.
      def execute_query(sql, args)
        return super unless @single_row_mode

        # The flag applies to one query only, so reset it first.
        @single_row_mode = false
        @db.log_connection_yield(sql, self, args) do
          args ? send_query_params(sql, args) : send_query(sql)
        end
        set_single_row_mode
        block
        self
      end
    else
      # :nocov:
      # Variant for Database classes without log_connection_yield;
      # identical flow, but logging goes through log_yield and the
      # query is always sent with send_query.
      def execute_query(sql, args)
        return super unless @single_row_mode

        @single_row_mode = false
        @db.log_yield(sql, args) do
          args ? send_query(sql, args) : send_query(sql)
        end
        set_single_row_mode
        block
        self
      end
      # :nocov:
    end
  end

  # Dataset methods used to implement streaming.
  module DatasetMethods
    # When this dataset should be streamed, execute the query with
    # the :stream option and hand each row produced by
    # yield_each_row to the caller's block.  Otherwise defer to the
    # default row fetching.
    def fetch_rows(sql)
      return super unless stream_results?

      execute(sql, :stream=>true) do |conn|
        yield_each_row(conn){|row| yield row}
      end
    end

    # Implement paging on top of streaming: a streamed dataset just
    # iterates with #each.  Returns an enumerator when no block is
    # given, and uses the default paging when not streaming.
    def paged_each(opts=Sequel::OPTS, &block)
      return enum_for(:paged_each, opts) unless block_given?
      return super unless stream_results?

      each(&block)
    end

    # Return a copy of this dataset with the :stream option set, so
    # that its rows are loaded using streaming.
    def stream
      clone(:stream=>true)
    end

    private

    # Streaming happens only when it was asked for (on this dataset
    # or for the whole database) and the query is streamable.
    def stream_results?
      requested = @opts[:stream] || db.stream_all_queries
      requested && streamable?
    end

    # A query is streamable unless it uses a cursor or one of the
    # map/select_map/to_hash/to_hash_groups optimizations, i.e. its
    # :_sequel_pg_type option must be unset or :model.
    def streamable?
      options = @opts
      pg_type = options[:_sequel_pg_type]
      return false unless pg_type.nil? || pg_type == :model

      !options[:cursor]
    end
  end
end

# Register the module under the :pg_streaming name, which is the name
# passed to DB.extension(:pg_streaming) to load it into a database.
Sequel::Database.register_extension(:pg_streaming, Sequel::Postgres::Streaming)