File: task.go

package info (click to toggle)
golang-github-containerd-stargz-snapshotter 0.14.3-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,348 kB
  • sloc: sh: 3,634; python: 534; makefile: 91; ansic: 4
file content (155 lines) | stat: -rw-r--r-- 5,811 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package task

import (
	"context"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/sync/semaphore"
)

// NewBackgroundTaskManager provides a task manager. You can specify the
// concurrency of background tasks. When running a background task, this will be
// forced to wait until no prioritized task is running for some period. You can
// specify the period through the argument of this function, too.
func NewBackgroundTaskManager(concurrency int64, period time.Duration) *BackgroundTaskManager {
	return &BackgroundTaskManager{
		backgroundSem:                semaphore.NewWeighted(concurrency),
		prioritizedTaskSilencePeriod: period,
		prioritizedTaskStartNotify:   make(chan struct{}),
		prioritizedTaskDoneCond:      sync.NewCond(&sync.Mutex{}),
	}
}

// BackgroundTaskManager is a task manager which manages prioritized tasks and
// background tasks execution. Background tasks are less important than
// prioritized tasks. You can let these background tasks not to use compute
// resources (CPU, NW, etc...) during more important tasks(=prioritized tasks)
// running.
//
// When you run a prioritised task and don't want background tasks to use
// resources you can tell it this manager by calling DoPrioritizedTask method.
// DonePrioritizedTask method must be called at the end of the prioritised task
// execution.
//
// For running a background task, you can use InvokeBackgroundTask method. The
// background task must be able to be cancelled via context.Context argument.
// The task is forced to wait until no prioritized task is running for some
// period. You can specify the period when making this manager instance. The
// limited number of background tasks run simultaneously and you can specify the
// concurrency when making this manager instance too. If a prioritized task
// starts during the execution of background tasks, all background tasks running
// will be cancelled via context. These cancelled tasks will be executed again
// later, same as other background tasks (when no prioritized task is running
// for some period).
type BackgroundTaskManager struct {
	prioritizedTasks             int64
	backgroundSem                *semaphore.Weighted
	prioritizedTaskSilencePeriod time.Duration
	prioritizedTaskStartNotify   chan struct{}
	prioritizedTaskStartNotifyMu sync.Mutex
	prioritizedTaskDoneCond      *sync.Cond
}

// DoPrioritizedTask tells the manager that we are running a prioritized task
// and don't want background tasks to disturb resources(CPU, NW, etc...)
func (ts *BackgroundTaskManager) DoPrioritizedTask() {
	// Notify the prioritized task execution to background tasks.
	ts.prioritizedTaskStartNotifyMu.Lock()
	atomic.AddInt64(&ts.prioritizedTasks, 1)
	close(ts.prioritizedTaskStartNotify)
	ts.prioritizedTaskStartNotify = make(chan struct{})
	ts.prioritizedTaskStartNotifyMu.Unlock()
}

// DonePrioritizedTask tells the manager that we've done a prioritized task
// and don't want background tasks to disturb resources(CPU, NW, etc...)
func (ts *BackgroundTaskManager) DonePrioritizedTask() {
	go func() {
		// Notify the task completion after `ts.prioritizedTaskSilencePeriod`
		// so that background tasks aren't invoked immediately.
		time.Sleep(ts.prioritizedTaskSilencePeriod)
		atomic.AddInt64(&ts.prioritizedTasks, -1)
		ts.prioritizedTaskDoneCond.L.Lock()
		ts.prioritizedTaskDoneCond.Broadcast()
		ts.prioritizedTaskDoneCond.L.Unlock()
	}()
}

// InvokeBackgroundTask invokes a background task. The task is started only when
// no prioritized tasks are running. Prioritized task's execution stops the
// execution of all background tasks. Background task must be able to be
// cancelled via context.Context argument and be able to be restarted again.
func (ts *BackgroundTaskManager) InvokeBackgroundTask(do func(context.Context), timeout time.Duration) {
	for {
		// Wait until all prioritized tasks are done
		for {
			if atomic.LoadInt64(&ts.prioritizedTasks) <= 0 {
				break
			}

			// waits until a prioritized task is done
			ts.prioritizedTaskDoneCond.L.Lock()
			if atomic.LoadInt64(&ts.prioritizedTasks) > 0 {
				ts.prioritizedTaskDoneCond.Wait()
			}
			ts.prioritizedTaskDoneCond.L.Unlock()
		}

		// limited number of background tasks can run at once.
		// if prioritized tasks are running, cancel this task.
		if func() bool {
			ts.backgroundSem.Acquire(context.Background(), 1)
			defer ts.backgroundSem.Release(1)

			// Get notify the prioritized tasks execution.
			ts.prioritizedTaskStartNotifyMu.Lock()
			ch := ts.prioritizedTaskStartNotify
			tasks := atomic.LoadInt64(&ts.prioritizedTasks)
			ts.prioritizedTaskStartNotifyMu.Unlock()
			if tasks > 0 {
				return false
			}

			// Invoke the background task. if some prioritized tasks added during
			// execution, cancel it and try it later.
			var (
				done        = make(chan struct{})
				ctx, cancel = context.WithTimeout(context.Background(), timeout)
			)
			defer cancel()
			go func() {
				do(ctx)
				close(done)
			}()

			// Wait until the background task is done or canceled.
			select {
			case <-ch: // some prioritized tasks started; retry it later
				cancel()
				return false
			case <-done: // All tasks completed
			}
			return true
		}() {
			break
		}
	}
}