1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
|
package memstream
import (
"container/list"
"math/rand"
"time"
)
// Package-level state used by randString.
var (
	// random is the pseudo-random generator behind randString. It is seeded
	// once, at package initialization, from the current wall-clock time.
	random = rand.New(rand.NewSource(time.Now().UnixNano()))

	// letters is the alphabet randString draws from: a-z followed by A-Z.
	letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
)
// randString builds a string of n characters, each drawn independently from
// letters using the package-level random generator.
//
// NOTE(review): random is shared package state created with rand.New; the
// math/rand documentation says such generators are not safe for concurrent
// use, so callers presumably serialize access - confirm.
func randString(n int) string {
	alphabetLen := len(letters)
	chars := make([]rune, n)
	for pos := 0; pos < n; pos++ {
		chars[pos] = letters[random.Intn(alphabetLen)]
	}
	return string(chars)
}
// genEpoch produces a new random epoch identifier made of four letters.
func genEpoch() string {
	const epochLen = 4
	return randString(epochLen)
}
// Item is a single entry kept inside a Stream.
type Item struct {
// Offset is the position of the item within the stream; Add assigns it from
// the stream's running counter.
Offset uint64
// Value is the caller-supplied payload, stored without interpretation.
Value interface{}
}
// Stream is an in-memory sequence of Items bounded by a caller-supplied size.
// It is not safe for concurrent use. Items can be read back as a range
// starting from a given offset.
type Stream struct {
// top is the offset given to the most recently added item; it is zero for a
// new or reset stream.
top uint64
// list holds the retained items in insertion order, oldest at the front.
list *list.List
// index maps an item's offset to its element in list.
index map[uint64]*list.Element
// epoch is a random identifier generated on creation and regenerated by Reset.
epoch string
}
// New returns an empty Stream: no items, top at zero and a freshly generated
// epoch.
func New() *Stream {
	s := new(Stream)
	s.list = list.New()
	s.index = make(map[uint64]*list.Element)
	s.epoch = genEpoch()
	return s
}
// Add appends v to the stream and then evicts the oldest items until at most
// size items remain.
//
// The new item receives the next offset (the previous top plus one), and that
// offset is returned. A size of zero or less leaves the stream empty - the
// item that was just added is evicted too - while the offset still advances.
// The returned error is always nil.
func (s *Stream) Add(v interface{}, size int) (uint64, error) {
	s.top++
	item := Item{
		Offset: s.top,
		Value:  v,
	}
	s.index[item.Offset] = s.list.PushBack(item)

	// Trim from the front, oldest first. Front returns nil once the list is
	// empty, so the explicit check keeps a negative size from dereferencing a
	// nil element.
	for s.list.Len() > size {
		oldest := s.list.Front()
		if oldest == nil {
			break
		}
		evicted := oldest.Value.(Item)
		s.list.Remove(oldest)
		delete(s.index, evicted.Offset)
	}
	return s.top, nil
}
// Top reports the offset assigned to the most recently added item. It is zero
// when nothing has been added since the stream was created or last reset.
func (s *Stream) Top() uint64 {
	return s.top
}
// Epoch reports the stream's current epoch identifier, which is generated
// when the stream is created and replaced by Reset.
func (s *Stream) Epoch() string {
	return s.epoch
}
// Reset returns the stream to a pristine state: the offset counter goes back
// to zero, a new epoch is generated and all stored items are dropped.
func (s *Stream) Reset() {
	s.top, s.epoch = 0, genEpoch()
	s.Clear()
}
// Clear drops every stored item by swapping in a fresh list and a fresh
// index. The offset counter and the epoch are left as they are.
func (s *Stream) Clear() {
	s.list, s.index = list.New(), make(map[uint64]*list.Element)
}
// Get returns items starting at offset, oldest first, together with the
// current top of the stream.
//
// An offset of zero, or an offset that is not found in the index (for
// example because the item has been evicted), starts from the oldest item
// still held. An offset greater than the current top yields no items.
//
// A positive limit caps the number of returned items, a limit of zero returns
// no items, and a negative limit returns everything from the starting item
// onwards. The returned error is always nil.
func (s *Stream) Get(offset uint64, limit int) ([]Item, uint64, error) {
	// Compare against top directly: the equivalent offset >= top+1 would wrap
	// around when top is the maximum uint64 and reject every offset.
	if offset > s.top || limit == 0 {
		return nil, s.top, nil
	}

	var start *list.Element
	if offset > 0 {
		// A missing key yields nil, which falls through to the oldest item.
		start = s.index[offset]
	}
	if start == nil {
		start = s.list.Front()
	}
	if start == nil {
		// Nothing is stored.
		return nil, s.top, nil
	}

	// Upper bound for the result size: never more than the list holds, and
	// never more than a positive limit.
	capHint := s.list.Len()
	if limit > 0 && limit < capHint {
		capHint = limit
	}

	result := make([]Item, 0, capHint)
	for el := start; el != nil; el = el.Next() {
		if limit > 0 && len(result) >= limit {
			break
		}
		result = append(result, el.Value.(Item))
	}
	return result, s.top, nil
}
|