File: sync.go

package info (click to toggle)
golang-github-centrifugal-centrifuge 0.15.0%2Bgit20210306.f435ba2-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 1,612 kB
  • sloc: javascript: 102; makefile: 2
file content (127 lines) | stat: -rw-r--r-- 3,005 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package recovery

import (
	"sync"
	"sync/atomic"

	"github.com/centrifugal/protocol"
)

// PubSubSync wraps logic to synchronize recovery with PUB/SUB.
type PubSubSync struct {
	subSyncMu sync.RWMutex
	subSync   map[string]*subscribeState
}

// NewPubSubSync creates new PubSubSyncer.
func NewPubSubSync() *PubSubSync {
	return &PubSubSync{
		subSync: make(map[string]*subscribeState),
	}
}

type subscribeState struct {
	// The following fields help us to synchronize PUB/SUB and history messages
	// during publication recovery process in channel.
	inSubscribe     uint32
	pubBufferMu     sync.Mutex
	pubBufferLocked bool
	pubBuffer       []*protocol.Publication
}

// SyncPublication ...
func (c *PubSubSync) SyncPublication(channel string, pub *protocol.Publication, syncedFn func()) {
	if c.isInSubscribe(channel) {
		// Client currently in process of subscribing to this channel. In this case we keep
		// publications in slice buffer. Publications from this temporary buffer will be sent in
		// subscribe reply.
		c.LockBuffer(channel)
		if c.isInSubscribe(channel) {
			// Sync point not reached yet - put Publication to tmp slice.
			c.appendPubToBuffer(channel, pub)
			c.unlockBuffer(channel)
			return
		}
		// Sync point already passed - send Publication into connection.
		c.unlockBuffer(channel)
	}
	syncedFn()
}

// StartBuffering ...
func (c *PubSubSync) StartBuffering(channel string) {
	c.subSyncMu.Lock()
	defer c.subSyncMu.Unlock()
	s := &subscribeState{}
	c.subSync[channel] = s
	atomic.StoreUint32(&s.inSubscribe, 1)
}

// StopBuffering ...
func (c *PubSubSync) StopBuffering(channel string) {
	c.subSyncMu.Lock()
	defer c.subSyncMu.Unlock()
	s, ok := c.subSync[channel]
	if !ok {
		return
	}
	atomic.StoreUint32(&s.inSubscribe, 0)
	if s.pubBufferLocked {
		s.pubBufferMu.Unlock()
	}
	delete(c.subSync, channel)
}

func (c *PubSubSync) isInSubscribe(channel string) bool {
	c.subSyncMu.RLock()
	defer c.subSyncMu.RUnlock()
	s, ok := c.subSync[channel]
	if !ok {
		return false
	}
	return atomic.LoadUint32(&s.inSubscribe) == 1
}

// LockBuffer ...
func (c *PubSubSync) LockBuffer(channel string) {
	c.subSyncMu.Lock()
	s, ok := c.subSync[channel]
	if !ok {
		c.subSyncMu.Unlock()
		return
	}
	s.pubBufferLocked = true
	c.subSyncMu.Unlock()
	s.pubBufferMu.Lock()
}

// UnlockBuffer ...
func (c *PubSubSync) unlockBuffer(channel string) {
	c.subSyncMu.Lock()
	defer c.subSyncMu.Unlock()
	s, ok := c.subSync[channel]
	if !ok {
		return
	}
	if s.pubBufferLocked {
		s.pubBufferMu.Unlock()
	}
}

func (c *PubSubSync) appendPubToBuffer(channel string, pub *protocol.Publication) {
	c.subSyncMu.RLock()
	defer c.subSyncMu.RUnlock()
	s := c.subSync[channel]
	s.pubBuffer = append(s.pubBuffer, pub)
}

// ReadBuffered ...
func (c *PubSubSync) ReadBuffered(channel string) []*protocol.Publication {
	c.subSyncMu.RLock()
	defer c.subSyncMu.RUnlock()
	s := c.subSync[channel]
	pubs := make([]*protocol.Publication, len(s.pubBuffer))
	copy(pubs, s.pubBuffer)
	s.pubBuffer = nil
	return pubs
}