File: errgroup.go

package info (click to toggle)
golang-github-minio-pkg 3.1.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,500 kB
  • sloc: xml: 37; makefile: 35; asm: 22
file content (133 lines) | stat: -rw-r--r-- 3,821 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.

package errgroup

import (
	"context"
	"sync"
	"sync/atomic"
)

// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group can be used if errors should not be tracked.
// Use WithNErrs to obtain a Group that records the errors of its tasks.
type Group struct {
	// firstErr holds the index into errs of the first error recorded by Go,
	// or -1 (as set by WithNErrs) while no error has been recorded. It is
	// updated with an atomic compare-and-swap; presumably it is kept as the
	// first field so that it stays 64-bit aligned on 32-bit platforms (see
	// the referenced note) - do not reorder.
	firstErr  int64              // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
	wg        sync.WaitGroup     // tracks the goroutines started by Go.
	bucket    chan struct{}      // concurrency tokens, see WithConcurrency; nil means no limit.
	errs      []error            // errors recorded by Go, stored at the index passed to Go.
	cancel    context.CancelFunc // set by WithCancelOnError; nil otherwise.
	ctxCancel <-chan struct{}    // Done channel of the WithCancelOnError context; nil if no context.
	ctxErr    func() error       // Err method of the WithCancelOnError context; nil if no context.
}

// WithNErrs creates a Group whose error slice has room for nerrs entries.
// Each task started with Go stores its error at the index it was given, and
// Wait() returns the whole slice once all tasks have finished.
func WithNErrs(nerrs int) *Group {
	g := new(Group)
	// -1 marks "no error recorded yet"; Go swaps in the index of the first failure.
	g.firstErr = -1
	g.errs = make([]error, nerrs)
	return g
}

// Wait blocks until every function started through Go has returned. It then
// cancels the context created by WithCancelOnError, if there is one, and
// returns the slice holding the error of each function call.
func (g *Group) Wait() []error {
	g.wg.Wait()
	if stop := g.cancel; stop != nil {
		stop()
	}
	return g.errs
}

// WaitErr blocks until every function started through Go has returned. It
// then cancels the context created by WithCancelOnError, if there is one, and
// returns the first error that was recorded, or nil when there is none.
func (g *Group) WaitErr() error {
	g.wg.Wait()
	if stop := g.cancel; stop != nil {
		stop()
	}

	first := g.firstErr
	// The upper-bound check covers a Group used uninitialized (zero value):
	// its firstErr is 0 while errs is empty.
	if first < 0 || int(first) >= len(g.errs) {
		return nil
	}
	return g.errs[first]
}

// WithConcurrency limits how many functions started through Go may run at
// the same time.
// It must be called before any async process is started.
// Functions are admitted in no particular order.
// A value of n <= 0 removes the limit.
// The receiver g is modified in place and also returned.
func (g *Group) WithConcurrency(n int) *Group {
	var tokens chan struct{}
	if n > 0 {
		// One buffered token per allowed concurrent function.
		tokens = make(chan struct{}, n)
		for filled := 0; filled < n; filled++ {
			tokens <- struct{}{}
		}
	}
	g.bucket = tokens
	return g
}

// WithCancelOnError derives a cancelable context from ctx and attaches it to
// the group, so that it is canceled as soon as a function started through Go
// returns an error.
// As with context.WithCancel, the returned CancelFunc must always be called.
// When the supplied context is canceled, goroutines still waiting for
// execution are canceled as well.
func (g *Group) WithCancelOnError(ctx context.Context) (context.Context, context.CancelFunc) {
	child, stop := context.WithCancel(ctx)
	g.cancel = stop
	g.ctxCancel = child.Done()
	g.ctxErr = child.Err
	return child, stop
}

// Go calls the given function in a new goroutine.
//
// The errors will be collected in errs slice and returned by Wait().
//
// A non-nil error returned by f is stored at errs[index]. An index outside
// the errs slice (negative, or not smaller than its length) is ignored
// instead of being stored, so such a call never panics. The first error
// stored is the one reported by WaitErr().
//
// If a limit was set with WithConcurrency, the goroutine first waits for a
// token. If the context from WithCancelOnError is canceled while it waits,
// f is not run and the context error is recorded in its place. A non-nil
// error from f cancels that context, when one was set.
func (g *Group) Go(f func() error, index int) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		if g.bucket != nil {
			// Wait for a token, giving up if the context is canceled first.
			// With no context g.ctxCancel is nil and that case never fires.
			select {
			case <-g.bucket:
				defer func() {
					// Put back token..
					g.bucket <- struct{}{}
				}()
			case <-g.ctxCancel:
				g.recordErr(index, g.ctxErr())
				return
			}
		}
		if err := f(); err != nil {
			g.recordErr(index, err)
			// Cancel even when the error could not be stored.
			if g.cancel != nil {
				g.cancel()
			}
		}
	}()
}

// recordErr stores err at errs[index] and, if no error was recorded before,
// marks index as the first error. Out-of-range indices are ignored.
func (g *Group) recordErr(index int, err error) {
	if index < 0 || index >= len(g.errs) {
		return
	}
	atomic.CompareAndSwapInt64(&g.firstErr, -1, int64(index))
	g.errs[index] = err
}