1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
|
package async
import (
"runtime"
"sync"
)
// Job is the generic interface object representing the data being
// pushed to the jobs queue and being passed to the workers.
type Job interface{}
// Logic is the implementation of the logic each worker will execute.
type Logic func(arg Job)
// WorkQueue is the object representing an async jobs queue with
// a given number of concurrent workers.
type WorkQueue struct {
workers int
jobChan chan Job
stopChan chan struct{}
jobs sync.WaitGroup
done sync.WaitGroup
logic Logic
}
// NewQueue creates a new job queue with a specific worker logic.
// If workers is greater or equal than zero, it will be auto
// scaled to the number of logical CPUs usable by the current
// process.
func NewQueue(workers int, logic Logic) *WorkQueue {
return createQueue(workers, logic, 0)
}
func NewBufferedQueue(workers int, logic Logic, size int) *WorkQueue {
return createQueue(workers, logic, size)
}
func createQueue(workers int, logic Logic, bufferSize int) *WorkQueue {
if workers <= 0 {
workers = runtime.NumCPU()
}
wq := &WorkQueue{
workers: workers,
jobChan: make(chan Job, bufferSize),
stopChan: make(chan struct{}),
jobs: sync.WaitGroup{},
done: sync.WaitGroup{},
logic: logic,
}
for i := 0; i < workers; i++ {
wq.done.Add(1)
go wq.worker(i)
}
return wq
}
func (wq *WorkQueue) worker(id int) {
defer wq.done.Done()
for {
select {
case <-wq.stopChan:
return
case job := <-wq.jobChan:
wq.logic(job)
wq.jobs.Done()
}
}
}
// Add pushes a new job to the queue.
func (wq *WorkQueue) Add(job Job) {
wq.jobs.Add(1)
wq.jobChan <- job
}
// Wait stops until all the workers stopped.
func (wq *WorkQueue) Wait() {
wq.done.Wait()
}
// WaitDone stops until all jobs on the queue have been processed.
func (wq *WorkQueue) WaitDone() {
wq.jobs.Wait()
}
// Stop stops the job queue and the workers.
func (wq *WorkQueue) Stop() {
close(wq.stopChan)
wq.jobs.Wait()
wq.done.Wait()
close(wq.jobChan)
}
|