File: future_result.rb

package info (click to toggle)
rails 2%3A7.2.2.1%2Bdfsg-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 43,352 kB
  • sloc: ruby: 349,799; javascript: 30,703; yacc: 46; sql: 43; sh: 29; makefile: 27
file content (178 lines) | stat: -rw-r--r-- 3,788 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# frozen_string_literal: true

module ActiveRecord
  class FutureResult # :nodoc:
    class Complete
      attr_reader :result
      delegate :empty?, :to_a, to: :result

      def initialize(result)
        @result = result
      end

      def pending?
        false
      end

      def canceled?
        false
      end

      def then(&block)
        Promise::Complete.new(@result.then(&block))
      end
    end

    class EventBuffer
      def initialize(future_result, instrumenter)
        @future_result = future_result
        @instrumenter = instrumenter
        @events = []
      end

      def instrument(name, payload = {}, &block)
        event = @instrumenter.new_event(name, payload)
        begin
          event.record(&block)
        ensure
          @events << event
        end
      end

      def flush
        events, @events = @events, []
        events.each do |event|
          event.payload[:lock_wait] = @future_result.lock_wait
          ActiveSupport::Notifications.publish_event(event)
        end
      end
    end

    Canceled = Class.new(ActiveRecordError)

    def self.wrap(result)
      case result
      when self, Complete
        result
      else
        Complete.new(result)
      end
    end

    delegate :empty?, :to_a, to: :result

    attr_reader :lock_wait

    def initialize(pool, *args, **kwargs)
      @mutex = Mutex.new

      @session = nil
      @pool = pool
      @args = args
      @kwargs = kwargs

      @pending = true
      @error = nil
      @result = nil
      @instrumenter = ActiveSupport::Notifications.instrumenter
      @event_buffer = nil
    end

    def then(&block)
      Promise.new(self, block)
    end

    def schedule!(session)
      @session = session
      @pool.schedule_query(self)
    end

    def execute!(connection)
      execute_query(connection)
    end

    def cancel
      @pending = false
      @error = Canceled
      self
    end

    def execute_or_skip
      return unless pending?

      @pool.with_connection do |connection|
        return unless @mutex.try_lock
        begin
          if pending?
            @event_buffer = EventBuffer.new(self, @instrumenter)
            connection.with_instrumenter(@event_buffer) do
              execute_query(connection, async: true)
            end
          end
        ensure
          @mutex.unlock
        end
      end
    end

    def result
      execute_or_wait
      @event_buffer&.flush

      if canceled?
        raise Canceled
      elsif @error
        raise @error
      else
        @result
      end
    end

    def pending?
      @pending && (!@session || @session.active?)
    end

    def canceled?
      @session && !@session.active?
    end

    private
      def execute_or_wait
        if pending?
          start = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
          @mutex.synchronize do
            if pending?
              @pool.with_connection do |connection|
                execute_query(connection)
              end
            else
              @lock_wait = (Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond) - start)
            end
          end
        else
          @lock_wait = 0.0
        end
      end

      def execute_query(connection, async: false)
        @result = exec_query(connection, *@args, **@kwargs, async: async)
      rescue => error
        @error = error
      ensure
        @pending = false
      end

      def exec_query(connection, *args, **kwargs)
        connection.internal_exec_query(*args, **kwargs)
      end

      class SelectAll < FutureResult # :nodoc:
        private
          def exec_query(*, **)
            super
          rescue ::RangeError
            ActiveRecord::Result.empty
          end
      end
  end
end