1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
|
package recovery
import (
"sync"
"sync/atomic"
"github.com/centrifugal/protocol"
)
// PubSubSync wraps logic to synchronize recovery with PUB/SUB.
type PubSubSync struct {
subSyncMu sync.RWMutex
subSync map[string]*subscribeState
}
// NewPubSubSync creates new PubSubSyncer.
func NewPubSubSync() *PubSubSync {
return &PubSubSync{
subSync: make(map[string]*subscribeState),
}
}
type subscribeState struct {
// The following fields help us to synchronize PUB/SUB and history messages
// during publication recovery process in channel.
inSubscribe uint32
pubBufferMu sync.Mutex
pubBufferLocked bool
pubBuffer []*protocol.Publication
}
// SyncPublication ...
func (c *PubSubSync) SyncPublication(channel string, pub *protocol.Publication, syncedFn func()) {
if c.isInSubscribe(channel) {
// Client currently in process of subscribing to this channel. In this case we keep
// publications in slice buffer. Publications from this temporary buffer will be sent in
// subscribe reply.
c.LockBuffer(channel)
if c.isInSubscribe(channel) {
// Sync point not reached yet - put Publication to tmp slice.
c.appendPubToBuffer(channel, pub)
c.unlockBuffer(channel)
return
}
// Sync point already passed - send Publication into connection.
c.unlockBuffer(channel)
}
syncedFn()
}
// StartBuffering ...
func (c *PubSubSync) StartBuffering(channel string) {
c.subSyncMu.Lock()
defer c.subSyncMu.Unlock()
s := &subscribeState{}
c.subSync[channel] = s
atomic.StoreUint32(&s.inSubscribe, 1)
}
// StopBuffering ...
func (c *PubSubSync) StopBuffering(channel string) {
c.subSyncMu.Lock()
defer c.subSyncMu.Unlock()
s, ok := c.subSync[channel]
if !ok {
return
}
atomic.StoreUint32(&s.inSubscribe, 0)
if s.pubBufferLocked {
s.pubBufferMu.Unlock()
}
delete(c.subSync, channel)
}
func (c *PubSubSync) isInSubscribe(channel string) bool {
c.subSyncMu.RLock()
defer c.subSyncMu.RUnlock()
s, ok := c.subSync[channel]
if !ok {
return false
}
return atomic.LoadUint32(&s.inSubscribe) == 1
}
// LockBuffer ...
func (c *PubSubSync) LockBuffer(channel string) {
c.subSyncMu.Lock()
s, ok := c.subSync[channel]
if !ok {
c.subSyncMu.Unlock()
return
}
s.pubBufferLocked = true
c.subSyncMu.Unlock()
s.pubBufferMu.Lock()
}
// UnlockBuffer ...
func (c *PubSubSync) unlockBuffer(channel string) {
c.subSyncMu.Lock()
defer c.subSyncMu.Unlock()
s, ok := c.subSync[channel]
if !ok {
return
}
if s.pubBufferLocked {
s.pubBufferMu.Unlock()
}
}
func (c *PubSubSync) appendPubToBuffer(channel string, pub *protocol.Publication) {
c.subSyncMu.RLock()
defer c.subSyncMu.RUnlock()
s := c.subSync[channel]
s.pubBuffer = append(s.pubBuffer, pub)
}
// ReadBuffered ...
func (c *PubSubSync) ReadBuffered(channel string) []*protocol.Publication {
c.subSyncMu.RLock()
defer c.subSyncMu.RUnlock()
s := c.subSync[channel]
pubs := make([]*protocol.Publication, len(s.pubBuffer))
copy(pubs, s.pubBuffer)
s.pubBuffer = nil
return pubs
}
|