1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
|
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.21
package quic
import (
"context"
"errors"
"fmt"
"path/filepath"
"runtime"
"sync"
)
// asyncTestState permits handling asynchronous operations in a synchronous test.
//
// For example, a test may want to write to a stream and observe that
// STREAM frames are sent with the contents of the write in response
// to MAX_STREAM_DATA frames received from the peer.
// The Stream.Write is an asynchronous operation, but the test is simpler
// if we can start the write, observe the first STREAM frame sent,
// send a MAX_STREAM_DATA frame, observe the next STREAM frame sent, etc.
//
// We do this by instrumenting points where operations can block.
// We start async operations like Write in a goroutine,
// and wait for the operation to either finish or hit a blocking point.
// When the connection event loop is idle, we check a list of
// blocked operations to see if any can be woken.
type asyncTestState struct {
	// mu guards blocked.
	mu sync.Mutex
	// notify receives one value each time an async operation either
	// parks in waitUntil or returns from the function given to runAsync.
	// It is created unbuffered by runAsync, so every send is paired with
	// a receive in runAsync, asyncOp.cancel, or wakeAsync.
	notify chan struct{}
	// blocked is the set of operations currently parked in waitUntil.
	// Entries are added by waitUntil and removed by wakeAsync.
	blocked map[*blockedAsync]struct{}
}
// An asyncOp is an asynchronous operation that results in (T, error).
//
// It is created by runAsync, which runs the operation in its own goroutine.
type asyncOp[T any] struct {
	// v and err are the values returned by the operation's function.
	// They are written before donec is closed and must only be read
	// after donec is observed closed.
	v T
	err error
	// caller is the "file:line" of the runAsync call site,
	// used to identify the operation in failure messages.
	caller string
	// tc is the test connection the operation was started on.
	tc *testConn
	// donec is closed once the operation's function has returned.
	donec chan struct{}
	// cancelFunc cancels the context handed to the operation's function.
	cancelFunc context.CancelFunc
}
// cancel aborts an in-flight async operation by canceling its context,
// then waits for the operation to report that it has returned.
//
// It does nothing if the operation has already finished.
// It panics if the operation is still unfinished once the next
// notification has been received.
func (a *asyncOp[T]) cancel() {
	// finished reports, without blocking, whether donec has been closed.
	finished := func() bool {
		select {
		case <-a.donec:
			return true
		default:
			return false
		}
	}

	if finished() {
		return // nothing left to cancel
	}
	a.cancelFunc()
	// The operation's goroutine sends on notify after closing donec.
	<-a.tc.asyncTestState.notify
	if !finished() {
		panic(fmt.Errorf("%v: async op failed to finish after being canceled", a.caller))
	}
}
// errNotDone is the error returned by asyncOp.result when the
// operation has not finished yet.
var errNotDone = errors.New("async op is not done")
// result reports the outcome of the async operation.
// If the operation has not finished, it returns the zero T and errNotDone.
//
// It calls tc.wait first and then checks for completion without blocking.
// This differs from a conventional await: the test drives every step of
// an operation's progress, so an asyncOp can only finish as a consequence
// of something the test itself did.
func (a *asyncOp[T]) result() (v T, err error) {
	a.tc.wait()
	select {
	case <-a.donec:
		v, err = a.v, a.err
	default:
		err = errNotDone
	}
	return v, err
}
// A blockedAsync is a blocked async operation.
//
// waitUntil creates one and records it in asyncTestState.blocked;
// wakeAsync removes it and closes donec once until reports true.
type blockedAsync struct {
	until func() bool // when this returns true, the operation is unblocked
	donec chan struct{} // closed when the operation is unblocked
}
// asyncContextKey is the context value key that marks a Context as
// created by runAsync. waitUntil panics if asked to block with an
// unexpired Context that lacks this key.
type asyncContextKey struct{}
// runAsync launches f in its own goroutine and returns once f has
// either returned or parked at an instrumented blocking point.
//
// f is expected to invoke a blocking call such as Stream.Write or
// Conn.AcceptStream, passing along the context it is given, and to
// return that call's result.
//
// A test cleanup is registered that reports an error and cancels the
// operation if it is still running when the test ends.
func runAsync[T any](tc *testConn, f func(context.Context) (T, error)) *asyncOp[T] {
	state := &tc.asyncTestState
	if state.notify == nil {
		// First async operation on this connection: set up shared state.
		state.notify = make(chan struct{})
		state.mu.Lock()
		state.blocked = make(map[*blockedAsync]struct{})
		state.mu.Unlock()
	}

	// Identify the operation by the file and line of our caller.
	_, file, line, _ := runtime.Caller(1)

	// Tag the context so waitUntil can recognize it as one of ours.
	ctx, cancel := context.WithCancel(
		context.WithValue(context.Background(), asyncContextKey{}, true))

	op := &asyncOp[T]{
		tc:         tc,
		caller:     fmt.Sprintf("%v:%v", filepath.Base(file), line),
		donec:      make(chan struct{}),
		cancelFunc: cancel,
	}
	go func() {
		op.v, op.err = f(ctx)
		close(op.donec)
		state.notify <- struct{}{}
	}()
	tc.t.Cleanup(func() {
		_, err := op.result()
		if err != errNotDone {
			return
		}
		tc.t.Errorf("%v: async operation is still executing at end of test", op.caller)
		op.cancel()
	})

	// Block until the operation signals that it has finished or parked.
	<-state.notify
	tc.wait()
	return op
}
// waitUntil parks the calling async operation until the until func
// reports true or ctx is done.
//
// It returns nil immediately if until() is already true, and ctx.Err()
// if ctx has already expired. Otherwise it registers the operation as
// blocked, signals notify so the goroutine driving the test knows the
// operation has parked, and waits to be woken by wakeAsync.
// If ctx is canceled while parked, it returns ctx.Err().
//
// waitUntil panics if it would need to block with a Context that was
// not created by runAsync.
func (as *asyncTestState) waitUntil(ctx context.Context, until func() bool) error {
	if until() {
		return nil
	}
	if err := ctx.Err(); err != nil {
		// Context has already expired.
		return err
	}
	if ctx.Value(asyncContextKey{}) == nil {
		// Context is not one that we've created, and hasn't expired.
		// This probably indicates that we've tried to perform a
		// blocking operation without using the async test harness here,
		// which may have unpredictable results.
		panic("blocking async point with unexpected Context")
	}
	b := &blockedAsync{
		until: until,
		donec: make(chan struct{}),
	}
	// Record this as a pending blocking operation.
	as.mu.Lock()
	as.blocked[b] = struct{}{}
	as.mu.Unlock()
	// Notify the creator of the operation that we're blocked,
	// and wait to be woken up.
	as.notify <- struct{}{}
	select {
	case <-b.donec:
		return nil
	case <-ctx.Done():
		// Deregister before returning. Otherwise a later wakeAsync could
		// pick this stale entry once until() becomes true, close its donec,
		// and then wait on notify for an operation that is no longer
		// running. Deleting is a no-op if wakeAsync already removed it.
		as.mu.Lock()
		delete(as.blocked, b)
		as.mu.Unlock()
		return ctx.Err()
	}
}
// wakeAsync looks for one blocked async operation whose until func
// now reports true and, if it finds one, wakes it.
//
// After waking an operation it waits for the next notification,
// which the woken operation sends when it parks again or finishes.
// It reports whether an operation was woken.
func (as *asyncTestState) wakeAsync() bool {
	var ready *blockedAsync

	as.mu.Lock()
	for b := range as.blocked {
		if !b.until() {
			continue
		}
		delete(as.blocked, b)
		ready = b
		break
	}
	as.mu.Unlock()

	if ready != nil {
		close(ready.donec)
		// as.mu must not be held here: the woken operation may need it
		// before it can send on notify.
		<-as.notify
		return true
	}
	return false
}
|