1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
|
// Copyright (c) 2014-2019 Ludovic Fauvet
// Licensed under the MIT license
package database
import (
"sync"
"time"
"github.com/gomodule/redigo/redis"
"github.com/op/go-logging"
)
// log is the logger shared by this package; it is registered under the
// "main" module name.
var log = logging.MustGetLogger("main")
// pubsubEvent is the name of a publish/subscribe channel. Values of this
// type are used both as Redis channel names (see Publish / SendPublish) and
// as keys for the local listeners registered with SubscribeEvent.
type pubsubEvent string
// Channel names known to the pubsub handler.
//
// CLUSTER, FILE_UPDATE, MIRROR_UPDATE and MIRROR_FILE_UPDATE are the channels
// updateEvents subscribes to on the Redis server.
const (
CLUSTER pubsubEvent = "_mirrorbits_cluster"
FILE_UPDATE pubsubEvent = "_mirrorbits_file_update"
MIRROR_UPDATE pubsubEvent = "_mirrorbits_mirror_update"
MIRROR_FILE_UPDATE pubsubEvent = "_mirrorbits_mirror_file_update"
// PUBSUB_RECONNECTED is not subscribed to on the server: updateEvents
// dispatches it locally (with an empty payload) to registered listeners
// once the subscription is re-established after a disconnection.
PUBSUB_RECONNECTED pubsubEvent = "_mirrorbits_pubsub_reconnected"
)
// Pubsub is the internal structure of the publish/subscribe handler.
//
// A Pubsub must be created with NewPubsub, which initialises the channels and
// maps below and starts the background goroutine (updateEvents). It contains
// mutexes and a WaitGroup and therefore must not be copied.
type Pubsub struct {
// r is the Redis handle from which updateEvents obtains its connection.
r *Redis
// rconn is the connection used for the subscription. It is assigned by
// updateEvents and reset to nil when the initial PING check fails.
rconn redis.Conn
// connlock guards rconn between updateEvents and Close.
connlock sync.Mutex
// extSubscribers maps a channel name to the listeners registered for it
// through SubscribeEvent.
extSubscribers map[string][]chan string
// extSubscribersLock guards extSubscribers.
extSubscribersLock sync.RWMutex
// stop is closed by Close to ask updateEvents to terminate.
stop chan bool
// wg lets Close wait until updateEvents has returned.
wg sync.WaitGroup
}
// NewPubsub returns a new instance of the publish/subscribe handler.
//
// The returned handler immediately starts a background goroutine that
// connects to Redis through r, subscribes to the internal channels and
// dispatches incoming messages to the listeners registered with
// SubscribeEvent. Call Close to stop that goroutine.
func NewPubsub(r *Redis) *Pubsub {
	pubsub := &Pubsub{
		r:              r,
		stop:           make(chan bool),
		extSubscribers: make(map[string][]chan string),
	}

	// Register the goroutine with the WaitGroup *before* starting it.
	// sync.WaitGroup requires an Add made while the counter is zero to happen
	// before Wait; relying only on the Add performed inside updateEvents
	// would let a Close() issued right after NewPubsub() return from Wait
	// before the goroutine has even been scheduled. updateEvents keeps its
	// own balanced Add/Done pair, which is harmless on top of this one.
	pubsub.wg.Add(1)
	go func() {
		defer pubsub.wg.Done()
		pubsub.updateEvents()
	}()

	return pubsub
}
// Close stops the pubsub handler: it signals the background goroutine to
// terminate, asks the server to end the subscription and then waits for the
// goroutine to return.
//
// Close is safe to call more than once; only the first call closes the stop
// channel and talks to the server, subsequent calls simply wait.
func (p *Pubsub) Close() {
	p.connlock.Lock()
	select {
	case <-p.stop:
		// Already closed: closing the channel a second time would panic.
	default:
		close(p.stop)
		if p.rconn != nil {
			// FIXME Calling p.rconn.Close() here will block indefinitely in redigo
			//
			// Instead, ask the server to terminate the session so that the
			// pending Receive() in updateEvents returns. This is best effort:
			// the connection may already be broken, so failures are only
			// reported at debug level.
			for _, cmd := range []string{"UNSUBSCRIBE", "QUIT"} {
				if err := p.rconn.Send(cmd); err != nil {
					log.Debugf("Pubsub: unable to send %s: %s", cmd, err)
				}
			}
			if err := p.rconn.Flush(); err != nil {
				log.Debugf("Pubsub: unable to flush the connection: %s", err)
			}
		}
	}
	p.connlock.Unlock()

	// Must be done without holding connlock: updateEvents may need it to
	// make progress before it can observe the stop signal.
	p.wg.Wait()
}
// SubscribeEvent registers channel as a listener for the given kind of event.
// Every time such an event is dispatched, its payload is sent on channel.
func (p *Pubsub) SubscribeEvent(event pubsubEvent, channel chan string) {
	key := string(event)

	p.extSubscribersLock.Lock()
	defer p.extSubscribersLock.Unlock()

	// Appending to the (possibly missing) entry creates it on first use.
	p.extSubscribers[key] = append(p.extSubscribers[key], channel)
}
// updateEvents is the background loop of the pubsub handler. It obtains a
// connection from the Redis handle, subscribes to the internal channels and
// forwards every message received to handleMessage. Whenever the connection
// is lost it reconnects and, once subscribed again, dispatches
// PUBSUB_RECONNECTED to local listeners. It returns once the stop channel has
// been closed.
func (p *Pubsub) updateEvents() {
	// Registers this goroutine with the WaitGroup so that Close can wait for
	// it; balanced by the deferred Done.
	p.wg.Add(1)
	defer p.wg.Done()

	disconnected := false

connect:
	for {
		select {
		case <-p.stop:
			return
		default:
		}

		p.connlock.Lock()
		p.rconn = p.r.Get()
		if _, err := p.rconn.Do("PING"); err != nil {
			disconnected = true
			p.rconn.Close()
			p.rconn = nil
			p.connlock.Unlock()
			if RedisIsLoading(err) {
				// Doing a PING after (re-connection) prevents cases where redis
				// is currently loading the dataset and is still not ready.
				log.Warning("Redis is still loading the dataset in memory")
			}
			// Back off before retrying, but wake up immediately on Close()
			// instead of sleeping through the stop signal.
			select {
			case <-p.stop:
				return
			case <-time.After(500 * time.Millisecond):
			}
			continue
		}
		// Keep a local handle so the connection is never read from the
		// shared field outside of connlock.
		conn := p.rconn
		p.connlock.Unlock()

		log.Debug("Subscribing pubsub")
		psc := redis.PubSubConn{Conn: conn}
		if err := psc.Subscribe(CLUSTER, FILE_UPDATE, MIRROR_UPDATE, MIRROR_FILE_UPDATE); err != nil {
			// Nothing else to do here: a broken connection is reported by
			// Receive below, which triggers the regular reconnection path.
			log.Errorf("Pubsub subscription failed: %s", err)
		}

		if disconnected {
			// This is a way to keep the cache active while disconnected
			// from redis but still clear the cache (possibly outdated)
			// after a successful reconnection.
			disconnected = false
			p.handleMessage(string(PUBSUB_RECONNECTED), nil)
		}

		for {
			switch v := psc.Receive().(type) {
			case redis.Message:
				//log.Debugf("Redis message on channel %s: message: %s", v.Channel, v.Data)
				p.handleMessage(v.Channel, v.Data)
			case redis.Subscription:
				log.Debugf("Redis subscription on channel %s: %s (%d)", v.Channel, v.Kind, v.Count)
			case error:
				// An error is expected when Close() terminated the session.
				select {
				case <-p.stop:
					return
				default:
				}
				log.Errorf("Pubsub disconnected: %s", v)

				// Release the dead connection exactly once and forget it
				// under the lock, so that a concurrent Close() never writes
				// to a connection that has already been closed (this mirrors
				// the PING failure path above).
				p.connlock.Lock()
				conn.Close()
				p.rconn = nil
				p.connlock.Unlock()

				disconnected = true
				select {
				case <-p.stop:
					return
				case <-time.After(50 * time.Millisecond):
				}
				continue connect
			}
		}
	}
}
// handleMessage notifies the listeners registered for channel of a new
// message by sending them data converted to a string.
//
// Delivery blocks while a listener is not ready to receive. It is abandoned
// as soon as the handler is being closed, so a listener that is no longer
// reading cannot prevent Close from returning.
func (p *Pubsub) handleMessage(channel string, data []byte) {
	// Only take a snapshot of the listeners under the lock. Holding the read
	// lock across the blocking sends below would stall SubscribeEvent for as
	// long as a listener is slow. The snapshot is safe to iterate unlocked:
	// SubscribeEvent only ever appends, it never rewrites existing elements.
	p.extSubscribersLock.RLock()
	listeners := p.extSubscribers[channel]
	p.extSubscribersLock.RUnlock()

	if len(listeners) == 0 {
		return
	}

	// Convert once rather than once per listener.
	msg := string(data)
	for _, listener := range listeners {
		select {
		case listener <- msg:
			// Block if the listener is not available
		case <-p.stop:
			return
		}
	}
}
// Publish sends message on the pubsub channel named by event, executing the
// PUBLISH command on r right away. It returns the error reported by the
// command, if any.
func Publish(r redis.Conn, event pubsubEvent, message string) error {
	if _, err := r.Do("PUBLISH", string(event), message); err != nil {
		return err
	}
	return nil
}
// SendPublish adds the publication of message on the channel named by event
// to a transaction: the PUBLISH command is handed to r with Send instead of
// being executed with Do. It returns the error reported by Send, if any.
func SendPublish(r redis.Conn, event pubsubEvent, message string) error {
	return r.Send("PUBLISH", string(event), message)
}
|