1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
|
package listener
import (
"errors"
"io"
"net"
"sync"
"time"
)
type conn struct {
net.Conn
shard *shard
mode mode
mu sync.Mutex // Protects the state machine below
busy bool // connection is in use (i.e., not idle)
closed bool // connection is closed
disowned bool // if true, this connection is no longer under our management
}
// This intentionally looks a lot like the one in package net.
var errClosing = errors.New("use of closed network connection")
func (c *conn) init() error {
c.shard.wg.Add(1)
if shouldExit := c.shard.track(c); shouldExit {
c.Close()
return errClosing
}
return nil
}
func (c *conn) Read(b []byte) (n int, err error) {
defer func() {
c.mu.Lock()
defer c.mu.Unlock()
if c.disowned {
return
}
// This protects against a Close/Read race. We're not really
// concerned about the general case (it's fundamentally racy),
// but are mostly trying to prevent a race between a new request
// getting read off the wire in one thread while the connection
// is being gracefully shut down in another.
if c.closed && err == nil {
n = 0
err = errClosing
return
}
if c.mode != Manual && !c.busy && !c.closed {
c.busy = true
c.shard.markInUse(c)
}
}()
return c.Conn.Read(b)
}
func (c *conn) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
if c.disowned {
return c.Conn.Close()
} else if c.closed {
return errClosing
}
c.closed = true
c.shard.disown(c)
defer c.shard.wg.Done()
return c.Conn.Close()
}
func (c *conn) SetReadDeadline(t time.Time) error {
c.mu.Lock()
if !c.disowned && c.mode == Deadline {
defer c.markIdle()
}
c.mu.Unlock()
return c.Conn.SetReadDeadline(t)
}
func (c *conn) ReadFrom(r io.Reader) (int64, error) {
return io.Copy(c.Conn, r)
}
func (c *conn) markIdle() {
c.mu.Lock()
defer c.mu.Unlock()
if !c.busy {
return
}
c.busy = false
if exit := c.shard.markIdle(c); exit && !c.closed && !c.disowned {
c.closed = true
c.shard.disown(c)
defer c.shard.wg.Done()
c.Conn.Close()
return
}
}
func (c *conn) markInUse() {
c.mu.Lock()
defer c.mu.Unlock()
if !c.busy && !c.closed && !c.disowned {
c.busy = true
c.shard.markInUse(c)
}
}
func (c *conn) closeIfIdle() error {
c.mu.Lock()
defer c.mu.Unlock()
if !c.busy && !c.closed && !c.disowned {
c.closed = true
c.shard.disown(c)
defer c.shard.wg.Done()
return c.Conn.Close()
}
return nil
}
var errAlreadyDisowned = errors.New("listener: conn already disowned")
func (c *conn) disown() error {
c.mu.Lock()
defer c.mu.Unlock()
if c.disowned {
return errAlreadyDisowned
}
c.shard.disown(c)
c.disowned = true
c.shard.wg.Done()
return nil
}
|