1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
|
// Copyright 2021 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package poll_test
import (
"internal/poll"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
)
// closeHook optionally holds a func(fd int). The poll.CloseFunc wrapper
// installed by init loads it and, when it is non-nil, calls it with the
// descriptor before delegating to the original close function.
var closeHook atomic.Value // func(fd int)
func init() {
closeFunc := poll.CloseFunc
poll.CloseFunc = func(fd int) (err error) {
if v := closeHook.Load(); v != nil {
if hook := v.(func(int)); hook != nil {
hook(fd)
}
}
return closeFunc(fd)
}
}
func TestSplicePipePool(t *testing.T) {
if runtime.Compiler == "gccgo" {
t.Skip("gofrontend conservative stack collection causes this test to fail")
}
const N = 64
var (
p *poll.SplicePipe
ps []*poll.SplicePipe
allFDs []int
pendingFDs sync.Map // fd → struct{}{}
err error
)
closeHook.Store(func(fd int) { pendingFDs.Delete(fd) })
t.Cleanup(func() { closeHook.Store((func(int))(nil)) })
for i := 0; i < N; i++ {
p, _, err = poll.GetPipe()
if err != nil {
t.Skipf("failed to create pipe due to error(%v), skip this test", err)
}
_, pwfd := poll.GetPipeFds(p)
allFDs = append(allFDs, pwfd)
pendingFDs.Store(pwfd, struct{}{})
ps = append(ps, p)
}
for _, p = range ps {
poll.PutPipe(p)
}
ps = nil
p = nil
// Exploit the timeout of "go test" as a timer for the subsequent verification.
timeout := 5 * time.Minute
if deadline, ok := t.Deadline(); ok {
timeout = deadline.Sub(time.Now())
timeout -= timeout / 10 // Leave 10% headroom for cleanup.
}
expiredTime := time.NewTimer(timeout)
defer expiredTime.Stop()
// Trigger garbage collection repeatedly, waiting for all pipes in sync.Pool
// to either be deallocated and closed, or to time out.
for {
runtime.GC()
time.Sleep(10 * time.Millisecond)
// Detect whether all pipes are closed properly.
var leakedFDs []int
pendingFDs.Range(func(k, v any) bool {
leakedFDs = append(leakedFDs, k.(int))
return true
})
if len(leakedFDs) == 0 {
break
}
select {
case <-expiredTime.C:
t.Logf("all descriptors: %v", allFDs)
t.Fatalf("leaked descriptors: %v", leakedFDs)
default:
}
}
}
func BenchmarkSplicePipe(b *testing.B) {
b.Run("SplicePipeWithPool", func(b *testing.B) {
for i := 0; i < b.N; i++ {
p, _, err := poll.GetPipe()
if err != nil {
continue
}
poll.PutPipe(p)
}
})
b.Run("SplicePipeWithoutPool", func(b *testing.B) {
for i := 0; i < b.N; i++ {
p := poll.NewPipe()
if p == nil {
b.Skip("newPipe returned nil")
}
poll.DestroyPipe(p)
}
})
}
func BenchmarkSplicePipePoolParallel(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
p, _, err := poll.GetPipe()
if err != nil {
continue
}
poll.PutPipe(p)
}
})
}
func BenchmarkSplicePipeNativeParallel(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
p := poll.NewPipe()
if p == nil {
b.Skip("newPipe returned nil")
}
poll.DestroyPipe(p)
}
})
}
|