File: connection_maker.go

package info (click to toggle)
golang-github-weaveworks-mesh 0%2Bgit20161024.3dd75b1-1~bpo8%2B1
  • links: PTS, VCS
  • area: main
  • in suites: jessie-backports
  • size: 412 kB
  • sloc: sh: 59; makefile: 7
file content (399 lines) | stat: -rw-r--r-- 11,709 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
package mesh

import (
	"fmt"
	"math/rand"
	"net"
	"time"
	"unicode"
)

// Retry/backoff tuning for connection targets:
//
//   - initialInterval is the retry interval a target is given whenever its
//     backoff is reset (target.nextTryNow).
//   - maxInterval is the ceiling applied to a target's retry interval as it
//     grows (target.nextTryLater); it is also the interval recorded by
//     target.nextTryNever.
//   - resetAfter is used when an outbound connection terminates: if more than
//     this much time has passed since the target's scheduled try time, the
//     target is retried immediately with a fresh backoff instead of backing
//     off further.
const (
	initialInterval = 2 * time.Second
	maxInterval     = 6 * time.Minute
	resetAfter      = 1 * time.Minute
)

// peerAddrs maps a peer specification, exactly as supplied by the caller
// (host or host:port), to the TCP address it was resolved to.
type peerAddrs map[string]*net.TCPAddr

// connectionMaker initiates and manages connections to peers.
//
// The mutable fields (targets, connections, directPeers, terminationCount)
// are accessed from the queryLoop goroutine, either inside the action
// closures sent on actionChan or during the state check that loop triggers.
//
// Fields:
//   - ourself: the local peer; used to create outbound connections and to
//     skip ourselves during discovery.
//   - peers: the known peers, iterated during discovery.
//   - localAddr: passed to createConnection for outbound attempts.
//   - port: substituted for a target's port when none (0) was given, and
//     used for addresses learned from other peers' inbound connections.
//   - discovery: when true, peers that other peers are connected to are
//     also added as targets.
//   - targets: per-address connection state, keyed by "ip:port".
//   - connections: the set of currently registered connections.
//   - directPeers: explicitly requested peers and their resolved addresses.
//   - terminationCount: number of connection terminations, not counting
//     those caused by errConnectToSelf.
//   - actionChan: send side of the channel consumed by queryLoop.
//   - logger: destination for connection attempt log lines.
type connectionMaker struct {
	ourself          *localPeer
	peers            *Peers
	localAddr        string
	port             int
	discovery        bool
	targets          map[string]*target
	connections      map[Connection]struct{}
	directPeers      peerAddrs
	terminationCount int
	actionChan       chan<- connectionMakerAction
	logger           Logger
}

// targetState describes the connection state of a remote target.
type targetState int

// The states a target moves through:
//
//   - targetWaiting: not connected; eligible for a connection attempt once
//     its tryAfter time has been reached. This is the state of a newly added
//     target and of one whose connection was aborted or terminated.
//   - targetAttempting: a connection attempt has been started.
//   - targetConnected: an outbound connection to the target was registered.
//   - targetSuspended: a direct peer that is currently not a valid target;
//     it is kept rather than forgotten, but not attempted.
const (
	targetWaiting targetState = iota
	targetAttempting
	targetConnected
	targetSuspended
)

// target holds information about an address where we may find a peer.
//
// A zero tryAfter means "never retry": nextTryNever sets it, and
// connectToTargets skips such targets.
type target struct {
	state       targetState
	lastError   error         // reason for disconnection last time
	tryAfter    time.Time     // next time to try this address
	tryInterval time.Duration // retry delay on next failure
}

// connectionMakerAction is the actor closure used by connectionMaker.
// Actions are sent on actionChan and executed by queryLoop. If an action
// returns true, the connectionMaker will check the state of its targets, and
// reconnect to relevant candidates; if it returns false no check is made.
type connectionMakerAction func() bool

// newConnectionMaker builds a connectionMaker for ourself and the given set
// of peers and starts its queryLoop goroutine before returning.
//
// localAddr is handed to createConnection for outbound attempts, and port is
// the port assumed for targets that do not specify one. When discovery is
// true, the connectionMaker also tries to connect to peers that other peers
// are connected to but we are not.
func newConnectionMaker(ourself *localPeer, peers *Peers, localAddr string, port int, discovery bool, logger Logger) *connectionMaker {
	// Keep the bidirectional channel locally: the struct only gets the send
	// side, the loop goroutine only the receive side.
	actions := make(chan connectionMakerAction, ChannelSize)

	maker := &connectionMaker{
		ourself:     ourself,
		peers:       peers,
		localAddr:   localAddr,
		port:        port,
		discovery:   discovery,
		logger:      logger,
		actionChan:  actions,
		directPeers: peerAddrs{},
		targets:     make(map[string]*target),
		connections: make(map[Connection]struct{}),
	}

	go maker.queryLoop(actions)
	return maker
}

// InitiateConnections registers the given peers, each written as host or
// host:port, as direct peers and triggers a connection check. If replace is
// true, the existing direct peers are forgotten first.
//
// Every peer that cannot be parsed or resolved contributes one error to the
// returned slice; the remaining peers are still registered. A peer given
// without a port is resolved with port 0, which marks "no port supplied".
// Resolution is done over "tcp4".
//
// TODO(pb): Weave Net invokes router.ConnectionMaker.InitiateConnections;
// it may be better to provide that on Router directly.
func (cm *connectionMaker) InitiateConnections(peers []string, replace bool) []error {
	errs := []error{}
	resolved := peerAddrs{}

	for _, peer := range peers {
		host, port, splitErr := net.SplitHostPort(peer)
		if splitErr != nil {
			// No parsable port: treat the whole string as the host and use
			// "0" as an indication that no port was supplied.
			host, port = peer, "0"
		}

		if host == "" || !isAlnum(port) {
			errs = append(errs, fmt.Errorf("invalid peer name %q, should be host[:port]", peer))
			continue
		}

		addr, resolveErr := net.ResolveTCPAddr("tcp4", fmt.Sprintf("%s:%s", host, port))
		if resolveErr != nil {
			errs = append(errs, resolveErr)
			continue
		}
		resolved[peer] = addr
	}

	cm.actionChan <- func() bool {
		if replace {
			cm.directPeers = peerAddrs{}
		}
		for peer, addr := range resolved {
			cm.directPeers[peer] = addr
			// An explicit request overrides any backoff already in progress
			// for this address.
			if existing, known := cm.targets[cm.completeAddr(*addr)]; known {
				existing.nextTryNow()
			}
		}
		return true
	}

	return errs
}

// isAlnum reports whether every rune in s is a Unicode letter or a Unicode
// decimal digit. The empty string trivially satisfies this and yields true.
func isAlnum(s string) bool {
	for _, r := range s {
		if unicode.IsLetter(r) || unicode.IsDigit(r) {
			continue
		}
		return false
	}
	return true
}

// ForgetConnections drops the given peers from the set of direct peers and
// triggers a connection check. Each entry must be spelled exactly as it was
// when passed to InitiateConnections, since that string is the lookup key.
//
// TODO(pb): Weave Net invokes router.ConnectionMaker.ForgetConnections;
// it may be better to provide that on Router directly.
func (cm *connectionMaker) ForgetConnections(peers []string) {
	forget := func() bool {
		for i := range peers {
			delete(cm.directPeers, peers[i])
		}
		return true
	}
	cm.actionChan <- forget
}

// Targets returns a snapshot of the direct peers, spelled the way they were
// given to InitiateConnections. With activeOnly set, peers whose target has
// been marked "never retry" (zero tryAfter) are left out; otherwise all
// direct peers are returned.
//
// The snapshot is taken by the queryLoop goroutine; this call blocks until
// it has been produced. It does not trigger a connection check.
func (cm *connectionMaker) Targets(activeOnly bool) []string {
	reply := make(chan []string)
	cm.actionChan <- func() bool {
		var snapshot []string
		for peer, addr := range cm.directPeers {
			if activeOnly {
				tgt, known := cm.targets[cm.completeAddr(*addr)]
				if known && tgt.tryAfter.IsZero() {
					// We have given up on this one.
					continue
				}
			}
			snapshot = append(snapshot, peer)
		}
		reply <- snapshot
		return false
	}
	return <-reply
}

// connectionAborted records that the connection attempt to address failed
// with err: the target goes back to targetWaiting, remembers the error, and
// has its next attempt pushed out by its backoff interval. A connection
// check is triggered afterwards.
func (cm *connectionMaker) connectionAborted(address string, err error) {
	cm.actionChan <- func() bool {
		failed := cm.targets[address]
		failed.lastError = err
		failed.state = targetWaiting
		failed.nextTryLater()
		return true
	}
}

// connectionCreated registers conn and, if it is an outbound connection,
// moves the target for conn.remoteTCPAddress() to targetConnected. Inbound
// connections are only registered. No connection check is triggered.
func (cm *connectionMaker) connectionCreated(conn Connection) {
	cm.actionChan <- func() bool {
		cm.connections[conn] = struct{}{}
		if !conn.isOutbound() {
			return false
		}
		cm.targets[conn.remoteTCPAddress()].state = targetConnected
		return false
	}
}

// connectionTerminated unregisters conn and triggers a connection check.
// Terminations caused by anything other than errConnectToSelf are counted
// in terminationCount.
//
// For an outbound connection, the target for conn.remoteTCPAddress() goes
// back to targetWaiting, records err, and is rescheduled:
//   - never, if err is a peer name collision or errConnectToSelf;
//   - immediately, with a fresh backoff, if more than resetAfter has elapsed
//     since the target's scheduled try time;
//   - after its current backoff interval otherwise.
func (cm *connectionMaker) connectionTerminated(conn Connection, err error) {
	cm.actionChan <- func() bool {
		connectedToSelf := err == errConnectToSelf
		if !connectedToSelf {
			cm.terminationCount++
		}
		delete(cm.connections, conn)

		if !conn.isOutbound() {
			return true
		}

		tgt := cm.targets[conn.remoteTCPAddress()]
		tgt.state = targetWaiting
		tgt.lastError = err

		if _, nameCollision := err.(*peerNameCollisionError); nameCollision || connectedToSelf {
			tgt.nextTryNever()
		} else if time.Now().After(tgt.tryAfter.Add(resetAfter)) {
			tgt.nextTryNow()
		} else {
			tgt.nextTryLater()
		}
		return true
	}
}

// refresh queues an action that does nothing except return true, which makes
// the connectionMaker re-check its targets and reconnect to relevant
// candidates.
func (cm *connectionMaker) refresh() {
	var nudge connectionMakerAction = func() bool { return true }
	cm.actionChan <- nudge
}

// queryLoop is the connectionMaker's actor loop. It executes the actions
// received on actionChan one at a time and, whenever an action returns true
// or the retry timer fires, re-evaluates all targets via
// checkStateAndAttemptConnections. The duration returned by that check is
// used to re-arm the timer for the next evaluation.
//
// The loop has no exit condition; once started it keeps running.
func (cm *connectionMaker) queryLoop(actionChan <-chan connectionMakerAction) {
	timer := time.NewTimer(maxDuration)
	run := func() {
		next := cm.checkStateAndAttemptConnections()
		// Stop the timer and drain a tick that may have been delivered
		// while an action was running. Without this, a stale tick could
		// survive the Reset and cause a redundant check on the next
		// iteration. The drain is non-blocking because, when run is called
		// from the timer case, the tick has already been consumed.
		if !timer.Stop() {
			select {
			case <-timer.C:
			default:
			}
		}
		timer.Reset(next)
	}
	for {
		select {
		case action := <-actionChan:
			if action() {
				run()
			}
		case <-timer.C:
			run()
		}
	}
}

// completeAddr renders addr as a string, substituting the connectionMaker's
// own port when addr carries no port (0). addr is received by value, so the
// caller's address is never modified.
func (cm *connectionMaker) completeAddr(addr net.TCPAddr) string {
	if addr.Port != 0 {
		return addr.String()
	}
	addr.Port = cm.port
	return addr.String()
}

// checkStateAndAttemptConnections works out which addresses we currently
// want a connection to, creates target entries for any that are new, and
// hands the result to connectToTargets, whose return value (the delay until
// the next check is due) is passed through.
//
// Candidate addresses are the direct peers we are not connected to and, when
// discovery is enabled, peers that other peers are connected to but we are
// not.
func (cm *connectionMaker) checkStateAndAttemptConnections() time.Duration {
	validTarget := map[string]struct{}{}
	directTarget := map[string]struct{}{}
	ourConnectedPeers, ourConnectedTargets, ourInboundIPs := cm.ourConnections()

	// addTarget marks address as wanted unless we are already connected to
	// it, creating an immediately-due target on first sight.
	addTarget := func(address string) {
		if _, connected := ourConnectedTargets[address]; connected {
			return
		}
		validTarget[address] = struct{}{}
		if _, known := cm.targets[address]; !known {
			fresh := &target{state: targetWaiting}
			fresh.nextTryNow()
			cm.targets[address] = fresh
		}
	}

	// Direct peers are always recorded as direct targets, but only added as
	// connection candidates when appropriate.
	for _, addr := range cm.directPeers {
		address := cm.completeAddr(*addr)
		directTarget[address] = struct{}{}

		if addr.Port == 0 {
			// If a peer was specified w/o a port, then we do not attempt
			// to connect to it if we have any inbound connections from
			// that IP.
			if _, inbound := ourInboundIPs[addr.IP.String()]; inbound {
				continue
			}
		}
		addTarget(address)
	}

	// Peers that someone else is connected to, but we aren't.
	if cm.discovery {
		cm.addPeerTargets(ourConnectedPeers, addTarget)
	}

	return cm.connectToTargets(validTarget, directTarget)
}

// ourConnections summarises the registered connections as three sets, in
// this order: the names of the peers we are connected to, the remote TCP
// addresses of all our connections, and the remote IPs of our inbound
// connections only.
func (cm *connectionMaker) ourConnections() (peerNameSet, map[string]struct{}, map[string]struct{}) {
	peerNames := make(peerNameSet)
	remoteAddrs := make(map[string]struct{})
	inboundIPs := make(map[string]struct{})

	for conn := range cm.connections {
		remote := conn.remoteTCPAddress()
		peerNames[conn.Remote().Name] = struct{}{}
		remoteAddrs[remote] = struct{}{}

		if !conn.isOutbound() {
			// Splitting should always succeed; if it does not, the address
			// is simply not recorded as an inbound IP.
			if ip, _, err := net.SplitHostPort(remote); err == nil {
				inboundIPs[ip] = struct{}{}
			}
		}
	}

	return peerNames, remoteAddrs, inboundIPs
}

// addPeerTargets walks the connections of every known peer other than
// ourself and calls addTarget for each remote peer that we are not yet
// connected to (as given by ourConnectedPeers) and that is not ourself.
//
// For a connection the other peer made outbound, its remote address is used
// as is. For one it received inbound, only the IP is kept and our own port
// is substituted.
func (cm *connectionMaker) addPeerTargets(ourConnectedPeers peerNameSet, addTarget func(string)) {
	cm.peers.forEach(func(peer *Peer) {
		if peer == cm.ourself.Peer {
			return
		}
		// Modifying peer.connections requires a write lock on Peers,
		// and since we are holding a read lock (due to the ForEach),
		// access without locking the peer is safe.
		for otherPeer, conn := range peer.connections {
			_, alreadyConnected := ourConnectedPeers[otherPeer]
			if otherPeer == cm.ourself.Name || alreadyConnected {
				continue
			}

			address := conn.remoteTCPAddress()
			if conn.isOutbound() {
				addTarget(address)
				continue
			}

			// There is no point connecting to the (likely ephemeral)
			// remote port of an inbound connection that some peer has.
			// Let's try to connect on the weave port instead.
			ip, _, err := net.SplitHostPort(address)
			if err != nil {
				continue
			}
			addTarget(fmt.Sprintf("%s:%d", ip, cm.port))
		}
	})
}

// connectToTargets walks all known targets and starts a connection attempt
// for each one that is due.
//
// validTarget is the set of addresses we currently want a connection to;
// directTarget is the set of addresses belonging to direct peers. Only
// targets in the targetWaiting or targetSuspended state are considered:
//   - a target that is not valid is suspended if it is a direct peer and
//     forgotten otherwise;
//   - a valid target with a zero tryAfter is never attempted;
//   - a valid target whose tryAfter has been reached moves to
//     targetAttempting and is attempted in a new goroutine;
//   - any other valid target stays in (or returns to) targetWaiting.
//
// The return value is the shortest remaining wait among the targets that are
// not yet due, or maxDuration if there are none.
func (cm *connectionMaker) connectToTargets(validTarget map[string]struct{}, directTarget map[string]struct{}) time.Duration {
	now := time.Now() // make sure we catch items just added
	after := maxDuration
	for address, target := range cm.targets {
		// Targets that are being attempted or are connected are left alone.
		if target.state != targetWaiting && target.state != targetSuspended {
			continue
		}
		if _, valid := validTarget[address]; !valid {
			// Not valid: suspend reconnects if direct peer,
			// otherwise forget this target entirely
			if _, direct := directTarget[address]; direct {
				target.state = targetSuspended
			} else {
				delete(cm.targets, address)
			}
			continue
		}
		// A zero tryAfter means "never retry" (see nextTryNever).
		if target.tryAfter.IsZero() {
			continue
		}
		// The target is valid again, so lift any earlier suspension.
		target.state = targetWaiting
		switch duration := target.tryAfter.Sub(now); {
		case duration <= 0:
			// Due now: attempt asynchronously. The second argument tells
			// attemptConnection whether this address is a direct peer.
			target.state = targetAttempting
			_, isCmdLineTarget := directTarget[address]
			go cm.attemptConnection(address, isCmdLineTarget)
		case duration < after:
			// Not due yet: remember the earliest upcoming retry.
			after = duration
		}
	}
	return after
}

// attemptConnection tries to create a connection from localAddr to address,
// logging the attempt. If creation fails, the error is logged and reported
// through connectionAborted so the target is rescheduled. acceptNewPeer is
// passed through to createConnection unchanged.
func (cm *connectionMaker) attemptConnection(address string, acceptNewPeer bool) {
	cm.logger.Printf("->[%s] attempting connection", address)
	err := cm.ourself.createConnection(cm.localAddr, address, acceptNewPeer, cm.logger)
	if err == nil {
		return
	}
	cm.logger.Printf("->[%s] error during connection attempt: %v", address, err)
	cm.connectionAborted(address, err)
}

// nextTryNever marks the target as one that must not be retried: tryAfter is
// cleared to the zero time and the retry interval is set to maxInterval.
func (t *target) nextTryNever() {
	t.tryAfter, t.tryInterval = time.Time{}, maxInterval
}

// nextTryNow makes the target due immediately and resets its backoff to
// initialInterval.
func (t *target) nextTryNow() {
	t.tryAfter, t.tryInterval = time.Now(), initialInterval
}

// nextTryLater schedules the next attempt after a randomised delay and then
// grows the retry interval for the attempt after that.
//
// With i the current retry interval, the delay is i/2 plus a random amount
// below i, i.e. a value in [i/2, 3i/2). The interval is then multiplied by
// 1.5, capped at maxInterval, so the nth consecutive delay is based on
// initialInterval * 1.5^(n-1).
func (t *target) nextTryLater() {
	now := time.Now()
	interval := t.tryInterval
	jitter := time.Duration(rand.Int63n(int64(interval)))
	t.tryAfter = now.Add(interval/2 + jitter)

	grown := interval * 3 / 2
	if grown > maxInterval {
		grown = maxInterval
	}
	t.tryInterval = grown
}