1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
|
package memberlist
import (
"bytes"
"fmt"
"io"
"net"
"strconv"
"time"
)
// MockNetwork is used as a factory that produces MockTransport instances which
// are uniquely addressed and wired up to talk to each other.
type MockNetwork struct {
transportsByAddr map[string]*MockTransport
transportsByName map[string]*MockTransport
port int
}
// NewTransport returns a new MockTransport with a unique address, wired up to
// talk to the other transports in the MockNetwork.
func (n *MockNetwork) NewTransport(name string) *MockTransport {
n.port += 1
addr := fmt.Sprintf("127.0.0.1:%d", n.port)
transport := &MockTransport{
net: n,
addr: &MockAddress{addr, name},
packetCh: make(chan *Packet),
streamCh: make(chan net.Conn),
}
if n.transportsByAddr == nil {
n.transportsByAddr = make(map[string]*MockTransport)
}
n.transportsByAddr[addr] = transport
if n.transportsByName == nil {
n.transportsByName = make(map[string]*MockTransport)
}
n.transportsByName[name] = transport
return transport
}
// MockAddress is a wrapper which adds the net.Addr interface to our mock
// address scheme.
type MockAddress struct {
addr string
name string
}
// See net.Addr.
func (a *MockAddress) Network() string {
return "mock"
}
// See net.Addr.
func (a *MockAddress) String() string {
return a.addr
}
// MockTransport directly plumbs messages to other transports its MockNetwork.
type MockTransport struct {
net *MockNetwork
addr *MockAddress
packetCh chan *Packet
streamCh chan net.Conn
}
var _ NodeAwareTransport = (*MockTransport)(nil)
// See Transport.
func (t *MockTransport) FinalAdvertiseAddr(string, int) (net.IP, int, error) {
host, portStr, err := net.SplitHostPort(t.addr.String())
if err != nil {
return nil, 0, err
}
ip := net.ParseIP(host)
if ip == nil {
return nil, 0, fmt.Errorf("Failed to parse IP %q", host)
}
port, err := strconv.ParseInt(portStr, 10, 16)
if err != nil {
return nil, 0, err
}
return ip, int(port), nil
}
// See Transport.
func (t *MockTransport) WriteTo(b []byte, addr string) (time.Time, error) {
a := Address{Addr: addr, Name: ""}
return t.WriteToAddress(b, a)
}
// See NodeAwareTransport.
func (t *MockTransport) WriteToAddress(b []byte, a Address) (time.Time, error) {
dest, err := t.getPeer(a)
if err != nil {
return time.Time{}, err
}
now := time.Now()
dest.packetCh <- &Packet{
Buf: b,
From: t.addr,
Timestamp: now,
}
return now, nil
}
// See Transport.
func (t *MockTransport) PacketCh() <-chan *Packet {
return t.packetCh
}
// See NodeAwareTransport.
func (t *MockTransport) IngestPacket(conn net.Conn, addr net.Addr, now time.Time, shouldClose bool) error {
if shouldClose {
defer conn.Close()
}
// Copy everything from the stream into packet buffer.
var buf bytes.Buffer
if _, err := io.Copy(&buf, conn); err != nil {
return fmt.Errorf("failed to read packet: %v", err)
}
// Check the length - it needs to have at least one byte to be a proper
// message. This is checked elsewhere for writes coming in directly from
// the UDP socket.
if n := buf.Len(); n < 1 {
return fmt.Errorf("packet too short (%d bytes) %s", n, LogAddress(addr))
}
// Inject the packet.
t.packetCh <- &Packet{
Buf: buf.Bytes(),
From: addr,
Timestamp: now,
}
return nil
}
// See Transport.
func (t *MockTransport) DialTimeout(addr string, timeout time.Duration) (net.Conn, error) {
a := Address{Addr: addr, Name: ""}
return t.DialAddressTimeout(a, timeout)
}
// See NodeAwareTransport.
func (t *MockTransport) DialAddressTimeout(a Address, timeout time.Duration) (net.Conn, error) {
dest, err := t.getPeer(a)
if err != nil {
return nil, err
}
p1, p2 := net.Pipe()
dest.streamCh <- p1
return p2, nil
}
// See Transport.
func (t *MockTransport) StreamCh() <-chan net.Conn {
return t.streamCh
}
// See NodeAwareTransport.
func (t *MockTransport) IngestStream(conn net.Conn) error {
t.streamCh <- conn
return nil
}
// See Transport.
func (t *MockTransport) Shutdown() error {
return nil
}
func (t *MockTransport) getPeer(a Address) (*MockTransport, error) {
var (
dest *MockTransport
ok bool
)
if a.Name != "" {
dest, ok = t.net.transportsByName[a.Name]
} else {
dest, ok = t.net.transportsByAddr[a.Addr]
}
if !ok {
return nil, fmt.Errorf("No route to %s", a)
}
return dest, nil
}
|