1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
|
package task
import (
"context"
"fmt"
"strconv"
"sync"
"time"
)
// Group of tasks sharing the same lifecycle.
//
// All tasks in a group will be started and stopped at the same time.
type Group struct {
cancel context.CancelFunc
wg sync.WaitGroup
tasks []Task
running map[int]bool
mu sync.Mutex
}
// Add a new task to the group, returning its index.
func (g *Group) Add(f Func, schedule Schedule) *Task {
g.mu.Lock()
defer g.mu.Unlock()
i := len(g.tasks)
g.tasks = append(g.tasks, Task{
f: f,
schedule: schedule,
reset: make(chan struct{}, 16), // Buffered to not block senders
})
return &g.tasks[i]
}
// Start all the tasks in the group.
func (g *Group) Start(ctx context.Context) {
// Lock access to the g.running and g.tasks map for the entirety of this function so that
// concurrent calls to Start() or Add(0) don't race. This ensures all tasks in this group
// are started based on a consistent snapshot of g.running and g.tasks.
g.mu.Lock()
defer g.mu.Unlock()
ctx, g.cancel = context.WithCancel(ctx)
if g.running == nil {
g.running = make(map[int]bool)
}
for i := range g.tasks {
if g.running[i] {
continue
}
g.running[i] = true
task := g.tasks[i] // Local variable for the closure below.
g.wg.Add(1)
go func(i int) {
defer g.wg.Done()
task.loop(ctx)
// Ensure running map is updated before wait group Done() is called.
g.mu.Lock()
defer g.mu.Unlock()
g.running[i] = false
}(i)
}
}
// Stop all tasks in the group.
//
// This works by sending a cancellation signal to all tasks of the
// group and waiting for them to terminate.
//
// If a task is idle (i.e. not executing its task function) it will terminate
// immediately.
//
// If a task is busy executing its task function, the cancellation signal will
// propagate through the context passed to it, and the task will block waiting
// for the function to terminate.
//
// In case the given timeout expires before all tasks complete, this method
// exits immediately and returns an error, otherwise it returns nil.
func (g *Group) Stop(timeout time.Duration) error {
if g.cancel == nil {
// We were not even started
return nil
}
g.cancel()
graceful := make(chan struct{}, 1)
go func() {
g.wg.Wait()
close(graceful)
}()
// Wait for graceful termination, but abort if the context expires.
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
select {
case <-ctx.Done():
g.mu.Lock()
defer g.mu.Unlock()
running := []string{}
for i, value := range g.running {
if value {
running = append(running, strconv.Itoa(i))
}
}
return fmt.Errorf("Task(s) still running: IDs %v", running)
case <-graceful:
return nil
}
}
|