1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
|
package certmagic
import (
"context"
"errors"
"log"
"runtime"
"sync"
"time"
"go.uber.org/zap"
)
// jm is the package-level job manager. At most 1000 of its worker
// goroutines may be active at the same time.
var jm = &jobManager{
	maxConcurrentJobs: 1000,
}
// jobManager runs submitted jobs on a bounded set of worker goroutines.
// The mutable fields below are read and written with mu held.
type jobManager struct {
	mu sync.Mutex

	// maxConcurrentJobs caps how many workers may be active at once.
	maxConcurrentJobs int

	// activeWorkers is the number of worker goroutines currently started
	// and not yet exited.
	activeWorkers int

	// queue holds jobs that no worker has picked up yet, oldest first.
	queue []namedJob

	// names is the set of non-empty job names that are queued or running;
	// Submit consults it to drop duplicate submissions.
	names map[string]struct{}
}
// namedJob is one unit of work waiting in, or taken from, a jobManager queue.
//
// Field order matters: Submit builds values of this type with a positional
// composite literal.
type namedJob struct {
	// name is the deduplication key; an empty name opts out of deduplication.
	name string

	// job is the work itself; a non-nil error it returns gets logged.
	job func() error

	// logger is where the job's error, if any, is reported.
	logger *zap.Logger
}
// Submit adds job to the queue under the given name.
//
// A non-empty name deduplicates: if a job with that name is already queued
// or running, Submit returns without doing anything. An empty name disables
// deduplication entirely. After enqueueing, a new worker goroutine is started
// only if fewer than maxConcurrentJobs workers are active; otherwise the job
// waits in the queue for an existing worker.
func (jm *jobManager) Submit(logger *zap.Logger, name string, job func() error) {
	jm.mu.Lock()
	defer jm.mu.Unlock()

	// The zero jobManager has no map yet; create it lazily.
	if jm.names == nil {
		jm.names = make(map[string]struct{})
	}

	if name != "" {
		if _, alreadyPending := jm.names[name]; alreadyPending {
			return
		}
		jm.names[name] = struct{}{}
	}

	jm.queue = append(jm.queue, namedJob{name: name, job: job, logger: logger})

	if jm.activeWorkers >= jm.maxConcurrentJobs {
		return
	}
	jm.activeWorkers++
	go jm.worker()
}
// worker runs queued jobs one at a time until the queue is empty. It then
// gives back its slot in jm.activeWorkers and exits. Submit starts workers
// while fewer than jm.maxConcurrentJobs are active.
//
// Each job runs under its own panic recovery. This keeps a panicking job
// from ending the worker without decrementing jm.activeWorkers (which would
// leak a worker slot forever). It also keeps the job's name from being left
// in jm.names (which would block any future job with that name).
func (jm *jobManager) worker() {
	for {
		jm.mu.Lock()
		if len(jm.queue) == 0 {
			jm.activeWorkers--
			jm.mu.Unlock()
			return
		}
		next := jm.queue[0]
		// Zero the vacated slot so the shared backing array does not keep
		// the job's closure and logger reachable after it has run.
		jm.queue[0] = namedJob{}
		jm.queue = jm.queue[1:]
		jm.mu.Unlock()

		func() {
			// Deferred calls run last-in first-out: the recover below runs
			// first, then this releases the name. The name is therefore
			// freed whether the job returns or panics.
			defer func() {
				if next.name != "" {
					jm.mu.Lock()
					delete(jm.names, next.name)
					jm.mu.Unlock()
				}
			}()
			defer func() {
				if err := recover(); err != nil {
					buf := make([]byte, stackTraceBufferSize)
					buf = buf[:runtime.Stack(buf, false)]
					log.Printf("panic: certificate worker: %v\n%s", err, buf)
				}
			}()
			if err := next.job(); err != nil {
				next.logger.Error("job failed", zap.Error(err))
			}
		}()
	}
}
// doWithRetry calls f until one of the following happens:
//   - f succeeds;
//   - f returns an error that must not be retried;
//   - maxRetryDuration has elapsed since the first call.
//
// The first call happens immediately. Later calls are spaced by the entries
// of retryIntervals in order, and the last entry is reused once the list is
// exhausted.
//
// The context passed to f carries, under AttemptsCtxKey, a *int counting the
// attempts completed so far (0 during the first call).
//
// Return values:
//   - nil if f eventually succeeds.
//   - f's error if it is or wraps context.Canceled, or wraps an ErrNoRetry.
//   - f's last error if retrying is abandoned because maxRetryDuration
//     elapsed.
//   - context.Canceled if ctx is done while waiting for the next attempt.
//     This happens even when ctx.Err() is context.DeadlineExceeded.
//
// Every failed attempt is logged on log. Within this function the log
// parameter shadows the standard "log" package.
func doWithRetry(ctx context.Context, log *zap.Logger, f func(context.Context) error) error {
	var attempts int
	ctx = context.WithValue(ctx, AttemptsCtxKey, &attempts)

	// the initial intervalIndex is -1, signaling
	// that we should not wait for the first attempt
	start, intervalIndex := time.Now(), -1
	var err error

	for time.Since(start) < maxRetryDuration {
		var wait time.Duration
		if intervalIndex >= 0 {
			wait = retryIntervals[intervalIndex]
		}
		timer := time.NewTimer(wait)
		select {
		case <-ctx.Done():
			timer.Stop()
			// NOTE(review): context.Canceled is returned rather than
			// ctx.Err(). It is kept as-is, presumably because callers test
			// for context.Canceled.
			return context.Canceled
		case <-timer.C:
		}

		err = f(ctx)
		attempts++
		if err == nil || errors.Is(err, context.Canceled) {
			return err
		}
		var errNoRetry ErrNoRetry
		if errors.As(err, &errNoRetry) {
			return err
		}
		if intervalIndex < len(retryIntervals)-1 {
			intervalIndex++
		}
		if time.Since(start) >= maxRetryDuration {
			log.Error("final attempt; giving up",
				zap.Error(err),
				zap.Int("attempt", attempts),
				zap.Duration("elapsed", time.Since(start)),
				zap.Duration("max_duration", maxRetryDuration))
			// Report the failure. Returning nil here would make an
			// operation that exhausted its retries look like it succeeded.
			return err
		}
		log.Error("will retry",
			zap.Error(err),
			zap.Int("attempt", attempts),
			zap.Duration("retrying_in", retryIntervals[intervalIndex]),
			zap.Duration("elapsed", time.Since(start)),
			zap.Duration("max_duration", maxRetryDuration))
	}
	return err
}
// ErrNoRetry is an error type that signals doWithRetry to stop retrying
// early. When the retried function returns an ErrNoRetry, or an error
// wrapping one, doWithRetry returns that error immediately instead of
// scheduling another attempt.
type ErrNoRetry struct{ Err error }

// Unwrap makes it so that e wraps e.Err.
func (e ErrNoRetry) Unwrap() error { return e.Err }

// Error returns the message of the wrapped error. The zero value (nil Err)
// yields "<nil>" instead of panicking on a nil interface call.
func (e ErrNoRetry) Error() string {
	if e.Err == nil {
		return "<nil>"
	}
	return e.Err.Error()
}
// retryStateCtxKey is the unexported type of this package's retry-related
// context keys. Because the type is unexported, keys of this type cannot
// collide with context keys defined by other packages.
type retryStateCtxKey struct{}

// AttemptsCtxKey is the context key for the attempt counter.
// doWithRetry stores a *int under this key. The pointed-to value is the
// number of attempts completed so far, so it reads 0 during the first
// attempt.
var AttemptsCtxKey = retryStateCtxKey{}
// retryIntervals lists the waits between successive retry attempts. The
// schedule follows exponential backoff but is weighted toward the front.
//
// The reasoning is that intermittent errors usually clear after the first
// retry. Errors that persist beyond that probably need minutes or hours:
// DNS has to propagate, the administrator has to fix their DNS or network
// config, or some other external factor has to change. The intervals aim
// to be useful without adding unnecessary delay.
//
// Once the list is exhausted, its final interval is repeated until
// maxRetryDuration has elapsed. The group comments below show the running
// total of the waits.
var retryIntervals = []time.Duration{
	// total: 10 minutes
	time.Minute, 2 * time.Minute, 2 * time.Minute, 5 * time.Minute,
	// total: 1 hour
	10 * time.Minute, 10 * time.Minute, 10 * time.Minute, 20 * time.Minute,
	// total: 2 hours
	20 * time.Minute, 20 * time.Minute, 20 * time.Minute,
	// total: 3, 4, then 5 hours
	30 * time.Minute, 30 * time.Minute,
	30 * time.Minute, 30 * time.Minute,
	30 * time.Minute, 30 * time.Minute,
	// total: 6, then 8 hours
	time.Hour,
	time.Hour, time.Hour,
	// total: 12, then 18 hours
	2 * time.Hour, 2 * time.Hour,
	3 * time.Hour, 3 * time.Hour,
	// repeated for as long as maxRetryDuration allows
	6 * time.Hour,
}
// maxRetryDuration is the longest span over which retries are attempted
// using retryIntervals: 30 days.
const maxRetryDuration = 30 * 24 * time.Hour
|