File: pipeline.rb

package info (click to toggle)
ruby-redis 5.3.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,160 kB
  • sloc: ruby: 11,445; makefile: 117; sh: 24
file content (131 lines) | stat: -rw-r--r-- 2,819 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# frozen_string_literal: true

require "delegate"

class Redis
  class PipelinedConnection
    attr_accessor :db

    def initialize(pipeline, futures = [], exception: true)
      @pipeline = pipeline
      @futures = futures
      @exception = exception
    end

    include Commands

    def pipelined
      yield self
    end

    def multi
      transaction = MultiConnection.new(@pipeline, @futures)
      send_command([:multi])
      size = @futures.size
      yield transaction
      multi_future = MultiFuture.new(@futures[size..-1])
      @pipeline.call_v([:exec]) do |result|
        multi_future._set(result)
      end
      @futures << multi_future
      multi_future
    end

    private

    def synchronize
      yield self
    end

    def send_command(command, &block)
      future = Future.new(command, block, @exception)
      @pipeline.call_v(command) do |result|
        future._set(result)
      end
      @futures << future
      future
    end

    def send_blocking_command(command, timeout, &block)
      future = Future.new(command, block, @exception)
      @pipeline.blocking_call_v(timeout, command) do |result|
        future._set(result)
      end
      @futures << future
      future
    end
  end

  class MultiConnection < PipelinedConnection
    def multi
      raise Redis::BaseError, "Can't nest multi transaction"
    end

    private

    # Blocking commands inside transaction behave like non-blocking.
    # It shouldn't be done though.
    # https://redis.io/commands/blpop/#blpop-inside-a-multi--exec-transaction
    def send_blocking_command(command, _timeout, &block)
      send_command(command, &block)
    end
  end

  class FutureNotReady < RuntimeError
    def initialize
      super("Value will be available once the pipeline executes.")
    end
  end

  class Future < BasicObject
    FutureNotReady = ::Redis::FutureNotReady.new

    def initialize(command, coerce, exception)
      @command = command
      @object = FutureNotReady
      @coerce = coerce
      @exception = exception
    end

    def inspect
      "<Redis::Future #{@command.inspect}>"
    end

    def _set(object)
      @object = @coerce ? @coerce.call(object) : object
      value
    end

    def value
      ::Kernel.raise(@object) if @exception && @object.is_a?(::StandardError)
      @object
    end

    def is_a?(other)
      self.class.ancestors.include?(other)
    end

    def class
      Future
    end
  end

  class MultiFuture < Future
    def initialize(futures)
      @futures = futures
      @command = [:exec]
      @object = FutureNotReady
    end

    def _set(replies)
      @object = if replies
        @futures.map.with_index do |future, index|
          future._set(replies[index])
          future.value
        end
      else
        replies
      end
    end
  end
end