1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
|
package utils
import (
"context"
"sync"
"time"
)
// JobContext is a synchronization object that combines the functions of a Context and a WaitGroup.
// The expected lifecycle is:
// - Caller calls JobContext.NewJob() with a parent context and a count of workers expected.
// - Caller launches the given number of workers, passing the JobContext to them.
// - Workers can check for termination by using the JobContext as a context.Context.
// - Workers can cancel the overall job by calling JobContext.Cancel().
// - Workers must call JobContext.WorkerDone() when they complete, like sync.WaitGroup.Done().
// - The caller, or other goroutines. can call JobContext.Wait() to wait for job completion.
//
// A single JobContext can only run one job at a time. If JobContext.NewJob() is called while a job
// is already running, that job will be cancelled and waited on prior to starting the new job.
type JobContext struct {
Ctx context.Context
JcCancel context.CancelFunc
Wg *sync.WaitGroup
JcRunning bool
RunningLock *sync.Mutex
}
// NewJob starts a new job with a defined number of workers. If a prior job is running, it is cancelled.
func (mw *JobContext) NewJob(ctx context.Context, workers int, returnIfRunning bool) bool {
if mw.RunningLock == nil {
mw.RunningLock = &sync.Mutex{}
}
mw.RunningLock.Lock()
for mw.JcRunning {
if returnIfRunning {
mw.RunningLock.Unlock()
return false
}
mw.JcCancel()
mw.RunningLock.Unlock()
mw.Wait()
mw.RunningLock.Lock()
}
mw.JcRunning = true
mw.Ctx, mw.JcCancel = context.WithCancel(ctx)
mw.Wg = &sync.WaitGroup{}
mw.Wg.Add(workers)
mw.RunningLock.Unlock()
go func() {
mw.Wg.Wait()
mw.RunningLock.Lock()
mw.JcRunning = false
mw.JcCancel()
mw.RunningLock.Unlock()
}()
return true
}
// WorkerDone signals that a worker is finished, like sync.WaitGroup.Done().
func (mw *JobContext) WorkerDone() {
mw.Wg.Done()
}
// Wait waits for the current job to complete, like sync.WaitGroup.Wait().
// If no job has been started, always just returns.
func (mw *JobContext) Wait() {
if mw.Wg != nil {
mw.Wg.Wait()
}
}
// Done implements Context.Done().
func (mw *JobContext) Done() <-chan struct{} {
return mw.Ctx.Done()
}
// Err implements Context.Err().
func (mw *JobContext) Err() error {
return mw.Ctx.Err()
}
// Deadline implements Context.Deadline().
func (mw *JobContext) Deadline() (time time.Time, ok bool) {
return mw.Ctx.Deadline()
}
// Value implements Context.Value().
func (mw *JobContext) Value(key interface{}) interface{} {
return mw.Ctx.Value(key)
}
// Cancel cancels the JobContext's context. If no job has been started, this does nothing.
func (mw *JobContext) Cancel() {
if mw.JcCancel != nil {
mw.JcCancel()
}
}
// Running returns true if a job is currently running.
func (mw *JobContext) Running() bool {
mw.RunningLock.Lock()
defer mw.RunningLock.Unlock()
return mw.JcRunning
}
|