File: cache.go

package info (click to toggle)
golang-github-go-kit-kit 0.13.0-8
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,784 kB
  • sloc: sh: 22; makefile: 11
file content (102 lines) | stat: -rw-r--r-- 2,209 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package instance

import (
	"reflect"
	"sort"
	"sync"

	"github.com/go-kit/kit/sd"
)

// Cache keeps track of resource instances provided to it via Update method
// and implements the Instancer interface
type Cache struct {
	mtx   sync.RWMutex
	state sd.Event
	reg   registry
}

// NewCache creates a new Cache.
func NewCache() *Cache {
	return &Cache{
		reg: registry{},
	}
}

// Update receives new instances from service discovery, stores them internally,
// and notifies all registered listeners.
func (c *Cache) Update(event sd.Event) {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	sort.Strings(event.Instances)
	if reflect.DeepEqual(c.state, event) {
		return // no need to broadcast the same instances
	}

	c.state = event
	c.reg.broadcast(event)
}

// State returns the current state of discovery (instances or error) as sd.Event
func (c *Cache) State() sd.Event {
	c.mtx.RLock()
	event := c.state
	c.mtx.RUnlock()
	eventCopy := copyEvent(event)
	return eventCopy
}

// Stop implements Instancer. Since the cache is just a plain-old store of data,
// Stop is a no-op.
func (c *Cache) Stop() {}

// Register implements Instancer.
func (c *Cache) Register(ch chan<- sd.Event) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	c.reg.register(ch)
	event := c.state
	eventCopy := copyEvent(event)
	// always push the current state to new channels
	ch <- eventCopy
}

// Deregister implements Instancer.
func (c *Cache) Deregister(ch chan<- sd.Event) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	c.reg.deregister(ch)
}

// registry is not goroutine-safe.
type registry map[chan<- sd.Event]struct{}

func (r registry) broadcast(event sd.Event) {
	for c := range r {
		eventCopy := copyEvent(event)
		c <- eventCopy
	}
}

func (r registry) register(c chan<- sd.Event) {
	r[c] = struct{}{}
}

func (r registry) deregister(c chan<- sd.Event) {
	delete(r, c)
}

// copyEvent does a deep copy on sd.Event
func copyEvent(e sd.Event) sd.Event {
	// observers all need their own copy of event
	// because they can directly modify event.Instances
	// for example, by calling sort.Strings
	if e.Instances == nil {
		return e
	}
	instances := make([]string, len(e.Instances))
	copy(instances, e.Instances)
	e.Instances = instances
	return e
}