1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
|
package sctp
import (
"fmt"
"io"
"math/rand"
"sync"
"testing"
"time"
)
// Sizing parameters for TestStreams.
const (
	// STREAM_TEST_STREAMS is the outbound/inbound stream count requested in
	// the InitMsg of the listener and of every client, and the number of
	// streams each client sends a message on.
	STREAM_TEST_STREAMS = 11

	// STREAM_TEST_CLIENTS is the number of client goroutines TestStreams
	// runs concurrently against the echo server.
	STREAM_TEST_CLIENTS = 128
)
// TestStreams checks that a message sent on a given SCTP stream is echoed
// back on that same stream with an identical payload.
//
// It listens on an ephemeral loopback port, runs an echo server that writes
// every received payload back using the SndRcvInfo it arrived with, and then
// starts STREAM_TEST_CLIENTS concurrent clients. Each client sends one random
// message per stream (STREAM_TEST_STREAMS in total) and verifies the stream
// number reported for the reply as well as the echoed bytes.
func TestStreams(t *testing.T) {
	// rMu serializes access to r, which is shared by all client goroutines.
	var rMu sync.Mutex
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	randomStr := func(strlen int) string {
		const chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
		result := make([]byte, strlen)
		rMu.Lock()
		for i := range result {
			result[i] = chars[r.Intn(len(chars))]
		}
		rMu.Unlock()
		return string(result)
	}

	addr, err := ResolveSCTPAddr("sctp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("failed to resolve address: %v", err)
	}
	ln, err := ListenSCTPExt("sctp", addr, InitMsg{NumOstreams: STREAM_TEST_STREAMS, MaxInstreams: STREAM_TEST_STREAMS})
	if err != nil {
		t.Fatalf("failed to listen: %v", err)
	}
	// Port 0 was requested, so pick up the address the listener actually got.
	addr = ln.Addr().(*SCTPAddr)
	t.Logf("Listen on %s", ln.Addr())

	// done is closed right before the listener so the accept loop can tell an
	// expected shutdown error from a genuine Accept failure.
	done := make(chan struct{})
	defer func() {
		close(done)
		ln.Close() // best effort; the test outcome is already decided here
	}()

	// serve echoes everything read from one accepted connection until EOF and
	// closes the connection when it is finished.
	serve := func(sconn *SCTPConn) {
		defer sconn.Close()
		if err := sconn.SubscribeEvents(SCTP_EVENT_DATA_IO); err != nil {
			t.Errorf("failed to subscribe to events on server connection: %v", err)
			return
		}
		totalrcvd := 0
		buf := make([]byte, 512)
		for {
			n, info, err := sconn.SCTPRead(buf)
			if n > 0 {
				totalrcvd += n
			}
			if err != nil {
				if err != io.EOF && err != io.ErrUnexpectedEOF {
					t.Errorf("Server connection read err: %v. Total bytes received: %d, bytes received: %d", err, totalrcvd, n)
					return
				}
				if n == 0 {
					return
				}
				// EOF arrived together with data: still echo this last chunk.
				t.Logf("EOF on server connection. Total bytes received: %d, bytes received: %d", totalrcvd, n)
			}
			t.Logf("server read: info: %+v, payload: %s", info, string(buf[:n]))
			if _, err := sconn.SCTPWrite(buf[:n], info); err != nil {
				t.Error(err)
				return
			}
		}
	}

	go func() {
		for {
			c, err := ln.Accept()
			if err != nil {
				select {
				case <-done:
					// The test is over and the listener was closed on purpose.
				default:
					t.Errorf("failed to accept: %v", err)
				}
				return
			}
			// Assert only after the error check: c is nil when Accept fails.
			go serve(c.(*SCTPConn))
		}
	}()

	// wait is buffered for every client so a finishing client can always
	// signal completion, even after the main goroutine gave up on a timeout.
	wait := make(chan struct{}, STREAM_TEST_CLIENTS)
	for i := 0; i < STREAM_TEST_CLIENTS; i++ {
		go func(test int) {
			defer func() { wait <- struct{}{} }()
			conn, err := DialSCTPExt(
				"sctp", nil, addr, InitMsg{NumOstreams: STREAM_TEST_STREAMS, MaxInstreams: STREAM_TEST_STREAMS})
			if err != nil {
				t.Errorf("failed to dial address %s, test #%d: %v", addr.String(), test, err)
				return
			}
			defer conn.Close()
			if err := conn.SubscribeEvents(SCTP_EVENT_DATA_IO); err != nil {
				t.Errorf("failed to subscribe to events, test #%d: %v", test, err)
				return
			}

			buf := make([]byte, 512)
			// ppid doubles as the stream number and the payload protocol ID.
			for ppid := uint16(0); ppid < STREAM_TEST_STREAMS; ppid++ {
				info := &SndRcvInfo{
					Stream: ppid,
					PPID:   uint32(ppid),
				}
				rMu.Lock()
				randomLen := r.Intn(255)
				rMu.Unlock()
				text := fmt.Sprintf("Test %s ***\n\t\t%d %d ***", randomStr(randomLen), test, ppid)
				n, err := conn.SCTPWrite([]byte(text), info)
				if err != nil {
					t.Errorf("failed to write %s, len: %d, err: %v, bytes written: %d", text, len(text), err, n)
					return
				}

				// Collect the echo, which may arrive in more than one read.
				rn := 0
				for {
					cn, rinfo, err := conn.SCTPRead(buf[rn:])
					if err != nil {
						if err == io.EOF || err == io.ErrUnexpectedEOF {
							rn += cn
							break
						}
						t.Errorf("failed to read: %v", err)
						return
					}
					// A nil info would panic on the dereference below; report
					// it as a test failure instead.
					if rinfo == nil {
						t.Errorf("read returned no SndRcvInfo, test #%d, stream %d", test, ppid)
						return
					}
					if rinfo.Stream != ppid {
						t.Errorf("Mismatched streams: %d != %d", rinfo.Stream, ppid)
						return
					}
					rn += cn
					if rn >= n {
						break
					}
				}
				if rtext := string(buf[:rn]); rtext != text {
					// Errorf, not Fatalf: FailNow must not be called from a
					// goroutine other than the one running the test.
					t.Errorf("Mismatched payload: %s != %s", rtext, text)
					return
				}
			}
		}(i)
	}

	// Wait for every client, allowing up to 30 seconds for each completion.
	for i := 0; i < STREAM_TEST_CLIENTS; i++ {
		select {
		case <-wait:
		case <-time.After(30 * time.Second):
			t.Fatal("timed out")
		}
	}
}
|