File: pubsub.go

package info (click to toggle)
mirrorbits 0.6.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 984 kB
  • sloc: sh: 675; makefile: 93
file content (163 lines) | stat: -rw-r--r-- 4,277 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
// Copyright (c) 2014-2019 Ludovic Fauvet
// Licensed under the MIT license

package database

import (
	"sync"
	"time"

	"github.com/gomodule/redigo/redis"
	"github.com/op/go-logging"
)

// log is the package-level logger; it is bound to the "main" logging module.
var log = logging.MustGetLogger("main")

// pubsubEvent is the name of an event. Its string value is used both as the
// Redis pubsub channel name and as the key under which local listeners are
// registered.
type pubsubEvent string

const (
	// CLUSTER, FILE_UPDATE, MIRROR_UPDATE and MIRROR_FILE_UPDATE are the Redis
	// channels the pubsub handler subscribes to.
	CLUSTER            pubsubEvent = "_mirrorbits_cluster"
	FILE_UPDATE        pubsubEvent = "_mirrorbits_file_update"
	MIRROR_UPDATE      pubsubEvent = "_mirrorbits_mirror_update"
	MIRROR_FILE_UPDATE pubsubEvent = "_mirrorbits_mirror_file_update"

	// PUBSUB_RECONNECTED is not subscribed to on Redis by the handler: it is
	// dispatched locally to listeners once the subscription connection has
	// been re-established after a disconnection.
	PUBSUB_RECONNECTED pubsubEvent = "_mirrorbits_pubsub_reconnected"
)

// Pubsub is the internal structure of the publish/subscribe handler.
// It maintains one Redis connection dedicated to subscriptions and forwards
// every message received on it to the Go channels registered through
// SubscribeEvent.
type Pubsub struct {
	// r is where subscription connections are obtained from (via r.Get).
	r                  *Redis
	// rconn is the current subscription connection. It is nil before the first
	// connection is obtained and after a failed connection attempt.
	rconn              redis.Conn
	// connlock is held while rconn is being (re)assigned and while Close
	// writes its shutdown commands to it.
	connlock           sync.Mutex
	// extSubscribers maps an event name to the listener channels registered
	// for it.
	extSubscribers     map[string][]chan string
	// extSubscribersLock guards extSubscribers.
	extSubscribersLock sync.RWMutex
	// stop is closed by Close to ask the background loop to exit.
	stop               chan bool
	// wg lets Close wait for the background loop to finish.
	wg                 sync.WaitGroup
}

// NewPubsub returns a new instance of the publish/subscribe handler.
//
// The returned handler immediately starts a background goroutine running
// updateEvents, which connects to Redis through r and dispatches incoming
// messages to the listeners registered with SubscribeEvent. Call Close to
// stop that goroutine.
func NewPubsub(r *Redis) *Pubsub {
	pubsub := &Pubsub{
		r:              r,
		stop:           make(chan bool),
		extSubscribers: make(map[string][]chan string),
	}

	// Register the goroutine with the wait group *before* starting it.
	// updateEvents performs its own Add/Done pair, but that Add only runs once
	// the goroutine has been scheduled; a Close() issued right after NewPubsub
	// could otherwise get through wg.Wait() while the loop is still starting.
	// The pair below simply nests around the one inside updateEvents.
	pubsub.wg.Add(1)
	go func() {
		defer pubsub.wg.Done()
		pubsub.updateEvents()
	}()

	return pubsub
}

// Close stops the pubsub handler and waits for its background loop to exit.
//
// It first closes p.stop to signal the loop, then, if a subscription
// connection is currently set, writes UNSUBSCRIBE and QUIT to it (presumably
// so that the Receive call the loop is blocked in returns). Finally it blocks
// until the loop has terminated.
//
// Close must be called at most once: a second call would close p.stop again
// and panic.
func (p *Pubsub) Close() {
	close(p.stop)
	p.connlock.Lock()
	if p.rconn != nil {
		// FIXME Calling p.rconn.Close() here will block indefinitely in redigo
		//
		// The shutdown commands are best effort: all three calls are always
		// attempted, and a failure is only reported at debug level since the
		// connection may legitimately be gone already.
		unsubErr := p.rconn.Send("UNSUBSCRIBE")
		quitErr := p.rconn.Send("QUIT")
		flushErr := p.rconn.Flush()
		for _, err := range []error{unsubErr, quitErr, flushErr} {
			if err != nil {
				log.Debugf("Pubsub close: unable to notify redis: %s", err)
				break
			}
		}
	}
	p.connlock.Unlock()
	p.wg.Wait()
}

// SubscribeEvent registers channel as a listener for the given event. Every
// message later dispatched for that event is sent on channel; several
// channels may be registered for the same event.
func (p *Pubsub) SubscribeEvent(event pubsubEvent, channel chan string) {
	key := string(event)

	p.extSubscribersLock.Lock()
	defer p.extSubscribersLock.Unlock()

	// append copes with a missing entry: a nil slice grows like any other.
	p.extSubscribers[key] = append(p.extSubscribers[key], channel)
}

// updateEvents is the background loop that keeps a subscription connection to
// Redis alive and forwards everything received on it to the local listeners.
//
// Each pass of the outer loop obtains a connection, validates it with a PING
// and subscribes to the CLUSTER, FILE_UPDATE, MIRROR_UPDATE and
// MIRROR_FILE_UPDATE channels. The inner loop then blocks in Receive until the
// connection fails, at which point the connection is dropped and the outer
// loop reconnects. After a successful reconnection a PUBSUB_RECONNECTED event
// is dispatched locally.
//
// The loop exits once p.stop is closed (see Close).
func (p *Pubsub) updateEvents() {
	p.wg.Add(1)
	defer p.wg.Done()

	// pause waits for d, but returns early (reporting true) as soon as the
	// handler is being closed so that Close() never has to sit out a back-off
	// delay.
	pause := func(d time.Duration) (stopped bool) {
		select {
		case <-p.stop:
			return true
		case <-time.After(d):
			return false
		}
	}

	disconnected := false
connect:
	for {
		p.connlock.Lock()

		// Check for shutdown while holding connlock. Close() closes p.stop
		// before taking the lock, so either we see the closed channel here, or
		// Close() finds the fully subscribed connection once we release the
		// lock and can interrupt the Receive below. Checking before taking the
		// lock would leave a window in which Close() sees no connection, sends
		// nothing, and then waits forever for this loop.
		select {
		case <-p.stop:
			p.connlock.Unlock()
			return
		default:
		}

		p.rconn = p.r.Get()
		if _, err := p.rconn.Do("PING"); err != nil {
			disconnected = true
			p.rconn.Close()
			p.rconn = nil
			p.connlock.Unlock()
			if RedisIsLoading(err) {
				// Doing a PING after (re-connection) prevents cases where redis
				// is currently loading the dataset and is still not ready.
				log.Warning("Redis is still loading the dataset in memory")
			}
			if pause(500 * time.Millisecond) {
				return
			}
			continue
		}

		log.Debug("Subscribing pubsub")
		psc := redis.PubSubConn{Conn: p.rconn}

		// Subscribe while still holding connlock: Subscribe writes to and
		// flushes the connection, and Close() does the same under this lock.
		var subErr error
		for _, ev := range []pubsubEvent{CLUSTER, FILE_UPDATE, MIRROR_UPDATE, MIRROR_FILE_UPDATE} {
			if subErr = psc.Subscribe(ev); subErr != nil {
				break
			}
		}
		if subErr != nil {
			// Treat a failed subscription like a disconnection instead of
			// announcing a reconnection that did not really happen.
			psc.Close()
			p.rconn = nil
			p.connlock.Unlock()
			log.Errorf("Pubsub subscription failed: %s", subErr)
			disconnected = true
			if pause(50 * time.Millisecond) {
				return
			}
			continue
		}
		p.connlock.Unlock()

		if disconnected {
			// This is a way to keep the cache active while disconnected
			// from redis but still clear the cache (possibly outdated)
			// after a successful reconnection.
			disconnected = false
			p.handleMessage(string(PUBSUB_RECONNECTED), nil)
		}

		for {
			switch v := psc.Receive().(type) {
			case redis.Message:
				//log.Debugf("Redis message on channel %s: message: %s", v.Channel, v.Data)
				p.handleMessage(v.Channel, v.Data)
			case redis.Subscription:
				log.Debugf("Redis subscription on channel %s: %s (%d)", v.Channel, v.Kind, v.Count)
			case error:
				// An error while shutting down is expected; exit quietly.
				select {
				case <-p.stop:
					return
				default:
				}
				log.Errorf("Pubsub disconnected: %s", v)

				// Drop the connection under connlock and forget it, so that a
				// concurrent Close() never writes to a connection that is
				// being or has been closed. psc wraps p.rconn, so a single
				// Close call releases it.
				p.connlock.Lock()
				psc.Close()
				p.rconn = nil
				p.connlock.Unlock()

				disconnected = true
				if pause(50 * time.Millisecond) {
					return
				}
				continue connect
			}
		}
	}
}

// handleMessage delivers data, converted to a string, to every listener
// registered for channel. Listeners are served one after the other.
func (p *Pubsub) handleMessage(channel string, data []byte) {
	payload := string(data)

	p.extSubscribersLock.RLock()
	defer p.extSubscribersLock.RUnlock()

	for _, subscriber := range p.extSubscribers[channel] {
		// A plain send: this waits until the listener is ready to receive.
		subscriber <- payload
	}
}

// Publish issues a PUBLISH command for message on the channel named by event,
// using the connection r, and returns the error reported by the command, if
// any. The reply itself is discarded.
func Publish(r redis.Conn, event pubsubEvent, message string) error {
	if _, err := r.Do("PUBLISH", string(event), message); err != nil {
		return err
	}
	return nil
}

// SendPublish adds a PUBLISH command for message on the channel named by event
// to a transaction: the command is handed to r with Send rather than Do, so no
// reply is read here.
func SendPublish(r redis.Conn, event pubsubEvent, message string) error {
	return r.Send("PUBLISH", string(event), message)
}