File: streams.go

package info (click to toggle)
golang-github-gocql-gocql 0.0~git20171009.0.2416cf3-3
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 872 kB
  • sloc: sh: 68; makefile: 2
file content (140 lines) | stat: -rw-r--r-- 3,260 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package streams

import (
	"math"
	"strconv"
	"sync/atomic"
)

const bucketBits = 64

// IDGenerator tracks and allocates streams which are in use.
type IDGenerator struct {
	NumStreams   int
	inuseStreams int32
	numBuckets   uint32

	// streams is a bitset where each bit represents a stream, a 1 implies in use
	streams []uint64
	offset  uint32
}

func New(protocol int) *IDGenerator {
	maxStreams := 128
	if protocol > 2 {
		maxStreams = 32768
	}

	buckets := maxStreams / 64
	// reserve stream 0
	streams := make([]uint64, buckets)
	streams[0] = 1 << 63

	return &IDGenerator{
		NumStreams: maxStreams,
		streams:    streams,
		numBuckets: uint32(buckets),
		offset:     uint32(buckets) - 1,
	}
}

func streamFromBucket(bucket, streamInBucket int) int {
	return (bucket * bucketBits) + streamInBucket
}

func (s *IDGenerator) GetStream() (int, bool) {
	// based closely on the java-driver stream ID generator
	// avoid false sharing subsequent requests.
	offset := atomic.LoadUint32(&s.offset)
	for !atomic.CompareAndSwapUint32(&s.offset, offset, (offset+1)%s.numBuckets) {
		offset = atomic.LoadUint32(&s.offset)
	}
	offset = (offset + 1) % s.numBuckets

	for i := uint32(0); i < s.numBuckets; i++ {
		pos := int((i + offset) % s.numBuckets)

		bucket := atomic.LoadUint64(&s.streams[pos])
		if bucket == math.MaxUint64 {
			// all streams in use
			continue
		}

		for j := 0; j < bucketBits; j++ {
			mask := uint64(1 << streamOffset(j))
			for bucket&mask == 0 {
				if atomic.CompareAndSwapUint64(&s.streams[pos], bucket, bucket|mask) {
					atomic.AddInt32(&s.inuseStreams, 1)
					return streamFromBucket(int(pos), j), true
				}
				bucket = atomic.LoadUint64(&s.streams[pos])
			}
		}
	}

	return 0, false
}

func bitfmt(b uint64) string {
	return strconv.FormatUint(b, 16)
}

// returns the bucket offset of a given stream
func bucketOffset(i int) int {
	return i / bucketBits
}

func streamOffset(stream int) uint64 {
	return bucketBits - uint64(stream%bucketBits) - 1
}

func isSet(bits uint64, stream int) bool {
	return bits>>streamOffset(stream)&1 == 1
}

func (s *IDGenerator) isSet(stream int) bool {
	bits := atomic.LoadUint64(&s.streams[bucketOffset(stream)])
	return isSet(bits, stream)
}

func (s *IDGenerator) String() string {
	size := s.numBuckets * (bucketBits + 1)
	buf := make([]byte, 0, size)
	for i := 0; i < int(s.numBuckets); i++ {
		bits := atomic.LoadUint64(&s.streams[i])
		buf = append(buf, bitfmt(bits)...)
		buf = append(buf, ' ')
	}
	return string(buf[:size-1 : size-1])
}

func (s *IDGenerator) Clear(stream int) (inuse bool) {
	offset := bucketOffset(stream)
	bucket := atomic.LoadUint64(&s.streams[offset])

	mask := uint64(1) << streamOffset(stream)
	if bucket&mask != mask {
		// already cleared
		return false
	}

	for !atomic.CompareAndSwapUint64(&s.streams[offset], bucket, bucket & ^mask) {
		bucket = atomic.LoadUint64(&s.streams[offset])
		if bucket&mask != mask {
			// already cleared
			return false
		}
	}

	// TODO: make this account for 0 stream being reserved
	if atomic.AddInt32(&s.inuseStreams, -1) < 0 {
		// TODO(zariel): remove this
		panic("negative streams inuse")
	}

	return true
}

func (s *IDGenerator) Available() int {
	return s.NumStreams - int(atomic.LoadInt32(&s.inuseStreams)) - 1
}