1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
|
package snowflake_client
import (
"container/list"
"errors"
"fmt"
"log"
"sync"
)
// Peers is a container that keeps track of multiple WebRTC remote peers.
// Implements |SnowflakeCollector|.
//
// Maintaining a set of pre-connected Peers with fresh but inactive datachannels
// allows allows rapid recovery when the current WebRTC Peer disconnects.
//
// Note: For now, only one remote can be active at any given moment.
// This is a property of Tor circuits & its current multiplexing constraints,
// but could be updated if that changes.
// (Also, this constraint does not necessarily apply to the more generic PT
// version of Snowflake)
type Peers struct {
Tongue
bytesLogger bytesLogger
snowflakeChan chan *WebRTCPeer
activePeers *list.List
melt chan struct{}
collectLock sync.Mutex
closeOnce sync.Once
}
// NewPeers constructs a fresh container of remote peers.
func NewPeers(tongue Tongue) (*Peers, error) {
p := &Peers{}
// Use buffered go channel to pass snowflakes onwards to the SOCKS handler.
if tongue == nil {
return nil, errors.New("missing Tongue to catch Snowflakes with")
}
p.snowflakeChan = make(chan *WebRTCPeer, tongue.GetMax())
p.activePeers = list.New()
p.melt = make(chan struct{})
p.Tongue = tongue
return p, nil
}
// Collect connects to and adds a new remote peer as part of |SnowflakeCollector| interface.
func (p *Peers) Collect() (*WebRTCPeer, error) {
// Engage the Snowflake Catching interface, which must be available.
p.collectLock.Lock()
defer p.collectLock.Unlock()
select {
case <-p.melt:
return nil, fmt.Errorf("Snowflakes have melted")
default:
}
if nil == p.Tongue {
return nil, errors.New("missing Tongue to catch Snowflakes with")
}
cnt := p.Count()
capacity := p.Tongue.GetMax()
s := fmt.Sprintf("Currently at [%d/%d]", cnt, capacity)
if cnt >= capacity {
return nil, fmt.Errorf("At capacity [%d/%d]", cnt, capacity)
}
log.Println("WebRTC: Collecting a new Snowflake.", s)
// BUG: some broker conflict here.
connection, err := p.Tongue.Catch()
if nil != err {
return nil, err
}
// Track new valid Snowflake in internal collection and pass along.
p.activePeers.PushBack(connection)
p.snowflakeChan <- connection
return connection, nil
}
// Pop blocks until an available, valid snowflake appears.
// Pop will return nil after End has been called.
func (p *Peers) Pop() *WebRTCPeer {
for {
snowflake, ok := <-p.snowflakeChan
if !ok {
return nil
}
if snowflake.Closed() {
continue
}
// Set to use the same rate-limited traffic logger to keep consistency.
snowflake.bytesLogger = p.bytesLogger
return snowflake
}
}
// Melted returns a channel that will close when peers stop being collected.
// Melted is a necessary part of |SnowflakeCollector| interface.
func (p *Peers) Melted() <-chan struct{} {
return p.melt
}
// Count returns the total available Snowflakes (including the active ones)
// The count only reduces when connections themselves close, rather than when
// they are popped.
func (p *Peers) Count() int {
p.purgeClosedPeers()
return p.activePeers.Len()
}
func (p *Peers) purgeClosedPeers() {
for e := p.activePeers.Front(); e != nil; {
next := e.Next()
conn := e.Value.(*WebRTCPeer)
// Purge those marked for deletion.
if conn.Closed() {
p.activePeers.Remove(e)
}
e = next
}
}
// End closes all active connections to Peers contained here, and stops the
// collection of future Peers.
func (p *Peers) End() {
p.closeOnce.Do(func() {
close(p.melt)
p.collectLock.Lock()
defer p.collectLock.Unlock()
close(p.snowflakeChan)
cnt := p.Count()
for e := p.activePeers.Front(); e != nil; {
next := e.Next()
conn := e.Value.(*WebRTCPeer)
conn.Close()
p.activePeers.Remove(e)
e = next
}
log.Printf("WebRTC: melted all %d snowflakes.", cnt)
})
}
|