File: packet_transport.go

package info (click to toggle)
golang-github-weaveworks-mesh 0+git20161024.3dd75b1-1~bpo8+1
  • links: PTS, VCS
  • area: main
  • in suites: jessie-backports
  • size: 412 kB
  • sloc: sh: 59; makefile: 7
file content (102 lines) | stat: -rw-r--r-- 3,077 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package metcd

import (
	"net"

	"github.com/coreos/etcd/raft/raftpb"

	"github.com/weaveworks/mesh"
	"github.com/weaveworks/mesh/meshconn"
)

// packetTransport takes ownership of the net.PacketConn.
// Incoming messages are unmarshaled from the conn and sent to incomingc.
// Outgoing messages are received from outgoingc and marshaled to the conn.
// When the destination of an outgoing message cannot be translated to a mesh
// peer name, its Raft ID is offered to unreachablec without blocking.
type packetTransport struct {
	conn         net.PacketConn
	translate    peerTranslator
	incomingc    chan<- raftpb.Message // to controller
	outgoingc    <-chan raftpb.Message // from controller
	unreachablec chan<- uint64         // to controller
	logger       mesh.Logger
}

// newPacketTransport builds a packetTransport from the given conn, translator,
// channels and logger, launches its receive and send loops in two separate
// goroutines, and returns a pointer to the running transport.
func newPacketTransport(
	conn net.PacketConn,
	translate peerTranslator,
	incomingc chan<- raftpb.Message,
	outgoingc <-chan raftpb.Message,
	unreachablec chan<- uint64,
	logger mesh.Logger,
) *packetTransport {
	transport := new(packetTransport)

	// Wiring: the connection and the translator used to address peers.
	transport.conn = conn
	transport.translate = translate

	// Channels shared with the controller.
	transport.incomingc = incomingc
	transport.outgoingc = outgoingc
	transport.unreachablec = unreachablec

	transport.logger = logger

	// Both loops run for the lifetime of the transport; see recvLoop and
	// sendLoop for their respective exit conditions.
	go transport.recvLoop()
	go transport.sendLoop()

	return transport
}

// peerTranslator maps a mesh.PeerUID to a mesh.PeerName, or returns an error.
// sendLoop calls it with the To field of each outgoing Raft message to obtain
// the mesh address to write to.
type peerTranslator func(uid mesh.PeerUID) (mesh.PeerName, error)

// stop closes the underlying conn and logs any error returned by Close
// instead of silently discarding it.
//
// recvLoop returns as soon as its ReadFrom call fails; presumably closing the
// conn causes that (net.PacketConn documents that Close unblocks pending
// reads — confirm for the concrete conn in use). sendLoop does not exit on
// stop: it ranges over outgoingc and ends only when that channel is closed.
func (t *packetTransport) stop() {
	if err := t.conn.Close(); err != nil {
		t.logger.Printf("packet transport: stop: close conn: %v", err)
	}
}

// recvLoop reads packets from the conn, decodes each one as a raftpb.Message
// and forwards it on incomingc. It returns on the first ReadFrom error.
// Packets that fill the whole receive buffer, or that fail to unmarshal, are
// logged and dropped. The send on incomingc blocks until the message is
// accepted.
func (t *packetTransport) recvLoop() {
	defer t.logger.Printf("packet transport: recv loop exit")

	const maxRecvLen = 8192
	buf := make([]byte, maxRecvLen)

	for {
		size, from, readErr := t.conn.ReadFrom(buf)
		if readErr != nil {
			t.logger.Printf("packet transport: recv: %v (aborting)", readErr)
			return
		}

		// A read that fills the buffer cannot be told apart from a packet
		// that was larger than the buffer, so it is discarded.
		if limit := cap(buf); size >= limit {
			t.logger.Printf("packet transport: recv from %s: short read, %d >= %d (continuing)", from, size, limit)
			continue
		}

		payload := buf[:size]
		var msg raftpb.Message
		if decodeErr := msg.Unmarshal(payload); decodeErr != nil {
			t.logger.Printf("packet transport: recv from %s (sz %d): %v (%s) (continuing)", from, size, decodeErr, payload)
			continue
		}

		//t.logger.Printf("packet transport: recv from %s (sz %d/%d) OK", from, size, msg.Size())
		t.incomingc <- msg
	}
}

// sendLoop drains outgoingc until it is closed. Each message is marshaled,
// its To ID is translated to a mesh peer name, and the bytes are written to
// the conn addressed to that peer. Failures are logged and the message is
// dropped; a translation failure additionally offers the Raft ID to
// unreachablec without blocking.
func (t *packetTransport) sendLoop() {
	defer t.logger.Printf("packet transport: send loop exit")

	for msg := range t.outgoingc {
		payload, marshalErr := msg.Marshal()
		if marshalErr != nil {
			t.logger.Printf("packet transport: send to Raft ID %x: %v (continuing)", msg.To, marshalErr)
			continue
		}

		name, lookupErr := t.translate(mesh.PeerUID(msg.To))
		if lookupErr != nil {
			// Non-blocking report: if nobody is ready to receive on
			// unreachablec, the report is dropped rather than stalling
			// the loop.
			select {
			case t.unreachablec <- msg.To:
				t.logger.Printf("packet transport: send to Raft ID %x: %v (unreachable; continuing) (%s)", msg.To, lookupErr, msg.Type)
			default:
				t.logger.Printf("packet transport: send to Raft ID %x: %v (unreachable, report dropped; continuing) (%s)", msg.To, lookupErr, msg.Type)
			}
			continue
		}

		addr := meshconn.MeshAddr{PeerName: name}
		written, writeErr := t.conn.WriteTo(payload, addr)
		switch {
		case writeErr != nil:
			t.logger.Printf("packet transport: send to Mesh peer %s: %v (continuing)", addr, writeErr)
		case written < len(payload):
			t.logger.Printf("packet transport: send to Mesh peer %s: short write, %d < %d (continuing)", addr, written, len(payload))
		}
		//t.logger.Printf("packet transport: send to %s (sz %d/%d) OK", addr, msg.Size(), len(payload))
	}
}