1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
|
# frozen_string_literal: true
module ActiveRecord
class FutureResult # :nodoc:
class Complete
attr_reader :result
delegate :empty?, :to_a, to: :result
def initialize(result)
@result = result
end
def pending?
false
end
def canceled?
false
end
def then(&block)
Promise::Complete.new(@result.then(&block))
end
end
class EventBuffer
def initialize(future_result, instrumenter)
@future_result = future_result
@instrumenter = instrumenter
@events = []
end
def instrument(name, payload = {}, &block)
event = @instrumenter.new_event(name, payload)
begin
event.record(&block)
ensure
@events << event
end
end
def flush
events, @events = @events, []
events.each do |event|
event.payload[:lock_wait] = @future_result.lock_wait
ActiveSupport::Notifications.publish_event(event)
end
end
end
Canceled = Class.new(ActiveRecordError)
def self.wrap(result)
case result
when self, Complete
result
else
Complete.new(result)
end
end
delegate :empty?, :to_a, to: :result
attr_reader :lock_wait
def initialize(pool, *args, **kwargs)
@mutex = Mutex.new
@session = nil
@pool = pool
@args = args
@kwargs = kwargs
@pending = true
@error = nil
@result = nil
@instrumenter = ActiveSupport::Notifications.instrumenter
@event_buffer = nil
end
def then(&block)
Promise.new(self, block)
end
def schedule!(session)
@session = session
@pool.schedule_query(self)
end
def execute!(connection)
execute_query(connection)
end
def cancel
@pending = false
@error = Canceled
self
end
def execute_or_skip
return unless pending?
@pool.with_connection do |connection|
return unless @mutex.try_lock
begin
if pending?
@event_buffer = EventBuffer.new(self, @instrumenter)
connection.with_instrumenter(@event_buffer) do
execute_query(connection, async: true)
end
end
ensure
@mutex.unlock
end
end
end
def result
execute_or_wait
@event_buffer&.flush
if canceled?
raise Canceled
elsif @error
raise @error
else
@result
end
end
def pending?
@pending && (!@session || @session.active?)
end
def canceled?
@session && !@session.active?
end
private
def execute_or_wait
if pending?
start = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
@mutex.synchronize do
if pending?
@pool.with_connection do |connection|
execute_query(connection)
end
else
@lock_wait = (Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond) - start)
end
end
else
@lock_wait = 0.0
end
end
def execute_query(connection, async: false)
@result = exec_query(connection, *@args, **@kwargs, async: async)
rescue => error
@error = error
ensure
@pending = false
end
def exec_query(connection, *args, **kwargs)
connection.internal_exec_query(*args, **kwargs)
end
class SelectAll < FutureResult # :nodoc:
private
def exec_query(*, **)
super
rescue ::RangeError
ActiveRecord::Result.empty
end
end
end
end
|