1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
|
# frozen_string_literal: true
require "delegate"
class Redis
# Connection-like object handed to the block of a pipeline. Every command is
# queued on the underlying pipeline object and answered immediately with a
# Future; the future receives its value when the pipeline passes the reply to
# the callback registered alongside the command.
class PipelinedConnection
  # NOTE(review): nothing in this class reads or writes `db`; presumably the
  # included Commands module relies on it — confirm.
  attr_accessor :db

  # @param pipeline [#call_v, #blocking_call_v] object the commands are queued
  #   on; it is given a block that accepts the reply of each command
  # @param futures [Array] list every future created here is appended to
  # @param exception [Boolean] handed to each Future; when truthy, a reply
  #   that is a StandardError is raised instead of being returned
  def initialize(pipeline, futures = [], exception: true)
    @pipeline = pipeline
    @futures = futures
    @exception = exception
  end

  include Commands

  # A nested `pipelined` call simply yields this connection.
  def pipelined
    yield self
  end

  # Wraps the commands issued inside the block in MULTI ... EXEC.
  #
  # The block receives a MultiConnection that shares this connection's
  # pipeline, futures list and `exception` setting.
  #
  # @yieldparam transaction [MultiConnection]
  # @return [MultiFuture] future for the EXEC reply
  def multi
    # Forward the exception flag so commands queued inside the transaction
    # honour the same error-reporting mode as the surrounding pipeline;
    # without it they would always fall back to `exception: true`.
    transaction = MultiConnection.new(@pipeline, @futures, exception: @exception)
    send_command([:multi])
    # Captured after MULTI was queued, so the slice below holds only the
    # futures created while the block ran.
    size = @futures.size
    yield transaction
    multi_future = MultiFuture.new(@futures[size..-1])
    @pipeline.call_v([:exec]) do |result|
      multi_future._set(result)
    end
    @futures << multi_future
    multi_future
  end

  private

  # Yields this connection itself.
  # NOTE(review): presumably the Commands mixin wraps its calls in
  # `synchronize` to obtain a client — confirm.
  def synchronize
    yield self
  end

  # Queues `command` on the pipeline and returns the Future that will hold
  # its reply. `block`, when given, is stored as the future's coercion.
  def send_command(command, &block)
    future = Future.new(command, block, @exception)
    @pipeline.call_v(command) do |result|
      future._set(result)
    end
    @futures << future
    future
  end

  # Same as #send_command, but queues through `blocking_call_v` so the
  # pipeline also receives `timeout`.
  def send_blocking_command(command, timeout, &block)
    future = Future.new(command, block, @exception)
    @pipeline.blocking_call_v(timeout, command) do |result|
      future._set(result)
    end
    @futures << future
    future
  end
end
# Connection yielded to the block of `PipelinedConnection#multi`.
class MultiConnection < PipelinedConnection
  # A transaction cannot be opened inside another one, so this always raises.
  #
  # @raise [Redis::BaseError]
  def multi
    raise Redis::BaseError, "Can't nest multi transaction"
  end

  # Inside a transaction a blocking command behaves like a non-blocking one,
  # so the timeout is dropped and the command is queued the regular way.
  # Issuing blocking commands in a transaction is discouraged nonetheless.
  # https://redis.io/commands/blpop/#blpop-inside-a-multi--exec-transaction
  def send_blocking_command(command, _timeout, &block)
    send_command(command, &block)
  end
  private :send_blocking_command
end
# Error describing a future whose value is not available yet.
# It takes no arguments; the message is fixed.
class FutureNotReady < RuntimeError
  def initialize
    explanation = "Value will be available once the pipeline executes."
    super(explanation)
  end
end
# Placeholder for the reply to a command queued on a pipeline. It holds a
# "not ready" sentinel until `_set` delivers the reply.
#
# The class derives from BasicObject, which is why top-level constants are
# written with a leading `::`.
class Future < BasicObject
  # Shared exception instance stored in a future that has no reply yet.
  FutureNotReady = ::Redis::FutureNotReady.new

  # @param command [Object] the queued command (e.g. `[:multi]`), shown by #inspect
  # @param coerce [#call, nil] optional callable applied to the raw reply
  # @param exception [Boolean] when truthy, #value raises a stored StandardError
  def initialize(command, coerce, exception)
    @command = command
    @coerce = coerce
    @exception = exception
    @object = FutureNotReady
  end

  def inspect
    "<Redis::Future #{@command.inspect}>"
  end

  # Stores the reply, passing it through the coercion when one was given,
  # then reads it back through #value, so an error reply raises right here
  # when exceptions are enabled.
  def _set(object)
    @object =
      if @coerce
        @coerce.call(object)
      else
        object
      end
    value
  end

  # Returns the stored object. When exceptions are enabled and the stored
  # object is a StandardError (which includes the "not ready" sentinel), it
  # is raised instead.
  def value
    return @object unless @exception && @object.is_a?(::StandardError)

    ::Kernel.raise(@object)
  end

  # BasicObject provides no `is_a?`; answer from the ancestry of whatever
  # #class reports.
  def is_a?(other)
    lineage = self.class.ancestors
    lineage.include?(other)
  end

  # BasicObject provides no `class` either.
  def class
    Future
  end
end
# Future for the EXEC reply of a transaction queued on a pipeline. When the
# reply arrives, each element is handed to the future of the corresponding
# queued command.
class MultiFuture < Future
  # @param futures [Array<Future>] futures of the commands queued inside the
  #   transaction, in queue order
  def initialize(futures)
    @futures = futures
    @command = [:exec]
    # Qualified on purpose: this class is lexically nested in Redis, and
    # lexical scope is searched before ancestors, so a bare `FutureNotReady`
    # resolves to the Redis::FutureNotReady exception *class*. The sentinel a
    # pending future holds is the shared *instance* defined on Future.
    @object = Future::FutureNotReady
  end

  # Distributes the EXEC reply to the wrapped futures.
  #
  # @param replies [Array, nil] reply to EXEC; a falsy reply (such as nil) is
  #   stored unchanged and the wrapped futures are left untouched
  # @return [Array, nil] the stored object: the value of every wrapped
  #   future, or the falsy reply
  # @raise whatever a wrapped future raises when its reply is an error and
  #   that future was created with exceptions enabled
  def _set(replies)
    @object = if replies
      @futures.map.with_index do |future, index|
        future._set(replies[index])
        future.value
      end
    else
      replies
    end
  end
end
end
|