File: peersmanager.go

Package: sia 1.3.0-4
package gateway

import (
	"github.com/NebulousLabs/Sia/build"
	"github.com/NebulousLabs/Sia/modules"
	"github.com/NebulousLabs/fastrand"
)

// managedPeerManagerConnect is a blocking function that tries to connect to
// the input address as a peer.
func (g *Gateway) managedPeerManagerConnect(addr modules.NetAddress) {
	g.log.Debugf("[PMC] [%v] Attempting connection", addr)
	err := g.managedConnect(addr)
	if err == errPeerExists {
		// This peer is already connected to us. Marking it as an
		// outbound peer is safe because we picked this peer ourselves,
		// rather than allowing an attacker to pick our peers for us.
		// Because we made the selection, it is okay to flag the peer
		// as an outbound peer.
		//
		// The nodelist size check ensures that an attacker can't flood
		// a new node with a bunch of inbound requests. Doing so would
		// result in a nodelist that's entirely full of attacker nodes.
		// There's not much we can do about that anyway, but at least
		// we can hold off making attacker nodes 'outbound' peers until
		// our nodelist has had time to fill up naturally.
		g.mu.Lock()
		p, exists := g.peers[addr]
		if exists {
			// Have to check that the peer still exists because the
			// lock was released above; a race could mean the peer
			// was disconnected before this code block was reached.
			p.Inbound = false
			if n, ok := g.nodes[p.NetAddress]; ok && !n.WasOutboundPeer {
				n.WasOutboundPeer = true
				g.nodes[n.NetAddress] = n
			}
			g.log.Debugf("[PMC] [SUCCESS] [%v] existing peer has been converted to outbound peer", addr)
		}
		g.mu.Unlock()
	} else if err != nil {
		g.log.Debugf("[PMC] [ERROR] [%v] WARN: removing peer because automatic connect failed: %v\n", addr, err)

		// Remove the node, but only if there are enough nodes in the node list.
		g.mu.Lock()
		if len(g.nodes) > pruneNodeListLen {
			g.removeNode(addr)
		}
		g.mu.Unlock()
	} else {
		g.log.Debugf("[PMC] [SUCCESS] [%v] peer successfully added", addr)
	}
}
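
// The exists check in managedPeerManagerConnect is a recheck-after-relock
// pattern: any state observed before a mutex is released may be stale once
// the lock is reacquired. A minimal sketch of the pattern, with hypothetical
// names (mu, m, key are not part of this package):
//
//	mu.Lock()
//	v, ok := m[key] // the entry may have been removed while unlocked
//	if ok {
//		v.flag = true // safe: we hold the lock and know the entry exists
//		m[key] = v
//	}
//	mu.Unlock()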

// numOutboundPeers returns the number of outbound peers in the gateway.
func (g *Gateway) numOutboundPeers() int {
	n := 0
	for _, p := range g.peers {
		if !p.Inbound {
			n++
		}
	}
	return n
}

// permanentPeerManager tries to keep the Gateway well-connected. As long as
// the Gateway is not well-connected, it tries to connect to random nodes.
func (g *Gateway) permanentPeerManager(closedChan chan struct{}) {
	// Send a signal upon shutdown.
	defer close(closedChan)
	defer g.log.Debugln("INFO: [PPM] Permanent peer manager is shutting down")

	// permanentPeerManager will attempt to connect to peers asynchronously,
	// such that multiple connection attempts can be open at once, but a
	// limited number.
	connectionLimiterChan := make(chan struct{}, maxConcurrentOutboundPeerRequests)

	g.log.Debugln("INFO: [PPM] Permanent peer manager has started")

	for {
		// Fetch the set of nodes to try.
		g.mu.RLock()
		nodes := g.buildPeerManagerNodeList()
		g.mu.RUnlock()
		if len(nodes) == 0 {
			g.log.Debugln("[PPM] Node list is empty, sleeping")
			if !g.managedSleep(noNodesDelay) {
				return
			}
			continue
		}

		for _, addr := range nodes {
			// Break as soon as we have enough outbound peers.
			g.mu.RLock()
			numOutboundPeers := g.numOutboundPeers()
			isOutboundPeer := g.peers[addr] != nil && !g.peers[addr].Inbound
			g.mu.RUnlock()
			if numOutboundPeers >= wellConnectedThreshold {
				g.log.Debugln("INFO: [PPM] Gateway has enough peers, sleeping.")
				if !g.managedSleep(wellConnectedDelay) {
					return
				}
				break
			}
			if isOutboundPeer {
				// Skip current outbound peers.
				if !g.managedSleep(acquiringPeersDelay) {
					return
				}
				continue
			}

			g.log.Debugln("[PPM] Fetched a random node:", addr)

			// We need at least some of our outbound peers to be remote peers. If
			// we have already reached the maxLocalOutboundPeers threshold and
			// this peer is a local peer, do not consider it for an outbound peer.
			// Sleep briefly to prevent the gateway from hogging the CPU if all
			// peers are local.
			if numOutboundPeers >= maxLocalOutboundPeers && addr.IsLocal() && build.Release != "testing" {
				g.log.Debugln("[PPM] Ignorning selected peer; this peer is local and we already have multiple outbound peers:", addr)
				if !g.managedSleep(unwantedLocalPeerDelay) {
					return
				}
				continue
			}

			// Try connecting to that peer in a goroutine. Do not block unless
			// maxConcurrentOutboundPeerRequests connection attempts are already
			// open at once. Before spawning the goroutine, make sure that there
			// is enough room by throwing a struct into the buffered channel.
			g.log.Debugln("[PPM] Trying to connect to a node:", addr)
			connectionLimiterChan <- struct{}{}
			go func(addr modules.NetAddress) {
				// After completion, take the struct out of the channel so that the
				// next thread may proceed.
				defer func() {
					<-connectionLimiterChan
				}()

				if err := g.threads.Add(); err != nil {
					return
				}
				defer g.threads.Done()
				// managedPeerManagerConnect will handle all of its own logging.
				g.managedPeerManagerConnect(addr)
			}(addr)

			// Wait a bit before trying the next peer. The peer connections are
			// non-blocking, so they should be spaced out to avoid spinning up an
			// uncontrolled number of threads and therefore peer connections.
			if !g.managedSleep(acquiringPeersDelay) {
				return
			}
		}
	}
}
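
// The connectionLimiterChan above is a buffered channel used as a counting
// semaphore: sending fills a slot (blocking once the buffer is full) and
// receiving frees one. A minimal, self-contained sketch of the same pattern,
// with hypothetical names (not part of this package):
//
//	package main
//
//	import (
//		"fmt"
//		"sync"
//	)
//
//	func main() {
//		limiter := make(chan struct{}, 3) // at most 3 workers at once
//		var wg sync.WaitGroup
//		for i := 0; i < 10; i++ {
//			limiter <- struct{}{} // blocks while all 3 slots are taken
//			wg.Add(1)
//			go func(i int) {
//				defer wg.Done()
//				defer func() { <-limiter }() // free a slot when done
//				fmt.Println("connecting to peer", i)
//			}(i)
//		}
//		wg.Wait()
//	}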

// buildPeerManagerNodeList returns the gateway's node list in the order that
// permanentPeerManager should attempt to connect to them.
func (g *Gateway) buildPeerManagerNodeList() []modules.NetAddress {
	// flatten the node map, inserting in random order
	nodes := make([]modules.NetAddress, len(g.nodes))
	perm := fastrand.Perm(len(nodes))
	for _, node := range g.nodes {
		nodes[perm[0]] = node.NetAddress
		perm = perm[1:]
	}

	// swap the outbound nodes to the front of the list
	numOutbound := 0
	for i, node := range nodes {
		if g.nodes[node].WasOutboundPeer {
			nodes[numOutbound], nodes[i] = nodes[i], nodes[numOutbound]
			numOutbound++
		}
	}
	return nodes
}
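
// buildPeerManagerNodeList assigns each map entry to a distinct random slot
// via a precomputed permutation, then moves former outbound peers to the
// front. A minimal sketch of the permutation step on a toy map, using the
// same fastrand.Perm as above (names and out are hypothetical, not part of
// this package):
//
//	names := map[int]string{1: "a", 2: "b", 3: "c"}
//	out := make([]string, len(names))
//	perm := fastrand.Perm(len(out)) // random permutation of 0..len-1
//	i := 0
//	for _, v := range names {
//		out[perm[i]] = v // each value lands in a unique random slot
//		i++
//	}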