File: job_context.go

package info (click to toggle)
receptor 1.5.5-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 2,772 kB
  • sloc: python: 1,643; makefile: 305; sh: 174
file content (109 lines) | stat: -rw-r--r-- 2,945 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package utils

import (
	"context"
	"sync"
	"time"
)

// JobContext is a synchronization object that combines the functions of a Context and a WaitGroup.
// The expected lifecycle is:
//   - Caller calls JobContext.NewJob() with a parent context and a count of workers expected.
//   - Caller launches the given number of workers, passing the JobContext to them.
//   - Workers can check for termination by using the JobContext as a context.Context.
//   - Workers can cancel the overall job by calling JobContext.Cancel().
//   - Workers must call JobContext.WorkerDone() when they complete, like sync.WaitGroup.Done().
//   - The caller, or other goroutines, can call JobContext.Wait() to wait for job completion.
//
// A single JobContext can only run one job at a time.  If JobContext.NewJob() is called while a job
// is already running, that job will be cancelled and waited on prior to starting the new job, unless
// NewJob() is called with returnIfRunning set, in which case it returns false and leaves the running
// job alone.
type JobContext struct {
	Ctx         context.Context    // context of the current job; NewJob derives it from the parent context
	JcCancel    context.CancelFunc // cancel function belonging to Ctx
	Wg          *sync.WaitGroup    // counts the workers of the current job; replaced by every NewJob
	JcRunning   bool               // true from NewJob until all workers are done; accessed under RunningLock
	RunningLock *sync.Mutex        // protects JcRunning; allocated by NewJob when still nil
}

// NewJob starts a job whose context is derived from ctx and which expects the given number of
// workers to call WorkerDone.  If an earlier job is still running, NewJob either gives up and
// returns false (returnIfRunning is true), or cancels that job and waits for it to finish before
// going on (returnIfRunning is false).  It returns true once the new job has been started.
func (mw *JobContext) NewJob(ctx context.Context, workers int, returnIfRunning bool) bool {
	if mw.RunningLock == nil {
		mw.RunningLock = new(sync.Mutex)
	}

	mw.RunningLock.Lock()
	for {
		if !mw.JcRunning {
			break
		}
		if returnIfRunning {
			mw.RunningLock.Unlock()

			return false
		}
		// Cancel the earlier job, then let go of the lock while waiting so that the
		// goroutine which clears JcRunning is able to take it.
		mw.JcCancel()
		mw.RunningLock.Unlock()
		mw.Wait()
		mw.RunningLock.Lock()
	}

	mw.JcRunning = true
	jobCtx, cancel := context.WithCancel(ctx)
	mw.Ctx = jobCtx
	mw.JcCancel = cancel
	mw.Wg = new(sync.WaitGroup)
	mw.Wg.Add(workers)
	mw.RunningLock.Unlock()

	// Once every worker has reported in, mark the job as finished and cancel its context.
	finish := func() {
		mw.Wg.Wait()
		mw.RunningLock.Lock()
		defer mw.RunningLock.Unlock()
		mw.JcRunning = false
		mw.JcCancel()
	}
	go finish()

	return true
}

// WorkerDone reports that one worker of the current job has finished.  It is the
// JobContext counterpart of sync.WaitGroup.Done().
func (mw *JobContext) WorkerDone() {
	wg := mw.Wg
	wg.Done()
}

// Wait blocks until every worker of the current job has called WorkerDone, like
// sync.WaitGroup.Wait().  It returns immediately when no job has ever been started.
func (mw *JobContext) Wait() {
	if mw.Wg == nil {
		return
	}
	mw.Wg.Wait()
}

// Done implements Context.Done().  It returns the Done channel of the current job's context.
// If no job has been started there is no context yet, so it returns nil instead of panicking;
// the Context contract allows a nil channel for a context that can never be cancelled, and
// receiving from it blocks forever.
func (mw *JobContext) Done() <-chan struct{} {
	if mw.Ctx == nil {
		return nil
	}

	return mw.Ctx.Done()
}

// Err implements Context.Err().  It returns the error of the current job's context: nil while
// the job has not been cancelled, non-nil afterwards.  If no job has been started there is no
// context yet, so it returns nil instead of panicking.
func (mw *JobContext) Err() error {
	if mw.Ctx == nil {
		return nil
	}

	return mw.Ctx.Err()
}

// Deadline implements Context.Deadline().  It returns the deadline of the current job's
// context, with ok false when that context has none.  If no job has been started there is no
// context yet, so it returns the zero time and false instead of panicking.
//
// The result is named deadline (as in context.Context) so that it does not shadow the
// imported time package inside the method.
func (mw *JobContext) Deadline() (deadline time.Time, ok bool) {
	if mw.Ctx == nil {
		return time.Time{}, false
	}

	return mw.Ctx.Deadline()
}

// Value implements Context.Value().  It looks key up in the current job's context and returns
// the associated value, or nil when there is none.  If no job has been started there is no
// context yet, so it returns nil instead of panicking.
func (mw *JobContext) Value(key interface{}) interface{} {
	if mw.Ctx == nil {
		return nil
	}

	return mw.Ctx.Value(key)
}

// Cancel cancels the context of the current job.  It is a no-op when no job has been
// started, i.e. when there is no cancel function yet.
func (mw *JobContext) Cancel() {
	if mw.JcCancel == nil {
		return
	}
	mw.JcCancel()
}

// Running returns true if a job is currently running.
//
// RunningLock is only allocated by NewJob, so a nil lock means NewJob has never run on this
// JobContext; in that case Running reports false instead of dereferencing the nil lock.
func (mw *JobContext) Running() bool {
	if mw.RunningLock == nil {
		return false
	}

	mw.RunningLock.Lock()
	defer mw.RunningLock.Unlock()

	return mw.JcRunning
}