File: mux_test.go

package info (click to toggle)
golang-github-pion-webrtc.v3 3.1.56-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,392 kB
  • sloc: javascript: 595; sh: 28; makefile: 5
file content (153 lines) | stat: -rw-r--r-- 3,210 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package mux

import (
	"io"
	"net"
	"testing"
	"time"

	"github.com/pion/logging"
	"github.com/pion/transport/v2/packetio"
	"github.com/pion/transport/v2/test"
	"github.com/stretchr/testify/require"
)

// testPipeBufferSize is the value passed as Config.BufferSize when building a
// Mux in these tests, and the length of the scratch buffer used to read from
// an endpoint.
const testPipeBufferSize = 8192

// TestNoEndpoints verifies that dispatching a packet on a Mux that has no
// registered endpoints returns no error, and that the Mux and its connection
// can be closed cleanly afterwards.
func TestNoEndpoints(t *testing.T) {
	// In-memory pipe; the far end is closed immediately because nothing is
	// ever sent over it in this test.
	local, remote := net.Pipe()
	require.NoError(t, remote.Close())

	cfg := Config{
		Conn:          local,
		BufferSize:    testPipeBufferSize,
		LoggerFactory: logging.NewDefaultLoggerFactory(),
	}
	muxer := NewMux(cfg)

	// A single zero byte, handed to dispatch directly rather than via the conn.
	packet := make([]byte, 1)
	require.NoError(t, muxer.dispatch(packet))

	require.NoError(t, muxer.Close())
	require.NoError(t, local.Close())
}

// muxErrorConnReadResult is one scripted outcome of a muxErrorConn.Read call.
type muxErrorConnReadResult struct {
	err  error  // error returned by the Read call
	data []byte // payload copied into the caller's buffer
}

// muxErrorConn embeds a net.Conn and overrides Read so that it replays a
// fixed script of results instead of reading from the embedded connection.
type muxErrorConn struct {
	net.Conn
	readResults []muxErrorConnReadResult // remaining results, consumed front to back
}

// Read consumes the next scripted result: it copies that result's data into b
// and returns the number of bytes actually copied together with the scripted
// error. Data that does not fit in b is dropped. Once the script is exhausted
// Read returns (0, io.EOF) instead of panicking on an empty slice.
func (m *muxErrorConn) Read(b []byte) (int, error) {
	if len(m.readResults) == 0 {
		return 0, io.EOF
	}

	next := m.readResults[0]
	m.readResults = m.readResults[1:]

	// Report what copy wrote, not len(next.data): io.Reader requires n <= len(b).
	return copy(b, next.data), next.err
}

// TestNonFatalRead checks which read errors end the mux read loop
// (pion/webrtc#1720).
//
// The connection is scripted to interleave packetio.ErrTimeout and
// io.ErrShortBuffer with two data reads, and to finish with io.EOF. The test
// asserts that:
//
//   - both data reads still reach the endpoint, i.e. neither
//     packetio.ErrTimeout nor io.ErrShortBuffer ended the read loop
//   - m.closedCh becomes readable afterwards, which is expected once io.EOF
//     has been returned
func TestNonFatalRead(t *testing.T) {
	// Limit runtime in case of deadlocks
	lim := test.TimeOut(time.Second * 20)
	defer lim.Stop()

	expectedData := []byte("expectedData")

	// In memory pipe; only used as the embedded net.Conn, Read is scripted.
	ca, cb := net.Pipe()
	require.NoError(t, cb.Close())

	conn := &muxErrorConn{ca, []muxErrorConnReadResult{
		// Non-fatal timeout error
		{packetio.ErrTimeout, nil},
		{nil, expectedData},
		// Non-fatal short buffer error
		{io.ErrShortBuffer, nil},
		{nil, expectedData},
		// Fatal: ends the script
		{io.EOF, nil},
	}}

	m := NewMux(Config{
		Conn:          conn,
		BufferSize:    testPipeBufferSize,
		LoggerFactory: logging.NewDefaultLoggerFactory(),
	})

	e := m.NewEndpoint(MatchAll)

	buff := make([]byte, testPipeBufferSize)

	// require.Equal takes (expected, actual); keep that order so failure
	// output labels the two values correctly.
	n, err := e.Read(buff)
	require.NoError(t, err)
	require.Equal(t, expectedData, buff[:n])

	n, err = e.Read(buff)
	require.NoError(t, err)
	require.Equal(t, expectedData, buff[:n])

	// Block until the mux signals that it has stopped.
	<-m.closedCh
	require.NoError(t, m.Close())
	require.NoError(t, ca.Close())
}

// If a endpoint returns packetio.ErrFull it is a non-fatal error and shouldn't cause
// the mux to be destroyed
// pion/webrtc#2180
func TestNonFatalDispatch(t *testing.T) {
	in, out := net.Pipe()

	m := NewMux(Config{
		Conn:          out,
		LoggerFactory: logging.NewDefaultLoggerFactory(),
		BufferSize:    1500,
	})

	e := m.NewEndpoint(MatchSRTP)
	e.buffer.SetLimitSize(1)

	for i := 0; i <= 25; i++ {
		srtpPacket := []byte{128, 1, 2, 3, 4}
		_, err := in.Write(srtpPacket)
		require.NoError(t, err)
	}

	require.NoError(t, m.Close())
	require.NoError(t, in.Close())
	require.NoError(t, out.Close())
}

// BenchmarkDispatch measures one Mux.dispatch call followed by one read from
// the matching endpoint's buffer. The Mux is built by hand with only the
// endpoint map and logger populated, and two endpoints (MatchSRTP and
// MatchSRTCP) are registered on it.
func BenchmarkDispatch(b *testing.B) {
	m := &Mux{
		endpoints: make(map[*Endpoint]MatchFunc),
		log:       logging.NewDefaultLoggerFactory().NewLogger("mux"),
	}

	e := m.NewEndpoint(MatchSRTP)
	m.NewEndpoint(MatchSRTCP)

	buf := []byte{128, 1, 2, 3, 4}
	buf2 := make([]byte, 1200)

	// Exclude the setup above from the measurement. The timer is already
	// running when the benchmark function is entered, so StartTimer here
	// would be a no-op; ResetTimer is what discards the elapsed setup time.
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		if err := m.dispatch(buf); err != nil {
			b.Errorf("dispatch: %v", err)
		}
		// Drain what dispatch just queued for the endpoint.
		if _, err := e.buffer.Read(buf2); err != nil {
			b.Errorf("read: %v", err)
		}
	}
}