1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200
|
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package v2store
import (
"container/list"
"path"
"strings"
"sync"
"sync/atomic"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2error"
)
// A watcherHub contains all subscribed watchers
// watchers is a map with watched path as key and watcher as value
// EventHistory keeps the old events for watcherHub. It is used to help
// watcher to get a continuous event history. Or a watcher might miss the
// event happens between the end of the first watch command and the start
// of the second command.
type watcherHub struct {
// count must be the first element to keep 64-bit alignment for atomic
// access
count int64 // current number of watchers.
mutex sync.Mutex
watchers map[string]*list.List
EventHistory *EventHistory
}
// newWatchHub creates a watcherHub. The capacity determines how many events we will
// keep in the eventHistory.
// Typically, we only need to keep a small size of history[smaller than 20K].
// Ideally, it should smaller than 20K/s[max throughput] * 2 * 50ms[RTT] = 2000
func newWatchHub(capacity int) *watcherHub {
return &watcherHub{
watchers: make(map[string]*list.List),
EventHistory: newEventHistory(capacity),
}
}
// Watch function returns a Watcher.
// If recursive is true, the first change after index under key will be sent to the event channel of the watcher.
// If recursive is false, the first change after index at key will be sent to the event channel of the watcher.
// If index is zero, watch will start from the current index + 1.
func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeIndex uint64) (Watcher, *v2error.Error) {
reportWatchRequest()
event, err := wh.EventHistory.scan(key, recursive, index)
if err != nil {
err.Index = storeIndex
return nil, err
}
w := &watcher{
eventChan: make(chan *Event, 100), // use a buffered channel
recursive: recursive,
stream: stream,
sinceIndex: index,
startIndex: storeIndex,
hub: wh,
}
wh.mutex.Lock()
defer wh.mutex.Unlock()
// If the event exists in the known history, append the EtcdIndex and return immediately
if event != nil {
ne := event.Clone()
ne.EtcdIndex = storeIndex
w.eventChan <- ne
return w, nil
}
l, ok := wh.watchers[key]
var elem *list.Element
if ok { // add the new watcher to the back of the list
elem = l.PushBack(w)
} else { // create a new list and add the new watcher
l = list.New()
elem = l.PushBack(w)
wh.watchers[key] = l
}
w.remove = func() {
if w.removed { // avoid removing it twice
return
}
w.removed = true
l.Remove(elem)
atomic.AddInt64(&wh.count, -1)
reportWatcherRemoved()
if l.Len() == 0 {
delete(wh.watchers, key)
}
}
atomic.AddInt64(&wh.count, 1)
reportWatcherAdded()
return w, nil
}
func (wh *watcherHub) add(e *Event) {
wh.EventHistory.addEvent(e)
}
// notify function accepts an event and notify to the watchers.
func (wh *watcherHub) notify(e *Event) {
e = wh.EventHistory.addEvent(e) // add event into the eventHistory
segments := strings.Split(e.Node.Key, "/")
currPath := "/"
// walk through all the segments of the path and notify the watchers
// if the path is "/foo/bar", it will notify watchers with path "/",
// "/foo" and "/foo/bar"
for _, segment := range segments {
currPath = path.Join(currPath, segment)
// notify the watchers who interests in the changes of current path
wh.notifyWatchers(e, currPath, false)
}
}
func (wh *watcherHub) notifyWatchers(e *Event, nodePath string, deleted bool) {
wh.mutex.Lock()
defer wh.mutex.Unlock()
l, ok := wh.watchers[nodePath]
if ok {
curr := l.Front()
for curr != nil {
next := curr.Next() // save reference to the next one in the list
w, _ := curr.Value.(*watcher)
originalPath := e.Node.Key == nodePath
if (originalPath || !isHidden(nodePath, e.Node.Key)) && w.notify(e, originalPath, deleted) {
if !w.stream { // do not remove the stream watcher
// if we successfully notify a watcher
// we need to remove the watcher from the list
// and decrease the counter
w.removed = true
l.Remove(curr)
atomic.AddInt64(&wh.count, -1)
reportWatcherRemoved()
}
}
curr = next // update current to the next element in the list
}
if l.Len() == 0 {
// if we have notified all watcher in the list
// we can delete the list
delete(wh.watchers, nodePath)
}
}
}
// clone function clones the watcherHub and return the cloned one.
// only clone the static content. do not clone the current watchers.
func (wh *watcherHub) clone() *watcherHub {
clonedHistory := wh.EventHistory.clone()
return &watcherHub{
EventHistory: clonedHistory,
}
}
// isHidden checks to see if key path is considered hidden to watch path i.e. the
// last element is hidden or it's within a hidden directory
func isHidden(watchPath, keyPath string) bool {
// When deleting a directory, watchPath might be deeper than the actual keyPath
// For example, when deleting /foo we also need to notify watchers on /foo/bar.
if len(watchPath) > len(keyPath) {
return false
}
// if watch path is just a "/", after path will start without "/"
// add a "/" to deal with the special case when watchPath is "/"
afterPath := path.Clean("/" + keyPath[len(watchPath):])
return strings.Contains(afterPath, "/_")
}
|