1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
|
package pool
import (
"context"
)
// ResultContextPool is a pool that runs tasks that take a context and return a
// result. The context passed to the task will be canceled if any of the tasks
// return an error, which makes its functionality different than just capturing
// a context with the task closure.
//
// The configuration methods (With*) will panic if they are used after calling
// Go() for the first time.
type ResultContextPool[T any] struct {
contextPool ContextPool
agg resultAggregator[T]
collectErrored bool
}
// Go submits a task to the pool. If all goroutines in the pool
// are busy, a call to Go() will block until the task can be started.
func (p *ResultContextPool[T]) Go(f func(context.Context) (T, error)) {
p.contextPool.Go(func(ctx context.Context) error {
res, err := f(ctx)
if err == nil || p.collectErrored {
p.agg.add(res)
}
return err
})
}
// Wait cleans up all spawned goroutines, propagates any panics, and
// returns an error if any of the tasks errored.
func (p *ResultContextPool[T]) Wait() ([]T, error) {
err := p.contextPool.Wait()
return p.agg.results, err
}
// WithCollectErrored configures the pool to still collect the result of a task
// even if the task returned an error. By default, the result of tasks that errored
// are ignored and only the error is collected.
func (p *ResultContextPool[T]) WithCollectErrored() *ResultContextPool[T] {
p.panicIfInitialized()
p.collectErrored = true
return p
}
// WithFirstError configures the pool to only return the first error
// returned by a task. By default, Wait() will return a combined error.
func (p *ResultContextPool[T]) WithFirstError() *ResultContextPool[T] {
p.panicIfInitialized()
p.contextPool.WithFirstError()
return p
}
// WithCancelOnError configures the pool to cancel its context as soon as
// any task returns an error. By default, the pool's context is not
// canceled until the parent context is canceled.
func (p *ResultContextPool[T]) WithCancelOnError() *ResultContextPool[T] {
p.panicIfInitialized()
p.contextPool.WithCancelOnError()
return p
}
// WithMaxGoroutines limits the number of goroutines in a pool.
// Defaults to unlimited. Panics if n < 1.
func (p *ResultContextPool[T]) WithMaxGoroutines(n int) *ResultContextPool[T] {
p.panicIfInitialized()
p.contextPool.WithMaxGoroutines(n)
return p
}
func (p *ResultContextPool[T]) panicIfInitialized() {
p.contextPool.panicIfInitialized()
}
|