1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
|
package main
import (
"bytes"
"sync"
"encoding/gob"
"github.com/weaveworks/mesh"
)
// state is an implementation of a G-counter.
// It keeps one integer count per peer name; get reports the sum of all of them.
type state struct {
// mtx guards set. Every method in this file that reads or writes set
// acquires mtx first.
mtx sync.RWMutex
// set maps a peer name to that peer's count.
set map[mesh.PeerName]int
// self is the key in set that incr increments.
self mesh.PeerName
}
// Compile-time assertion that *state satisfies mesh.GossipData.
var _ mesh.GossipData = (*state)(nil)
// newState builds an empty state object for the peer named self, ready to
// receive updates. It is suitable to use at program start; other peers will
// populate us with data.
func newState(self mesh.PeerName) *state {
	st := &state{self: self}
	st.set = make(map[mesh.PeerName]int)
	return st
}
// get returns the current value of the counter, which is the sum of the
// counts stored for every peer. The read lock is held while summing.
func (st *state) get() (result int) {
	st.mtx.RLock()
	defer st.mtx.RUnlock()

	total := 0
	for peer := range st.set {
		total += st.set[peer]
	}
	return total
}
// incr adds one to the entry keyed by st.self and returns a snapshot of the
// complete set as it stands after the increment.
//
// The returned state owns a private copy of the map. Returning st.set itself
// would let the caller read or merge into the live map while holding only the
// returned state's own (fresh) mutex, which races with writers holding st.mtx.
// As before, the returned state does not carry self.
func (st *state) incr() (complete *state) {
	st.mtx.Lock()
	defer st.mtx.Unlock()

	st.set[st.self]++

	// Copy while we still hold the write lock so the snapshot is consistent.
	snapshot := make(map[mesh.PeerName]int, len(st.set))
	for peer, v := range st.set {
		snapshot[peer] = v
	}
	return &state{set: snapshot}
}
// copy returns a new state holding an independent copy of st's set, taken
// under the read lock.
//
// The map is duplicated entry by entry rather than shared: a shared map would
// be reachable through the returned state without st.mtx, so any later read of
// it could race with a concurrent write to st. As before, the returned state
// does not carry self.
func (st *state) copy() *state {
	st.mtx.RLock()
	defer st.mtx.RUnlock()

	dup := make(map[mesh.PeerName]int, len(st.set))
	for peer, v := range st.set {
		dup[peer] = v
	}
	return &state{set: dup}
}
// Encode serializes our complete state to a slice of byte-slices.
// In this simple example the result always has exactly one element: a single
// gob-encoded buffer containing the set (see https://golang.org/pkg/encoding/gob/).
// The read lock is held while encoding. A gob encoding failure panics.
func (st *state) Encode() [][]byte {
	st.mtx.RLock()
	defer st.mtx.RUnlock()

	buf := new(bytes.Buffer)
	enc := gob.NewEncoder(buf)
	if err := enc.Encode(st.set); err != nil {
		panic(err)
	}
	return [][]byte{buf.Bytes()}
}
// Merge merges the other GossipData into this one,
// and returns our resulting, complete state.
// other must be a *state; the unchecked type assertion panics otherwise.
func (st *state) Merge(other mesh.GossipData) (complete mesh.GossipData) {
	incoming := other.(*state).copy()
	return st.mergeComplete(incoming.set)
}
// mergeReceived merges set into our state, abiding increment-only semantics:
// an incoming value replaces ours only when it is strictly greater.
// Entries that are not greater are deleted from set in place, so the caller's
// map is modified. The return value is a non-nil mesh.GossipData wrapping
// whatever remains in set, i.e. the entries that were novel to us.
func (st *state) mergeReceived(set map[mesh.PeerName]int) (received mesh.GossipData) {
	st.mtx.Lock()
	defer st.mtx.Unlock()

	for peer, theirs := range set {
		if ours := st.set[peer]; theirs > ours {
			st.set[peer] = theirs
			continue
		}
		// optimization: make the forwarded data smaller
		delete(set, peer)
	}

	// all remaining elements were novel to us
	return &state{set: set}
}
// mergeDelta merges set into our state, abiding increment-only semantics:
// an incoming value replaces ours only when it is strictly greater.
// Entries that are not greater are deleted from set in place, so the caller's
// map is modified. It returns the key/values that were mutated, or nil if
// nothing changed.
func (st *state) mergeDelta(set map[mesh.PeerName]int) (delta mesh.GossipData) {
	st.mtx.Lock()
	defer st.mtx.Unlock()

	for peer, theirs := range set {
		if ours := st.set[peer]; theirs > ours {
			st.set[peer] = theirs
			continue
		}
		// requirement: it's not part of a delta
		delete(set, peer)
	}

	if len(set) == 0 {
		return nil // per OnGossip requirements
	}

	// all remaining elements were novel to us
	return &state{set: set}
}
// mergeComplete merges set into our state, abiding increment-only semantics:
// an incoming value replaces ours only when it is strictly greater. Unlike
// mergeReceived and mergeDelta, the set argument is not modified.
// It returns our resulting, complete state.
//
// The returned state owns a private copy of the map. Returning st.set itself
// would let the caller read or merge into the live map while holding only the
// returned state's own (fresh) mutex, which races with writers holding st.mtx.
func (st *state) mergeComplete(set map[mesh.PeerName]int) (complete mesh.GossipData) {
	st.mtx.Lock()
	defer st.mtx.Unlock()

	for peer, v := range set {
		if v > st.set[peer] {
			st.set[peer] = v
		}
	}

	// n.b. we can't call st.copy() here: it would try to take the read lock
	// while we already hold the write lock. Copy inline instead.
	snapshot := make(map[mesh.PeerName]int, len(st.set))
	for peer, v := range st.set {
		snapshot[peer] = v
	}
	return &state{set: snapshot}
}
|