File: queuepacketconn_test.go

package info (click to toggle)
snowflake 2.10.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,104 kB
  • sloc: makefile: 5
file content (234 lines) | stat: -rw-r--r-- 6,842 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
package turbotunnel

import (
	"bytes"
	"fmt"
	"net"
	"sync"
	"testing"
	"time"

	"github.com/xtaci/kcp-go/v5"
)

// emptyAddr is a stateless address type. It provides the Network and String
// methods required by net.Addr, and both report the fixed text "empty".
type emptyAddr struct{}

// Network returns the constant network name "empty".
func (emptyAddr) Network() string {
	return "empty"
}

// String returns the constant address text "empty".
func (emptyAddr) String() string {
	return "empty"
}

// intAddr is an integer usable as an address: it provides the Network and
// String methods required by net.Addr, so distinct integers yield distinct
// address strings.
type intAddr int

// Network returns the constant network name "int".
func (intAddr) Network() string {
	return "int"
}

// String returns the integer formatted in decimal.
func (i intAddr) String() string {
	return fmt.Sprintf("%d", int(i))
}

// BenchmarkQueueIncoming measures repeated QueuePacketConn.QueueIncoming calls,
// each handing over one packet whose length equals the configured MTU.
// Run with -benchmem to see memory allocations.
func BenchmarkQueueIncoming(b *testing.B) {
	// The same size is used for the MTU and for the packet itself.
	const size = 500
	conn := NewQueuePacketConn(emptyAddr{}, time.Hour, size)
	defer conn.Close()

	b.ResetTimer()
	var packet [size]byte
	src := emptyAddr{}
	for remaining := b.N; remaining > 0; remaining-- {
		conn.QueueIncoming(packet[:], src)
	}
	b.StopTimer()
}

// BenchmarkWriteTo benchmarks the QueuePacketConn.WriteTo function, writing one
// packet whose length equals the configured MTU on every iteration.
func BenchmarkWriteTo(b *testing.B) {
	// The same size is used for the MTU and for the packet itself.
	const size = 500
	conn := NewQueuePacketConn(emptyAddr{}, time.Hour, size)
	defer conn.Close()

	b.ResetTimer()
	var packet [size]byte
	dst := emptyAddr{}
	for remaining := b.N; remaining > 0; remaining-- {
		conn.WriteTo(packet[:], dst)
	}
	b.StopTimer()
}

// TestQueueIncomingOversize checks that a packet longer than the MTU, once
// passed to QueueIncoming, is returned by ReadFrom truncated to MTU bytes.
func TestQueueIncomingOversize(t *testing.T) {
	const payload = "abcdefghijklmnopqrstuvwxyz"
	// An MTU one byte shorter than the payload forces truncation.
	const mtu = len(payload) - 1
	want := payload[:mtu]

	conn := NewQueuePacketConn(emptyAddr{}, time.Hour, mtu)
	defer conn.Close()
	conn.QueueIncoming([]byte(payload), emptyAddr{})

	var buf [500]byte
	n, _, err := conn.ReadFrom(buf[:])
	if err != nil {
		t.Fatal(err)
	}
	if got := buf[:n]; !bytes.Equal(got, []byte(want)) {
		t.Fatalf("payload was %+q, expected %+q", got, want)
	}
}

// TestWriteToOversize checks that a packet longer than the MTU, once passed to
// WriteTo, appears on the outgoing queue truncated to MTU bytes.
func TestWriteToOversize(t *testing.T) {
	const payload = "abcdefghijklmnopqrstuvwxyz"
	// An MTU one byte shorter than the payload forces truncation.
	const mtu = len(payload) - 1
	want := payload[:mtu]

	conn := NewQueuePacketConn(emptyAddr{}, time.Hour, mtu)
	defer conn.Close()
	conn.WriteTo([]byte(payload), emptyAddr{})

	got := <-conn.OutgoingQueue(emptyAddr{})
	if !bytes.Equal(got, []byte(want)) {
		t.Fatalf("payload was %+q, expected %+q", got, want)
	}
}

// TestRestoreMTU checks that a slice too small to hold an MTU-sized packet is
// not reused for a later write after it has been passed to Restore.
func TestRestoreMTU(t *testing.T) {
	const (
		mtu     = 500
		payload = "hello"
	)
	conn := NewQueuePacketConn(emptyAddr{}, time.Hour, mtu)
	defer conn.Close()

	// Offer a buffer that is one byte short of the MTU.
	tooShort := make([]byte, mtu-1)
	conn.Restore(tooShort)
	// If Restore had kept the short buffer, this WriteTo could pick it up.
	conn.WriteTo([]byte(payload), emptyAddr{})

	// The queued slice must have capacity exactly mtu, which the short
	// buffer (capacity mtu-1) cannot have.
	queued := <-conn.OutgoingQueue(emptyAddr{})
	if c := cap(queued); c != mtu {
		t.Fatalf("cap was %v, expected %v", c, mtu)
	}
	// The contents should be the payload that was written.
	if !bytes.Equal(queued, []byte(payload)) {
		t.Fatalf("payload was %+q, expected %+q", queued, payload)
	}
}

// TestRestoreCap checks that a write still delivers the right payload after
// Restore has been given a slice whose cap is the MTU but whose len is zero.
func TestRestoreCap(t *testing.T) {
	const (
		mtu     = 500
		payload = "hello"
	)
	conn := NewQueuePacketConn(emptyAddr{}, time.Hour, mtu)
	defer conn.Close()

	// Zero length, but enough capacity for an MTU-sized packet.
	spare := make([]byte, 0, mtu)
	conn.Restore(spare)
	conn.WriteTo([]byte(payload), emptyAddr{})

	queued := <-conn.OutgoingQueue(emptyAddr{})
	if !bytes.Equal(queued, []byte(payload)) {
		t.Fatalf("payload was %+q, expected %+q", queued, payload)
	}
}

// DiscardPacketConn is a net.PacketConn whose ReadFrom method blocks forever
// and whose WriteTo method discards whatever it is called with.
type DiscardPacketConn struct{}

// ReadFrom never returns.
func (DiscardPacketConn) ReadFrom([]byte) (int, net.Addr, error) {
	select {} // a select with no cases blocks forever
}

// WriteTo drops p and reports that all of it was written.
func (DiscardPacketConn) WriteTo(p []byte, _ net.Addr) (int, error) {
	return len(p), nil
}

// Close does nothing and returns nil.
func (DiscardPacketConn) Close() error {
	return nil
}

// LocalAddr returns an emptyAddr.
func (DiscardPacketConn) LocalAddr() net.Addr {
	return emptyAddr{}
}

// SetDeadline ignores its argument and returns nil.
func (DiscardPacketConn) SetDeadline(time.Time) error {
	return nil
}

// SetReadDeadline ignores its argument and returns nil.
func (DiscardPacketConn) SetReadDeadline(time.Time) error {
	return nil
}

// SetWriteDeadline ignores its argument and returns nil.
func (DiscardPacketConn) SetWriteDeadline(time.Time) error {
	return nil
}

// TranscriptPacketConn wraps a net.PacketConn and keeps a log of the []byte
// argument to every call to WriteTo.
type TranscriptPacketConn struct {
	// Transcript holds one independent copy per WriteTo call, in the order
	// the calls acquired the lock.
	Transcript [][]byte
	// lock is held while WriteTo appends to Transcript and forwards the
	// write.
	lock sync.Mutex
	// The embedded PacketConn supplies every method other than WriteTo.
	net.PacketConn
}

// NewTranscriptPacketConn returns a TranscriptPacketConn that forwards to
// inner and starts with an empty transcript.
func NewTranscriptPacketConn(inner net.PacketConn) *TranscriptPacketConn {
	c := new(TranscriptPacketConn)
	c.PacketConn = inner
	return c
}

// WriteTo appends a copy of p to the transcript and then passes the original
// p and addr to the wrapped PacketConn, returning whatever that call returns.
func (c *TranscriptPacketConn) WriteTo(p []byte, addr net.Addr) (int, error) {
	// Record a private copy so later changes to p do not alter the log.
	snapshot := make([]byte, len(p))
	copy(snapshot, p)

	c.lock.Lock()
	defer c.lock.Unlock()
	c.Transcript = append(c.Transcript, snapshot)
	return c.PacketConn.WriteTo(p, addr)
}

// TestQueuePacketConnWriteToKCP tests that QueuePacketConn.WriteTo is
// compatible with the way kcp-go uses PacketConn, allocating source buffers in
// a sync.Pool.
//
// A background goroutine keeps sending "XXXX" packets through throwaway kcp
// sessions while the main test sends "hello world" through a
// TranscriptPacketConn wrapped around a QueuePacketConn. Every packet later
// read from the outgoing queue must equal the copy recorded at send time.
//
// https://bugs.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/40260
func TestQueuePacketConnWriteToKCP(t *testing.T) {
	// Start a goroutine to constantly exercise kcp UDPSession.tx, writing
	// packets with payload "XXXX".
	done := make(chan struct{})
	defer close(done)
	ready := make(chan struct{})
	go func() {
		var readyClose sync.Once
		// Release the main test even if this goroutine exits before
		// completing an iteration.
		defer readyClose.Do(func() { close(ready) })
		pconn := DiscardPacketConn{}
		defer pconn.Close()
	loop:
		for {
			select {
			case <-done:
				break loop
			default:
			}
			// Create a new UDPSession, send once, then discard the
			// UDPSession.
			//
			// Failures here panic instead of calling t.Fatal,
			// because FailNow must only be called from the
			// goroutine running the test function.
			conn, err := kcp.NewConn2(intAddr(2), nil, 0, 0, pconn)
			if err != nil {
				panic(err)
			}
			_, err = conn.Write([]byte("XXXX"))
			if err != nil {
				panic(err)
			}
			conn.Close()
			// Signal the main test to start once we have done one
			// iteration of this noisy loop.
			readyClose.Do(func() { close(ready) })
		}
	}()

	pconn := NewQueuePacketConn(emptyAddr{}, 1*time.Hour, 500)
	defer pconn.Close()
	addr1 := intAddr(1)
	outgoing := pconn.OutgoingQueue(addr1)

	// Once the "XXXX" goroutine is started, repeatedly send a packet, wait,
	// then retrieve it and check whether it has changed since being sent.
	<-ready
	for i := 0; i < 10; i++ {
		transcript := NewTranscriptPacketConn(pconn)
		conn, err := kcp.NewConn2(addr1, nil, 0, 0, transcript)
		if err != nil {
			t.Fatal(err)
		}
		_, err = conn.Write([]byte("hello world"))
		if err != nil {
			t.Fatal(err)
		}

		err = conn.Close()
		if err != nil {
			t.Fatal(err)
		}

		// A sleep after the Write makes buffer reuse more likely.
		time.Sleep(100 * time.Millisecond)

		// WriteTo appends to the transcript while holding
		// transcript.lock, so take the same lock to snapshot it.
		transcript.lock.Lock()
		sent := make([][]byte, len(transcript.Transcript))
		copy(sent, transcript.Transcript)
		transcript.lock.Unlock()

		if len(sent) == 0 {
			t.Fatal("empty transcript")
		}

		for j, tr := range sent {
			p := <-outgoing
			// This test is meant to detect unsynchronized memory
			// changes, so freeze the slice we just read.
			p2 := make([]byte, len(p))
			copy(p2, p)
			if !bytes.Equal(p2, tr) {
				t.Fatalf("%d %d packet changed between send and recv\nsend: %+q\nrecv: %+q", i, j, tr, p2)
			}
		}
	}
}