1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
|
// Copyright 2020 The gVisor Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package syncevent
import (
"gvisor.dev/gvisor/pkg/sync"
)
// Broadcaster is an implementation of Source that supports any number of
// subscribed Receivers.
//
// The zero value of Broadcaster is valid and has no subscribed Receivers.
// Broadcaster is not copyable by value.
//
// All Broadcaster methods may be called concurrently from multiple goroutines.
type Broadcaster struct {
	// Broadcaster is implemented as a hash table where keys are assigned by
	// the Broadcaster and returned as SubscriptionIDs, making it safe to use
	// the identity function for hashing. The hash table resolves collisions
	// using linear probing and features Robin Hood insertion and backward
	// shift deletion in order to support a relatively high load factor
	// efficiently, which matters since the cost of Broadcast is linear in the
	// size of the table.

	// mu protects the following fields.
	mu sync.Mutex

	// table holds the hash table's slots. A slot whose receiver is nil is
	// empty; the low bits of a subscription's id select its first-probed
	// slot.
	//
	// Invariants: len(table) is 0 or a power of 2.
	table []broadcasterSlot

	// load is the number of entries in table with receiver != nil.
	load int

	// lastID is the SubscriptionID most recently handed out by
	// SubscribeEvents, which increments it before each use.
	lastID SubscriptionID
}
// broadcasterSlot is a single slot of Broadcaster.table. A slot whose receiver
// is nil is empty.
type broadcasterSlot struct {
	// Invariants: If receiver == nil, then filter == NoEvents and id == 0.
	// Otherwise, id != 0.

	// receiver is the subscribed Receiver, or nil if the slot is empty.
	receiver *Receiver

	// filter is the set of events the subscription was registered for;
	// Broadcast notifies receiver only of broadcast events that are also in
	// filter.
	filter Set

	// id is the SubscriptionID assigned to this subscription. It doubles as
	// the hash key: id masked by len(table)-1 is the slot's first-probed
	// position.
	id SubscriptionID
}
// Sizing parameters for Broadcaster's hash table.
const (
	// broadcasterMinNonZeroTableSize is the number of slots given to the
	// table the first time it grows from empty, and the size below which
	// UnsubscribeEvents stops shrinking it.
	broadcasterMinNonZeroTableSize = 2 // must be a power of 2 > 1

	// broadcasterMaxLoadNum / broadcasterMaxLoadDen is the maximum load
	// factor (13/16). They are kept as separate integers so that load checks
	// can be cross-multiplied instead of divided.
	broadcasterMaxLoadNum = 13
	broadcasterMaxLoadDen = 16
)
// SubscribeEvents implements Source.SubscribeEvents.
//
// It records a subscription of r to the events in filter and returns the
// SubscriptionID newly assigned to it. Before the entry is stored, the table
// is doubled if the added entry would take it over the maximum load factor.
func (b *Broadcaster) SubscribeEvents(r *Receiver, filter Set) SubscriptionID {
	b.mu.Lock()

	// IDs are handed out sequentially, starting after the last one used.
	b.lastID++
	id := b.lastID

	// Count the new entry first, then check whether the table is now over
	// the maximum load factor. The comparison
	//
	//	load / len(b.table) > broadcasterMaxLoadNum / broadcasterMaxLoadDen
	//
	// is cross-multiplied so that it needs only integer arithmetic.
	b.load++
	if overloaded := b.load*broadcasterMaxLoadDen > broadcasterMaxLoadNum*len(b.table); overloaded {
		// An empty table grows to the minimum size; any other table doubles.
		oldLen := len(b.table)
		grownLen := broadcasterMinNonZeroTableSize
		if oldLen != 0 {
			grownLen = oldLen * 2
		}

		var grown []broadcasterSlot
		if grownLen > cap(b.table) {
			// Not enough spare capacity: allocate a fresh table and rehash
			// every live entry into it.
			grown = make([]broadcasterSlot, grownLen)
			for _, slot := range b.table {
				if slot.receiver == nil {
					continue
				}
				broadcasterTableInsert(grown, slot.id, slot.receiver, slot.filter)
			}
		} else {
			// The backing array is already large enough, so extend the
			// slice in place. Entries that are not sitting in the slot the
			// wider mask selects for them are pulled out and reinserted;
			// the rest stay where they are.
			grown = b.table[:grownLen]
			wideMask := uint64(grownLen - 1)
			for pos := 0; pos < oldLen; pos++ {
				cur := grown[pos]
				if cur.receiver == nil || uint64(cur.id)&wideMask == uint64(pos) {
					continue
				}
				grown[pos] = broadcasterSlot{}
				broadcasterTableInsert(grown, cur.id, cur.receiver, cur.filter)
			}
		}
		b.table = grown
	}

	broadcasterTableInsert(b.table, id, r, filter)
	b.mu.Unlock()
	return id
}
// broadcasterTableInsert stores a slot for (id, r, filter) in table, probing
// linearly from the position selected by the low bits of id.
//
// Preconditions:
//   - table must not be full.
//   - len(table) is a power of 2.
func broadcasterTableInsert(table []broadcasterSlot, id SubscriptionID, r *Receiver, filter Set) {
	pending := broadcasterSlot{receiver: r, filter: filter, id: id}
	mask := uint64(len(table) - 1)

	// pos is the slot being examined; dist is how far pending currently is
	// from its own first-probed slot.
	for pos, dist := uint64(id)&mask, uint64(0); ; pos, dist = (pos+1)&mask, dist+1 {
		occupant := &table[pos]
		if occupant.receiver == nil {
			*occupant = pending
			return
		}
		// Robin Hood insertion: when the occupant is closer to its
		// first-probed slot than pending is to its own, pending takes the
		// slot and the evicted occupant becomes the element to place.
		if occupantDist := (pos - uint64(occupant.id)) & mask; occupantDist < dist {
			*occupant, pending = pending, *occupant
			dist = occupantDist
		}
	}
}
// UnsubscribeEvents implements Source.UnsubscribeEvents.
//
// It removes the subscription identified by id and then, if the table has
// become sparse enough, halves it. An id that does not identify a current
// subscription is ignored: this covers an empty table, an id that was already
// unsubscribed, and id 0 (which empty slots carry and which SubscribeEvents,
// incrementing lastID before use, does not hand out). Without that check the
// search below would spin forever while holding mu, or index an empty table.
func (b *Broadcaster) UnsubscribeEvents(id SubscriptionID) {
	b.mu.Lock()

	i, ok := broadcasterTableFind(b.table, id)
	if !ok {
		// Nothing to remove; leave table and load untouched.
		b.mu.Unlock()
		return
	}

	// Found the element to remove. Move all subsequent elements backward
	// until we either find an empty slot, or an element that is already in
	// its first-probed slot. (This is backward shift deletion.)
	mask := uint64(len(b.table) - 1)
	for {
		next := (i + 1) & mask
		if b.table[next].receiver == nil {
			break
		}
		if uint64(b.table[next].id)&mask == next {
			break
		}
		b.table[i] = b.table[next]
		i = next
	}
	b.table[i] = broadcasterSlot{}

	// If a table 1/4 of the current size would still be at or under the
	// maximum load factor (i.e. the current table size is at least two
	// expansions bigger than necessary), halve the size of the table to reduce
	// the cost of Broadcast. Since we are concerned with iteration time and
	// not memory usage, reuse the existing slice to reduce future allocations
	// from table re-expansion.
	b.load--
	if len(b.table) > broadcasterMinNonZeroTableSize && (b.load*(4*broadcasterMaxLoadDen)) <= (broadcasterMaxLoadNum*len(b.table)) {
		newlen := len(b.table) / 2
		newtable := b.table[:newlen]
		// Rehash every live entry from the upper half into the lower half,
		// zeroing the vacated slots so the spare capacity is clean if the
		// table later grows back into it.
		for i := newlen; i < len(b.table); i++ {
			if b.table[i].receiver != nil {
				broadcasterTableInsert(newtable, b.table[i].id, b.table[i].receiver, b.table[i].filter)
				b.table[i] = broadcasterSlot{}
			}
		}
		b.table = newtable
	}

	b.mu.Unlock()
}

// broadcasterTableFind returns the index of the slot in table whose id equals
// id, and whether such a slot exists. It probes linearly from the position
// selected by the low bits of id and gives up after examining every slot once,
// so it always terminates.
//
// Preconditions:
//   - len(table) is 0 or a power of 2.
func broadcasterTableFind(table []broadcasterSlot, id SubscriptionID) (uint64, bool) {
	// Empty slots have id 0, so searching for 0 would "find" an empty slot;
	// an empty table would turn the mask below into all ones.
	if id == 0 || len(table) == 0 {
		return 0, false
	}
	mask := uint64(len(table) - 1)
	i := uint64(id) & mask
	for range table {
		if table[i].id == id {
			return i, true
		}
		i = (i + 1) & mask
	}
	return 0, false
}
// Broadcast delivers events to the subscribed Receivers, passing each one only
// the events that are also in its subscription filter. A Receiver whose filter
// has no event in common with events is not notified. Receivers are notified
// in no particular order, and Notify is called with the Broadcaster's mutex
// held.
func (b *Broadcaster) Broadcast(events Set) {
	b.mu.Lock()
	for i := range b.table {
		slot := &b.table[i]
		matched := slot.filter & events
		if matched == 0 {
			// Either an uninterested subscriber or an empty slot: empty
			// slots have a zero filter, so no separate nil-receiver check
			// is needed.
			continue
		}
		slot.receiver.Notify(matched)
	}
	b.mu.Unlock()
}
// FilteredEvents reports every event that would cause Broadcast to notify at
// least one Receiver: the union of the filters of all current subscriptions.
// Empty slots carry a zero filter and so contribute nothing.
func (b *Broadcaster) FilteredEvents() Set {
	b.mu.Lock()
	var union Set
	for _, slot := range b.table {
		union |= slot.filter
	}
	b.mu.Unlock()
	return union
}
|