1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
|
package mesh
import (
"bytes"
"hash/fnv"
"sync"
"time"
)
// surrogateGossiper ignores unicasts and relays broadcasts and gossips.
type surrogateGossiper struct {
sync.Mutex
prevUpdates []prevUpdate
}
type prevUpdate struct {
update []byte
hash uint64
t time.Time
}
var _ Gossiper = &surrogateGossiper{}
// Hook to mock time for testing
var now = func() time.Time { return time.Now() }
// OnGossipUnicast implements Gossiper.
func (*surrogateGossiper) OnGossipUnicast(sender PeerName, msg []byte) error {
return nil
}
// OnGossipBroadcast implements Gossiper.
func (*surrogateGossiper) OnGossipBroadcast(_ PeerName, update []byte) (GossipData, error) {
return newSurrogateGossipData(update), nil
}
// Gossip implements Gossiper.
func (*surrogateGossiper) Gossip() GossipData {
return nil
}
// OnGossip should return "everything new I've just learnt".
// surrogateGossiper doesn't understand the content of messages, but it can eliminate simple duplicates
func (s *surrogateGossiper) OnGossip(update []byte) (GossipData, error) {
hash := fnv.New64a()
_, _ = hash.Write(update)
updateHash := hash.Sum64()
s.Lock()
defer s.Unlock()
for _, p := range s.prevUpdates {
if updateHash == p.hash && bytes.Equal(update, p.update) {
return nil, nil
}
}
// Delete anything that's older than the gossip interval, so we don't grow forever
// (this time limit is arbitrary; surrogateGossiper should pass on new gossip immediately
// so there should be no reason for a duplicate to show up after a long time)
updateTime := now()
deleteBefore := updateTime.Add(-gossipInterval)
keepFrom := len(s.prevUpdates)
for i, p := range s.prevUpdates {
if p.t.After(deleteBefore) {
keepFrom = i
break
}
}
s.prevUpdates = append(s.prevUpdates[keepFrom:], prevUpdate{update, updateHash, updateTime})
return newSurrogateGossipData(update), nil
}
// surrogateGossipData is a simple in-memory GossipData.
type surrogateGossipData struct {
messages [][]byte
}
var _ GossipData = &surrogateGossipData{}
func newSurrogateGossipData(msg []byte) *surrogateGossipData {
return &surrogateGossipData{messages: [][]byte{msg}}
}
// Encode implements GossipData.
func (d *surrogateGossipData) Encode() [][]byte {
return d.messages
}
// Merge implements GossipData.
func (d *surrogateGossipData) Merge(other GossipData) GossipData {
o := other.(*surrogateGossipData)
messages := make([][]byte, 0, len(d.messages)+len(o.messages))
messages = append(messages, d.messages...)
messages = append(messages, o.messages...)
return &surrogateGossipData{messages: messages}
}
|