File: conn_test.go

package info (click to toggle)
consul 1.8.7%2Bdfsg1-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, bullseye-backports
  • size: 57,848 kB
  • sloc: javascript: 25,918; sh: 3,807; makefile: 135; cpp: 102
file content (206 lines) | stat: -rw-r--r-- 5,040 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
package proxy

import (
	"bufio"
	"io"
	"net"
	"testing"
	"time"

	"github.com/hashicorp/consul/sdk/testutil/retry"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// Compile-time assertion that *Conn implements io.Closer. A typed nil pointer
// is enough to check the method set, so no Conn needs to be allocated.
var _ io.Closer = (*Conn)(nil)

// testConnPairSetup creates a TCP connection by listening on a random
// localhost port and dialing it. It returns both ends (the accepted side
// first, then the dialed side), ready to have data sent down them, plus a
// closer function that will close both conns and the listener.
//
// Any setup error fails the test from the calling goroutine; whatever was
// already opened is closed first.
func testConnPairSetup(t *testing.T) (net.Conn, net.Conn, func()) {
	t.Helper()

	l, err := net.Listen("tcp", "localhost:0")
	require.Nil(t, err)

	// The accept goroutine must not call require/t.FailNow itself: FailNow is
	// only valid on the goroutine running the test. It reports its outcome
	// over a buffered channel instead, so it can always exit even if this
	// function bails out before receiving.
	type acceptResult struct {
		conn net.Conn
		err  error
	}
	ch := make(chan acceptResult, 1)
	go func() {
		conn, err := l.Accept()
		ch <- acceptResult{conn: conn, err: err}
	}()

	dst, err := net.Dial("tcp", l.Addr().String())
	if err != nil {
		// Closing the listener unblocks Accept so the goroutine above exits.
		l.Close()
	}
	require.Nil(t, err)

	res := <-ch
	if res.err != nil {
		dst.Close()
		l.Close()
	}
	require.Nil(t, res.err)
	src := res.conn

	stopper := func() {
		l.Close()
		src.Close()
		dst.Close()
	}

	return src, dst, stopper
}

// testConnPipelineSetup creates a pipeline consisting of two TCP connection
// pairs and a Conn that copies bytes between them. Data flow looks like this:
//
//   src1 <---> dst1 <== Conn.CopyBytes ==> src2 <---> dst2
//
// The returned values are the src1 and dst2 which should be able to send and
// receive to each other via the Conn, the Conn itself (not running), and a
// stopper func to close everything.
func testConnPipelineSetup(t *testing.T) (net.Conn, net.Conn, *Conn, func()) {
	// Mark as a helper so that failures raised while building the pairs are
	// reported against the calling test rather than this function.
	t.Helper()

	src1, dst1, stop1 := testConnPairSetup(t)
	src2, dst2, stop2 := testConnPairSetup(t)
	c := NewConn(dst1, src2)
	return src1, dst2, c, func() {
		// Close the Conn first, then the underlying connection pairs.
		c.Close()
		stop1()
		stop2()
	}
}

// TestConn pushes two rounds of newline-terminated messages through the
// pipeline in both directions, checks that each side receives what the other
// sent and that the Conn's byte counters match, and finally verifies that
// closing the Conn makes CopyBytes return without an error.
//
// The test is currently skipped unconditionally (marked "DM-skipped").
func TestConn(t *testing.T) {
	t.Skip("DM-skipped")
	t.Parallel()

	src, dst, c, stop := testConnPipelineSetup(t)
	defer stop()

	retCh := make(chan error, 1)
	go func() {
		retCh <- c.CopyBytes()
	}()

	// Now write/read into the other ends of the pipes (src1, dst2)
	srcR := bufio.NewReader(src)
	dstR := bufio.NewReader(dst)

	_, err := src.Write([]byte("ping 1\n"))
	require.Nil(t, err)
	_, err = dst.Write([]byte("ping 2\n"))
	require.Nil(t, err)

	got, err := dstR.ReadString('\n')
	require.Nil(t, err)
	require.Equal(t, "ping 1\n", got)

	got, err = srcR.ReadString('\n')
	require.Nil(t, err)
	require.Equal(t, "ping 2\n", got)

	// Each message is 7 bytes. The counters are polled via retry because they
	// are read separately from the data arriving at the far end.
	retry.Run(t, func(r *retry.R) {
		tx, rx := c.Stats()
		assert.Equal(r, uint64(7), tx)
		assert.Equal(r, uint64(7), rx)
	})

	_, err = src.Write([]byte("pong 1\n"))
	require.Nil(t, err)
	_, err = dst.Write([]byte("pong 2\n"))
	require.Nil(t, err)

	got, err = dstR.ReadString('\n')
	require.Nil(t, err)
	require.Equal(t, "pong 1\n", got)

	got, err = srcR.ReadString('\n')
	require.Nil(t, err)
	require.Equal(t, "pong 2\n", got)

	// Two 7-byte messages have now gone each way.
	retry.Run(t, func(r *retry.R) {
		tx, rx := c.Stats()
		assert.Equal(r, uint64(14), tx)
		assert.Equal(r, uint64(14), rx)
	})

	c.Close()

	// Bound the wait so a CopyBytes that never returns fails this test instead
	// of hanging until the global test timeout.
	select {
	case ret := <-retCh:
		require.Nil(t, ret, "Close() should not cause error return")
	case <-time.After(3 * time.Second):
		t.Fatal("timed out waiting for CopyBytes to return after Close()")
	}
}

// TestConnSrcClosing checks that CopyBytes returns once the src end of the
// pipeline is closed. It first round-trips a message in each direction to be
// sure the copy goroutines are running, then closes src and waits (bounded)
// for CopyBytes to return.
func TestConnSrcClosing(t *testing.T) {
	t.Parallel()

	src, dst, c, stop := testConnPipelineSetup(t)
	defer stop()

	retCh := make(chan error, 1)
	go func() {
		retCh <- c.CopyBytes()
	}()

	// Wait until we can actually get some bytes through both ways so we know that
	// the copy goroutines are running.
	srcR := bufio.NewReader(src)
	dstR := bufio.NewReader(dst)

	_, err := src.Write([]byte("ping 1\n"))
	require.Nil(t, err)
	_, err = dst.Write([]byte("ping 2\n"))
	require.Nil(t, err)

	got, err := dstR.ReadString('\n')
	require.Nil(t, err)
	require.Equal(t, "ping 1\n", got)
	got, err = srcR.ReadString('\n')
	require.Nil(t, err)
	require.Equal(t, "ping 2\n", got)

	// If we close the src conn, we expect CopyBytes to return and dst to be
	// closed too. No good way to assert that the conn is closed really other than
	// assume the retCh receive will hang unless CopyBytes exits and that
	// CopyBytes defers Closing both.
	src.Close()

	// Fail only this test on timeout. Panicking from a timer goroutine would
	// take down the whole test binary, including tests running in parallel.
	select {
	case <-retCh:
	case <-time.After(3 * time.Second):
		t.Fatal("test timeout: CopyBytes did not return after src was closed")
	}
}

// TestConnDstClosing checks that CopyBytes returns once the dst end of the
// pipeline is closed. It first round-trips a message in each direction to be
// sure the copy goroutines are running, then closes dst and waits (bounded)
// for CopyBytes to return.
func TestConnDstClosing(t *testing.T) {
	t.Parallel()

	src, dst, c, stop := testConnPipelineSetup(t)
	defer stop()

	retCh := make(chan error, 1)
	go func() {
		retCh <- c.CopyBytes()
	}()

	// Wait until we can actually get some bytes through both ways so we know that
	// the copy goroutines are running.
	srcR := bufio.NewReader(src)
	dstR := bufio.NewReader(dst)

	_, err := src.Write([]byte("ping 1\n"))
	require.Nil(t, err)
	_, err = dst.Write([]byte("ping 2\n"))
	require.Nil(t, err)

	got, err := dstR.ReadString('\n')
	require.Nil(t, err)
	require.Equal(t, "ping 1\n", got)
	got, err = srcR.ReadString('\n')
	require.Nil(t, err)
	require.Equal(t, "ping 2\n", got)

	// If we close the dst conn, we expect CopyBytes to return and src to be
	// closed too. No good way to assert that the conn is closed really other than
	// assume the retCh receive will hang unless CopyBytes exits and that
	// CopyBytes defers Closing both. i.e. if this test doesn't time out it's
	// good!
	//
	// This must close dst, not src: closing src here would make the test a
	// duplicate of TestConnSrcClosing and leave the dst-closing path untested.
	dst.Close()

	// Fail only this test on timeout. Panicking from a timer goroutine would
	// take down the whole test binary, including tests running in parallel.
	select {
	case <-retCh:
	case <-time.After(3 * time.Second):
		t.Fatal("test timeout: CopyBytes did not return after dst was closed")
	}
}