1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
|
package streams
import (
"math"
"strconv"
"sync/atomic"
)
const bucketBits = 64
// IDGenerator tracks and allocates streams which are in use.
type IDGenerator struct {
NumStreams int
inuseStreams int32
numBuckets uint32
// streams is a bitset where each bit represents a stream, a 1 implies in use
streams []uint64
offset uint32
}
func New(protocol int) *IDGenerator {
maxStreams := 128
if protocol > 2 {
maxStreams = 32768
}
buckets := maxStreams / 64
// reserve stream 0
streams := make([]uint64, buckets)
streams[0] = 1 << 63
return &IDGenerator{
NumStreams: maxStreams,
streams: streams,
numBuckets: uint32(buckets),
offset: uint32(buckets) - 1,
}
}
func streamFromBucket(bucket, streamInBucket int) int {
return (bucket * bucketBits) + streamInBucket
}
func (s *IDGenerator) GetStream() (int, bool) {
// based closely on the java-driver stream ID generator
// avoid false sharing subsequent requests.
offset := atomic.LoadUint32(&s.offset)
for !atomic.CompareAndSwapUint32(&s.offset, offset, (offset+1)%s.numBuckets) {
offset = atomic.LoadUint32(&s.offset)
}
offset = (offset + 1) % s.numBuckets
for i := uint32(0); i < s.numBuckets; i++ {
pos := int((i + offset) % s.numBuckets)
bucket := atomic.LoadUint64(&s.streams[pos])
if bucket == math.MaxUint64 {
// all streams in use
continue
}
for j := 0; j < bucketBits; j++ {
mask := uint64(1 << streamOffset(j))
for bucket&mask == 0 {
if atomic.CompareAndSwapUint64(&s.streams[pos], bucket, bucket|mask) {
atomic.AddInt32(&s.inuseStreams, 1)
return streamFromBucket(int(pos), j), true
}
bucket = atomic.LoadUint64(&s.streams[pos])
}
}
}
return 0, false
}
func bitfmt(b uint64) string {
return strconv.FormatUint(b, 16)
}
// returns the bucket offset of a given stream
func bucketOffset(i int) int {
return i / bucketBits
}
func streamOffset(stream int) uint64 {
return bucketBits - uint64(stream%bucketBits) - 1
}
func isSet(bits uint64, stream int) bool {
return bits>>streamOffset(stream)&1 == 1
}
func (s *IDGenerator) isSet(stream int) bool {
bits := atomic.LoadUint64(&s.streams[bucketOffset(stream)])
return isSet(bits, stream)
}
func (s *IDGenerator) String() string {
size := s.numBuckets * (bucketBits + 1)
buf := make([]byte, 0, size)
for i := 0; i < int(s.numBuckets); i++ {
bits := atomic.LoadUint64(&s.streams[i])
buf = append(buf, bitfmt(bits)...)
buf = append(buf, ' ')
}
return string(buf[:size-1 : size-1])
}
func (s *IDGenerator) Clear(stream int) (inuse bool) {
offset := bucketOffset(stream)
bucket := atomic.LoadUint64(&s.streams[offset])
mask := uint64(1) << streamOffset(stream)
if bucket&mask != mask {
// already cleared
return false
}
for !atomic.CompareAndSwapUint64(&s.streams[offset], bucket, bucket & ^mask) {
bucket = atomic.LoadUint64(&s.streams[offset])
if bucket&mask != mask {
// already cleared
return false
}
}
// TODO: make this account for 0 stream being reserved
if atomic.AddInt32(&s.inuseStreams, -1) < 0 {
// TODO(zariel): remove this
panic("negative streams inuse")
}
return true
}
func (s *IDGenerator) Available() int {
return s.NumStreams - int(atomic.LoadInt32(&s.inuseStreams)) - 1
}
|