1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
|
package fasthttputil
import (
"errors"
"net"
"sync"
)
// ErrInmemoryListenerClosed indicates that the InmemoryListener is already closed.
var ErrInmemoryListenerClosed = errors.New("InmemoryListener is already closed: use of closed network connection")
// InmemoryListener provides in-memory dialer<->net.Listener implementation.
//
// It may be used either for fast in-process client<->server communications
// without network stack overhead or for client<->server tests.
type InmemoryListener struct {
listenerAddr net.Addr
conns chan acceptConn
addrLock sync.RWMutex
lock sync.Mutex
closed bool
}
type acceptConn struct {
conn net.Conn
accepted chan struct{}
}
// NewInmemoryListener returns new in-memory dialer<->net.Listener.
func NewInmemoryListener() *InmemoryListener {
return &InmemoryListener{
conns: make(chan acceptConn, 1024),
}
}
// SetLocalAddr sets the (simulated) local address for the listener.
func (ln *InmemoryListener) SetLocalAddr(localAddr net.Addr) {
ln.addrLock.Lock()
defer ln.addrLock.Unlock()
ln.listenerAddr = localAddr
}
// Accept implements net.Listener's Accept.
//
// It is safe calling Accept from concurrently running goroutines.
//
// Accept returns new connection per each Dial call.
func (ln *InmemoryListener) Accept() (net.Conn, error) {
c, ok := <-ln.conns
if !ok {
return nil, ErrInmemoryListenerClosed
}
close(c.accepted)
return c.conn, nil
}
// Close implements net.Listener's Close.
func (ln *InmemoryListener) Close() error {
var err error
ln.lock.Lock()
if !ln.closed {
close(ln.conns)
ln.closed = true
} else {
err = ErrInmemoryListenerClosed
}
ln.lock.Unlock()
return err
}
type inmemoryAddr int
func (inmemoryAddr) Network() string {
return "inmemory"
}
func (inmemoryAddr) String() string {
return "InmemoryListener"
}
// Addr implements net.Listener's Addr.
func (ln *InmemoryListener) Addr() net.Addr {
ln.addrLock.RLock()
defer ln.addrLock.RUnlock()
if ln.listenerAddr != nil {
return ln.listenerAddr
}
return inmemoryAddr(0)
}
// Dial creates new client<->server connection.
// Just like a real Dial it only returns once the server
// has accepted the connection.
//
// It is safe calling Dial from concurrently running goroutines.
func (ln *InmemoryListener) Dial() (net.Conn, error) {
return ln.DialWithLocalAddr(nil)
}
// DialWithLocalAddr creates new client<->server connection.
// Just like a real Dial it only returns once the server
// has accepted the connection. The local address of the
// client connection can be set with local.
//
// It is safe calling Dial from concurrently running goroutines.
func (ln *InmemoryListener) DialWithLocalAddr(local net.Addr) (net.Conn, error) {
pc := NewPipeConns()
pc.SetAddresses(local, ln.Addr(), ln.Addr(), local)
cConn := pc.Conn1()
sConn := pc.Conn2()
ln.lock.Lock()
accepted := make(chan struct{})
if !ln.closed {
ln.conns <- acceptConn{conn: sConn, accepted: accepted}
// Wait until the connection has been accepted.
<-accepted
} else {
_ = sConn.Close()
_ = cConn.Close()
cConn = nil
}
ln.lock.Unlock()
if cConn == nil {
return nil, ErrInmemoryListenerClosed
}
return cConn, nil
}
|