File: watch.go

package info (click to toggle)
golang-github-hashicorp-go-memdb 0.0~git20170123.0.c01f56b-1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 204 kB
  • ctags: 159
  • sloc: makefile: 5
file content (108 lines) | stat: -rw-r--r-- 2,788 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package memdb

import "time"

// WatchSet is a collection of watch channels.
type WatchSet map[<-chan struct{}]struct{}

// NewWatchSet constructs a new watch set.
func NewWatchSet() WatchSet {
	return make(map[<-chan struct{}]struct{})
}

// Add appends a watchCh to the WatchSet if non-nil.
func (w WatchSet) Add(watchCh <-chan struct{}) {
	if w == nil {
		return
	}

	if _, ok := w[watchCh]; !ok {
		w[watchCh] = struct{}{}
	}
}

// AddWithLimit appends a watchCh to the WatchSet if non-nil, and if the given
// softLimit hasn't been exceeded. Otherwise, it will watch the given alternate
// channel. It's expected that the altCh will be the same on many calls to this
// function, so you will exceed the soft limit a little bit if you hit this, but
// not by much.
//
// This is useful if you want to track individual items up to some limit, after
// which you watch a higher-level channel (usually a channel from start start of
// an iterator higher up in the radix tree) that will watch a superset of items.
func (w WatchSet) AddWithLimit(softLimit int, watchCh <-chan struct{}, altCh <-chan struct{}) {
	// This is safe for a nil WatchSet so we don't need to check that here.
	if len(w) < softLimit {
		w.Add(watchCh)
	} else {
		w.Add(altCh)
	}
}

// Watch is used to wait for either the watch set to trigger or a timeout.
// Returns true on timeout.
func (w WatchSet) Watch(timeoutCh <-chan time.Time) bool {
	if w == nil {
		return false
	}

	if n := len(w); n <= aFew {
		idx := 0
		chunk := make([]<-chan struct{}, aFew)
		for watchCh := range w {
			chunk[idx] = watchCh
			idx++
		}
		return watchFew(chunk, timeoutCh)
	} else {
		return w.watchMany(timeoutCh)
	}
}

// watchMany is used if there are many watchers.
func (w WatchSet) watchMany(timeoutCh <-chan time.Time) bool {
	// Make a fake timeout channel we can feed into watchFew to cancel all
	// the blocking goroutines.
	doneCh := make(chan time.Time)
	defer close(doneCh)

	// Set up a goroutine for each watcher.
	triggerCh := make(chan struct{}, 1)
	watcher := func(chunk []<-chan struct{}) {
		if timeout := watchFew(chunk, doneCh); !timeout {
			select {
			case triggerCh <- struct{}{}:
			default:
			}
		}
	}

	// Apportion the watch channels into chunks we can feed into the
	// watchFew helper.
	idx := 0
	chunk := make([]<-chan struct{}, aFew)
	for watchCh := range w {
		subIdx := idx % aFew
		chunk[subIdx] = watchCh
		idx++

		// Fire off this chunk and start a fresh one.
		if idx%aFew == 0 {
			go watcher(chunk)
			chunk = make([]<-chan struct{}, aFew)
		}
	}

	// Make sure to watch any residual channels in the last chunk.
	if idx%aFew != 0 {
		go watcher(chunk)
	}

	// Wait for a channel to trigger or timeout.
	select {
	case <-triggerCh:
		return false
	case <-timeoutCh:
		return true
	}
}