1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229
|
package peerdiscovery
import (
"fmt"
"net"
"time"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
)
// IPVersion specifies the version of the Internet Protocol to be used.
type IPVersion uint
const (
IPv4 IPVersion = 4
IPv6 IPVersion = 6
)
// Discovered is the structure of the discovered peers,
// which holds their local address (port removed) and
// a payload if there is one.
type Discovered struct {
// Address is the local address of a discovered peer.
Address string
// Payload is the associated payload from discovered peer.
Payload []byte
Metadata *Metadata
}
// Metadata is the metadata associated with a discovered peer.
// To update the metadata, assign your own metadata to the Metadata.Data field.
// The metadata is not protected by a mutex, so you must do this yourself.
// The metadata update happens by pointer, to keep the library backwards compatible.
type Metadata struct {
Data interface{}
}
func (d Discovered) String() string {
return fmt.Sprintf("address: %s, payload: %s", d.Address, d.Payload)
}
// Settings are the settings that can be specified for
// doing peer discovery.
type Settings struct {
// Limit is the number of peers to discover, use < 1 for unlimited.
Limit int
// Port is the port to broadcast on (the peers must also broadcast using the same port).
// The default port is 9999.
Port string
// MulticastAddress specifies the multicast address.
// You should be able to use any of 224.0.0.0/4 or ff00::/8.
// By default it uses the Simple Service Discovery Protocol
// address (239.255.255.250 for IPv4 or ff02::c for IPv6).
MulticastAddress string
// Payload is the bytes that are sent out with each broadcast. Must be short.
Payload []byte
// PayloadFunc is the function that will be called to dynamically generate payload
// before every broadcast. If this pointer is nil `Payload` field will be broadcasted instead.
PayloadFunc func() []byte
// Delay is the amount of time between broadcasts. The default delay is 1 second.
Delay time.Duration
// TimeLimit is the amount of time to spend discovering, if the limit is not reached.
// A negative limit indiciates scanning until the limit was reached or, if an
// unlimited scanning was requested, no timeout.
// The default time limit is 10 seconds.
TimeLimit time.Duration
// StopChan is a channel to stop the peer discvoery immediatley after reception.
StopChan chan struct{}
// AllowSelf will allow discovery the local machine (default false)
AllowSelf bool
// DisableBroadcast will not allow sending out a broadcast
DisableBroadcast bool
// IPVersion specifies the version of the Internet Protocol (default IPv4)
IPVersion IPVersion
// Notify will be called each time a new peer was discovered.
// The default is nil, which means no notification whatsoever.
Notify func(Discovered)
// NotifyLost will be called each time a peer was lost.
// The default is nil, which means no notification whatsoever.
// This function should not take too long to execute, as it is called
// from the peer garbage collector.
NotifyLost func(LostPeer)
portNum int
multicastAddressNumbers net.IP
}
type NetPacketConn interface {
JoinGroup(ifi *net.Interface, group net.Addr) error
SetMulticastInterface(ini *net.Interface) error
SetMulticastTTL(int) error
ReadFrom(buf []byte) (int, net.Addr, error)
WriteTo(buf []byte, dst net.Addr) (int, error)
}
// Discover will use the created settings to scan for LAN peers. It will return
// an array of the discovered peers and their associate payloads. It will not
// return broadcasts sent to itself.
func Discover(settings ...Settings) (discoveries []Discovered, err error) {
_, discoveries, err = newPeerDiscovery(settings...)
if err != nil {
return nil, err
}
return discoveries, nil
}
func NewPeerDiscovery(settings ...Settings) (pd *PeerDiscovery, err error) {
pd, discoveries, err := newPeerDiscovery(settings...)
if notify := pd.settings.Notify; notify != nil {
for _, d := range discoveries {
notify(d)
}
}
return pd, err
}
func newPeerDiscovery(settings ...Settings) (pd *PeerDiscovery, discoveries []Discovered, err error) {
s := Settings{}
if len(settings) > 0 {
s = settings[0]
}
p, err := initialize(s)
if err != nil {
return nil, nil, err
}
p.RLock()
address := net.JoinHostPort(p.settings.MulticastAddress, p.settings.Port)
portNum := p.settings.portNum
tickerDuration := p.settings.Delay
timeLimit := p.settings.TimeLimit
p.RUnlock()
ifaces, err := filterInterfaces(p.settings.IPVersion == IPv4)
if err != nil {
return nil, nil, err
}
if len(ifaces) == 0 {
err = fmt.Errorf("no multicast interface found")
return nil, nil, err
}
// Open up a connection
c, err := net.ListenPacket(fmt.Sprintf("udp%d", p.settings.IPVersion), address)
if err != nil {
return nil, nil, err
}
defer c.Close()
group := p.settings.multicastAddressNumbers
// ipv{4,6} have an own PacketConn, which does not implement net.PacketConn
var p2 NetPacketConn
if p.settings.IPVersion == IPv4 {
p2 = PacketConn4{ipv4.NewPacketConn(c)}
} else {
p2 = PacketConn6{ipv6.NewPacketConn(c)}
}
for i := range ifaces {
p2.JoinGroup(&ifaces[i], &net.UDPAddr{IP: group, Port: portNum})
}
go p.listen(c)
ticker := time.NewTicker(tickerDuration)
defer ticker.Stop()
start := time.Now()
for {
p.RLock()
if len(p.received) >= p.settings.Limit && p.settings.Limit > 0 {
p.exit = true
}
p.RUnlock()
if !s.DisableBroadcast {
payload := p.settings.Payload
if p.settings.PayloadFunc != nil {
payload = p.settings.PayloadFunc()
}
// write to multicast
broadcast(p2, payload, ifaces, &net.UDPAddr{IP: group, Port: portNum})
}
select {
case <-p.settings.StopChan:
p.exit = true
case <-ticker.C:
}
if p.exit || timeLimit > 0 && time.Since(start) > timeLimit {
break
}
}
if !s.DisableBroadcast {
payload := p.settings.Payload
if p.settings.PayloadFunc != nil {
payload = p.settings.PayloadFunc()
}
// send out broadcast that is finished
broadcast(p2, payload, ifaces, &net.UDPAddr{IP: group, Port: portNum})
}
p.RLock()
discoveries = make([]Discovered, len(p.received))
i := 0
for ip, peerState := range p.received {
discoveries[i] = Discovered{
Address: ip,
Payload: peerState.lastPayload,
Metadata: peerState.metadata,
}
i++
}
p.RUnlock()
go p.gc()
return p, discoveries, nil
}
|