1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
|
package p2p
import (
"fmt"
"io"
"net"
. "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-wire"
)
// Peer bundles a multiplexed connection (MConnection) with the NodeInfo
// received from the remote node and a map of user data. It embeds
// BaseService and supplies the OnStart/OnStop hooks for it.
type Peer struct {
BaseService
// outbound is the flag passed to newPeer and reported by IsOutbound.
// NOTE(review): presumably true when this node dialed the peer — confirm.
outbound bool
// mconn is the connection used by Send, TrySend and CanSend; it is started
// in OnStart and stopped in OnStop.
mconn *MConnection
// NodeInfo is the remote node's info as handed to newPeer.
*NodeInfo
// Key is set by newPeer to peerNodeInfo.PubKey.KeyString(); Equals compares
// peers by this value.
Key string
Data *CMap // User data.
}
// peerHandshake exchanges NodeInfo with the remote end of conn: it writes
// ourNodeInfo to conn and reads the peer's NodeInfo (limited to
// maxNodeInfoSize) from it, running both steps through Parallel.
//
// On success it returns the peer's NodeInfo with RemoteAddr set to
// conn.RemoteAddr(). If either step fails it returns nil and the error; a
// write error is reported in preference to a read error.
//
// NOTE: blocking
// Before creating a peer with newPeer(), perform a handshake on connection.
func peerHandshake(conn net.Conn, ourNodeInfo *NodeInfo) (*NodeInfo, error) {
	var (
		peerNodeInfo = new(NodeInfo)
		writeErr     error
		readErr      error
	)
	Parallel(
		func() {
			var n int
			wire.WriteBinary(ourNodeInfo, conn, &n, &writeErr)
		},
		func() {
			var n int
			wire.ReadBinary(peerNodeInfo, conn, maxNodeInfoSize, &n, &readErr)
			// Only announce the handshake when the peer's NodeInfo was actually
			// read; after a failed read the struct may be partially filled and
			// logging it as a handshake would be misleading.
			if readErr == nil {
				log.Notice("Peer handshake", "peerNodeInfo", peerNodeInfo)
			}
		})
	if writeErr != nil {
		return nil, writeErr
	}
	if readErr != nil {
		return nil, readErr
	}
	peerNodeInfo.RemoteAddr = conn.RemoteAddr().String()
	return peerNodeInfo, nil
}
// newPeer wraps conn in an MConnection and returns a Peer (not yet started)
// carrying peerNodeInfo, the outbound flag and an empty user-data map.
//
// Messages received on a channel are forwarded to the reactor registered for
// that channel in reactorsByCh; a channel without a reactor triggers
// PanicSanity. When the connection reports an error the peer is stopped and
// onPeerError is invoked with the peer and the error value.
//
// NOTE: call peerHandshake on conn before calling newPeer().
func newPeer(config cfg.Config, conn net.Conn, peerNodeInfo *NodeInfo, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
	// Declared before the callbacks so they can close over the peer that is
	// only constructed further down.
	var peer *Peer

	handleReceive := func(chID byte, msgBytes []byte) {
		reactor := reactorsByCh[chID]
		if reactor == nil {
			PanicSanity(Fmt("Unknown channel %X", chID))
		}
		reactor.Receive(chID, peer, msgBytes)
	}

	handleError := func(r interface{}) {
		peer.Stop()
		onPeerError(peer, r)
	}

	peer = &Peer{
		outbound: outbound,
		mconn:    NewMConnection(config, conn, chDescs, handleReceive, handleError),
		NodeInfo: peerNodeInfo,
		Key:      peerNodeInfo.PubKey.KeyString(),
		Data:     NewCMap(),
	}
	peer.BaseService = *NewBaseService(log, "Peer", peer)
	return peer
}
// OnStart runs the embedded BaseService's OnStart hook and then starts the
// peer's MConnection, returning the connection's start error, if any.
func (p *Peer) OnStart() error {
	p.BaseService.OnStart()
	if _, err := p.mconn.Start(); err != nil {
		return err
	}
	return nil
}
// OnStop runs the embedded BaseService's OnStop hook and then stops the
// peer's MConnection.
func (p *Peer) OnStop() {
p.BaseService.OnStop()
p.mconn.Stop()
}
// Connection returns the MConnection held by this peer.
func (p *Peer) Connection() *MConnection {
return p.mconn
}
// IsOutbound reports the value of the peer's outbound flag.
func (p *Peer) IsOutbound() bool {
return p.outbound
}
// Send hands msg to the peer's MConnection for channel chID and returns the
// connection's result. It returns false without touching the connection when
// the peer is not running.
func (p *Peer) Send(chID byte, msg interface{}) bool {
	// Short-circuit: the connection is only consulted while the peer runs.
	return p.IsRunning() && p.mconn.Send(chID, msg)
}
// TrySend forwards msg to the MConnection's TrySend for channel chID and
// returns its result. It returns false without touching the connection when
// the peer is not running.
func (p *Peer) TrySend(chID byte, msg interface{}) bool {
	// Short-circuit: the connection is only consulted while the peer runs.
	return p.IsRunning() && p.mconn.TrySend(chID, msg)
}
// CanSend returns the MConnection's CanSend answer for channel chID, or false
// without consulting the connection when the peer is not running.
func (p *Peer) CanSend(chID byte) bool {
	// Short-circuit: the connection is only consulted while the peer runs.
	return p.IsRunning() && p.mconn.CanSend(chID)
}
// WriteTo writes the peer's Key to w using wire.WriteString and returns the
// byte count and error that call reported.
func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
	var written int
	wire.WriteString(p.Key, w, &written, &err)
	return int64(written), err
}
// String returns a short description of the peer in the form
// "Peer{<mconn> <key> out}" for outbound peers and "Peer{<mconn> <key> in}"
// otherwise, where <key> is at most the first 12 bytes of Key.
func (p *Peer) String() string {
	// Abbreviate the key for readability, but never slice past its end: a Key
	// shorter than 12 bytes is printed in full instead of panicking.
	key := p.Key
	if len(key) > 12 {
		key = key[:12]
	}
	if p.outbound {
		return fmt.Sprintf("Peer{%v %v out}", p.mconn, key)
	}
	return fmt.Sprintf("Peer{%v %v in}", p.mconn, key)
}
// Equals reports whether p and other have the same Key. No other field is
// compared. Both p and other must be non-nil, since their Key fields are
// dereferenced unconditionally.
func (p *Peer) Equals(other *Peer) bool {
return p.Key == other.Key
}
// Get returns the result of looking up key in the peer's user-data map
// (p.Data).
// NOTE(review): the value returned for a missing key is decided by CMap.Get —
// presumably nil; confirm against CMap.
func (p *Peer) Get(key string) interface{} {
return p.Data.Get(key)
}
|