File: em-hiredis.rb

package info (click to toggle)
ruby-em-synchrony 1.0.5-3.1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bullseye
  • size: 572 kB
  • sloc: ruby: 3,458; sh: 37; sql: 7; makefile: 2
file content (117 lines) | stat: -rw-r--r-- 3,347 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
begin
  require 'em-hiredis'
rescue LoadError => error
  raise 'Missing EM-Synchrony dependency: gem install em-hiredis'
end

module EventMachine
  module Hiredis
    
    def self.connect(uri = nil)
      client = setup(uri)
      EM::Synchrony.sync client.connect
      client
    end
    
    class Client
      def pubsub
        return @pubsub if @pubsub

        client = PubsubClient.new(@host, @port, @password, @db)
        EM::Synchrony.sync client.connect
        @pubsub = client
      end
    end
    
    class BaseClient
      def self.connect(host = 'localhost', port = 6379)
        conn = new(host, port)
        EM::Synchrony.sync conn.connect
        conn
      end
      
      def connect
        @auto_reconnect = true
        @connection = EM.connect(@host, @port, Connection, @host, @port)
        
        @connection.on(:closed) do
          if @connected
            @defs.each { |d| d.fail(Error.new("Redis disconnected")) }
            @defs = []
            @deferred_status = nil
            @connected = false
            if @auto_reconnect
              # Next tick avoids reconnecting after for example EM.stop
              EM.next_tick { reconnect }
            end
            emit(:disconnected)
            EM::Hiredis.logger.info("#{@connection} Disconnected")
          else
            if @auto_reconnect
              @reconnect_failed_count += 1
              @reconnect_timer = EM.add_timer(EM::Hiredis.reconnect_timeout) {
                @reconnect_timer = nil
                reconnect
              }
              emit(:reconnect_failed, @reconnect_failed_count)
              EM::Hiredis.logger.info("#{@connection} Reconnect failed")
              
              if @reconnect_failed_count >= 4
                emit(:failed)
                self.fail(Error.new("Could not connect after 4 attempts"))
              end
            end
          end
        end
        
        @connection.on(:connected) do
          Fiber.new do
            @connected = true
            @reconnect_failed_count = 0
            @failed = false
            
            select(@db) unless @db == 0
            auth(@password) if @password
            
            @command_queue.each do |df, command, args|
              @connection.send_command(command, args)
              @defs.push(df)
            end
            @command_queue = []
            
            emit(:connected)
            EM::Hiredis.logger.info("#{@connection} Connected")
            succeed
            
            if @reconnecting
              @reconnecting = false
              emit(:reconnected)
            end
          end.resume
        end
        
        @connection.on(:message) do |reply|
          if RuntimeError === reply
            raise "Replies out of sync: #{reply.inspect}" if @defs.empty?
            deferred = @defs.shift
            error = RedisError.new(reply.message)
            error.redis_error = reply
            deferred.fail(error) if deferred
          else
            handle_reply(reply)
          end
        end
        
        @connected = false
        @reconnecting = false
        
        return self
      end
      
      alias :old_method_missing :method_missing
      def method_missing(sym, *args)
        EM::Synchrony.sync old_method_missing(sym, *args)
      end
    end
  end
end