
|
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.21
package quic
import (
"context"
"errors"
"fmt"
"path/filepath"
"runtime"
"sync"
)
// asyncTestState permits handling asynchronous operations in a synchronous test.
//
// For example, a test may want to write to a stream and observe that
// STREAM frames are sent with the contents of the write in response
// to MAX_STREAM_DATA frames received from the peer.
// The Stream.Write is an asynchronous operation, but the test is simpler
// if we can start the write, observe the first STREAM frame sent,
// send a MAX_STREAM_DATA frame, observe the next STREAM frame sent, etc.
//
// We do this by instrumenting points where operations can block.
// We start async operations like Write in a goroutine,
// and wait for the operation to either finish or hit a blocking point.
// When the connection event loop is idle, we check a list of
// blocked operations to see if any can be woken.
type asyncTestState struct {
mu sync.Mutex
notify chan struct{}
blocked map[*blockedAsync]struct{}
}
// An asyncOp is an asynchronous operation that results in (T, error).
type asyncOp[T any] struct {
v T
err error
caller string
tc *testConn
donec chan struct{}
cancelFunc context.CancelFunc
}
// cancel cancels the async operation's context, and waits for
// the operation to complete.
func (a *asyncOp[T]) cancel() {
select {
case <-a.donec:
return // already done
default:
}
a.cancelFunc()
<-a.tc.asyncTestState.notify
select {
case <-a.donec:
default:
panic(fmt.Errorf("%v: async op failed to finish after being canceled", a.caller))
}
}
var errNotDone = errors.New("async op is not done")
// result returns the result of the async operation.
// It returns errNotDone if the operation is still in progress.
//
// Note that unlike a traditional async/await, this doesn't block
// waiting for the operation to complete. Since tests have full
// control over the progress of operations, an asyncOp can only
// become done in reaction to the test taking some action.
func (a *asyncOp[T]) result() (v T, err error) {
a.tc.wait()
select {
case <-a.donec:
return a.v, a.err
default:
return v, errNotDone
}
}
// A blockedAsync is a blocked async operation.
type blockedAsync struct {
until func() bool // when this returns true, the operation is unblocked
donec chan struct{} // closed when the operation is unblocked
}
type asyncContextKey struct{}
// runAsync starts an asynchronous operation.
//
// The function f should call a blocking function such as
// Stream.Write or Conn.AcceptStream and return its result.
// It must use the provided context.
func runAsync[T any](tc *testConn, f func(context.Context) (T, error)) *asyncOp[T] {
as := &tc.asyncTestState
if as.notify == nil {
as.notify = make(chan struct{})
as.mu.Lock()
as.blocked = make(map[*blockedAsync]struct{})
as.mu.Unlock()
}
_, file, line, _ := runtime.Caller(1)
ctx := context.WithValue(context.Background(), asyncContextKey{}, true)
ctx, cancel := context.WithCancel(ctx)
a := &asyncOp[T]{
tc: tc,
caller: fmt.Sprintf("%v:%v", filepath.Base(file), line),
donec: make(chan struct{}),
cancelFunc: cancel,
}
go func() {
a.v, a.err = f(ctx)
close(a.donec)
as.notify <- struct{}{}
}()
tc.t.Cleanup(func() {
if _, err := a.result(); err == errNotDone {
tc.t.Errorf("%v: async operation is still executing at end of test", a.caller)
a.cancel()
}
})
// Wait for the operation to either finish or block.
<-as.notify
tc.wait()
return a
}
// waitUntil waits for a blocked async operation to complete.
// The operation is complete when the until func returns true.
func (as *asyncTestState) waitUntil(ctx context.Context, until func() bool) error {
if until() {
return nil
}
if err := ctx.Err(); err != nil {
// Context has already expired.
return err
}
if ctx.Value(asyncContextKey{}) == nil {
// Context is not one that we've created, and hasn't expired.
// This probably indicates that we've tried to perform a
// blocking operation without using the async test harness here,
// which may have unpredictable results.
panic("blocking async point with unexpected Context")
}
b := &blockedAsync{
until: until,
donec: make(chan struct{}),
}
// Record this as a pending blocking operation.
as.mu.Lock()
as.blocked[b] = struct{}{}
as.mu.Unlock()
// Notify the creator of the operation that we're blocked,
// and wait to be woken up.
as.notify <- struct{}{}
select {
case <-b.donec:
case <-ctx.Done():
return ctx.Err()
}
return nil
}
// wakeAsync tries to wake up a blocked async operation.
// It returns true if one was woken, false otherwise.
func (as *asyncTestState) wakeAsync() bool {
as.mu.Lock()
var woken *blockedAsync
for w := range as.blocked {
if w.until() {
woken = w
delete(as.blocked, w)
break
}
}
as.mu.Unlock()
if woken == nil {
return false
}
close(woken.donec)
<-as.notify // must not hold as.mu while blocked here
return true
}
|