package proxy
import (
"bufio"
"io"
"net"
"testing"
"time"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// Compile-time assertion that *Conn satisfies io.Closer.
var _ io.Closer = new(Conn)
// testConnPairSetup creates a TCP connection by listening on a random port, and
// returns both ends. Ready to have data sent down them. It also returns a
// closer function that will close both conns and the listener.
// testConnPairSetup creates a TCP connection by listening on a random port, and
// returns both ends. Ready to have data sent down them. It also returns a
// closer function that will close both conns and the listener.
func testConnPairSetup(t *testing.T) (net.Conn, net.Conn, func()) {
	t.Helper()

	l, err := net.Listen("tcp", "localhost:0")
	require.Nil(t, err)

	// Accept on a background goroutine, but never assert there: require
	// calls t.FailNow, which must only run on the test goroutine. Ship the
	// result back and assert below instead.
	type acceptResult struct {
		conn net.Conn
		err  error
	}
	ch := make(chan acceptResult, 1)
	go func() {
		conn, err := l.Accept()
		ch <- acceptResult{conn: conn, err: err}
	}()

	dst, err := net.Dial("tcp", l.Addr().String())
	require.Nil(t, err)

	res := <-ch
	require.Nil(t, res.err)
	src := res.conn

	stopper := func() {
		l.Close()
		src.Close()
		dst.Close()
	}
	return src, dst, stopper
}
// testConnPipelineSetup creates a pipeline consiting of two TCP connection
// pairs and a Conn that copies bytes between them. Data flow looks like this:
//
// src1 <---> dst1 <== Conn.CopyBytes ==> src2 <---> dst2
//
// The returned values are the src1 and dst2 which should be able to send and
// receive to each other via the Conn, the Conn itself (not running), and a
// stopper func to close everything.
// testConnPipelineSetup creates a pipeline consisting of two TCP connection
// pairs and a Conn that copies bytes between them. Data flow looks like this:
//
//	src1 <---> dst1 <== Conn.CopyBytes ==> src2 <---> dst2
//
// The returned values are the src1 and dst2 which should be able to send and
// receive to each other via the Conn, the Conn itself (not running), and a
// stopper func to close everything.
func testConnPipelineSetup(t *testing.T) (net.Conn, net.Conn, *Conn, func()) {
	// Mark as a helper so failures inside are attributed to the caller,
	// matching testConnPairSetup.
	t.Helper()
	src1, dst1, stop1 := testConnPairSetup(t)
	src2, dst2, stop2 := testConnPairSetup(t)
	c := NewConn(dst1, src2)
	return src1, dst2, c, func() {
		c.Close()
		stop1()
		stop2()
	}
}
func TestConn(t *testing.T) {
	t.Skip("DM-skipped")
	t.Parallel()

	src, dst, c, stop := testConnPipelineSetup(t)
	defer stop()

	copyErrCh := make(chan error, 1)
	go func() {
		copyErrCh <- c.CopyBytes()
	}()

	// Talk to the outer ends of the pipeline (src1, dst2).
	srcReader := bufio.NewReader(src)
	dstReader := bufio.NewReader(dst)

	// exchange writes one message into each end, then reads both back out of
	// the opposite ends and verifies they made it through the proxy intact.
	exchange := func(srcMsg, dstMsg string) {
		t.Helper()
		_, err := src.Write([]byte(srcMsg))
		require.Nil(t, err)
		_, err = dst.Write([]byte(dstMsg))
		require.Nil(t, err)

		line, err := dstReader.ReadString('\n')
		require.Nil(t, err)
		require.Equal(t, srcMsg, line)
		line, err = srcReader.ReadString('\n')
		require.Nil(t, err)
		require.Equal(t, dstMsg, line)
	}

	// expectStats waits until the Conn reports the given byte count in both
	// directions.
	expectStats := func(want uint64) {
		t.Helper()
		retry.Run(t, func(r *retry.R) {
			tx, rx := c.Stats()
			assert.Equal(r, want, tx)
			assert.Equal(r, want, rx)
		})
	}

	exchange("ping 1\n", "ping 2\n")
	expectStats(7)

	exchange("pong 1\n", "pong 2\n")
	expectStats(14)

	c.Close()
	require.Nil(t, <-copyErrCh, "Close() should not cause error return")
}
func TestConnSrcClosing(t *testing.T) {
	t.Parallel()

	src, dst, c, stop := testConnPipelineSetup(t)
	defer stop()

	copyErrCh := make(chan error, 1)
	go func() {
		copyErrCh <- c.CopyBytes()
	}()

	// Push one message each way and read it back so we know the copy
	// goroutines are actually up and moving bytes before we close anything.
	srcReader := bufio.NewReader(src)
	dstReader := bufio.NewReader(dst)

	_, err := src.Write([]byte("ping 1\n"))
	require.Nil(t, err)
	_, err = dst.Write([]byte("ping 2\n"))
	require.Nil(t, err)

	line, err := dstReader.ReadString('\n')
	require.Nil(t, err)
	require.Equal(t, "ping 1\n", line)
	line, err = srcReader.ReadString('\n')
	require.Nil(t, err)
	require.Equal(t, "ping 2\n", line)

	// Closing src should make CopyBytes return (it defers closing both
	// conns). We can't observe the close directly, so the check is simply
	// that the receive below completes before the watchdog fires.
	watchdog := time.AfterFunc(3*time.Second, func() {
		panic("test timeout")
	})
	src.Close()
	<-copyErrCh
	watchdog.Stop()
}
func TestConnDstClosing(t *testing.T) {
	t.Parallel()

	src, dst, c, stop := testConnPipelineSetup(t)
	defer stop()

	retCh := make(chan error, 1)
	go func() {
		retCh <- c.CopyBytes()
	}()

	// Wait until we can actually get some bytes through both ways so we know that
	// the copy goroutines are running.
	srcR := bufio.NewReader(src)
	dstR := bufio.NewReader(dst)
	_, err := src.Write([]byte("ping 1\n"))
	require.Nil(t, err)
	_, err = dst.Write([]byte("ping 2\n"))
	require.Nil(t, err)

	got, err := dstR.ReadString('\n')
	require.Nil(t, err)
	require.Equal(t, "ping 1\n", got)
	got, err = srcR.ReadString('\n')
	require.Nil(t, err)
	require.Equal(t, "ping 2\n", got)

	// If we close the dst conn, we expect CopyBytes to return and src to be
	// closed too. No good way to assert that the conn is closed really other than
	// assume the retCh receive will hang unless CopyBytes exits and that
	// CopyBytes defers Closing both. i.e. if this test doesn't time out it's
	// good!
	testTimer := time.AfterFunc(3*time.Second, func() {
		panic("test timeout")
	})
	// BUG FIX: this previously closed src, which duplicated
	// TestConnSrcClosing and never exercised the dst-closing path at all.
	dst.Close()
	<-retCh
	testTimer.Stop()
}