File: group.go

Package: incus 6.0.5-2
package task

import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"time"
)

// Group of tasks sharing the same lifecycle.
//
// All tasks in a group will be started and stopped at the same time.
type Group struct {
	cancel  context.CancelFunc
	wg      sync.WaitGroup
	tasks   []Task
	running map[int]bool
	mu      sync.Mutex
}
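
// A minimal usage sketch (assumption: Func, Schedule and a schedule helper
// such as Every are defined elsewhere in this package and are not shown in
// this file):
//
//	var g Group
//	g.Add(func(ctx context.Context) { /* periodic work */ }, Every(time.Minute))
//	g.Start(context.Background())
//	// ... later, during shutdown ...
//	_ = g.Stop(3 * time.Second)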

// Add a new task to the group, returning a pointer to it.
func (g *Group) Add(f Func, schedule Schedule) *Task {
	g.mu.Lock()
	defer g.mu.Unlock()

	i := len(g.tasks)
	g.tasks = append(g.tasks, Task{
		f:        f,
		schedule: schedule,
		reset:    make(chan struct{}, 16), // Buffered to not block senders
	})

	return &g.tasks[i]
}

// Start all the tasks in the group.
func (g *Group) Start(ctx context.Context) {
	// Lock access to the g.running map and the g.tasks slice for the entirety of this
	// function so that concurrent calls to Start() or Add() don't race. This ensures all
	// tasks in this group are started based on a consistent snapshot of g.running and g.tasks.
	g.mu.Lock()
	defer g.mu.Unlock()

	ctx, g.cancel = context.WithCancel(ctx)

	if g.running == nil {
		g.running = make(map[int]bool)
	}

	for i := range g.tasks {
		if g.running[i] {
			continue
		}

		g.running[i] = true
		task := g.tasks[i] // Local variable for the closure below.
		g.wg.Add(1)

		go func(i int) {
			defer g.wg.Done()

			task.loop(ctx)

			// Ensure running map is updated before wait group Done() is called.
			g.mu.Lock()
			defer g.mu.Unlock()

			g.running[i] = false
		}(i)
	}
}

// Stop all tasks in the group.
//
// This works by sending a cancellation signal to all tasks of the
// group and waiting for them to terminate.
//
// If a task is idle (i.e. not executing its task function) it will terminate
// immediately.
//
// If a task is busy executing its task function, the cancellation signal
// will propagate through the context passed to it, and this method will
// block until the function returns.
//
// If the given timeout expires before all tasks complete, this method
// returns an error immediately; otherwise it returns nil.
func (g *Group) Stop(timeout time.Duration) error {
	if g.cancel == nil {
		// We were not even started
		return nil
	}

	g.cancel()

	graceful := make(chan struct{}, 1)
	go func() {
		g.wg.Wait()
		close(graceful)
	}()

	// Wait for graceful termination, but abort if the context expires.
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	select {
	case <-ctx.Done():
		g.mu.Lock()
		defer g.mu.Unlock()

		running := []string{}
		for i, value := range g.running {
			if value {
				running = append(running, strconv.Itoa(i))
			}
		}

		return fmt.Errorf("Task(s) still running: IDs %v", running)
	case <-graceful:
		return nil
	}
}
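
// Shutdown sketch (assumption: the 3-second timeout and the use of the
// standard library "log" package are illustrative choices, not part of this
// package's API):
//
//	if err := g.Stop(3 * time.Second); err != nil {
//		// Some task functions did not return before the timeout; the
//		// error lists the indices of the tasks still running.
//		log.Printf("Failed to stop task group: %v", err)
//	}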