1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
|
package stream
import (
"fmt"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func ExampleStream() {
times := []int{20, 52, 16, 45, 4, 80}
stream := New()
for _, millis := range times {
dur := time.Duration(millis) * time.Millisecond
stream.Go(func() Callback {
time.Sleep(dur)
// This will print in the order the tasks were submitted
return func() { fmt.Println(dur) }
})
}
stream.Wait()
// Output:
// 20ms
// 52ms
// 16ms
// 45ms
// 4ms
// 80ms
}
func TestStream(t *testing.T) {
t.Parallel()
t.Run("simple", func(t *testing.T) {
t.Parallel()
s := New()
var res []int
for i := 0; i < 5; i++ {
i := i
s.Go(func() Callback {
i *= 2
return func() {
res = append(res, i)
}
})
}
s.Wait()
require.Equal(t, []int{0, 2, 4, 6, 8}, res)
})
t.Run("max goroutines", func(t *testing.T) {
t.Parallel()
s := New().WithMaxGoroutines(5)
var currentTaskCount atomic.Int64
var currentCallbackCount atomic.Int64
for i := 0; i < 50; i++ {
s.Go(func() Callback {
curr := currentTaskCount.Add(1)
if curr > 5 {
t.Fatal("too many concurrent tasks being executed")
}
defer currentTaskCount.Add(-1)
time.Sleep(time.Millisecond)
return func() {
curr := currentCallbackCount.Add(1)
if curr > 1 {
t.Fatal("too many concurrent callbacks being executed")
}
time.Sleep(time.Millisecond)
defer currentCallbackCount.Add(-1)
}
})
}
s.Wait()
})
t.Run("panic in task is propagated", func(t *testing.T) {
t.Parallel()
s := New().WithMaxGoroutines(5)
s.Go(func() Callback {
panic("something really bad happened in the task")
})
require.Panics(t, s.Wait)
})
t.Run("panic in callback is propagated", func(t *testing.T) {
t.Parallel()
s := New().WithMaxGoroutines(5)
s.Go(func() Callback {
return func() {
panic("something really bad happened in the callback")
}
})
require.Panics(t, s.Wait)
})
t.Run("panic in callback does not block producers", func(t *testing.T) {
t.Parallel()
s := New().WithMaxGoroutines(5)
s.Go(func() Callback {
return func() {
panic("something really bad happened in the callback")
}
})
for i := 0; i < 100; i++ {
s.Go(func() Callback {
return func() {}
})
}
require.Panics(t, s.Wait)
})
}
func BenchmarkStream(b *testing.B) {
b.Run("startup and teardown", func(b *testing.B) {
for i := 0; i < b.N; i++ {
s := New()
s.Go(func() Callback { return func() {} })
s.Wait()
}
})
b.Run("per task", func(b *testing.B) {
n := 0
s := New()
for i := 0; i < b.N; i++ {
s.Go(func() Callback {
return func() {
n += 1
}
})
}
s.Wait()
})
}
|