File: pub_sub.rb

package info (click to toggle)
ruby-redis-cluster-client 0.10.0-1.1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 208 kB
  • sloc: ruby: 2,214; makefile: 4
file content (144 lines) | stat: -rw-r--r-- 3,835 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# frozen_string_literal: true

require 'redis_client'
require 'redis_client/cluster/normalized_cmd_name'

class RedisClient
  class Cluster
    class PubSub
      class State
        def initialize(client, queue)
          @client = client
          @worker = nil
          @queue = queue
        end

        def call(command)
          @client.call_v(command)
        end

        def ensure_worker
          @worker = spawn_worker(@client, @queue) unless @worker&.alive?
        end

        def close
          @worker.exit if @worker&.alive?
          @client.close
        end

        private

        def spawn_worker(client, queue)
          # Ruby VM allocates 1 MB memory as a stack for a thread.
          # It is a fixed size but we can modify the size with some environment variables.
          # So it consumes memory 1 MB multiplied a number of workers.
          Thread.new(client, queue) do |pubsub, q|
            loop do
              q << pubsub.next_event
            rescue StandardError => e
              q << e
            end
          end
        end
      end

      BUF_SIZE = Integer(ENV.fetch('REDIS_CLIENT_PUBSUB_BUF_SIZE', 1024))

      def initialize(router, command_builder)
        @router = router
        @command_builder = command_builder
        @queue = SizedQueue.new(BUF_SIZE)
        @state_dict = {}
      end

      def call(*args, **kwargs)
        _call(@command_builder.generate(args, kwargs))
        nil
      end

      def call_v(command)
        _call(@command_builder.generate(command))
        nil
      end

      def close
        @state_dict.each_value(&:close)
        @state_dict.clear
        @queue.clear
        @queue.close
        nil
      end

      def next_event(timeout = nil)
        @state_dict.each_value(&:ensure_worker)
        max_duration = calc_max_duration(timeout)
        starting = obtain_current_time

        loop do
          break if max_duration > 0 && obtain_current_time - starting > max_duration

          case event = @queue.pop(true)
          when StandardError then raise event
          when Array then break event
          end
        rescue ThreadError
          sleep 0.005
        end
      end

      private

      def _call(command)
        case ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_command(command)
        when 'subscribe', 'psubscribe', 'ssubscribe' then call_to_single_state(command)
        when 'unsubscribe', 'punsubscribe' then call_to_all_states(command)
        when 'sunsubscribe' then call_for_sharded_states(command)
        else call_to_single_state(command)
        end
      end

      def call_to_single_state(command)
        node_key = @router.find_node_key(command)
        try_call(node_key, command)
      end

      def call_to_all_states(command)
        @state_dict.each_value { |s| s.call(command) }
      end

      def call_for_sharded_states(command)
        if command.size == 1
          call_to_all_states(command)
        else
          call_to_single_state(command)
        end
      end

      def try_call(node_key, command, retry_count: 1)
        add_state(node_key).call(command)
      rescue ::RedisClient::CommandError => e
        raise if !e.message.start_with?('MOVED') || retry_count <= 0

        # for sharded pub/sub
        node_key = e.message.split[2]
        retry_count -= 1
        retry
      end

      def add_state(node_key)
        return @state_dict[node_key] if @state_dict.key?(node_key)

        state = State.new(@router.find_node(node_key).pubsub, @queue)
        @state_dict[node_key] = state
      end

      def obtain_current_time
        Process.clock_gettime(Process::CLOCK_MONOTONIC, :microsecond)
      end

      def calc_max_duration(timeout)
        timeout.nil? || timeout < 0 ? 0 : timeout * 1_000_000
      end
    end
  end
end