1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
|
package main
import (
"log"
"bytes"
"encoding/gob"
"github.com/weaveworks/mesh"
)
// peer bundles the local state with the plumbing needed to act as a
// mesh.Gossiper. Hand it to mesh.Router.NewGossip, pass the Gossip that
// call returns to register, and only then call mesh.Router.Start.
type peer struct {
	// st is the state this peer reads and merges gossip into.
	st *state

	// send is the outbound gossip handle. It is nil until register
	// has been called and its action has run.
	send mesh.Gossip

	// actions is the send-only end of the channel of closures that
	// loop executes one at a time.
	actions chan<- func()

	// quit is closed by stop to make loop return.
	quit chan struct{}

	// logger receives diagnostic output from the gossip callbacks.
	logger *log.Logger
}
// Compile-time check that *peer satisfies mesh.Gossiper.
var _ mesh.Gossiper = (*peer)(nil)
// newPeer builds a peer whose state is created by newState(self) and starts
// the goroutine that runs its action loop. The outbound sender is left
// nil; call register later so the peer can broadcast its updates.
func newPeer(self mesh.PeerName, logger *log.Logger) *peer {
	// One bidirectional channel: the peer keeps the send side, the
	// loop goroutine gets the receive side.
	queue := make(chan func())

	p := new(peer)
	p.st = newState(self)
	p.actions = queue
	p.quit = make(chan struct{})
	p.logger = logger
	// p.send intentionally stays nil until register is called.

	go p.loop(queue)
	return p
}
// loop runs closures received on actions, one at a time, until p.quit is
// closed. newPeer starts it in its own goroutine.
func (p *peer) loop(actions <-chan func()) {
	for {
		select {
		case <-p.quit:
			return
		case action := <-actions:
			action()
		}
	}
}
// register installs send, the Gossip returned by mesh.Router.NewGossip, as
// this peer's outbound sender. The assignment is performed on the action
// loop; register returns once the loop has accepted the action, not
// necessarily after the assignment has run.
//
// If the peer has been stopped, register returns without doing anything
// instead of blocking forever on a channel that no longer has a receiver.
func (p *peer) register(send mesh.Gossip) {
	select {
	case p.actions <- func() { p.send = send }:
	case <-p.quit:
		// The action loop has exited; nothing would ever receive the action.
	}
}
// get reports the counter's current value. It reads directly from the
// state rather than going through the action loop.
func (p *peer) get() int {
	current := p.st.get()
	return current
}
// incr increments the counter by one on the action loop, broadcasts the
// value returned by the state's incr if a sender has been registered
// (otherwise it logs that no sender is configured), and returns the value
// read back from that returned state.
//
// If the peer has been stopped, the action loop is gone and the increment
// can never run. In that case incr logs the fact and returns the current
// value unchanged instead of blocking forever.
func (p *peer) incr() (result int) {
	done := make(chan struct{})
	action := func() {
		// Closing done publishes result to the waiting caller.
		defer close(done)
		st := p.st.incr()
		if p.send != nil {
			p.send.GossipBroadcast(st)
		} else {
			p.logger.Printf("no sender configured; not broadcasting update right now")
		}
		result = st.get()
	}

	select {
	case p.actions <- action:
		// The loop received the action and runs it to completion before
		// it looks at quit again, so this wait always terminates.
		<-done
		return result
	case <-p.quit:
		p.logger.Printf("peer stopped; not incrementing")
		return p.st.get()
	}
}
// stop terminates the action loop by closing the quit channel. It must be
// called at most once: closing an already-closed channel panics.
func (p *peer) stop() {
	quit := p.quit
	close(quit)
}
// Gossip returns a copy of this peer's complete state, logging the copied
// set on the way out.
func (p *peer) Gossip() (complete mesh.GossipData) {
	complete = p.st.copy()
	snapshot := complete.(*state)
	p.logger.Printf("Gossip => complete %v", snapshot.set)
	return complete
}
// OnGossip gob-decodes buf into a per-peer counter map, merges it into the
// local state with mergeDelta, and returns whatever mergeDelta produced.
// A decode failure is returned as-is with a nil delta.
func (p *peer) OnGossip(buf []byte) (delta mesh.GossipData, err error) {
	var set map[mesh.PeerName]int
	dec := gob.NewDecoder(bytes.NewReader(buf))
	if decodeErr := dec.Decode(&set); decodeErr != nil {
		return nil, decodeErr
	}

	delta = p.st.mergeDelta(set)

	// Log the underlying set when there is a delta, or the nil value
	// itself when there is none.
	var shown interface{} = delta
	if delta != nil {
		shown = delta.(*state).set
	}
	p.logger.Printf("OnGossip %v => delta %v", set, shown)

	return delta, nil
}
// OnGossipBroadcast handles a broadcast from src: it gob-decodes buf into a
// per-peer counter map, merges it into the local state with mergeReceived,
// and returns whatever mergeReceived produced. A decode failure is
// returned as-is with a nil result.
func (p *peer) OnGossipBroadcast(src mesh.PeerName, buf []byte) (received mesh.GossipData, err error) {
	var set map[mesh.PeerName]int
	dec := gob.NewDecoder(bytes.NewReader(buf))
	if decodeErr := dec.Decode(&set); decodeErr != nil {
		return nil, decodeErr
	}

	received = p.st.mergeReceived(set)

	// Log the underlying set when something came back, or the nil
	// value itself when nothing did.
	var shown interface{} = received
	if received != nil {
		shown = received.(*state).set
	}
	p.logger.Printf("OnGossipBroadcast %s %v => delta %v", src, set, shown)

	return received, nil
}
// OnGossipUnicast handles a unicast from src: it gob-decodes buf into a
// per-peer counter map, merges it into the local state with mergeComplete,
// and logs the result. Only a decode failure is reported to the caller.
func (p *peer) OnGossipUnicast(src mesh.PeerName, buf []byte) error {
	var set map[mesh.PeerName]int
	reader := bytes.NewReader(buf)
	if decodeErr := gob.NewDecoder(reader).Decode(&set); decodeErr != nil {
		return decodeErr
	}

	merged := p.st.mergeComplete(set)
	p.logger.Printf("OnGossipUnicast %s %v => complete %v", src, set, merged)
	return nil
}
|