# frozen_string_literal: true

require "monitor" # Monitor is used below; harmless if already loaded elsewhere

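class Redis
  # Wraps a client connection used for Pub/Sub and serializes writes to it.
  class SubscribedClient
    def initialize(client)
      @client = client
      @write_monitor = Monitor.new
    end

    # Sends a command while holding the write monitor, so only one thread
    # writes to the underlying client at a time.
    def call_v(command)
      @write_monitor.synchronize do
        @client.call_v(command)
      end
    end
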
    def subscribe(*channels, &block)
      subscription("subscribe", "unsubscribe", channels, block)
    end

    def subscribe_with_timeout(timeout, *channels, &block)
      subscription("subscribe", "unsubscribe", channels, block, timeout)
    end

    def psubscribe(*channels, &block)
      subscription("psubscribe", "punsubscribe", channels, block)
    end

    def psubscribe_with_timeout(timeout, *channels, &block)
      subscription("psubscribe", "punsubscribe", channels, block, timeout)
    end

    def ssubscribe(*channels, &block)
      subscription("ssubscribe", "sunsubscribe", channels, block)
    end

    def ssubscribe_with_timeout(timeout, *channels, &block)
      subscription("ssubscribe", "sunsubscribe", channels, block, timeout)
    end

    def unsubscribe(*channels)
      call_v([:unsubscribe, *channels])
    end

    def punsubscribe(*channels)
      call_v([:punsubscribe, *channels])
    end

    def sunsubscribe(*channels)
      call_v([:sunsubscribe, *channels])
    end

    def close
      @client.close
    end

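    protected

    # Sends the `start` command, then dispatches incoming events to the
    # callbacks registered on the Subscription until the final `stop`
    # confirmation arrives or next_event returns a falsy value.
    def subscription(start, stop, channels, block, timeout = 0)
      sub = Subscription.new(&block)

      case start
      when "ssubscribe" then channels.each { |c| call_v([start, c]) } # avoid cross-slot keys
      else call_v([start, *channels])
      end

      while (event = @client.next_event(timeout))
        if event.is_a?(::RedisClient::CommandError)
          raise Client::ERROR_MAPPING.fetch(event.class), event.message
        end

        # Events are arrays: the event type followed by its payload.
        type, *rest = event
        if (callback = sub.callbacks[type])
          callback.call(*rest)
        end
        # The last element of an unsubscribe confirmation is the number of
        # remaining subscriptions; stop once it reaches zero.
        break if type == stop && rest.last == 0
      end
      # No need to unsubscribe here. The real client closes the connection
      # whenever an exception is raised (see #ensure_connected).
    end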
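  end

  # Collects the callbacks registered in the block given to a subscribe call,
  # keyed by event type.
  class Subscription
    attr_reader :callbacks

    def initialize
      @callbacks = {}
      yield(self)
    end
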
    def subscribe(&block)
      @callbacks["subscribe"] = block
    end

    def unsubscribe(&block)
      @callbacks["unsubscribe"] = block
    end

    def message(&block)
      @callbacks["message"] = block
    end

    def psubscribe(&block)
      @callbacks["psubscribe"] = block
    end

    def punsubscribe(&block)
      @callbacks["punsubscribe"] = block
    end

    def pmessage(&block)
      @callbacks["pmessage"] = block
    end

    def ssubscribe(&block)
      @callbacks["ssubscribe"] = block
    end

    def sunsubscribe(&block)
      @callbacks["sunsubscribe"] = block
    end

    def smessage(&block)
      @callbacks["smessage"] = block
    end
  end
end