1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
|
package recovery
import (
"math"
"sort"
"github.com/centrifugal/protocol"
)
// uniquePublications returns slice of unique Publications.
func uniquePublications(s []*protocol.Publication) []*protocol.Publication {
keys := make(map[uint64]struct{})
list := make([]*protocol.Publication, 0, len(s))
for _, entry := range s {
val := entry.Offset
if _, value := keys[val]; !value {
keys[val] = struct{}{}
list = append(list, entry)
}
}
return list
}
// MergePublications allows to merge recovered pubs with buffered pubs
// collected during extracting recovered so result is ordered and with
// duplicates removed.
func MergePublications(recoveredPubs []*protocol.Publication, bufferedPubs []*protocol.Publication, isLegacyOrder bool) ([]*protocol.Publication, bool) {
if len(bufferedPubs) > 0 {
recoveredPubs = append(recoveredPubs, bufferedPubs...)
}
if isLegacyOrder {
sort.Slice(recoveredPubs, func(i, j int) bool {
return recoveredPubs[i].Offset > recoveredPubs[j].Offset
})
} else {
sort.Slice(recoveredPubs, func(i, j int) bool {
return recoveredPubs[i].Offset < recoveredPubs[j].Offset
})
}
if len(bufferedPubs) > 0 {
if len(recoveredPubs) > 1 {
recoveredPubs = uniquePublications(recoveredPubs)
}
prevOffset := recoveredPubs[0].Offset
for _, p := range recoveredPubs[1:] {
pubOffset := p.Offset
var isWrongOffset bool
if isLegacyOrder {
isWrongOffset = pubOffset != prevOffset-1
} else {
isWrongOffset = pubOffset != prevOffset+1
}
if isWrongOffset {
return nil, false
}
prevOffset = pubOffset
}
}
return recoveredPubs, true
}
// PackUint64 ...
func PackUint64(seq, gen uint32) uint64 {
return uint64(gen)*uint64(math.MaxUint32) + uint64(seq)
}
// UnpackUint64 ...
func UnpackUint64(val uint64) (uint32, uint32) {
return uint32(val), uint32(val >> 32)
}
|