File: cluster.rb

package info (click to toggle)
ruby-redis-cluster-client 0.10.0-1.1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 208 kB
  • sloc: ruby: 2,214; makefile: 4
file content (161 lines) | stat: -rw-r--r-- 4,547 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# frozen_string_literal: true

require 'redis_client/cluster/concurrent_worker'
require 'redis_client/cluster/pipeline'
require 'redis_client/cluster/pub_sub'
require 'redis_client/cluster/router'
require 'redis_client/cluster/transaction'
require 'redis_client/cluster/optimistic_locking'

class RedisClient
  class Cluster
    ZERO_CURSOR_FOR_SCAN = '0'

    attr_reader :config

    def initialize(config, pool: nil, concurrency: nil, **kwargs)
      @config = config
      @concurrent_worker = ::RedisClient::Cluster::ConcurrentWorker.create(**(concurrency || {}))
      @command_builder = config.command_builder

      @pool = pool
      @kwargs = kwargs
      @router = nil
      @mutex = Mutex.new
    end

    def inspect
      node_keys = @router.nil? ? @config.startup_nodes.keys : router.node_keys
      "#<#{self.class.name} #{node_keys.join(', ')}>"
    end

    def call(*args, **kwargs, &block)
      command = @command_builder.generate(args, kwargs)
      router.send_command(:call_v, command, &block)
    end

    def call_v(command, &block)
      command = @command_builder.generate(command)
      router.send_command(:call_v, command, &block)
    end

    def call_once(*args, **kwargs, &block)
      command = @command_builder.generate(args, kwargs)
      router.send_command(:call_once_v, command, &block)
    end

    def call_once_v(command, &block)
      command = @command_builder.generate(command)
      router.send_command(:call_once_v, command, &block)
    end

    def blocking_call(timeout, *args, **kwargs, &block)
      command = @command_builder.generate(args, kwargs)
      router.send_command(:blocking_call_v, command, timeout, &block)
    end

    def blocking_call_v(timeout, command, &block)
      command = @command_builder.generate(command)
      router.send_command(:blocking_call_v, command, timeout, &block)
    end

    def scan(*args, **kwargs, &block)
      raise ArgumentError, 'block required' unless block

      seed = Random.new_seed
      cursor = ZERO_CURSOR_FOR_SCAN
      loop do
        cursor, keys = router.scan('SCAN', cursor, *args, seed: seed, **kwargs)
        keys.each(&block)
        break if cursor == ZERO_CURSOR_FOR_SCAN
      end
    end

    def sscan(key, *args, **kwargs, &block)
      node = router.assign_node(['SSCAN', key])
      router.try_delegate(node, :sscan, key, *args, **kwargs, &block)
    end

    def hscan(key, *args, **kwargs, &block)
      node = router.assign_node(['HSCAN', key])
      router.try_delegate(node, :hscan, key, *args, **kwargs, &block)
    end

    def zscan(key, *args, **kwargs, &block)
      node = router.assign_node(['ZSCAN', key])
      router.try_delegate(node, :zscan, key, *args, **kwargs, &block)
    end

    def pipelined(exception: true)
      seed = @config.use_replica? && @config.replica_affinity == :random ? nil : Random.new_seed
      pipeline = ::RedisClient::Cluster::Pipeline.new(
        router,
        @command_builder,
        @concurrent_worker,
        exception: exception,
        seed: seed
      )

      yield pipeline
      return [] if pipeline.empty?

      pipeline.execute
    end

    def multi(watch: nil)
      if watch.nil? || watch.empty?
        transaction = ::RedisClient::Cluster::Transaction.new(router, @command_builder)
        yield transaction
        return transaction.execute
      end

      ::RedisClient::Cluster::OptimisticLocking.new(router).watch(watch) do |c, slot, asking|
        transaction = ::RedisClient::Cluster::Transaction.new(
          router, @command_builder, node: c, slot: slot, asking: asking
        )
        yield transaction
        transaction.execute
      end
    end

    def pubsub
      ::RedisClient::Cluster::PubSub.new(router, @command_builder)
    end

    def with(...)
      raise NotImplementedError, 'No way to use'
    end

    def close
      @router&.close
      @concurrent_worker.close
      nil
    end

    private

    def router
      return @router unless @router.nil?

      @mutex.synchronize do
        @router ||= ::RedisClient::Cluster::Router.new(@config, @concurrent_worker, pool: @pool, **@kwargs)
      end
    end

    def method_missing(name, *args, **kwargs, &block)
      if router.command_exists?(name)
        args.unshift(name)
        command = @command_builder.generate(args, kwargs)
        return router.send_command(:call_v, command, &block)
      end

      super
    end

    def respond_to_missing?(name, include_private = false)
      return true if router.command_exists?(name)

      super
    end
  end
end