1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
|
package dispatcher
import (
"sync"
"time"
"github.com/moby/swarmkit/v2/api"
"github.com/moby/swarmkit/v2/identity"
"github.com/moby/swarmkit/v2/manager/dispatcher/heartbeat"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// rateLimitCount is the threshold CheckRateLimit compares a node's Attempts
// counter against: once the counter exceeds it, the registration is refused.
const rateLimitCount = 3
// registeredNode is the per-node record kept in nodeStore.nodes.
type registeredNode struct {
// SessionID is generated by nodeStore.Add and compared by checkSessionID.
// Entries created by AddUnknown leave it empty.
SessionID string
// Heartbeat is the expiry timer created with heartbeat.New when the entry
// is added; it is stopped when the entry is replaced, deleted or cleaned.
Heartbeat *heartbeat.Heartbeat
// Registered is the time of the last registration attempt accepted by
// CheckRateLimit, or the time Add created the entry if there was none.
Registered time.Time
// Attempts counts registration attempts for CheckRateLimit.
Attempts int
// Node is the API object this entry was added for.
Node *api.Node
// Disconnect is created by Add and closed by nodeStore.Disconnect.
// AddUnknown does not initialize it.
Disconnect chan struct{} // signal to disconnect
// mu is held by checkSessionID and while nodeStore.Heartbeat updates the
// heartbeat timer.
mu sync.Mutex
}
// checkSessionID determines if the SessionID has changed and returns the
// appropriate GRPC error code.
//
// It returns nil when sessionID is non-empty and equal to rn.SessionID.
// Otherwise it returns a gRPC InvalidArgument status whose message is
// ErrSessionInvalid's text.
//
// This may not belong here in the future.
func (rn *registeredNode) checkSessionID(sessionID string) error {
	rn.mu.Lock()
	defer rn.mu.Unlock()

	// Before each message send, we need to check that the node's session ID
	// hasn't changed. If it has, we will close the stream and make the node
	// re-register.
	if sessionID == "" || rn.SessionID != sessionID {
		// status.Error rather than status.Errorf: the message is plain text,
		// not a format string, so it must not be interpreted for verbs.
		return status.Error(codes.InvalidArgument, ErrSessionInvalid.Error())
	}
	return nil
}
// nodeStore is a mutex-guarded map of registeredNode entries keyed by node ID,
// together with the heartbeat and rate-limit settings applied to them.
type nodeStore struct {
// periodChooser supplies the base heartbeat period through Choose().
periodChooser *periodChooser
// gracePeriodMultiplierNormal scales the chosen period into the heartbeat
// timeout used by Add and Heartbeat.
gracePeriodMultiplierNormal time.Duration
// gracePeriodMultiplierUnknown is the multiplier used by AddUnknown; it is
// always set to twice gracePeriodMultiplierNormal.
gracePeriodMultiplierUnknown time.Duration
// rateLimitPeriod is how long after the last accepted registration
// CheckRateLimit waits before resetting a node's Attempts counter.
rateLimitPeriod time.Duration
// nodes maps a node ID to its entry.
nodes map[string]*registeredNode
// mu guards nodes; updatePeriod also holds it while replacing the period
// settings.
mu sync.RWMutex
}
// newNodeStore returns an empty nodeStore.
//
// hbPeriod and hbEpsilon are handed to newPeriodChooser. graceMultiplier
// becomes the normal grace multiplier, and twice that value becomes the
// multiplier used for unknown nodes. rateLimitPeriod is stored as is.
func newNodeStore(hbPeriod, hbEpsilon time.Duration, graceMultiplier int, rateLimitPeriod time.Duration) *nodeStore {
	normal := time.Duration(graceMultiplier)

	store := &nodeStore{
		periodChooser:                newPeriodChooser(hbPeriod, hbEpsilon),
		gracePeriodMultiplierNormal:  normal,
		gracePeriodMultiplierUnknown: normal * 2,
		rateLimitPeriod:              rateLimitPeriod,
		nodes:                        map[string]*registeredNode{},
	}
	return store
}
// updatePeriod swaps in a new period chooser built from hbPeriod and
// hbEpsilon and recomputes both grace multipliers from gracePeriodMultiplier
// (the unknown-node multiplier is twice the normal one). The store lock is
// held for the whole update.
func (s *nodeStore) updatePeriod(hbPeriod, hbEpsilon time.Duration, gracePeriodMultiplier int) {
	normal := time.Duration(gracePeriodMultiplier)

	s.mu.Lock()
	defer s.mu.Unlock()

	s.periodChooser = newPeriodChooser(hbPeriod, hbEpsilon)
	s.gracePeriodMultiplierNormal = normal
	s.gracePeriodMultiplierUnknown = normal * 2
}
// Len returns the number of entries currently held in the store.
func (s *nodeStore) Len() int {
	// Only reads the map, so the shared lock is sufficient and does not
	// block concurrent Get/GetWithSession calls.
	s.mu.RLock()
	defer s.mu.RUnlock()
	return len(s.nodes)
}
func (s *nodeStore) AddUnknown(n *api.Node, expireFunc func()) error {
s.mu.Lock()
defer s.mu.Unlock()
rn := ®isteredNode{
Node: n,
}
s.nodes[n.ID] = rn
rn.Heartbeat = heartbeat.New(s.periodChooser.Choose()*s.gracePeriodMultiplierUnknown, expireFunc)
return nil
}
// CheckRateLimit returns an error if the node with the specified id is not
// allowed to re-register yet.
//
// Unknown ids are always allowed. For a known node, the attempt counter is
// reset when more than rateLimitPeriod has passed since its Registered time,
// then incremented. If the counter then exceeds rateLimitCount, a gRPC
// Unavailable status is returned; otherwise Registered is set to the current
// time and nil is returned.
func (s *nodeStore) CheckRateLimit(id string) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	known, found := s.nodes[id]
	if !found {
		return nil
	}

	if time.Since(known.Registered) > s.rateLimitPeriod {
		known.Attempts = 0
	}
	known.Attempts++

	if known.Attempts > rateLimitCount {
		return status.Errorf(codes.Unavailable, "node %s exceeded rate limit count of registrations", id)
	}

	known.Registered = time.Now()
	return nil
}
// Add adds new node and returns it, it replaces existing without notification.
func (s *nodeStore) Add(n *api.Node, expireFunc func()) *registeredNode {
s.mu.Lock()
defer s.mu.Unlock()
var attempts int
var registered time.Time
if existRn, ok := s.nodes[n.ID]; ok {
attempts = existRn.Attempts
registered = existRn.Registered
existRn.Heartbeat.Stop()
delete(s.nodes, n.ID)
}
if registered.IsZero() {
registered = time.Now()
}
rn := ®isteredNode{
SessionID: identity.NewID(), // session ID is local to the dispatcher.
Node: n,
Registered: registered,
Attempts: attempts,
Disconnect: make(chan struct{}),
}
s.nodes[n.ID] = rn
rn.Heartbeat = heartbeat.New(s.periodChooser.Choose()*s.gracePeriodMultiplierNormal, expireFunc)
return rn
}
// Get returns the entry stored for id. If there is none, it returns nil and a
// gRPC NotFound status whose message is ErrNodeNotRegistered's text.
func (s *nodeStore) Get(id string) (*registeredNode, error) {
	s.mu.RLock()
	rn, ok := s.nodes[id]
	s.mu.RUnlock()

	if !ok {
		// status.Error rather than status.Errorf: the message is plain text,
		// not a format string.
		return nil, status.Error(codes.NotFound, ErrNodeNotRegistered.Error())
	}
	return rn, nil
}
// GetWithSession looks up the entry for id and validates sid against it.
//
// If id is not in the store, it returns nil and a gRPC NotFound status whose
// message is ErrNodeNotRegistered's text. Otherwise it returns the entry
// together with the result of checkSessionID(sid); note that the entry is
// returned even when the session check fails.
func (s *nodeStore) GetWithSession(id, sid string) (*registeredNode, error) {
	s.mu.RLock()
	rn, ok := s.nodes[id]
	s.mu.RUnlock()

	if !ok {
		// status.Error rather than status.Errorf: the message is plain text,
		// not a format string.
		return nil, status.Error(codes.NotFound, ErrNodeNotRegistered.Error())
	}
	return rn, rn.checkSessionID(sid)
}
// Heartbeat records a heartbeat for node id in session sid.
//
// It returns the freshly chosen base period after updating the node's
// heartbeat timeout to that period multiplied by the normal grace multiplier
// and beating the timer. If the node is not in the store or sid does not
// match, it returns 0 and the error from GetWithSession.
func (s *nodeStore) Heartbeat(id, sid string) (time.Duration, error) {
	rn, err := s.GetWithSession(id, sid)
	if err != nil {
		return 0, err
	}

	// updatePeriod replaces periodChooser and the multipliers under s.mu, so
	// read them under the shared lock to avoid racing with it.
	s.mu.RLock()
	period := s.periodChooser.Choose() // base period for node
	grace := period * s.gracePeriodMultiplierNormal
	s.mu.RUnlock()

	rn.mu.Lock()
	rn.Heartbeat.Update(grace)
	rn.Heartbeat.Beat()
	rn.mu.Unlock()

	return period, nil
}
// Delete removes the entry for id from the store, stops its heartbeat and
// returns it. It returns nil when no entry exists for id.
func (s *nodeStore) Delete(id string) *registeredNode {
	s.mu.Lock()
	defer s.mu.Unlock()

	removed, found := s.nodes[id]
	if !found {
		return nil
	}

	delete(s.nodes, id)
	removed.Heartbeat.Stop()
	return removed
}
// Disconnect signals the node with the given id to disconnect by closing its
// Disconnect channel, and stops its heartbeat. The entry stays in the store.
// Unknown ids are ignored.
//
// The call is safe to repeat and safe for entries created by AddUnknown,
// which have no Disconnect channel: closing a nil or already closed channel
// would panic, so both cases are skipped.
func (s *nodeStore) Disconnect(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	rn, ok := s.nodes[id]
	if !ok {
		return
	}

	if rn.Disconnect != nil {
		// The channel is only used as a close signal in this file, so a
		// successful receive means it has already been closed.
		// NOTE(review): assumes nothing ever sends on Disconnect — confirm.
		select {
		case <-rn.Disconnect:
		default:
			close(rn.Disconnect)
		}
	}
	rn.Heartbeat.Stop()
}
// Clean stops the heartbeat of every stored node and then replaces the node
// map with a fresh, empty one. The effect is the same as invalidating all
// sessions.
func (s *nodeStore) Clean() {
	s.mu.Lock()
	defer s.mu.Unlock()

	for id := range s.nodes {
		s.nodes[id].Heartbeat.Stop()
	}
	s.nodes = map[string]*registeredNode{}
}
|