File: try.go

package info (click to toggle)
golang-github-juju-utils 0.0~git20171220.f38c0b0-6
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 1,748 kB
  • sloc: makefile: 20
file content (199 lines) | stat: -rw-r--r-- 4,872 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
// Copyright 2013 Canonical Ltd.
// Licensed under the LGPLv3, see LICENCE file for details.

package parallel

import (
	"errors"
	"io"
	"sync"

	"gopkg.in/tomb.v1"
)

var (
	ErrStopped = errors.New("try was stopped")
	ErrClosed  = errors.New("try was closed")
)

// Try represents an attempt made concurrently
// by a number of goroutines.
type Try struct {
	tomb          tomb.Tomb
	closeMutex    sync.Mutex
	close         chan struct{}
	limiter       chan struct{}
	start         chan func()
	result        chan result
	combineErrors func(err0, err1 error) error
	maxParallel   int
	endResult     io.Closer
}

// NewTry returns an object that runs functions concurrently until one
// succeeds. The result of the first function that returns without an
// error is available from the Result method. If maxParallel is
// positive, it limits the number of concurrently running functions.
//
// The function combineErrors(oldErr, newErr) is called to determine
// the error return (see the Result method). The first time it is called,
// oldErr will be nil; subsequently oldErr will be the error previously
// returned by combineErrors. If combineErrors is nil, the last
// encountered error is chosen.
func NewTry(maxParallel int, combineErrors func(err0, err1 error) error) *Try {
	if combineErrors == nil {
		combineErrors = chooseLastError
	}
	t := &Try{
		combineErrors: combineErrors,
		maxParallel:   maxParallel,
		close:         make(chan struct{}, 1),
		result:        make(chan result),
		start:         make(chan func()),
	}
	if t.maxParallel > 0 {
		t.limiter = make(chan struct{}, t.maxParallel)
		for i := 0; i < t.maxParallel; i++ {
			t.limiter <- struct{}{}
		}
	}
	go func() {
		defer t.tomb.Done()
		val, err := t.loop()
		t.endResult = val
		t.tomb.Kill(err)
	}()
	return t
}

func chooseLastError(err0, err1 error) error {
	return err1
}

type result struct {
	val io.Closer
	err error
}

func (t *Try) loop() (io.Closer, error) {
	var err error
	close := t.close
	nrunning := 0
	for {
		select {
		case f := <-t.start:
			nrunning++
			go f()
		case r := <-t.result:
			if r.err == nil {
				return r.val, r.err
			}
			err = t.combineErrors(err, r.err)
			nrunning--
			if close == nil && nrunning == 0 {
				return nil, err
			}
		case <-t.tomb.Dying():
			if err == nil {
				return nil, ErrStopped
			}
			return nil, err
		case <-close:
			close = nil
			if nrunning == 0 {
				return nil, err
			}
		}
	}
}

// Start requests the given function to be started, waiting until there
// are less than maxParallel functions running if necessary. It returns
// an error if the function has not been started (ErrClosed if the Try
// has been closed, and ErrStopped if the try is finishing).
//
// The function should listen on the stop channel and return if it
// receives a value, though this is advisory only - the Try does not
// wait for all started functions to return before completing.
//
// If the function returns a nil error but some earlier try was
// successful (that is, the returned value is being discarded),
// its returned value will be closed by calling its Close method.
func (t *Try) Start(try func(stop <-chan struct{}) (io.Closer, error)) error {
	if t.limiter != nil {
		// Wait for availability slot.
		select {
		case <-t.limiter:
		case <-t.tomb.Dying():
			return ErrStopped
		case <-t.close:
			return ErrClosed
		}
	}
	dying := t.tomb.Dying()
	f := func() {
		val, err := try(dying)
		if t.limiter != nil {
			// Signal availability slot is now free.
			t.limiter <- struct{}{}
		}
		// Deliver result.
		select {
		case t.result <- result{val, err}:
		case <-dying:
			if err == nil {
				val.Close()
			}
		}
	}
	select {
	case t.start <- f:
		return nil
	case <-dying:
		return ErrStopped
	case <-t.close:
		return ErrClosed
	}
}

// Close closes the Try. No more functions will be started
// if Start is called, and the Try will terminate when all
// outstanding functions have completed (or earlier
// if one succeeds)
func (t *Try) Close() {
	t.closeMutex.Lock()
	defer t.closeMutex.Unlock()
	select {
	case <-t.close:
	default:
		close(t.close)
	}
}

// Dead returns a channel that is closed when the
// Try completes.
func (t *Try) Dead() <-chan struct{} {
	return t.tomb.Dead()
}

// Wait waits for the Try to complete and returns the same
// error returned by Result.
func (t *Try) Wait() error {
	return t.tomb.Wait()
}

// Result waits for the Try to complete and returns the result of the
// first successful function started by Start.
//
// If no function succeeded, the last error returned by
// combineErrors is returned. If there were no errors or
// combineErrors returned nil, ErrStopped is returned.
func (t *Try) Result() (io.Closer, error) {
	err := t.tomb.Wait()
	return t.endResult, err
}

// Kill stops the try and all its currently executing functions.
func (t *Try) Kill() {
	t.tomb.Kill(nil)
}