1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
|
package turbotunnel
import (
"container/heap"
"net"
"sync"
"time"
)
// clientRecord is a record of a recently seen client, with the time it was last
// seen and a send queue.
type clientRecord struct {
Addr net.Addr
LastSeen time.Time
SendQueue chan []byte
}
// ClientMap manages a mapping of live clients (keyed by address, which will be
// a ClientID) to their respective send queues. ClientMap's functions are safe
// to call from multiple goroutines.
type ClientMap struct {
// We use an inner structure to avoid exposing public heap.Interface
// functions to users of clientMap.
inner clientMapInner
// Synchronizes access to inner.
lock sync.Mutex
}
// NewClientMap creates a ClientMap that expires clients after a timeout.
//
// The timeout does not have to be kept in sync with QUIC's internal idle
// timeout. If a client is removed from the client map while the QUIC session is
// still live, the worst that can happen is a loss of whatever packets were in
// the send queue at the time. If QUIC later decides to send more packets to the
// same client, we'll instantiate a new send queue, and if the client ever
// connects again with the proper client ID, we'll deliver them.
func NewClientMap(timeout time.Duration) *ClientMap {
m := &ClientMap{
inner: clientMapInner{
byAge: make([]*clientRecord, 0),
byAddr: make(map[net.Addr]int),
},
}
go func() {
for {
time.Sleep(timeout / 2)
now := time.Now()
m.lock.Lock()
m.inner.removeExpired(now, timeout)
m.lock.Unlock()
}
}()
return m
}
// SendQueue returns the send queue corresponding to addr, creating it if
// necessary.
func (m *ClientMap) SendQueue(addr net.Addr) chan []byte {
m.lock.Lock()
queue := m.inner.SendQueue(addr, time.Now())
m.lock.Unlock()
return queue
}
// clientMapInner is the inner type of ClientMap, implementing heap.Interface.
// byAge is the backing store, a heap ordered by LastSeen time, to facilitate
// expiring old client records. byAddr is a map from addresses (i.e., ClientIDs)
// to heap indices, to allow looking up by address. Unlike ClientMap,
// clientMapInner requires external synchonization.
type clientMapInner struct {
byAge []*clientRecord
byAddr map[net.Addr]int
}
// removeExpired removes all client records whose LastSeen timestamp is more
// than timeout in the past.
func (inner *clientMapInner) removeExpired(now time.Time, timeout time.Duration) {
for len(inner.byAge) > 0 && now.Sub(inner.byAge[0].LastSeen) >= timeout {
heap.Pop(inner)
}
}
// SendQueue finds the existing client record corresponding to addr, or creates
// a new one if none exists yet. It updates the client record's LastSeen time
// and returns its SendQueue.
func (inner *clientMapInner) SendQueue(addr net.Addr, now time.Time) chan []byte {
var record *clientRecord
i, ok := inner.byAddr[addr]
if ok {
// Found one, update its LastSeen.
record = inner.byAge[i]
record.LastSeen = now
heap.Fix(inner, i)
} else {
// Not found, create a new one.
record = &clientRecord{
Addr: addr,
LastSeen: now,
SendQueue: make(chan []byte, queueSize),
}
heap.Push(inner, record)
}
return record.SendQueue
}
// heap.Interface for clientMapInner.
func (inner *clientMapInner) Len() int {
if len(inner.byAge) != len(inner.byAddr) {
panic("inconsistent clientMap")
}
return len(inner.byAge)
}
func (inner *clientMapInner) Less(i, j int) bool {
return inner.byAge[i].LastSeen.Before(inner.byAge[j].LastSeen)
}
func (inner *clientMapInner) Swap(i, j int) {
inner.byAge[i], inner.byAge[j] = inner.byAge[j], inner.byAge[i]
inner.byAddr[inner.byAge[i].Addr] = i
inner.byAddr[inner.byAge[j].Addr] = j
}
func (inner *clientMapInner) Push(x interface{}) {
record := x.(*clientRecord)
if _, ok := inner.byAddr[record.Addr]; ok {
panic("duplicate address in clientMap")
}
// Insert into byAddr map.
inner.byAddr[record.Addr] = len(inner.byAge)
// Insert into byAge slice.
inner.byAge = append(inner.byAge, record)
}
func (inner *clientMapInner) Pop() interface{} {
n := len(inner.byAddr)
// Remove from byAge slice.
record := inner.byAge[n-1]
inner.byAge[n-1] = nil
inner.byAge = inner.byAge[:n-1]
// Remove from byAddr map.
delete(inner.byAddr, record.Addr)
close(record.SendQueue)
return record
}
|